1
# Copyright (C) 2005-2014, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for finding and reading the bzr config file[s]."""
19
from textwrap import dedent
25
from testtools import matchers
38
registry as _mod_registry,
45
from ..sixish import (
49
from ..transport import remote as transport_remote
57
def lockable_config_scenarios():
60
{'config_class': config.GlobalConfig,
62
'config_section': 'DEFAULT'}),
64
{'config_class': config.LocationConfig,
66
'config_section': '.'}),]
69
load_tests = scenarios.load_tests_apply_scenarios
71
# Register helpers to build stores
72
config.test_store_builder_registry.register(
73
'configobj', lambda test: config.TransportIniFileStore(
74
test.get_transport(), 'configobj.conf'))
75
config.test_store_builder_registry.register(
76
'breezy', lambda test: config.GlobalStore())
77
config.test_store_builder_registry.register(
78
'location', lambda test: config.LocationStore())
81
def build_backing_branch(test, relpath,
82
transport_class=None, server_class=None):
83
"""Test helper to create a backing branch only once.
85
Some tests need multiple stores/stacks to check concurrent update
86
behaviours. As such, they need to build different branch *objects* even if
87
they share the branch on disk.
89
:param relpath: The relative path to the branch. (Note that the helper
90
should always specify the same relpath).
92
:param transport_class: The Transport class the test needs to use.
94
:param server_class: The server associated with the ``transport_class``
97
Either both or neither of ``transport_class`` and ``server_class`` should
100
if transport_class is not None and server_class is not None:
101
test.transport_class = transport_class
102
test.transport_server = server_class
103
elif not (transport_class is None and server_class is None):
104
raise AssertionError('Specify both ``transport_class`` and '
105
'``server_class`` or neither of them')
106
if getattr(test, 'backing_branch', None) is None:
107
# First call, let's build the branch on disk
108
test.backing_branch = test.make_branch(relpath)
111
def build_branch_store(test):
    """Return a ``BranchStore`` for the shared on-disk test branch.

    ``build_backing_branch`` makes sure the branch exists at ``branch``; it
    is then opened afresh and wrapped in a store.

    :param test: The test case requesting the store.
    """
    build_backing_branch(test, 'branch')
    opened = branch.Branch.open('branch')
    store = config.BranchStore(opened)
    return store
115
config.test_store_builder_registry.register('branch', build_branch_store)
118
def build_control_store(test):
    """Return a ``ControlStore`` for the shared on-disk test branch.

    ``build_backing_branch`` makes sure the branch exists at ``branch``; its
    control directory is then opened and wrapped in a store.

    :param test: The test case requesting the store.
    """
    build_backing_branch(test, 'branch')
    control = controldir.ControlDir.open('branch')
    store = config.ControlStore(control)
    return store
122
config.test_store_builder_registry.register('control', build_control_store)
125
def build_remote_branch_store(test):
126
# There is only one permutation (but we won't be able to handle more with
127
# this design anyway)
129
server_class) = transport_remote.get_test_permutations()[0]
130
build_backing_branch(test, 'branch', transport_class, server_class)
131
b = branch.Branch.open(test.get_url('branch'))
132
return config.BranchStore(b)
133
config.test_store_builder_registry.register('remote_branch',
134
build_remote_branch_store)
137
config.test_stack_builder_registry.register(
138
'breezy', lambda test: config.GlobalStack())
139
config.test_stack_builder_registry.register(
140
'location', lambda test: config.LocationStack('.'))
143
def build_branch_stack(test):
    """Return a ``BranchStack`` for the shared on-disk test branch.

    ``build_backing_branch`` makes sure the branch exists at ``branch``; it
    is then opened afresh and wrapped in a stack.

    :param test: The test case requesting the stack.
    """
    build_backing_branch(test, 'branch')
    opened = branch.Branch.open('branch')
    stack = config.BranchStack(opened)
    return stack
147
config.test_stack_builder_registry.register('branch', build_branch_stack)
150
def build_branch_only_stack(test):
151
# There is only one permutation (but we won't be able to handle more with
152
# this design anyway)
154
server_class) = transport_remote.get_test_permutations()[0]
155
build_backing_branch(test, 'branch', transport_class, server_class)
156
b = branch.Branch.open(test.get_url('branch'))
157
return config.BranchOnlyStack(b)
158
config.test_stack_builder_registry.register('branch_only',
159
build_branch_only_stack)
161
def build_remote_control_stack(test):
162
# There is only one permutation (but we won't be able to handle more with
163
# this design anyway)
165
server_class) = transport_remote.get_test_permutations()[0]
166
# We need only a bzrdir for this, not a full branch, but it's not worth
167
# creating a dedicated helper to create only the bzrdir
168
build_backing_branch(test, 'branch', transport_class, server_class)
169
b = branch.Branch.open(test.get_url('branch'))
170
return config.RemoteControlStack(b.controldir)
171
config.test_stack_builder_registry.register('remote_control',
172
build_remote_control_stack)
175
sample_long_alias="log -r-15..-1 --line"
176
sample_config_text = u"""
178
email=Erik B\u00e5gfors <erik@bagfors.nu>
180
change_editor=vimdiff -of @new_path @old_path
181
gpg_signing_key=DD4D5088
183
validate_signatures_in_log=true
185
user_global_option=something
186
bzr.mergetool.sometool=sometool {base} {this} {other} -o {result}
187
bzr.mergetool.funkytool=funkytool "arg with spaces" {this_temp}
188
bzr.mergetool.newtool='"newtool with spaces" {this_temp}'
189
bzr.default_mergetool=sometool
192
ll=""".encode('utf-8') + sample_long_alias.encode('utf-8') + b"\n"
195
sample_always_signatures = b"""
197
check_signatures=ignore
198
create_signatures=always
201
sample_ignore_signatures = b"""
203
check_signatures=require
204
create_signatures=never
207
sample_maybe_signatures = b"""
209
check_signatures=ignore
210
create_signatures=when-required
213
sample_branches_text = b"""
214
[http://www.example.com]
216
email=Robert Collins <robertc@example.org>
217
normal_option = normal
218
appendpath_option = append
219
appendpath_option:policy = appendpath
220
norecurse_option = norecurse
221
norecurse_option:policy = norecurse
222
[http://www.example.com/ignoreparent]
223
# different project: ignore parent dir config
225
[http://www.example.com/norecurse]
226
# configuration items that only apply to this dir
228
normal_option = norecurse
229
[http://www.example.com/dir]
230
appendpath_option = normal
232
check_signatures=require
233
# test trailing / matching with no children
235
check_signatures=check-available
236
gpg_signing_key=default
237
user_local_option=local
238
# test trailing / matching
240
#subdirs will match but not the parent
242
check_signatures=ignore
243
post_commit=breezy.tests.test_config.post_commit
244
#testing explicit beats globs
248
def create_configs(test):
249
"""Create configuration files for a given test.
251
This requires creating a tree (and populate the ``test.tree`` attribute)
252
and its associated branch and will populate the following attributes:
254
- branch_config: A BranchConfig for the associated branch.
256
- locations_config : A LocationConfig for the associated branch
258
- breezy_config: A GlobalConfig.
260
The tree and branch are created in a 'tree' subdirectory so the tests can
261
still use the test directory to stay outside of the branch.
263
tree = test.make_branch_and_tree('tree')
265
test.branch_config = config.BranchConfig(tree.branch)
266
test.locations_config = config.LocationConfig(tree.basedir)
267
test.breezy_config = config.GlobalConfig()
270
def create_configs_with_file_option(test):
271
"""Create configuration files with a ``file`` option set in each.
273
This builds on ``create_configs`` and add one ``file`` option in each
274
configuration with a value which allows identifying the configuration file.
277
test.breezy_config.set_user_option('file', 'breezy')
278
test.locations_config.set_user_option('file', 'locations')
279
test.branch_config.set_user_option('file', 'branch')
282
class TestOptionsMixin:
    """Mixin providing an assertion over the options exposed by a config."""

    def assertOptions(self, expected, conf):
        """Check that ``conf._get_options()`` yields ``expected``.

        Only the first four fields of each option are compared.
        """
        # The parser is deliberately left out of the comparison: including it
        # would make tests hard to write and error-prone.
        observed = [option[:4] for option in conf._get_options()]
        self.assertThat(observed, matchers.Equals(expected))
291
class InstrumentedConfigObj(object):
292
"""A config obj look-enough-alike to record calls made to it."""
294
def __contains__(self, thing):
295
self._calls.append(('__contains__', thing))
298
def __getitem__(self, key):
299
self._calls.append(('__getitem__', key))
302
def __init__(self, input, encoding=None):
303
self._calls = [('__init__', input, encoding)]
305
def __setitem__(self, key, value):
306
self._calls.append(('__setitem__', key, value))
308
def __delitem__(self, key):
309
self._calls.append(('__delitem__', key))
312
self._calls.append(('keys',))
316
self._calls.append(('reload',))
318
def write(self, arg):
319
self._calls.append(('write',))
321
def as_bool(self, value):
322
self._calls.append(('as_bool', value))
325
def get_value(self, section, name):
326
self._calls.append(('get_value', section, name))
330
class FakeBranch(object):
332
def __init__(self, base=None):
334
self.base = "http://example.com/branches/demo"
337
self._transport = self.control_files = \
338
FakeControlFilesAndTransport()
340
def _get_config(self):
341
return config.TransportConfig(self._transport, 'branch.conf')
343
def lock_write(self):
344
return lock.LogicalLockResult(self.unlock)
350
class FakeControlFilesAndTransport(object):
354
self._transport = self
356
def get(self, filename):
359
return BytesIO(self.files[filename])
361
raise errors.NoSuchFile(filename)
363
def get_bytes(self, filename):
366
return self.files[filename]
368
raise errors.NoSuchFile(filename)
370
def put(self, filename, fileobj):
    """Store everything read from ``fileobj`` under ``filename``."""
    content = fileobj.read()
    self.files[filename] = content
373
def put_file(self, filename, fileobj):
    """Delegate to ``put`` and hand back whatever it returns."""
    outcome = self.put(filename, fileobj)
    return outcome
377
class InstrumentedConfig(config.Config):
378
"""An instrumented config that supplies stubs for template methods."""
381
super(InstrumentedConfig, self).__init__()
383
self._signatures = config.CHECK_NEVER
385
def _get_user_id(self):
    """Record the call and return a canned user id."""
    recorded = self._calls
    recorded.append('_get_user_id')
    return "Robert Collins <robert.collins@example.org>"
389
def _get_signature_checking(self):
    """Record the call and return the configured ``_signatures`` value."""
    recorded = self._calls
    recorded.append('_get_signature_checking')
    policy = self._signatures
    return policy
393
def _get_change_editor(self):
    """Record the call and return a canned change-editor command line."""
    recorded = self._calls
    recorded.append('_get_change_editor')
    return 'vimdiff -fo @new_path @old_path'
398
bool_config = b"""[DEFAULT]
407
class TestConfigObj(tests.TestCase):
409
def test_get_bool(self):
    """``get_bool`` returns real booleans for the entries of ``bool_config``."""
    parsed = config.ConfigObj(BytesIO(bool_config))
    cases = [
        ('DEFAULT', 'active', True),
        ('DEFAULT', 'inactive', False),
        ('UPPERCASE', 'active', True),
        ('UPPERCASE', 'nonactive', False),
    ]
    for section, option, wanted in cases:
        self.assertIs(parsed.get_bool(section, option), wanted)
416
def test_hash_sign_in_value(self):
418
Before 4.5.0, ConfigObj did not quote # signs in values, so they'd be
419
treated as comments when read in again. (#86838)
421
co = config.ConfigObj()
422
co['test'] = 'foo#bar'
424
co.write(outfile=outfile)
425
lines = outfile.getvalue().splitlines()
426
self.assertEqual(lines, [b'test = "foo#bar"'])
427
co2 = config.ConfigObj(lines)
428
self.assertEqual(co2['test'], 'foo#bar')
430
def test_triple_quotes(self):
431
# Bug #710410: if the value string has triple quotes
432
# then ConfigObj versions up to 4.7.2 will quote them wrong
433
# and won't be able to read them back
434
triple_quotes_value = '''spam
435
""" that's my spam """
437
co = config.ConfigObj()
438
co['test'] = triple_quotes_value
439
# While writing this test another bug in ConfigObj has been found:
440
# method co.write() without arguments produces list of lines
441
# one option per line, and multiline values are not split
442
# across multiple lines,
443
# and that breaks the parsing these lines back by ConfigObj.
444
# This issue only affects test, but it's better to avoid
445
# `co.write()` construct at all.
446
# [bialix 20110222] bug report sent to ConfigObj's author
448
co.write(outfile=outfile)
449
output = outfile.getvalue()
450
# now we're trying to read it back
451
co2 = config.ConfigObj(BytesIO(output))
452
self.assertEqual(triple_quotes_value, co2['test'])
455
erroneous_config = b"""[section] # line 1
458
whocares=notme # line 4
462
class TestConfigObjErrors(tests.TestCase):
464
def test_duplicate_section_name_error_line(self):
466
co = configobj.ConfigObj(BytesIO(erroneous_config),
468
except config.configobj.DuplicateError as e:
469
self.assertEqual(3, e.line_number)
471
self.fail('Error in config file not detected')
474
class TestConfig(tests.TestCase):
476
def test_constructs(self):
479
def test_user_email(self):
    """``user_email()`` yields the bare address and calls ``_get_user_id``."""
    conf = InstrumentedConfig()
    email = conf.user_email()
    self.assertEqual('robert.collins@example.org', email)
    self.assertEqual(['_get_user_id'], conf._calls)
484
def test_username(self):
    """``username()`` yields the full user id and calls ``_get_user_id``."""
    conf = InstrumentedConfig()
    name = conf.username()
    self.assertEqual('Robert Collins <robert.collins@example.org>', name)
    self.assertEqual(['_get_user_id'], conf._calls)
490
def test_get_user_option_default(self):
    """An option that is not defined is reported as ``None``."""
    my_config = config.Config()
    # Identity check: only the ``None`` singleton is acceptable, not merely
    # something that compares equal to it.
    self.assertIs(None, my_config.get_user_option('no_option'))
494
def test_validate_signatures_in_log_default(self):
    """A bare ``Config`` reports ``False`` for signature validation in log."""
    conf = config.Config()
    observed = conf.validate_signatures_in_log()
    self.assertEqual(False, observed)
498
def test_get_change_editor(self):
    """``get_change_editor`` builds a ``DiffFromTool`` from the stub command."""
    conf = InstrumentedConfig()
    editor = conf.get_change_editor('old_tree', 'new_tree')
    self.assertEqual(['_get_change_editor'], conf._calls)
    self.assertIs(diff.DiffFromTool, editor.__class__)
    # The canned command line is expected to be split into its words.
    wanted_template = ['vimdiff', '-fo', '@new_path', '@old_path']
    self.assertEqual(wanted_template, editor.command_template)
507
class TestConfigPath(tests.TestCase):
510
super(TestConfigPath, self).setUp()
511
self.overrideEnv('HOME', '/home/bogus')
512
self.overrideEnv('XDG_CACHE_HOME', '')
513
if sys.platform == 'win32':
516
r'C:\Documents and Settings\bogus\Application Data')
518
'C:/Documents and Settings/bogus/Application Data/breezy'
520
self.brz_home = '/home/bogus/.config/breezy'
522
def test_config_dir(self):
    """``config_dir()`` matches the ``brz_home`` fixture."""
    found = config.config_dir()
    self.assertEqual(found, self.brz_home)
525
def test_config_dir_is_unicode(self):
    """``config_dir()`` returns a ``text_type`` instance."""
    found = config.config_dir()
    self.assertIsInstance(found, text_type)
528
def test_config_filename(self):
    """``config_filename()`` is ``breezy.conf`` under ``brz_home``."""
    found = config.config_filename()
    self.assertEqual(found, self.brz_home + '/breezy.conf')
532
def test_locations_config_filename(self):
    """``locations_config_filename()`` is ``locations.conf`` under ``brz_home``."""
    found = config.locations_config_filename()
    self.assertEqual(found, self.brz_home + '/locations.conf')
536
def test_authentication_config_filename(self):
    """``authentication_config_filename()`` lives under ``brz_home``."""
    found = config.authentication_config_filename()
    self.assertEqual(found, self.brz_home + '/authentication.conf')
540
def test_xdg_cache_dir(self):
    """``xdg_cache_dir()`` is expected to be ``/home/bogus/.cache``."""
    found = config.xdg_cache_dir()
    self.assertEqual(found, '/home/bogus/.cache')
545
class TestConfigPathFallback(tests.TestCaseInTempDir):
548
super(TestConfigPathFallback, self).setUp()
549
self.overrideEnv('HOME', self.test_dir)
550
self.overrideEnv('XDG_CACHE_HOME', '')
551
self.bzr_home = os.path.join(self.test_dir, '.bazaar')
552
os.mkdir(self.bzr_home)
554
def test_config_dir(self):
    """``config_dir()`` matches the ``bzr_home`` fixture."""
    found = config.config_dir()
    self.assertEqual(found, self.bzr_home)
557
def test_config_dir_is_unicode(self):
    """``config_dir()`` returns a ``text_type`` instance."""
    found = config.config_dir()
    self.assertIsInstance(found, text_type)
560
def test_config_filename(self):
    """``config_filename()`` is ``bazaar.conf`` under ``bzr_home``."""
    found = config.config_filename()
    self.assertEqual(found, self.bzr_home + '/bazaar.conf')
564
def test_locations_config_filename(self):
    """``locations_config_filename()`` is ``locations.conf`` under ``bzr_home``."""
    found = config.locations_config_filename()
    self.assertEqual(found, self.bzr_home + '/locations.conf')
568
def test_authentication_config_filename(self):
    """``authentication_config_filename()`` lives under ``bzr_home``."""
    found = config.authentication_config_filename()
    self.assertEqual(found, self.bzr_home + '/authentication.conf')
572
def test_xdg_cache_dir(self):
    """``xdg_cache_dir()`` is ``.cache`` inside the test directory."""
    found = config.xdg_cache_dir()
    self.assertEqual(found, os.path.join(self.test_dir, '.cache'))
577
class TestXDGConfigDir(tests.TestCaseInTempDir):
578
# must be in temp dir because config tests for the existence of the bazaar
579
# subdirectory of $XDG_CONFIG_HOME
582
if sys.platform == 'win32':
583
raise tests.TestNotApplicable(
584
'XDG config dir not used on this platform')
585
super(TestXDGConfigDir, self).setUp()
586
self.overrideEnv('HOME', self.test_home_dir)
587
# BRZ_HOME overrides everything we want to test so unset it.
588
self.overrideEnv('BRZ_HOME', None)
590
def test_xdg_config_dir_exists(self):
591
"""When ~/.config/bazaar exists, use it as the config dir."""
592
newdir = osutils.pathjoin(self.test_home_dir, '.config', 'bazaar')
594
self.assertEqual(config.config_dir(), newdir)
596
def test_xdg_config_home(self):
597
"""When XDG_CONFIG_HOME is set, use it."""
598
xdgconfigdir = osutils.pathjoin(self.test_home_dir, 'xdgconfig')
599
self.overrideEnv('XDG_CONFIG_HOME', xdgconfigdir)
600
newdir = osutils.pathjoin(xdgconfigdir, 'bazaar')
602
self.assertEqual(config.config_dir(), newdir)
605
class TestIniConfig(tests.TestCaseInTempDir):
    """Base class offering a helper to build an ini config and its parser."""

    def make_config_parser(self, s):
        """Build an ``IniBasedConfig`` from ``s``.

        :return: A ``(config, parser)`` pair, the parser being the one the
            config hands out through ``_get_parser()``.
        """
        ini_config = config.IniBasedConfig.from_string(s)
        parser = ini_config._get_parser()
        return ini_config, parser
612
class TestIniConfigBuilding(TestIniConfig):
    """Tests for constructing ``IniBasedConfig`` objects."""

    def test_contructs(self):
        """The constructor can be called without arguments."""
        config.IniBasedConfig()

    def test_from_fp(self):
        """A config built from a string exposes a ``ConfigObj`` parser."""
        my_config = config.IniBasedConfig.from_string(sample_config_text)
        self.assertIsInstance(my_config._get_parser(), configobj.ConfigObj)

    def test_cached(self):
        """Repeated ``_get_parser()`` calls return the same object."""
        my_config = config.IniBasedConfig.from_string(sample_config_text)
        parser = my_config._get_parser()
        # assertIs reports both objects on failure, unlike assertTrue(a is b).
        self.assertIs(my_config._get_parser(), parser)

    def _dummy_chown(self, path, uid, gid):
        """Stand-in for ``os.chown`` that records its arguments on the test."""
        self.path, self.uid, self.gid = path, uid, gid

    def test_ini_config_ownership(self):
        """Ensure that chown is happening during _write_config_file"""
        self.requireFeature(features.chown_feature)
        self.overrideAttr(os, 'chown', self._dummy_chown)
        # Reset the recorder so that a missing chown call is detected.
        self.path = self.uid = self.gid = None
        conf = config.IniBasedConfig(file_name='./foo.conf')
        conf._write_config_file()
        self.assertEqual(self.path, './foo.conf')
        # assertIsInstance shows the offending value when the check fails.
        self.assertIsInstance(self.uid, int)
        self.assertIsInstance(self.gid, int)
641
class TestIniConfigSaving(tests.TestCaseInTempDir):
643
def test_cant_save_without_a_file_name(self):
    """Writing a config created without a file name raises AssertionError."""
    nameless = config.IniBasedConfig()
    self.assertRaises(AssertionError, nameless._write_config_file)
647
def test_saved_with_content(self):
648
content = b'foo = bar\n'
649
config.IniBasedConfig.from_string(content, file_name='./test.conf',
651
self.assertFileEqual(content, 'test.conf')
654
class TestIniConfigOptionExpansion(tests.TestCase):
655
"""Test option expansion from the IniConfig level.
657
What we really want here is to test the Config level, but the class being
658
abstract as far as storing values is concerned, this can't be done
661
# FIXME: This should be rewritten when all configs share a storage
662
# implementation -- vila 2011-02-18
664
def get_config(self, string=None):
667
c = config.IniBasedConfig.from_string(string)
670
def assertExpansion(self, expected, conf, string, env=None):
    """Check that ``conf`` expands ``string`` (with ``env``) to ``expected``."""
    expanded = conf.expand_options(string, env)
    self.assertEqual(expected, expanded)
673
def test_no_expansion(self):
    """A string without references is returned unchanged."""
    conf = self.get_config('')
    self.assertExpansion('foo', conf, 'foo')
677
def test_env_adding_options(self):
    """A reference can be resolved from ``env`` alone."""
    conf = self.get_config('')
    env = {'foo': 'bar'}
    self.assertExpansion('bar', conf, '{foo}', env)
681
def test_env_overriding_options(self):
    """A value supplied in ``env`` wins over the one in the config."""
    conf = self.get_config('foo=baz')
    env = {'foo': 'bar'}
    self.assertExpansion('bar', conf, '{foo}', env)
685
def test_simple_ref(self):
    """A reference to a defined option expands to that option's value."""
    conf = self.get_config('foo=xxx')
    self.assertExpansion('xxx', conf, '{foo}')
689
def test_unknown_ref(self):
    """Expanding an undefined option raises ``ExpandingUnknownOption``."""
    conf = self.get_config('')
    expand = conf.expand_options
    self.assertRaises(config.ExpandingUnknownOption, expand, '{foo}')
694
def test_indirect_ref(self):
695
c = self.get_config('''
699
self.assertExpansion('xxx', c, '{bar}')
701
def test_embedded_ref(self):
702
c = self.get_config('''
706
self.assertExpansion('xxx', c, '{{bar}}')
708
def test_simple_loop(self):
709
c = self.get_config('foo={foo}')
710
self.assertRaises(config.OptionExpansionLoop, c.expand_options,
713
def test_indirect_loop(self):
714
c = self.get_config('''
718
e = self.assertRaises(config.OptionExpansionLoop,
719
c.expand_options, '{foo}')
720
self.assertEqual('foo->bar->baz', e.refs)
721
self.assertEqual('{foo}', e.string)
724
conf = self.get_config('''
728
list={foo},{bar},{baz}
730
self.assertEqual(['start', 'middle', 'end'],
731
conf.get_user_option('list', expand=True))
733
def test_cascading_list(self):
734
conf = self.get_config('''
740
self.assertEqual(['start', 'middle', 'end'],
741
conf.get_user_option('list', expand=True))
743
def test_pathological_hidden_list(self):
744
conf = self.get_config('''
750
hidden={start}{middle}{end}
752
# Nope, it's either a string or a list, and the list wins as soon as a
753
# ',' appears, so the string concatenation never occurs.
754
self.assertEqual(['{foo', '}', '{', 'bar}'],
755
conf.get_user_option('hidden', expand=True))
758
class TestLocationConfigOptionExpansion(tests.TestCaseInTempDir):
760
def get_config(self, location, string=None):
763
# Since we don't save the config we won't strictly require to inherit
764
# from TestCaseInTempDir, but an error occurs so quickly...
765
c = config.LocationConfig.from_string(string, location)
768
def test_dont_cross_unrelated_section(self):
769
c = self.get_config('/another/branch/path', '''
774
[/another/branch/path]
777
self.assertRaises(config.ExpandingUnknownOption,
778
c.get_user_option, 'bar', expand=True)
780
def test_cross_related_sections(self):
781
c = self.get_config('/project/branch/path', '''
785
[/project/branch/path]
788
self.assertEqual('quux', c.get_user_option('bar', expand=True))
791
class TestIniBaseConfigOnDisk(tests.TestCaseInTempDir):
793
def test_cannot_reload_without_name(self):
    """Reloading a config built without a file name raises AssertionError."""
    nameless = config.IniBasedConfig.from_string(sample_config_text)
    self.assertRaises(AssertionError, nameless.reload)
797
def test_reload_see_new_value(self):
798
c1 = config.IniBasedConfig.from_string('editor=vim\n',
799
file_name='./test/conf')
800
c1._write_config_file()
801
c2 = config.IniBasedConfig.from_string('editor=emacs\n',
802
file_name='./test/conf')
803
c2._write_config_file()
804
self.assertEqual('vim', c1.get_user_option('editor'))
805
self.assertEqual('emacs', c2.get_user_option('editor'))
806
# Make sure we get the Right value
808
self.assertEqual('emacs', c1.get_user_option('editor'))
811
class TestLockableConfig(tests.TestCaseInTempDir):
813
scenarios = lockable_config_scenarios()
818
config_section = None
821
super(TestLockableConfig, self).setUp()
822
self._content = '[%s]\none=1\ntwo=2\n' % (self.config_section,)
823
self.config = self.create_config(self._content)
825
def get_existing_config(self):
    """Instantiate ``config_class`` with ``config_args`` and return it."""
    factory = self.config_class
    return factory(*self.config_args)
828
def create_config(self, content):
829
kwargs = dict(save=True)
830
c = self.config_class.from_string(content, *self.config_args, **kwargs)
833
def test_simple_read_access(self):
    """Option ``one`` can be read back from the fixture config."""
    observed = self.config.get_user_option('one')
    self.assertEqual('1', observed)
836
def test_simple_write_access(self):
    """A value set on the fixture config is visible when read back."""
    conf = self.config
    conf.set_user_option('one', 'one')
    self.assertEqual('one', conf.get_user_option('one'))
840
def test_listen_to_the_last_speaker(self):
842
c2 = self.get_existing_config()
843
c1.set_user_option('one', 'ONE')
844
c2.set_user_option('two', 'TWO')
845
self.assertEqual('ONE', c1.get_user_option('one'))
846
self.assertEqual('TWO', c2.get_user_option('two'))
847
# The second update respects the first one
848
self.assertEqual('ONE', c2.get_user_option('one'))
850
def test_last_speaker_wins(self):
851
# If the same config is not shared, the same variable modified twice
852
# can only see a single result.
854
c2 = self.get_existing_config()
855
c1.set_user_option('one', 'c1')
856
c2.set_user_option('one', 'c2')
857
self.assertEqual('c2', c2._get_user_option('one'))
858
# The first modification is still available until another refresh
860
self.assertEqual('c1', c1._get_user_option('one'))
861
c1.set_user_option('two', 'done')
862
self.assertEqual('c2', c1._get_user_option('one'))
864
def test_writes_are_serialized(self):
866
c2 = self.get_existing_config()
868
# We spawn a thread that will pause *during* the write
869
before_writing = threading.Event()
870
after_writing = threading.Event()
871
writing_done = threading.Event()
872
c1_orig = c1._write_config_file
873
def c1_write_config_file():
876
# The lock is held. We wait for the main thread to decide when to
879
c1._write_config_file = c1_write_config_file
881
c1.set_user_option('one', 'c1')
883
t1 = threading.Thread(target=c1_set_option)
884
# Collect the thread after the test
885
self.addCleanup(t1.join)
886
# Be ready to unblock the thread if the test goes wrong
887
self.addCleanup(after_writing.set)
889
before_writing.wait()
890
self.assertTrue(c1._lock.is_held)
891
self.assertRaises(errors.LockContention,
892
c2.set_user_option, 'one', 'c2')
893
self.assertEqual('c1', c1.get_user_option('one'))
894
# Let the lock be released
897
c2.set_user_option('one', 'c2')
898
self.assertEqual('c2', c2.get_user_option('one'))
900
def test_read_while_writing(self):
902
# We spawn a thread that will pause *during* the write
903
ready_to_write = threading.Event()
904
do_writing = threading.Event()
905
writing_done = threading.Event()
906
c1_orig = c1._write_config_file
907
def c1_write_config_file():
909
# The lock is held. We wait for the main thread to decide when to
914
c1._write_config_file = c1_write_config_file
916
c1.set_user_option('one', 'c1')
917
t1 = threading.Thread(target=c1_set_option)
918
# Collect the thread after the test
919
self.addCleanup(t1.join)
920
# Be ready to unblock the thread if the test goes wrong
921
self.addCleanup(do_writing.set)
923
# Ensure the thread is ready to write
924
ready_to_write.wait()
925
self.assertTrue(c1._lock.is_held)
926
self.assertEqual('c1', c1.get_user_option('one'))
927
# If we read during the write, we get the old value
928
c2 = self.get_existing_config()
929
self.assertEqual('1', c2.get_user_option('one'))
930
# Let the writing occur and ensure it occurred
933
# Now we get the updated value
934
c3 = self.get_existing_config()
935
self.assertEqual('c1', c3.get_user_option('one'))
938
class TestGetUserOptionAs(TestIniConfig):
940
def test_get_user_option_as_bool(self):
941
conf, parser = self.make_config_parser("""
944
an_invalid_bool = maybe
945
a_list = hmm, who knows ? # This is interpreted as a list !
947
get_bool = conf.get_user_option_as_bool
948
self.assertEqual(True, get_bool('a_true_bool'))
949
self.assertEqual(False, get_bool('a_false_bool'))
952
warnings.append(args[0] % args[1:])
953
self.overrideAttr(trace, 'warning', warning)
954
msg = 'Value "%s" is not a boolean for "%s"'
955
self.assertIs(None, get_bool('an_invalid_bool'))
956
self.assertEqual(msg % ('maybe', 'an_invalid_bool'), warnings[0])
958
self.assertIs(None, get_bool('not_defined_in_this_config'))
959
self.assertEqual([], warnings)
961
def test_get_user_option_as_list(self):
962
conf, parser = self.make_config_parser("""
967
get_list = conf.get_user_option_as_list
968
self.assertEqual(['a', 'b', 'c'], get_list('a_list'))
969
self.assertEqual(['1'], get_list('length_1'))
970
self.assertEqual('x', conf.get_user_option('one_item'))
971
# automatically cast to list
972
self.assertEqual(['x'], get_list('one_item'))
975
class TestSupressWarning(TestIniConfig):
    """Tests for ``suppress_warning`` on an ini-based config."""

    def make_warnings_config(self, s):
        """Return the ``suppress_warning`` method of a config built from ``s``."""
        conf, _parser = self.make_config_parser(s)
        return conf.suppress_warning

    def test_suppress_warning_unknown(self):
        """With an empty config no warning is suppressed."""
        is_suppressed = self.make_warnings_config('')
        self.assertEqual(False, is_suppressed('unknown_warning'))

    def test_suppress_warning_known(self):
        """Only the warnings listed in ``suppress_warnings`` are suppressed."""
        is_suppressed = self.make_warnings_config('suppress_warnings=a,b')
        for wanted, warning_name in [(False, 'c'), (True, 'a'), (True, 'b')]:
            self.assertEqual(wanted, is_suppressed(warning_name))
992
class TestGetConfig(tests.TestCaseInTempDir):
994
def test_constructs(self):
    """``GlobalConfig`` can be instantiated without arguments."""
    config.GlobalConfig()
997
def test_calls_read_filenames(self):
998
# replace the class that is constructed, to check its parameters
999
oldparserclass = config.ConfigObj
1000
config.ConfigObj = InstrumentedConfigObj
1001
my_config = config.GlobalConfig()
1003
parser = my_config._get_parser()
1005
config.ConfigObj = oldparserclass
1006
self.assertIsInstance(parser, InstrumentedConfigObj)
1007
self.assertEqual(parser._calls, [('__init__', config.config_filename(),
1011
class TestBranchConfig(tests.TestCaseWithTransport):
1013
def test_constructs_valid(self):
    """``BranchConfig`` can be built around a ``FakeBranch``."""
    fake = FakeBranch()
    branch_conf = config.BranchConfig(fake)
    self.assertIsNot(None, branch_conf)
1018
def test_constructs_error(self):
    """Calling ``BranchConfig`` without a branch raises ``TypeError``."""
    factory = config.BranchConfig
    self.assertRaises(TypeError, factory)
1021
def test_get_location_config(self):
    """The location config targets ``branch.base`` and is reused."""
    fake = FakeBranch()
    branch_conf = config.BranchConfig(fake)
    first = branch_conf._get_location_config()
    self.assertEqual(fake.base, first.location)
    # A second request must hand back the very same object.
    self.assertIs(first, branch_conf._get_location_config())
1028
def test_get_config(self):
    """The Branch.get_config method works properly"""
    tree = controldir.ControlDir.create_standalone_workingtree('.')
    first_conf = tree.branch.get_config()
    self.assertIs(first_conf.get_user_option('wacky'), None)
    first_conf.set_user_option('wacky', 'unlikely')
    self.assertEqual(first_conf.get_user_option('wacky'), 'unlikely')

    # Re-open the branch: the stored value must still be there.
    reopened = branch.Branch.open('.')
    second_conf = reopened.get_config()
    self.assertEqual(second_conf.get_user_option('wacky'), 'unlikely')
1041
def test_has_explicit_nickname(self):
1042
b = self.make_branch('.')
1043
self.assertFalse(b.get_config().has_explicit_nickname())
1045
self.assertTrue(b.get_config().has_explicit_nickname())
1047
def test_config_url(self):
    """The Branch.get_config will use section that uses a local url"""
    made = self.make_branch('branch')
    self.assertEqual('branch', made.nick)

    local_url = urlutils.local_path_to_url('branch')
    content = '[%s]\nnickname = foobar' % (local_url,)
    conf = config.LocationConfig.from_string(content, local_url, save=True)
    self.assertIsNot(None, conf)
    # The nick is now expected to come from the saved location config.
    self.assertEqual('foobar', made.nick)
1059
def test_config_local_path(self):
    """The Branch.get_config will use a local system path"""
    made = self.make_branch('branch')
    self.assertEqual('branch', made.nick)

    local_path = osutils.getcwd().encode('utf8')
    content = b'[%s/branch]\nnickname = barry' % (local_path,)
    config.LocationConfig.from_string(content, 'branch', save=True)
    # Now the branch will find its nick via the location config
    self.assertEqual('barry', made.nick)
1071
def test_config_creates_local(self):
1072
"""Creating a new entry in config uses a local path."""
1073
branch = self.make_branch('branch', format='knit')
1074
branch.set_push_location('http://foobar')
1075
local_path = osutils.getcwd().encode('utf8')
1076
# Surprisingly ConfigObj doesn't create a trailing newline
1077
self.check_file_contents(config.locations_config_filename(),
1079
b'push_location = http://foobar\n'
1080
b'push_location:policy = norecurse\n'
1083
def test_autonick_urlencoded(self):
    """A branch made at ``!repo`` gets the nickname ``!repo``."""
    made = self.make_branch('!repo')
    nickname = made.get_config().get_nickname()
    self.assertEqual('!repo', nickname)
1087
def test_autonick_uses_branch_name(self):
    """A branch made with ``name='bar'`` gets the nickname ``bar``."""
    made = self.make_branch('foo', name='bar')
    nickname = made.get_config().get_nickname()
    self.assertEqual('bar', nickname)
1091
def test_warn_if_masked(self):
    """set_user_option warns when the value set is masked by another store.

    ``trace.warning`` is replaced by a recorder so that every
    ``set_option`` call can be checked for the warning it produced, or for
    the absence of one.
    """
    warnings = []

    def warning(*args):
        # Record the message with its %-arguments applied.
        warnings.append(args[0] % args[1:])
    self.overrideAttr(trace, 'warning', warning)

    def set_option(store, warn_masked=True):
        # Forget earlier warnings so only this call's output is checked.
        del warnings[:]
        conf.set_user_option('example_option', repr(store), store=store,
                             warn_masked=warn_masked)

    def assertWarning(warning):
        # ``None`` means "no warning expected".
        if warning is None:
            self.assertEqual(0, len(warnings))
        else:
            self.assertEqual(1, len(warnings))
            self.assertEqual(warning, warnings[0])

    # NOTE(review): the recorder setup, the None branch above and the
    # assertWarning(None) calls below were absent from the damaged text and
    # are reconstructed from the surviving statements -- confirm against
    # the pristine file.
    branch = self.make_branch('.')
    conf = branch.get_config()
    set_option(config.STORE_GLOBAL)
    assertWarning(None)
    set_option(config.STORE_BRANCH)
    assertWarning(None)
    set_option(config.STORE_GLOBAL)
    assertWarning('Value "4" is masked by "3" from branch.conf')
    set_option(config.STORE_GLOBAL, warn_masked=False)
    assertWarning(None)
    set_option(config.STORE_LOCATION)
    assertWarning(None)
    set_option(config.STORE_BRANCH)
    assertWarning('Value "3" is masked by "0" from locations.conf')
    set_option(config.STORE_BRANCH, warn_masked=False)
    assertWarning(None)
1125
class TestGlobalConfigItems(tests.TestCaseInTempDir):
    """Option lookups on GlobalConfig, either empty or from the sample text."""

    def _get_empty_config(self):
        """Return a GlobalConfig created without any explicit content."""
        my_config = config.GlobalConfig()
        # The callers use the result, so it must be returned.
        return my_config

    def _get_sample_config(self):
        """Return a GlobalConfig parsed from ``sample_config_text``."""
        my_config = config.GlobalConfig.from_string(sample_config_text)
        return my_config

    def test_user_id(self):
        my_config = config.GlobalConfig.from_string(sample_config_text)
        self.assertEqual(u"Erik B\u00e5gfors <erik@bagfors.nu>",
                         my_config._get_user_id())

    def test_absent_user_id(self):
        my_config = config.GlobalConfig()
        self.assertEqual(None, my_config._get_user_id())

    def test_get_user_option_default(self):
        my_config = self._get_empty_config()
        self.assertEqual(None, my_config.get_user_option('no_option'))

    def test_get_user_option_global(self):
        my_config = self._get_sample_config()
        self.assertEqual("something",
                         my_config.get_user_option('user_global_option'))

    def test_configured_validate_signatures_in_log(self):
        my_config = self._get_sample_config()
        self.assertEqual(True, my_config.validate_signatures_in_log())

    def test_get_alias(self):
        my_config = self._get_sample_config()
        self.assertEqual('help', my_config.get_alias('h'))

    def test_get_aliases(self):
        my_config = self._get_sample_config()
        aliases = my_config.get_aliases()
        self.assertEqual(2, len(aliases))
        # Sort the alias names so the two values are checked in a stable
        # order.
        sorted_keys = sorted(aliases)
        self.assertEqual('help', aliases[sorted_keys[0]])
        self.assertEqual(sample_long_alias, aliases[sorted_keys[1]])

    def test_get_no_alias(self):
        my_config = self._get_sample_config()
        self.assertEqual(None, my_config.get_alias('foo'))

    def test_get_long_alias(self):
        my_config = self._get_sample_config()
        self.assertEqual(sample_long_alias, my_config.get_alias('ll'))

    def test_get_change_editor(self):
        my_config = self._get_sample_config()
        change_editor = my_config.get_change_editor('old', 'new')
        self.assertIs(diff.DiffFromTool, change_editor.__class__)
        self.assertEqual('vimdiff -of @new_path @old_path',
                         ' '.join(change_editor.command_template))

    def test_get_no_change_editor(self):
        my_config = self._get_empty_config()
        change_editor = my_config.get_change_editor('old', 'new')
        self.assertIs(None, change_editor)

    def test_get_merge_tools(self):
        conf = self._get_sample_config()
        tools = conf.get_merge_tools()
        self.log(repr(tools))
        # The expected mapping was left dangling without an assertion in
        # the damaged text; compare it with what the config returned.
        self.assertEqual(
            {u'funkytool' : u'funkytool "arg with spaces" {this_temp}',
             u'sometool' : u'sometool {base} {this} {other} -o {result}',
             u'newtool' : u'"newtool with spaces" {this_temp}'},
            tools)

    def test_get_merge_tools_empty(self):
        conf = self._get_empty_config()
        tools = conf.get_merge_tools()
        self.assertEqual({}, tools)

    def test_find_merge_tool(self):
        conf = self._get_sample_config()
        cmdline = conf.find_merge_tool('sometool')
        self.assertEqual('sometool {base} {this} {other} -o {result}', cmdline)

    def test_find_merge_tool_not_found(self):
        conf = self._get_sample_config()
        cmdline = conf.find_merge_tool('DOES NOT EXIST')
        self.assertIs(cmdline, None)

    def test_find_merge_tool_known(self):
        # 'kdiff3' resolves to a command line even with an empty config.
        conf = self._get_empty_config()
        cmdline = conf.find_merge_tool('kdiff3')
        self.assertEqual('kdiff3 {base} {this} {other} -o {result}', cmdline)

    def test_find_merge_tool_override_known(self):
        # An explicit bzr.mergetool.* option wins over the known default.
        conf = self._get_empty_config()
        conf.set_user_option('bzr.mergetool.kdiff3', 'kdiff3 blah')
        cmdline = conf.find_merge_tool('kdiff3')
        self.assertEqual('kdiff3 blah', cmdline)
1226
class TestGlobalConfigSavingOptions(tests.TestCaseInTempDir):
    """Aliases written through one GlobalConfig are visible to a new one."""

    def test_empty(self):
        global_conf = config.GlobalConfig()
        aliases = global_conf.get_aliases()
        self.assertEqual(0, len(aliases))

    def test_set_alias(self):
        expected = 'commit --strict'
        writer = config.GlobalConfig()
        writer.set_alias('commit', expected)
        # Read the alias back through a separate config object.
        reader = config.GlobalConfig()
        self.assertEqual(expected, reader.get_alias('commit'))

    def test_remove_alias(self):
        writer = config.GlobalConfig()
        writer.set_alias('commit', 'commit --strict')
        # Now remove the alias again.
        writer.unset_alias('commit')
        reader = config.GlobalConfig()
        self.assertIs(None, reader.get_alias('commit'))
1248
class TestLocationConfig(tests.TestCaseInTempDir, TestOptionsMixin):
    """Section matching, option policies and lookups for LocationConfig.

    Most tests start by calling ``get_branch_config``, which saves the
    global and location configuration texts and sets ``self.my_config``
    and ``self.my_location_config`` for the requested location.

    NOTE(review): this class was reconstructed from a damaged copy in
    which several statements were missing; restored values that the
    surviving text does not pin down are flagged individually below.
    """

    def test_constructs_valid(self):
        config.LocationConfig('http://example.com')

    def test_constructs_error(self):
        self.assertRaises(TypeError, config.LocationConfig)

    def test_branch_calls_read_filenames(self):
        # This is testing the correct file names are provided.
        # TODO: consolidate with the test for GlobalConfigs filename checks.
        #
        # replace the class that is constructed, to check its parameters
        oldparserclass = config.ConfigObj
        config.ConfigObj = InstrumentedConfigObj
        try:
            my_config = config.LocationConfig('http://www.example.com')
            parser = my_config._get_parser()
        finally:
            # Always undo the monkeypatch, even if building the parser
            # fails, so later tests see the real ConfigObj.
            config.ConfigObj = oldparserclass
        self.assertIsInstance(parser, InstrumentedConfigObj)
        # NOTE(review): the last element of the expected call tuple was
        # missing from the damaged text; 'utf-8' is assumed -- confirm.
        self.assertEqual(parser._calls,
                         [('__init__', config.locations_config_filename(),
                           'utf-8')])

    def test_get_global_config(self):
        my_config = config.BranchConfig(FakeBranch('http://example.com'))
        global_config = my_config._get_global_config()
        self.assertIsInstance(global_config, config.GlobalConfig)
        # A second call must hand back the very same object.
        self.assertIs(global_config, my_config._get_global_config())

    def assertLocationMatching(self, expected):
        """Assert the (section, extra_path) pairs matched for the location."""
        self.assertEqual(expected,
                         list(self.my_location_config._get_matching_sections()))

    def test__get_matching_sections_no_match(self):
        self.get_branch_config('/')
        self.assertLocationMatching([])

    def test__get_matching_sections_exact(self):
        self.get_branch_config('http://www.example.com')
        self.assertLocationMatching([('http://www.example.com', '')])

    def test__get_matching_sections_suffix_does_not(self):
        self.get_branch_config('http://www.example.com-com')
        self.assertLocationMatching([])

    def test__get_matching_sections_subdir_recursive(self):
        self.get_branch_config('http://www.example.com/com')
        self.assertLocationMatching([('http://www.example.com', 'com')])

    def test__get_matching_sections_ignoreparent(self):
        self.get_branch_config('http://www.example.com/ignoreparent')
        # NOTE(review): the second tuple element was missing; '' is
        # assumed because the location equals the section name.
        self.assertLocationMatching([('http://www.example.com/ignoreparent',
                                      '')])

    def test__get_matching_sections_ignoreparent_subdir(self):
        self.get_branch_config(
            'http://www.example.com/ignoreparent/childbranch')
        # NOTE(review): the second tuple element was missing;
        # 'childbranch' is assumed as the path below the section.
        self.assertLocationMatching([('http://www.example.com/ignoreparent',
                                      'childbranch')])

    def test__get_matching_sections_subdir_trailing_slash(self):
        self.get_branch_config('/b')
        self.assertLocationMatching([('/b/', '')])

    def test__get_matching_sections_subdir_child(self):
        self.get_branch_config('/a/foo')
        self.assertLocationMatching([('/a/*', ''), ('/a/', 'foo')])

    def test__get_matching_sections_subdir_child_child(self):
        self.get_branch_config('/a/foo/bar')
        self.assertLocationMatching([('/a/*', 'bar'), ('/a/', 'foo/bar')])

    def test__get_matching_sections_trailing_slash_with_children(self):
        self.get_branch_config('/a/')
        self.assertLocationMatching([('/a/', '')])

    def test__get_matching_sections_explicit_over_glob(self):
        # XXX: 2006-09-08 jamesh
        # This test only passes because ord('c') > ord('*').  If there
        # was a config section for '/a/?', it would get precedence
        # over '/a/c'.
        self.get_branch_config('/a/c')
        self.assertLocationMatching([('/a/c', ''), ('/a/*', ''), ('/a/', 'c')])

    def test__get_option_policy_normal(self):
        self.get_branch_config('http://www.example.com')
        # Uses _get_option_policy like every other policy test in this
        # class (the damaged text called _get_config_policy here).
        # NOTE(review): the expected value was missing; POLICY_NONE is
        # assumed for an option without an explicit policy -- confirm.
        self.assertEqual(
            self.my_location_config._get_option_policy(
                'http://www.example.com', 'normal_option'),
            config.POLICY_NONE)

    def test__get_option_policy_norecurse(self):
        self.get_branch_config('http://www.example.com')
        self.assertEqual(
            self.my_location_config._get_option_policy(
                'http://www.example.com', 'norecurse_option'),
            config.POLICY_NORECURSE)
        # Test old recurse=False setting:
        self.assertEqual(
            self.my_location_config._get_option_policy(
                'http://www.example.com/norecurse', 'normal_option'),
            config.POLICY_NORECURSE)

    def test__get_option_policy_appendpath(self):
        # This test used to be a second ``test__get_option_policy_normal``,
        # which silently replaced the real "normal" test above.
        self.get_branch_config('http://www.example.com')
        self.assertEqual(
            self.my_location_config._get_option_policy(
                'http://www.example.com', 'appendpath_option'),
            config.POLICY_APPENDPATH)

    def test__get_options_with_policy(self):
        # NOTE(review): the two section headers and the assertOptions call
        # were missing; the headers are taken from the section names in
        # the expected tuples -- confirm against the pristine file.
        self.get_branch_config('/dir/subdir',
                               location_config="""\
[/dir]
other_url = /other-dir
other_url:policy = appendpath
[/dir/subdir]
other_url = /other-subdir
""")
        self.assertOptions(
            [(u'other_url', u'/other-subdir', u'/dir/subdir', 'locations'),
             (u'other_url', u'/other-dir', u'/dir', 'locations'),
             (u'other_url:policy', u'appendpath', u'/dir', 'locations')],
            self.my_location_config)

    def test_location_without_username(self):
        self.get_branch_config('http://www.example.com/ignoreparent')
        self.assertEqual(u'Erik B\u00e5gfors <erik@bagfors.nu>',
                         self.my_config.username())

    def test_location_not_listed(self):
        """Test that the global username is used when no location matches"""
        self.get_branch_config('/home/robertc/sources')
        self.assertEqual(u'Erik B\u00e5gfors <erik@bagfors.nu>',
                         self.my_config.username())

    def test_overriding_location(self):
        self.get_branch_config('http://www.example.com/foo')
        self.assertEqual('Robert Collins <robertc@example.org>',
                         self.my_config.username())

    def test_get_user_option_global(self):
        self.get_branch_config('/a')
        self.assertEqual('something',
                         self.my_config.get_user_option('user_global_option'))

    def test_get_user_option_local(self):
        self.get_branch_config('/a')
        self.assertEqual('local',
                         self.my_config.get_user_option('user_local_option'))

    def test_get_user_option_appendpath(self):
        # returned as is for the base path:
        self.get_branch_config('http://www.example.com')
        self.assertEqual('append',
                         self.my_config.get_user_option('appendpath_option'))
        # Extra path components get appended:
        self.get_branch_config('http://www.example.com/a/b/c')
        self.assertEqual('append/a/b/c',
                         self.my_config.get_user_option('appendpath_option'))
        # Overriden for http://www.example.com/dir, where it is a
        # normal option:
        self.get_branch_config('http://www.example.com/dir/a/b/c')
        self.assertEqual('normal',
                         self.my_config.get_user_option('appendpath_option'))

    def test_get_user_option_norecurse(self):
        self.get_branch_config('http://www.example.com')
        self.assertEqual('norecurse',
                         self.my_config.get_user_option('norecurse_option'))
        self.get_branch_config('http://www.example.com/dir')
        self.assertEqual(None,
                         self.my_config.get_user_option('norecurse_option'))
        # http://www.example.com/norecurse is a recurse=False section
        # that redefines normal_option.  Subdirectories do not pick up
        # this redefinition.
        self.get_branch_config('http://www.example.com/norecurse')
        self.assertEqual('norecurse',
                         self.my_config.get_user_option('normal_option'))
        self.get_branch_config('http://www.example.com/norecurse/subdir')
        self.assertEqual('normal',
                         self.my_config.get_user_option('normal_option'))

    def test_set_user_option_norecurse(self):
        self.get_branch_config('http://www.example.com')
        self.my_config.set_user_option('foo', 'bar',
                                       store=config.STORE_LOCATION_NORECURSE)
        self.assertEqual(
            self.my_location_config._get_option_policy(
                'http://www.example.com', 'foo'),
            config.POLICY_NORECURSE)

    def test_set_user_option_appendpath(self):
        self.get_branch_config('http://www.example.com')
        self.my_config.set_user_option('foo', 'bar',
                                       store=config.STORE_LOCATION_APPENDPATH)
        self.assertEqual(
            self.my_location_config._get_option_policy(
                'http://www.example.com', 'foo'),
            config.POLICY_APPENDPATH)

    def test_set_user_option_change_policy(self):
        self.get_branch_config('http://www.example.com')
        self.my_config.set_user_option('norecurse_option', 'normal',
                                       store=config.STORE_LOCATION)
        # NOTE(review): the expected value was missing; POLICY_NONE is
        # assumed after storing with plain STORE_LOCATION -- confirm.
        self.assertEqual(
            self.my_location_config._get_option_policy(
                'http://www.example.com', 'norecurse_option'),
            config.POLICY_NONE)

    def get_branch_config(self, location, global_config=None,
                          location_config=None):
        """Save the config texts and build the configs for ``location``.

        :param location: Passed to FakeBranch to build the branch.
        :param global_config: Global config text, ``sample_config_text``
            when None.
        :param location_config: Locations config text,
            ``sample_branches_text`` when None.

        Sets ``self.my_config`` (a BranchConfig) and
        ``self.my_location_config``.
        """
        my_branch = FakeBranch(location)
        if global_config is None:
            global_config = sample_config_text
        if location_config is None:
            location_config = sample_branches_text

        config.GlobalConfig.from_string(global_config, save=True)
        config.LocationConfig.from_string(location_config, my_branch.base,
                                          save=True)
        my_config = config.BranchConfig(my_branch)
        self.my_config = my_config
        self.my_location_config = my_config._get_location_config()

    def test_set_user_setting_sets_and_saves2(self):
        self.get_branch_config('/a/c')
        self.assertIs(self.my_config.get_user_option('foo'), None)
        self.my_config.set_user_option('foo', 'bar')
        # NOTE(review): the expected file content was missing;
        # b'foo = bar' is assumed from the option just set -- confirm
        # (including whether the stored content is bytes).
        self.assertEqual(
            self.my_config.branch.control_files.files['branch.conf'].strip(),
            b'foo = bar')
        self.assertEqual(self.my_config.get_user_option('foo'), 'bar')
        self.my_config.set_user_option('foo', 'baz',
                                       store=config.STORE_LOCATION)
        self.assertEqual(self.my_config.get_user_option('foo'), 'baz')
        # The value stored for the location keeps masking later default
        # sets.
        self.my_config.set_user_option('foo', 'qux')
        self.assertEqual(self.my_config.get_user_option('foo'), 'baz')

    def test_get_bzr_remote_path(self):
        my_config = config.LocationConfig('/a/c')
        self.assertEqual('bzr', my_config.get_bzr_remote_path())
        my_config.set_user_option('bzr_remote_path', '/path-bzr')
        self.assertEqual('/path-bzr', my_config.get_bzr_remote_path())
        # The environment variable takes over from the stored option.
        self.overrideEnv('BZR_REMOTE_PATH', '/environ-bzr')
        self.assertEqual('/environ-bzr', my_config.get_bzr_remote_path())
1498
# Configuration snippets used by TestBranchConfigItems.test_config_precedence:
# each one defines 'option' with a value naming where it came from.
precedence_global = b'option = global'
precedence_branch = b'option = branch'
# NOTE(review): this literal was unterminated in the damaged text and only
# the '[http://example.com/specific]' header survived.  The values 'recurse'
# and 'exact' are the ones test_config_precedence expects; the '[http://]'
# header and the 'recurse = true' line are assumptions -- confirm against
# the pristine file.
precedence_location = b"""
[http://]
recurse = true
option = recurse
[http://example.com/specific]
option = exact
"""
1508
class TestBranchConfigItems(tests.TestCaseInTempDir):
    """Option lookups through BranchConfig built on a FakeBranch."""

    def get_branch_config(self, global_config=None, location=None,
                          location_config=None, branch_data_config=None):
        """Build a BranchConfig, saving only the config texts provided.

        :param global_config: Global config text to save, if not None.
        :param location: Passed to FakeBranch to build the branch.
        :param location_config: Locations config text to save, if not None.
        :param branch_data_config: Content stored as the branch's
            'branch.conf' control file, if not None.
        :return: The BranchConfig for the fake branch.
        """
        my_branch = FakeBranch(location)
        if global_config is not None:
            config.GlobalConfig.from_string(global_config, save=True)
        if location_config is not None:
            config.LocationConfig.from_string(location_config, my_branch.base,
                                              save=True)
        my_config = config.BranchConfig(my_branch)
        if branch_data_config is not None:
            my_config.branch.control_files.files['branch.conf'] = \
                branch_data_config
        # The tests below use the result, so it must be returned.
        return my_config

    def test_user_id(self):
        branch = FakeBranch()
        my_config = config.BranchConfig(branch)
        self.assertIsNot(None, my_config.username())
        my_config.branch.control_files.files['email'] = "John"
        my_config.set_user_option('email',
                                  "Robert Collins <robertc@example.org>")
        self.assertEqual("Robert Collins <robertc@example.org>",
                         my_config.username())

    def test_BRZ_EMAIL_OVERRIDES(self):
        self.overrideEnv('BRZ_EMAIL', "Robert Collins <robertc@example.org>")
        branch = FakeBranch()
        my_config = config.BranchConfig(branch)
        self.assertEqual("Robert Collins <robertc@example.org>",
                         my_config.username())

    def test_get_user_option_global(self):
        my_config = self.get_branch_config(global_config=sample_config_text)
        self.assertEqual('something',
                         my_config.get_user_option('user_global_option'))

    def test_config_precedence(self):
        # FIXME: eager test, luckily no persitent config file makes it fail
        # Each step adds one more source and checks which one wins.
        my_config = self.get_branch_config(global_config=precedence_global)
        self.assertEqual(my_config.get_user_option('option'), 'global')
        my_config = self.get_branch_config(global_config=precedence_global,
                                           branch_data_config=precedence_branch)
        self.assertEqual(my_config.get_user_option('option'), 'branch')
        my_config = self.get_branch_config(
            global_config=precedence_global,
            branch_data_config=precedence_branch,
            location_config=precedence_location)
        self.assertEqual(my_config.get_user_option('option'), 'recurse')
        my_config = self.get_branch_config(
            global_config=precedence_global,
            branch_data_config=precedence_branch,
            location_config=precedence_location,
            location='http://example.com/specific')
        self.assertEqual(my_config.get_user_option('option'), 'exact')
1567
class TestMailAddressExtraction(tests.TestCase):
    """Helpers that pull the name and email parts out of a user id."""

    def test_extract_email_address(self):
        extracted = config.extract_email_address('Jane <jane@test.com>')
        self.assertEqual('jane@test.com', extracted)
        # A user id without an address is rejected.
        self.assertRaises(config.NoEmailInUsername,
                          config.extract_email_address, 'Jane Tester')

    def test_parse_username(self):
        # (expected (name, email) pair, user id to parse)
        expectations = [
            (('', 'jdoe@example.com'), 'jdoe@example.com'),
            (('', 'jdoe@example.com'), '<jdoe@example.com>'),
            (('John Doe', 'jdoe@example.com'), 'John Doe <jdoe@example.com>'),
            (('John Doe', ''), 'John Doe'),
            (('John Doe', 'jdoe@example.com'), 'John Doe jdoe@example.com'),
        ]
        for expected, user_id in expectations:
            self.assertEqual(expected, config.parse_username(user_id))
1587
class TestTreeConfig(tests.TestCaseWithTransport):
    """Setting and reading TreeConfig options, with and without a section."""

    def test_get_value(self):
        """Values can be read back from the top level and from a section."""
        tree_config = config.TreeConfig(self.make_branch('.'))
        # set_option takes the value first, then the key, then the section.
        tree_config.set_option('value', 'key', 'SECTION')
        tree_config.set_option('value2', 'key2')
        tree_config.set_option('value3-top', 'key3')
        tree_config.set_option('value3-section', 'key3', 'SECTION')

        get = tree_config.get_option
        self.assertEqual(get('key', 'SECTION'), 'value')
        self.assertEqual(get('key2'), 'value2')
        # Unknown keys give None, or the supplied default.
        self.assertEqual(get('non-existant'), None)
        self.assertEqual(get('non-existant', 'SECTION'), None)
        self.assertEqual(get('non-existant', default='default'), 'default')
        # The same holds for an unknown section.
        self.assertEqual(get('key2', 'NOSECTION'), None)
        self.assertEqual(get('key2', 'NOSECTION', default='default'),
                         'default')
        # The same key holds different values at top level and in SECTION.
        self.assertEqual(get('key3'), 'value3-top')
        self.assertEqual(get('key3', 'SECTION'), 'value3-section')
1615
class TestTransportConfig(tests.TestCaseWithTransport):
    """Loading, reading and writing TransportConfig (and BzrDirConfig)."""

    def test_load_utf8(self):
        """Ensure we can load an utf8-encoded file."""
        t = self.get_transport()
        unicode_user = u'b\N{Euro Sign}ar'
        unicode_content = u'user=%s' % (unicode_user,)
        utf8_content = unicode_content.encode('utf8')
        # Store the raw content in the config file
        t.put_bytes('foo.conf', utf8_content)
        conf = config.TransportConfig(t, 'foo.conf')
        self.assertEqual(unicode_user, conf.get_option('user'))

    def test_load_non_ascii(self):
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
        t = self.get_transport()
        t.put_bytes('foo.conf', b'user=foo\n#\xff\n')
        conf = config.TransportConfig(t, 'foo.conf')
        self.assertRaises(config.ConfigContentError, conf._get_configobj)

    def test_load_erroneous_content(self):
        """Ensure we display a proper error on content that can't be parsed."""
        t = self.get_transport()
        t.put_bytes('foo.conf', b'[open_section\n')
        conf = config.TransportConfig(t, 'foo.conf')
        self.assertRaises(config.ParseConfigError, conf._get_configobj)

    def test_load_permission_denied(self):
        """Ensure we get an empty config file if the file is inaccessible."""
        # NOTE(review): the recorder setup, the __init__ body and the final
        # assertEqual were missing from the damaged text and are restored
        # from the surviving statements -- confirm against the pristine
        # file.
        warnings = []

        def warning(*args):
            # Record the message with its %-arguments applied.
            warnings.append(args[0] % args[1:])
        self.overrideAttr(trace, 'warning', warning)

        class DenyingTransport(object):
            """Minimal transport whose reads always fail."""

            def __init__(self, base):
                self.base = base

            def get_bytes(self, relpath):
                raise errors.PermissionDenied(relpath, "")

        cfg = config.TransportConfig(
            DenyingTransport("nonexisting://"), 'control.conf')
        self.assertIs(None, cfg.get_option('non-existant', 'SECTION'))
        self.assertEqual(
            warnings,
            [u'Permission denied while trying to open configuration file '
             u'nonexisting:///control.conf.'])

    def test_get_value(self):
        """Test that retreiving a value from a section is possible"""
        # NOTE(review): the file name argument was missing; 'control.conf'
        # is assumed (the name used by test_load_permission_denied).
        bzrdir_config = config.TransportConfig(self.get_transport('.'),
                                               'control.conf')
        bzrdir_config.set_option('value', 'key', 'SECTION')
        bzrdir_config.set_option('value2', 'key2')
        bzrdir_config.set_option('value3-top', 'key3')
        bzrdir_config.set_option('value3-section', 'key3', 'SECTION')
        value = bzrdir_config.get_option('key', 'SECTION')
        self.assertEqual(value, 'value')
        value = bzrdir_config.get_option('key2')
        self.assertEqual(value, 'value2')
        self.assertEqual(bzrdir_config.get_option('non-existant'), None)
        value = bzrdir_config.get_option('non-existant', 'SECTION')
        self.assertEqual(value, None)
        value = bzrdir_config.get_option('non-existant', default='default')
        self.assertEqual(value, 'default')
        self.assertEqual(bzrdir_config.get_option('key2', 'NOSECTION'), None)
        # The closing argument was missing; 'default' is what the next
        # assertion expects back.
        value = bzrdir_config.get_option('key2', 'NOSECTION',
                                         default='default')
        self.assertEqual(value, 'default')
        value = bzrdir_config.get_option('key3')
        self.assertEqual(value, 'value3-top')
        value = bzrdir_config.get_option('key3', 'SECTION')
        self.assertEqual(value, 'value3-section')

    def test_set_unset_default_stack_on(self):
        my_dir = self.make_controldir('.')
        bzrdir_config = config.BzrDirConfig(my_dir)
        self.assertIs(None, bzrdir_config.get_default_stack_on())
        bzrdir_config.set_default_stack_on('Foo')
        self.assertEqual('Foo', bzrdir_config._config.get_option(
                         'default_stack_on'))
        self.assertEqual('Foo', bzrdir_config.get_default_stack_on())
        # Setting None removes the value again.
        bzrdir_config.set_default_stack_on(None)
        self.assertIs(None, bzrdir_config.get_default_stack_on())
1703
class TestOldConfigHooks(tests.TestCaseWithTransport):
    """Check that OldConfigHooks fire for the breezy, locations and branch
    configs created by ``create_configs_with_file_option``.

    NOTE(review): the ``def setUp`` line and the hook recorder boilerplate
    were missing from the damaged text; they are restored here, with the
    recorder factored into ``_record_hook_calls``.
    """

    def setUp(self):
        super(TestOldConfigHooks, self).setUp()
        create_configs_with_file_option(self)

    def _record_hook_calls(self, hook_name):
        """Install a recording hook for ``hook_name``.

        The hook is uninstalled again at test cleanup.

        :return: The list that receives one argument tuple per hook call.
        """
        calls = []

        def hook(*args):
            calls.append(args)
        config.OldConfigHooks.install_named_hook(hook_name, hook, None)
        self.addCleanup(
            config.OldConfigHooks.uninstall_named_hook, hook_name, None)
        return calls

    def assertGetHook(self, conf, name, value):
        """Reading ``name`` returns ``value`` and fires 'get' once."""
        calls = self._record_hook_calls('get')
        self.assertLength(0, calls)
        actual_value = conf.get_user_option(name)
        self.assertEqual(value, actual_value)
        self.assertLength(1, calls)
        self.assertEqual((conf, name, value), calls[0])

    def test_get_hook_breezy(self):
        self.assertGetHook(self.breezy_config, 'file', 'breezy')

    def test_get_hook_locations(self):
        self.assertGetHook(self.locations_config, 'file', 'locations')

    def test_get_hook_branch(self):
        # Since locations masks branch, we define a different option
        self.branch_config.set_user_option('file2', 'branch')
        self.assertGetHook(self.branch_config, 'file2', 'branch')

    def assertSetHook(self, conf, name, value):
        """Setting ``name`` to ``value`` fires 'set' once."""
        calls = self._record_hook_calls('set')
        self.assertLength(0, calls)
        conf.set_user_option(name, value)
        self.assertLength(1, calls)
        # We can't assert the conf object below as different configs use
        # different means to implement set_user_option, so only the
        # remaining hook arguments are compared.
        self.assertEqual((name, value), calls[0][1:])

    def test_set_hook_breezy(self):
        self.assertSetHook(self.breezy_config, 'foo', 'breezy')

    def test_set_hook_locations(self):
        self.assertSetHook(self.locations_config, 'foo', 'locations')

    def test_set_hook_branch(self):
        self.assertSetHook(self.branch_config, 'foo', 'branch')

    def assertRemoveHook(self, conf, name, section_name=None):
        """Removing ``name`` fires 'remove' once."""
        calls = self._record_hook_calls('remove')
        self.assertLength(0, calls)
        conf.remove_user_option(name, section_name)
        self.assertLength(1, calls)
        # We can't assert the conf object below as different configs use
        # different means to implement remove_user_option, so only the
        # remaining hook arguments are compared.
        self.assertEqual((name,), calls[0][1:])

    def test_remove_hook_breezy(self):
        self.assertRemoveHook(self.breezy_config, 'file')

    def test_remove_hook_locations(self):
        self.assertRemoveHook(self.locations_config, 'file',
                              self.locations_config.location)

    def test_remove_hook_branch(self):
        self.assertRemoveHook(self.branch_config, 'file')

    def assertLoadHook(self, name, conf_class, *conf_args):
        """Reading an option from a fresh config fires 'load' once."""
        calls = self._record_hook_calls('load')
        self.assertLength(0, calls)
        # Build a config
        conf = conf_class(*conf_args)
        # Access an option to trigger a load
        conf.get_user_option(name)
        self.assertLength(1, calls)
        # Since we can't assert about conf, we just use the number of calls ;-/

    def test_load_hook_breezy(self):
        self.assertLoadHook('file', config.GlobalConfig)

    def test_load_hook_locations(self):
        self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)

    def test_load_hook_branch(self):
        self.assertLoadHook('file', config.BranchConfig, self.tree.branch)

    def assertSaveHook(self, conf):
        """Setting an option on ``conf`` fires 'save' once."""
        calls = self._record_hook_calls('save')
        self.assertLength(0, calls)
        # Setting an option triggers a save
        conf.set_user_option('foo', 'bar')
        self.assertLength(1, calls)
        # Since we can't assert about conf, we just use the number of calls ;-/

    def test_save_hook_breezy(self):
        self.assertSaveHook(self.breezy_config)

    def test_save_hook_locations(self):
        self.assertSaveHook(self.locations_config)

    def test_save_hook_branch(self):
        self.assertSaveHook(self.branch_config)
1829
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
    """Tests config hooks for remote configs.

    No tests for the remove hook as this is not implemented there.

    NOTE(review): the docstring terminator, the ``def setUp`` line and the
    hook recorder boilerplate were missing from the damaged text; they are
    restored here, with the recorder factored into ``_record_hook_calls``.
    """

    def setUp(self):
        super(TestOldConfigHooksForRemote, self).setUp()
        self.transport_server = test_server.SmartTCPServer_for_testing
        create_configs_with_file_option(self)

    def _record_hook_calls(self, hook_name):
        """Install a recording hook for ``hook_name``.

        The hook is uninstalled again at test cleanup.

        :return: The list that receives one argument tuple per hook call.
        """
        calls = []

        def hook(*args):
            calls.append(args)
        config.OldConfigHooks.install_named_hook(hook_name, hook, None)
        self.addCleanup(
            config.OldConfigHooks.uninstall_named_hook, hook_name, None)
        return calls

    def assertGetHook(self, conf, name, value):
        """Reading ``name`` returns ``value`` and fires 'get' once."""
        calls = self._record_hook_calls('get')
        self.assertLength(0, calls)
        actual_value = conf.get_option(name)
        self.assertEqual(value, actual_value)
        self.assertLength(1, calls)
        self.assertEqual((conf, name, value), calls[0])

    def test_get_hook_remote_branch(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.assertGetHook(remote_branch._get_config(), 'file', 'branch')

    def test_get_hook_remote_bzrdir(self):
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
        conf = remote_bzrdir._get_config()
        # set_option takes the value first, then the option name.
        conf.set_option('remotedir', 'file')
        self.assertGetHook(conf, 'file', 'remotedir')

    def assertSetHook(self, conf, name, value):
        """Setting ``name`` to ``value`` fires 'set' once."""
        calls = self._record_hook_calls('set')
        self.assertLength(0, calls)
        conf.set_option(value, name)
        self.assertLength(1, calls)
        # We can't assert the conf object below as different configs use
        # different means to implement set_user_option, so only the
        # remaining hook arguments are compared.
        self.assertEqual((name, value), calls[0][1:])

    def test_set_hook_remote_branch(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.addCleanup(remote_branch.lock_write().unlock)
        self.assertSetHook(remote_branch._get_config(), 'file', 'remote')

    def test_set_hook_remote_bzrdir(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.addCleanup(remote_branch.lock_write().unlock)
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
        self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')

    def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
        """Reading an option fires 'load' ``expected_nb_calls`` times."""
        calls = self._record_hook_calls('load')
        self.assertLength(0, calls)
        # Build a config
        conf = conf_class(*conf_args)
        # Access an option to trigger a load
        conf.get_option(name)
        self.assertLength(expected_nb_calls, calls)
        # Since we can't assert about conf, we just use the number of calls ;-/

    def test_load_hook_remote_branch(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.assertLoadHook(1, 'file', remote.RemoteBranchConfig, remote_branch)

    def test_load_hook_remote_bzrdir(self):
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
        # The config file doesn't exist, set an option to force its creation
        conf = remote_bzrdir._get_config()
        conf.set_option('remotedir', 'file')
        # We get one call for the server and one call for the client, this is
        # caused by the differences in implementations betwen
        # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
        # SmartServerBranchGetConfigFile (in smart/branch.py)
        self.assertLoadHook(2, 'file', remote.RemoteBzrDirConfig, remote_bzrdir)

    def assertSaveHook(self, conf):
        """Setting an option on ``conf`` fires 'save' once."""
        calls = self._record_hook_calls('save')
        self.assertLength(0, calls)
        # Setting an option triggers a save
        conf.set_option('foo', 'bar')
        self.assertLength(1, calls)
        # Since we can't assert about conf, we just use the number of calls ;-/

    def test_save_hook_remote_branch(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.addCleanup(remote_branch.lock_write().unlock)
        self.assertSaveHook(remote_branch._get_config())

    def test_save_hook_remote_bzrdir(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.addCleanup(remote_branch.lock_write().unlock)
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
        self.assertSaveHook(remote_bzrdir._get_config())
1944
class TestOptionNames(tests.TestCase):
    """Which names ``config._option_ref_re`` accepts inside ``{...}``."""

    def is_valid(self, name):
        """Tell whether ``{name}`` matches the option reference regexp."""
        reference = '{%s}' % name
        return config._option_ref_re.match(reference) is not None

    def test_valid_names(self):
        accepted = [
            'foo',
            'foo.bar',
            'f1',
            '_',
            '__bar__',
            'a_',
            'a1',
            # Don't break bzr-svn for no good reason
            'guessed-layout',
        ]
        for name in accepted:
            self.assertTrue(self.is_valid(name))

    def test_invalid_names(self):
        rejected = [
            ' foo',
            'foo ',
            '1',
            '1,2',
            'foo$',
            '!foo',
            'foo.',
            'foo..bar',
            '{}',
            '{a}',
            'a\n',
            '-',
            '-a',
            'a-',
            'a--a',
        ]
        for name in rejected:
            self.assertFalse(self.is_valid(name))

    def assertSingleGroup(self, reference):
        # the regexp is used with split and as such should match the reference
        # *only*, if more groups needs to be defined, (?:...) should be used.
        # NOTE(review): ``reference`` is not used; the literal '{a}' is
        # matched whatever the caller passes -- confirm whether intended.
        match = config._option_ref_re.match('{a}')
        self.assertLength(1, match.groups())

    def test_valid_references(self):
        for reference in ('{a}', '{{a}}'):
            self.assertSingleGroup(reference)
1988
class TestOption(tests.TestCase):
    """Tests for the default value handling of ``config.Option``."""

    def test_default_value(self):
        opt = config.Option('foo', default='bar')
        self.assertEqual('bar', opt.get_default())

    def test_callable_default_value(self):
        def bar_as_unicode():
            return u'bar'

        opt = config.Option('foo', default=bar_as_unicode)
        self.assertEqual('bar', opt.get_default())

    def test_default_value_from_env(self):
        opt = config.Option('foo', default='bar', default_from_env=['FOO'])
        self.overrideEnv('FOO', 'quux')
        # Env variable provides a default taking over the option one
        self.assertEqual('quux', opt.get_default())

    def test_first_default_value_from_env_wins(self):
        opt = config.Option('foo', default='bar',
                            default_from_env=['NO_VALUE', 'FOO', 'BAZ'])
        self.overrideEnv('FOO', 'foo')
        self.overrideEnv('BAZ', 'baz')
        # The first env var set wins
        self.assertEqual('foo', opt.get_default())

    def test_not_supported_list_default_value(self):
        self.assertRaises(AssertionError, config.Option, 'foo', default=[1])

    def test_not_supported_object_default_value(self):
        self.assertRaises(AssertionError, config.Option, 'foo',
                          default=object())

    def test_not_supported_callable_default_value_not_unicode(self):
        def bar_not_unicode():
            # A byte string: the check only happens when the default is
            # requested, not when the option is built.
            return b'bar'

        opt = config.Option('foo', default=bar_not_unicode)
        self.assertRaises(AssertionError, opt.get_default)

    def test_get_help_topic(self):
        opt = config.Option('foo')
        self.assertEqual('foo', opt.get_help_topic())
2032
class TestOptionConverter(tests.TestCase):
    """Shared assertions for the option conversion tests."""

    def assertConverted(self, expected, opt, value):
        """Assert that ``opt`` converts ``value`` into ``expected``."""
        self.assertEqual(expected, opt.convert_from_unicode(None, value))

    def assertCallsWarning(self, opt, value):
        """Assert that converting ``value`` gives None and warns once."""
        warnings = []

        def warning(*args):
            # Same calling convention as trace.warning: format + arguments
            warnings.append(args[0] % args[1:])

        self.overrideAttr(trace, 'warning', warning)
        self.assertEqual(None, opt.convert_from_unicode(None, value))
        self.assertLength(1, warnings)
        self.assertEqual(
            'Value "%s" is not valid for "%s"' % (value, opt.name),
            warnings[0])

    def assertCallsError(self, opt, value):
        """Assert that converting ``value`` raises ConfigOptionValueError."""
        self.assertRaises(config.ConfigOptionValueError,
                          opt.convert_from_unicode, None, value)

    def assertConvertInvalid(self, opt, invalid_value):
        """Check ``invalid_value`` against each ``opt.invalid`` setting.

        Note that ``opt.invalid`` is left set to 'error' on return.
        """
        # No policy: the conversion just yields None
        opt.invalid = None
        self.assertEqual(None, opt.convert_from_unicode(None, invalid_value))
        opt.invalid = 'warning'
        self.assertCallsWarning(opt, invalid_value)
        opt.invalid = 'error'
        self.assertCallsError(opt, invalid_value)
2062
class TestOptionWithBooleanConverter(TestOptionConverter):
    """Conversions for an option using ``config.bool_from_store``."""

    def get_option(self):
        """Build the boolean option under test."""
        return config.Option(
            'foo', help='A boolean.', from_unicode=config.bool_from_store)

    def test_convert_invalid(self):
        bool_opt = self.get_option()
        rejected = (
            # A string that is not recognized as a boolean
            u'invalid-boolean',
            # A list of strings is never recognized as a boolean
            [u'not', u'a', u'boolean'],
        )
        for value in rejected:
            self.assertConvertInvalid(bool_opt, value)

    def test_convert_valid(self):
        bool_opt = self.get_option()
        accepted = ((True, u'True'), (True, u'1'), (False, u'False'))
        for expected, text in accepted:
            self.assertConverted(expected, bool_opt, text)
2082
class TestOptionWithIntegerConverter(TestOptionConverter):
    """Conversions for an option using ``config.int_from_store``."""

    def get_option(self):
        """Build the integer option under test."""
        return config.Option(
            'foo', help='An integer.', from_unicode=config.int_from_store)

    def test_convert_invalid(self):
        int_opt = self.get_option()
        rejected = (
            # A string that is not recognized as an integer
            u'forty-two',
            # A list of strings is never recognized as an integer
            [u'a', u'list'],
        )
        for value in rejected:
            self.assertConvertInvalid(int_opt, value)

    def test_convert_valid(self):
        int_opt = self.get_option()
        self.assertConverted(16, int_opt, u'16')
2100
class TestOptionWithSIUnitConverter(TestOptionConverter):
    """Conversions for an option using ``config.int_SI_from_store``."""

    def get_option(self):
        """Build the SI units option under test."""
        return config.Option(
            'foo', help='An integer in SI units.',
            from_unicode=config.int_SI_from_store)

    def test_convert_invalid(self):
        si_opt = self.get_option()
        rejected = (
            u'not-a-unit',
            u'Gb',  # Forgot the value
            u'1b',  # Forgot the unit
            u'1GG',
            u'1Mbb',
            u'1MM',
        )
        for text in rejected:
            self.assertConvertInvalid(si_opt, text)

    def test_convert_valid(self):
        si_opt = self.get_option()
        accepted = (
            (int(5e3), u'5kb'),
            (int(5e6), u'5M'),
            (int(5e6), u'5MB'),
            (int(5e9), u'5g'),
            (int(5e9), u'5gB'),
            (100, u'100'),
        )
        for expected, text in accepted:
            self.assertConverted(expected, si_opt, text)
2125
class TestListOption(TestOptionConverter):
    """Conversions for ``config.ListOption``."""

    def get_option(self):
        """Build the list option under test."""
        return config.ListOption('foo', help='A list.')

    def test_convert_invalid(self):
        list_opt = self.get_option()
        # A list is not converted into a list: only strings are expected
        # as input.
        self.assertConvertInvalid(list_opt, [1])
        # No string is invalid as all forms can be converted to a list

    def test_convert_valid(self):
        list_opt = self.get_option()
        # An empty string is an empty list
        self.assertConverted([], list_opt, '')  # Using a bare str() just in case
        self.assertConverted([], list_opt, u'')
        # Any other single value becomes a one-item list
        for text in (u'True', u'42', u'bar'):
            self.assertConverted([text], list_opt, text)
2150
class TestRegistryOption(TestOptionConverter):
    """Conversions and help text for ``config.RegistryOption``."""

    def get_option(self, registry):
        """Build a registry option backed by ``registry``."""
        return config.RegistryOption('foo', registry,
                                     help='A registry option.')

    def test_convert_invalid(self):
        registry = _mod_registry.Registry()
        opt = self.get_option(registry)
        self.assertConvertInvalid(opt, [1])
        self.assertConvertInvalid(opt, u"notregistered")

    def test_convert_valid(self):
        registry = _mod_registry.Registry()
        registry.register("someval", 1234)
        opt = self.get_option(registry)
        # Using a bare str() just in case
        self.assertConverted(1234, opt, "someval")
        self.assertConverted(1234, opt, u'someval')
        self.assertConverted(None, opt, None)

    def test_help(self):
        registry = _mod_registry.Registry()
        registry.register("someval", 1234, help="some option")
        registry.register("dunno", 1234, help="some other option")
        opt = self.get_option(registry)
        # The expected text lists 'dunno' before 'someval' although they
        # were registered in the opposite order.
        self.assertEqual(
            'A registry option.\n'
            '\n'
            'The following values are supported:\n'
            ' dunno - some other option\n'
            ' someval - some option\n',
            opt.help)

    def test_get_help_text(self):
        registry = _mod_registry.Registry()
        registry.register("someval", 1234, help="some option")
        registry.register("dunno", 1234, help="some other option")
        opt = self.get_option(registry)
        self.assertEqual(
            'A registry option.\n'
            '\n'
            'The following values are supported:\n'
            ' dunno - some other option\n'
            ' someval - some option\n',
            opt.get_help_text())
2198
class TestOptionRegistry(tests.TestCase):
    """Tests for registering options, eagerly or lazily."""

    def setUp(self):
        super(TestOptionRegistry, self).setUp()
        # Always start with an empty registry
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
        self.registry = config.option_registry

    def test_register(self):
        opt = config.Option('foo')
        self.registry.register(opt)
        self.assertIs(opt, self.registry.get('foo'))

    def test_registered_help(self):
        opt = config.Option('foo', help='A simple option')
        self.registry.register(opt)
        self.assertEqual('A simple option', self.registry.get_help('foo'))

    def test_dont_register_illegal_name(self):
        self.assertRaises(config.IllegalOptionName,
                          self.registry.register, config.Option(' foo'))
        self.assertRaises(config.IllegalOptionName,
                          self.registry.register, config.Option('bar,'))

    # Class attribute so that register_lazy can find it by name in this
    # module ('TestOptionRegistry.lazy_option').
    lazy_option = config.Option('lazy_foo', help='Lazy help')

    def test_register_lazy(self):
        self.registry.register_lazy('lazy_foo', self.__module__,
                                    'TestOptionRegistry.lazy_option')
        self.assertIs(self.lazy_option, self.registry.get('lazy_foo'))

    def test_registered_lazy_help(self):
        self.registry.register_lazy('lazy_foo', self.__module__,
                                    'TestOptionRegistry.lazy_option')
        self.assertEqual('Lazy help', self.registry.get_help('lazy_foo'))

    def test_dont_lazy_register_illegal_name(self):
        # This is where the root cause of http://pad.lv/1235099 is better
        # understood: 'register_lazy' doc string mentions that key should match
        # the option name which indirectly requires that the option name is a
        # valid python identifier. We violate that rule here (using a key that
        # doesn't match the option name) to test the option name checking.
        self.assertRaises(config.IllegalOptionName,
                          self.registry.register_lazy, ' foo', self.__module__,
                          'TestOptionRegistry.lazy_option')
        self.assertRaises(config.IllegalOptionName,
                          self.registry.register_lazy, '1,2', self.__module__,
                          'TestOptionRegistry.lazy_option')
2248
class TestRegisteredOptions(tests.TestCase):
    """All registered options should verify some constraints."""

    # One scenario per option found in the registry when this module is
    # loaded.
    scenarios = [(key, {'option_name': key, 'option': option}) for key, option
                 in config.option_registry.iteritems()]

    def setUp(self):
        super(TestRegisteredOptions, self).setUp()
        self.registry = config.option_registry

    def test_proper_name(self):
        # An option should be registered under its own name, this can't be
        # checked at registration time for the lazy ones.
        self.assertEqual(self.option_name, self.option.name)

    def test_help_is_set(self):
        option_help = self.registry.get_help(self.option_name)
        # Users need to know what an option is about: its help must be
        # neither missing nor empty.
        self.assertIsNot(None, option_help)
        self.assertNotEqual('', option_help)
2271
class TestSection(tests.TestCase):
    """Tests for the read-only ``config.Section``."""

    # FIXME: Parametrize so that all sections produced by Stores run these
    # tests -- vila 2011-04-01

    def test_get_a_value(self):
        a_dict = dict(foo='bar')
        section = config.Section('myID', a_dict)
        self.assertEqual('bar', section.get('foo'))

    def test_get_unknown_option(self):
        a_dict = dict()
        section = config.Section(None, a_dict)
        # The caller supplied default is returned for a missing option
        self.assertEqual('out of thin air',
                         section.get('foo', 'out of thin air'))

    def test_options_is_shared(self):
        a_dict = dict()
        section = config.Section(None, a_dict)
        # The section exposes the very same dict object, not a copy
        self.assertIs(a_dict, section.options)
2293
class TestMutableSection(tests.TestCase):
    """Tests for sections that track their modifications in ``orig``."""

    scenarios = [('mutable',
                  {'get_section':
                   lambda opts: config.MutableSection('myID', opts)},),
                 ]

    def test_set(self):
        a_dict = dict(foo='bar')
        section = self.get_section(a_dict)
        section.set('foo', 'new_value')
        self.assertEqual('new_value', section.get('foo'))
        # The change appears in the shared section
        self.assertEqual('new_value', a_dict.get('foo'))
        # We keep track of the change
        self.assertIn('foo', section.orig)
        self.assertEqual('bar', section.orig.get('foo'))

    def test_set_preserve_original_once(self):
        a_dict = dict(foo='bar')
        section = self.get_section(a_dict)
        section.set('foo', 'first_value')
        section.set('foo', 'second_value')
        # We keep track of the original value
        self.assertIn('foo', section.orig)
        self.assertEqual('bar', section.orig.get('foo'))

    def test_remove(self):
        a_dict = dict(foo='bar')
        section = self.get_section(a_dict)
        section.remove('foo')
        # We get None for unknown options via the default value
        self.assertEqual(None, section.get('foo'))
        # Or we just get the default value
        self.assertEqual('unknown', section.get('foo', 'unknown'))
        self.assertNotIn('foo', section.options)
        # We keep track of the deletion
        self.assertIn('foo', section.orig)
        self.assertEqual('bar', section.orig.get('foo'))

    def test_remove_new_option(self):
        a_dict = dict()
        section = self.get_section(a_dict)
        section.set('foo', 'bar')
        section.remove('foo')
        self.assertNotIn('foo', section.options)
        # The option didn't exist initially so we need to keep track of it
        # with a special value
        self.assertIn('foo', section.orig)
        self.assertEqual(config._NewlyCreatedOption, section.orig['foo'])
2345
class TestCommandLineStore(tests.TestCase):
    """Tests for the store holding the command line overrides."""

    def setUp(self):
        super(TestCommandLineStore, self).setUp()
        self.store = config.CommandLineStore()
        # Start from an empty option registry
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())

    def get_section(self):
        """Get the unique section for the command line overrides."""
        sections = list(self.store.get_sections())
        self.assertLength(1, sections)
        store, section = sections[0]
        self.assertEqual(self.store, store)
        return section

    def test_no_override(self):
        self.store._from_cmdline([])
        section = self.get_section()
        self.assertLength(0, list(section.iter_option_names()))

    def test_simple_override(self):
        self.store._from_cmdline(['a=b'])
        section = self.get_section()
        self.assertEqual('b', section.get('a'))

    def test_list_override(self):
        opt = config.ListOption('l')
        config.option_registry.register(opt)
        self.store._from_cmdline(['l=1,2,3'])
        val = self.get_section().get('l')
        self.assertEqual('1,2,3', val)
        # Reminder: lists should be registered as such explicitly, otherwise
        # the conversion needs to be done afterwards.
        self.assertEqual(['1', '2', '3'],
                         opt.convert_from_unicode(self.store, val))

    def test_multiple_overrides(self):
        self.store._from_cmdline(['a=b', 'x=y'])
        section = self.get_section()
        self.assertEqual('b', section.get('a'))
        self.assertEqual('y', section.get('x'))

    def test_wrong_syntax(self):
        # 'c' has no '=value' part
        self.assertRaises(errors.BzrCommandError,
                          self.store._from_cmdline, ['a=b', 'c'])
2391
class TestStoreMinimalAPI(tests.TestCaseWithTransport):
    """Checks applied to every registered store builder and the cmdline one."""

    scenarios = [(key, {'get_store': builder}) for key, builder
                 in config.test_store_builder_registry.iteritems()] + [
        ('cmdline', {'get_store': lambda test: config.CommandLineStore()})]

    def test_id(self):
        store = self.get_store(self)
        if isinstance(store, config.TransportIniFileStore):
            raise tests.TestNotApplicable(
                "%s is not a concrete Store implementation"
                " so it doesn't need an id" % (store.__class__.__name__,))
        self.assertIsNot(None, store.id)
2406
class TestStore(tests.TestCaseWithTransport):
    """Base class providing a section content assertion to store tests."""

    def assertSectionContent(self, expected, store_and_section):
        """Assert that some options have the proper values in a section.

        :param expected: A (section id, options dict) tuple.

        :param store_and_section: A (store, section) tuple; the store is
            ignored here.
        """
        _, section = store_and_section
        expected_name, expected_options = expected
        self.assertEqual(expected_name, section.id)
        # Only the expected options are looked up; the section may contain
        # more.
        self.assertEqual(
            expected_options,
            {name: section.get(name) for name in expected_options})
2418
class TestReadonlyStore(TestStore):
    """Loading and section listing, run against each registered builder."""

    scenarios = [(name, dict(get_store=factory)) for name, factory
                 in config.test_store_builder_registry.iteritems()]

    def test_building_delays_load(self):
        a_store = self.get_store(self)
        self.assertEqual(False, a_store.is_loaded())
        a_store._load_from_string(b'')
        self.assertEqual(True, a_store.is_loaded())

    def test_get_no_sections_for_empty(self):
        a_store = self.get_store(self)
        a_store._load_from_string(b'')
        found = list(a_store.get_sections())
        self.assertEqual([], found)

    def test_get_default_section(self):
        a_store = self.get_store(self)
        a_store._load_from_string(b'foo=bar')
        found = list(a_store.get_sections())
        self.assertLength(1, found)
        # The section without a header has None as id
        self.assertSectionContent((None, {'foo': 'bar'}), found[0])

    def test_get_named_section(self):
        a_store = self.get_store(self)
        a_store._load_from_string(b'[baz]\nfoo=bar')
        found = list(a_store.get_sections())
        self.assertLength(1, found)
        self.assertSectionContent(('baz', {'foo': 'bar'}), found[0])

    def test_load_from_string_fails_for_non_empty_store(self):
        a_store = self.get_store(self)
        a_store._load_from_string(b'foo=bar')
        # A second load on an already loaded store is refused
        self.assertRaises(AssertionError,
                          a_store._load_from_string, b'bar=baz')
2454
class TestStoreQuoting(TestStore):
    """Check that quote/unquote round-trip for each registered builder."""

    scenarios = [(key, {'get_store': builder}) for key, builder
                 in config.test_store_builder_registry.iteritems()]

    def setUp(self):
        super(TestStoreQuoting, self).setUp()
        self.store = self.get_store(self)
        # We need a loaded store but any content will do
        self.store._load_from_string(b'')

    def assertIdempotent(self, s):
        """Assert that quoting an unquoted string is a no-op and vice-versa.

        What matters here is that option values, as they appear in a store, can
        be safely round-tripped out of the store and back.

        :param s: A string, quoted if required.
        """
        self.assertEqual(s, self.store.quote(self.store.unquote(s)))
        self.assertEqual(s, self.store.unquote(self.store.quote(s)))

    def test_empty_string(self):
        if isinstance(self.store, config.IniFileStore):
            # configobj._quote doesn't handle empty values
            self.assertRaises(AssertionError,
                              self.assertIdempotent, '')
        else:
            self.assertIdempotent('')
        # But quoted empty strings are ok
        self.assertIdempotent('""')

    def test_embedded_spaces(self):
        self.assertIdempotent('" a b c "')

    def test_embedded_commas(self):
        self.assertIdempotent('" a , b c "')

    def test_simple_comma(self):
        if isinstance(self.store, config.IniFileStore):
            # configobj requires that lists are special-cased
            self.assertRaises(AssertionError,
                              self.assertIdempotent, ',')
        else:
            self.assertIdempotent(',')
        # When a single comma is required, quoting is also required
        self.assertIdempotent('","')

    def test_list(self):
        if isinstance(self.store, config.IniFileStore):
            # configobj requires that lists are special-cased
            self.assertRaises(AssertionError,
                              self.assertIdempotent, 'a,b')
        else:
            self.assertIdempotent('a,b')
2511
class TestDictFromStore(tests.TestCase):
    """Unquoting a value that is a dict rather than a string."""

    def test_unquote_not_string(self):
        stack = config.MemoryStack(b'x=2\n[a_section]\na=1\n')
        section_as_value = stack.get('a_section')
        # Urgh, despite 'stack' asking for the no-name section, we get the
        # content of another section as a dict o_O
        self.assertEqual({'a': '1'}, section_as_value)
        result = stack.store.unquote(section_as_value)
        # Which cannot be unquoted but shouldn't crash either (the use cases
        # are getting the value or displaying it; in the latter case, '%s'
        # formatting is enough).
        self.assertEqual({'a': '1'}, result)
        displayed = '%s' % (result,)
        self.assertIn(displayed, ("{u'a': u'1'}", "{'a': '1'}"))
2527
class TestIniFileStoreContent(tests.TestCaseWithTransport):
    """Simulate loading a config store with content of various encodings.

    All files produced by bzr are in utf8 content.

    Users may modify them manually and end up with a file that can't be
    loaded. We need to issue proper error messages in this case.
    """

    # A byte that can never appear in valid utf8
    invalid_utf8_char = b'\xff'

    def test_load_utf8(self):
        """Ensure we can load an utf8-encoded file."""
        t = self.get_transport()
        # From http://pad.lv/799212
        unicode_user = u'b\N{Euro Sign}ar'
        unicode_content = u'user=%s' % (unicode_user,)
        utf8_content = unicode_content.encode('utf8')
        # Store the raw content in the config file
        t.put_bytes('foo.conf', utf8_content)
        store = config.TransportIniFileStore(t, 'foo.conf')
        store.load()
        stack = config.Stack([store.get_sections], store)
        self.assertEqual(unicode_user, stack.get('user'))

    def test_load_non_ascii(self):
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
        t = self.get_transport()
        t.put_bytes('foo.conf', b'user=foo\n#%s\n' % (self.invalid_utf8_char,))
        store = config.TransportIniFileStore(t, 'foo.conf')
        self.assertRaises(config.ConfigContentError, store.load)

    def test_load_erroneous_content(self):
        """Ensure we display a proper error on content that can't be parsed."""
        t = self.get_transport()
        t.put_bytes('foo.conf', b'[open_section\n')
        store = config.TransportIniFileStore(t, 'foo.conf')
        self.assertRaises(config.ParseConfigError, store.load)

    def test_load_permission_denied(self):
        """Ensure we get warned when trying to load an inaccessible file."""
        warnings = []

        def warning(*args):
            # Same calling convention as trace.warning: format + arguments
            warnings.append(args[0] % args[1:])

        self.overrideAttr(trace, 'warning', warning)

        t = self.get_transport()

        def get_bytes(relpath):
            raise errors.PermissionDenied(relpath, "")

        # Shadow the method on this transport instance only, so that any
        # read through it fails.
        t.get_bytes = get_bytes
        store = config.TransportIniFileStore(t, 'foo.conf')
        self.assertRaises(errors.PermissionDenied, store.load)
        self.assertEqual(
            warnings,
            [u'Permission denied while trying to load configuration store %s.'
             % store.external_url()])
2586
class TestIniConfigContent(tests.TestCaseWithTransport):
    """Simulate loading a IniBasedConfig with content of various encodings.

    All files produced by bzr are in utf8 content.

    Users may modify them manually and end up with a file that can't be
    loaded. We need to issue proper error messages in this case.
    """

    # A byte that can never appear in valid utf8
    invalid_utf8_char = b'\xff'

    def test_load_utf8(self):
        """Ensure we can load an utf8-encoded file."""
        # From http://pad.lv/799212
        unicode_user = u'b\N{Euro Sign}ar'
        unicode_content = u'user=%s' % (unicode_user,)
        utf8_content = unicode_content.encode('utf8')
        # Store the raw content in the config file
        with open('foo.conf', 'wb') as f:
            f.write(utf8_content)
        conf = config.IniBasedConfig(file_name='foo.conf')
        self.assertEqual(unicode_user, conf.get_user_option('user'))

    def test_load_badly_encoded_content(self):
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
        with open('foo.conf', 'wb') as f:
            f.write(b'user=foo\n#%s\n' % (self.invalid_utf8_char,))
        conf = config.IniBasedConfig(file_name='foo.conf')
        self.assertRaises(config.ConfigContentError, conf._get_parser)

    def test_load_erroneous_content(self):
        """Ensure we display a proper error on content that can't be parsed."""
        with open('foo.conf', 'wb') as f:
            f.write(b'[open_section\n')
        conf = config.IniBasedConfig(file_name='foo.conf')
        self.assertRaises(config.ParseConfigError, conf._get_parser)
2624
class TestMutableStore(TestStore):
    """Modification and saving, run against each registered store builder."""

    scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
                 in config.test_store_builder_registry.iteritems()]

    def setUp(self):
        super(TestMutableStore, self).setUp()
        self.transport = self.get_transport()

    def has_store(self, store):
        """Tell whether the file backing ``store`` exists on the transport."""
        store_basename = urlutils.relative_url(self.transport.external_url(),
                                               store.external_url())
        return self.transport.has(store_basename)

    def _is_branch_store(self):
        """Tell whether the current scenario is a branch store."""
        # FIXME: There should be a better way than relying on the test
        # parametrization to identify branch.conf -- vila 2011-0526
        return self.store_id in ('branch', 'remote_branch')

    def _skip_for_branch_store(self):
        """Skip tests that need the store file to be initially absent."""
        if self._is_branch_store():
            raise tests.TestNotApplicable(
                'branch.conf is *always* created when a branch is initialized')

    def _write_lock_branch_store(self, store):
        """Write lock the branch of a branch store until test cleanup."""
        if self._is_branch_store():
            # branch stores requires write locked branches
            self.addCleanup(store.branch.lock_write().unlock)

    def test_save_empty_creates_no_file(self):
        self._skip_for_branch_store()
        store = self.get_store(self)
        store.save()
        self.assertEqual(False, self.has_store(store))

    def test_mutable_section_shared(self):
        store = self.get_store(self)
        store._load_from_string(b'foo=bar\n')
        self._write_lock_branch_store(store)
        section1 = store.get_mutable_section(None)
        section2 = store.get_mutable_section(None)
        # If we get different sections, different callers won't share the
        # modification
        self.assertIs(section1, section2)

    def test_save_emptied_succeeds(self):
        store = self.get_store(self)
        store._load_from_string(b'foo=bar\n')
        self._write_lock_branch_store(store)
        section = store.get_mutable_section(None)
        section.remove('foo')
        store.save()
        self.assertEqual(True, self.has_store(store))
        modified_store = self.get_store(self)
        sections = list(modified_store.get_sections())
        self.assertLength(0, sections)

    def test_save_with_content_succeeds(self):
        self._skip_for_branch_store()
        store = self.get_store(self)
        store._load_from_string(b'foo=bar\n')
        self.assertEqual(False, self.has_store(store))
        store.save()
        self.assertEqual(True, self.has_store(store))
        modified_store = self.get_store(self)
        sections = list(modified_store.get_sections())
        self.assertLength(1, sections)
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])

    def test_set_option_in_empty_store(self):
        store = self.get_store(self)
        self._write_lock_branch_store(store)
        section = store.get_mutable_section(None)
        section.set('foo', 'bar')
        store.save()
        modified_store = self.get_store(self)
        sections = list(modified_store.get_sections())
        self.assertLength(1, sections)
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])

    def test_set_option_in_default_section(self):
        store = self.get_store(self)
        store._load_from_string(b'')
        self._write_lock_branch_store(store)
        section = store.get_mutable_section(None)
        section.set('foo', 'bar')
        store.save()
        modified_store = self.get_store(self)
        sections = list(modified_store.get_sections())
        self.assertLength(1, sections)
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])

    def test_set_option_in_named_section(self):
        store = self.get_store(self)
        store._load_from_string(b'')
        self._write_lock_branch_store(store)
        section = store.get_mutable_section('baz')
        section.set('foo', 'bar')
        store.save()
        modified_store = self.get_store(self)
        sections = list(modified_store.get_sections())
        self.assertLength(1, sections)
        self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])

    def test_load_hook(self):
        # First, we need to ensure that the store exists
        store = self.get_store(self)
        self._write_lock_branch_store(store)
        section = store.get_mutable_section('baz')
        section.set('foo', 'bar')
        store.save()
        # Now we can try to load it
        store = self.get_store(self)
        calls = []

        def hook(*args):
            calls.append(args)

        config.ConfigHooks.install_named_hook('load', hook, None)
        self.assertLength(0, calls)
        store.load()
        self.assertLength(1, calls)
        self.assertEqual((store,), calls[0])

    def test_save_hook(self):
        calls = []

        def hook(*args):
            calls.append(args)

        config.ConfigHooks.install_named_hook('save', hook, None)
        self.assertLength(0, calls)
        store = self.get_store(self)
        self._write_lock_branch_store(store)
        section = store.get_mutable_section('baz')
        section.set('foo', 'bar')
        store.save()
        self.assertLength(1, calls)
        self.assertEqual((store,), calls[0])

    def test_set_mark_dirty(self):
        stack = config.MemoryStack(b'')
        self.assertLength(0, stack.store.dirty_sections)
        stack.set('foo', 'baz')
        self.assertLength(1, stack.store.dirty_sections)
        self.assertTrue(stack.store._need_saving())

    def test_remove_mark_dirty(self):
        stack = config.MemoryStack(b'foo=bar')
        self.assertLength(0, stack.store.dirty_sections)
        stack.remove('foo')
        self.assertLength(1, stack.store.dirty_sections)
        self.assertTrue(stack.store._need_saving())
2796
class TestStoreSaveChanges(tests.TestCaseWithTransport):
    """Tests that config changes are kept in memory and saved on-demand."""

    def setUp(self):
        super(TestStoreSaveChanges, self).setUp()
        self.transport = self.get_transport()
        # Most of the tests involve two stores pointing to the same persistent
        # storage to observe the effects of concurrent changes
        self.st1 = config.TransportIniFileStore(self.transport, 'foo.conf')
        self.st2 = config.TransportIniFileStore(self.transport, 'foo.conf')
        # Collect the warnings instead of displaying them
        self.warnings = []

        def warning(*args):
            self.warnings.append(args[0] % args[1:])

        self.overrideAttr(trace, 'warning', warning)

    def has_store(self, store):
        """Tell whether the file backing ``store`` exists on the transport."""
        store_basename = urlutils.relative_url(self.transport.external_url(),
                                               store.external_url())
        return self.transport.has(store_basename)

    def get_stack(self, store):
        """Build a minimal stack reading from and writing to ``store``."""
        # Any stack will do as long as it uses the right store, just a single
        # no-name section is enough
        return config.Stack([store.get_sections], store)

    def test_no_changes_no_save(self):
        s = self.get_stack(self.st1)
        s.store.save_changes()
        self.assertEqual(False, self.has_store(self.st1))

    def test_unrelated_concurrent_update(self):
        s1 = self.get_stack(self.st1)
        s2 = self.get_stack(self.st2)
        s1.set('foo', 'bar')
        s2.set('baz', 'quux')
        s1.store.save()
        # Changes don't propagate magically
        self.assertEqual(None, s1.get('baz'))
        s2.store.save_changes()
        self.assertEqual('quux', s2.get('baz'))
        # Changes are acquired when saving
        self.assertEqual('bar', s2.get('foo'))
        # Since there is no overlap, no warnings are emitted
        self.assertLength(0, self.warnings)

    def test_concurrent_update_modified(self):
        s1 = self.get_stack(self.st1)
        s2 = self.get_stack(self.st2)
        s1.set('foo', 'bar')
        s2.set('foo', 'baz')
        s1.store.save()
        # The store saving last wins
        s2.store.save_changes()
        self.assertEqual('baz', s2.get('foo'))
        # But the user gets a warning
        self.assertLength(1, self.warnings)
        warning = self.warnings[0]
        self.assertStartsWith(warning, 'Option foo in section None')
        self.assertEndsWith(warning, 'was changed from <CREATED> to bar.'
                            ' The baz value will be saved.')

    def test_concurrent_deletion(self):
        self.st1._load_from_string(b'foo=bar')
        self.st1.save()
        s1 = self.get_stack(self.st1)
        s2 = self.get_stack(self.st2)
        s1.remove('foo')
        s2.remove('foo')
        s1.store.save_changes()
        # The first save raises no warning
        self.assertLength(0, self.warnings)
        s2.store.save_changes()
        # The second one does
        self.assertLength(1, self.warnings)
        warning = self.warnings[0]
        self.assertStartsWith(warning, 'Option foo in section None')
        self.assertEndsWith(warning, 'was changed from bar to <CREATED>.'
                            ' The <DELETED> value will be saved.')
2876
class TestQuotingIniFileStore(tests.TestCaseWithTransport):
2878
def get_store(self):
2879
return config.TransportIniFileStore(self.get_transport(), 'foo.conf')
2881
def test_get_quoted_string(self):
    # The quoted value is expected back without its quotes but with the
    # inner spaces preserved.
    ini_store = self.get_store()
    ini_store._load_from_string(b'foo= " abc "')
    section_getters = [ini_store.get_sections]
    conf = config.Stack(section_getters)
    value = conf.get('foo')
    self.assertEqual(' abc ', value)
2887
def test_set_quoted_string(self):
2888
store = self.get_store()
2889
stack = config.Stack([store.get_sections], store)
2890
stack.set('foo', ' a b c ')
2892
self.assertFileEqual(b'foo = " a b c "' + os.linesep.encode('ascii'), 'foo.conf')
2895
class TestTransportIniFileStore(TestStore):
2897
def test_loading_unknown_file_fails(self):
2898
store = config.TransportIniFileStore(self.get_transport(),
2900
self.assertRaises(errors.NoSuchFile, store.load)
2902
def test_invalid_content(self):
    transport = self.get_transport()
    ini_store = config.TransportIniFileStore(transport, 'foo.conf')
    # Nothing has been loaded yet.
    self.assertEqual(False, ini_store.is_loaded())
    bad_content = b'this is invalid !'
    parse_error = self.assertRaises(
        config.ParseConfigError, ini_store._load_from_string, bad_content)
    # The error carries the name of the file that failed to parse...
    self.assertEndsWith(parse_error.filename, 'foo.conf')
    # ...and the load failed
    self.assertEqual(False, ini_store.is_loaded())
2912
def test_get_embedded_sections(self):
2913
# A more complicated example (which also shows that section names and
2914
# option names share the same name space...)
2915
# FIXME: This should be fixed by forbidding dicts as values ?
2916
# -- vila 2011-04-05
2917
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
2918
store._load_from_string(b'''
2922
foo_in_DEFAULT=foo_DEFAULT
2930
sections = list(store.get_sections())
2931
self.assertLength(4, sections)
2932
# The default section has no name.
2933
# List values are provided as strings and need to be explicitly
2934
# converted by specifying from_unicode=list_from_store at option
2936
self.assertSectionContent((None, {'foo': 'bar', 'l': u'1,2'}),
2938
self.assertSectionContent(
2939
('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
2940
self.assertSectionContent(
2941
('bar', {'foo_in_bar': 'barbar'}), sections[2])
2942
# sub sections are provided as embedded dicts.
2943
self.assertSectionContent(
2944
('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
2948
class TestLockableIniFileStore(TestStore):
2950
def test_create_store_in_created_dir(self):
2951
self.assertPathDoesNotExist('dir')
2952
t = self.get_transport('dir/subdir')
2953
store = config.LockableIniFileStore(t, 'foo.conf')
2954
store.get_mutable_section(None).set('foo', 'bar')
2956
self.assertPathExists('dir/subdir')
2959
class TestConcurrentStoreUpdates(TestStore):
2960
"""Test that Stores properly handle concurrent updates.
2962
New Store implementation may fail some of these tests but until such
2963
implementations exist it's hard to properly filter them from the scenarios
2964
applied here. If you encounter such a case, contact the bzr devs.
2967
scenarios = [(key, {'get_stack': builder}) for key, builder
2968
in config.test_stack_builder_registry.iteritems()]
2971
super(TestConcurrentStoreUpdates, self).setUp()
2972
self.stack = self.get_stack(self)
2973
if not isinstance(self.stack, config._CompatibleStack):
2974
raise tests.TestNotApplicable(
2975
'%s is not meant to be compatible with the old config design'
2977
self.stack.set('one', '1')
2978
self.stack.set('two', '2')
2980
self.stack.store.save()
2982
def test_simple_read_access(self):
    # Reading 'one' through the fixture stack is expected to give '1'.
    value = self.stack.get('one')
    self.assertEqual('1', value)
2985
def test_simple_write_access(self):
    # A value written through the stack is read back from the same stack.
    self.stack.set('one', 'one')
    value = self.stack.get('one')
    self.assertEqual('one', value)
2989
def test_listen_to_the_last_speaker(self):
2991
c2 = self.get_stack(self)
2992
c1.set('one', 'ONE')
2993
c2.set('two', 'TWO')
2994
self.assertEqual('ONE', c1.get('one'))
2995
self.assertEqual('TWO', c2.get('two'))
2996
# The second update respects the first one
2997
self.assertEqual('ONE', c2.get('one'))
2999
def test_last_speaker_wins(self):
3000
# If the same config is not shared, the same variable modified twice
3001
# can only see a single result.
3003
c2 = self.get_stack(self)
3006
self.assertEqual('c2', c2.get('one'))
3007
# The first modification is still available until another refresh
3009
self.assertEqual('c1', c1.get('one'))
3010
c1.set('two', 'done')
3011
self.assertEqual('c2', c1.get('one'))
3013
def test_writes_are_serialized(self):
3015
c2 = self.get_stack(self)
3017
# We spawn a thread that will pause *during* the config saving.
3018
before_writing = threading.Event()
3019
after_writing = threading.Event()
3020
writing_done = threading.Event()
3021
c1_save_without_locking_orig = c1.store.save_without_locking
3022
def c1_save_without_locking():
3023
before_writing.set()
3024
c1_save_without_locking_orig()
3025
# The lock is held. We wait for the main thread to decide when to
3027
after_writing.wait()
3028
c1.store.save_without_locking = c1_save_without_locking
3032
t1 = threading.Thread(target=c1_set)
3033
# Collect the thread after the test
3034
self.addCleanup(t1.join)
3035
# Be ready to unblock the thread if the test goes wrong
3036
self.addCleanup(after_writing.set)
3038
before_writing.wait()
3039
self.assertRaises(errors.LockContention,
3040
c2.set, 'one', 'c2')
3041
self.assertEqual('c1', c1.get('one'))
3042
# Let the lock be released
3046
self.assertEqual('c2', c2.get('one'))
3048
def test_read_while_writing(self):
3050
# We spawn a thread that will pause *during* the write
3051
ready_to_write = threading.Event()
3052
do_writing = threading.Event()
3053
writing_done = threading.Event()
3054
# We override the _save implementation so we know the store is locked
3055
c1_save_without_locking_orig = c1.store.save_without_locking
3056
def c1_save_without_locking():
3057
ready_to_write.set()
3058
# The lock is held. We wait for the main thread to decide when to
3061
c1_save_without_locking_orig()
3063
c1.store.save_without_locking = c1_save_without_locking
3066
t1 = threading.Thread(target=c1_set)
3067
# Collect the thread after the test
3068
self.addCleanup(t1.join)
3069
# Be ready to unblock the thread if the test goes wrong
3070
self.addCleanup(do_writing.set)
3072
# Ensure the thread is ready to write
3073
ready_to_write.wait()
3074
self.assertEqual('c1', c1.get('one'))
3075
# If we read during the write, we get the old value
3076
c2 = self.get_stack(self)
3077
self.assertEqual('1', c2.get('one'))
3078
# Let the writing occur and ensure it occurred
3081
# Now we get the updated value
3082
c3 = self.get_stack(self)
3083
self.assertEqual('c1', c3.get('one'))
3085
# FIXME: It may be worth looking into removing the lock dir when it's not
3086
# needed anymore and look at possible fallouts for concurrent lockers. This
3087
# will matter if/when we use config files outside of breezy directories
3088
# (.config/breezy or .bzr) -- vila 20110-04-111
3091
class TestSectionMatcher(TestStore):
3093
scenarios = [('location', {'matcher': config.LocationMatcher}),
3094
('id', {'matcher': config.NameMatcher}),]
3097
super(TestSectionMatcher, self).setUp()
3098
# Any simple store is good enough
3099
self.get_store = config.test_store_builder_registry.get('configobj')
3101
def test_no_matches_for_empty_stores(self):
    # A store loaded from empty content: no section is expected from the
    # matcher.
    empty_store = self.get_store(self)
    empty_store._load_from_string(b'')
    section_matcher = self.matcher(empty_store, '/bar')
    matched = list(section_matcher.get_sections())
    self.assertEqual([], matched)
3107
def test_build_doesnt_load_store(self):
    # Only build the matcher; the store must still report being unloaded.
    fresh_store = self.get_store(self)
    self.matcher(fresh_store, '/bar')
    loaded = fresh_store.is_loaded()
    self.assertFalse(loaded)
3113
class TestLocationSection(tests.TestCase):
3115
def get_section(self, options, extra_path):
    """Build a LocationSection around a new section named 'foo'.

    :param options: passed to config.Section as the section options.
    :param extra_path: passed through to config.LocationSection.
    :return: the resulting config.LocationSection.
    """
    return config.LocationSection(config.Section('foo', options), extra_path)
3119
def test_simple_option(self):
    # Empty extra path, no policy: the value is expected back as is.
    location_section = self.get_section({'foo': 'bar'}, '')
    value = location_section.get('foo')
    self.assertEqual('bar', value)
3123
def test_option_with_extra_path(self):
3124
section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
3126
self.assertEqual('bar/baz', section.get('foo'))
3128
def test_invalid_policy(self):
3129
section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
3131
# invalid policies are ignored
3132
self.assertEqual('bar', section.get('foo'))
3135
class TestLocationMatcher(TestStore):
3138
super(TestLocationMatcher, self).setUp()
3139
# Any simple store is good enough
3140
self.get_store = config.test_store_builder_registry.get('configobj')
3142
def test_unrelated_section_excluded(self):
3143
store = self.get_store(self)
3144
store._load_from_string(b'''
3152
section=/foo/bar/baz
3156
self.assertEqual(['/foo', '/foo/baz', '/foo/bar', '/foo/bar/baz',
3158
[section.id for _, section in store.get_sections()])
3159
matcher = config.LocationMatcher(store, '/foo/bar/quux')
3160
sections = [section for _, section in matcher.get_sections()]
3161
self.assertEqual(['/foo/bar', '/foo'],
3162
[section.id for section in sections])
3163
self.assertEqual(['quux', 'bar/quux'],
3164
[section.extra_path for section in sections])
3166
def test_more_specific_sections_first(self):
3167
store = self.get_store(self)
3168
store._load_from_string(b'''
3174
self.assertEqual(['/foo', '/foo/bar'],
3175
[section.id for _, section in store.get_sections()])
3176
matcher = config.LocationMatcher(store, '/foo/bar/baz')
3177
sections = [section for _, section in matcher.get_sections()]
3178
self.assertEqual(['/foo/bar', '/foo'],
3179
[section.id for section in sections])
3180
self.assertEqual(['baz', 'bar/baz'],
3181
[section.extra_path for section in sections])
3183
def test_appendpath_in_no_name_section(self):
3184
# It's a bit weird to allow appendpath in a no-name section, but
3185
# someone may find a use for it
3186
store = self.get_store(self)
3187
store._load_from_string(b'''
3189
foo:policy = appendpath
3191
matcher = config.LocationMatcher(store, 'dir/subdir')
3192
sections = list(matcher.get_sections())
3193
self.assertLength(1, sections)
3194
self.assertEqual('bar/dir/subdir', sections[0][1].get('foo'))
3196
def test_file_urls_are_normalized(self):
3197
store = self.get_store(self)
3198
if sys.platform == 'win32':
3199
expected_url = 'file:///C:/dir/subdir'
3200
expected_location = 'C:/dir/subdir'
3202
expected_url = 'file:///dir/subdir'
3203
expected_location = '/dir/subdir'
3204
matcher = config.LocationMatcher(store, expected_url)
3205
self.assertEqual(expected_location, matcher.location)
3207
def test_branch_name_colo(self):
3208
store = self.get_store(self)
3209
store._load_from_string(dedent("""\
3211
push_location=my{branchname}
3212
""").encode('ascii'))
3213
matcher = config.LocationMatcher(store, 'file:///,branch=example%3c')
3214
self.assertEqual('example<', matcher.branch_name)
3215
((_, section),) = matcher.get_sections()
3216
self.assertEqual('example<', section.locals['branchname'])
3218
def test_branch_name_basename(self):
3219
store = self.get_store(self)
3220
store._load_from_string(dedent("""\
3222
push_location=my{branchname}
3223
""").encode('ascii'))
3224
matcher = config.LocationMatcher(store, 'file:///parent/example%3c')
3225
self.assertEqual('example<', matcher.branch_name)
3226
((_, section),) = matcher.get_sections()
3227
self.assertEqual('example<', section.locals['branchname'])
3230
class TestStartingPathMatcher(TestStore):
3233
super(TestStartingPathMatcher, self).setUp()
3234
# Any simple store is good enough
3235
self.store = config.IniFileStore()
3237
def assertSectionIDs(self, expected, location, content):
3238
self.store._load_from_string(content)
3239
matcher = config.StartingPathMatcher(self.store, location)
3240
sections = list(matcher.get_sections())
3241
self.assertLength(len(expected), sections)
3242
self.assertEqual(expected, [section.id for _, section in sections])
3245
def test_empty(self):
    # Empty content: no section ids are expected.
    location = self.get_url()
    self.assertSectionIDs([], location, b'')
3248
def test_url_vs_local_paths(self):
3249
# The matcher location is an url and the section names are local paths
3250
self.assertSectionIDs(['/foo/bar', '/foo'],
3251
'file:///foo/bar/baz', b'''\
3256
def test_local_path_vs_url(self):
3257
# The matcher location is a local path and the section names are urls
3258
self.assertSectionIDs(['file:///foo/bar', 'file:///foo'],
3259
'/foo/bar/baz', b'''\
3265
def test_no_name_section_included_when_present(self):
3266
# Note that other tests will cover the case where the no-name section
3267
# is empty and as such, not included.
3268
sections = self.assertSectionIDs(['/foo/bar', '/foo', None],
3269
'/foo/bar/baz', b'''\
3270
option = defined so the no-name section exists
3274
self.assertEqual(['baz', 'bar/baz', '/foo/bar/baz'],
3275
[s.locals['relpath'] for _, s in sections])
3277
def test_order_reversed(self):
3278
self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
3283
def test_unrelated_section_excluded(self):
3284
self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
3290
def test_glob_included(self):
3291
sections = self.assertSectionIDs(['/foo/*/baz', '/foo/b*', '/foo'],
3292
'/foo/bar/baz', b'''\
3298
# Note that 'baz' as a relpath for /foo/b* is not fully correct, but
3299
# nothing really is... as far using {relpath} to append it to something
3300
# else, this seems good enough though.
3301
self.assertEqual(['', 'baz', 'bar/baz'],
3302
[s.locals['relpath'] for _, s in sections])
3304
def test_respect_order(self):
3305
self.assertSectionIDs(['/foo', '/foo/b*', '/foo/*/baz'],
3306
'/foo/bar/baz', b'''\
3314
class TestNameMatcher(TestStore):
3317
super(TestNameMatcher, self).setUp()
3318
self.matcher = config.NameMatcher
3319
# Any simple store is good enough
3320
self.get_store = config.test_store_builder_registry.get('configobj')
3322
def get_matching_sections(self, name):
3323
store = self.get_store(self)
3324
store._load_from_string(b'''
3332
matcher = self.matcher(store, name)
3333
return list(matcher.get_sections())
3335
def test_matching(self):
    # Exactly one section is expected for the name 'foo'.
    matched = self.get_matching_sections('foo')
    self.assertLength(1, matched)
    expected = ('foo', {'option': 'foo'})
    self.assertSectionContent(expected, matched[0])
3340
def test_not_matching(self):
    # 'baz' is expected to select no section at all.
    matched = self.get_matching_sections('baz')
    self.assertLength(0, matched)
3345
class TestBaseStackGet(tests.TestCase):
3348
super(TestBaseStackGet, self).setUp()
3349
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3351
def test_get_first_definition(self):
    # Both stores define 'foo'; the value of the store listed first in
    # the stack ('bar') is the one expected back.
    first = config.IniFileStore()
    first._load_from_string(b'foo=bar')
    second = config.IniFileStore()
    second._load_from_string(b'foo=baz')
    stack = config.Stack([first.get_sections, second.get_sections])
    value = stack.get('foo')
    self.assertEqual('bar', value)
3359
def test_get_with_registered_default_value(self):
    # The stack has no sections, so the registered default is expected.
    option = config.Option('foo', default='bar')
    config.option_registry.register(option)
    empty_stack = config.Stack([])
    self.assertEqual('bar', empty_stack.get('foo'))
3364
def test_get_without_registered_default_value(self):
    # The option is registered without a default: None is expected.
    config.option_registry.register(config.Option('foo'))
    empty_stack = config.Stack([])
    value = empty_stack.get('foo')
    self.assertEqual(None, value)
3369
def test_get_without_default_value_for_not_registered(self):
    # Nothing is registered in this test and the stack has no sections:
    # None is expected.
    empty_stack = config.Stack([])
    value = empty_stack.get('foo')
    self.assertEqual(None, value)
3373
def test_get_for_empty_section_callable(self):
    # The only section callable returns an empty list.
    def no_sections():
        return []
    stack = config.Stack([no_sections])
    self.assertEqual(None, stack.get('foo'))
3377
def test_get_for_broken_callable(self):
    # Trying to use an invalid callable raises an exception on first use
    broken_stack = config.Stack([object])
    getter = broken_stack.get
    self.assertRaises(TypeError, getter, 'foo')
3383
class TestStackWithSimpleStore(tests.TestCase):
3386
super(TestStackWithSimpleStore, self).setUp()
3387
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3388
self.registry = config.option_registry
3390
def get_conf(self, content=None):
    """Return a config.MemoryStack built from ``content``.

    :param content: handed as is to config.MemoryStack (None by default).
    """
    stack = config.MemoryStack(content)
    return stack
3393
def test_override_value_from_env(self):
    # FOO is unset while the option gets registered.
    self.overrideEnv('FOO', None)
    option = config.Option(
        'foo', default='bar', override_from_env=['FOO'])
    self.registry.register(option)
    self.overrideEnv('FOO', 'quux')
    # Env variable provides a default taking over the option one
    stack = self.get_conf(b'foo=store')
    value = stack.get('foo')
    self.assertEqual('quux', value)
3402
def test_first_override_value_from_env_wins(self):
    # Unset every candidate variable before registering the option.
    for env_name in ('NO_VALUE', 'FOO', 'BAZ'):
        self.overrideEnv(env_name, None)
    option = config.Option(
        'foo', default='bar',
        override_from_env=['NO_VALUE', 'FOO', 'BAZ'])
    self.registry.register(option)
    self.overrideEnv('FOO', 'foo')
    self.overrideEnv('BAZ', 'baz')
    # The first env var set wins
    stack = self.get_conf(b'foo=store')
    value = stack.get('foo')
    self.assertEqual('foo', value)
3416
class TestMemoryStack(tests.TestCase):
3419
conf = config.MemoryStack(b'foo=bar')
3420
self.assertEqual('bar', conf.get('foo'))
3423
conf = config.MemoryStack(b'foo=bar')
3424
conf.set('foo', 'baz')
3425
self.assertEqual('baz', conf.get('foo'))
3427
def test_no_content(self):
    stack = config.MemoryStack()
    # No content means no loading
    self.assertFalse(stack.store.is_loaded())
    self.assertRaises(NotImplementedError, stack.get, 'foo')
    # But a content can still be provided
    stack.store._load_from_string(b'foo=bar')
    value = stack.get('foo')
    self.assertEqual('bar', value)
3437
class TestStackIterSections(tests.TestCase):
    """Check what Stack.iter_sections() yields for a few store setups."""

    def test_empty_stack(self):
        # A stack built without any section callable.
        stack = config.Stack([])
        found = list(stack.iter_sections())
        self.assertLength(0, found)

    def test_empty_store(self):
        # A single store loaded from empty content.
        empty = config.IniFileStore()
        empty._load_from_string(b'')
        stack = config.Stack([empty.get_sections])
        found = list(stack.iter_sections())
        self.assertLength(0, found)

    def test_simple_store(self):
        single = config.IniFileStore()
        single._load_from_string(b'foo=bar')
        stack = config.Stack([single.get_sections])
        pairs = list(stack.iter_sections())
        self.assertLength(1, pairs)
        # Each item unpacks as a (store, section) pair.
        found_store, _found_section = pairs[0]
        self.assertIs(single, found_store)

    def test_two_stores(self):
        first = config.IniFileStore()
        first._load_from_string(b'foo=bar')
        second = config.IniFileStore()
        second._load_from_string(b'bar=qux')
        stack = config.Stack([first.get_sections, second.get_sections])
        pairs = list(stack.iter_sections())
        self.assertLength(2, pairs)
        # The stores are expected in the order they were given to the stack.
        self.assertIs(first, pairs[0][0])
        self.assertIs(second, pairs[1][0])
3472
class TestStackWithTransport(tests.TestCaseWithTransport):
    """Base class whose scenarios provide a ``get_stack`` builder.

    One scenario is declared per entry of
    config.test_stack_builder_registry.
    """

    # The loop variable names are kept as is: on Python 2 they end up in
    # the class namespace.
    scenarios = [
        (key, {'get_stack': builder})
        for key, builder
        in config.test_stack_builder_registry.iteritems()
    ]
3478
class TestConcreteStacks(TestStackWithTransport):
    """Smoke tests for the stack builders."""

    def test_build_stack(self):
        # Just a smoke test to help debug builders
        build = self.get_stack
        build(self)
3485
class TestStackGet(TestStackWithTransport):
3488
super(TestStackGet, self).setUp()
3489
self.conf = self.get_stack(self)
3491
def test_get_for_empty_stack(self):
    # Nothing is set in this test: 'foo' is expected to be None.
    value = self.conf.get('foo')
    self.assertEqual(None, value)
3494
def test_get_hook(self):
3495
self.conf.set('foo', 'bar')
3499
config.ConfigHooks.install_named_hook('get', hook, None)
3500
self.assertLength(0, calls)
3501
value = self.conf.get('foo')
3502
self.assertEqual('bar', value)
3503
self.assertLength(1, calls)
3504
self.assertEqual((self.conf, 'foo', 'bar'), calls[0])
3507
class TestStackGetWithConverter(tests.TestCase):
3510
super(TestStackGetWithConverter, self).setUp()
3511
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3512
self.registry = config.option_registry
3514
def get_conf(self, content=None):
    """Build the stack used by the tests from ``content``.

    :param content: forwarded unchanged to config.MemoryStack.
    :return: the new config.MemoryStack.
    """
    memory_stack = config.MemoryStack(content)
    return memory_stack
3517
def register_bool_option(self, name, default=None, default_from_env=None):
    """Register an option converted with config.bool_from_store.

    :param name: the option name.
    :param default: passed to config.Option as ``default``.
    :param default_from_env: passed to config.Option as
        ``default_from_env``.
    """
    option_kwargs = dict(
        help='A boolean.',
        default=default,
        default_from_env=default_from_env,
        from_unicode=config.bool_from_store)
    self.registry.register(config.Option(name, **option_kwargs))
3523
def test_get_default_bool_None(self):
    # No default registered and nothing stored: None is expected.
    self.register_bool_option('foo')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(None, value)
3528
def test_get_default_bool_True(self):
    # The default u'True' is expected back as the boolean True.
    self.register_bool_option('foo', u'True')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(True, value)
3533
def test_get_default_bool_False(self):
    # Here the default is given as the boolean False itself.
    self.register_bool_option('foo', False)
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(False, value)
3538
def test_get_default_bool_False_as_string(self):
    # The default u'False' is expected back as the boolean False.
    self.register_bool_option('foo', u'False')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(False, value)
3543
def test_get_default_bool_from_env_converted(self):
    # FOO provides 'False' while the registered default is u'True': the
    # boolean False is expected.
    self.register_bool_option('foo', u'True', default_from_env=['FOO'])
    self.overrideEnv('FOO', 'False')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(False, value)
3549
def test_get_default_bool_when_conversion_fails(self):
    # The stored value is not a valid boolean: the default is expected.
    self.register_bool_option('foo', default='True')
    stack = self.get_conf(b'foo=invalid boolean')
    value = stack.get('foo')
    self.assertEqual(True, value)
3554
def register_integer_option(self, name,
                            default=None, default_from_env=None):
    """Register an option converted with config.int_from_store.

    :param name: the option name.
    :param default: passed to config.Option as ``default``.
    :param default_from_env: passed to config.Option as
        ``default_from_env``.
    """
    option_kwargs = dict(
        help='An integer.',
        default=default,
        default_from_env=default_from_env,
        from_unicode=config.int_from_store)
    self.registry.register(config.Option(name, **option_kwargs))
3561
def test_get_default_integer_None(self):
    # No default registered and nothing stored: None is expected.
    self.register_integer_option('foo')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(None, value)
3566
def test_get_default_integer(self):
    # The default is given as the integer 42 itself.
    self.register_integer_option('foo', 42)
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(42, value)
3571
def test_get_default_integer_as_string(self):
    # The default u'42' is expected back as the integer 42.
    self.register_integer_option('foo', u'42')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(42, value)
3576
def test_get_default_integer_from_env(self):
    # FOO provides '18': the integer 18 is expected.
    self.register_integer_option('foo', default_from_env=['FOO'])
    self.overrideEnv('FOO', '18')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(18, value)
3582
def test_get_default_integer_when_conversion_fails(self):
    # The stored value is not a valid integer: the default is expected.
    self.register_integer_option('foo', default='12')
    stack = self.get_conf(b'foo=invalid integer')
    value = stack.get('foo')
    self.assertEqual(12, value)
3587
def register_list_option(self, name, default=None, default_from_env=None):
    """Register a config.ListOption in the test registry.

    :param name: the option name.
    :param default: passed to config.ListOption as ``default``.
    :param default_from_env: passed to config.ListOption as
        ``default_from_env``.
    """
    list_option = config.ListOption(
        name,
        help='A list.',
        default=default,
        default_from_env=default_from_env)
    self.registry.register(list_option)
3592
def test_get_default_list_None(self):
    # No default registered and nothing stored: None is expected.
    self.register_list_option('foo')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(None, value)
3597
def test_get_default_list_empty(self):
    # An empty string default is expected back as an empty list.
    self.register_list_option('foo', '')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual([], value)
3602
def test_get_default_list_from_env(self):
    # FOO provides an empty string: an empty list is expected.
    self.register_list_option('foo', default_from_env=['FOO'])
    self.overrideEnv('FOO', '')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual([], value)
3608
def test_get_with_list_converter_no_item(self):
    # A lone comma is expected to give an empty list.
    self.register_list_option('foo', None)
    stack = self.get_conf(b'foo=,')
    value = stack.get('foo')
    self.assertEqual([], value)
3613
def test_get_with_list_converter_many_items(self):
    # Each comma-separated item is expected as one list element.
    self.register_list_option('foo', None)
    stack = self.get_conf(b'foo=m,o,r,e')
    expected = ['m', 'o', 'r', 'e']
    self.assertEqual(expected, stack.get('foo'))
3618
def test_get_with_list_converter_embedded_spaces_many_items(self):
    # Spaces inside the quotes are expected to be kept in the items.
    self.register_list_option('foo', None)
    stack = self.get_conf(b'foo=" bar", "baz "')
    expected = [' bar', 'baz ']
    self.assertEqual(expected, stack.get('foo'))
3623
def test_get_with_list_converter_stripped_spaces_many_items(self):
    # Without quotes, the spaces around the items are expected to go away.
    self.register_list_option('foo', None)
    stack = self.get_conf(b'foo= bar , baz ')
    expected = ['bar', 'baz']
    self.assertEqual(expected, stack.get('foo'))
3629
class TestIterOptionRefs(tests.TestCase):
3630
"""iter_option_refs is a bit unusual, document some cases."""
3632
def assertRefs(self, expected, string):
    """Assert that config.iter_option_refs(string) yields ``expected``.

    :param expected: the list the iterator output is compared against.
    :param string: the string handed to config.iter_option_refs.
    """
    actual = list(config.iter_option_refs(string))
    self.assertEqual(expected, actual)
3635
def test_empty(self):
    # The empty string is expected to give a single (False, '') chunk.
    expected = [(False, '')]
    self.assertRefs(expected, '')
3638
def test_no_refs(self):
    # The whole string is expected back as one (False, ...) chunk.
    text = 'foo bar'
    self.assertRefs([(False, text)], text)
3641
def test_single_ref(self):
    # The (True, '{foo}') chunk is expected between two empty
    # (False, '') chunks.
    expected = [(False, ''), (True, '{foo}'), (False, '')]
    self.assertRefs(expected, '{foo}')
3644
def test_broken_ref(self):
    # Without a closing brace, a single (False, ...) chunk is expected.
    text = '{foo'
    self.assertRefs([(False, text)], text)
3647
def test_embedded_ref(self):
3648
self.assertRefs([(False, '{'), (True, '{foo}'), (False, '}')],
3651
def test_two_refs(self):
3652
self.assertRefs([(False, ''), (True, '{foo}'),
3653
(False, ''), (True, '{bar}'),
3657
def test_newline_in_refs_are_not_matched(self):
    # Every brace pair here contains a newline: the whole string is
    # expected back as one (False, ...) chunk.
    text = '{\nxx}{xx\n}{{\n}}'
    self.assertRefs([(False, text)], text)
3661
class TestStackExpandOptions(tests.TestCaseWithTransport):
3664
super(TestStackExpandOptions, self).setUp()
3665
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3666
self.registry = config.option_registry
3667
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
3668
self.conf = config.Stack([store.get_sections], store)
3670
def assertExpansion(self, expected, string, env=None):
    """Assert that self.conf.expand_options(string, env) gives ``expected``.

    :param expected: the value the expansion result is compared against.
    :param string: the string handed to expand_options.
    :param env: second argument handed to expand_options (None by default).
    """
    expanded = self.conf.expand_options(string, env)
    self.assertEqual(expected, expanded)
3673
def test_no_expansion(self):
    # A string without braces is expected back unchanged.
    plain = 'foo'
    self.assertExpansion(plain, plain)
3676
def test_expand_default_value(self):
    # 'foo' is not stored; its default u'{bar}' is expected to expand to
    # the stored value of 'bar'.
    self.conf.store._load_from_string(b'bar=baz')
    option = config.Option('foo', default=u'{bar}')
    self.registry.register(option)
    value = self.conf.get('foo', expand=True)
    self.assertEqual('baz', value)
3681
def test_expand_default_from_env(self):
    # FOO provides '{bar}', which is expected to expand to the stored
    # value of 'bar'.
    self.conf.store._load_from_string(b'bar=baz')
    option = config.Option('foo', default_from_env=['FOO'])
    self.registry.register(option)
    self.overrideEnv('FOO', '{bar}')
    value = self.conf.get('foo', expand=True)
    self.assertEqual('baz', value)
3687
def test_expand_default_on_failed_conversion(self):
    # The stored 'foo' refers to 'baz' (bogus, not an integer); the
    # default u'{bar}' is expected to be expanded and converted instead.
    self.conf.store._load_from_string(b'baz=bogus\nbar=42\nfoo={baz}')
    option = config.Option(
        'foo', default=u'{bar}', from_unicode=config.int_from_store)
    self.registry.register(option)
    value = self.conf.get('foo', expand=True)
    self.assertEqual(42, value)
3694
def test_env_adding_options(self):
    # 'foo' is only provided through the env dict.
    extra = {'foo': 'bar'}
    self.assertExpansion('bar', '{foo}', extra)
3697
def test_env_overriding_options(self):
    # 'foo' is stored as 'baz' but the env dict value is the one expected.
    self.conf.store._load_from_string(b'foo=baz')
    extra = {'foo': 'bar'}
    self.assertExpansion('bar', '{foo}', extra)
3701
def test_simple_ref(self):
    # '{foo}' is expected to expand to the stored value of 'foo'.
    store = self.conf.store
    store._load_from_string(b'foo=xxx')
    self.assertExpansion('xxx', '{foo}')
3705
def test_unknown_ref(self):
    # Nothing defines 'foo' here, so expanding it is expected to fail.
    expand = self.conf.expand_options
    self.assertRaises(config.ExpandingUnknownOption, expand, '{foo}')
3709
def test_illegal_def_is_ignored(self):
    # Each of these strings is expected back unchanged.
    for text in ('{1,2}', '{ }', '${Foo,f}'):
        self.assertExpansion(text, text)
3714
def test_indirect_ref(self):
3715
self.conf.store._load_from_string(b'''
3719
self.assertExpansion('xxx', '{bar}')
3721
def test_embedded_ref(self):
3722
self.conf.store._load_from_string(b'''
3726
self.assertExpansion('xxx', '{{bar}}')
3728
def test_simple_loop(self):
    # 'foo' refers to itself.
    self.conf.store._load_from_string(b'foo={foo}')
    expand = self.conf.expand_options
    self.assertRaises(config.OptionExpansionLoop, expand, '{foo}')
3733
def test_indirect_loop(self):
3734
self.conf.store._load_from_string(b'''
3738
e = self.assertRaises(config.OptionExpansionLoop,
3739
self.conf.expand_options, '{foo}')
3740
self.assertEqual('foo->bar->baz', e.refs)
3741
self.assertEqual('{foo}', e.string)
3743
def test_list(self):
3744
self.conf.store._load_from_string(b'''
3748
list={foo},{bar},{baz}
3750
self.registry.register(
3751
config.ListOption('list'))
3752
self.assertEqual(['start', 'middle', 'end'],
3753
self.conf.get('list', expand=True))
3755
def test_cascading_list(self):
3756
self.conf.store._load_from_string(b'''
3762
self.registry.register(config.ListOption('list'))
3763
# Register an intermediate option as a list to ensure no conversion
3764
# happens while expanding. Conversion should only occur for the original
3765
# option ('list' here).
3766
self.registry.register(config.ListOption('baz'))
3767
self.assertEqual(['start', 'middle', 'end'],
3768
self.conf.get('list', expand=True))
3770
def test_pathologically_hidden_list(self):
3771
self.conf.store._load_from_string(b'''
3777
hidden={start}{middle}{end}
3779
# What matters is what the registration says, the conversion happens
3780
# only after all expansions have been performed
3781
self.registry.register(config.ListOption('hidden'))
3782
self.assertEqual(['bin', 'go'],
3783
self.conf.get('hidden', expand=True))
3786
class TestStackCrossSectionsExpand(tests.TestCaseWithTransport):
3789
super(TestStackCrossSectionsExpand, self).setUp()
3791
def get_config(self, location, string):
3794
# Since we don't save the config we won't strictly require to inherit
3795
# from TestCaseInTempDir, but an error occurs so quickly...
3796
c = config.LocationStack(location)
3797
c.store._load_from_string(string)
3800
def test_dont_cross_unrelated_section(self):
3801
c = self.get_config('/another/branch/path', b'''
3806
[/another/branch/path]
3809
self.assertRaises(config.ExpandingUnknownOption,
3810
c.get, 'bar', expand=True)
3812
def test_cross_related_sections(self):
3813
c = self.get_config('/project/branch/path', b'''
3817
[/project/branch/path]
3820
self.assertEqual('quux', c.get('bar', expand=True))
3823
class TestStackCrossStoresExpand(tests.TestCaseWithTransport):
3825
def test_cross_global_locations(self):
3826
l_store = config.LocationStore()
3827
l_store._load_from_string(b'''
3833
g_store = config.GlobalStore()
3834
g_store._load_from_string(b'''
3840
stack = config.LocationStack('/branch')
3841
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3842
self.assertEqual('loc-foo', stack.get('gfoo', expand=True))
3845
class TestStackExpandSectionLocals(tests.TestCaseWithTransport):
3847
def test_expand_locals_empty(self):
3848
l_store = config.LocationStore()
3849
l_store._load_from_string(b'''
3850
[/home/user/project]
3855
stack = config.LocationStack('/home/user/project/')
3856
self.assertEqual('', stack.get('base', expand=True))
3857
self.assertEqual('', stack.get('rel', expand=True))
3859
def test_expand_basename_locally(self):
3860
l_store = config.LocationStore()
3861
l_store._load_from_string(b'''
3862
[/home/user/project]
3866
stack = config.LocationStack('/home/user/project/branch')
3867
self.assertEqual('branch', stack.get('bfoo', expand=True))
3869
def test_expand_basename_locally_longer_path(self):
3870
l_store = config.LocationStore()
3871
l_store._load_from_string(b'''
3876
stack = config.LocationStack('/home/user/project/dir/branch')
3877
self.assertEqual('branch', stack.get('bfoo', expand=True))
3879
def test_expand_relpath_locally(self):
3880
l_store = config.LocationStore()
3881
l_store._load_from_string(b'''
3882
[/home/user/project]
3883
lfoo = loc-foo/{relpath}
3886
stack = config.LocationStack('/home/user/project/branch')
3887
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3889
def test_expand_relpath_unknonw_in_global(self):
3890
g_store = config.GlobalStore()
3891
g_store._load_from_string(b'''
3896
stack = config.LocationStack('/home/user/project/branch')
3897
self.assertRaises(config.ExpandingUnknownOption,
3898
stack.get, 'gfoo', expand=True)
3900
def test_expand_local_option_locally(self):
3901
l_store = config.LocationStore()
3902
l_store._load_from_string(b'''
3903
[/home/user/project]
3904
lfoo = loc-foo/{relpath}
3908
g_store = config.GlobalStore()
3909
g_store._load_from_string(b'''
3915
stack = config.LocationStack('/home/user/project/branch')
3916
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3917
self.assertEqual('loc-foo/branch', stack.get('gfoo', expand=True))
3919
def test_locals_dont_leak(self):
3920
"""Make sure we chose the right local in presence of several sections.
3922
l_store = config.LocationStore()
3923
l_store._load_from_string(b'''
3925
lfoo = loc-foo/{relpath}
3926
[/home/user/project]
3927
lfoo = loc-foo/{relpath}
3930
stack = config.LocationStack('/home/user/project/branch')
3931
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3932
stack = config.LocationStack('/home/user/bar/baz')
3933
self.assertEqual('loc-foo/bar/baz', stack.get('lfoo', expand=True))
3937
class TestStackSet(TestStackWithTransport):
3939
def test_simple_set(self):
    # An option written through the stack can be read back from it.
    stack = self.get_stack(self)
    value_before = stack.get('foo')
    self.assertEqual(None, value_before)
    stack.set('foo', 'baz')
    # Reading the option again must now return the stored value.
    value_after = stack.get('foo')
    self.assertEqual('baz', value_after)
3946
def test_set_creates_a_new_section(self):
    # Set an option on a freshly built stack and check that it is stored.
    conf = self.get_stack(self)
    conf.set('foo', 'baz')
    # The assertion has to be *called*: a bare ``self.assertEqual, a, b``
    # expression only builds a tuple and can never make the test fail.
    self.assertEqual('baz', conf.get('foo'))
3951
def test_set_hook(self):
3955
config.ConfigHooks.install_named_hook('set', hook, None)
3956
self.assertLength(0, calls)
3957
conf = self.get_stack(self)
3958
conf.set('foo', 'bar')
3959
self.assertLength(1, calls)
3960
self.assertEqual((conf, 'foo', 'bar'), calls[0])
3963
class TestStackRemove(TestStackWithTransport):
3965
def test_remove_existing(self):
3966
conf = self.get_stack(self)
3967
conf.set('foo', 'bar')
3968
self.assertEqual('bar', conf.get('foo'))
3970
# Did we get it back ?
3971
self.assertEqual(None, conf.get('foo'))
3973
def test_remove_unknown(self):
    # Removing an option that was never set is expected to raise KeyError.
    stack = self.get_stack(self)
    missing_option = 'I_do_not_exist'
    self.assertRaises(KeyError, stack.remove, missing_option)
3977
def test_remove_hook(self):
3981
config.ConfigHooks.install_named_hook('remove', hook, None)
3982
self.assertLength(0, calls)
3983
conf = self.get_stack(self)
3984
conf.set('foo', 'bar')
3986
self.assertLength(1, calls)
3987
self.assertEqual((conf, 'foo'), calls[0])
3990
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
3993
super(TestConfigGetOptions, self).setUp()
3994
create_configs(self)
3996
def test_no_variable(self):
    # Querying through the branch config should consult branch, locations
    # and breezy; no option is expected from any of them here.
    no_options = []
    self.assertOptions(no_options, self.branch_config)
4000
def test_option_in_breezy(self):
4001
self.breezy_config.set_user_option('file', 'breezy')
4002
self.assertOptions([('file', 'breezy', 'DEFAULT', 'breezy')],
4005
def test_option_in_locations(self):
4006
self.locations_config.set_user_option('file', 'locations')
4008
[('file', 'locations', self.tree.basedir, 'locations')],
4009
self.locations_config)
4011
def test_option_in_branch(self):
4012
self.branch_config.set_user_option('file', 'branch')
4013
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch')],
4016
def test_option_in_breezy_and_branch(self):
4017
self.breezy_config.set_user_option('file', 'breezy')
4018
self.branch_config.set_user_option('file', 'branch')
4019
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch'),
4020
('file', 'breezy', 'DEFAULT', 'breezy'),],
4023
def test_option_in_branch_and_locations(self):
4024
# Hmm, locations override branch :-/
4025
self.locations_config.set_user_option('file', 'locations')
4026
self.branch_config.set_user_option('file', 'branch')
4028
[('file', 'locations', self.tree.basedir, 'locations'),
4029
('file', 'branch', 'DEFAULT', 'branch'),],
4032
def test_option_in_breezy_locations_and_branch(self):
4033
self.breezy_config.set_user_option('file', 'breezy')
4034
self.locations_config.set_user_option('file', 'locations')
4035
self.branch_config.set_user_option('file', 'branch')
4037
[('file', 'locations', self.tree.basedir, 'locations'),
4038
('file', 'branch', 'DEFAULT', 'branch'),
4039
('file', 'breezy', 'DEFAULT', 'breezy'),],
4043
class TestConfigRemoveOption(tests.TestCaseWithTransport, TestOptionsMixin):
4046
super(TestConfigRemoveOption, self).setUp()
4047
create_configs_with_file_option(self)
4049
def test_remove_in_locations(self):
4050
self.locations_config.remove_user_option('file', self.tree.basedir)
4052
[('file', 'branch', 'DEFAULT', 'branch'),
4053
('file', 'breezy', 'DEFAULT', 'breezy'),],
4056
def test_remove_in_branch(self):
4057
self.branch_config.remove_user_option('file')
4059
[('file', 'locations', self.tree.basedir, 'locations'),
4060
('file', 'breezy', 'DEFAULT', 'breezy'),],
4063
def test_remove_in_breezy(self):
4064
self.breezy_config.remove_user_option('file')
4066
[('file', 'locations', self.tree.basedir, 'locations'),
4067
('file', 'branch', 'DEFAULT', 'branch'),],
4071
class TestConfigGetSections(tests.TestCaseWithTransport):
4074
super(TestConfigGetSections, self).setUp()
4075
create_configs(self)
4077
def assertSectionNames(self, expected, conf, name=None):
4078
"""Check which sections are returned for a given config.
4080
If fallback configurations exist their sections can be included.
4082
:param expected: A list of section names.
4084
:param conf: The configuration that will be queried.
4086
:param name: An optional section name that will be passed to
4089
sections = list(conf._get_sections(name))
4090
self.assertLength(len(expected), sections)
4091
self.assertEqual(expected, [n for n, _, _ in sections])
4093
def test_breezy_default_section(self):
    # The breezy config is expected to report a single DEFAULT section.
    expected_names = ['DEFAULT']
    self.assertSectionNames(expected_names, self.breezy_config)
4096
def test_locations_default_section(self):
    # An empty locations file defines no section at all.
    expected_names = []
    self.assertSectionNames(expected_names, self.locations_config)
4100
def test_locations_named_section(self):
    # After setting an option, the only section expected is the one named
    # after the tree base directory.
    loc_config = self.locations_config
    loc_config.set_user_option('file', 'locations')
    expected_names = [self.tree.basedir]
    self.assertSectionNames(expected_names, loc_config)
4104
def test_locations_matching_sections(self):
4105
loc_config = self.locations_config
4106
loc_config.set_user_option('file', 'locations')
4107
# We need to cheat a bit here to create an option in sections above and
4108
# below the 'location' one.
4109
parser = loc_config._get_parser()
4110
# locations.conf deals with '/' ignoring native os.sep
4111
location_names = self.tree.basedir.split('/')
4112
parent = '/'.join(location_names[:-1])
4113
child = '/'.join(location_names + ['child'])
4115
parser[parent]['file'] = 'parent'
4117
parser[child]['file'] = 'child'
4118
self.assertSectionNames([self.tree.basedir, parent], loc_config)
4120
def test_branch_data_default_section(self):
    # The branch data config is expected to report one section whose
    # name is None.
    branch_data_config = self.branch_config._get_branch_data_config()
    self.assertSectionNames([None], branch_data_config)
4124
def test_branch_default_sections(self):
4125
# No sections are defined in an empty locations file
4126
self.assertSectionNames([None, 'DEFAULT'],
4128
# Unless we define an option
4129
self.branch_config._get_location_config().set_user_option(
4130
'file', 'locations')
4131
self.assertSectionNames([self.tree.basedir, None, 'DEFAULT'],
4134
def test_breezy_named_section(self):
    # The API gives no direct access to sections other than DEFAULT, so
    # cheat: defining an alias is what makes the ALIASES section exist.
    section_name = 'ALIASES'
    self.breezy_config.set_alias('breezy', 'bzr')
    self.assertSectionNames([section_name], self.breezy_config,
                            section_name)
4141
class TestSharedStores(tests.TestCaseInTempDir):
    """Check that separately built global stacks reuse one store object."""

    def test_breezy_conf_shared(self):
        first_stack, second_stack = config.GlobalStack(), config.GlobalStack()
        # Identity rather than equality: both stacks must point at the very
        # same store instance.
        first_store = first_stack.store
        self.assertIs(first_store, second_stack.store)
4150
class TestAuthenticationConfigFilePermissions(tests.TestCaseInTempDir):
4151
"""Test warning for permissions of authentication.conf."""
4154
super(TestAuthenticationConfigFilePermissions, self).setUp()
4155
self.path = osutils.pathjoin(self.test_dir, 'authentication.conf')
4156
with open(self.path, 'wb') as f:
4157
f.write(b"""[broken]
4160
port=port # Error: Not an int
4162
self.overrideAttr(config, 'authentication_config_filename',
4164
osutils.chmod_if_possible(self.path, 0o755)
4166
def test_check_warning(self):
    # Building the authentication config should log the permissions warning.
    warning_re = 'Saved passwords may be accessible by other users.'
    auth_conf = config.AuthenticationConfig()
    self.assertEqual(auth_conf._filename, self.path)
    log_text = self.get_log()
    self.assertContainsRe(log_text, warning_re)
4172
def test_check_suppressed_warning(self):
    # With 'insecure_permissions' listed in suppress_warnings, the
    # permissions warning must not reach the log.
    warning_re = 'Saved passwords may be accessible by other users.'
    config.GlobalConfig().set_user_option('suppress_warnings',
                                          'insecure_permissions')
    auth_conf = config.AuthenticationConfig()
    self.assertEqual(auth_conf._filename, self.path)
    log_text = self.get_log()
    self.assertNotContainsRe(log_text, warning_re)
4182
class TestAuthenticationConfigFile(tests.TestCase):
4183
"""Test the authentication.conf file matching"""
4185
def _got_user_passwd(self, expected_user, expected_password,
4186
config, *args, **kwargs):
4187
credentials = config.get_credentials(*args, **kwargs)
4188
if credentials is None:
4192
user = credentials['user']
4193
password = credentials['password']
4194
self.assertEqual(expected_user, user)
4195
self.assertEqual(expected_password, password)
4197
def test_empty_config(self):
    # An empty file parses to an empty config and provides no credentials.
    empty_file = BytesIO()
    auth_conf = config.AuthenticationConfig(_file=empty_file)
    parsed = auth_conf._get_config()
    self.assertEqual({}, parsed)
    self._got_user_passwd(None, None, auth_conf, 'http', 'foo.net')
4202
def test_non_utf8_config(self):
    # The content ends with the byte 0xff; parsing it is expected to raise
    # ConfigContentError.
    bad_content = BytesIO(b'foo = bar\xff')
    auth_conf = config.AuthenticationConfig(_file=bad_content)
    self.assertRaises(config.ConfigContentError, auth_conf._get_config)
4206
def test_missing_auth_section_header(self):
    # An option that is not inside any [section] makes the credentials
    # lookup raise ValueError.
    headerless = BytesIO(b'foo = bar')
    auth_conf = config.AuthenticationConfig(_file=headerless)
    self.assertRaises(ValueError,
                      auth_conf.get_credentials, 'ftp', 'foo.net')
4210
def test_auth_section_header_not_closed(self):
    # A section header lacking its closing bracket is a parse error.
    unterminated = BytesIO(b'[DEF')
    auth_conf = config.AuthenticationConfig(_file=unterminated)
    self.assertRaises(config.ParseConfigError, auth_conf._get_config)
4214
def test_auth_value_not_boolean(self):
4215
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4219
verify_certificates=askme # Error: Not a boolean
4221
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4223
def test_auth_value_not_int(self):
4224
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4228
port=port # Error: Not an int
4230
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4232
def test_unknown_password_encoding(self):
4233
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4237
password_encoding=unknown
4239
self.assertRaises(ValueError, conf.get_password,
4240
'ftp', 'foo.net', 'joe')
4242
def test_credentials_for_scheme_host(self):
4243
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4244
# Identity on foo.net
4249
password=secret-pass
4252
self._got_user_passwd('joe', 'secret-pass', conf, 'ftp', 'foo.net')
4254
self._got_user_passwd(None, None, conf, 'http', 'foo.net')
4256
self._got_user_passwd(None, None, conf, 'ftp', 'bar.net')
4258
def test_credentials_for_host_port(self):
4259
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4260
# Identity on foo.net
4266
password=secret-pass
4269
self._got_user_passwd('joe', 'secret-pass',
4270
conf, 'ftp', 'foo.net', port=10021)
4272
self._got_user_passwd(None, None, conf, 'ftp', 'foo.net')
4274
def test_for_matching_host(self):
4275
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4276
# Identity on foo.net
4282
[sourceforge domain]
4289
self._got_user_passwd('georges', 'bendover',
4290
conf, 'bzr', 'foo.bzr.sf.net')
4292
self._got_user_passwd(None, None,
4293
conf, 'bzr', 'bbzr.sf.net')
4295
def test_for_matching_host_None(self):
4296
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4297
# Identity on foo.net
4307
self._got_user_passwd('joe', 'joepass',
4308
conf, 'bzr', 'quux.net')
4309
# no host but different scheme
4310
self._got_user_passwd('georges', 'bendover',
4311
conf, 'ftp', 'quux.net')
4313
def test_credentials_for_path(self):
4314
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4329
self._got_user_passwd(None, None,
4330
conf, 'http', host='bar.org', path='/dir3')
4332
self._got_user_passwd('georges', 'bendover',
4333
conf, 'http', host='bar.org', path='/dir2')
4335
self._got_user_passwd('jim', 'jimpass',
4336
conf, 'http', host='bar.org', path='/dir1/subdir')
4338
def test_credentials_for_user(self):
4339
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4347
self._got_user_passwd('jim', 'jimpass',
4348
conf, 'http', 'bar.org')
4350
self._got_user_passwd('jim', 'jimpass',
4351
conf, 'http', 'bar.org', user='jim')
4352
# Don't get a different user if one is specified
4353
self._got_user_passwd(None, None,
4354
conf, 'http', 'bar.org', user='georges')
4356
def test_credentials_for_user_without_password(self):
4357
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4363
# Get user but no password
4364
self._got_user_passwd('jim', None,
4365
conf, 'http', 'bar.org')
4367
def test_verify_certificates(self):
4368
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4374
verify_certificates=False
4381
credentials = conf.get_credentials('https', 'bar.org')
4382
self.assertEqual(False, credentials.get('verify_certificates'))
4383
credentials = conf.get_credentials('https', 'foo.net')
4384
self.assertEqual(True, credentials.get('verify_certificates'))
4387
class TestAuthenticationStorage(tests.TestCaseInTempDir):
4389
def test_set_credentials(self):
4390
conf = config.AuthenticationConfig()
4391
conf.set_credentials('name', 'host', 'user', 'scheme', 'password',
4392
99, path='/foo', verify_certificates=False, realm='realm')
4393
credentials = conf.get_credentials(host='host', scheme='scheme',
4394
port=99, path='/foo',
4396
CREDENTIALS = {'name': 'name', 'user': 'user', 'password': 'password',
4397
'verify_certificates': False, 'scheme': 'scheme',
4398
'host': 'host', 'port': 99, 'path': '/foo',
4400
self.assertEqual(CREDENTIALS, credentials)
4401
credentials_from_disk = config.AuthenticationConfig().get_credentials(
4402
host='host', scheme='scheme', port=99, path='/foo', realm='realm')
4403
self.assertEqual(CREDENTIALS, credentials_from_disk)
4405
def test_reset_credentials_different_name(self):
    # Store credentials for the same host/scheme twice under different
    # section names; only the second definition is expected to remain.
    conf = config.AuthenticationConfig()
    # Plain statements: a trailing comma here would wrap each call in a
    # throw-away tuple.
    conf.set_credentials('name', 'host', 'user', 'scheme', 'password')
    conf.set_credentials('name2', 'host', 'user2', 'scheme', 'password')
    # The section written first must be gone...
    self.assertIs(None, conf._get_config().get('name'))
    # ...and the lookup must return the credentials stored second.
    credentials = conf.get_credentials(host='host', scheme='scheme')
    expected = {
        'name': 'name2',
        'user': 'user2',
        'password': 'password',
        'verify_certificates': True,
        'scheme': 'scheme',
        'host': 'host',
        'port': None,
        'path': None,
        'realm': None,
    }
    self.assertEqual(expected, credentials)
4418
class TestAuthenticationConfig(tests.TestCaseInTempDir):
4419
"""Test AuthenticationConfig behaviour"""
4421
def _check_default_password_prompt(self, expected_prompt_format, scheme,
4422
host=None, port=None, realm=None,
4426
user, password = 'jim', 'precious'
4427
expected_prompt = expected_prompt_format % {
4428
'scheme': scheme, 'host': host, 'port': port,
4429
'user': user, 'realm': realm}
4431
ui.ui_factory = tests.TestUIFactory(stdin=password + '\n')
4432
# We use an empty conf so that the user is always prompted
4433
conf = config.AuthenticationConfig()
4434
self.assertEqual(password,
4435
conf.get_password(scheme, host, user, port=port,
4436
realm=realm, path=path))
4437
self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
4438
self.assertEqual('', ui.ui_factory.stdout.getvalue())
4440
def _check_default_username_prompt(self, expected_prompt_format, scheme,
4441
host=None, port=None, realm=None,
4446
expected_prompt = expected_prompt_format % {
4447
'scheme': scheme, 'host': host, 'port': port,
4449
ui.ui_factory = tests.TestUIFactory(stdin=username+ '\n')
4450
# We use an empty conf so that the user is always prompted
4451
conf = config.AuthenticationConfig()
4452
self.assertEqual(username, conf.get_user(scheme, host, port=port,
4453
realm=realm, path=path, ask=True))
4454
self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
4455
self.assertEqual('', ui.ui_factory.stdout.getvalue())
4457
def test_username_defaults_prompts(self):
    # HTTP prompts can't be tested here, see test_http.py
    # Each case is (expected prompt format, scheme, extra keyword args).
    cases = (
        (u'FTP %(host)s username: ', 'ftp', {}),
        (u'FTP %(host)s:%(port)d username: ', 'ftp', {'port': 10020}),
        (u'SSH %(host)s:%(port)d username: ', 'ssh', {'port': 12345}),
    )
    for prompt_format, scheme, extra in cases:
        self._check_default_username_prompt(prompt_format, scheme, **extra)
4465
def test_username_default_no_prompt(self):
    # With no explicit default, get_user() is expected to return None;
    # with one, it is expected to return that default.
    auth_conf = config.AuthenticationConfig()
    without_default = auth_conf.get_user('ftp', 'example.com')
    self.assertEqual(None, without_default)
    with_default = auth_conf.get_user('ftp', 'example.com',
                                      default="explicitdefault")
    self.assertEqual("explicitdefault", with_default)
4472
def test_password_default_prompts(self):
    # HTTP prompts can't be tested here, see test_http.py
    # Each case is (expected prompt format, scheme, extra keyword args).
    cases = (
        (u'FTP %(user)s@%(host)s password: ', 'ftp', {}),
        (u'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp',
         {'port': 10020}),
        (u'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh',
         {'port': 12345}),
        # SMTP port handling is a bit special: the port may also be given
        # embedded in the host string, as in the second SMTP case below.
        # FIXME: should we: forbid that, extend it to other schemes, leave
        # things as they are that's fine thank you ?
        (u'SMTP %(user)s@%(host)s password: ', 'smtp', {}),
        (u'SMTP %(user)s@%(host)s password: ', 'smtp',
         {'host': 'bar.org:10025'}),
        (u'SMTP %(user)s@%(host)s:%(port)d password: ', 'smtp',
         {'port': 10025}),
    )
    for prompt_format, scheme, extra in cases:
        self._check_default_password_prompt(prompt_format, scheme, **extra)
4491
def test_ssh_password_emits_warning(self):
4492
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4499
entered_password = 'typed-by-hand'
4500
ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n')
4502
# Since the password defined in the authentication config is ignored,
4503
# the user is prompted
4504
self.assertEqual(entered_password,
4505
conf.get_password('ssh', 'bar.org', user='jim'))
4506
self.assertContainsRe(
4508
'password ignored in section \\[ssh with password\\]')
4510
def test_ssh_without_password_doesnt_emit_warning(self):
4511
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4517
entered_password = 'typed-by-hand'
4518
ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n')
4520
# Since the password defined in the authentication config is ignored,
4521
# the user is prompted
4522
self.assertEqual(entered_password,
4523
conf.get_password('ssh', 'bar.org', user='jim'))
4524
# No warning should be emitted since there is no password. We are only
4526
self.assertNotContainsRe(
4528
'password ignored in section \\[ssh with password\\]')
4530
def test_uses_fallback_stores(self):
    # Swap in a fresh registry for the duration of the test so the stub
    # registered below does not touch the real one.
    fresh_registry = config.CredentialStoreRegistry()
    self.overrideAttr(config, 'credential_store_registry', fresh_registry)
    stub = StubCredentialStore()
    stub.add_credentials("http", "example.com", "joe", "secret")
    config.credential_store_registry.register("stub", stub, fallback=True)
    # The authentication file is empty, so whatever comes back was taken
    # from the fallback store.
    auth_conf = config.AuthenticationConfig(_file=BytesIO())
    found = auth_conf.get_credentials("http", "example.com")
    self.assertEqual("joe", found["user"])
    self.assertEqual("secret", found["password"])
4542
class StubCredentialStore(config.CredentialStore):
4548
def add_credentials(self, scheme, host, user, password=None):
    # Record user and password under the (scheme, host) pair, replacing
    # any entry previously stored for that pair.
    key = (scheme, host)
    self._username[key] = user
    self._password[key] = password
4552
def get_credentials(self, scheme, host, port=None, user=None,
4553
path=None, realm=None):
4554
key = (scheme, host)
4555
if not key in self._username:
4557
return { "scheme": scheme, "host": host, "port": port,
4558
"user": self._username[key], "password": self._password[key]}
4561
class CountingCredentialStore(config.CredentialStore):
4566
def get_credentials(self, scheme, host, port=None, user=None,
4567
path=None, realm=None):
4572
class TestCredentialStoreRegistry(tests.TestCase):
    """Tests for registering, looking up and falling back to stores."""

    def _get_cs_registry(self):
        # Helper returning the module-level credential store registry.
        return config.credential_store_registry

    def test_default_credential_store(self):
        r = self._get_cs_registry()
        # Asking for the store named None yields the plain text store.
        default = r.get_credential_store(None)
        self.assertIsInstance(default, config.PlainTextCredentialStore)

    def test_unknown_credential_store(self):
        r = self._get_cs_registry()
        # It's hard to imagine someone creating a credential store named
        # 'unknown' so we use that as a never registered key.
        self.assertRaises(KeyError, r.get_credential_store, 'unknown')

    def test_fallback_none_registered(self):
        # A registry with nothing registered has no fallback credentials.
        r = config.CredentialStoreRegistry()
        self.assertEqual(None,
                         r.get_fallback_credentials("http", "example.com"))

    def test_register(self):
        r = config.CredentialStoreRegistry()
        r.register("stub", StubCredentialStore(), fallback=False)
        r.register("another", StubCredentialStore(), fallback=True)
        self.assertEqual(["another", "stub"], r.keys())

    def test_register_lazy(self):
        r = config.CredentialStoreRegistry()
        r.register_lazy("stub", "breezy.tests.test_config",
                        "StubCredentialStore", fallback=False)
        self.assertEqual(["stub"], r.keys())
        self.assertIsInstance(r.get_credential_store("stub"),
                              StubCredentialStore)

    def test_is_fallback(self):
        # is_fallback() reports the flag given at registration time.
        r = config.CredentialStoreRegistry()
        r.register("stub1", None, fallback=False)
        r.register("stub2", None, fallback=True)
        self.assertEqual(False, r.is_fallback("stub1"))
        self.assertEqual(True, r.is_fallback("stub2"))

    def test_no_fallback(self):
        # A store registered with fallback=False is never queried for
        # fallback credentials.
        r = config.CredentialStoreRegistry()
        store = CountingCredentialStore()
        r.register("count", store, fallback=False)
        self.assertEqual(None,
                         r.get_fallback_credentials("http", "example.com"))
        self.assertEqual(0, store._calls)

    def test_fallback_credentials(self):
        r = config.CredentialStoreRegistry()
        store = StubCredentialStore()
        store.add_credentials("http", "example.com",
                              "somebody", "geheim")
        r.register("stub", store, fallback=True)
        creds = r.get_fallback_credentials("http", "example.com")
        self.assertEqual("somebody", creds["user"])
        self.assertEqual("geheim", creds["password"])

    def test_fallback_first_wins(self):
        r = config.CredentialStoreRegistry()
        stub1 = StubCredentialStore()
        stub1.add_credentials("http", "example.com",
                              "somebody", "stub1")
        r.register("stub1", stub1, fallback=True)
        stub2 = StubCredentialStore()
        stub2.add_credentials("http", "example.com",
                              "somebody", "stub2")
        # Register the *second* store under the second name; registering
        # stub1 twice would leave stub2 out of the registry and the test
        # would not check which of two competing stores wins.
        r.register("stub2", stub2, fallback=True)
        creds = r.get_fallback_credentials("http", "example.com")
        self.assertEqual("somebody", creds["user"])
        self.assertEqual("stub1", creds["password"])
4647
class TestPlainTextCredentialStore(tests.TestCase):
    """Check password decoding by the default credential store."""

    def test_decode_password(self):
        # get_credential_store() without arguments returns the store under
        # test; it must hand the password back unchanged.
        store = config.credential_store_registry.get_credential_store()
        credentials = {'password': 'secret'}
        self.assertEqual('secret', store.decode_password(credentials))
4656
class TestBase64CredentialStore(tests.TestCase):
    """Check password decoding by the 'base64' credential store."""

    def test_decode_password(self):
        registry = config.credential_store_registry
        base64_store = registry.get_credential_store('base64')
        # 'c2VjcmV0' is the base64 form of 'secret'; decoding gives bytes.
        credentials = {'password': 'c2VjcmV0'}
        self.assertEqual(b'secret', base64_store.decode_password(credentials))
4665
# FIXME: Once we have a way to declare authentication to all test servers, we
4666
# can implement generic tests.
4667
# test_user_password_in_url
4668
# test_user_in_url_password_from_config
4669
# test_user_in_url_password_prompted
4670
# test_user_in_config
4671
# test_user_getpass.getuser
4672
# test_user_prompted ?
4673
class TestAuthenticationRing(tests.TestCaseWithTransport):
4677
class TestAutoUserId(tests.TestCase):
4678
"""Test inferring an automatic user name."""
4680
def test_auto_user_id(self):
4681
"""Automatic inference of user name.
4683
This is a bit hard to test in an isolated way, because it depends on
4684
system functions that go direct to /etc or perhaps somewhere else.
4685
But it's reasonable to say that on Unix, with an /etc/mailname, we ought
4686
to be able to choose a user name with no configuration.
4688
if sys.platform == 'win32':
4689
raise tests.TestSkipped(
4690
"User name inference not implemented on win32")
4691
realname, address = config._auto_user_id()
4692
if os.path.exists('/etc/mailname'):
4693
self.assertIsNot(None, realname)
4694
self.assertIsNot(None, address)
4696
self.assertEqual((None, None), (realname, address))
4699
class TestDefaultMailDomain(tests.TestCaseInTempDir):
    """Test retrieving default domain from mailname file"""

    def _domain_from(self, filename, content):
        # Write ``content`` to ``filename`` in the current directory and
        # return the domain that breezy reads back from that file.
        with open(filename, 'w') as mailname_file:
            mailname_file.write(content)
        return config._get_default_mail_domain(filename)

    def test_default_mail_domain_simple(self):
        domain = self._domain_from('simple', "domainname.com\n")
        self.assertEqual('domainname.com', domain)

    def test_default_mail_domain_no_eol(self):
        # A missing final newline must not change the result.
        domain = self._domain_from('no_eol', "domainname.com")
        self.assertEqual('domainname.com', domain)

    def test_default_mail_domain_multiple_lines(self):
        # Only the first line of the file is used.
        domain = self._domain_from('multiple_lines',
                                   "domainname.com\nsome other text\n")
        self.assertEqual('domainname.com', domain)
4721
class EmailOptionTests(tests.TestCase):
    """Check how BRZ_EMAIL, EMAIL and the config file set 'email'."""

    def test_default_email_uses_BRZ_EMAIL(self):
        stack = config.MemoryStack(b'email=jelmer@debian.org')
        # BRZ_EMAIL takes precedence over EMAIL
        self.overrideEnv('BRZ_EMAIL', 'jelmer@samba.org')
        self.overrideEnv('EMAIL', 'jelmer@apache.org')
        resolved = stack.get('email')
        self.assertEqual('jelmer@samba.org', resolved)

    def test_default_email_uses_EMAIL(self):
        # With BRZ_EMAIL unset and nothing in the config, EMAIL is used.
        stack = config.MemoryStack(b'')
        self.overrideEnv('BRZ_EMAIL', None)
        self.overrideEnv('EMAIL', 'jelmer@apache.org')
        resolved = stack.get('email')
        self.assertEqual('jelmer@apache.org', resolved)

    def test_BRZ_EMAIL_overrides(self):
        stack = config.MemoryStack(b'email=jelmer@debian.org')
        # While BRZ_EMAIL is set it wins over the configured value.
        self.overrideEnv('BRZ_EMAIL', 'jelmer@apache.org')
        resolved = stack.get('email')
        self.assertEqual('jelmer@apache.org', resolved)
        # Once BRZ_EMAIL is unset, the configured value wins over EMAIL.
        self.overrideEnv('BRZ_EMAIL', None)
        self.overrideEnv('EMAIL', 'jelmer@samba.org')
        resolved = stack.get('email')
        self.assertEqual('jelmer@debian.org', resolved)
4745
class MailClientOptionTests(tests.TestCase):
4747
def test_default(self):
    # With no mail_client configured, the option resolves to DefaultMail.
    stack = config.MemoryStack(b'')
    self.assertIs(stack.get('mail_client'), mail_client.DefaultMail)
4752
def test_evolution(self):
    # 'evolution' selects the Evolution client class.
    stack = config.MemoryStack(b'mail_client=evolution')
    self.assertIs(stack.get('mail_client'), mail_client.Evolution)
4757
def test_kmail(self):
    # 'kmail' selects the KMail client class.
    stack = config.MemoryStack(b'mail_client=kmail')
    self.assertIs(stack.get('mail_client'), mail_client.KMail)
4762
def test_mutt(self):
    # 'mutt' selects the Mutt client class.
    stack = config.MemoryStack(b'mail_client=mutt')
    self.assertIs(stack.get('mail_client'), mail_client.Mutt)
4767
def test_thunderbird(self):
    # 'thunderbird' selects the Thunderbird client class.
    stack = config.MemoryStack(b'mail_client=thunderbird')
    self.assertIs(stack.get('mail_client'), mail_client.Thunderbird)
4772
def test_explicit_default(self):
    # Spelling out 'default' selects the DefaultMail client class.
    stack = config.MemoryStack(b'mail_client=default')
    self.assertIs(stack.get('mail_client'), mail_client.DefaultMail)
4777
def test_editor(self):
    # 'editor' selects the Editor client class.
    stack = config.MemoryStack(b'mail_client=editor')
    self.assertIs(stack.get('mail_client'), mail_client.Editor)
4782
def test_mapi(self):
    # 'mapi' selects the MAPIClient class.
    stack = config.MemoryStack(b'mail_client=mapi')
    self.assertIs(stack.get('mail_client'), mail_client.MAPIClient)
4787
def test_xdg_email(self):
    # 'xdg-email' selects the XDGEmail client class.
    stack = config.MemoryStack(b'mail_client=xdg-email')
    self.assertIs(stack.get('mail_client'), mail_client.XDGEmail)
4792
def test_unknown(self):
4793
conf = config.MemoryStack(b'mail_client=firebird')
4794
self.assertRaises(config.ConfigOptionValueError, conf.get,