1
# Copyright (C) 2005-2014, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for finding and reading the bzr config file[s]."""
19
from textwrap import dedent
25
from testtools import matchers
37
registry as _mod_registry,
44
from ..sixish import (
47
from ..transport import remote as transport_remote
55
def lockable_config_scenarios():
58
{'config_class': config.GlobalConfig,
60
'config_section': 'DEFAULT'}),
62
{'config_class': config.LocationConfig,
64
'config_section': '.'}),]
67
# Module-level ``load_tests`` hook, delegated to the scenarios helper.
load_tests = scenarios.load_tests_apply_scenarios

# Register helpers to build stores. Each helper receives the running test and
# returns a store object.
config.test_store_builder_registry.register(
    'configobj', lambda test: config.TransportIniFileStore(
        test.get_transport(), 'configobj.conf'))
config.test_store_builder_registry.register(
    'bazaar', lambda test: config.GlobalStore())
config.test_store_builder_registry.register(
    'location', lambda test: config.LocationStore())
79
def build_backing_branch(test, relpath,
                         transport_class=None, server_class=None):
    """Test helper to create a backing branch only once.

    Some tests need multiple stores/stacks to check concurrent update
    behaviours. As such, they need to build different branch *objects* even if
    they share the branch on disk.

    :param test: The test object; ``backing_branch`` (and, when given,
        ``transport_class``/``transport_server``) are set on it.

    :param relpath: The relative path to the branch. (Note that the helper
        should always specify the same relpath).

    :param transport_class: The Transport class the test needs to use.

    :param server_class: The server associated with the ``transport_class``.

    :raises AssertionError: If exactly one of ``transport_class`` and
        ``server_class`` is specified; either both or neither must be given.
    """
    has_transport = transport_class is not None
    has_server = server_class is not None
    if has_transport != has_server:
        raise AssertionError('Specify both ``transport_class`` and '
                             '``server_class`` or neither of them')
    if has_transport:
        test.transport_class = transport_class
        test.transport_server = server_class
    if getattr(test, 'backing_branch', None) is None:
        # First call, let's build the branch on disk
        test.backing_branch = test.make_branch(relpath)
109
def build_branch_store(test):
    """Return a ``config.BranchStore`` on a freshly opened 'branch' branch.

    The backing branch is built through ``build_backing_branch`` first.
    """
    build_backing_branch(test, 'branch')
    b = branch.Branch.open('branch')
    return config.BranchStore(b)


config.test_store_builder_registry.register('branch', build_branch_store)
116
def build_control_store(test):
    """Return a ``config.ControlStore`` on a freshly opened 'branch' controldir.

    The backing branch is built through ``build_backing_branch`` first.
    """
    build_backing_branch(test, 'branch')
    b = controldir.ControlDir.open('branch')
    return config.ControlStore(b)


config.test_store_builder_registry.register('control', build_control_store)
123
def build_remote_branch_store(test):
124
# There is only one permutation (but we won't be able to handle more with
125
# this design anyway)
127
server_class) = transport_remote.get_test_permutations()[0]
128
build_backing_branch(test, 'branch', transport_class, server_class)
129
b = branch.Branch.open(test.get_url('branch'))
130
return config.BranchStore(b)
131
config.test_store_builder_registry.register('remote_branch',
132
build_remote_branch_store)
135
# Register helpers to build stacks. Each helper receives the running test and
# returns a stack object.
config.test_stack_builder_registry.register(
    'bazaar', lambda test: config.GlobalStack())
config.test_stack_builder_registry.register(
    'location', lambda test: config.LocationStack('.'))
141
def build_branch_stack(test):
    """Return a ``config.BranchStack`` on a freshly opened 'branch' branch.

    The backing branch is built through ``build_backing_branch`` first.
    """
    build_backing_branch(test, 'branch')
    b = branch.Branch.open('branch')
    return config.BranchStack(b)


config.test_stack_builder_registry.register('branch', build_branch_stack)
148
def build_branch_only_stack(test):
149
# There is only one permutation (but we won't be able to handle more with
150
# this design anyway)
152
server_class) = transport_remote.get_test_permutations()[0]
153
build_backing_branch(test, 'branch', transport_class, server_class)
154
b = branch.Branch.open(test.get_url('branch'))
155
return config.BranchOnlyStack(b)
156
config.test_stack_builder_registry.register('branch_only',
157
build_branch_only_stack)
159
def build_remote_control_stack(test):
160
# There is only one permutation (but we won't be able to handle more with
161
# this design anyway)
163
server_class) = transport_remote.get_test_permutations()[0]
164
# We need only a bzrdir for this, not a full branch, but it's not worth
165
# creating a dedicated helper to create only the bzrdir
166
build_backing_branch(test, 'branch', transport_class, server_class)
167
b = branch.Branch.open(test.get_url('branch'))
168
return config.RemoteControlStack(b.controldir)
169
config.test_stack_builder_registry.register('remote_control',
170
build_remote_control_stack)
173
sample_long_alias="log -r-15..-1 --line"
174
sample_config_text = u"""
176
email=Erik B\u00e5gfors <erik@bagfors.nu>
178
change_editor=vimdiff -of @new_path @old_path
179
gpg_signing_key=DD4D5088
181
validate_signatures_in_log=true
183
user_global_option=something
184
bzr.mergetool.sometool=sometool {base} {this} {other} -o {result}
185
bzr.mergetool.funkytool=funkytool "arg with spaces" {this_temp}
186
bzr.mergetool.newtool='"newtool with spaces" {this_temp}'
187
bzr.default_mergetool=sometool
190
ll=""" + sample_long_alias + "\n"
193
sample_always_signatures = """
195
check_signatures=ignore
196
create_signatures=always
199
sample_ignore_signatures = """
201
check_signatures=require
202
create_signatures=never
205
sample_maybe_signatures = """
207
check_signatures=ignore
208
create_signatures=when-required
211
sample_branches_text = """
212
[http://www.example.com]
214
email=Robert Collins <robertc@example.org>
215
normal_option = normal
216
appendpath_option = append
217
appendpath_option:policy = appendpath
218
norecurse_option = norecurse
219
norecurse_option:policy = norecurse
220
[http://www.example.com/ignoreparent]
221
# different project: ignore parent dir config
223
[http://www.example.com/norecurse]
224
# configuration items that only apply to this dir
226
normal_option = norecurse
227
[http://www.example.com/dir]
228
appendpath_option = normal
230
check_signatures=require
231
# test trailing / matching with no children
233
check_signatures=check-available
234
gpg_signing_key=default
235
user_local_option=local
236
# test trailing / matching
238
#subdirs will match but not the parent
240
check_signatures=ignore
241
post_commit=breezy.tests.test_config.post_commit
242
#testing explicit beats globs
246
def create_configs(test):
247
"""Create configuration files for a given test.
249
This requires creating a tree (and populate the ``test.tree`` attribute)
250
and its associated branch and will populate the following attributes:
252
- branch_config: A BranchConfig for the associated branch.
254
- locations_config : A LocationConfig for the associated branch
256
- bazaar_config: A GlobalConfig.
258
The tree and branch are created in a 'tree' subdirectory so the tests can
259
still use the test directory to stay outside of the branch.
261
tree = test.make_branch_and_tree('tree')
263
test.branch_config = config.BranchConfig(tree.branch)
264
test.locations_config = config.LocationConfig(tree.basedir)
265
test.bazaar_config = config.GlobalConfig()
268
def create_configs_with_file_option(test):
269
"""Create configuration files with a ``file`` option set in each.
271
This builds on ``create_configs`` and add one ``file`` option in each
272
configuration with a value which allows identifying the configuration file.
275
test.bazaar_config.set_user_option('file', 'bazaar')
276
test.locations_config.set_user_option('file', 'locations')
277
test.branch_config.set_user_option('file', 'branch')
280
class TestOptionsMixin:
    """Mixin providing ``assertOptions`` for config test cases."""

    def assertOptions(self, expected, conf):
        """Check the first four fields of each option reported by ``conf``.

        :param expected: The list of expected (truncated) option tuples.
        :param conf: An object providing ``_get_options()``.
        """
        # We don't care about the parser (as it will make tests hard to write
        # and error-prone anyway)
        self.assertThat([opt[:4] for opt in conf._get_options()],
                        matchers.Equals(expected))
289
class InstrumentedConfigObj(object):
290
"""A config obj look-enough-alike to record calls made to it."""
292
def __contains__(self, thing):
293
self._calls.append(('__contains__', thing))
296
def __getitem__(self, key):
297
self._calls.append(('__getitem__', key))
300
def __init__(self, input, encoding=None):
301
self._calls = [('__init__', input, encoding)]
303
def __setitem__(self, key, value):
304
self._calls.append(('__setitem__', key, value))
306
def __delitem__(self, key):
307
self._calls.append(('__delitem__', key))
310
self._calls.append(('keys',))
314
self._calls.append(('reload',))
316
def write(self, arg):
317
self._calls.append(('write',))
319
def as_bool(self, value):
320
self._calls.append(('as_bool', value))
323
def get_value(self, section, name):
324
self._calls.append(('get_value', section, name))
328
class FakeBranch(object):
330
def __init__(self, base=None):
332
self.base = "http://example.com/branches/demo"
335
self._transport = self.control_files = \
336
FakeControlFilesAndTransport()
338
def _get_config(self):
339
return config.TransportConfig(self._transport, 'branch.conf')
341
def lock_write(self):
348
class FakeControlFilesAndTransport(object):
352
self._transport = self
354
def get(self, filename):
357
return BytesIO(self.files[filename])
359
raise errors.NoSuchFile(filename)
361
def get_bytes(self, filename):
364
return self.files[filename]
366
raise errors.NoSuchFile(filename)
368
def put(self, filename, fileobj):
369
self.files[filename] = fileobj.read()
371
def put_file(self, filename, fileobj):
372
return self.put(filename, fileobj)
375
class InstrumentedConfig(config.Config):
376
"""An instrumented config that supplies stubs for template methods."""
379
super(InstrumentedConfig, self).__init__()
381
self._signatures = config.CHECK_NEVER
383
def _get_user_id(self):
384
self._calls.append('_get_user_id')
385
return "Robert Collins <robert.collins@example.org>"
387
def _get_signature_checking(self):
388
self._calls.append('_get_signature_checking')
389
return self._signatures
391
def _get_change_editor(self):
392
self._calls.append('_get_change_editor')
393
return 'vimdiff -fo @new_path @old_path'
396
bool_config = """[DEFAULT]
405
class TestConfigObj(tests.TestCase):
407
def test_get_bool(self):
    # get_bool() must return real booleans for the values in ``bool_config``.
    co = config.ConfigObj(BytesIO(bool_config))
    self.assertIs(co.get_bool('DEFAULT', 'active'), True)
    self.assertIs(co.get_bool('DEFAULT', 'inactive'), False)
    self.assertIs(co.get_bool('UPPERCASE', 'active'), True)
    self.assertIs(co.get_bool('UPPERCASE', 'nonactive'), False)
414
def test_hash_sign_in_value(self):
416
Before 4.5.0, ConfigObj did not quote # signs in values, so they'd be
417
treated as comments when read in again. (#86838)
419
co = config.ConfigObj()
420
co['test'] = 'foo#bar'
422
co.write(outfile=outfile)
423
lines = outfile.getvalue().splitlines()
424
self.assertEqual(lines, ['test = "foo#bar"'])
425
co2 = config.ConfigObj(lines)
426
self.assertEqual(co2['test'], 'foo#bar')
428
def test_triple_quotes(self):
429
# Bug #710410: if the value string has triple quotes
430
# then ConfigObj versions up to 4.7.2 will quote them wrong
431
# and won't be able to read them back
432
triple_quotes_value = '''spam
433
""" that's my spam """
435
co = config.ConfigObj()
436
co['test'] = triple_quotes_value
437
# While writing this test another bug in ConfigObj has been found:
438
# method co.write() without arguments produces list of lines
439
# one option per line, and multiline values are not split
440
# across multiple lines,
441
# and that breaks the parsing these lines back by ConfigObj.
442
# This issue only affects test, but it's better to avoid
443
# `co.write()` construct at all.
444
# [bialix 20110222] bug report sent to ConfigObj's author
446
co.write(outfile=outfile)
447
output = outfile.getvalue()
448
# now we're trying to read it back
449
co2 = config.ConfigObj(BytesIO(output))
450
self.assertEqual(triple_quotes_value, co2['test'])
453
erroneous_config = """[section] # line 1
456
whocares=notme # line 4
460
class TestConfigObjErrors(tests.TestCase):
462
def test_duplicate_section_name_error_line(self):
464
co = configobj.ConfigObj(BytesIO(erroneous_config),
466
except config.configobj.DuplicateError as e:
467
self.assertEqual(3, e.line_number)
469
self.fail('Error in config file not detected')
472
class TestConfig(tests.TestCase):
474
def test_constructs(self):
477
def test_user_email(self):
    # user_email() yields only the address part of the user id and asks the
    # template method exactly once.
    my_config = InstrumentedConfig()
    self.assertEqual('robert.collins@example.org', my_config.user_email())
    self.assertEqual(['_get_user_id'], my_config._calls)
482
def test_username(self):
    # username() yields the full user id and asks the template method
    # exactly once.
    my_config = InstrumentedConfig()
    self.assertEqual('Robert Collins <robert.collins@example.org>',
                     my_config.username())
    self.assertEqual(['_get_user_id'], my_config._calls)
488
def test_get_user_option_default(self):
    # A bare Config knows no option: the lookup yields None.
    my_config = config.Config()
    self.assertEqual(None, my_config.get_user_option('no_option'))
492
def test_validate_signatures_in_log_default(self):
    # A bare Config does not validate signatures in log.
    my_config = config.Config()
    self.assertEqual(False, my_config.validate_signatures_in_log())
496
def test_get_change_editor(self):
    # The change editor is built from the command line returned by
    # _get_change_editor(), split into a command template.
    my_config = InstrumentedConfig()
    change_editor = my_config.get_change_editor('old_tree', 'new_tree')
    self.assertEqual(['_get_change_editor'], my_config._calls)
    self.assertIs(diff.DiffFromTool, change_editor.__class__)
    self.assertEqual(['vimdiff', '-fo', '@new_path', '@old_path'],
                     change_editor.command_template)
505
class TestConfigPath(tests.TestCase):
508
super(TestConfigPath, self).setUp()
509
self.overrideEnv('HOME', '/home/bogus')
510
self.overrideEnv('XDG_CACHE_HOME', '')
511
if sys.platform == 'win32':
514
r'C:\Documents and Settings\bogus\Application Data')
516
'C:/Documents and Settings/bogus/Application Data/breezy'
518
self.brz_home = '/home/bogus/.config/breezy'
520
def test_config_dir(self):
    # config_dir() must match the platform-specific home computed in setUp.
    self.assertEqual(config.config_dir(), self.brz_home)
523
def test_config_dir_is_unicode(self):
    # The config dir is expected as a ``unicode`` string, not bytes.
    self.assertIsInstance(config.config_dir(), unicode)
526
def test_config_filename(self):
    # The global config file lives directly under the config dir.
    self.assertEqual(config.config_filename(),
                     self.brz_home + '/bazaar.conf')
530
def test_locations_config_filename(self):
    # The locations config file lives directly under the config dir.
    self.assertEqual(config.locations_config_filename(),
                     self.brz_home + '/locations.conf')
534
def test_authentication_config_filename(self):
    # The authentication config file lives directly under the config dir.
    self.assertEqual(config.authentication_config_filename(),
                     self.brz_home + '/authentication.conf')
538
def test_xdg_cache_dir(self):
    # setUp points HOME at /home/bogus and empties XDG_CACHE_HOME, so the
    # cache dir is expected under HOME.
    self.assertEqual(config.xdg_cache_dir(),
                     '/home/bogus/.cache')
543
class TestXDGConfigDir(tests.TestCaseInTempDir):
544
# must be in temp dir because config tests for the existence of the bazaar
545
# subdirectory of $XDG_CONFIG_HOME
548
if sys.platform == 'win32':
549
raise tests.TestNotApplicable(
550
'XDG config dir not used on this platform')
551
super(TestXDGConfigDir, self).setUp()
552
self.overrideEnv('HOME', self.test_home_dir)
553
# BRZ_HOME overrides everything we want to test so unset it.
554
self.overrideEnv('BRZ_HOME', None)
556
def test_xdg_config_dir_exists(self):
557
"""When ~/.config/bazaar exists, use it as the config dir."""
558
newdir = osutils.pathjoin(self.test_home_dir, '.config', 'bazaar')
560
self.assertEqual(config.config_dir(), newdir)
562
def test_xdg_config_home(self):
563
"""When XDG_CONFIG_HOME is set, use it."""
564
xdgconfigdir = osutils.pathjoin(self.test_home_dir, 'xdgconfig')
565
self.overrideEnv('XDG_CONFIG_HOME', xdgconfigdir)
566
newdir = osutils.pathjoin(xdgconfigdir, 'bazaar')
568
self.assertEqual(config.config_dir(), newdir)
571
class TestIniConfig(tests.TestCaseInTempDir):
    """Base class for tests building an ``IniBasedConfig`` from a string."""

    def make_config_parser(self, s):
        """Return ``(conf, parser)`` for an IniBasedConfig built from ``s``."""
        conf = config.IniBasedConfig.from_string(s)
        return conf, conf._get_parser()
578
class TestIniConfigBuilding(TestIniConfig):
    """Building ``IniBasedConfig`` objects and writing them to disk."""

    def test_contructs(self):
        config.IniBasedConfig()

    def test_from_fp(self):
        my_config = config.IniBasedConfig.from_string(sample_config_text)
        self.assertIsInstance(my_config._get_parser(), configobj.ConfigObj)

    def test_cached(self):
        # Asking twice for the parser must return the very same object.
        my_config = config.IniBasedConfig.from_string(sample_config_text)
        parser = my_config._get_parser()
        self.assertIs(my_config._get_parser(), parser)

    def _dummy_chown(self, path, uid, gid):
        # Stand-in for os.chown that only records its arguments.
        self.path, self.uid, self.gid = path, uid, gid

    def test_ini_config_ownership(self):
        """Ensure that chown is happening during _write_config_file"""
        self.requireFeature(features.chown_feature)
        self.overrideAttr(os, 'chown', self._dummy_chown)
        self.path = self.uid = self.gid = None
        conf = config.IniBasedConfig(file_name='./foo.conf')
        conf._write_config_file()
        self.assertEqual(self.path, './foo.conf')
        self.assertIsInstance(self.uid, int)
        self.assertIsInstance(self.gid, int)
607
class TestIniConfigSaving(tests.TestCaseInTempDir):
609
def test_cant_save_without_a_file_name(self):
    # Writing a config that was never given a file name is an error.
    conf = config.IniBasedConfig()
    self.assertRaises(AssertionError, conf._write_config_file)
613
def test_saved_with_content(self):
614
content = 'foo = bar\n'
615
config.IniBasedConfig.from_string(content, file_name='./test.conf',
617
self.assertFileEqual(content, 'test.conf')
620
class TestIniConfigOptionExpansion(tests.TestCase):
621
"""Test option expansion from the IniConfig level.
623
What we really want here is to test the Config level, but the class being
624
abstract as far as storing values is concerned, this can't be done
627
# FIXME: This should be rewritten when all configs share a storage
628
# implementation -- vila 2011-02-18
630
def get_config(self, string=None):
633
c = config.IniBasedConfig.from_string(string)
636
def assertExpansion(self, expected, conf, string, env=None):
    """Check that ``conf`` expands ``string`` (with ``env``) to ``expected``."""
    self.assertEqual(expected, conf.expand_options(string, env))
639
def test_no_expansion(self):
    # A string without references is returned unchanged.
    c = self.get_config('')
    self.assertExpansion('foo', c, 'foo')
643
def test_env_adding_options(self):
    # ``env`` can supply options the config itself does not define.
    c = self.get_config('')
    self.assertExpansion('bar', c, '{foo}', {'foo': 'bar'})
647
def test_env_overriding_options(self):
    # A value given in ``env`` wins over the one defined in the config.
    c = self.get_config('foo=baz')
    self.assertExpansion('bar', c, '{foo}', {'foo': 'bar'})
651
def test_simple_ref(self):
    # A reference to a defined option is replaced by its value.
    c = self.get_config('foo=xxx')
    self.assertExpansion('xxx', c, '{foo}')
655
def test_unknown_ref(self):
    # Referencing an option defined nowhere is reported as an error.
    c = self.get_config('')
    self.assertRaises(config.ExpandingUnknownOption,
                      c.expand_options, '{foo}')
660
def test_indirect_ref(self):
661
c = self.get_config('''
665
self.assertExpansion('xxx', c, '{bar}')
667
def test_embedded_ref(self):
668
c = self.get_config('''
672
self.assertExpansion('xxx', c, '{{bar}}')
674
def test_simple_loop(self):
675
c = self.get_config('foo={foo}')
676
self.assertRaises(config.OptionExpansionLoop, c.expand_options,
679
def test_indirect_loop(self):
680
c = self.get_config('''
684
e = self.assertRaises(config.OptionExpansionLoop,
685
c.expand_options, '{foo}')
686
self.assertEqual('foo->bar->baz', e.refs)
687
self.assertEqual('{foo}', e.string)
690
conf = self.get_config('''
694
list={foo},{bar},{baz}
696
self.assertEqual(['start', 'middle', 'end'],
697
conf.get_user_option('list', expand=True))
699
def test_cascading_list(self):
700
conf = self.get_config('''
706
self.assertEqual(['start', 'middle', 'end'],
707
conf.get_user_option('list', expand=True))
709
def test_pathological_hidden_list(self):
710
conf = self.get_config('''
716
hidden={start}{middle}{end}
718
# Nope, it's either a string or a list, and the list wins as soon as a
719
# ',' appears, so the string concatenation never occur.
720
self.assertEqual(['{foo', '}', '{', 'bar}'],
721
conf.get_user_option('hidden', expand=True))
724
class TestLocationConfigOptionExpansion(tests.TestCaseInTempDir):
726
def get_config(self, location, string=None):
729
# Since we don't save the config we won't strictly require to inherit
730
# from TestCaseInTempDir, but an error occurs so quickly...
731
c = config.LocationConfig.from_string(string, location)
734
def test_dont_cross_unrelated_section(self):
735
c = self.get_config('/another/branch/path','''
740
[/another/branch/path]
743
self.assertRaises(config.ExpandingUnknownOption,
744
c.get_user_option, 'bar', expand=True)
746
def test_cross_related_sections(self):
747
c = self.get_config('/project/branch/path','''
751
[/project/branch/path]
754
self.assertEqual('quux', c.get_user_option('bar', expand=True))
757
class TestIniBaseConfigOnDisk(tests.TestCaseInTempDir):
759
def test_cannot_reload_without_name(self):
    # A config built from a string without a file name cannot be reloaded.
    conf = config.IniBasedConfig.from_string(sample_config_text)
    self.assertRaises(AssertionError, conf.reload)
763
def test_reload_see_new_value(self):
764
c1 = config.IniBasedConfig.from_string('editor=vim\n',
765
file_name='./test/conf')
766
c1._write_config_file()
767
c2 = config.IniBasedConfig.from_string('editor=emacs\n',
768
file_name='./test/conf')
769
c2._write_config_file()
770
self.assertEqual('vim', c1.get_user_option('editor'))
771
self.assertEqual('emacs', c2.get_user_option('editor'))
772
# Make sure we get the Right value
774
self.assertEqual('emacs', c1.get_user_option('editor'))
777
class TestLockableConfig(tests.TestCaseInTempDir):
779
scenarios = lockable_config_scenarios()
784
config_section = None
787
super(TestLockableConfig, self).setUp()
788
self._content = '[%s]\none=1\ntwo=2\n' % (self.config_section,)
789
self.config = self.create_config(self._content)
791
def get_existing_config(self):
    """Return a new ``config_class`` instance built from ``config_args``."""
    return self.config_class(*self.config_args)
794
def create_config(self, content):
795
kwargs = dict(save=True)
796
c = self.config_class.from_string(content, *self.config_args, **kwargs)
799
def test_simple_read_access(self):
    # The content saved in setUp can be read back.
    self.assertEqual('1', self.config.get_user_option('one'))
802
def test_simple_write_access(self):
    # A value that was just set is visible through the same config object.
    self.config.set_user_option('one', 'one')
    self.assertEqual('one', self.config.get_user_option('one'))
806
def test_listen_to_the_last_speaker(self):
808
c2 = self.get_existing_config()
809
c1.set_user_option('one', 'ONE')
810
c2.set_user_option('two', 'TWO')
811
self.assertEqual('ONE', c1.get_user_option('one'))
812
self.assertEqual('TWO', c2.get_user_option('two'))
813
# The second update respects the first one
814
self.assertEqual('ONE', c2.get_user_option('one'))
816
def test_last_speaker_wins(self):
817
# If the same config is not shared, the same variable modified twice
818
# can only see a single result.
820
c2 = self.get_existing_config()
821
c1.set_user_option('one', 'c1')
822
c2.set_user_option('one', 'c2')
823
self.assertEqual('c2', c2._get_user_option('one'))
824
# The first modification is still available until another refresh
826
self.assertEqual('c1', c1._get_user_option('one'))
827
c1.set_user_option('two', 'done')
828
self.assertEqual('c2', c1._get_user_option('one'))
830
def test_writes_are_serialized(self):
832
c2 = self.get_existing_config()
834
# We spawn a thread that will pause *during* the write
835
before_writing = threading.Event()
836
after_writing = threading.Event()
837
writing_done = threading.Event()
838
c1_orig = c1._write_config_file
839
def c1_write_config_file():
842
# The lock is held. We wait for the main thread to decide when to
845
c1._write_config_file = c1_write_config_file
847
c1.set_user_option('one', 'c1')
849
t1 = threading.Thread(target=c1_set_option)
850
# Collect the thread after the test
851
self.addCleanup(t1.join)
852
# Be ready to unblock the thread if the test goes wrong
853
self.addCleanup(after_writing.set)
855
before_writing.wait()
856
self.assertTrue(c1._lock.is_held)
857
self.assertRaises(errors.LockContention,
858
c2.set_user_option, 'one', 'c2')
859
self.assertEqual('c1', c1.get_user_option('one'))
860
# Let the lock be released
863
c2.set_user_option('one', 'c2')
864
self.assertEqual('c2', c2.get_user_option('one'))
866
def test_read_while_writing(self):
868
# We spawn a thread that will pause *during* the write
869
ready_to_write = threading.Event()
870
do_writing = threading.Event()
871
writing_done = threading.Event()
872
c1_orig = c1._write_config_file
873
def c1_write_config_file():
875
# The lock is held. We wait for the main thread to decide when to
880
c1._write_config_file = c1_write_config_file
882
c1.set_user_option('one', 'c1')
883
t1 = threading.Thread(target=c1_set_option)
884
# Collect the thread after the test
885
self.addCleanup(t1.join)
886
# Be ready to unblock the thread if the test goes wrong
887
self.addCleanup(do_writing.set)
889
# Ensure the thread is ready to write
890
ready_to_write.wait()
891
self.assertTrue(c1._lock.is_held)
892
self.assertEqual('c1', c1.get_user_option('one'))
893
# If we read during the write, we get the old value
894
c2 = self.get_existing_config()
895
self.assertEqual('1', c2.get_user_option('one'))
896
# Let the writing occur and ensure it occurred
899
# Now we get the updated value
900
c3 = self.get_existing_config()
901
self.assertEqual('c1', c3.get_user_option('one'))
904
class TestGetUserOptionAs(TestIniConfig):
906
def test_get_user_option_as_bool(self):
907
conf, parser = self.make_config_parser("""
910
an_invalid_bool = maybe
911
a_list = hmm, who knows ? # This is interpreted as a list !
913
get_bool = conf.get_user_option_as_bool
914
self.assertEqual(True, get_bool('a_true_bool'))
915
self.assertEqual(False, get_bool('a_false_bool'))
918
warnings.append(args[0] % args[1:])
919
self.overrideAttr(trace, 'warning', warning)
920
msg = 'Value "%s" is not a boolean for "%s"'
921
self.assertIs(None, get_bool('an_invalid_bool'))
922
self.assertEqual(msg % ('maybe', 'an_invalid_bool'), warnings[0])
924
self.assertIs(None, get_bool('not_defined_in_this_config'))
925
self.assertEqual([], warnings)
927
def test_get_user_option_as_list(self):
928
conf, parser = self.make_config_parser("""
933
get_list = conf.get_user_option_as_list
934
self.assertEqual(['a', 'b', 'c'], get_list('a_list'))
935
self.assertEqual(['1'], get_list('length_1'))
936
self.assertEqual('x', conf.get_user_option('one_item'))
937
# automatically cast to list
938
self.assertEqual(['x'], get_list('one_item'))
941
class TestSupressWarning(TestIniConfig):
    """Tests for ``suppress_warning`` on an ini-based config."""

    def make_warnings_config(self, s):
        """Return the ``suppress_warning`` method of a config built from ``s``."""
        conf, _ = self.make_config_parser(s)
        return conf.suppress_warning

    def test_suppress_warning_unknown(self):
        suppress_warning = self.make_warnings_config('')
        self.assertEqual(False, suppress_warning('unknown_warning'))

    def test_suppress_warning_known(self):
        # Only the warnings listed in ``suppress_warnings`` are suppressed.
        suppress_warning = self.make_warnings_config('suppress_warnings=a,b')
        self.assertEqual(False, suppress_warning('c'))
        self.assertEqual(True, suppress_warning('a'))
        self.assertEqual(True, suppress_warning('b'))
958
class TestGetConfig(tests.TestCaseInTempDir):
960
def test_constructs(self):
    # Smoke test: building a GlobalConfig without arguments must not fail.
    config.GlobalConfig()
963
def test_calls_read_filenames(self):
964
# replace the class that is constructed, to check its parameters
965
oldparserclass = config.ConfigObj
966
config.ConfigObj = InstrumentedConfigObj
967
my_config = config.GlobalConfig()
969
parser = my_config._get_parser()
971
config.ConfigObj = oldparserclass
972
self.assertIsInstance(parser, InstrumentedConfigObj)
973
self.assertEqual(parser._calls, [('__init__', config.config_filename(),
977
class TestBranchConfig(tests.TestCaseWithTransport):
979
def test_constructs_valid(self):
    # A BranchConfig can be built on top of a fake branch.
    branch = FakeBranch()
    my_config = config.BranchConfig(branch)
    self.assertIsNot(None, my_config)
984
def test_constructs_error(self):
    # The branch argument is mandatory.
    self.assertRaises(TypeError, config.BranchConfig)
987
def test_get_location_config(self):
    # The location config is keyed on the branch base and is cached: a
    # second request returns the same object.
    branch = FakeBranch()
    my_config = config.BranchConfig(branch)
    location_config = my_config._get_location_config()
    self.assertEqual(branch.base, location_config.location)
    self.assertIs(location_config, my_config._get_location_config())
994
def test_get_config(self):
    """The Branch.get_config method works properly"""
    b = controldir.ControlDir.create_standalone_workingtree('.').branch
    my_config = b.get_config()
    self.assertIs(my_config.get_user_option('wacky'), None)
    my_config.set_user_option('wacky', 'unlikely')
    self.assertEqual(my_config.get_user_option('wacky'), 'unlikely')

    # Ensure we get the same thing if we start again
    b2 = branch.Branch.open('.')
    my_config2 = b2.get_config()
    self.assertEqual(my_config2.get_user_option('wacky'), 'unlikely')
1007
def test_has_explicit_nickname(self):
1008
b = self.make_branch('.')
1009
self.assertFalse(b.get_config().has_explicit_nickname())
1011
self.assertTrue(b.get_config().has_explicit_nickname())
1013
def test_config_url(self):
    """The Branch.get_config will use section that uses a local url"""
    branch = self.make_branch('branch')
    self.assertEqual('branch', branch.nick)

    # Save a locations config whose section name is the branch's local URL.
    local_url = urlutils.local_path_to_url('branch')
    conf = config.LocationConfig.from_string(
        '[%s]\nnickname = foobar' % (local_url,),
        local_url, save=True)
    self.assertIsNot(None, conf)
    self.assertEqual('foobar', branch.nick)
1025
def test_config_local_path(self):
    """The Branch.get_config will use a local system path"""
    branch = self.make_branch('branch')
    self.assertEqual('branch', branch.nick)

    # Save a locations config whose section name is a plain local path.
    local_path = osutils.getcwd().encode('utf8')
    config.LocationConfig.from_string(
        '[%s/branch]\nnickname = barry' % (local_path,),
        'branch', save=True)
    # Now the branch will find its nick via the location config
    self.assertEqual('barry', branch.nick)
1037
def test_config_creates_local(self):
1038
"""Creating a new entry in config uses a local path."""
1039
branch = self.make_branch('branch', format='knit')
1040
branch.set_push_location('http://foobar')
1041
local_path = osutils.getcwd().encode('utf8')
1042
# Surprisingly ConfigObj doesn't create a trailing newline
1043
self.check_file_contents(config.locations_config_filename(),
1045
'push_location = http://foobar\n'
1046
'push_location:policy = norecurse\n'
1049
def test_autonick_urlencoded(self):
    # The automatic nickname is the plain directory name, '!' included.
    b = self.make_branch('!repo')
    self.assertEqual('!repo', b.get_config().get_nickname())
1053
def test_autonick_uses_branch_name(self):
    # When the branch is created with a name, that name is the nickname.
    b = self.make_branch('foo', name='bar')
    self.assertEqual('bar', b.get_config().get_nickname())
1057
def test_warn_if_masked(self):
1060
warnings.append(args[0] % args[1:])
1061
self.overrideAttr(trace, 'warning', warning)
1063
def set_option(store, warn_masked=True):
1065
conf.set_user_option('example_option', repr(store), store=store,
1066
warn_masked=warn_masked)
1067
def assertWarning(warning):
1069
self.assertEqual(0, len(warnings))
1071
self.assertEqual(1, len(warnings))
1072
self.assertEqual(warning, warnings[0])
1073
branch = self.make_branch('.')
1074
conf = branch.get_config()
1075
set_option(config.STORE_GLOBAL)
1077
set_option(config.STORE_BRANCH)
1079
set_option(config.STORE_GLOBAL)
1080
assertWarning('Value "4" is masked by "3" from branch.conf')
1081
set_option(config.STORE_GLOBAL, warn_masked=False)
1083
set_option(config.STORE_LOCATION)
1085
set_option(config.STORE_BRANCH)
1086
assertWarning('Value "3" is masked by "0" from locations.conf')
1087
set_option(config.STORE_BRANCH, warn_masked=False)
1091
class TestGlobalConfigItems(tests.TestCaseInTempDir):
1093
def _get_empty_config(self):
1094
my_config = config.GlobalConfig()
1097
def _get_sample_config(self):
1098
my_config = config.GlobalConfig.from_string(sample_config_text)
1101
def test_user_id(self):
    # The non-ASCII user id of the sample config is returned as unicode.
    my_config = config.GlobalConfig.from_string(sample_config_text)
    self.assertEqual(u"Erik B\u00e5gfors <erik@bagfors.nu>",
                     my_config._get_user_id())
1106
def test_absent_user_id(self):
    """A fresh GlobalConfig has no user id."""
    empty = config.GlobalConfig()
    user_id = empty._get_user_id()
    self.assertEqual(None, user_id)
1110
def test_get_user_option_default(self):
    """An option that is not set anywhere is reported as None."""
    empty = self._get_empty_config()
    value = empty.get_user_option('no_option')
    self.assertEqual(None, value)
1114
def test_get_user_option_global(self):
    """An option set in the sample config is returned as written."""
    sample = self._get_sample_config()
    value = sample.get_user_option('user_global_option')
    self.assertEqual("something", value)
1119
def test_configured_validate_signatures_in_log(self):
    """The sample config turns on signature validation in log."""
    sample = self._get_sample_config()
    enabled = sample.validate_signatures_in_log()
    self.assertEqual(True, enabled)
1123
def test_get_alias(self):
    """The 'h' alias from the sample config expands to 'help'."""
    sample = self._get_sample_config()
    expansion = sample.get_alias('h')
    self.assertEqual('help', expansion)
1127
def test_get_aliases(self):
    """The sample config defines exactly two aliases."""
    aliases = self._get_sample_config().get_aliases()
    self.assertEqual(2, len(aliases))
    # Look the values up in sorted key order so the check is deterministic.
    ordered = sorted(aliases)
    first_key = ordered[0]
    second_key = ordered[1]
    self.assertEqual('help', aliases[first_key])
    self.assertEqual(sample_long_alias, aliases[second_key])
1135
def test_get_no_alias(self):
    """Looking up an alias that is not defined gives None."""
    sample = self._get_sample_config()
    expansion = sample.get_alias('foo')
    self.assertEqual(None, expansion)
1139
def test_get_long_alias(self):
    """The 'll' alias expands to the long sample alias."""
    sample = self._get_sample_config()
    expansion = sample.get_alias('ll')
    self.assertEqual(sample_long_alias, expansion)
1143
def test_get_change_editor(self):
    """The sample config yields a DiffFromTool built on vimdiff."""
    sample = self._get_sample_config()
    editor = sample.get_change_editor('old', 'new')
    self.assertIs(diff.DiffFromTool, editor.__class__)
    rendered = ' '.join(editor.command_template)
    self.assertEqual('vimdiff -of @new_path @old_path', rendered)
1150
def test_get_no_change_editor(self):
    """An empty config has no change editor."""
    empty = self._get_empty_config()
    editor = empty.get_change_editor('old', 'new')
    self.assertIs(None, editor)
1155
def test_get_merge_tools(self):
1156
conf = self._get_sample_config()
1157
tools = conf.get_merge_tools()
1158
self.log(repr(tools))
1160
{u'funkytool' : u'funkytool "arg with spaces" {this_temp}',
1161
u'sometool' : u'sometool {base} {this} {other} -o {result}',
1162
u'newtool' : u'"newtool with spaces" {this_temp}'},
1165
def test_get_merge_tools_empty(self):
    """An empty config defines no merge tools."""
    defined = self._get_empty_config().get_merge_tools()
    self.assertEqual({}, defined)
1170
def test_find_merge_tool(self):
    """A merge tool defined in the sample config is found by name."""
    sample = self._get_sample_config()
    found = sample.find_merge_tool('sometool')
    expected = 'sometool {base} {this} {other} -o {result}'
    self.assertEqual(expected, found)
1175
def test_find_merge_tool_not_found(self):
    """Looking up an unknown merge tool gives None."""
    sample = self._get_sample_config()
    found = sample.find_merge_tool('DOES NOT EXIST')
    self.assertIs(found, None)
1180
def test_find_merge_tool_known(self):
    """'kdiff3' resolves to a command line even in an empty config."""
    empty = self._get_empty_config()
    found = empty.find_merge_tool('kdiff3')
    expected = 'kdiff3 {base} {this} {other} -o {result}'
    self.assertEqual(expected, found)
1185
def test_find_merge_tool_override_known(self):
    """A user-set 'bzr.mergetool.kdiff3' option replaces the kdiff3 default."""
    empty = self._get_empty_config()
    override = 'kdiff3 blah'
    empty.set_user_option('bzr.mergetool.kdiff3', override)
    found = empty.find_merge_tool('kdiff3')
    self.assertEqual('kdiff3 blah', found)
1192
class TestGlobalConfigSavingOptions(tests.TestCaseInTempDir):
    """Alias changes made through one GlobalConfig are seen by a new one."""

    def test_empty(self):
        """A fresh GlobalConfig has no aliases."""
        aliases = config.GlobalConfig().get_aliases()
        self.assertEqual(0, len(aliases))

    def test_set_alias(self):
        """An alias set on one instance is readable from a new instance."""
        writer = config.GlobalConfig()
        alias_value = 'commit --strict'
        writer.set_alias('commit', alias_value)
        reader = config.GlobalConfig()
        self.assertEqual(alias_value, reader.get_alias('commit'))

    def test_remove_alias(self):
        """An alias that was set and then unset is gone for a new instance."""
        writer = config.GlobalConfig()
        writer.set_alias('commit', 'commit --strict')
        # Now remove the alias again.
        writer.unset_alias('commit')
        reader = config.GlobalConfig()
        self.assertIs(None, reader.get_alias('commit'))
1214
class TestLocationConfig(tests.TestCaseInTempDir, TestOptionsMixin):
1216
def test_constructs_valid(self):
    """LocationConfig can be built from a location string."""
    location = 'http://example.com'
    config.LocationConfig(location)
1219
def test_constructs_error(self):
    """LocationConfig called without arguments raises TypeError."""
    factory = config.LocationConfig
    self.assertRaises(TypeError, factory)
1222
def test_branch_calls_read_filenames(self):
1223
# This is testing the correct file names are provided.
1224
# TODO: consolidate with the test for GlobalConfigs filename checks.
1226
# replace the class that is constructed, to check its parameters
1227
oldparserclass = config.ConfigObj
1228
config.ConfigObj = InstrumentedConfigObj
1230
my_config = config.LocationConfig('http://www.example.com')
1231
parser = my_config._get_parser()
1233
config.ConfigObj = oldparserclass
1234
self.assertIsInstance(parser, InstrumentedConfigObj)
1235
self.assertEqual(parser._calls,
1236
[('__init__', config.locations_config_filename(),
1239
def test_get_global_config(self):
    """_get_global_config gives a GlobalConfig and the same object each time."""
    fake = FakeBranch('http://example.com')
    branch_conf = config.BranchConfig(fake)
    first = branch_conf._get_global_config()
    self.assertIsInstance(first, config.GlobalConfig)
    second = branch_conf._get_global_config()
    self.assertIs(first, second)
1245
def assertLocationMatching(self, expected):
    """Assert that the location config's matching sections equal expected.

    :param expected: The list that _get_matching_sections() should produce
        for self.my_location_config.
    """
    matching = list(self.my_location_config._get_matching_sections())
    self.assertEqual(expected, matching)
1249
def test__get_matching_sections_no_match(self):
    """The location '/' matches no section."""
    location = '/'
    self.get_branch_config(location)
    self.assertLocationMatching([])
1253
def test__get_matching_sections_exact(self):
    """A location equal to a section name matches with an empty extra path."""
    location = 'http://www.example.com'
    self.get_branch_config(location)
    expected = [('http://www.example.com', '')]
    self.assertLocationMatching(expected)
1257
def test__get_matching_sections_suffix_does_not(self):
    """A location that only extends the section name textually does not match."""
    location = 'http://www.example.com-com'
    self.get_branch_config(location)
    self.assertLocationMatching([])
1261
def test__get_matching_sections_subdir_recursive(self):
    """A subdirectory matches its parent section with the extra path 'com'."""
    location = 'http://www.example.com/com'
    self.get_branch_config(location)
    expected = [('http://www.example.com', 'com')]
    self.assertLocationMatching(expected)
1265
def test__get_matching_sections_ignoreparent(self):
1266
self.get_branch_config('http://www.example.com/ignoreparent')
1267
self.assertLocationMatching([('http://www.example.com/ignoreparent',
1270
def test__get_matching_sections_ignoreparent_subdir(self):
1271
self.get_branch_config(
1272
'http://www.example.com/ignoreparent/childbranch')
1273
self.assertLocationMatching([('http://www.example.com/ignoreparent',
1276
def test__get_matching_sections_subdir_trailing_slash(self):
    """The location '/b' matches the '/b/' section."""
    location = '/b'
    self.get_branch_config(location)
    expected = [('/b/', '')]
    self.assertLocationMatching(expected)
1280
def test__get_matching_sections_subdir_child(self):
    """'/a/foo' matches the '/a/*' glob first, then '/a/' with extra 'foo'."""
    location = '/a/foo'
    self.get_branch_config(location)
    expected = [('/a/*', ''), ('/a/', 'foo')]
    self.assertLocationMatching(expected)
1284
def test__get_matching_sections_subdir_child_child(self):
    """'/a/foo/bar' matches '/a/*' with 'bar' and '/a/' with 'foo/bar'."""
    location = '/a/foo/bar'
    self.get_branch_config(location)
    expected = [('/a/*', 'bar'), ('/a/', 'foo/bar')]
    self.assertLocationMatching(expected)
1288
def test__get_matching_sections_trailing_slash_with_children(self):
    """The location '/a/' matches only the '/a/' section."""
    location = '/a/'
    self.get_branch_config(location)
    expected = [('/a/', '')]
    self.assertLocationMatching(expected)
1292
def test__get_matching_sections_explicit_over_glob(self):
1293
# XXX: 2006-09-08 jamesh
1294
# This test only passes because ord('c') > ord('*'). If there
1295
# was a config section for '/a/?', it would get precedence
1297
self.get_branch_config('/a/c')
1298
self.assertLocationMatching([('/a/c', ''), ('/a/*', ''), ('/a/', 'c')])
1300
def test__get_option_policy_normal(self):
    """An option with no explicit policy reports POLICY_NONE."""
    self.get_branch_config('http://www.example.com')
    self.assertEqual(
        self.my_location_config._get_option_policy(
            'http://www.example.com', 'normal_option'),
        config.POLICY_NONE)

def test__get_option_policy_norecurse(self):
    """Both ways of marking an option norecurse report POLICY_NORECURSE."""
    self.get_branch_config('http://www.example.com')
    self.assertEqual(
        self.my_location_config._get_option_policy(
            'http://www.example.com', 'norecurse_option'),
        config.POLICY_NORECURSE)
    # Test old recurse=False setting:
    self.assertEqual(
        self.my_location_config._get_option_policy(
            'http://www.example.com/norecurse', 'normal_option'),
        config.POLICY_NORECURSE)

# This test used to be a second 'test__get_option_policy_normal', which
# silently replaced the first definition so the POLICY_NONE check never ran.
def test__get_option_policy_appendpath(self):
    """An option marked appendpath reports POLICY_APPENDPATH."""
    self.get_branch_config('http://www.example.com')
    self.assertEqual(
        self.my_location_config._get_option_policy(
            'http://www.example.com', 'appendpath_option'),
        config.POLICY_APPENDPATH)
1326
def test__get_options_with_policy(self):
1327
self.get_branch_config('/dir/subdir',
1328
location_config="""\
1330
other_url = /other-dir
1331
other_url:policy = appendpath
1333
other_url = /other-subdir
1336
[(u'other_url', u'/other-subdir', u'/dir/subdir', 'locations'),
1337
(u'other_url', u'/other-dir', u'/dir', 'locations'),
1338
(u'other_url:policy', u'appendpath', u'/dir', 'locations')],
1339
self.my_location_config)
1341
def test_location_without_username(self):
    """For the ignoreparent location the username is the global one."""
    self.get_branch_config('http://www.example.com/ignoreparent')
    expected = u'Erik B\u00e5gfors <erik@bagfors.nu>'
    self.assertEqual(expected, self.my_config.username())
1346
def test_location_not_listed(self):
    """Test that the global username is used when no location matches"""
    unlisted = '/home/robertc/sources'
    self.get_branch_config(unlisted)
    expected = u'Erik B\u00e5gfors <erik@bagfors.nu>'
    self.assertEqual(expected, self.my_config.username())
1352
def test_overriding_location(self):
    """For http://www.example.com/foo the username is Robert Collins."""
    self.get_branch_config('http://www.example.com/foo')
    expected = 'Robert Collins <robertc@example.org>'
    self.assertEqual(expected, self.my_config.username())
1357
def test_get_user_option_global(self):
    """'user_global_option' reads as 'something' for the location '/a'."""
    self.get_branch_config('/a')
    value = self.my_config.get_user_option('user_global_option')
    self.assertEqual('something', value)
1362
def test_get_user_option_local(self):
    """'user_local_option' reads as 'local' for the location '/a'."""
    self.get_branch_config('/a')
    value = self.my_config.get_user_option('user_local_option')
    self.assertEqual('local', value)
1367
def test_get_user_option_appendpath(self):
1368
# returned as is for the base path:
1369
self.get_branch_config('http://www.example.com')
1370
self.assertEqual('append',
1371
self.my_config.get_user_option('appendpath_option'))
1372
# Extra path components get appended:
1373
self.get_branch_config('http://www.example.com/a/b/c')
1374
self.assertEqual('append/a/b/c',
1375
self.my_config.get_user_option('appendpath_option'))
1376
# Overriden for http://www.example.com/dir, where it is a
1378
self.get_branch_config('http://www.example.com/dir/a/b/c')
1379
self.assertEqual('normal',
1380
self.my_config.get_user_option('appendpath_option'))
1382
def test_get_user_option_norecurse(self):
    """norecurse options apply to their own location but not below it."""
    # Each entry is (location, option name, expected value); the entries
    # are checked in order, rebuilding the branch config for each location.
    cases = [
        ('http://www.example.com', 'norecurse_option', 'norecurse'),
        ('http://www.example.com/dir', 'norecurse_option', None),
        # http://www.example.com/norecurse is a recurse=False section
        # that redefines normal_option. Subdirectories do not pick up
        # this redefinition.
        ('http://www.example.com/norecurse', 'normal_option', 'norecurse'),
        ('http://www.example.com/norecurse/subdir', 'normal_option',
         'normal'),
    ]
    for location, option_name, expected in cases:
        self.get_branch_config(location)
        self.assertEqual(expected,
                         self.my_config.get_user_option(option_name))
1399
def test_set_user_option_norecurse(self):
1400
self.get_branch_config('http://www.example.com')
1401
self.my_config.set_user_option('foo', 'bar',
1402
store=config.STORE_LOCATION_NORECURSE)
1404
self.my_location_config._get_option_policy(
1405
'http://www.example.com', 'foo'),
1406
config.POLICY_NORECURSE)
1408
def test_set_user_option_appendpath(self):
1409
self.get_branch_config('http://www.example.com')
1410
self.my_config.set_user_option('foo', 'bar',
1411
store=config.STORE_LOCATION_APPENDPATH)
1413
self.my_location_config._get_option_policy(
1414
'http://www.example.com', 'foo'),
1415
config.POLICY_APPENDPATH)
1417
def test_set_user_option_change_policy(self):
1418
self.get_branch_config('http://www.example.com')
1419
self.my_config.set_user_option('norecurse_option', 'normal',
1420
store=config.STORE_LOCATION)
1422
self.my_location_config._get_option_policy(
1423
'http://www.example.com', 'norecurse_option'),
1426
def get_branch_config(self, location, global_config=None,
1427
location_config=None):
1428
my_branch = FakeBranch(location)
1429
if global_config is None:
1430
global_config = sample_config_text
1431
if location_config is None:
1432
location_config = sample_branches_text
1434
config.GlobalConfig.from_string(global_config, save=True)
1435
config.LocationConfig.from_string(location_config, my_branch.base,
1437
my_config = config.BranchConfig(my_branch)
1438
self.my_config = my_config
1439
self.my_location_config = my_config._get_location_config()
1441
def test_set_user_setting_sets_and_saves2(self):
1442
self.get_branch_config('/a/c')
1443
self.assertIs(self.my_config.get_user_option('foo'), None)
1444
self.my_config.set_user_option('foo', 'bar')
1446
self.my_config.branch.control_files.files['branch.conf'].strip(),
1448
self.assertEqual(self.my_config.get_user_option('foo'), 'bar')
1449
self.my_config.set_user_option('foo', 'baz',
1450
store=config.STORE_LOCATION)
1451
self.assertEqual(self.my_config.get_user_option('foo'), 'baz')
1452
self.my_config.set_user_option('foo', 'qux')
1453
self.assertEqual(self.my_config.get_user_option('foo'), 'baz')
1455
def test_get_bzr_remote_path(self):
    """The remote path is 'bzr', then the option, then BZR_REMOTE_PATH."""
    location_conf = config.LocationConfig('/a/c')
    # Nothing is configured yet.
    self.assertEqual('bzr', location_conf.get_bzr_remote_path())
    # A configured option replaces the initial value.
    location_conf.set_user_option('bzr_remote_path', '/path-bzr')
    self.assertEqual('/path-bzr', location_conf.get_bzr_remote_path())
    # The environment variable takes over from the configured option.
    self.overrideEnv('BZR_REMOTE_PATH', '/environ-bzr')
    self.assertEqual('/environ-bzr', location_conf.get_bzr_remote_path())
1464
precedence_global = 'option = global'
1465
precedence_branch = 'option = branch'
1466
precedence_location = """
1470
[http://example.com/specific]
1474
class TestBranchConfigItems(tests.TestCaseInTempDir):
1476
def get_branch_config(self, global_config=None, location=None,
1477
location_config=None, branch_data_config=None):
1478
my_branch = FakeBranch(location)
1479
if global_config is not None:
1480
config.GlobalConfig.from_string(global_config, save=True)
1481
if location_config is not None:
1482
config.LocationConfig.from_string(location_config, my_branch.base,
1484
my_config = config.BranchConfig(my_branch)
1485
if branch_data_config is not None:
1486
my_config.branch.control_files.files['branch.conf'] = \
1490
def test_user_id(self):
    """The 'email' user option is what username() reports."""
    branch_conf = config.BranchConfig(FakeBranch())
    self.assertIsNot(None, branch_conf.username())
    # Store a bare name in the branch's 'email' control file before
    # setting the option that the final assertion expects to win.
    branch_conf.branch.control_files.files['email'] = "John"
    identity = "Robert Collins <robertc@example.org>"
    branch_conf.set_user_option('email', identity)
    self.assertEqual("Robert Collins <robertc@example.org>",
                     branch_conf.username())
1500
def test_BRZ_EMAIL_OVERRIDES(self):
    """With BRZ_EMAIL set, username() reports the value of that variable."""
    self.overrideEnv('BRZ_EMAIL', "Robert Collins <robertc@example.org>")
    fake = FakeBranch()
    branch_conf = config.BranchConfig(fake)
    reported = branch_conf.username()
    self.assertEqual("Robert Collins <robertc@example.org>", reported)
1507
def test_get_user_option_global(self):
    """An option from the sample global config is visible via the branch."""
    branch_conf = self.get_branch_config(global_config=sample_config_text)
    value = branch_conf.get_user_option('user_global_option')
    self.assertEqual('something', value)
1512
def test_config_precedence(self):
1513
# FIXME: eager test, luckily no persistent config file makes it fail
1515
my_config = self.get_branch_config(global_config=precedence_global)
1516
self.assertEqual(my_config.get_user_option('option'), 'global')
1517
my_config = self.get_branch_config(global_config=precedence_global,
1518
branch_data_config=precedence_branch)
1519
self.assertEqual(my_config.get_user_option('option'), 'branch')
1520
my_config = self.get_branch_config(
1521
global_config=precedence_global,
1522
branch_data_config=precedence_branch,
1523
location_config=precedence_location)
1524
self.assertEqual(my_config.get_user_option('option'), 'recurse')
1525
my_config = self.get_branch_config(
1526
global_config=precedence_global,
1527
branch_data_config=precedence_branch,
1528
location_config=precedence_location,
1529
location='http://example.com/specific')
1530
self.assertEqual(my_config.get_user_option('option'), 'exact')
1533
class TestMailAddressExtraction(tests.TestCase):
    """Tests for splitting user ids into a name and an e-mail address."""

    def test_extract_email_address(self):
        """The address is extracted; a user id without one raises."""
        extracted = config.extract_email_address('Jane <jane@test.com>')
        self.assertEqual('jane@test.com', extracted)
        self.assertRaises(config.NoEmailInUsername,
                          config.extract_email_address, 'Jane Tester')

    def test_parse_username(self):
        """parse_username gives (name, email) for each supported form."""
        # Each entry is (user id, expected (name, email) pair).
        cases = [
            ('jdoe@example.com', ('', 'jdoe@example.com')),
            ('<jdoe@example.com>', ('', 'jdoe@example.com')),
            ('John Doe <jdoe@example.com>', ('John Doe', 'jdoe@example.com')),
            ('John Doe', ('John Doe', '')),
            ('John Doe jdoe@example.com', ('John Doe', 'jdoe@example.com')),
        ]
        for user_id, expected in cases:
            self.assertEqual(expected, config.parse_username(user_id))
1553
class TestTreeConfig(tests.TestCaseWithTransport):
    """Tests for TreeConfig option storage on a branch."""

    def test_get_value(self):
        """Test that retrieving a value from a section is possible"""
        tree_config = config.TreeConfig(self.make_branch('.'))
        # Each entry is the argument tuple passed to set_option.
        for stored in [('value', 'key', 'SECTION'),
                       ('value2', 'key2'),
                       ('value3-top', 'key3'),
                       ('value3-section', 'key3', 'SECTION')]:
            tree_config.set_option(*stored)

        def check(expected, *args, **kwargs):
            # Compare one get_option() lookup against the expected value.
            self.assertEqual(tree_config.get_option(*args, **kwargs),
                             expected)

        check('value', 'key', 'SECTION')
        check('value2', 'key2')
        # Missing options give None unless a default is supplied.
        check(None, 'non-existant')
        check(None, 'non-existant', 'SECTION')
        check('default', 'non-existant', default='default')
        # The same holds when the section itself does not exist.
        check(None, 'key2', 'NOSECTION')
        check('default', 'key2', 'NOSECTION', default='default')
        # The same key can hold different values at top level and in a section.
        check('value3-top', 'key3')
        check('value3-section', 'key3', 'SECTION')
1581
class TestTransportConfig(tests.TestCaseWithTransport):
1583
def test_load_utf8(self):
    """Ensure we can load an utf8-encoded file."""
    transport = self.get_transport()
    unicode_user = u'b\N{Euro Sign}ar'
    raw = (u'user=%s' % (unicode_user,)).encode('utf8')
    # Store the raw content in the config file
    transport.put_bytes('foo.conf', raw)
    conf = config.TransportConfig(transport, 'foo.conf')
    loaded_user = conf.get_option('user')
    self.assertEqual(unicode_user, loaded_user)
1594
def test_load_non_ascii(self):
    """Ensure we display a proper error on non-ascii, non utf-8 content."""
    t = self.get_transport()
    # put_bytes() is given raw bytes: the literal must be a bytes literal so
    # that \xff is the single byte 0xff (invalid utf-8) on Python 3 as well.
    t.put_bytes('foo.conf', b'user=foo\n#\xff\n')
    conf = config.TransportConfig(t, 'foo.conf')
    self.assertRaises(config.ConfigContentError, conf._get_configobj)
1601
def test_load_erroneous_content(self):
    """Ensure we display a proper error on content that can't be parsed."""
    t = self.get_transport()
    # put_bytes() is given raw bytes, so use a bytes literal (this is the
    # same object as a plain string literal on Python 2).
    t.put_bytes('foo.conf', b'[open_section\n')
    conf = config.TransportConfig(t, 'foo.conf')
    self.assertRaises(config.ParseConfigError, conf._get_configobj)
1608
def test_load_permission_denied(self):
1609
"""Ensure we get an empty config file if the file is inaccessible."""
1612
warnings.append(args[0] % args[1:])
1613
self.overrideAttr(trace, 'warning', warning)
1615
class DenyingTransport(object):
1617
def __init__(self, base):
1620
def get_bytes(self, relpath):
1621
raise errors.PermissionDenied(relpath, "")
1623
cfg = config.TransportConfig(
1624
DenyingTransport("nonexisting://"), 'control.conf')
1625
self.assertIs(None, cfg.get_option('non-existant', 'SECTION'))
1628
[u'Permission denied while trying to open configuration file '
1629
u'nonexisting:///control.conf.'])
1631
def test_get_value(self):
1632
"""Test that retreiving a value from a section is possible"""
1633
bzrdir_config = config.TransportConfig(self.get_transport('.'),
1635
bzrdir_config.set_option('value', 'key', 'SECTION')
1636
bzrdir_config.set_option('value2', 'key2')
1637
bzrdir_config.set_option('value3-top', 'key3')
1638
bzrdir_config.set_option('value3-section', 'key3', 'SECTION')
1639
value = bzrdir_config.get_option('key', 'SECTION')
1640
self.assertEqual(value, 'value')
1641
value = bzrdir_config.get_option('key2')
1642
self.assertEqual(value, 'value2')
1643
self.assertEqual(bzrdir_config.get_option('non-existant'), None)
1644
value = bzrdir_config.get_option('non-existant', 'SECTION')
1645
self.assertEqual(value, None)
1646
value = bzrdir_config.get_option('non-existant', default='default')
1647
self.assertEqual(value, 'default')
1648
self.assertEqual(bzrdir_config.get_option('key2', 'NOSECTION'), None)
1649
value = bzrdir_config.get_option('key2', 'NOSECTION',
1651
self.assertEqual(value, 'default')
1652
value = bzrdir_config.get_option('key3')
1653
self.assertEqual(value, 'value3-top')
1654
value = bzrdir_config.get_option('key3', 'SECTION')
1655
self.assertEqual(value, 'value3-section')
1657
def test_set_unset_default_stack_on(self):
    """default_stack_on can be set, read back and cleared with None."""
    control_dir = self.make_controldir('.')
    dir_conf = config.BzrDirConfig(control_dir)
    # Nothing is configured initially.
    self.assertIs(None, dir_conf.get_default_stack_on())
    dir_conf.set_default_stack_on('Foo')
    # The value is visible in the underlying config and via the accessor.
    stored = dir_conf._config.get_option('default_stack_on')
    self.assertEqual('Foo', stored)
    self.assertEqual('Foo', dir_conf.get_default_stack_on())
    # Setting None clears it again.
    dir_conf.set_default_stack_on(None)
    self.assertIs(None, dir_conf.get_default_stack_on())
1669
class TestOldConfigHooks(tests.TestCaseWithTransport):
1672
super(TestOldConfigHooks, self).setUp()
1673
create_configs_with_file_option(self)
1675
def assertGetHook(self, conf, name, value):
1679
config.OldConfigHooks.install_named_hook('get', hook, None)
1681
config.OldConfigHooks.uninstall_named_hook, 'get', None)
1682
self.assertLength(0, calls)
1683
actual_value = conf.get_user_option(name)
1684
self.assertEqual(value, actual_value)
1685
self.assertLength(1, calls)
1686
self.assertEqual((conf, name, value), calls[0])
1688
def test_get_hook_bazaar(self):
    """Reading 'file' from the bazaar config calls the 'get' hook."""
    conf = self.bazaar_config
    self.assertGetHook(conf, 'file', 'bazaar')
1691
def test_get_hook_locations(self):
    """Reading 'file' from the locations config calls the 'get' hook."""
    conf = self.locations_config
    self.assertGetHook(conf, 'file', 'locations')
1694
def test_get_hook_branch(self):
    """Reading an option from the branch config calls the 'get' hook."""
    conf = self.branch_config
    # Since locations masks branch, we define a different option
    conf.set_user_option('file2', 'branch')
    self.assertGetHook(conf, 'file2', 'branch')
1699
def assertSetHook(self, conf, name, value):
1703
config.OldConfigHooks.install_named_hook('set', hook, None)
1705
config.OldConfigHooks.uninstall_named_hook, 'set', None)
1706
self.assertLength(0, calls)
1707
conf.set_user_option(name, value)
1708
self.assertLength(1, calls)
1709
# We can't assert the conf object below as different configs use
1710
# different means to implement set_user_option and we care only about
1712
self.assertEqual((name, value), calls[0][1:])
1714
def test_set_hook_bazaar(self):
    """Setting an option on the bazaar config calls the 'set' hook."""
    conf = self.bazaar_config
    self.assertSetHook(conf, 'foo', 'bazaar')
1717
def test_set_hook_locations(self):
    """Setting an option on the locations config calls the 'set' hook."""
    conf = self.locations_config
    self.assertSetHook(conf, 'foo', 'locations')
1720
def test_set_hook_branch(self):
    """Setting an option on the branch config calls the 'set' hook."""
    conf = self.branch_config
    self.assertSetHook(conf, 'foo', 'branch')
1723
def assertRemoveHook(self, conf, name, section_name=None):
1727
config.OldConfigHooks.install_named_hook('remove', hook, None)
1729
config.OldConfigHooks.uninstall_named_hook, 'remove', None)
1730
self.assertLength(0, calls)
1731
conf.remove_user_option(name, section_name)
1732
self.assertLength(1, calls)
1733
# We can't assert the conf object below as different configs use
1734
# different means to implement remove_user_option and we care only about
1736
self.assertEqual((name,), calls[0][1:])
1738
def test_remove_hook_bazaar(self):
    """Removing 'file' from the bazaar config calls the 'remove' hook."""
    conf = self.bazaar_config
    self.assertRemoveHook(conf, 'file')
1741
def test_remove_hook_locations(self):
    """Removing 'file' from the locations config calls the 'remove' hook."""
    conf = self.locations_config
    # The option is removed from the section named after the location.
    section_name = conf.location
    self.assertRemoveHook(conf, 'file', section_name)
1745
def test_remove_hook_branch(self):
    """Removing 'file' from the branch config calls the 'remove' hook."""
    conf = self.branch_config
    self.assertRemoveHook(conf, 'file')
1748
def assertLoadHook(self, name, conf_class, *conf_args):
1752
config.OldConfigHooks.install_named_hook('load', hook, None)
1754
config.OldConfigHooks.uninstall_named_hook, 'load', None)
1755
self.assertLength(0, calls)
1757
conf = conf_class(*conf_args)
1758
# Access an option to trigger a load
1759
conf.get_user_option(name)
1760
self.assertLength(1, calls)
1761
# Since we can't assert about conf, we just use the number of calls ;-/
1763
def test_load_hook_bazaar(self):
    """Reading from a new GlobalConfig calls the 'load' hook."""
    conf_class = config.GlobalConfig
    self.assertLoadHook('file', conf_class)
1766
def test_load_hook_locations(self):
    """Reading from a new LocationConfig calls the 'load' hook."""
    location = self.tree.basedir
    self.assertLoadHook('file', config.LocationConfig, location)
1769
def test_load_hook_branch(self):
    """Reading from a new BranchConfig calls the 'load' hook."""
    tree_branch = self.tree.branch
    self.assertLoadHook('file', config.BranchConfig, tree_branch)
1772
def assertSaveHook(self, conf):
1776
config.OldConfigHooks.install_named_hook('save', hook, None)
1778
config.OldConfigHooks.uninstall_named_hook, 'save', None)
1779
self.assertLength(0, calls)
1780
# Setting an option triggers a save
1781
conf.set_user_option('foo', 'bar')
1782
self.assertLength(1, calls)
1783
# Since we can't assert about conf, we just use the number of calls ;-/
1785
def test_save_hook_bazaar(self):
    """Setting an option on the bazaar config calls the 'save' hook."""
    conf = self.bazaar_config
    self.assertSaveHook(conf)
1788
def test_save_hook_locations(self):
    """Setting an option on the locations config calls the 'save' hook."""
    conf = self.locations_config
    self.assertSaveHook(conf)
1791
def test_save_hook_branch(self):
    """Setting an option on the branch config calls the 'save' hook."""
    conf = self.branch_config
    self.assertSaveHook(conf)
1795
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
1796
"""Tests config hooks for remote configs.
1798
No tests for the remove hook as this is not implemented there.
1802
super(TestOldConfigHooksForRemote, self).setUp()
1803
self.transport_server = test_server.SmartTCPServer_for_testing
1804
create_configs_with_file_option(self)
1806
def assertGetHook(self, conf, name, value):
1810
config.OldConfigHooks.install_named_hook('get', hook, None)
1812
config.OldConfigHooks.uninstall_named_hook, 'get', None)
1813
self.assertLength(0, calls)
1814
actual_value = conf.get_option(name)
1815
self.assertEqual(value, actual_value)
1816
self.assertLength(1, calls)
1817
self.assertEqual((conf, name, value), calls[0])
1819
def test_get_hook_remote_branch(self):
    """Reading 'file' from the remote branch config calls the 'get' hook."""
    opened = branch.Branch.open(self.get_url('tree'))
    conf = opened._get_config()
    self.assertGetHook(conf, 'file', 'branch')
1823
def test_get_hook_remote_bzrdir(self):
    """Reading 'file' from the remote control dir config calls 'get'."""
    opened = controldir.ControlDir.open(self.get_url('tree'))
    conf = opened._get_config()
    # set_option() takes the value first, then the option name.
    conf.set_option('remotedir', 'file')
    self.assertGetHook(conf, 'file', 'remotedir')
1829
def assertSetHook(self, conf, name, value):
1833
config.OldConfigHooks.install_named_hook('set', hook, None)
1835
config.OldConfigHooks.uninstall_named_hook, 'set', None)
1836
self.assertLength(0, calls)
1837
conf.set_option(value, name)
1838
self.assertLength(1, calls)
1839
# We can't assert the conf object below as different configs use
1840
# different means to implement set_user_option and we care only about
1842
self.assertEqual((name, value), calls[0][1:])
1844
def test_set_hook_remote_branch(self):
    """Setting an option on the remote branch config calls the 'set' hook."""
    opened = branch.Branch.open(self.get_url('tree'))
    # Take a write lock now and release it when the test is cleaned up.
    self.addCleanup(opened.lock_write().unlock)
    conf = opened._get_config()
    self.assertSetHook(conf, 'file', 'remote')
1849
def test_set_hook_remote_bzrdir(self):
    """Setting an option on the remote control dir config calls 'set'."""
    tree_url = self.get_url('tree')
    opened_branch = branch.Branch.open(tree_url)
    # Take a write lock now and release it when the test is cleaned up.
    self.addCleanup(opened_branch.lock_write().unlock)
    opened_dir = controldir.ControlDir.open(self.get_url('tree'))
    self.assertSetHook(opened_dir._get_config(), 'file', 'remotedir')
1855
def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
1859
config.OldConfigHooks.install_named_hook('load', hook, None)
1861
config.OldConfigHooks.uninstall_named_hook, 'load', None)
1862
self.assertLength(0, calls)
1864
conf = conf_class(*conf_args)
1865
# Access an option to trigger a load
1866
conf.get_option(name)
1867
self.assertLength(expected_nb_calls, calls)
1868
# Since we can't assert about conf, we just use the number of calls ;-/
1870
def test_load_hook_remote_branch(self):
    """Reading from a new RemoteBranchConfig calls 'load' once."""
    opened = branch.Branch.open(self.get_url('tree'))
    conf_class = remote.RemoteBranchConfig
    self.assertLoadHook(1, 'file', conf_class, opened)
1874
def test_load_hook_remote_bzrdir(self):
    """Reading from a new RemoteBzrDirConfig calls 'load' twice."""
    opened = controldir.ControlDir.open(self.get_url('tree'))
    # The config file doesn't exist, set an option to force its creation
    opened._get_config().set_option('remotedir', 'file')
    # We get one call for the server and one call for the client, this is
    # caused by the differences in implementations between
    # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
    # SmartServerBranchGetConfigFile (in smart/branch.py)
    expected_nb_calls = 2
    self.assertLoadHook(expected_nb_calls, 'file',
                        remote.RemoteBzrDirConfig, opened)
1885
def assertSaveHook(self, conf):
1889
config.OldConfigHooks.install_named_hook('save', hook, None)
1891
config.OldConfigHooks.uninstall_named_hook, 'save', None)
1892
self.assertLength(0, calls)
1893
# Setting an option triggers a save
1894
conf.set_option('foo', 'bar')
1895
self.assertLength(1, calls)
1896
# Since we can't assert about conf, we just use the number of calls ;-/
1898
def test_save_hook_remote_branch(self):
    """Setting an option on the remote branch config calls 'save'."""
    opened = branch.Branch.open(self.get_url('tree'))
    # Take a write lock now and release it when the test is cleaned up.
    self.addCleanup(opened.lock_write().unlock)
    conf = opened._get_config()
    self.assertSaveHook(conf)
1903
def test_save_hook_remote_bzrdir(self):
    """Setting an option on the remote control dir config calls 'save'."""
    tree_url = self.get_url('tree')
    opened_branch = branch.Branch.open(tree_url)
    # Take a write lock now and release it when the test is cleaned up.
    self.addCleanup(opened_branch.lock_write().unlock)
    opened_dir = controldir.ControlDir.open(self.get_url('tree'))
    self.assertSaveHook(opened_dir._get_config())
1910
class TestOptionNames(tests.TestCase):
    """Tests for the names accepted by the option reference regexp."""

    def is_valid(self, name):
        """Return True if '{name}' is matched by config._option_ref_re."""
        return config._option_ref_re.match('{%s}' % name) is not None

    def test_valid_names(self):
        """Names that must be accepted as option references."""
        self.assertTrue(self.is_valid('foo'))
        self.assertTrue(self.is_valid('foo.bar'))
        self.assertTrue(self.is_valid('f1'))
        self.assertTrue(self.is_valid('_'))
        self.assertTrue(self.is_valid('__bar__'))
        self.assertTrue(self.is_valid('a_'))
        self.assertTrue(self.is_valid('a1'))
        # Don't break bzr-svn for no good reason
        self.assertTrue(self.is_valid('guessed-layout'))

    def test_invalid_names(self):
        """Names that must be rejected as option references."""
        self.assertFalse(self.is_valid(' foo'))
        self.assertFalse(self.is_valid('foo '))
        self.assertFalse(self.is_valid('1'))
        self.assertFalse(self.is_valid('1,2'))
        self.assertFalse(self.is_valid('foo$'))
        self.assertFalse(self.is_valid('!foo'))
        self.assertFalse(self.is_valid('foo.'))
        self.assertFalse(self.is_valid('foo..bar'))
        self.assertFalse(self.is_valid('{}'))
        self.assertFalse(self.is_valid('{a}'))
        self.assertFalse(self.is_valid('a\n'))
        self.assertFalse(self.is_valid('-'))
        self.assertFalse(self.is_valid('-a'))
        self.assertFalse(self.is_valid('a-'))
        self.assertFalse(self.is_valid('a--a'))

    def assertSingleGroup(self, reference):
        """Assert the regexp finds ``reference`` and defines a single group.

        :param reference: The text in which an option reference is searched.
        """
        # the regexp is used with split and as such should match the reference
        # *only*, if more groups needs to be defined, (?:...) should be used.
        # search() is used (as split does) so that a reference embedded in
        # extra braces, like '{{a}}', is still located.
        m = config._option_ref_re.search(reference)
        self.assertIsNot(None, m)
        self.assertLength(1, m.groups())

    def test_valid_references(self):
        """Plain and brace-wrapped references both yield a single group."""
        self.assertSingleGroup('{a}')
        self.assertSingleGroup('{{a}}')
1954
class TestOption(tests.TestCase):
1956
def test_default_value(self):
    """get_default() gives the default the option was created with."""
    option = config.Option('foo', default='bar')
    default = option.get_default()
    self.assertEqual('bar', default)
1960
def test_callable_default_value(self):
1961
def bar_as_unicode():
1963
opt = config.Option('foo', default=bar_as_unicode)
1964
self.assertEqual('bar', opt.get_default())
1966
def test_default_value_from_env(self):
    """A value found in the environment replaces the declared default."""
    option = config.Option('foo', default_from_env=['FOO'], default='bar')
    self.overrideEnv('FOO', 'quux')
    # The environment variable wins over the option's own default
    default = option.get_default()
    self.assertEqual('quux', default)
1972
def test_first_default_value_from_env_wins(self):
    """With several candidate variables, the first one that is set wins."""
    candidates = ['NO_VALUE', 'FOO', 'BAZ']
    option = config.Option('foo', default='bar',
                           default_from_env=candidates)
    for variable, value in (('FOO', 'foo'), ('BAZ', 'baz')):
        self.overrideEnv(variable, value)
    # NO_VALUE is not set by this test, FOO comes before BAZ in the list
    self.assertEqual('foo', option.get_default())
1980
def test_not_supported_list_default_value(self):
    """Building an option with a list as default raises AssertionError."""
    unsupported_default = [1]
    self.assertRaises(AssertionError, config.Option, 'foo',
                      default=unsupported_default)
1983
def test_not_supported_object_default_value(self):
1984
self.assertRaises(AssertionError, config.Option, 'foo',
1987
def test_not_supported_callable_default_value_not_unicode(self):
1988
def bar_not_unicode():
1990
opt = config.Option('foo', default=bar_not_unicode)
1991
self.assertRaises(AssertionError, opt.get_default)
1993
def test_get_help_topic(self):
    """The option named 'foo' reports 'foo' as its help topic."""
    option = config.Option('foo')
    topic = option.get_help_topic()
    self.assertEqual('foo', topic)
1998
class TestOptionConverter(tests.TestCase):
2000
def assertConverted(self, expected, opt, value):
    """Assert that ``opt`` converts ``value`` into ``expected``.

    The conversion is requested with None as the store argument.
    """
    converted = opt.convert_from_unicode(None, value)
    self.assertEqual(expected, converted)
2003
def assertCallsWarning(self, opt, value):
2007
warnings.append(args[0] % args[1:])
2008
self.overrideAttr(trace, 'warning', warning)
2009
self.assertEqual(None, opt.convert_from_unicode(None, value))
2010
self.assertLength(1, warnings)
2012
'Value "%s" is not valid for "%s"' % (value, opt.name),
2015
def assertCallsError(self, opt, value):
    """Assert that converting ``value`` raises ConfigOptionValueError.

    The conversion is requested with None as the store argument.
    """
    convert = opt.convert_from_unicode
    self.assertRaises(config.ConfigOptionValueError, convert, None, value)
2019
def assertConvertInvalid(self, opt, invalid_value):
2021
self.assertEqual(None, opt.convert_from_unicode(None, invalid_value))
2022
opt.invalid = 'warning'
2023
self.assertCallsWarning(opt, invalid_value)
2024
opt.invalid = 'error'
2025
self.assertCallsError(opt, invalid_value)
2028
class TestOptionWithBooleanConverter(TestOptionConverter):
    """Exercise an option converted through ``config.bool_from_store``."""

    def get_option(self):
        """Build the boolean option under test."""
        return config.Option('foo', from_unicode=config.bool_from_store,
                             help='A boolean.')

    def test_convert_invalid(self):
        option = self.get_option()
        rejected = (
            # A string that is not recognized as a boolean
            u'invalid-boolean',
            # A list of strings is never recognized as a boolean
            [u'not', u'a', u'boolean'],
        )
        for value in rejected:
            self.assertConvertInvalid(option, value)

    def test_convert_valid(self):
        option = self.get_option()
        conversions = ((True, u'True'), (True, u'1'), (False, u'False'))
        for expected, text in conversions:
            self.assertConverted(expected, option, text)
2048
class TestOptionWithIntegerConverter(TestOptionConverter):
    """Exercise an option converted through ``config.int_from_store``."""

    def get_option(self):
        """Build the integer option under test."""
        return config.Option('foo', from_unicode=config.int_from_store,
                             help='An integer.')

    def test_convert_invalid(self):
        option = self.get_option()
        rejected = (
            # A string that is not recognized as an integer
            u'forty-two',
            # A list of strings is never recognized as an integer
            [u'a', u'list'],
        )
        for value in rejected:
            self.assertConvertInvalid(option, value)

    def test_convert_valid(self):
        option = self.get_option()
        text = u'16'
        self.assertConverted(16, option, text)
2066
class TestOptionWithSIUnitConverter(TestOptionConverter):
    """Exercise an option converted through ``config.int_SI_from_store``."""

    def get_option(self):
        """Build the SI-unit integer option under test."""
        return config.Option('foo', from_unicode=config.int_SI_from_store,
                             help='An integer in SI units.')

    def test_convert_invalid(self):
        option = self.get_option()
        rejected = (
            u'not-a-unit',
            u'Gb',  # Forgot the value
            u'1b',  # Forgot the unit
            u'1GG',
            u'1Mbb',
            u'1MM',
        )
        for text in rejected:
            self.assertConvertInvalid(option, text)

    def test_convert_valid(self):
        option = self.get_option()
        conversions = (
            (int(5e3), u'5kb'),
            (int(5e6), u'5M'),
            (int(5e6), u'5MB'),
            (int(5e9), u'5g'),
            (int(5e9), u'5gB'),
            (100, u'100'),
        )
        for expected, text in conversions:
            self.assertConverted(expected, option, text)
2091
class TestListOption(TestOptionConverter):
    """Exercise the conversions done by ``config.ListOption``."""

    def get_option(self):
        """Build the list option under test."""
        return config.ListOption('foo', help='A list.')

    def test_convert_invalid(self):
        option = self.get_option()
        # Something that is already a list is not converted, it is treated
        # as an invalid value.
        already_a_list = [1]
        self.assertConvertInvalid(option, already_a_list)
        # No string is invalid as all forms can be converted to a list

    def test_convert_valid(self):
        option = self.get_option()
        conversions = (
            # An empty string is an empty list
            ([], ''),  # Using a bare str() just in case
            ([], u''),
            # Any other single value becomes a one-element list
            ([u'True'], u'True'),
            ([u'42'], u'42'),
            ([u'bar'], u'bar'),
        )
        for expected, text in conversions:
            self.assertConverted(expected, option, text)
2116
class TestRegistryOption(TestOptionConverter):
2118
def get_option(self, registry):
    """Build a RegistryOption named 'foo' backed by ``registry``."""
    option = config.RegistryOption('foo', registry,
                                   help='A registry option.')
    return option
2122
def test_convert_invalid(self):
    """Neither a list nor an unregistered key is valid for an empty registry."""
    empty_registry = _mod_registry.Registry()
    option = self.get_option(empty_registry)
    for value in ([1], u"notregistered"):
        self.assertConvertInvalid(option, value)
2128
def test_convert_valid(self):
    """A registered key converts to its registered object, None to None."""
    known = _mod_registry.Registry()
    known.register("someval", 1234)
    option = self.get_option(known)
    conversions = (
        # Using a bare str() just in case
        (1234, "someval"),
        (1234, u'someval'),
        (None, None),
    )
    for expected, value in conversions:
        self.assertConverted(expected, option, value)
2137
def test_help(self):
2138
registry = _mod_registry.Registry()
2139
registry.register("someval", 1234, help="some option")
2140
registry.register("dunno", 1234, help="some other option")
2141
opt = self.get_option(registry)
2143
'A registry option.\n'
2145
'The following values are supported:\n'
2146
' dunno - some other option\n'
2147
' someval - some option\n',
2150
def test_get_help_text(self):
2151
registry = _mod_registry.Registry()
2152
registry.register("someval", 1234, help="some option")
2153
registry.register("dunno", 1234, help="some other option")
2154
opt = self.get_option(registry)
2156
'A registry option.\n'
2158
'The following values are supported:\n'
2159
' dunno - some other option\n'
2160
' someval - some option\n',
2161
opt.get_help_text())
2164
class TestOptionRegistry(tests.TestCase):
2167
super(TestOptionRegistry, self).setUp()
2168
# Always start with an empty registry
2169
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2170
self.registry = config.option_registry
2172
def test_register(self):
    """A registered option is retrieved, as the same object, by its name."""
    option = config.Option('foo')
    self.registry.register(option)
    found = self.registry.get('foo')
    self.assertIs(option, found)
2177
def test_registered_help(self):
    """The registry exposes the help text of a registered option."""
    help_text = 'A simple option'
    self.registry.register(config.Option('foo', help=help_text))
    self.assertEqual('A simple option', self.registry.get_help('foo'))
2182
def test_dont_register_illegal_name(self):
    """Registering an option with an illegal name raises IllegalOptionName."""
    for illegal_name in (' foo', 'bar,'):
        self.assertRaises(config.IllegalOptionName,
                          self.registry.register,
                          config.Option(illegal_name))
2188
lazy_option = config.Option('lazy_foo', help='Lazy help')
2190
def test_register_lazy(self):
    """A lazily registered option resolves to the referenced object."""
    member_name = 'TestOptionRegistry.lazy_option'
    self.registry.register_lazy('lazy_foo', self.__module__, member_name)
    resolved = self.registry.get('lazy_foo')
    self.assertIs(self.lazy_option, resolved)
2195
def test_registered_lazy_help(self):
    """The help text of a lazily registered option is available."""
    member_name = 'TestOptionRegistry.lazy_option'
    self.registry.register_lazy('lazy_foo', self.__module__, member_name)
    help_text = self.registry.get_help('lazy_foo')
    self.assertEqual('Lazy help', help_text)
2200
def test_dont_lazy_register_illegal_name(self):
    # This is where the root cause of http://pad.lv/1235099 is better
    # understood: the 'register_lazy' doc string mentions that the key should
    # match the option name, which indirectly requires the option name to be
    # a valid python identifier. That rule is deliberately violated here
    # (the key doesn't match the option name) to exercise the name checking.
    member_name = 'TestOptionRegistry.lazy_option'
    for illegal_key in (' foo', '1,2'):
        self.assertRaises(config.IllegalOptionName,
                          self.registry.register_lazy, illegal_key,
                          self.__module__, member_name)
2214
class TestRegisteredOptions(tests.TestCase):
2215
"""All registered options should verify some constraints."""
2217
scenarios = [(key, {'option_name': key, 'option': option}) for key, option
2218
in config.option_registry.iteritems()]
2221
super(TestRegisteredOptions, self).setUp()
2222
self.registry = config.option_registry
2224
def test_proper_name(self):
    """The registration key and the option's own name must agree."""
    # This can't be checked at registration time for the lazy options, so
    # it is checked here for every registered option.
    registered_as = self.option_name
    self.assertEqual(registered_as, self.option.name)
2229
def test_help_is_set(self):
    """Every registered option must come with a non-empty help text."""
    help_text = self.registry.get_help(self.option_name)
    # Think about the user: the help must exist and must say something
    self.assertIsNot(None, help_text)
    self.assertNotEqual('', help_text)
2237
class TestSection(tests.TestCase):
2239
# FIXME: Parametrize so that all sections produced by Stores run these
2240
# tests -- vila 2011-04-01
2242
def test_get_a_value(self):
    """A section returns the value stored in the dict it wraps."""
    options = {'foo': 'bar'}
    section = config.Section('myID', options)
    value = section.get('foo')
    self.assertEqual('bar', value)
2247
def test_get_unknown_option(self):
2249
section = config.Section(None, a_dict)
2250
self.assertEqual('out of thin air',
2251
section.get('foo', 'out of thin air'))
2253
def test_options_is_shared(self):
2255
section = config.Section(None, a_dict)
2256
self.assertIs(a_dict, section.options)
2259
class TestMutableSection(tests.TestCase):
2261
scenarios = [('mutable',
2263
lambda opts: config.MutableSection('myID', opts)},),
2267
a_dict = dict(foo='bar')
2268
section = self.get_section(a_dict)
2269
section.set('foo', 'new_value')
2270
self.assertEqual('new_value', section.get('foo'))
2271
# The change appears in the shared section
2272
self.assertEqual('new_value', a_dict.get('foo'))
2273
# We keep track of the change
2274
self.assertTrue('foo' in section.orig)
2275
self.assertEqual('bar', section.orig.get('foo'))
2277
def test_set_preserve_original_once(self):
    """Setting an option twice still remembers the very first value."""
    options = {'foo': 'bar'}
    section = self.get_section(options)
    for new_value in ('first_value', 'second_value'):
        section.set('foo', new_value)
    # ``orig`` holds the value from before any modification
    self.assertTrue('foo' in section.orig)
    self.assertEqual('bar', section.orig.get('foo'))
2286
def test_remove(self):
    """Removing an option drops it from the section but records its value."""
    options = {'foo': 'bar'}
    section = self.get_section(options)
    section.remove('foo')
    # Without an explicit default a removed option reads as None...
    without_default = section.get('foo')
    self.assertEqual(None, without_default)
    # ...and with one, the caller's default comes back
    with_default = section.get('foo', 'unknown')
    self.assertEqual('unknown', with_default)
    self.assertFalse('foo' in section.options)
    # The deletion is tracked: the previous value stays in ``orig``
    self.assertTrue('foo' in section.orig)
    self.assertEqual('bar', section.orig.get('foo'))
2299
def test_remove_new_option(self):
2301
section = self.get_section(a_dict)
2302
section.set('foo', 'bar')
2303
section.remove('foo')
2304
self.assertFalse('foo' in section.options)
2305
# The option didn't exist initially so we need to keep track of it
2306
# with a special value
2307
self.assertTrue('foo' in section.orig)
2308
self.assertEqual(config._NewlyCreatedOption, section.orig['foo'])
2311
class TestCommandLineStore(tests.TestCase):
2314
super(TestCommandLineStore, self).setUp()
2315
self.store = config.CommandLineStore()
2316
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2318
def get_section(self):
2319
"""Get the unique section for the command line overrides."""
2320
sections = list(self.store.get_sections())
2321
self.assertLength(1, sections)
2322
store, section = sections[0]
2323
self.assertEqual(self.store, store)
2326
def test_no_override(self):
    """An empty command line yields a section without any option."""
    self.store._from_cmdline([])
    option_names = list(self.get_section().iter_option_names())
    self.assertLength(0, option_names)
2331
def test_simple_override(self):
    """A single 'name=value' override is readable from the section."""
    overrides = ['a=b']
    self.store._from_cmdline(overrides)
    value = self.get_section().get('a')
    self.assertEqual('b', value)
2336
def test_list_override(self):
    """A list override is stored as a string and converted by the option."""
    list_option = config.ListOption('l')
    config.option_registry.register(list_option)
    self.store._from_cmdline(['l=1,2,3'])
    raw = self.get_section().get('l')
    self.assertEqual('1,2,3', raw)
    # Reminder: lists should be registered as such explicitly, otherwise
    # the conversion needs to be done afterwards.
    converted = list_option.convert_from_unicode(self.store, raw)
    self.assertEqual(['1', '2', '3'], converted)
2347
def test_multiple_overrides(self):
    """Several overrides all end up in the same section."""
    self.store._from_cmdline(['a=b', 'x=y'])
    section = self.get_section()
    for name, expected in (('a', 'b'), ('x', 'y')):
        self.assertEqual(expected, section.get(name))
2353
def test_wrong_syntax(self):
    """An override that lacks '=' is reported as a BzrCommandError."""
    overrides = ['a=b', 'c']
    self.assertRaises(errors.BzrCommandError,
                      self.store._from_cmdline, overrides)
2357
class TestStoreMinimalAPI(tests.TestCaseWithTransport):
2359
scenarios = [(key, {'get_store': builder}) for key, builder
2360
in config.test_store_builder_registry.iteritems()] + [
2361
('cmdline', {'get_store': lambda test: config.CommandLineStore()})]
2364
store = self.get_store(self)
2365
if isinstance(store, config.TransportIniFileStore):
2366
raise tests.TestNotApplicable(
2367
"%s is not a concrete Store implementation"
2368
" so it doesn't need an id" % (store.__class__.__name__,))
2369
self.assertIsNot(None, store.id)
2372
class TestStore(tests.TestCaseWithTransport):
2374
def assertSectionContent(self, expected, store_and_section):
2375
"""Assert that some options have the proper values in a section."""
2376
_, section = store_and_section
2377
expected_name, expected_options = expected
2378
self.assertEqual(expected_name, section.id)
2381
dict([(k, section.get(k)) for k in expected_options.keys()]))
2384
class TestReadonlyStore(TestStore):
    """Read-only checks run against every registered test store builder."""

    scenarios = [(key, {'get_store': builder}) for key, builder
                 in config.test_store_builder_registry.iteritems()]

    def _sections_after_loading(self, content):
        """Load ``content`` in a fresh store and return its sections."""
        store = self.get_store(self)
        store._load_from_string(content)
        return list(store.get_sections())

    def test_building_delays_load(self):
        store = self.get_store(self)
        loaded_before = store.is_loaded()
        self.assertEqual(False, loaded_before)
        store._load_from_string('')
        loaded_after = store.is_loaded()
        self.assertEqual(True, loaded_after)

    def test_get_no_sections_for_empty(self):
        sections = self._sections_after_loading('')
        self.assertEqual([], sections)

    def test_get_default_section(self):
        sections = self._sections_after_loading('foo=bar')
        self.assertLength(1, sections)
        expected = (None, {'foo': 'bar'})
        self.assertSectionContent(expected, sections[0])

    def test_get_named_section(self):
        sections = self._sections_after_loading('[baz]\nfoo=bar')
        self.assertLength(1, sections)
        expected = ('baz', {'foo': 'bar'})
        self.assertSectionContent(expected, sections[0])

    def test_load_from_string_fails_for_non_empty_store(self):
        store = self.get_store(self)
        store._load_from_string('foo=bar')
        # A second load into the already loaded store is refused
        reload_store = store._load_from_string
        self.assertRaises(AssertionError, reload_store, 'bar=baz')
2420
class TestStoreQuoting(TestStore):
2422
scenarios = [(key, {'get_store': builder}) for key, builder
2423
in config.test_store_builder_registry.iteritems()]
2426
super(TestStoreQuoting, self).setUp()
2427
self.store = self.get_store(self)
2428
# We need a loaded store but any content will do
2429
self.store._load_from_string('')
2431
def assertIdempotent(self, s):
2432
"""Assert that quoting an unquoted string is a no-op and vice-versa.
2434
What matters here is that option values, as they appear in a store, can
2435
be safely round-tripped out of the store and back.
2437
:param s: A string, quoted if required.
2439
self.assertEqual(s, self.store.quote(self.store.unquote(s)))
2440
self.assertEqual(s, self.store.unquote(self.store.quote(s)))
2442
def test_empty_string(self):
2443
if isinstance(self.store, config.IniFileStore):
2444
# configobj._quote doesn't handle empty values
2445
self.assertRaises(AssertionError,
2446
self.assertIdempotent, '')
2448
self.assertIdempotent('')
2449
# But quoted empty strings are ok
2450
self.assertIdempotent('""')
2452
def test_embedded_spaces(self):
    """A quoted string with inner and outer spaces round-trips unchanged."""
    quoted = '" a b c "'
    self.assertIdempotent(quoted)
2455
def test_embedded_commas(self):
    """A quoted string containing a comma round-trips unchanged."""
    quoted = '" a , b c "'
    self.assertIdempotent(quoted)
2458
def test_simple_comma(self):
2459
if isinstance(self.store, config.IniFileStore):
2460
# configobj requires that lists are special-cased
2461
self.assertRaises(AssertionError,
2462
self.assertIdempotent, ',')
2464
self.assertIdempotent(',')
2465
# When a single comma is required, quoting is also required
2466
self.assertIdempotent('","')
2468
def test_list(self):
2469
if isinstance(self.store, config.IniFileStore):
2470
# configobj requires that lists are special-cased
2471
self.assertRaises(AssertionError,
2472
self.assertIdempotent, 'a,b')
2474
self.assertIdempotent('a,b')
2477
class TestDictFromStore(tests.TestCase):
    """Check that unquoting copes with a value that is a dict, not a string."""

    def test_unquote_not_string(self):
        stack = config.MemoryStack('x=2\n[a_section]\na=1\n')
        section_as_value = stack.get('a_section')
        # Urgh, despite the stack asking for the no-name section, we get the
        # content of another section as a dict o_O
        expected = {'a': '1'}
        self.assertEqual(expected, section_as_value)
        unquoted = stack.store.unquote(section_as_value)
        # Which cannot be unquoted but shouldn't crash either (the use cases
        # are getting the value or displaying it; the latter goes through
        # '%s' formatting, as checked below).
        self.assertEqual(expected, unquoted)
        self.assertEqual("{u'a': u'1'}", '%s' % (unquoted,))
2493
class TestIniFileStoreContent(tests.TestCaseWithTransport):
2494
"""Simulate loading a config store with content of various encodings.
2496
All files produced by bzr are in utf8 content.
2498
Users may modify them manually and end up with a file that can't be
2499
loaded. We need to issue proper error messages in this case.
2502
invalid_utf8_char = '\xff'
2504
def test_load_utf8(self):
2505
"""Ensure we can load an utf8-encoded file."""
2506
t = self.get_transport()
2507
# From http://pad.lv/799212
2508
unicode_user = u'b\N{Euro Sign}ar'
2509
unicode_content = u'user=%s' % (unicode_user,)
2510
utf8_content = unicode_content.encode('utf8')
2511
# Store the raw content in the config file
2512
t.put_bytes('foo.conf', utf8_content)
2513
store = config.TransportIniFileStore(t, 'foo.conf')
2515
stack = config.Stack([store.get_sections], store)
2516
self.assertEqual(unicode_user, stack.get('user'))
2518
def test_load_non_ascii(self):
    """Ensure we display a proper error on non-ascii, non utf-8 content."""
    transport = self.get_transport()
    # A comment line carries the character that is not valid utf-8
    content = 'user=foo\n#%s\n' % (self.invalid_utf8_char,)
    transport.put_bytes('foo.conf', content)
    store = config.TransportIniFileStore(transport, 'foo.conf')
    self.assertRaises(config.ConfigContentError, store.load)
2525
def test_load_erroneous_content(self):
    """Ensure we display a proper error on content that can't be parsed."""
    transport = self.get_transport()
    # put_bytes() is given an explicit bytes literal: this is the same object
    # type as before on Python 2 and stays bytes on Python 3.
    # The section header is deliberately left unterminated.
    transport.put_bytes('foo.conf', b'[open_section\n')
    store = config.TransportIniFileStore(transport, 'foo.conf')
    self.assertRaises(config.ParseConfigError, store.load)
2532
def test_load_permission_denied(self):
2533
"""Ensure we get warned when trying to load an inaccessible file."""
2536
warnings.append(args[0] % args[1:])
2537
self.overrideAttr(trace, 'warning', warning)
2539
t = self.get_transport()
2541
def get_bytes(relpath):
2542
raise errors.PermissionDenied(relpath, "")
2543
t.get_bytes = get_bytes
2544
store = config.TransportIniFileStore(t, 'foo.conf')
2545
self.assertRaises(errors.PermissionDenied, store.load)
2548
[u'Permission denied while trying to load configuration store %s.'
2549
% store.external_url()])
2552
class TestIniConfigContent(tests.TestCaseWithTransport):
2553
"""Simulate loading a IniBasedConfig with content of various encodings.
2555
All files produced by bzr are in utf8 content.
2557
Users may modify them manually and end up with a file that can't be
2558
loaded. We need to issue proper error messages in this case.
2561
invalid_utf8_char = '\xff'
2563
def test_load_utf8(self):
    """Ensure we can load an utf8-encoded file."""
    # From http://pad.lv/799212
    user = u'b\N{Euro Sign}ar'
    encoded = (u'user=%s' % (user,)).encode('utf8')
    # Store the raw content in the config file
    with open('foo.conf', 'wb') as conf_file:
        conf_file.write(encoded)
    conf = config.IniBasedConfig(file_name='foo.conf')
    loaded_user = conf.get_user_option('user')
    self.assertEqual(user, loaded_user)
2575
def test_load_badly_encoded_content(self):
    """Ensure we display a proper error on non-ascii, non utf-8 content."""
    # A comment line carries the character that is not valid utf-8
    content = 'user=foo\n#%s\n' % (self.invalid_utf8_char,)
    with open('foo.conf', 'wb') as conf_file:
        conf_file.write(content)
    conf = config.IniBasedConfig(file_name='foo.conf')
    self.assertRaises(config.ConfigContentError, conf._get_parser)
2582
def test_load_erroneous_content(self):
    """Ensure we display a proper error on content that can't be parsed."""
    # The file is opened in binary mode, so it is fed a bytes literal: this
    # is unchanged on Python 2 and avoids a TypeError on Python 3.
    # The section header is deliberately left unterminated.
    with open('foo.conf', 'wb') as conf_file:
        conf_file.write(b'[open_section\n')
    conf = config.IniBasedConfig(file_name='foo.conf')
    self.assertRaises(config.ParseConfigError, conf._get_parser)
2590
class TestMutableStore(TestStore):
2592
scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
2593
in config.test_store_builder_registry.iteritems()]
2596
super(TestMutableStore, self).setUp()
2597
self.transport = self.get_transport()
2599
def has_store(self, store):
    """Tell whether ``store`` currently exists on ``self.transport``.

    The store URL is made relative to the transport URL before asking the
    transport.
    """
    base_url = self.transport.external_url()
    relpath = urlutils.relative_url(base_url, store.external_url())
    return self.transport.has(relpath)
2604
def test_save_empty_creates_no_file(self):
2605
# FIXME: There should be a better way than relying on the test
2606
# parametrization to identify branch.conf -- vila 2011-0526
2607
if self.store_id in ('branch', 'remote_branch'):
2608
raise tests.TestNotApplicable(
2609
'branch.conf is *always* created when a branch is initialized')
2610
store = self.get_store(self)
2612
self.assertEqual(False, self.has_store(store))
2614
def test_mutable_section_shared(self):
    """Asking twice for a mutable section returns the very same object."""
    store = self.get_store(self)
    store._load_from_string('foo=bar\n')
    # FIXME: There should be a better way than relying on the test
    # parametrization to identify branch.conf -- vila 2011-0526
    is_branch_store = self.store_id in ('branch', 'remote_branch')
    if is_branch_store:
        # branch stores require write locked branches
        self.addCleanup(store.branch.lock_write().unlock)
    first = store.get_mutable_section(None)
    second = store.get_mutable_section(None)
    # Distinct objects would mean callers don't share their modifications
    self.assertIs(first, second)
2628
def test_save_emptied_succeeds(self):
2629
store = self.get_store(self)
2630
store._load_from_string('foo=bar\n')
2631
# FIXME: There should be a better way than relying on the test
2632
# parametrization to identify branch.conf -- vila 2011-0526
2633
if self.store_id in ('branch', 'remote_branch'):
2634
# branch stores requires write locked branches
2635
self.addCleanup(store.branch.lock_write().unlock)
2636
section = store.get_mutable_section(None)
2637
section.remove('foo')
2639
self.assertEqual(True, self.has_store(store))
2640
modified_store = self.get_store(self)
2641
sections = list(modified_store.get_sections())
2642
self.assertLength(0, sections)
2644
def test_save_with_content_succeeds(self):
2645
# FIXME: There should be a better way than relying on the test
2646
# parametrization to identify branch.conf -- vila 2011-0526
2647
if self.store_id in ('branch', 'remote_branch'):
2648
raise tests.TestNotApplicable(
2649
'branch.conf is *always* created when a branch is initialized')
2650
store = self.get_store(self)
2651
store._load_from_string('foo=bar\n')
2652
self.assertEqual(False, self.has_store(store))
2654
self.assertEqual(True, self.has_store(store))
2655
modified_store = self.get_store(self)
2656
sections = list(modified_store.get_sections())
2657
self.assertLength(1, sections)
2658
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2660
def test_set_option_in_empty_store(self):
2661
store = self.get_store(self)
2662
# FIXME: There should be a better way than relying on the test
2663
# parametrization to identify branch.conf -- vila 2011-0526
2664
if self.store_id in ('branch', 'remote_branch'):
2665
# branch stores requires write locked branches
2666
self.addCleanup(store.branch.lock_write().unlock)
2667
section = store.get_mutable_section(None)
2668
section.set('foo', 'bar')
2670
modified_store = self.get_store(self)
2671
sections = list(modified_store.get_sections())
2672
self.assertLength(1, sections)
2673
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2675
def test_set_option_in_default_section(self):
2676
store = self.get_store(self)
2677
store._load_from_string('')
2678
# FIXME: There should be a better way than relying on the test
2679
# parametrization to identify branch.conf -- vila 2011-0526
2680
if self.store_id in ('branch', 'remote_branch'):
2681
# branch stores requires write locked branches
2682
self.addCleanup(store.branch.lock_write().unlock)
2683
section = store.get_mutable_section(None)
2684
section.set('foo', 'bar')
2686
modified_store = self.get_store(self)
2687
sections = list(modified_store.get_sections())
2688
self.assertLength(1, sections)
2689
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2691
def test_set_option_in_named_section(self):
2692
store = self.get_store(self)
2693
store._load_from_string('')
2694
# FIXME: There should be a better way than relying on the test
2695
# parametrization to identify branch.conf -- vila 2011-0526
2696
if self.store_id in ('branch', 'remote_branch'):
2697
# branch stores requires write locked branches
2698
self.addCleanup(store.branch.lock_write().unlock)
2699
section = store.get_mutable_section('baz')
2700
section.set('foo', 'bar')
2702
modified_store = self.get_store(self)
2703
sections = list(modified_store.get_sections())
2704
self.assertLength(1, sections)
2705
self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2707
def test_load_hook(self):
2708
# First, we need to ensure that the store exists
2709
store = self.get_store(self)
2710
# FIXME: There should be a better way than relying on the test
2711
# parametrization to identify branch.conf -- vila 2011-0526
2712
if self.store_id in ('branch', 'remote_branch'):
2713
# branch stores requires write locked branches
2714
self.addCleanup(store.branch.lock_write().unlock)
2715
section = store.get_mutable_section('baz')
2716
section.set('foo', 'bar')
2718
# Now we can try to load it
2719
store = self.get_store(self)
2723
config.ConfigHooks.install_named_hook('load', hook, None)
2724
self.assertLength(0, calls)
2726
self.assertLength(1, calls)
2727
self.assertEqual((store,), calls[0])
2729
def test_save_hook(self):
2733
config.ConfigHooks.install_named_hook('save', hook, None)
2734
self.assertLength(0, calls)
2735
store = self.get_store(self)
2736
# FIXME: There should be a better way than relying on the test
2737
# parametrization to identify branch.conf -- vila 2011-0526
2738
if self.store_id in ('branch', 'remote_branch'):
2739
# branch stores requires write locked branches
2740
self.addCleanup(store.branch.lock_write().unlock)
2741
section = store.get_mutable_section('baz')
2742
section.set('foo', 'bar')
2744
self.assertLength(1, calls)
2745
self.assertEqual((store,), calls[0])
2747
def test_set_mark_dirty(self):
    """Setting an option flags a section as dirty and the store as unsaved."""
    stack = config.MemoryStack('')
    store = stack.store
    self.assertLength(0, store.dirty_sections)
    stack.set('foo', 'baz')
    self.assertLength(1, store.dirty_sections)
    needs_saving = store._need_saving()
    self.assertTrue(needs_saving)
2754
def test_remove_mark_dirty(self):
2755
stack = config.MemoryStack('foo=bar')
2756
self.assertLength(0, stack.store.dirty_sections)
2758
self.assertLength(1, stack.store.dirty_sections)
2759
self.assertTrue(stack.store._need_saving())
2762
class TestStoreSaveChanges(tests.TestCaseWithTransport):
2763
"""Tests that config changes are kept in memory and saved on-demand."""
2766
super(TestStoreSaveChanges, self).setUp()
2767
self.transport = self.get_transport()
2768
# Most of the tests involve two stores pointing to the same persistent
2769
# storage to observe the effects of concurrent changes
2770
self.st1 = config.TransportIniFileStore(self.transport, 'foo.conf')
2771
self.st2 = config.TransportIniFileStore(self.transport, 'foo.conf')
2774
self.warnings.append(args[0] % args[1:])
2775
self.overrideAttr(trace, 'warning', warning)
2777
def has_store(self, store):
    """Tell whether ``store`` currently exists on ``self.transport``.

    The store URL is made relative to the transport URL before asking the
    transport.
    """
    base_url = self.transport.external_url()
    relpath = urlutils.relative_url(base_url, store.external_url())
    return self.transport.has(relpath)
2782
def get_stack(self, store):
    """Build a stack reading from, and writing to, ``store``."""
    # Any stack will do as long as it uses the right store, just a single
    # no-name section is enough
    section_getters = [store.get_sections]
    return config.Stack(section_getters, store)
2787
def test_no_changes_no_save(self):
    """Saving changes when nothing was modified doesn't create the file."""
    stack = self.get_stack(self.st1)
    stack.store.save_changes()
    exists = self.has_store(self.st1)
    self.assertEqual(False, exists)
2792
def test_unrelated_concurrent_update(self):
2793
s1 = self.get_stack(self.st1)
2794
s2 = self.get_stack(self.st2)
2795
s1.set('foo', 'bar')
2796
s2.set('baz', 'quux')
2798
# Changes don't propagate magically
2799
self.assertEqual(None, s1.get('baz'))
2800
s2.store.save_changes()
2801
self.assertEqual('quux', s2.get('baz'))
2802
# Changes are acquired when saving
2803
self.assertEqual('bar', s2.get('foo'))
2804
# Since there is no overlap, no warnings are emitted
2805
self.assertLength(0, self.warnings)
2807
def test_concurrent_update_modified(self):
2808
s1 = self.get_stack(self.st1)
2809
s2 = self.get_stack(self.st2)
2810
s1.set('foo', 'bar')
2811
s2.set('foo', 'baz')
2814
s2.store.save_changes()
2815
self.assertEqual('baz', s2.get('foo'))
2816
# But the user get a warning
2817
self.assertLength(1, self.warnings)
2818
warning = self.warnings[0]
2819
self.assertStartsWith(warning, 'Option foo in section None')
2820
self.assertEndsWith(warning, 'was changed from <CREATED> to bar.'
2821
' The baz value will be saved.')
2823
def test_concurrent_deletion(self):
2824
self.st1._load_from_string('foo=bar')
2826
s1 = self.get_stack(self.st1)
2827
s2 = self.get_stack(self.st2)
2830
s1.store.save_changes()
2832
self.assertLength(0, self.warnings)
2833
s2.store.save_changes()
2835
self.assertLength(1, self.warnings)
2836
warning = self.warnings[0]
2837
self.assertStartsWith(warning, 'Option foo in section None')
2838
self.assertEndsWith(warning, 'was changed from bar to <CREATED>.'
2839
' The <DELETED> value will be saved.')
2842
class TestQuotingIniFileStore(tests.TestCaseWithTransport):
2844
def get_store(self):
    """Build a TransportIniFileStore for 'foo.conf' on the test transport."""
    transport = self.get_transport()
    return config.TransportIniFileStore(transport, 'foo.conf')
2847
def test_get_quoted_string(self):
    """Reading a quoted value strips the quotes and keeps the spaces."""
    store = self.get_store()
    store._load_from_string('foo= " abc "')
    stack = config.Stack([store.get_sections])
    value = stack.get('foo')
    self.assertEqual(' abc ', value)
2853
def test_set_quoted_string(self):
2854
store = self.get_store()
2855
stack = config.Stack([store.get_sections], store)
2856
stack.set('foo', ' a b c ')
2858
self.assertFileEqual('foo = " a b c "' + os.linesep, 'foo.conf')
2861
class TestTransportIniFileStore(TestStore):
2863
def test_loading_unknown_file_fails(self):
2864
store = config.TransportIniFileStore(self.get_transport(),
2866
self.assertRaises(errors.NoSuchFile, store.load)
2868
def test_invalid_content(self):
    """Unparseable content raises ParseConfigError and loads nothing."""
    transport = self.get_transport()
    ini_store = config.TransportIniFileStore(transport, 'foo.conf')
    self.assertEqual(False, ini_store.is_loaded())
    parse_error = self.assertRaises(
        config.ParseConfigError,
        ini_store._load_from_string, 'this is invalid !')
    # The error reports the file name the store was built with
    self.assertEndsWith(parse_error.filename, 'foo.conf')
    # And the load failed
    self.assertEqual(False, ini_store.is_loaded())
2878
def test_get_embedded_sections(self):
2879
# A more complicated example (which also shows that section names and
2880
# option names share the same name space...)
2881
# FIXME: This should be fixed by forbidding dicts as values ?
2882
# -- vila 2011-04-05
2883
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
2884
store._load_from_string('''
2888
foo_in_DEFAULT=foo_DEFAULT
2896
sections = list(store.get_sections())
2897
self.assertLength(4, sections)
2898
# The default section has no name.
2899
# List values are provided as strings and need to be explicitly
2900
# converted by specifying from_unicode=list_from_store at option
2902
self.assertSectionContent((None, {'foo': 'bar', 'l': u'1,2'}),
2904
self.assertSectionContent(
2905
('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
2906
self.assertSectionContent(
2907
('bar', {'foo_in_bar': 'barbar'}), sections[2])
2908
# sub sections are provided as embedded dicts.
2909
self.assertSectionContent(
2910
('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
2914
class TestLockableIniFileStore(TestStore):
2916
def test_create_store_in_created_dir(self):
2917
self.assertPathDoesNotExist('dir')
2918
t = self.get_transport('dir/subdir')
2919
store = config.LockableIniFileStore(t, 'foo.conf')
2920
store.get_mutable_section(None).set('foo', 'bar')
2922
self.assertPathExists('dir/subdir')
2925
class TestConcurrentStoreUpdates(TestStore):
2926
"""Test that Stores properly handle conccurent updates.
2928
New Store implementation may fail some of these tests but until such
2929
implementations exist it's hard to properly filter them from the scenarios
2930
applied here. If you encounter such a case, contact the bzr devs.
2933
scenarios = [(key, {'get_stack': builder}) for key, builder
2934
in config.test_stack_builder_registry.iteritems()]
2937
super(TestConcurrentStoreUpdates, self).setUp()
2938
self.stack = self.get_stack(self)
2939
if not isinstance(self.stack, config._CompatibleStack):
2940
raise tests.TestNotApplicable(
2941
'%s is not meant to be compatible with the old config design'
2943
self.stack.set('one', '1')
2944
self.stack.set('two', '2')
2946
self.stack.store.save()
2948
def test_simple_read_access(self):
2949
self.assertEqual('1', self.stack.get('one'))
2951
def test_simple_write_access(self):
2952
self.stack.set('one', 'one')
2953
self.assertEqual('one', self.stack.get('one'))
2955
def test_listen_to_the_last_speaker(self):
2957
c2 = self.get_stack(self)
2958
c1.set('one', 'ONE')
2959
c2.set('two', 'TWO')
2960
self.assertEqual('ONE', c1.get('one'))
2961
self.assertEqual('TWO', c2.get('two'))
2962
# The second update respects the first one
2963
self.assertEqual('ONE', c2.get('one'))
2965
def test_last_speaker_wins(self):
2966
# If the same config is not shared, the same variable modified twice
2967
# can only see a single result.
2969
c2 = self.get_stack(self)
2972
self.assertEqual('c2', c2.get('one'))
2973
# The first modification is still available until another refresh
2975
self.assertEqual('c1', c1.get('one'))
2976
c1.set('two', 'done')
2977
self.assertEqual('c2', c1.get('one'))
2979
def test_writes_are_serialized(self):
2981
c2 = self.get_stack(self)
2983
# We spawn a thread that will pause *during* the config saving.
2984
before_writing = threading.Event()
2985
after_writing = threading.Event()
2986
writing_done = threading.Event()
2987
c1_save_without_locking_orig = c1.store.save_without_locking
2988
def c1_save_without_locking():
2989
before_writing.set()
2990
c1_save_without_locking_orig()
2991
# The lock is held. We wait for the main thread to decide when to
2993
after_writing.wait()
2994
c1.store.save_without_locking = c1_save_without_locking
2998
t1 = threading.Thread(target=c1_set)
2999
# Collect the thread after the test
3000
self.addCleanup(t1.join)
3001
# Be ready to unblock the thread if the test goes wrong
3002
self.addCleanup(after_writing.set)
3004
before_writing.wait()
3005
self.assertRaises(errors.LockContention,
3006
c2.set, 'one', 'c2')
3007
self.assertEqual('c1', c1.get('one'))
3008
# Let the lock be released
3012
self.assertEqual('c2', c2.get('one'))
3014
def test_read_while_writing(self):
3016
# We spawn a thread that will pause *during* the write
3017
ready_to_write = threading.Event()
3018
do_writing = threading.Event()
3019
writing_done = threading.Event()
3020
# We override the _save implementation so we know the store is locked
3021
c1_save_without_locking_orig = c1.store.save_without_locking
3022
def c1_save_without_locking():
3023
ready_to_write.set()
3024
# The lock is held. We wait for the main thread to decide when to
3027
c1_save_without_locking_orig()
3029
c1.store.save_without_locking = c1_save_without_locking
3032
t1 = threading.Thread(target=c1_set)
3033
# Collect the thread after the test
3034
self.addCleanup(t1.join)
3035
# Be ready to unblock the thread if the test goes wrong
3036
self.addCleanup(do_writing.set)
3038
# Ensure the thread is ready to write
3039
ready_to_write.wait()
3040
self.assertEqual('c1', c1.get('one'))
3041
# If we read during the write, we get the old value
3042
c2 = self.get_stack(self)
3043
self.assertEqual('1', c2.get('one'))
3044
# Let the writing occur and ensure it occurred
3047
# Now we get the updated value
3048
c3 = self.get_stack(self)
3049
self.assertEqual('c1', c3.get('one'))
3051
# FIXME: It may be worth looking into removing the lock dir when it's not
3052
# needed anymore and look at possible fallouts for concurrent lockers. This
3053
# will matter if/when we use config files outside of bazaar directories
3054
# (.bazaar or .bzr) -- vila 20110-04-111
3057
class TestSectionMatcher(TestStore):
3059
scenarios = [('location', {'matcher': config.LocationMatcher}),
3060
('id', {'matcher': config.NameMatcher}),]
3063
super(TestSectionMatcher, self).setUp()
3064
# Any simple store is good enough
3065
self.get_store = config.test_store_builder_registry.get('configobj')
3067
def test_no_matches_for_empty_stores(self):
3068
store = self.get_store(self)
3069
store._load_from_string('')
3070
matcher = self.matcher(store, '/bar')
3071
self.assertEqual([], list(matcher.get_sections()))
3073
def test_build_doesnt_load_store(self):
3074
store = self.get_store(self)
3075
self.matcher(store, '/bar')
3076
self.assertFalse(store.is_loaded())
3079
class TestLocationSection(tests.TestCase):
3081
def get_section(self, options, extra_path):
    """Wrap *options* in a 'foo' Section and return its LocationSection.

    :param options: The options dict handed to config.Section.
    :param extra_path: The extra path handed to config.LocationSection.
    """
    base_section = config.Section('foo', options)
    location_section = config.LocationSection(base_section, extra_path)
    return location_section
3085
def test_simple_option(self):
3086
section = self.get_section({'foo': 'bar'}, '')
3087
self.assertEqual('bar', section.get('foo'))
3089
def test_option_with_extra_path(self):
3090
section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
3092
self.assertEqual('bar/baz', section.get('foo'))
3094
def test_invalid_policy(self):
3095
section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
3097
# invalid policies are ignored
3098
self.assertEqual('bar', section.get('foo'))
3101
class TestLocationMatcher(TestStore):
3104
super(TestLocationMatcher, self).setUp()
3105
# Any simple store is good enough
3106
self.get_store = config.test_store_builder_registry.get('configobj')
3108
def test_unrelated_section_excluded(self):
3109
store = self.get_store(self)
3110
store._load_from_string('''
3118
section=/foo/bar/baz
3122
self.assertEqual(['/foo', '/foo/baz', '/foo/bar', '/foo/bar/baz',
3124
[section.id for _, section in store.get_sections()])
3125
matcher = config.LocationMatcher(store, '/foo/bar/quux')
3126
sections = [section for _, section in matcher.get_sections()]
3127
self.assertEqual(['/foo/bar', '/foo'],
3128
[section.id for section in sections])
3129
self.assertEqual(['quux', 'bar/quux'],
3130
[section.extra_path for section in sections])
3132
def test_more_specific_sections_first(self):
3133
store = self.get_store(self)
3134
store._load_from_string('''
3140
self.assertEqual(['/foo', '/foo/bar'],
3141
[section.id for _, section in store.get_sections()])
3142
matcher = config.LocationMatcher(store, '/foo/bar/baz')
3143
sections = [section for _, section in matcher.get_sections()]
3144
self.assertEqual(['/foo/bar', '/foo'],
3145
[section.id for section in sections])
3146
self.assertEqual(['baz', 'bar/baz'],
3147
[section.extra_path for section in sections])
3149
def test_appendpath_in_no_name_section(self):
3150
# It's a bit weird to allow appendpath in a no-name section, but
3151
# someone may find a use for it
3152
store = self.get_store(self)
3153
store._load_from_string('''
3155
foo:policy = appendpath
3157
matcher = config.LocationMatcher(store, 'dir/subdir')
3158
sections = list(matcher.get_sections())
3159
self.assertLength(1, sections)
3160
self.assertEqual('bar/dir/subdir', sections[0][1].get('foo'))
3162
def test_file_urls_are_normalized(self):
3163
store = self.get_store(self)
3164
if sys.platform == 'win32':
3165
expected_url = 'file:///C:/dir/subdir'
3166
expected_location = 'C:/dir/subdir'
3168
expected_url = 'file:///dir/subdir'
3169
expected_location = '/dir/subdir'
3170
matcher = config.LocationMatcher(store, expected_url)
3171
self.assertEqual(expected_location, matcher.location)
3173
def test_branch_name_colo(self):
3174
store = self.get_store(self)
3175
store._load_from_string(dedent("""\
3177
push_location=my{branchname}
3179
matcher = config.LocationMatcher(store, 'file:///,branch=example%3c')
3180
self.assertEqual('example<', matcher.branch_name)
3181
((_, section),) = matcher.get_sections()
3182
self.assertEqual('example<', section.locals['branchname'])
3184
def test_branch_name_basename(self):
3185
store = self.get_store(self)
3186
store._load_from_string(dedent("""\
3188
push_location=my{branchname}
3190
matcher = config.LocationMatcher(store, 'file:///parent/example%3c')
3191
self.assertEqual('example<', matcher.branch_name)
3192
((_, section),) = matcher.get_sections()
3193
self.assertEqual('example<', section.locals['branchname'])
3196
class TestStartingPathMatcher(TestStore):
3199
super(TestStartingPathMatcher, self).setUp()
3200
# Any simple store is good enough
3201
self.store = config.IniFileStore()
3203
def assertSectionIDs(self, expected, location, content):
3204
self.store._load_from_string(content)
3205
matcher = config.StartingPathMatcher(self.store, location)
3206
sections = list(matcher.get_sections())
3207
self.assertLength(len(expected), sections)
3208
self.assertEqual(expected, [section.id for _, section in sections])
3211
def test_empty(self):
3212
self.assertSectionIDs([], self.get_url(), '')
3214
def test_url_vs_local_paths(self):
3215
# The matcher location is an url and the section names are local paths
3216
self.assertSectionIDs(['/foo/bar', '/foo'],
3217
'file:///foo/bar/baz', '''\
3222
def test_local_path_vs_url(self):
3223
# The matcher location is a local path and the section names are urls
3224
self.assertSectionIDs(['file:///foo/bar', 'file:///foo'],
3225
'/foo/bar/baz', '''\
3231
def test_no_name_section_included_when_present(self):
3232
# Note that other tests will cover the case where the no-name section
3233
# is empty and as such, not included.
3234
sections = self.assertSectionIDs(['/foo/bar', '/foo', None],
3235
'/foo/bar/baz', '''\
3236
option = defined so the no-name section exists
3240
self.assertEqual(['baz', 'bar/baz', '/foo/bar/baz'],
3241
[s.locals['relpath'] for _, s in sections])
3243
def test_order_reversed(self):
3244
self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', '''\
3249
def test_unrelated_section_excluded(self):
3250
self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', '''\
3256
def test_glob_included(self):
3257
sections = self.assertSectionIDs(['/foo/*/baz', '/foo/b*', '/foo'],
3258
'/foo/bar/baz', '''\
3264
# Note that 'baz' as a relpath for /foo/b* is not fully correct, but
3265
# nothing really is... as far using {relpath} to append it to something
3266
# else, this seems good enough though.
3267
self.assertEqual(['', 'baz', 'bar/baz'],
3268
[s.locals['relpath'] for _, s in sections])
3270
def test_respect_order(self):
3271
self.assertSectionIDs(['/foo', '/foo/b*', '/foo/*/baz'],
3272
'/foo/bar/baz', '''\
3280
class TestNameMatcher(TestStore):
3283
super(TestNameMatcher, self).setUp()
3284
self.matcher = config.NameMatcher
3285
# Any simple store is good enough
3286
self.get_store = config.test_store_builder_registry.get('configobj')
3288
def get_matching_sections(self, name):
3289
store = self.get_store(self)
3290
store._load_from_string('''
3298
matcher = self.matcher(store, name)
3299
return list(matcher.get_sections())
3301
def test_matching(self):
3302
sections = self.get_matching_sections('foo')
3303
self.assertLength(1, sections)
3304
self.assertSectionContent(('foo', {'option': 'foo'}), sections[0])
3306
def test_not_matching(self):
3307
sections = self.get_matching_sections('baz')
3308
self.assertLength(0, sections)
3311
class TestBaseStackGet(tests.TestCase):
3314
super(TestBaseStackGet, self).setUp()
3315
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3317
def test_get_first_definition(self):
    """The first section source defining the option provides its value."""
    winning_store = config.IniFileStore()
    winning_store._load_from_string('foo=bar')
    shadowed_store = config.IniFileStore()
    shadowed_store._load_from_string('foo=baz')
    sources = [winning_store.get_sections, shadowed_store.get_sections]
    stack = config.Stack(sources)
    self.assertEqual('bar', stack.get('foo'))

def test_get_with_registered_default_value(self):
    """An empty stack returns the default of the registered option."""
    option = config.Option('foo', default='bar')
    config.option_registry.register(option)
    empty_stack = config.Stack([])
    self.assertEqual('bar', empty_stack.get('foo'))

def test_get_without_registered_default_value(self):
    """A registered option without default gives None on an empty stack."""
    option = config.Option('foo')
    config.option_registry.register(option)
    empty_stack = config.Stack([])
    self.assertEqual(None, empty_stack.get('foo'))

def test_get_without_default_value_for_not_registered(self):
    """An option that was never registered gives None on an empty stack."""
    empty_stack = config.Stack([])
    value = empty_stack.get('foo')
    self.assertEqual(None, value)

def test_get_for_empty_section_callable(self):
    """A section source returning an empty list gives None."""
    def no_sections():
        return []
    stack = config.Stack([no_sections])
    self.assertEqual(None, stack.get('foo'))

def test_get_for_broken_callable(self):
    """A stack built around ``object`` fails with TypeError on get."""
    # Trying to use an invalid callable raises an exception on first use
    broken_stack = config.Stack([object])
    self.assertRaises(TypeError, broken_stack.get, 'foo')
3349
class TestStackWithSimpleStore(tests.TestCase):
3352
super(TestStackWithSimpleStore, self).setUp()
3353
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3354
self.registry = config.option_registry
3356
def get_conf(self, content=None):
    """Return a config.MemoryStack built from *content*."""
    memory_stack = config.MemoryStack(content)
    return memory_stack

def test_override_value_from_env(self):
    """A variable named in override_from_env wins over the stored value."""
    # Reset FOO before the option is registered
    self.overrideEnv('FOO', None)
    option = config.Option('foo', default='bar', override_from_env=['FOO'])
    self.registry.register(option)
    self.overrideEnv('FOO', 'quux')
    # Env variable provides a default taking over the option one
    stack = self.get_conf('foo=store')
    self.assertEqual('quux', stack.get('foo'))

def test_first_override_value_from_env_wins(self):
    """Among several override variables, the first one set is used."""
    # Reset every candidate variable before the option is registered
    for variable in ('NO_VALUE', 'FOO', 'BAZ'):
        self.overrideEnv(variable, None)
    option = config.Option('foo', default='bar',
                           override_from_env=['NO_VALUE', 'FOO', 'BAZ'])
    self.registry.register(option)
    self.overrideEnv('FOO', 'foo')
    self.overrideEnv('BAZ', 'baz')
    # The first env var set wins
    stack = self.get_conf('foo=store')
    self.assertEqual('foo', stack.get('foo'))
3382
class TestMemoryStack(tests.TestCase):
3385
conf = config.MemoryStack('foo=bar')
3386
self.assertEqual('bar', conf.get('foo'))
3389
conf = config.MemoryStack('foo=bar')
3390
conf.set('foo', 'baz')
3391
self.assertEqual('baz', conf.get('foo'))
3393
def test_no_content(self):
    """A MemoryStack built without content stays unloaded until fed."""
    stack = config.MemoryStack()
    backing_store = stack.store
    # No content means no loading
    self.assertFalse(backing_store.is_loaded())
    self.assertRaises(NotImplementedError, stack.get, 'foo')
    # But a content can still be provided
    backing_store._load_from_string('foo=bar')
    self.assertEqual('bar', stack.get('foo'))
3403
class TestStackIterSections(tests.TestCase):
    """Check Stack.iter_sections() over zero, one and two stores."""

    def test_empty_stack(self):
        """A stack without section sources iterates over nothing."""
        stack = config.Stack([])
        self.assertLength(0, list(stack.iter_sections()))

    def test_empty_store(self):
        """A store loaded with '' contributes no section."""
        empty_store = config.IniFileStore()
        empty_store._load_from_string('')
        stack = config.Stack([empty_store.get_sections])
        self.assertLength(0, list(stack.iter_sections()))

    def test_simple_store(self):
        """One store with one option gives a single (store, section) pair."""
        single_store = config.IniFileStore()
        single_store._load_from_string('foo=bar')
        stack = config.Stack([single_store.get_sections])
        pairs = list(stack.iter_sections())
        self.assertLength(1, pairs)
        # Each item unpacks as (store, section); only the store is checked
        found_store, _found_section = pairs[0]
        self.assertIs(single_store, found_store)

    def test_two_stores(self):
        """Two stores are reported in the order they were given."""
        first_store = config.IniFileStore()
        first_store._load_from_string('foo=bar')
        second_store = config.IniFileStore()
        second_store._load_from_string('bar=qux')
        sources = [first_store.get_sections, second_store.get_sections]
        pairs = list(config.Stack(sources).iter_sections())
        self.assertLength(2, pairs)
        self.assertIs(first_store, pairs[0][0])
        self.assertIs(second_store, pairs[1][0])
3438
class TestStackWithTransport(tests.TestCaseWithTransport):
    """Base class parametrized with every registered stack builder."""

    # One scenario per entry of config.test_stack_builder_registry; each
    # exposes the builder to the tests as the ``get_stack`` attribute.
    scenarios = [
        (name, {'get_stack': stack_builder})
        for name, stack_builder
        in config.test_stack_builder_registry.iteritems()]
3444
class TestConcreteStacks(TestStackWithTransport):
    """Smoke-test the stack builders provided by the scenarios."""

    def test_build_stack(self):
        """Build a stack and discard it; only the build itself matters."""
        # Just a smoke test to help debug builders
        build = self.get_stack
        build(self)
3451
class TestStackGet(TestStackWithTransport):
3454
super(TestStackGet, self).setUp()
3455
self.conf = self.get_stack(self)
3457
def test_get_for_empty_stack(self):
3458
self.assertEqual(None, self.conf.get('foo'))
3460
def test_get_hook(self):
3461
self.conf.set('foo', 'bar')
3465
config.ConfigHooks.install_named_hook('get', hook, None)
3466
self.assertLength(0, calls)
3467
value = self.conf.get('foo')
3468
self.assertEqual('bar', value)
3469
self.assertLength(1, calls)
3470
self.assertEqual((self.conf, 'foo', 'bar'), calls[0])
3473
class TestStackGetWithConverter(tests.TestCase):
3476
super(TestStackGetWithConverter, self).setUp()
3477
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3478
self.registry = config.option_registry
3480
def get_conf(self, content=None):
    """Return a config.MemoryStack built from *content*."""
    memory_stack = config.MemoryStack(content)
    return memory_stack
3483
def register_bool_option(self, name, default=None, default_from_env=None):
3484
b = config.Option(name, help='A boolean.',
3485
default=default, default_from_env=default_from_env,
3486
from_unicode=config.bool_from_store)
3487
self.registry.register(b)
3489
def test_get_default_bool_None(self):
3490
self.register_bool_option('foo')
3491
conf = self.get_conf('')
3492
self.assertEqual(None, conf.get('foo'))
3494
def test_get_default_bool_True(self):
3495
self.register_bool_option('foo', u'True')
3496
conf = self.get_conf('')
3497
self.assertEqual(True, conf.get('foo'))
3499
def test_get_default_bool_False(self):
3500
self.register_bool_option('foo', False)
3501
conf = self.get_conf('')
3502
self.assertEqual(False, conf.get('foo'))
3504
def test_get_default_bool_False_as_string(self):
3505
self.register_bool_option('foo', u'False')
3506
conf = self.get_conf('')
3507
self.assertEqual(False, conf.get('foo'))
3509
def test_get_default_bool_from_env_converted(self):
3510
self.register_bool_option('foo', u'True', default_from_env=['FOO'])
3511
self.overrideEnv('FOO', 'False')
3512
conf = self.get_conf('')
3513
self.assertEqual(False, conf.get('foo'))
3515
def test_get_default_bool_when_conversion_fails(self):
3516
self.register_bool_option('foo', default='True')
3517
conf = self.get_conf('foo=invalid boolean')
3518
self.assertEqual(True, conf.get('foo'))
3520
def register_integer_option(self, name,
3521
default=None, default_from_env=None):
3522
i = config.Option(name, help='An integer.',
3523
default=default, default_from_env=default_from_env,
3524
from_unicode=config.int_from_store)
3525
self.registry.register(i)
3527
def test_get_default_integer_None(self):
3528
self.register_integer_option('foo')
3529
conf = self.get_conf('')
3530
self.assertEqual(None, conf.get('foo'))
3532
def test_get_default_integer(self):
3533
self.register_integer_option('foo', 42)
3534
conf = self.get_conf('')
3535
self.assertEqual(42, conf.get('foo'))
3537
def test_get_default_integer_as_string(self):
3538
self.register_integer_option('foo', u'42')
3539
conf = self.get_conf('')
3540
self.assertEqual(42, conf.get('foo'))
3542
def test_get_default_integer_from_env(self):
3543
self.register_integer_option('foo', default_from_env=['FOO'])
3544
self.overrideEnv('FOO', '18')
3545
conf = self.get_conf('')
3546
self.assertEqual(18, conf.get('foo'))
3548
def test_get_default_integer_when_conversion_fails(self):
3549
self.register_integer_option('foo', default='12')
3550
conf = self.get_conf('foo=invalid integer')
3551
self.assertEqual(12, conf.get('foo'))
3553
def register_list_option(self, name, default=None, default_from_env=None):
3554
l = config.ListOption(name, help='A list.', default=default,
3555
default_from_env=default_from_env)
3556
self.registry.register(l)
3558
def test_get_default_list_None(self):
3559
self.register_list_option('foo')
3560
conf = self.get_conf('')
3561
self.assertEqual(None, conf.get('foo'))
3563
def test_get_default_list_empty(self):
3564
self.register_list_option('foo', '')
3565
conf = self.get_conf('')
3566
self.assertEqual([], conf.get('foo'))
3568
def test_get_default_list_from_env(self):
3569
self.register_list_option('foo', default_from_env=['FOO'])
3570
self.overrideEnv('FOO', '')
3571
conf = self.get_conf('')
3572
self.assertEqual([], conf.get('foo'))
3574
def test_get_with_list_converter_no_item(self):
3575
self.register_list_option('foo', None)
3576
conf = self.get_conf('foo=,')
3577
self.assertEqual([], conf.get('foo'))
3579
def test_get_with_list_converter_many_items(self):
3580
self.register_list_option('foo', None)
3581
conf = self.get_conf('foo=m,o,r,e')
3582
self.assertEqual(['m', 'o', 'r', 'e'], conf.get('foo'))
3584
def test_get_with_list_converter_embedded_spaces_many_items(self):
3585
self.register_list_option('foo', None)
3586
conf = self.get_conf('foo=" bar", "baz "')
3587
self.assertEqual([' bar', 'baz '], conf.get('foo'))
3589
def test_get_with_list_converter_stripped_spaces_many_items(self):
3590
self.register_list_option('foo', None)
3591
conf = self.get_conf('foo= bar , baz ')
3592
self.assertEqual(['bar', 'baz'], conf.get('foo'))
3595
class TestIterOptionRefs(tests.TestCase):
3596
"""iter_option_refs is a bit unusual, document some cases."""
3598
def assertRefs(self, expected, string):
    """Check that config.iter_option_refs(string) yields *expected*.

    :param expected: The list of items the iteration should produce.
    :param string: The text handed to config.iter_option_refs.
    """
    actual = list(config.iter_option_refs(string))
    self.assertEqual(expected, actual)
3601
def test_empty(self):
3602
self.assertRefs([(False, '')], '')
3604
def test_no_refs(self):
3605
self.assertRefs([(False, 'foo bar')], 'foo bar')
3607
def test_single_ref(self):
3608
self.assertRefs([(False, ''), (True, '{foo}'), (False, '')], '{foo}')
3610
def test_broken_ref(self):
3611
self.assertRefs([(False, '{foo')], '{foo')
3613
def test_embedded_ref(self):
3614
self.assertRefs([(False, '{'), (True, '{foo}'), (False, '}')],
3617
def test_two_refs(self):
3618
self.assertRefs([(False, ''), (True, '{foo}'),
3619
(False, ''), (True, '{bar}'),
3623
def test_newline_in_refs_are_not_matched(self):
3624
self.assertRefs([(False, '{\nxx}{xx\n}{{\n}}')], '{\nxx}{xx\n}{{\n}}')
3627
class TestStackExpandOptions(tests.TestCaseWithTransport):
3630
super(TestStackExpandOptions, self).setUp()
3631
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3632
self.registry = config.option_registry
3633
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
3634
self.conf = config.Stack([store.get_sections], store)
3636
def assertExpansion(self, expected, string, env=None):
3637
self.assertEqual(expected, self.conf.expand_options(string, env))
3639
def test_no_expansion(self):
3640
self.assertExpansion('foo', 'foo')
3642
def test_expand_default_value(self):
    """A reference found in a registered default is expanded."""
    self.conf.store._load_from_string('bar=baz')
    option = config.Option('foo', default=u'{bar}')
    self.registry.register(option)
    self.assertEqual('baz', self.conf.get('foo', expand=True))

def test_expand_default_from_env(self):
    """A reference found in a default read from FOO is expanded."""
    self.conf.store._load_from_string('bar=baz')
    option = config.Option('foo', default_from_env=['FOO'])
    self.registry.register(option)
    self.overrideEnv('FOO', '{bar}')
    self.assertEqual('baz', self.conf.get('foo', expand=True))

def test_expand_default_on_failed_conversion(self):
    """When the stored value cannot convert, the default is expanded."""
    # foo expands to 'bogus' while the default expands to '42'
    self.conf.store._load_from_string('baz=bogus\nbar=42\nfoo={baz}')
    option = config.Option('foo', default=u'{bar}',
                           from_unicode=config.int_from_store)
    self.registry.register(option)
    self.assertEqual(42, self.conf.get('foo', expand=True))
3660
def test_env_adding_options(self):
3661
self.assertExpansion('bar', '{foo}', {'foo': 'bar'})
3663
def test_env_overriding_options(self):
3664
self.conf.store._load_from_string('foo=baz')
3665
self.assertExpansion('bar', '{foo}', {'foo': 'bar'})
3667
def test_simple_ref(self):
3668
self.conf.store._load_from_string('foo=xxx')
3669
self.assertExpansion('xxx', '{foo}')
3671
def test_unknown_ref(self):
3672
self.assertRaises(config.ExpandingUnknownOption,
3673
self.conf.expand_options, '{foo}')
3675
def test_illegal_def_is_ignored(self):
3676
self.assertExpansion('{1,2}', '{1,2}')
3677
self.assertExpansion('{ }', '{ }')
3678
self.assertExpansion('${Foo,f}', '${Foo,f}')
3680
def test_indirect_ref(self):
3681
self.conf.store._load_from_string('''
3685
self.assertExpansion('xxx', '{bar}')
3687
def test_embedded_ref(self):
3688
self.conf.store._load_from_string('''
3692
self.assertExpansion('xxx', '{{bar}}')
3694
def test_simple_loop(self):
    """An option referencing itself raises OptionExpansionLoop."""
    self.conf.store._load_from_string('foo={foo}')
    expand = self.conf.expand_options
    self.assertRaises(config.OptionExpansionLoop, expand, '{foo}')
3699
def test_indirect_loop(self):
3700
self.conf.store._load_from_string('''
3704
e = self.assertRaises(config.OptionExpansionLoop,
3705
self.conf.expand_options, '{foo}')
3706
self.assertEqual('foo->bar->baz', e.refs)
3707
self.assertEqual('{foo}', e.string)
3709
def test_list(self):
3710
self.conf.store._load_from_string('''
3714
list={foo},{bar},{baz}
3716
self.registry.register(
3717
config.ListOption('list'))
3718
self.assertEqual(['start', 'middle', 'end'],
3719
self.conf.get('list', expand=True))
3721
def test_cascading_list(self):
3722
self.conf.store._load_from_string('''
3728
self.registry.register(config.ListOption('list'))
3729
# Register an intermediate option as a list to ensure no conversion
3730
# happen while expanding. Conversion should only occur for the original
3731
# option ('list' here).
3732
self.registry.register(config.ListOption('baz'))
3733
self.assertEqual(['start', 'middle', 'end'],
3734
self.conf.get('list', expand=True))
3736
def test_pathologically_hidden_list(self):
3737
self.conf.store._load_from_string('''
3743
hidden={start}{middle}{end}
3745
# What matters is what the registration says, the conversion happens
3746
# only after all expansions have been performed
3747
self.registry.register(config.ListOption('hidden'))
3748
self.assertEqual(['bin', 'go'],
3749
self.conf.get('hidden', expand=True))
3752
class TestStackCrossSectionsExpand(tests.TestCaseWithTransport):
3755
super(TestStackCrossSectionsExpand, self).setUp()
3757
def get_config(self, location, string):
3760
# Since we don't save the config we won't strictly require to inherit
3761
# from TestCaseInTempDir, but an error occurs so quickly...
3762
c = config.LocationStack(location)
3763
c.store._load_from_string(string)
3766
def test_dont_cross_unrelated_section(self):
3767
c = self.get_config('/another/branch/path','''
3772
[/another/branch/path]
3775
self.assertRaises(config.ExpandingUnknownOption,
3776
c.get, 'bar', expand=True)
3778
def test_cross_related_sections(self):
3779
c = self.get_config('/project/branch/path','''
3783
[/project/branch/path]
3786
self.assertEqual('quux', c.get('bar', expand=True))
3789
class TestStackCrossStoresExpand(tests.TestCaseWithTransport):
3791
def test_cross_global_locations(self):
3792
l_store = config.LocationStore()
3793
l_store._load_from_string('''
3799
g_store = config.GlobalStore()
3800
g_store._load_from_string('''
3806
stack = config.LocationStack('/branch')
3807
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3808
self.assertEqual('loc-foo', stack.get('gfoo', expand=True))
3811
class TestStackExpandSectionLocals(tests.TestCaseWithTransport):
3813
def test_expand_locals_empty(self):
3814
l_store = config.LocationStore()
3815
l_store._load_from_string('''
3816
[/home/user/project]
3821
stack = config.LocationStack('/home/user/project/')
3822
self.assertEqual('', stack.get('base', expand=True))
3823
self.assertEqual('', stack.get('rel', expand=True))
3825
def test_expand_basename_locally(self):
3826
l_store = config.LocationStore()
3827
l_store._load_from_string('''
3828
[/home/user/project]
3832
stack = config.LocationStack('/home/user/project/branch')
3833
self.assertEqual('branch', stack.get('bfoo', expand=True))
3835
def test_expand_basename_locally_longer_path(self):
3836
l_store = config.LocationStore()
3837
l_store._load_from_string('''
3842
stack = config.LocationStack('/home/user/project/dir/branch')
3843
self.assertEqual('branch', stack.get('bfoo', expand=True))
3845
def test_expand_relpath_locally(self):
3846
l_store = config.LocationStore()
3847
l_store._load_from_string('''
3848
[/home/user/project]
3849
lfoo = loc-foo/{relpath}
3852
stack = config.LocationStack('/home/user/project/branch')
3853
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3855
def test_expand_relpath_unknonw_in_global(self):
3856
g_store = config.GlobalStore()
3857
g_store._load_from_string('''
3862
stack = config.LocationStack('/home/user/project/branch')
3863
self.assertRaises(config.ExpandingUnknownOption,
3864
stack.get, 'gfoo', expand=True)
3866
def test_expand_local_option_locally(self):
3867
l_store = config.LocationStore()
3868
l_store._load_from_string('''
3869
[/home/user/project]
3870
lfoo = loc-foo/{relpath}
3874
g_store = config.GlobalStore()
3875
g_store._load_from_string('''
3881
stack = config.LocationStack('/home/user/project/branch')
3882
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3883
self.assertEqual('loc-foo/branch', stack.get('gfoo', expand=True))
3885
def test_locals_dont_leak(self):
3886
"""Make sure we chose the right local in presence of several sections.
3888
l_store = config.LocationStore()
3889
l_store._load_from_string('''
3891
lfoo = loc-foo/{relpath}
3892
[/home/user/project]
3893
lfoo = loc-foo/{relpath}
3896
stack = config.LocationStack('/home/user/project/branch')
3897
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3898
stack = config.LocationStack('/home/user/bar/baz')
3899
self.assertEqual('loc-foo/bar/baz', stack.get('lfoo', expand=True))
3903
class TestStackSet(TestStackWithTransport):
3905
def test_simple_set(self):
3906
conf = self.get_stack(self)
3907
self.assertEqual(None, conf.get('foo'))
3908
conf.set('foo', 'baz')
3909
# Did we get it back ?
3910
self.assertEqual('baz', conf.get('foo'))
3912
def test_set_creates_a_new_section(self):
3913
conf = self.get_stack(self)
3914
conf.set('foo', 'baz')
3915
self.assertEqual, 'baz', conf.get('foo')
3917
def test_set_hook(self):
3921
config.ConfigHooks.install_named_hook('set', hook, None)
3922
self.assertLength(0, calls)
3923
conf = self.get_stack(self)
3924
conf.set('foo', 'bar')
3925
self.assertLength(1, calls)
3926
self.assertEqual((conf, 'foo', 'bar'), calls[0])
3929
class TestStackRemove(TestStackWithTransport):
    """Check remove() on the stack returned by self.get_stack()."""

    def test_remove_existing(self):
        conf = self.get_stack(self)
        conf.set('foo', 'bar')
        self.assertEqual('bar', conf.get('foo'))
        # Without this call the two assertions around it contradict each
        # other: the option has to be removed before it can read as None.
        conf.remove('foo')
        self.assertEqual(None, conf.get('foo'))

    def test_remove_unknown(self):
        conf = self.get_stack(self)
        self.assertRaises(KeyError, conf.remove, 'I_do_not_exist')

    def test_remove_hook(self):
        # Record every hook invocation so it can be inspected afterwards.
        calls = []

        def hook(*args):
            calls.append(args)
        config.ConfigHooks.install_named_hook('remove', hook, None)
        self.assertLength(0, calls)
        conf = self.get_stack(self)
        conf.set('foo', 'bar')
        # Only the removal may trigger the 'remove' hook.
        conf.remove('foo')
        self.assertLength(1, calls)
        self.assertEqual((conf, 'foo'), calls[0])
3956
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
    """Check the options reported for the bazaar, locations and branch configs.

    NOTE(review): the ``def setUp`` line, the ``self.assertOptions(`` openers
    and the closing config argument of several calls were missing from the
    text this class was restored from. The config argument was chosen to
    match the expected list of each test -- confirm against upstream.
    """

    def setUp(self):
        super(TestConfigGetOptions, self).setUp()
        create_configs(self)

    def test_no_variable(self):
        # Using branch should query branch, locations and bazaar
        self.assertOptions([], self.branch_config)

    def test_option_in_bazaar(self):
        self.bazaar_config.set_user_option('file', 'bazaar')
        self.assertOptions([('file', 'bazaar', 'DEFAULT', 'bazaar')],
                           self.bazaar_config)

    def test_option_in_locations(self):
        self.locations_config.set_user_option('file', 'locations')
        self.assertOptions(
            [('file', 'locations', self.tree.basedir, 'locations')],
            self.locations_config)

    def test_option_in_branch(self):
        self.branch_config.set_user_option('file', 'branch')
        self.assertOptions([('file', 'branch', 'DEFAULT', 'branch')],
                           self.branch_config)

    def test_option_in_bazaar_and_branch(self):
        self.bazaar_config.set_user_option('file', 'bazaar')
        self.branch_config.set_user_option('file', 'branch')
        self.assertOptions([('file', 'branch', 'DEFAULT', 'branch'),
                            ('file', 'bazaar', 'DEFAULT', 'bazaar'),],
                           self.branch_config)

    def test_option_in_branch_and_locations(self):
        # Hmm, locations override branch :-/
        self.locations_config.set_user_option('file', 'locations')
        self.branch_config.set_user_option('file', 'branch')
        self.assertOptions(
            [('file', 'locations', self.tree.basedir, 'locations'),
             ('file', 'branch', 'DEFAULT', 'branch'),],
            self.branch_config)

    def test_option_in_bazaar_locations_and_branch(self):
        self.bazaar_config.set_user_option('file', 'bazaar')
        self.locations_config.set_user_option('file', 'locations')
        self.branch_config.set_user_option('file', 'branch')
        self.assertOptions(
            [('file', 'locations', self.tree.basedir, 'locations'),
             ('file', 'branch', 'DEFAULT', 'branch'),
             ('file', 'bazaar', 'DEFAULT', 'bazaar'),],
            self.branch_config)
4009
class TestConfigRemoveOption(tests.TestCaseWithTransport, TestOptionsMixin):
    """Check which 'file' options remain after removing one of them.

    NOTE(review): the ``def setUp`` line, the ``self.assertOptions(`` openers
    and the closing config argument were missing from the text this class was
    restored from. ``self.branch_config`` was chosen because every expected
    list mixes entries from several configs -- confirm against upstream.
    """

    def setUp(self):
        super(TestConfigRemoveOption, self).setUp()
        create_configs_with_file_option(self)

    def test_remove_in_locations(self):
        self.locations_config.remove_user_option('file', self.tree.basedir)
        self.assertOptions(
            [('file', 'branch', 'DEFAULT', 'branch'),
             ('file', 'bazaar', 'DEFAULT', 'bazaar'),],
            self.branch_config)

    def test_remove_in_branch(self):
        self.branch_config.remove_user_option('file')
        self.assertOptions(
            [('file', 'locations', self.tree.basedir, 'locations'),
             ('file', 'bazaar', 'DEFAULT', 'bazaar'),],
            self.branch_config)

    def test_remove_in_bazaar(self):
        self.bazaar_config.remove_user_option('file')
        self.assertOptions(
            [('file', 'locations', self.tree.basedir, 'locations'),
             ('file', 'branch', 'DEFAULT', 'branch'),],
            self.branch_config)
4037
class TestConfigGetSections(tests.TestCaseWithTransport):
    """Check the section names returned by ``_get_sections()``.

    NOTE(review): the ``def setUp`` line, the end of the assertSectionNames
    docstring, two section-creating statements and two closing call arguments
    were missing from the text this class was restored from -- confirm the
    restored pieces against upstream.
    """

    def setUp(self):
        super(TestConfigGetSections, self).setUp()
        create_configs(self)

    def assertSectionNames(self, expected, conf, name=None):
        """Check which sections are returned for a given config.

        If fallback configurations exist their sections can be included.

        :param expected: A list of section names.

        :param conf: The configuration that will be queried.

        :param name: An optional section name that will be passed to
            ``conf._get_sections()``.
        """
        sections = list(conf._get_sections(name))
        self.assertLength(len(expected), sections)
        self.assertEqual(expected, [n for n, _, _ in sections])

    def test_bazaar_default_section(self):
        self.assertSectionNames(['DEFAULT'], self.bazaar_config)

    def test_locations_default_section(self):
        # No sections are defined in an empty file
        self.assertSectionNames([], self.locations_config)

    def test_locations_named_section(self):
        self.locations_config.set_user_option('file', 'locations')
        self.assertSectionNames([self.tree.basedir], self.locations_config)

    def test_locations_matching_sections(self):
        loc_config = self.locations_config
        loc_config.set_user_option('file', 'locations')
        # We need to cheat a bit here to create an option in sections above and
        # below the 'location' one.
        parser = loc_config._get_parser()
        # locations.conf deals with '/' ignoring native os.sep
        location_names = self.tree.basedir.split('/')
        parent = '/'.join(location_names[:-1])
        child = '/'.join(location_names + ['child'])
        # Each section has to exist before an option can be stored in it.
        parser[parent] = {}
        parser[parent]['file'] = 'parent'
        parser[child] = {}
        parser[child]['file'] = 'child'
        self.assertSectionNames([self.tree.basedir, parent], loc_config)

    def test_branch_data_default_section(self):
        self.assertSectionNames([None],
                                self.branch_config._get_branch_data_config())

    def test_branch_default_sections(self):
        # No sections are defined in an empty locations file
        self.assertSectionNames([None, 'DEFAULT'],
                                self.branch_config)
        # Unless we define an option
        self.branch_config._get_location_config().set_user_option(
            'file', 'locations')
        self.assertSectionNames([self.tree.basedir, None, 'DEFAULT'],
                                self.branch_config)

    def test_bazaar_named_section(self):
        # We need to cheat as the API doesn't give direct access to sections
        # other than DEFAULT.
        self.bazaar_config.set_alias('bazaar', 'bzr')
        self.assertSectionNames(['ALIASES'], self.bazaar_config, 'ALIASES')
4107
class TestSharedStores(tests.TestCaseInTempDir):
    """Check that independently created GlobalStack objects share a store."""

    def test_bazaar_conf_shared(self):
        first_stack = config.GlobalStack()
        second_stack = config.GlobalStack()
        # Identity, not mere equality: both must expose the same store object.
        self.assertIs(first_stack.store, second_stack.store)
4116
class TestAuthenticationConfigFilePermissions(tests.TestCaseInTempDir):
4117
"""Test warning for permissions of authentication.conf."""
4120
# Fixture body: writes an 'authentication.conf' under self.test_dir, points
# config.authentication_config_filename at it via overrideAttr() and sets the
# file mode to 0o755 where the platform allows it.
# NOTE(review): the enclosing ``def setUp(self):`` line, part of the file
# content and the replacement passed to overrideAttr() are missing from this
# excerpt -- confirm against the complete file.
# NOTE(review): the file is opened in text mode ('w') but a bytes literal is
# written to it; on Python 3 that raises TypeError -- 'wb' looks intended.
super(TestAuthenticationConfigFilePermissions, self).setUp()
4121
self.path = osutils.pathjoin(self.test_dir, 'authentication.conf')
4122
with open(self.path, 'w') as f:
4123
f.write(b"""[broken]
4126
port=port # Error: Not an int
4128
self.overrideAttr(config, 'authentication_config_filename',
4130
osutils.chmod_if_possible(self.path, 0o755)
4132
def test_check_warning(self):
    # Once the config object exists, the test log is expected to carry the
    # warning about the permissions of the file at self.path.
    auth_conf = config.AuthenticationConfig()
    self.assertEqual(auth_conf._filename, self.path)
    logged = self.get_log()
    self.assertContainsRe(
        logged, 'Saved passwords may be accessible by other users.')
4138
def test_check_suppressed_warning(self):
    # Listing 'insecure_permissions' under suppress_warnings in the global
    # config must keep the warning out of the test log.
    config.GlobalConfig().set_user_option('suppress_warnings',
                                          'insecure_permissions')
    auth_conf = config.AuthenticationConfig()
    self.assertEqual(auth_conf._filename, self.path)
    logged = self.get_log()
    self.assertNotContainsRe(
        logged, 'Saved passwords may be accessible by other users.')
4148
class TestAuthenticationConfigFile(tests.TestCase):
4149
"""Test the authentication.conf file matching"""
4151
def _got_user_passwd(self, expected_user, expected_password,
                     config, *args, **kwargs):
    """Assert which user and password ``config`` yields for a lookup.

    :param expected_user: The user expected in the credentials, or None when
        no credentials should be found.

    :param expected_password: The password expected in the credentials, or
        None.

    :param config: The object queried through ``get_credentials()``. Note
        that inside this helper the parameter shadows the ``config`` module.

    :param args: Positional arguments forwarded to ``get_credentials()``.

    :param kwargs: Keyword arguments forwarded to ``get_credentials()``.
    """
    credentials = config.get_credentials(*args, **kwargs)
    if credentials is None:
        # Nothing matched: compare against (None, None) rather than trying
        # to subscript None.
        user = password = None
    else:
        user = credentials['user']
        password = credentials['password']
    self.assertEqual(expected_user, user)
    self.assertEqual(expected_password, password)
4163
def test_empty_config(self):
    # An empty file gives an empty config and no credentials for any lookup.
    empty_conf = config.AuthenticationConfig(_file=BytesIO())
    parsed = empty_conf._get_config()
    self.assertEqual({}, parsed)
    self._got_user_passwd(None, None, empty_conf, 'http', 'foo.net')
4168
def test_non_utf8_config(self):
    # The byte 0xff cannot occur in valid UTF-8, so reading the config is
    # expected to fail with ConfigContentError.
    not_utf8 = BytesIO(b'foo = bar\xff')
    conf = config.AuthenticationConfig(_file=not_utf8)
    self.assertRaises(config.ConfigContentError, conf._get_config)
4172
def test_missing_auth_section_header(self):
    # 'foo = bar' is not inside any [section]; a credentials lookup on such a
    # file is expected to raise ValueError.
    headerless = BytesIO(b'foo = bar')
    conf = config.AuthenticationConfig(_file=headerless)
    self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4176
def test_auth_section_header_not_closed(self):
    # '[DEF' lacks its closing bracket; reading the config is expected to
    # raise ParseConfigError.
    unterminated = BytesIO(b'[DEF')
    conf = config.AuthenticationConfig(_file=unterminated)
    self.assertRaises(config.ParseConfigError, conf._get_config)
4180
# A config whose verify_certificates value is 'askme' must make
# get_credentials('ftp', 'foo.net') raise ValueError.
# NOTE(review): most of the config literal, including its closing quotes, is
# missing from this excerpt -- confirm against the complete file.
def test_auth_value_not_boolean(self):
4181
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4185
verify_certificates=askme # Error: Not a boolean
4187
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4189
# A config whose port value is the text 'port' must make
# get_credentials('ftp', 'foo.net') raise ValueError.
# NOTE(review): most of the config literal, including its closing quotes, is
# missing from this excerpt -- confirm against the complete file.
def test_auth_value_not_int(self):
4190
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4194
port=port # Error: Not an int
4196
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4198
# A section declaring password_encoding=unknown must make
# get_password('ftp', 'foo.net', 'joe') raise ValueError.
# NOTE(review): most of the config literal, including its closing quotes, is
# missing from this excerpt -- confirm against the complete file.
def test_unknown_password_encoding(self):
4199
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4203
password_encoding=unknown
4205
self.assertRaises(ValueError, conf.get_password,
4206
'ftp', 'foo.net', 'joe')
4208
# Three lookups against one config: ('ftp', 'foo.net') must yield
# joe/secret-pass, while ('http', 'foo.net') and ('ftp', 'bar.net') must
# yield no credentials.
# NOTE(review): most of the config literal, including its closing quotes, is
# missing from this excerpt -- confirm against the complete file.
def test_credentials_for_scheme_host(self):
4209
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4210
# Identity on foo.net
4215
password=secret-pass
4218
self._got_user_passwd('joe', 'secret-pass', conf, 'ftp', 'foo.net')
4220
self._got_user_passwd(None, None, conf, 'http', 'foo.net')
4222
self._got_user_passwd(None, None, conf, 'ftp', 'bar.net')
4224
# ('ftp', 'foo.net', port=10021) must yield joe/secret-pass, while the same
# lookup without a port must yield no credentials.
# NOTE(review): most of the config literal, including its closing quotes, is
# missing from this excerpt -- confirm against the complete file.
def test_credentials_for_host_port(self):
4225
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4226
# Identity on foo.net
4232
password=secret-pass
4235
self._got_user_passwd('joe', 'secret-pass',
4236
conf, 'ftp', 'foo.net', port=10021)
4238
self._got_user_passwd(None, None, conf, 'ftp', 'foo.net')
4240
# ('bzr', 'foo.bzr.sf.net') must yield georges/bendover, while
# ('bzr', 'bbzr.sf.net') must yield no credentials.
# NOTE(review): the host patterns of the config sections are not visible in
# this excerpt (the literal is truncated) -- confirm against the complete
# file.
def test_for_matching_host(self):
4241
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4242
# Identity on foo.net
4248
[sourceforge domain]
4255
self._got_user_passwd('georges', 'bendover',
4256
conf, 'bzr', 'foo.bzr.sf.net')
4258
self._got_user_passwd(None, None,
4259
conf, 'bzr', 'bbzr.sf.net')
4261
# For host 'quux.net', scheme 'bzr' must yield joe/joepass and scheme 'ftp'
# must yield georges/bendover.
# NOTE(review): the config sections are not visible in this excerpt (the
# literal is truncated) -- confirm against the complete file.
def test_for_matching_host_None(self):
4262
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4263
# Identity on foo.net
4273
self._got_user_passwd('joe', 'joepass',
4274
conf, 'bzr', 'quux.net')
4275
# no host but different scheme
4276
self._got_user_passwd('georges', 'bendover',
4277
conf, 'ftp', 'quux.net')
4279
# For ('http', host='bar.org'): path '/dir3' must yield no credentials,
# '/dir2' must yield georges/bendover and '/dir1/subdir' must yield
# jim/jimpass.
# NOTE(review): the config sections are not visible in this excerpt (the
# literal is truncated) -- confirm against the complete file.
def test_credentials_for_path(self):
4280
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4295
self._got_user_passwd(None, None,
4296
conf, 'http', host='bar.org', path='/dir3')
4298
self._got_user_passwd('georges', 'bendover',
4299
conf, 'http', host='bar.org', path='/dir2')
4301
self._got_user_passwd('jim', 'jimpass',
4302
conf, 'http', host='bar.org',path='/dir1/subdir')
4304
# For ('http', 'bar.org'): no user and user='jim' must both yield
# jim/jimpass, while user='georges' must yield no credentials.
# NOTE(review): the config sections are not visible in this excerpt (the
# literal is truncated) -- confirm against the complete file.
def test_credentials_for_user(self):
4305
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4313
self._got_user_passwd('jim', 'jimpass',
4314
conf, 'http', 'bar.org')
4316
self._got_user_passwd('jim', 'jimpass',
4317
conf, 'http', 'bar.org', user='jim')
4318
# Don't get a different user if one is specified
4319
self._got_user_passwd(None, None,
4320
conf, 'http', 'bar.org', user='georges')
4322
# ('http', 'bar.org') must yield user 'jim' with a password of None.
# NOTE(review): the config sections are not visible in this excerpt (the
# literal is truncated) -- confirm against the complete file.
def test_credentials_for_user_without_password(self):
4323
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4329
# Get user but no password
4330
self._got_user_passwd('jim', None,
4331
conf, 'http', 'bar.org')
4333
# Credentials for ('https', 'bar.org') must report verify_certificates as
# False and those for ('https', 'foo.net') must report it as True.
# NOTE(review): apart from one 'verify_certificates=False' line the config
# sections are not visible in this excerpt -- confirm against the complete
# file.
def test_verify_certificates(self):
4334
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4340
verify_certificates=False
4347
credentials = conf.get_credentials('https', 'bar.org')
4348
self.assertEqual(False, credentials.get('verify_certificates'))
4349
credentials = conf.get_credentials('https', 'foo.net')
4350
self.assertEqual(True, credentials.get('verify_certificates'))
4353
class TestAuthenticationStorage(tests.TestCaseInTempDir):
    """Check that credentials stored with set_credentials() can be read back."""

    def test_set_credentials(self):
        conf = config.AuthenticationConfig()
        conf.set_credentials('name', 'host', 'user', 'scheme', 'password',
                             99, path='/foo', verify_certificates=False,
                             realm='realm')
        # The lookup uses the same keys as the one on the second config
        # object below, realm included.
        credentials = conf.get_credentials(host='host', scheme='scheme',
                                           port=99, path='/foo',
                                           realm='realm')
        CREDENTIALS = {'name': 'name', 'user': 'user', 'password': 'password',
                       'verify_certificates': False, 'scheme': 'scheme',
                       'host': 'host', 'port': 99, 'path': '/foo',
                       'realm': 'realm'}
        self.assertEqual(CREDENTIALS, credentials)
        # A second AuthenticationConfig object must find the same credentials.
        credentials_from_disk = config.AuthenticationConfig().get_credentials(
            host='host', scheme='scheme', port=99, path='/foo', realm='realm')
        self.assertEqual(CREDENTIALS, credentials_from_disk)

    def test_reset_credentials_different_name(self):
        conf = config.AuthenticationConfig()
        # Plain statements: the former trailing commas wrapped each call in a
        # throw-away one-element tuple.
        conf.set_credentials('name', 'host', 'user', 'scheme', 'password')
        conf.set_credentials('name2', 'host', 'user2', 'scheme', 'password')
        # The second registration replaces the first one for this host/scheme.
        self.assertIs(None, conf._get_config().get('name'))
        credentials = conf.get_credentials(host='host', scheme='scheme')
        CREDENTIALS = {'name': 'name2', 'user': 'user2', 'password':
                       'password', 'verify_certificates': True,
                       'scheme': 'scheme', 'host': 'host', 'port': None,
                       'path': None, 'realm': None}
        self.assertEqual(CREDENTIALS, credentials)
4384
class TestAuthenticationConfig(tests.TestCaseInTempDir):
4385
"""Test AuthenticationConfig behaviour"""
4387
# Helper: feeds the password 'precious' (for user 'jim') to a TestUIFactory
# on stdin, asks a fresh AuthenticationConfig for the password and checks
# that (a) the typed password is returned, (b) the text written to stderr
# equals expected_prompt_format interpolated with scheme/host/port/user/realm
# and (c) nothing is written to stdout.
# NOTE(review): the end of the signature (``path`` is used below but not
# declared in the visible text) and any handling of a missing host are cut
# off in this excerpt -- confirm against the complete file.
def _check_default_password_prompt(self, expected_prompt_format, scheme,
4388
host=None, port=None, realm=None,
4392
user, password = 'jim', 'precious'
4393
expected_prompt = expected_prompt_format % {
4394
'scheme': scheme, 'host': host, 'port': port,
4395
'user': user, 'realm': realm}
4397
ui.ui_factory = tests.TestUIFactory(stdin=password + '\n')
4398
# We use an empty conf so that the user is always prompted
4399
conf = config.AuthenticationConfig()
4400
self.assertEqual(password,
4401
conf.get_password(scheme, host, user, port=port,
4402
realm=realm, path=path))
4403
self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
4404
self.assertEqual('', ui.ui_factory.stdout.getvalue())
4406
# Helper: feeds ``username`` to a TestUIFactory on stdin, asks a fresh
# AuthenticationConfig for the user with ask=True and checks that (a) the
# typed name is returned, (b) the text written to stderr equals the
# interpolated expected_prompt_format and (c) nothing is written to stdout.
# NOTE(review): the end of the signature, the definition of ``username`` and
# the end of the interpolation dict are cut off in this excerpt -- confirm
# against the complete file.
def _check_default_username_prompt(self, expected_prompt_format, scheme,
4407
host=None, port=None, realm=None,
4412
expected_prompt = expected_prompt_format % {
4413
'scheme': scheme, 'host': host, 'port': port,
4415
ui.ui_factory = tests.TestUIFactory(stdin=username+ '\n')
4416
# We use an empty conf so that the user is always prompted
4417
conf = config.AuthenticationConfig()
4418
self.assertEqual(username, conf.get_user(scheme, host, port=port,
4419
realm=realm, path=path, ask=True))
4420
self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
4421
self.assertEqual('', ui.ui_factory.stdout.getvalue())
4423
def test_username_defaults_prompts(self):
    # HTTP prompts can't be tested here, see test_http.py
    check = self._check_default_username_prompt
    check(u'FTP %(host)s username: ', 'ftp')
    check(u'FTP %(host)s:%(port)d username: ', 'ftp', port=10020)
    check(u'SSH %(host)s:%(port)d username: ', 'ssh', port=12345)
4431
def test_username_default_no_prompt(self):
    # Without ask=True nothing is prompted: the result is None, or the
    # explicit default when one is supplied.
    auth_conf = config.AuthenticationConfig()
    without_default = auth_conf.get_user('ftp', 'example.com')
    self.assertEqual(None, without_default)
    with_default = auth_conf.get_user('ftp', 'example.com',
                                      default="explicitdefault")
    self.assertEqual("explicitdefault", with_default)
4438
def test_password_default_prompts(self):
    # HTTP prompts can't be tested here, see test_http.py
    check = self._check_default_password_prompt
    check(u'FTP %(user)s@%(host)s password: ', 'ftp')
    check(u'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp', port=10020)
    check(u'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh', port=12345)
    # SMTP port handling is a bit special: the port may also be given
    # embedded in the host, as in the 'bar.org:10025' case below.
    # FIXME: should we: forbid that, extend it to other schemes, leave
    # things as they are that's fine thank you ?
    check(u'SMTP %(user)s@%(host)s password: ', 'smtp')
    check(u'SMTP %(user)s@%(host)s password: ', 'smtp', host='bar.org:10025')
    check(u'SMTP %(user)s@%(host)s:%(port)d password: ', 'smtp', port=10025)
4457
# With a password typed on stdin, get_password('ssh', 'bar.org', user='jim')
# must return the typed password, and a 'password ignored in section
# [ssh with password]' message is expected to be matched by assertContainsRe.
# NOTE(review): the config literal and the first argument of
# assertContainsRe() are missing from this excerpt -- confirm against the
# complete file.
# NOTE(review): the pattern uses '\[' inside a non-raw string literal, an
# invalid escape sequence that newer Python versions warn about; a raw
# string (r'...') looks intended.
def test_ssh_password_emits_warning(self):
4458
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4465
entered_password = 'typed-by-hand'
4466
ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n')
4468
# Since the password defined in the authentication config is ignored,
4469
# the user is prompted
4470
self.assertEqual(entered_password,
4471
conf.get_password('ssh', 'bar.org', user='jim'))
4472
self.assertContainsRe(
4474
'password ignored in section \[ssh with password\]')
4476
# With a password typed on stdin, get_password('ssh', 'bar.org', user='jim')
# must return the typed password, and no 'password ignored in section
# [ssh with password]' message may be matched by assertNotContainsRe.
# NOTE(review): the config literal, the end of the last comment and the
# first argument of assertNotContainsRe() are missing from this excerpt --
# confirm against the complete file.
# NOTE(review): the pattern uses '\[' inside a non-raw string literal, an
# invalid escape sequence that newer Python versions warn about; a raw
# string (r'...') looks intended.
def test_ssh_without_password_doesnt_emit_warning(self):
4477
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4483
entered_password = 'typed-by-hand'
4484
ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n')
4486
# Since the password defined in the authentication config is ignored,
4487
# the user is prompted
4488
self.assertEqual(entered_password,
4489
conf.get_password('ssh', 'bar.org', user='jim'))
4490
# No warning should be emitted since there is no password. We are only
4492
self.assertNotContainsRe(
4494
'password ignored in section \[ssh with password\]')
4496
def test_uses_fallback_stores(self):
    # Work against a fresh registry installed for the duration of the test.
    self.overrideAttr(config, 'credential_store_registry',
                      config.CredentialStoreRegistry())
    stub = StubCredentialStore()
    stub.add_credentials("http", "example.com", "joe", "secret")
    config.credential_store_registry.register("stub", stub, fallback=True)
    # The config file itself is empty, so whatever comes back was supplied
    # by the fallback store.
    empty_conf = config.AuthenticationConfig(_file=BytesIO())
    found = empty_conf.get_credentials("http", "example.com")
    self.assertEqual("joe", found["user"])
    self.assertEqual("secret", found["password"])
4508
class StubCredentialStore(config.CredentialStore):
    """In-memory credential store keyed by (scheme, host), for tests.

    NOTE(review): ``__init__`` and the early ``return None`` were missing
    from the text this class was restored from; they are what the visible
    attribute accesses and the dangling ``if`` require -- confirm against
    upstream.
    """

    def __init__(self):
        # Both mappings use (scheme, host) tuples as keys.
        self._username = {}
        self._password = {}

    def add_credentials(self, scheme, host, user, password=None):
        """Remember ``user`` and ``password`` for ``(scheme, host)``."""
        self._username[(scheme, host)] = user
        self._password[(scheme, host)] = password

    def get_credentials(self, scheme, host, port=None, user=None,
                        path=None, realm=None):
        """Return the credentials recorded for ``(scheme, host)``.

        :return: A dict with the 'scheme', 'host', 'port', 'user' and
            'password' keys, or None when nothing was recorded for this
            scheme and host.
        """
        key = (scheme, host)
        if key not in self._username:
            return None
        return {"scheme": scheme, "host": host, "port": port,
                "user": self._username[key], "password": self._password[key]}
4527
# Credential store used by the registry tests.
# NOTE(review): the constructor and the body of get_credentials() are missing
# from this excerpt. A test in this file reads ``store._calls`` and expects 0
# for a store registered with fallback=False, so presumably the class counts
# get_credentials() calls in ``_calls`` -- confirm against the complete file.
class CountingCredentialStore(config.CredentialStore):
4532
def get_credentials(self, scheme, host, port=None, user=None,
4533
path=None, realm=None):
4538
class TestCredentialStoreRegistry(tests.TestCase):
    """Check registration and fallback lookup of credential stores."""

    def _get_cs_registry(self):
        return config.credential_store_registry

    def test_default_credential_store(self):
        r = self._get_cs_registry()
        default = r.get_credential_store(None)
        self.assertIsInstance(default, config.PlainTextCredentialStore)

    def test_unknown_credential_store(self):
        r = self._get_cs_registry()
        # It's hard to imagine someone creating a credential store named
        # 'unknown' so we use that as a never-registered key.
        self.assertRaises(KeyError, r.get_credential_store, 'unknown')

    def test_fallback_none_registered(self):
        r = config.CredentialStoreRegistry()
        self.assertEqual(None,
                         r.get_fallback_credentials("http", "example.com"))

    def test_register(self):
        r = config.CredentialStoreRegistry()
        r.register("stub", StubCredentialStore(), fallback=False)
        r.register("another", StubCredentialStore(), fallback=True)
        self.assertEqual(["another", "stub"], r.keys())

    def test_register_lazy(self):
        r = config.CredentialStoreRegistry()
        r.register_lazy("stub", "breezy.tests.test_config",
                        "StubCredentialStore", fallback=False)
        self.assertEqual(["stub"], r.keys())
        self.assertIsInstance(r.get_credential_store("stub"),
                              StubCredentialStore)

    def test_is_fallback(self):
        r = config.CredentialStoreRegistry()
        r.register("stub1", None, fallback=False)
        r.register("stub2", None, fallback=True)
        self.assertEqual(False, r.is_fallback("stub1"))
        self.assertEqual(True, r.is_fallback("stub2"))

    def test_no_fallback(self):
        r = config.CredentialStoreRegistry()
        store = CountingCredentialStore()
        r.register("count", store, fallback=False)
        self.assertEqual(None,
                         r.get_fallback_credentials("http", "example.com"))
        # A store registered with fallback=False must not even be queried.
        self.assertEqual(0, store._calls)

    def test_fallback_credentials(self):
        r = config.CredentialStoreRegistry()
        store = StubCredentialStore()
        store.add_credentials("http", "example.com",
                              "somebody", "geheim")
        r.register("stub", store, fallback=True)
        creds = r.get_fallback_credentials("http", "example.com")
        self.assertEqual("somebody", creds["user"])
        self.assertEqual("geheim", creds["password"])

    def test_fallback_first_wins(self):
        r = config.CredentialStoreRegistry()
        stub1 = StubCredentialStore()
        stub1.add_credentials("http", "example.com",
                              "somebody", "stub1")
        r.register("stub1", stub1, fallback=True)
        stub2 = StubCredentialStore()
        stub2.add_credentials("http", "example.com",
                              "somebody", "stub2")
        # Register the *second* store here: registering stub1 twice made the
        # test pass regardless of which fallback store was consulted.
        r.register("stub2", stub2, fallback=True)
        creds = r.get_fallback_credentials("http", "example.com")
        self.assertEqual("somebody", creds["user"])
        self.assertEqual("stub1", creds["password"])
4613
class TestPlainTextCredentialStore(tests.TestCase):
    """Check decode_password() of the store returned when no name is given."""

    def test_decode_password(self):
        registry = config.credential_store_registry
        store = registry.get_credential_store()
        credentials = {'password': 'secret'}
        # The password is expected to come back unchanged.
        self.assertEqual('secret', store.decode_password(credentials))
4622
class TestBase64CredentialStore(tests.TestCase):
    """Check decode_password() of the store registered as 'base64'."""

    def test_decode_password(self):
        registry = config.credential_store_registry
        base64_store = registry.get_credential_store('base64')
        credentials = {'password': 'c2VjcmV0'}
        # 'c2VjcmV0' is the base64 encoding of 'secret'.
        self.assertEqual('secret', base64_store.decode_password(credentials))
4631
# FIXME: Once we have a way to declare authentication to all test servers, we
4632
# can implement generic tests.
4633
# test_user_password_in_url
4634
# test_user_in_url_password_from_config
4635
# test_user_in_url_password_prompted
4636
# test_user_in_config
4637
# test_user_getpass.getuser
4638
# test_user_prompted ?
4639
# NOTE(review): only the class header is visible in this excerpt; the body is
# missing -- confirm against the complete file.
class TestAuthenticationRing(tests.TestCaseWithTransport):
4643
class TestAutoUserId(tests.TestCase):
    """Test inferring an automatic user name."""

    def test_auto_user_id(self):
        """Automatic inference of user name.

        This is a bit hard to test in an isolated way, because it depends on
        system functions that go direct to /etc or perhaps somewhere else.
        But it's reasonable to say that on Unix, with an /etc/mailname, we ought
        to be able to choose a user name with no configuration.
        """
        if sys.platform == 'win32':
            raise tests.TestSkipped(
                "User name inference not implemented on win32")
        realname, address = config._auto_user_id()
        if os.path.exists('/etc/mailname'):
            self.assertIsNot(None, realname)
            self.assertIsNot(None, address)
        else:
            # The two outcomes are mutually exclusive: without /etc/mailname
            # nothing can be inferred.
            self.assertEqual((None, None), (realname, address))
4665
class TestDefaultMailDomain(tests.TestCaseInTempDir):
    """Test retrieving default domain from mailname file"""

    # Each test writes its mailname file with ``with open(...)``: the
    # ``file()`` builtin only exists on Python 2, and the context manager
    # closes the file before it is read back.

    def test_default_mail_domain_simple(self):
        with open('simple', 'w') as f:
            f.write("domainname.com\n")
        r = config._get_default_mail_domain('simple')
        self.assertEqual('domainname.com', r)

    def test_default_mail_domain_no_eol(self):
        # The domain must be found even without a trailing newline.
        with open('no_eol', 'w') as f:
            f.write("domainname.com")
        r = config._get_default_mail_domain('no_eol')
        self.assertEqual('domainname.com', r)

    def test_default_mail_domain_multiple_lines(self):
        # Only the first line is expected to be used.
        with open('multiple_lines', 'w') as f:
            f.write("domainname.com\nsome other text\n")
        r = config._get_default_mail_domain('multiple_lines')
        self.assertEqual('domainname.com', r)
4696
class EmailOptionTests(tests.TestCase):
    """Check how the 'email' option reacts to BRZ_EMAIL and EMAIL."""

    def test_default_email_uses_BRZ_EMAIL(self):
        stack = config.MemoryStack('email=jelmer@debian.org')
        # BRZ_EMAIL takes precedence over EMAIL
        for variable, address in (('BRZ_EMAIL', 'jelmer@samba.org'),
                                  ('EMAIL', 'jelmer@apache.org')):
            self.overrideEnv(variable, address)
        self.assertEqual('jelmer@samba.org', stack.get('email'))

    def test_default_email_uses_EMAIL(self):
        stack = config.MemoryStack('')
        # With BRZ_EMAIL unset and nothing configured, EMAIL is used.
        for variable, address in (('BRZ_EMAIL', None),
                                  ('EMAIL', 'jelmer@apache.org')):
            self.overrideEnv(variable, address)
        self.assertEqual('jelmer@apache.org', stack.get('email'))

    def test_BRZ_EMAIL_overrides(self):
        stack = config.MemoryStack('email=jelmer@debian.org')
        self.overrideEnv('BRZ_EMAIL', 'jelmer@apache.org')
        self.assertEqual('jelmer@apache.org', stack.get('email'))
        # Once BRZ_EMAIL is unset the configured value wins, even over EMAIL.
        for variable, address in (('BRZ_EMAIL', None),
                                  ('EMAIL', 'jelmer@samba.org')):
            self.overrideEnv(variable, address)
        self.assertEqual('jelmer@debian.org', stack.get('email'))
4720
class MailClientOptionTests(tests.TestCase):
4722
def test_default(self):
    # With no mail_client configured the option yields DefaultMail.
    stack = config.MemoryStack('')
    self.assertIs(stack.get('mail_client'), mail_client.DefaultMail)
4727
def test_evolution(self):
    # 'evolution' selects mail_client.Evolution.
    stack = config.MemoryStack('mail_client=evolution')
    self.assertIs(stack.get('mail_client'), mail_client.Evolution)
4732
def test_kmail(self):
    # 'kmail' selects mail_client.KMail.
    stack = config.MemoryStack('mail_client=kmail')
    self.assertIs(stack.get('mail_client'), mail_client.KMail)
4737
def test_mutt(self):
    # 'mutt' selects mail_client.Mutt.
    stack = config.MemoryStack('mail_client=mutt')
    self.assertIs(stack.get('mail_client'), mail_client.Mutt)
4742
def test_thunderbird(self):
    # 'thunderbird' selects mail_client.Thunderbird.
    stack = config.MemoryStack('mail_client=thunderbird')
    self.assertIs(stack.get('mail_client'), mail_client.Thunderbird)
4747
def test_explicit_default(self):
    # Spelling out 'default' gives the same class as leaving the option unset.
    stack = config.MemoryStack('mail_client=default')
    self.assertIs(stack.get('mail_client'), mail_client.DefaultMail)
4752
def test_editor(self):
    # 'editor' selects mail_client.Editor.
    stack = config.MemoryStack('mail_client=editor')
    self.assertIs(stack.get('mail_client'), mail_client.Editor)
4757
def test_mapi(self):
    # 'mapi' selects mail_client.MAPIClient.
    stack = config.MemoryStack('mail_client=mapi')
    self.assertIs(stack.get('mail_client'), mail_client.MAPIClient)
4762
def test_xdg_email(self):
    # 'xdg-email' selects mail_client.XDGEmail.
    stack = config.MemoryStack('mail_client=xdg-email')
    self.assertIs(stack.get('mail_client'), mail_client.XDGEmail)
4767
def test_unknown(self):
4768
conf = config.MemoryStack('mail_client=firebird')
4769
self.assertRaises(config.ConfigOptionValueError, conf.get,