1
# Copyright (C) 2005-2014, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for finding and reading the bzr config file[s]."""
19
from textwrap import dedent
25
from testtools import matchers
37
registry as _mod_registry,
42
from ..sixish import (
45
from ..transport import remote as transport_remote
53
def lockable_config_scenarios():
56
{'config_class': config.GlobalConfig,
58
'config_section': 'DEFAULT'}),
60
{'config_class': config.LocationConfig,
62
'config_section': '.'}),]
65
load_tests = scenarios.load_tests_apply_scenarios
67
# Register helpers to build stores
68
config.test_store_builder_registry.register(
69
'configobj', lambda test: config.TransportIniFileStore(
70
test.get_transport(), 'configobj.conf'))
71
config.test_store_builder_registry.register(
72
'bazaar', lambda test: config.GlobalStore())
73
config.test_store_builder_registry.register(
74
'location', lambda test: config.LocationStore())
77
def build_backing_branch(test, relpath,
78
transport_class=None, server_class=None):
79
"""Test helper to create a backing branch only once.
81
Some tests needs multiple stores/stacks to check concurrent update
82
behaviours. As such, they need to build different branch *objects* even if
83
they share the branch on disk.
85
:param relpath: The relative path to the branch. (Note that the helper
86
should always specify the same relpath).
88
:param transport_class: The Transport class the test needs to use.
90
:param server_class: The server associated with the ``transport_class``
93
Either both or neither of ``transport_class`` and ``server_class`` should
96
if transport_class is not None and server_class is not None:
97
test.transport_class = transport_class
98
test.transport_server = server_class
99
elif not (transport_class is None and server_class is None):
100
raise AssertionError('Specify both ``transport_class`` and '
101
'``server_class`` or neither of them')
102
if getattr(test, 'backing_branch', None) is None:
103
# First call, let's build the branch on disk
104
test.backing_branch = test.make_branch(relpath)
107
def build_branch_store(test):
108
build_backing_branch(test, 'branch')
109
b = branch.Branch.open('branch')
110
return config.BranchStore(b)
111
config.test_store_builder_registry.register('branch', build_branch_store)
114
def build_control_store(test):
115
build_backing_branch(test, 'branch')
116
b = controldir.ControlDir.open('branch')
117
return config.ControlStore(b)
118
config.test_store_builder_registry.register('control', build_control_store)
121
def build_remote_branch_store(test):
122
# There is only one permutation (but we won't be able to handle more with
123
# this design anyway)
125
server_class) = transport_remote.get_test_permutations()[0]
126
build_backing_branch(test, 'branch', transport_class, server_class)
127
b = branch.Branch.open(test.get_url('branch'))
128
return config.BranchStore(b)
129
config.test_store_builder_registry.register('remote_branch',
130
build_remote_branch_store)
133
config.test_stack_builder_registry.register(
134
'bazaar', lambda test: config.GlobalStack())
135
config.test_stack_builder_registry.register(
136
'location', lambda test: config.LocationStack('.'))
139
def build_branch_stack(test):
140
build_backing_branch(test, 'branch')
141
b = branch.Branch.open('branch')
142
return config.BranchStack(b)
143
config.test_stack_builder_registry.register('branch', build_branch_stack)
146
def build_branch_only_stack(test):
147
# There is only one permutation (but we won't be able to handle more with
148
# this design anyway)
150
server_class) = transport_remote.get_test_permutations()[0]
151
build_backing_branch(test, 'branch', transport_class, server_class)
152
b = branch.Branch.open(test.get_url('branch'))
153
return config.BranchOnlyStack(b)
154
config.test_stack_builder_registry.register('branch_only',
155
build_branch_only_stack)
157
def build_remote_control_stack(test):
158
# There is only one permutation (but we won't be able to handle more with
159
# this design anyway)
161
server_class) = transport_remote.get_test_permutations()[0]
162
# We need only a bzrdir for this, not a full branch, but it's not worth
163
# creating a dedicated helper to create only the bzrdir
164
build_backing_branch(test, 'branch', transport_class, server_class)
165
b = branch.Branch.open(test.get_url('branch'))
166
return config.RemoteControlStack(b.controldir)
167
config.test_stack_builder_registry.register('remote_control',
168
build_remote_control_stack)
171
sample_long_alias="log -r-15..-1 --line"
172
sample_config_text = u"""
174
email=Erik B\u00e5gfors <erik@bagfors.nu>
176
change_editor=vimdiff -of @new_path @old_path
177
gpg_signing_command=gnome-gpg
178
gpg_signing_key=DD4D5088
180
validate_signatures_in_log=true
182
user_global_option=something
183
bzr.mergetool.sometool=sometool {base} {this} {other} -o {result}
184
bzr.mergetool.funkytool=funkytool "arg with spaces" {this_temp}
185
bzr.mergetool.newtool='"newtool with spaces" {this_temp}'
186
bzr.default_mergetool=sometool
189
ll=""" + sample_long_alias + "\n"
192
sample_always_signatures = """
194
check_signatures=ignore
195
create_signatures=always
198
sample_ignore_signatures = """
200
check_signatures=require
201
create_signatures=never
204
sample_maybe_signatures = """
206
check_signatures=ignore
207
create_signatures=when-required
210
sample_branches_text = """
211
[http://www.example.com]
213
email=Robert Collins <robertc@example.org>
214
normal_option = normal
215
appendpath_option = append
216
appendpath_option:policy = appendpath
217
norecurse_option = norecurse
218
norecurse_option:policy = norecurse
219
[http://www.example.com/ignoreparent]
220
# different project: ignore parent dir config
222
[http://www.example.com/norecurse]
223
# configuration items that only apply to this dir
225
normal_option = norecurse
226
[http://www.example.com/dir]
227
appendpath_option = normal
229
check_signatures=require
230
# test trailing / matching with no children
232
check_signatures=check-available
233
gpg_signing_command=false
234
gpg_signing_key=default
235
user_local_option=local
236
# test trailing / matching
238
#subdirs will match but not the parent
240
check_signatures=ignore
241
post_commit=breezy.tests.test_config.post_commit
242
#testing explicit beats globs
246
def create_configs(test):
247
"""Create configuration files for a given test.
249
This requires creating a tree (and populate the ``test.tree`` attribute)
250
and its associated branch and will populate the following attributes:
252
- branch_config: A BranchConfig for the associated branch.
254
- locations_config : A LocationConfig for the associated branch
256
- bazaar_config: A GlobalConfig.
258
The tree and branch are created in a 'tree' subdirectory so the tests can
259
still use the test directory to stay outside of the branch.
261
tree = test.make_branch_and_tree('tree')
263
test.branch_config = config.BranchConfig(tree.branch)
264
test.locations_config = config.LocationConfig(tree.basedir)
265
test.bazaar_config = config.GlobalConfig()
268
def create_configs_with_file_option(test):
269
"""Create configuration files with a ``file`` option set in each.
271
This builds on ``create_configs`` and add one ``file`` option in each
272
configuration with a value which allows identifying the configuration file.
275
test.bazaar_config.set_user_option('file', 'bazaar')
276
test.locations_config.set_user_option('file', 'locations')
277
test.branch_config.set_user_option('file', 'branch')
280
class TestOptionsMixin:
282
def assertOptions(self, expected, conf):
283
# We don't care about the parser (as it will make tests hard to write
284
# and error-prone anyway)
285
self.assertThat([opt[:4] for opt in conf._get_options()],
286
matchers.Equals(expected))
289
class InstrumentedConfigObj(object):
290
"""A config obj look-enough-alike to record calls made to it."""
292
def __contains__(self, thing):
293
self._calls.append(('__contains__', thing))
296
def __getitem__(self, key):
297
self._calls.append(('__getitem__', key))
300
def __init__(self, input, encoding=None):
301
self._calls = [('__init__', input, encoding)]
303
def __setitem__(self, key, value):
304
self._calls.append(('__setitem__', key, value))
306
def __delitem__(self, key):
307
self._calls.append(('__delitem__', key))
310
self._calls.append(('keys',))
314
self._calls.append(('reload',))
316
def write(self, arg):
317
self._calls.append(('write',))
319
def as_bool(self, value):
320
self._calls.append(('as_bool', value))
323
def get_value(self, section, name):
324
self._calls.append(('get_value', section, name))
328
class FakeBranch(object):
330
def __init__(self, base=None):
332
self.base = "http://example.com/branches/demo"
335
self._transport = self.control_files = \
336
FakeControlFilesAndTransport()
338
def _get_config(self):
339
return config.TransportConfig(self._transport, 'branch.conf')
341
def lock_write(self):
348
class FakeControlFilesAndTransport(object):
352
self._transport = self
354
def get(self, filename):
357
return BytesIO(self.files[filename])
359
raise errors.NoSuchFile(filename)
361
def get_bytes(self, filename):
364
return self.files[filename]
366
raise errors.NoSuchFile(filename)
368
def put(self, filename, fileobj):
369
self.files[filename] = fileobj.read()
371
def put_file(self, filename, fileobj):
372
return self.put(filename, fileobj)
375
class InstrumentedConfig(config.Config):
376
"""An instrumented config that supplies stubs for template methods."""
379
super(InstrumentedConfig, self).__init__()
381
self._signatures = config.CHECK_NEVER
383
def _get_user_id(self):
384
self._calls.append('_get_user_id')
385
return "Robert Collins <robert.collins@example.org>"
387
def _get_signature_checking(self):
388
self._calls.append('_get_signature_checking')
389
return self._signatures
391
def _get_change_editor(self):
392
self._calls.append('_get_change_editor')
393
return 'vimdiff -fo @new_path @old_path'
396
bool_config = """[DEFAULT]
405
class TestConfigObj(tests.TestCase):
407
def test_get_bool(self):
408
co = config.ConfigObj(BytesIO(bool_config))
409
self.assertIs(co.get_bool('DEFAULT', 'active'), True)
410
self.assertIs(co.get_bool('DEFAULT', 'inactive'), False)
411
self.assertIs(co.get_bool('UPPERCASE', 'active'), True)
412
self.assertIs(co.get_bool('UPPERCASE', 'nonactive'), False)
414
def test_hash_sign_in_value(self):
416
Before 4.5.0, ConfigObj did not quote # signs in values, so they'd be
417
treated as comments when read in again. (#86838)
419
co = config.ConfigObj()
420
co['test'] = 'foo#bar'
422
co.write(outfile=outfile)
423
lines = outfile.getvalue().splitlines()
424
self.assertEqual(lines, ['test = "foo#bar"'])
425
co2 = config.ConfigObj(lines)
426
self.assertEqual(co2['test'], 'foo#bar')
428
def test_triple_quotes(self):
429
# Bug #710410: if the value string has triple quotes
430
# then ConfigObj versions up to 4.7.2 will quote them wrong
431
# and won't able to read them back
432
triple_quotes_value = '''spam
433
""" that's my spam """
435
co = config.ConfigObj()
436
co['test'] = triple_quotes_value
437
# While writing this test another bug in ConfigObj has been found:
438
# method co.write() without arguments produces list of lines
439
# one option per line, and multiline values are not split
440
# across multiple lines,
441
# and that breaks the parsing these lines back by ConfigObj.
442
# This issue only affects test, but it's better to avoid
443
# `co.write()` construct at all.
444
# [bialix 20110222] bug report sent to ConfigObj's author
446
co.write(outfile=outfile)
447
output = outfile.getvalue()
448
# now we're trying to read it back
449
co2 = config.ConfigObj(BytesIO(output))
450
self.assertEqual(triple_quotes_value, co2['test'])
453
erroneous_config = """[section] # line 1
456
whocares=notme # line 4
460
class TestConfigObjErrors(tests.TestCase):
462
def test_duplicate_section_name_error_line(self):
464
co = configobj.ConfigObj(BytesIO(erroneous_config),
466
except config.configobj.DuplicateError as e:
467
self.assertEqual(3, e.line_number)
469
self.fail('Error in config file not detected')
472
class TestConfig(tests.TestCase):
474
def test_constructs(self):
477
def test_user_email(self):
478
my_config = InstrumentedConfig()
479
self.assertEqual('robert.collins@example.org', my_config.user_email())
480
self.assertEqual(['_get_user_id'], my_config._calls)
482
def test_username(self):
483
my_config = InstrumentedConfig()
484
self.assertEqual('Robert Collins <robert.collins@example.org>',
485
my_config.username())
486
self.assertEqual(['_get_user_id'], my_config._calls)
488
def test_get_user_option_default(self):
489
my_config = config.Config()
490
self.assertEqual(None, my_config.get_user_option('no_option'))
492
def test_validate_signatures_in_log_default(self):
493
my_config = config.Config()
494
self.assertEqual(False, my_config.validate_signatures_in_log())
496
def test_get_change_editor(self):
497
my_config = InstrumentedConfig()
498
change_editor = my_config.get_change_editor('old_tree', 'new_tree')
499
self.assertEqual(['_get_change_editor'], my_config._calls)
500
self.assertIs(diff.DiffFromTool, change_editor.__class__)
501
self.assertEqual(['vimdiff', '-fo', '@new_path', '@old_path'],
502
change_editor.command_template)
505
class TestConfigPath(tests.TestCase):
508
super(TestConfigPath, self).setUp()
509
self.overrideEnv('HOME', '/home/bogus')
510
self.overrideEnv('XDG_CACHE_HOME', '')
511
if sys.platform == 'win32':
514
r'C:\Documents and Settings\bogus\Application Data')
516
'C:/Documents and Settings/bogus/Application Data/breezy'
518
self.brz_home = '/home/bogus/.config/breezy'
520
def test_config_dir(self):
521
self.assertEqual(config.config_dir(), self.brz_home)
523
def test_config_dir_is_unicode(self):
524
self.assertIsInstance(config.config_dir(), unicode)
526
def test_config_filename(self):
527
self.assertEqual(config.config_filename(),
528
self.brz_home + '/bazaar.conf')
530
def test_locations_config_filename(self):
531
self.assertEqual(config.locations_config_filename(),
532
self.brz_home + '/locations.conf')
534
def test_authentication_config_filename(self):
535
self.assertEqual(config.authentication_config_filename(),
536
self.brz_home + '/authentication.conf')
538
def test_xdg_cache_dir(self):
539
self.assertEqual(config.xdg_cache_dir(),
540
'/home/bogus/.cache')
543
class TestXDGConfigDir(tests.TestCaseInTempDir):
544
# must be in temp dir because config tests for the existence of the bazaar
545
# subdirectory of $XDG_CONFIG_HOME
548
if sys.platform == 'win32':
549
raise tests.TestNotApplicable(
550
'XDG config dir not used on this platform')
551
super(TestXDGConfigDir, self).setUp()
552
self.overrideEnv('HOME', self.test_home_dir)
553
# BRZ_HOME overrides everything we want to test so unset it.
554
self.overrideEnv('BRZ_HOME', None)
556
def test_xdg_config_dir_exists(self):
557
"""When ~/.config/bazaar exists, use it as the config dir."""
558
newdir = osutils.pathjoin(self.test_home_dir, '.config', 'bazaar')
560
self.assertEqual(config.config_dir(), newdir)
562
def test_xdg_config_home(self):
563
"""When XDG_CONFIG_HOME is set, use it."""
564
xdgconfigdir = osutils.pathjoin(self.test_home_dir, 'xdgconfig')
565
self.overrideEnv('XDG_CONFIG_HOME', xdgconfigdir)
566
newdir = osutils.pathjoin(xdgconfigdir, 'bazaar')
568
self.assertEqual(config.config_dir(), newdir)
571
class TestIniConfig(tests.TestCaseInTempDir):
573
def make_config_parser(self, s):
574
conf = config.IniBasedConfig.from_string(s)
575
return conf, conf._get_parser()
578
class TestIniConfigBuilding(TestIniConfig):
580
def test_contructs(self):
581
config.IniBasedConfig()
583
def test_from_fp(self):
584
my_config = config.IniBasedConfig.from_string(sample_config_text)
585
self.assertIsInstance(my_config._get_parser(), configobj.ConfigObj)
587
def test_cached(self):
588
my_config = config.IniBasedConfig.from_string(sample_config_text)
589
parser = my_config._get_parser()
590
self.assertTrue(my_config._get_parser() is parser)
592
def _dummy_chown(self, path, uid, gid):
593
self.path, self.uid, self.gid = path, uid, gid
595
def test_ini_config_ownership(self):
596
"""Ensure that chown is happening during _write_config_file"""
597
self.requireFeature(features.chown_feature)
598
self.overrideAttr(os, 'chown', self._dummy_chown)
599
self.path = self.uid = self.gid = None
600
conf = config.IniBasedConfig(file_name='./foo.conf')
601
conf._write_config_file()
602
self.assertEqual(self.path, './foo.conf')
603
self.assertTrue(isinstance(self.uid, int))
604
self.assertTrue(isinstance(self.gid, int))
607
class TestIniConfigSaving(tests.TestCaseInTempDir):
609
def test_cant_save_without_a_file_name(self):
610
conf = config.IniBasedConfig()
611
self.assertRaises(AssertionError, conf._write_config_file)
613
def test_saved_with_content(self):
614
content = 'foo = bar\n'
615
config.IniBasedConfig.from_string(content, file_name='./test.conf',
617
self.assertFileEqual(content, 'test.conf')
620
class TestIniConfigOptionExpansion(tests.TestCase):
621
"""Test option expansion from the IniConfig level.
623
What we really want here is to test the Config level, but the class being
624
abstract as far as storing values is concerned, this can't be done
627
# FIXME: This should be rewritten when all configs share a storage
628
# implementation -- vila 2011-02-18
630
def get_config(self, string=None):
633
c = config.IniBasedConfig.from_string(string)
636
def assertExpansion(self, expected, conf, string, env=None):
637
self.assertEqual(expected, conf.expand_options(string, env))
639
def test_no_expansion(self):
640
c = self.get_config('')
641
self.assertExpansion('foo', c, 'foo')
643
def test_env_adding_options(self):
644
c = self.get_config('')
645
self.assertExpansion('bar', c, '{foo}', {'foo': 'bar'})
647
def test_env_overriding_options(self):
648
c = self.get_config('foo=baz')
649
self.assertExpansion('bar', c, '{foo}', {'foo': 'bar'})
651
def test_simple_ref(self):
652
c = self.get_config('foo=xxx')
653
self.assertExpansion('xxx', c, '{foo}')
655
def test_unknown_ref(self):
656
c = self.get_config('')
657
self.assertRaises(errors.ExpandingUnknownOption,
658
c.expand_options, '{foo}')
660
def test_indirect_ref(self):
661
c = self.get_config('''
665
self.assertExpansion('xxx', c, '{bar}')
667
def test_embedded_ref(self):
668
c = self.get_config('''
672
self.assertExpansion('xxx', c, '{{bar}}')
674
def test_simple_loop(self):
675
c = self.get_config('foo={foo}')
676
self.assertRaises(errors.OptionExpansionLoop, c.expand_options, '{foo}')
678
def test_indirect_loop(self):
679
c = self.get_config('''
683
e = self.assertRaises(errors.OptionExpansionLoop,
684
c.expand_options, '{foo}')
685
self.assertEqual('foo->bar->baz', e.refs)
686
self.assertEqual('{foo}', e.string)
689
conf = self.get_config('''
693
list={foo},{bar},{baz}
695
self.assertEqual(['start', 'middle', 'end'],
696
conf.get_user_option('list', expand=True))
698
def test_cascading_list(self):
699
conf = self.get_config('''
705
self.assertEqual(['start', 'middle', 'end'],
706
conf.get_user_option('list', expand=True))
708
def test_pathological_hidden_list(self):
709
conf = self.get_config('''
715
hidden={start}{middle}{end}
717
# Nope, it's either a string or a list, and the list wins as soon as a
718
# ',' appears, so the string concatenation never occur.
719
self.assertEqual(['{foo', '}', '{', 'bar}'],
720
conf.get_user_option('hidden', expand=True))
723
class TestLocationConfigOptionExpansion(tests.TestCaseInTempDir):
725
def get_config(self, location, string=None):
728
# Since we don't save the config we won't strictly require to inherit
729
# from TestCaseInTempDir, but an error occurs so quickly...
730
c = config.LocationConfig.from_string(string, location)
733
def test_dont_cross_unrelated_section(self):
734
c = self.get_config('/another/branch/path','''
739
[/another/branch/path]
742
self.assertRaises(errors.ExpandingUnknownOption,
743
c.get_user_option, 'bar', expand=True)
745
def test_cross_related_sections(self):
746
c = self.get_config('/project/branch/path','''
750
[/project/branch/path]
753
self.assertEqual('quux', c.get_user_option('bar', expand=True))
756
class TestIniBaseConfigOnDisk(tests.TestCaseInTempDir):
758
def test_cannot_reload_without_name(self):
759
conf = config.IniBasedConfig.from_string(sample_config_text)
760
self.assertRaises(AssertionError, conf.reload)
762
def test_reload_see_new_value(self):
763
c1 = config.IniBasedConfig.from_string('editor=vim\n',
764
file_name='./test/conf')
765
c1._write_config_file()
766
c2 = config.IniBasedConfig.from_string('editor=emacs\n',
767
file_name='./test/conf')
768
c2._write_config_file()
769
self.assertEqual('vim', c1.get_user_option('editor'))
770
self.assertEqual('emacs', c2.get_user_option('editor'))
771
# Make sure we get the Right value
773
self.assertEqual('emacs', c1.get_user_option('editor'))
776
class TestLockableConfig(tests.TestCaseInTempDir):
778
scenarios = lockable_config_scenarios()
783
config_section = None
786
super(TestLockableConfig, self).setUp()
787
self._content = '[%s]\none=1\ntwo=2\n' % (self.config_section,)
788
self.config = self.create_config(self._content)
790
def get_existing_config(self):
791
return self.config_class(*self.config_args)
793
def create_config(self, content):
794
kwargs = dict(save=True)
795
c = self.config_class.from_string(content, *self.config_args, **kwargs)
798
def test_simple_read_access(self):
799
self.assertEqual('1', self.config.get_user_option('one'))
801
def test_simple_write_access(self):
802
self.config.set_user_option('one', 'one')
803
self.assertEqual('one', self.config.get_user_option('one'))
805
def test_listen_to_the_last_speaker(self):
807
c2 = self.get_existing_config()
808
c1.set_user_option('one', 'ONE')
809
c2.set_user_option('two', 'TWO')
810
self.assertEqual('ONE', c1.get_user_option('one'))
811
self.assertEqual('TWO', c2.get_user_option('two'))
812
# The second update respect the first one
813
self.assertEqual('ONE', c2.get_user_option('one'))
815
def test_last_speaker_wins(self):
816
# If the same config is not shared, the same variable modified twice
817
# can only see a single result.
819
c2 = self.get_existing_config()
820
c1.set_user_option('one', 'c1')
821
c2.set_user_option('one', 'c2')
822
self.assertEqual('c2', c2._get_user_option('one'))
823
# The first modification is still available until another refresh
825
self.assertEqual('c1', c1._get_user_option('one'))
826
c1.set_user_option('two', 'done')
827
self.assertEqual('c2', c1._get_user_option('one'))
829
def test_writes_are_serialized(self):
831
c2 = self.get_existing_config()
833
# We spawn a thread that will pause *during* the write
834
before_writing = threading.Event()
835
after_writing = threading.Event()
836
writing_done = threading.Event()
837
c1_orig = c1._write_config_file
838
def c1_write_config_file():
841
# The lock is held. We wait for the main thread to decide when to
844
c1._write_config_file = c1_write_config_file
846
c1.set_user_option('one', 'c1')
848
t1 = threading.Thread(target=c1_set_option)
849
# Collect the thread after the test
850
self.addCleanup(t1.join)
851
# Be ready to unblock the thread if the test goes wrong
852
self.addCleanup(after_writing.set)
854
before_writing.wait()
855
self.assertTrue(c1._lock.is_held)
856
self.assertRaises(errors.LockContention,
857
c2.set_user_option, 'one', 'c2')
858
self.assertEqual('c1', c1.get_user_option('one'))
859
# Let the lock be released
862
c2.set_user_option('one', 'c2')
863
self.assertEqual('c2', c2.get_user_option('one'))
865
def test_read_while_writing(self):
867
# We spawn a thread that will pause *during* the write
868
ready_to_write = threading.Event()
869
do_writing = threading.Event()
870
writing_done = threading.Event()
871
c1_orig = c1._write_config_file
872
def c1_write_config_file():
874
# The lock is held. We wait for the main thread to decide when to
879
c1._write_config_file = c1_write_config_file
881
c1.set_user_option('one', 'c1')
882
t1 = threading.Thread(target=c1_set_option)
883
# Collect the thread after the test
884
self.addCleanup(t1.join)
885
# Be ready to unblock the thread if the test goes wrong
886
self.addCleanup(do_writing.set)
888
# Ensure the thread is ready to write
889
ready_to_write.wait()
890
self.assertTrue(c1._lock.is_held)
891
self.assertEqual('c1', c1.get_user_option('one'))
892
# If we read during the write, we get the old value
893
c2 = self.get_existing_config()
894
self.assertEqual('1', c2.get_user_option('one'))
895
# Let the writing occur and ensure it occurred
898
# Now we get the updated value
899
c3 = self.get_existing_config()
900
self.assertEqual('c1', c3.get_user_option('one'))
903
class TestGetUserOptionAs(TestIniConfig):
905
def test_get_user_option_as_bool(self):
906
conf, parser = self.make_config_parser("""
909
an_invalid_bool = maybe
910
a_list = hmm, who knows ? # This is interpreted as a list !
912
get_bool = conf.get_user_option_as_bool
913
self.assertEqual(True, get_bool('a_true_bool'))
914
self.assertEqual(False, get_bool('a_false_bool'))
917
warnings.append(args[0] % args[1:])
918
self.overrideAttr(trace, 'warning', warning)
919
msg = 'Value "%s" is not a boolean for "%s"'
920
self.assertIs(None, get_bool('an_invalid_bool'))
921
self.assertEqual(msg % ('maybe', 'an_invalid_bool'), warnings[0])
923
self.assertIs(None, get_bool('not_defined_in_this_config'))
924
self.assertEqual([], warnings)
926
def test_get_user_option_as_list(self):
927
conf, parser = self.make_config_parser("""
932
get_list = conf.get_user_option_as_list
933
self.assertEqual(['a', 'b', 'c'], get_list('a_list'))
934
self.assertEqual(['1'], get_list('length_1'))
935
self.assertEqual('x', conf.get_user_option('one_item'))
936
# automatically cast to list
937
self.assertEqual(['x'], get_list('one_item'))
940
class TestSupressWarning(TestIniConfig):
942
def make_warnings_config(self, s):
943
conf, parser = self.make_config_parser(s)
944
return conf.suppress_warning
946
def test_suppress_warning_unknown(self):
947
suppress_warning = self.make_warnings_config('')
948
self.assertEqual(False, suppress_warning('unknown_warning'))
950
def test_suppress_warning_known(self):
951
suppress_warning = self.make_warnings_config('suppress_warnings=a,b')
952
self.assertEqual(False, suppress_warning('c'))
953
self.assertEqual(True, suppress_warning('a'))
954
self.assertEqual(True, suppress_warning('b'))
957
class TestGetConfig(tests.TestCase):
959
def test_constructs(self):
960
config.GlobalConfig()
962
def test_calls_read_filenames(self):
963
# replace the class that is constructed, to check its parameters
964
oldparserclass = config.ConfigObj
965
config.ConfigObj = InstrumentedConfigObj
966
my_config = config.GlobalConfig()
968
parser = my_config._get_parser()
970
config.ConfigObj = oldparserclass
971
self.assertIsInstance(parser, InstrumentedConfigObj)
972
self.assertEqual(parser._calls, [('__init__', config.config_filename(),
976
class TestBranchConfig(tests.TestCaseWithTransport):
978
def test_constructs_valid(self):
979
branch = FakeBranch()
980
my_config = config.BranchConfig(branch)
981
self.assertIsNot(None, my_config)
983
def test_constructs_error(self):
984
self.assertRaises(TypeError, config.BranchConfig)
986
def test_get_location_config(self):
987
branch = FakeBranch()
988
my_config = config.BranchConfig(branch)
989
location_config = my_config._get_location_config()
990
self.assertEqual(branch.base, location_config.location)
991
self.assertIs(location_config, my_config._get_location_config())
993
def test_get_config(self):
994
"""The Branch.get_config method works properly"""
995
b = controldir.ControlDir.create_standalone_workingtree('.').branch
996
my_config = b.get_config()
997
self.assertIs(my_config.get_user_option('wacky'), None)
998
my_config.set_user_option('wacky', 'unlikely')
999
self.assertEqual(my_config.get_user_option('wacky'), 'unlikely')
1001
# Ensure we get the same thing if we start again
1002
b2 = branch.Branch.open('.')
1003
my_config2 = b2.get_config()
1004
self.assertEqual(my_config2.get_user_option('wacky'), 'unlikely')
1006
def test_has_explicit_nickname(self):
1007
b = self.make_branch('.')
1008
self.assertFalse(b.get_config().has_explicit_nickname())
1010
self.assertTrue(b.get_config().has_explicit_nickname())
1012
def test_config_url(self):
1013
"""The Branch.get_config will use section that uses a local url"""
1014
branch = self.make_branch('branch')
1015
self.assertEqual('branch', branch.nick)
1017
local_url = urlutils.local_path_to_url('branch')
1018
conf = config.LocationConfig.from_string(
1019
'[%s]\nnickname = foobar' % (local_url,),
1020
local_url, save=True)
1021
self.assertIsNot(None, conf)
1022
self.assertEqual('foobar', branch.nick)
1024
def test_config_local_path(self):
1025
"""The Branch.get_config will use a local system path"""
1026
branch = self.make_branch('branch')
1027
self.assertEqual('branch', branch.nick)
1029
local_path = osutils.getcwd().encode('utf8')
1030
config.LocationConfig.from_string(
1031
'[%s/branch]\nnickname = barry' % (local_path,),
1032
'branch', save=True)
1033
# Now the branch will find its nick via the location config
1034
self.assertEqual('barry', branch.nick)
1036
def test_config_creates_local(self):
1037
"""Creating a new entry in config uses a local path."""
1038
branch = self.make_branch('branch', format='knit')
1039
branch.set_push_location('http://foobar')
1040
local_path = osutils.getcwd().encode('utf8')
1041
# Surprisingly ConfigObj doesn't create a trailing newline
1042
self.check_file_contents(config.locations_config_filename(),
1044
'push_location = http://foobar\n'
1045
'push_location:policy = norecurse\n'
1048
def test_autonick_urlencoded(self):
1049
b = self.make_branch('!repo')
1050
self.assertEqual('!repo', b.get_config().get_nickname())
1052
def test_autonick_uses_branch_name(self):
1053
b = self.make_branch('foo', name='bar')
1054
self.assertEqual('bar', b.get_config().get_nickname())
1056
def test_warn_if_masked(self):
1059
warnings.append(args[0] % args[1:])
1060
self.overrideAttr(trace, 'warning', warning)
1062
def set_option(store, warn_masked=True):
1064
conf.set_user_option('example_option', repr(store), store=store,
1065
warn_masked=warn_masked)
1066
def assertWarning(warning):
1068
self.assertEqual(0, len(warnings))
1070
self.assertEqual(1, len(warnings))
1071
self.assertEqual(warning, warnings[0])
1072
branch = self.make_branch('.')
1073
conf = branch.get_config()
1074
set_option(config.STORE_GLOBAL)
1076
set_option(config.STORE_BRANCH)
1078
set_option(config.STORE_GLOBAL)
1079
assertWarning('Value "4" is masked by "3" from branch.conf')
1080
set_option(config.STORE_GLOBAL, warn_masked=False)
1082
set_option(config.STORE_LOCATION)
1084
set_option(config.STORE_BRANCH)
1085
assertWarning('Value "3" is masked by "0" from locations.conf')
1086
set_option(config.STORE_BRANCH, warn_masked=False)
1090
class TestGlobalConfigItems(tests.TestCaseInTempDir):
1092
def _get_empty_config(self):
1093
my_config = config.GlobalConfig()
1096
def _get_sample_config(self):
1097
my_config = config.GlobalConfig.from_string(sample_config_text)
1100
def test_user_id(self):
1101
my_config = config.GlobalConfig.from_string(sample_config_text)
1102
self.assertEqual(u"Erik B\u00e5gfors <erik@bagfors.nu>",
1103
my_config._get_user_id())
1105
def test_absent_user_id(self):
1106
my_config = config.GlobalConfig()
1107
self.assertEqual(None, my_config._get_user_id())
1109
def test_get_user_option_default(self):
1110
my_config = self._get_empty_config()
1111
self.assertEqual(None, my_config.get_user_option('no_option'))
1113
def test_get_user_option_global(self):
1114
my_config = self._get_sample_config()
1115
self.assertEqual("something",
1116
my_config.get_user_option('user_global_option'))
1118
def test_configured_validate_signatures_in_log(self):
1119
my_config = self._get_sample_config()
1120
self.assertEqual(True, my_config.validate_signatures_in_log())
1122
def test_get_alias(self):
1123
my_config = self._get_sample_config()
1124
self.assertEqual('help', my_config.get_alias('h'))
1126
def test_get_aliases(self):
1127
my_config = self._get_sample_config()
1128
aliases = my_config.get_aliases()
1129
self.assertEqual(2, len(aliases))
1130
sorted_keys = sorted(aliases)
1131
self.assertEqual('help', aliases[sorted_keys[0]])
1132
self.assertEqual(sample_long_alias, aliases[sorted_keys[1]])
1134
def test_get_no_alias(self):
1135
my_config = self._get_sample_config()
1136
self.assertEqual(None, my_config.get_alias('foo'))
1138
def test_get_long_alias(self):
1139
my_config = self._get_sample_config()
1140
self.assertEqual(sample_long_alias, my_config.get_alias('ll'))
1142
def test_get_change_editor(self):
1143
my_config = self._get_sample_config()
1144
change_editor = my_config.get_change_editor('old', 'new')
1145
self.assertIs(diff.DiffFromTool, change_editor.__class__)
1146
self.assertEqual('vimdiff -of @new_path @old_path',
1147
' '.join(change_editor.command_template))
1149
def test_get_no_change_editor(self):
1150
my_config = self._get_empty_config()
1151
change_editor = my_config.get_change_editor('old', 'new')
1152
self.assertIs(None, change_editor)
1154
def test_get_merge_tools(self):
1155
conf = self._get_sample_config()
1156
tools = conf.get_merge_tools()
1157
self.log(repr(tools))
1159
{u'funkytool' : u'funkytool "arg with spaces" {this_temp}',
1160
u'sometool' : u'sometool {base} {this} {other} -o {result}',
1161
u'newtool' : u'"newtool with spaces" {this_temp}'},
1164
def test_get_merge_tools_empty(self):
1165
conf = self._get_empty_config()
1166
tools = conf.get_merge_tools()
1167
self.assertEqual({}, tools)
1169
def test_find_merge_tool(self):
1170
conf = self._get_sample_config()
1171
cmdline = conf.find_merge_tool('sometool')
1172
self.assertEqual('sometool {base} {this} {other} -o {result}', cmdline)
1174
def test_find_merge_tool_not_found(self):
1175
conf = self._get_sample_config()
1176
cmdline = conf.find_merge_tool('DOES NOT EXIST')
1177
self.assertIs(cmdline, None)
1179
def test_find_merge_tool_known(self):
1180
conf = self._get_empty_config()
1181
cmdline = conf.find_merge_tool('kdiff3')
1182
self.assertEqual('kdiff3 {base} {this} {other} -o {result}', cmdline)
1184
def test_find_merge_tool_override_known(self):
1185
conf = self._get_empty_config()
1186
conf.set_user_option('bzr.mergetool.kdiff3', 'kdiff3 blah')
1187
cmdline = conf.find_merge_tool('kdiff3')
1188
self.assertEqual('kdiff3 blah', cmdline)
1191
class TestGlobalConfigSavingOptions(tests.TestCaseInTempDir):
1193
def test_empty(self):
1194
my_config = config.GlobalConfig()
1195
self.assertEqual(0, len(my_config.get_aliases()))
1197
def test_set_alias(self):
1198
my_config = config.GlobalConfig()
1199
alias_value = 'commit --strict'
1200
my_config.set_alias('commit', alias_value)
1201
new_config = config.GlobalConfig()
1202
self.assertEqual(alias_value, new_config.get_alias('commit'))
1204
def test_remove_alias(self):
1205
my_config = config.GlobalConfig()
1206
my_config.set_alias('commit', 'commit --strict')
1207
# Now remove the alias again.
1208
my_config.unset_alias('commit')
1209
new_config = config.GlobalConfig()
1210
self.assertIs(None, new_config.get_alias('commit'))
1213
class TestLocationConfig(tests.TestCaseInTempDir, TestOptionsMixin):
1215
def test_constructs_valid(self):
1216
config.LocationConfig('http://example.com')
1218
def test_constructs_error(self):
1219
self.assertRaises(TypeError, config.LocationConfig)
1221
def test_branch_calls_read_filenames(self):
1222
# This is testing the correct file names are provided.
1223
# TODO: consolidate with the test for GlobalConfigs filename checks.
1225
# replace the class that is constructed, to check its parameters
1226
oldparserclass = config.ConfigObj
1227
config.ConfigObj = InstrumentedConfigObj
1229
my_config = config.LocationConfig('http://www.example.com')
1230
parser = my_config._get_parser()
1232
config.ConfigObj = oldparserclass
1233
self.assertIsInstance(parser, InstrumentedConfigObj)
1234
self.assertEqual(parser._calls,
1235
[('__init__', config.locations_config_filename(),
1238
def test_get_global_config(self):
1239
my_config = config.BranchConfig(FakeBranch('http://example.com'))
1240
global_config = my_config._get_global_config()
1241
self.assertIsInstance(global_config, config.GlobalConfig)
1242
self.assertIs(global_config, my_config._get_global_config())
1244
def assertLocationMatching(self, expected):
1245
self.assertEqual(expected,
1246
list(self.my_location_config._get_matching_sections()))
1248
def test__get_matching_sections_no_match(self):
1249
self.get_branch_config('/')
1250
self.assertLocationMatching([])
1252
def test__get_matching_sections_exact(self):
1253
self.get_branch_config('http://www.example.com')
1254
self.assertLocationMatching([('http://www.example.com', '')])
1256
def test__get_matching_sections_suffix_does_not(self):
1257
self.get_branch_config('http://www.example.com-com')
1258
self.assertLocationMatching([])
1260
def test__get_matching_sections_subdir_recursive(self):
1261
self.get_branch_config('http://www.example.com/com')
1262
self.assertLocationMatching([('http://www.example.com', 'com')])
1264
def test__get_matching_sections_ignoreparent(self):
1265
self.get_branch_config('http://www.example.com/ignoreparent')
1266
self.assertLocationMatching([('http://www.example.com/ignoreparent',
1269
def test__get_matching_sections_ignoreparent_subdir(self):
1270
self.get_branch_config(
1271
'http://www.example.com/ignoreparent/childbranch')
1272
self.assertLocationMatching([('http://www.example.com/ignoreparent',
1275
def test__get_matching_sections_subdir_trailing_slash(self):
1276
self.get_branch_config('/b')
1277
self.assertLocationMatching([('/b/', '')])
1279
def test__get_matching_sections_subdir_child(self):
1280
self.get_branch_config('/a/foo')
1281
self.assertLocationMatching([('/a/*', ''), ('/a/', 'foo')])
1283
def test__get_matching_sections_subdir_child_child(self):
1284
self.get_branch_config('/a/foo/bar')
1285
self.assertLocationMatching([('/a/*', 'bar'), ('/a/', 'foo/bar')])
1287
def test__get_matching_sections_trailing_slash_with_children(self):
1288
self.get_branch_config('/a/')
1289
self.assertLocationMatching([('/a/', '')])
1291
def test__get_matching_sections_explicit_over_glob(self):
1292
# XXX: 2006-09-08 jamesh
1293
# This test only passes because ord('c') > ord('*'). If there
1294
# was a config section for '/a/?', it would get precedence
1296
self.get_branch_config('/a/c')
1297
self.assertLocationMatching([('/a/c', ''), ('/a/*', ''), ('/a/', 'c')])
1299
def test__get_option_policy_normal(self):
1300
self.get_branch_config('http://www.example.com')
1302
self.my_location_config._get_config_policy(
1303
'http://www.example.com', 'normal_option'),
1306
def test__get_option_policy_norecurse(self):
1307
self.get_branch_config('http://www.example.com')
1309
self.my_location_config._get_option_policy(
1310
'http://www.example.com', 'norecurse_option'),
1311
config.POLICY_NORECURSE)
1312
# Test old recurse=False setting:
1314
self.my_location_config._get_option_policy(
1315
'http://www.example.com/norecurse', 'normal_option'),
1316
config.POLICY_NORECURSE)
1318
def test__get_option_policy_normal(self):
1319
self.get_branch_config('http://www.example.com')
1321
self.my_location_config._get_option_policy(
1322
'http://www.example.com', 'appendpath_option'),
1323
config.POLICY_APPENDPATH)
1325
def test__get_options_with_policy(self):
1326
self.get_branch_config('/dir/subdir',
1327
location_config="""\
1329
other_url = /other-dir
1330
other_url:policy = appendpath
1332
other_url = /other-subdir
1335
[(u'other_url', u'/other-subdir', u'/dir/subdir', 'locations'),
1336
(u'other_url', u'/other-dir', u'/dir', 'locations'),
1337
(u'other_url:policy', u'appendpath', u'/dir', 'locations')],
1338
self.my_location_config)
1340
def test_location_without_username(self):
1341
self.get_branch_config('http://www.example.com/ignoreparent')
1342
self.assertEqual(u'Erik B\u00e5gfors <erik@bagfors.nu>',
1343
self.my_config.username())
1345
def test_location_not_listed(self):
1346
"""Test that the global username is used when no location matches"""
1347
self.get_branch_config('/home/robertc/sources')
1348
self.assertEqual(u'Erik B\u00e5gfors <erik@bagfors.nu>',
1349
self.my_config.username())
1351
def test_overriding_location(self):
1352
self.get_branch_config('http://www.example.com/foo')
1353
self.assertEqual('Robert Collins <robertc@example.org>',
1354
self.my_config.username())
1356
def test_get_user_option_global(self):
1357
self.get_branch_config('/a')
1358
self.assertEqual('something',
1359
self.my_config.get_user_option('user_global_option'))
1361
def test_get_user_option_local(self):
1362
self.get_branch_config('/a')
1363
self.assertEqual('local',
1364
self.my_config.get_user_option('user_local_option'))
1366
def test_get_user_option_appendpath(self):
1367
# returned as is for the base path:
1368
self.get_branch_config('http://www.example.com')
1369
self.assertEqual('append',
1370
self.my_config.get_user_option('appendpath_option'))
1371
# Extra path components get appended:
1372
self.get_branch_config('http://www.example.com/a/b/c')
1373
self.assertEqual('append/a/b/c',
1374
self.my_config.get_user_option('appendpath_option'))
1375
# Overriden for http://www.example.com/dir, where it is a
1377
self.get_branch_config('http://www.example.com/dir/a/b/c')
1378
self.assertEqual('normal',
1379
self.my_config.get_user_option('appendpath_option'))
1381
def test_get_user_option_norecurse(self):
1382
self.get_branch_config('http://www.example.com')
1383
self.assertEqual('norecurse',
1384
self.my_config.get_user_option('norecurse_option'))
1385
self.get_branch_config('http://www.example.com/dir')
1386
self.assertEqual(None,
1387
self.my_config.get_user_option('norecurse_option'))
1388
# http://www.example.com/norecurse is a recurse=False section
1389
# that redefines normal_option. Subdirectories do not pick up
1390
# this redefinition.
1391
self.get_branch_config('http://www.example.com/norecurse')
1392
self.assertEqual('norecurse',
1393
self.my_config.get_user_option('normal_option'))
1394
self.get_branch_config('http://www.example.com/norecurse/subdir')
1395
self.assertEqual('normal',
1396
self.my_config.get_user_option('normal_option'))
1398
def test_set_user_option_norecurse(self):
1399
self.get_branch_config('http://www.example.com')
1400
self.my_config.set_user_option('foo', 'bar',
1401
store=config.STORE_LOCATION_NORECURSE)
1403
self.my_location_config._get_option_policy(
1404
'http://www.example.com', 'foo'),
1405
config.POLICY_NORECURSE)
1407
def test_set_user_option_appendpath(self):
1408
self.get_branch_config('http://www.example.com')
1409
self.my_config.set_user_option('foo', 'bar',
1410
store=config.STORE_LOCATION_APPENDPATH)
1412
self.my_location_config._get_option_policy(
1413
'http://www.example.com', 'foo'),
1414
config.POLICY_APPENDPATH)
1416
def test_set_user_option_change_policy(self):
1417
self.get_branch_config('http://www.example.com')
1418
self.my_config.set_user_option('norecurse_option', 'normal',
1419
store=config.STORE_LOCATION)
1421
self.my_location_config._get_option_policy(
1422
'http://www.example.com', 'norecurse_option'),
1425
def get_branch_config(self, location, global_config=None,
1426
location_config=None):
1427
my_branch = FakeBranch(location)
1428
if global_config is None:
1429
global_config = sample_config_text
1430
if location_config is None:
1431
location_config = sample_branches_text
1433
config.GlobalConfig.from_string(global_config, save=True)
1434
config.LocationConfig.from_string(location_config, my_branch.base,
1436
my_config = config.BranchConfig(my_branch)
1437
self.my_config = my_config
1438
self.my_location_config = my_config._get_location_config()
1440
def test_set_user_setting_sets_and_saves2(self):
1441
self.get_branch_config('/a/c')
1442
self.assertIs(self.my_config.get_user_option('foo'), None)
1443
self.my_config.set_user_option('foo', 'bar')
1445
self.my_config.branch.control_files.files['branch.conf'].strip(),
1447
self.assertEqual(self.my_config.get_user_option('foo'), 'bar')
1448
self.my_config.set_user_option('foo', 'baz',
1449
store=config.STORE_LOCATION)
1450
self.assertEqual(self.my_config.get_user_option('foo'), 'baz')
1451
self.my_config.set_user_option('foo', 'qux')
1452
self.assertEqual(self.my_config.get_user_option('foo'), 'baz')
1454
def test_get_bzr_remote_path(self):
1455
my_config = config.LocationConfig('/a/c')
1456
self.assertEqual('bzr', my_config.get_bzr_remote_path())
1457
my_config.set_user_option('bzr_remote_path', '/path-bzr')
1458
self.assertEqual('/path-bzr', my_config.get_bzr_remote_path())
1459
self.overrideEnv('BZR_REMOTE_PATH', '/environ-bzr')
1460
self.assertEqual('/environ-bzr', my_config.get_bzr_remote_path())
1463
precedence_global = 'option = global'
1464
precedence_branch = 'option = branch'
1465
precedence_location = """
1469
[http://example.com/specific]
1473
class TestBranchConfigItems(tests.TestCaseInTempDir):
1475
def get_branch_config(self, global_config=None, location=None,
1476
location_config=None, branch_data_config=None):
1477
my_branch = FakeBranch(location)
1478
if global_config is not None:
1479
config.GlobalConfig.from_string(global_config, save=True)
1480
if location_config is not None:
1481
config.LocationConfig.from_string(location_config, my_branch.base,
1483
my_config = config.BranchConfig(my_branch)
1484
if branch_data_config is not None:
1485
my_config.branch.control_files.files['branch.conf'] = \
1489
def test_user_id(self):
1490
branch = FakeBranch()
1491
my_config = config.BranchConfig(branch)
1492
self.assertIsNot(None, my_config.username())
1493
my_config.branch.control_files.files['email'] = "John"
1494
my_config.set_user_option('email',
1495
"Robert Collins <robertc@example.org>")
1496
self.assertEqual("Robert Collins <robertc@example.org>",
1497
my_config.username())
1499
def test_BRZ_EMAIL_OVERRIDES(self):
1500
self.overrideEnv('BRZ_EMAIL', "Robert Collins <robertc@example.org>")
1501
branch = FakeBranch()
1502
my_config = config.BranchConfig(branch)
1503
self.assertEqual("Robert Collins <robertc@example.org>",
1504
my_config.username())
1506
def test_get_user_option_global(self):
1507
my_config = self.get_branch_config(global_config=sample_config_text)
1508
self.assertEqual('something',
1509
my_config.get_user_option('user_global_option'))
1511
def test_config_precedence(self):
1512
# FIXME: eager test, luckily no persitent config file makes it fail
1514
my_config = self.get_branch_config(global_config=precedence_global)
1515
self.assertEqual(my_config.get_user_option('option'), 'global')
1516
my_config = self.get_branch_config(global_config=precedence_global,
1517
branch_data_config=precedence_branch)
1518
self.assertEqual(my_config.get_user_option('option'), 'branch')
1519
my_config = self.get_branch_config(
1520
global_config=precedence_global,
1521
branch_data_config=precedence_branch,
1522
location_config=precedence_location)
1523
self.assertEqual(my_config.get_user_option('option'), 'recurse')
1524
my_config = self.get_branch_config(
1525
global_config=precedence_global,
1526
branch_data_config=precedence_branch,
1527
location_config=precedence_location,
1528
location='http://example.com/specific')
1529
self.assertEqual(my_config.get_user_option('option'), 'exact')
1532
class TestMailAddressExtraction(tests.TestCase):
1534
def test_extract_email_address(self):
1535
self.assertEqual('jane@test.com',
1536
config.extract_email_address('Jane <jane@test.com>'))
1537
self.assertRaises(errors.NoEmailInUsername,
1538
config.extract_email_address, 'Jane Tester')
1540
def test_parse_username(self):
1541
self.assertEqual(('', 'jdoe@example.com'),
1542
config.parse_username('jdoe@example.com'))
1543
self.assertEqual(('', 'jdoe@example.com'),
1544
config.parse_username('<jdoe@example.com>'))
1545
self.assertEqual(('John Doe', 'jdoe@example.com'),
1546
config.parse_username('John Doe <jdoe@example.com>'))
1547
self.assertEqual(('John Doe', ''),
1548
config.parse_username('John Doe'))
1549
self.assertEqual(('John Doe', 'jdoe@example.com'),
1550
config.parse_username('John Doe jdoe@example.com'))
1552
class TestTreeConfig(tests.TestCaseWithTransport):
1554
def test_get_value(self):
1555
"""Test that retreiving a value from a section is possible"""
1556
branch = self.make_branch('.')
1557
tree_config = config.TreeConfig(branch)
1558
tree_config.set_option('value', 'key', 'SECTION')
1559
tree_config.set_option('value2', 'key2')
1560
tree_config.set_option('value3-top', 'key3')
1561
tree_config.set_option('value3-section', 'key3', 'SECTION')
1562
value = tree_config.get_option('key', 'SECTION')
1563
self.assertEqual(value, 'value')
1564
value = tree_config.get_option('key2')
1565
self.assertEqual(value, 'value2')
1566
self.assertEqual(tree_config.get_option('non-existant'), None)
1567
value = tree_config.get_option('non-existant', 'SECTION')
1568
self.assertEqual(value, None)
1569
value = tree_config.get_option('non-existant', default='default')
1570
self.assertEqual(value, 'default')
1571
self.assertEqual(tree_config.get_option('key2', 'NOSECTION'), None)
1572
value = tree_config.get_option('key2', 'NOSECTION', default='default')
1573
self.assertEqual(value, 'default')
1574
value = tree_config.get_option('key3')
1575
self.assertEqual(value, 'value3-top')
1576
value = tree_config.get_option('key3', 'SECTION')
1577
self.assertEqual(value, 'value3-section')
1580
class TestTransportConfig(tests.TestCaseWithTransport):
1582
def test_load_utf8(self):
1583
"""Ensure we can load an utf8-encoded file."""
1584
t = self.get_transport()
1585
unicode_user = u'b\N{Euro Sign}ar'
1586
unicode_content = u'user=%s' % (unicode_user,)
1587
utf8_content = unicode_content.encode('utf8')
1588
# Store the raw content in the config file
1589
t.put_bytes('foo.conf', utf8_content)
1590
conf = config.TransportConfig(t, 'foo.conf')
1591
self.assertEqual(unicode_user, conf.get_option('user'))
1593
def test_load_non_ascii(self):
1594
"""Ensure we display a proper error on non-ascii, non utf-8 content."""
1595
t = self.get_transport()
1596
t.put_bytes('foo.conf', 'user=foo\n#\xff\n')
1597
conf = config.TransportConfig(t, 'foo.conf')
1598
self.assertRaises(errors.ConfigContentError, conf._get_configobj)
1600
def test_load_erroneous_content(self):
1601
"""Ensure we display a proper error on content that can't be parsed."""
1602
t = self.get_transport()
1603
t.put_bytes('foo.conf', '[open_section\n')
1604
conf = config.TransportConfig(t, 'foo.conf')
1605
self.assertRaises(errors.ParseConfigError, conf._get_configobj)
1607
def test_load_permission_denied(self):
1608
"""Ensure we get an empty config file if the file is inaccessible."""
1611
warnings.append(args[0] % args[1:])
1612
self.overrideAttr(trace, 'warning', warning)
1614
class DenyingTransport(object):
1616
def __init__(self, base):
1619
def get_bytes(self, relpath):
1620
raise errors.PermissionDenied(relpath, "")
1622
cfg = config.TransportConfig(
1623
DenyingTransport("nonexisting://"), 'control.conf')
1624
self.assertIs(None, cfg.get_option('non-existant', 'SECTION'))
1627
[u'Permission denied while trying to open configuration file '
1628
u'nonexisting:///control.conf.'])
1630
def test_get_value(self):
1631
"""Test that retreiving a value from a section is possible"""
1632
bzrdir_config = config.TransportConfig(self.get_transport('.'),
1634
bzrdir_config.set_option('value', 'key', 'SECTION')
1635
bzrdir_config.set_option('value2', 'key2')
1636
bzrdir_config.set_option('value3-top', 'key3')
1637
bzrdir_config.set_option('value3-section', 'key3', 'SECTION')
1638
value = bzrdir_config.get_option('key', 'SECTION')
1639
self.assertEqual(value, 'value')
1640
value = bzrdir_config.get_option('key2')
1641
self.assertEqual(value, 'value2')
1642
self.assertEqual(bzrdir_config.get_option('non-existant'), None)
1643
value = bzrdir_config.get_option('non-existant', 'SECTION')
1644
self.assertEqual(value, None)
1645
value = bzrdir_config.get_option('non-existant', default='default')
1646
self.assertEqual(value, 'default')
1647
self.assertEqual(bzrdir_config.get_option('key2', 'NOSECTION'), None)
1648
value = bzrdir_config.get_option('key2', 'NOSECTION',
1650
self.assertEqual(value, 'default')
1651
value = bzrdir_config.get_option('key3')
1652
self.assertEqual(value, 'value3-top')
1653
value = bzrdir_config.get_option('key3', 'SECTION')
1654
self.assertEqual(value, 'value3-section')
1656
def test_set_unset_default_stack_on(self):
1657
my_dir = self.make_controldir('.')
1658
bzrdir_config = config.BzrDirConfig(my_dir)
1659
self.assertIs(None, bzrdir_config.get_default_stack_on())
1660
bzrdir_config.set_default_stack_on('Foo')
1661
self.assertEqual('Foo', bzrdir_config._config.get_option(
1662
'default_stack_on'))
1663
self.assertEqual('Foo', bzrdir_config.get_default_stack_on())
1664
bzrdir_config.set_default_stack_on(None)
1665
self.assertIs(None, bzrdir_config.get_default_stack_on())
1668
class TestOldConfigHooks(tests.TestCaseWithTransport):
1671
super(TestOldConfigHooks, self).setUp()
1672
create_configs_with_file_option(self)
1674
def assertGetHook(self, conf, name, value):
1678
config.OldConfigHooks.install_named_hook('get', hook, None)
1680
config.OldConfigHooks.uninstall_named_hook, 'get', None)
1681
self.assertLength(0, calls)
1682
actual_value = conf.get_user_option(name)
1683
self.assertEqual(value, actual_value)
1684
self.assertLength(1, calls)
1685
self.assertEqual((conf, name, value), calls[0])
1687
def test_get_hook_bazaar(self):
1688
self.assertGetHook(self.bazaar_config, 'file', 'bazaar')
1690
def test_get_hook_locations(self):
1691
self.assertGetHook(self.locations_config, 'file', 'locations')
1693
def test_get_hook_branch(self):
1694
# Since locations masks branch, we define a different option
1695
self.branch_config.set_user_option('file2', 'branch')
1696
self.assertGetHook(self.branch_config, 'file2', 'branch')
1698
def assertSetHook(self, conf, name, value):
1702
config.OldConfigHooks.install_named_hook('set', hook, None)
1704
config.OldConfigHooks.uninstall_named_hook, 'set', None)
1705
self.assertLength(0, calls)
1706
conf.set_user_option(name, value)
1707
self.assertLength(1, calls)
1708
# We can't assert the conf object below as different configs use
1709
# different means to implement set_user_option and we care only about
1711
self.assertEqual((name, value), calls[0][1:])
1713
def test_set_hook_bazaar(self):
1714
self.assertSetHook(self.bazaar_config, 'foo', 'bazaar')
1716
def test_set_hook_locations(self):
1717
self.assertSetHook(self.locations_config, 'foo', 'locations')
1719
def test_set_hook_branch(self):
1720
self.assertSetHook(self.branch_config, 'foo', 'branch')
1722
def assertRemoveHook(self, conf, name, section_name=None):
1726
config.OldConfigHooks.install_named_hook('remove', hook, None)
1728
config.OldConfigHooks.uninstall_named_hook, 'remove', None)
1729
self.assertLength(0, calls)
1730
conf.remove_user_option(name, section_name)
1731
self.assertLength(1, calls)
1732
# We can't assert the conf object below as different configs use
1733
# different means to implement remove_user_option and we care only about
1735
self.assertEqual((name,), calls[0][1:])
1737
def test_remove_hook_bazaar(self):
1738
self.assertRemoveHook(self.bazaar_config, 'file')
1740
def test_remove_hook_locations(self):
1741
self.assertRemoveHook(self.locations_config, 'file',
1742
self.locations_config.location)
1744
def test_remove_hook_branch(self):
1745
self.assertRemoveHook(self.branch_config, 'file')
1747
def assertLoadHook(self, name, conf_class, *conf_args):
1751
config.OldConfigHooks.install_named_hook('load', hook, None)
1753
config.OldConfigHooks.uninstall_named_hook, 'load', None)
1754
self.assertLength(0, calls)
1756
conf = conf_class(*conf_args)
1757
# Access an option to trigger a load
1758
conf.get_user_option(name)
1759
self.assertLength(1, calls)
1760
# Since we can't assert about conf, we just use the number of calls ;-/
1762
def test_load_hook_bazaar(self):
1763
self.assertLoadHook('file', config.GlobalConfig)
1765
def test_load_hook_locations(self):
1766
self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)
1768
def test_load_hook_branch(self):
1769
self.assertLoadHook('file', config.BranchConfig, self.tree.branch)
1771
def assertSaveHook(self, conf):
1775
config.OldConfigHooks.install_named_hook('save', hook, None)
1777
config.OldConfigHooks.uninstall_named_hook, 'save', None)
1778
self.assertLength(0, calls)
1779
# Setting an option triggers a save
1780
conf.set_user_option('foo', 'bar')
1781
self.assertLength(1, calls)
1782
# Since we can't assert about conf, we just use the number of calls ;-/
1784
def test_save_hook_bazaar(self):
1785
self.assertSaveHook(self.bazaar_config)
1787
def test_save_hook_locations(self):
1788
self.assertSaveHook(self.locations_config)
1790
def test_save_hook_branch(self):
1791
self.assertSaveHook(self.branch_config)
1794
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
1795
"""Tests config hooks for remote configs.
1797
No tests for the remove hook as this is not implemented there.
1801
super(TestOldConfigHooksForRemote, self).setUp()
1802
self.transport_server = test_server.SmartTCPServer_for_testing
1803
create_configs_with_file_option(self)
1805
def assertGetHook(self, conf, name, value):
1809
config.OldConfigHooks.install_named_hook('get', hook, None)
1811
config.OldConfigHooks.uninstall_named_hook, 'get', None)
1812
self.assertLength(0, calls)
1813
actual_value = conf.get_option(name)
1814
self.assertEqual(value, actual_value)
1815
self.assertLength(1, calls)
1816
self.assertEqual((conf, name, value), calls[0])
1818
def test_get_hook_remote_branch(self):
1819
remote_branch = branch.Branch.open(self.get_url('tree'))
1820
self.assertGetHook(remote_branch._get_config(), 'file', 'branch')
1822
def test_get_hook_remote_bzrdir(self):
1823
remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
1824
conf = remote_bzrdir._get_config()
1825
conf.set_option('remotedir', 'file')
1826
self.assertGetHook(conf, 'file', 'remotedir')
1828
def assertSetHook(self, conf, name, value):
1832
config.OldConfigHooks.install_named_hook('set', hook, None)
1834
config.OldConfigHooks.uninstall_named_hook, 'set', None)
1835
self.assertLength(0, calls)
1836
conf.set_option(value, name)
1837
self.assertLength(1, calls)
1838
# We can't assert the conf object below as different configs use
1839
# different means to implement set_user_option and we care only about
1841
self.assertEqual((name, value), calls[0][1:])
1843
def test_set_hook_remote_branch(self):
1844
remote_branch = branch.Branch.open(self.get_url('tree'))
1845
self.addCleanup(remote_branch.lock_write().unlock)
1846
self.assertSetHook(remote_branch._get_config(), 'file', 'remote')
1848
def test_set_hook_remote_bzrdir(self):
1849
remote_branch = branch.Branch.open(self.get_url('tree'))
1850
self.addCleanup(remote_branch.lock_write().unlock)
1851
remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
1852
self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')
1854
def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
1858
config.OldConfigHooks.install_named_hook('load', hook, None)
1860
config.OldConfigHooks.uninstall_named_hook, 'load', None)
1861
self.assertLength(0, calls)
1863
conf = conf_class(*conf_args)
1864
# Access an option to trigger a load
1865
conf.get_option(name)
1866
self.assertLength(expected_nb_calls, calls)
1867
# Since we can't assert about conf, we just use the number of calls ;-/
1869
def test_load_hook_remote_branch(self):
1870
remote_branch = branch.Branch.open(self.get_url('tree'))
1871
self.assertLoadHook(1, 'file', remote.RemoteBranchConfig, remote_branch)
1873
def test_load_hook_remote_bzrdir(self):
1874
remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
1875
# The config file doesn't exist, set an option to force its creation
1876
conf = remote_bzrdir._get_config()
1877
conf.set_option('remotedir', 'file')
1878
# We get one call for the server and one call for the client, this is
1879
# caused by the differences in implementations betwen
1880
# SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
1881
# SmartServerBranchGetConfigFile (in smart/branch.py)
1882
self.assertLoadHook(2 ,'file', remote.RemoteBzrDirConfig, remote_bzrdir)
1884
def assertSaveHook(self, conf):
1888
config.OldConfigHooks.install_named_hook('save', hook, None)
1890
config.OldConfigHooks.uninstall_named_hook, 'save', None)
1891
self.assertLength(0, calls)
1892
# Setting an option triggers a save
1893
conf.set_option('foo', 'bar')
1894
self.assertLength(1, calls)
1895
# Since we can't assert about conf, we just use the number of calls ;-/
1897
def test_save_hook_remote_branch(self):
1898
remote_branch = branch.Branch.open(self.get_url('tree'))
1899
self.addCleanup(remote_branch.lock_write().unlock)
1900
self.assertSaveHook(remote_branch._get_config())
1902
def test_save_hook_remote_bzrdir(self):
1903
remote_branch = branch.Branch.open(self.get_url('tree'))
1904
self.addCleanup(remote_branch.lock_write().unlock)
1905
remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
1906
self.assertSaveHook(remote_bzrdir._get_config())
1909
class TestOptionNames(tests.TestCase):
1911
def is_valid(self, name):
1912
return config._option_ref_re.match('{%s}' % name) is not None
1914
def test_valid_names(self):
1915
self.assertTrue(self.is_valid('foo'))
1916
self.assertTrue(self.is_valid('foo.bar'))
1917
self.assertTrue(self.is_valid('f1'))
1918
self.assertTrue(self.is_valid('_'))
1919
self.assertTrue(self.is_valid('__bar__'))
1920
self.assertTrue(self.is_valid('a_'))
1921
self.assertTrue(self.is_valid('a1'))
1922
# Don't break bzr-svn for no good reason
1923
self.assertTrue(self.is_valid('guessed-layout'))
1925
def test_invalid_names(self):
1926
self.assertFalse(self.is_valid(' foo'))
1927
self.assertFalse(self.is_valid('foo '))
1928
self.assertFalse(self.is_valid('1'))
1929
self.assertFalse(self.is_valid('1,2'))
1930
self.assertFalse(self.is_valid('foo$'))
1931
self.assertFalse(self.is_valid('!foo'))
1932
self.assertFalse(self.is_valid('foo.'))
1933
self.assertFalse(self.is_valid('foo..bar'))
1934
self.assertFalse(self.is_valid('{}'))
1935
self.assertFalse(self.is_valid('{a}'))
1936
self.assertFalse(self.is_valid('a\n'))
1937
self.assertFalse(self.is_valid('-'))
1938
self.assertFalse(self.is_valid('-a'))
1939
self.assertFalse(self.is_valid('a-'))
1940
self.assertFalse(self.is_valid('a--a'))
1942
def assertSingleGroup(self, reference):
1943
# the regexp is used with split and as such should match the reference
1944
# *only*, if more groups needs to be defined, (?:...) should be used.
1945
m = config._option_ref_re.match('{a}')
1946
self.assertLength(1, m.groups())
1948
def test_valid_references(self):
1949
self.assertSingleGroup('{a}')
1950
self.assertSingleGroup('{{a}}')
1953
class TestOption(tests.TestCase):
1955
def test_default_value(self):
1956
opt = config.Option('foo', default='bar')
1957
self.assertEqual('bar', opt.get_default())
1959
def test_callable_default_value(self):
1960
def bar_as_unicode():
1962
opt = config.Option('foo', default=bar_as_unicode)
1963
self.assertEqual('bar', opt.get_default())
1965
def test_default_value_from_env(self):
1966
opt = config.Option('foo', default='bar', default_from_env=['FOO'])
1967
self.overrideEnv('FOO', 'quux')
1968
# Env variable provides a default taking over the option one
1969
self.assertEqual('quux', opt.get_default())
1971
def test_first_default_value_from_env_wins(self):
1972
opt = config.Option('foo', default='bar',
1973
default_from_env=['NO_VALUE', 'FOO', 'BAZ'])
1974
self.overrideEnv('FOO', 'foo')
1975
self.overrideEnv('BAZ', 'baz')
1976
# The first env var set wins
1977
self.assertEqual('foo', opt.get_default())
1979
def test_not_supported_list_default_value(self):
1980
self.assertRaises(AssertionError, config.Option, 'foo', default=[1])
1982
def test_not_supported_object_default_value(self):
1983
self.assertRaises(AssertionError, config.Option, 'foo',
1986
def test_not_supported_callable_default_value_not_unicode(self):
1987
def bar_not_unicode():
1989
opt = config.Option('foo', default=bar_not_unicode)
1990
self.assertRaises(AssertionError, opt.get_default)
1992
def test_get_help_topic(self):
1993
opt = config.Option('foo')
1994
self.assertEqual('foo', opt.get_help_topic())
1997
class TestOptionConverter(tests.TestCase):
1999
def assertConverted(self, expected, opt, value):
2000
self.assertEqual(expected, opt.convert_from_unicode(None, value))
2002
def assertCallsWarning(self, opt, value):
2006
warnings.append(args[0] % args[1:])
2007
self.overrideAttr(trace, 'warning', warning)
2008
self.assertEqual(None, opt.convert_from_unicode(None, value))
2009
self.assertLength(1, warnings)
2011
'Value "%s" is not valid for "%s"' % (value, opt.name),
2014
def assertCallsError(self, opt, value):
2015
self.assertRaises(errors.ConfigOptionValueError,
2016
opt.convert_from_unicode, None, value)
2018
def assertConvertInvalid(self, opt, invalid_value):
2020
self.assertEqual(None, opt.convert_from_unicode(None, invalid_value))
2021
opt.invalid = 'warning'
2022
self.assertCallsWarning(opt, invalid_value)
2023
opt.invalid = 'error'
2024
self.assertCallsError(opt, invalid_value)
2027
class TestOptionWithBooleanConverter(TestOptionConverter):
2029
def get_option(self):
2030
return config.Option('foo', help='A boolean.',
2031
from_unicode=config.bool_from_store)
2033
def test_convert_invalid(self):
2034
opt = self.get_option()
2035
# A string that is not recognized as a boolean
2036
self.assertConvertInvalid(opt, u'invalid-boolean')
2037
# A list of strings is never recognized as a boolean
2038
self.assertConvertInvalid(opt, [u'not', u'a', u'boolean'])
2040
def test_convert_valid(self):
2041
opt = self.get_option()
2042
self.assertConverted(True, opt, u'True')
2043
self.assertConverted(True, opt, u'1')
2044
self.assertConverted(False, opt, u'False')
2047
class TestOptionWithIntegerConverter(TestOptionConverter):
2049
def get_option(self):
2050
return config.Option('foo', help='An integer.',
2051
from_unicode=config.int_from_store)
2053
def test_convert_invalid(self):
2054
opt = self.get_option()
2055
# A string that is not recognized as an integer
2056
self.assertConvertInvalid(opt, u'forty-two')
2057
# A list of strings is never recognized as an integer
2058
self.assertConvertInvalid(opt, [u'a', u'list'])
2060
def test_convert_valid(self):
2061
opt = self.get_option()
2062
self.assertConverted(16, opt, u'16')
2065
class TestOptionWithSIUnitConverter(TestOptionConverter):
2067
def get_option(self):
2068
return config.Option('foo', help='An integer in SI units.',
2069
from_unicode=config.int_SI_from_store)
2071
def test_convert_invalid(self):
2072
opt = self.get_option()
2073
self.assertConvertInvalid(opt, u'not-a-unit')
2074
self.assertConvertInvalid(opt, u'Gb') # Forgot the value
2075
self.assertConvertInvalid(opt, u'1b') # Forgot the unit
2076
self.assertConvertInvalid(opt, u'1GG')
2077
self.assertConvertInvalid(opt, u'1Mbb')
2078
self.assertConvertInvalid(opt, u'1MM')
2080
def test_convert_valid(self):
2081
opt = self.get_option()
2082
self.assertConverted(int(5e3), opt, u'5kb')
2083
self.assertConverted(int(5e6), opt, u'5M')
2084
self.assertConverted(int(5e6), opt, u'5MB')
2085
self.assertConverted(int(5e9), opt, u'5g')
2086
self.assertConverted(int(5e9), opt, u'5gB')
2087
self.assertConverted(100, opt, u'100')
2090
class TestListOption(TestOptionConverter):
2092
def get_option(self):
2093
return config.ListOption('foo', help='A list.')
2095
def test_convert_invalid(self):
2096
opt = self.get_option()
2097
# We don't even try to convert a list into a list, we only expect
2099
self.assertConvertInvalid(opt, [1])
2100
# No string is invalid as all forms can be converted to a list
2102
def test_convert_valid(self):
2103
opt = self.get_option()
2104
# An empty string is an empty list
2105
self.assertConverted([], opt, '') # Using a bare str() just in case
2106
self.assertConverted([], opt, u'')
2108
self.assertConverted([u'True'], opt, u'True')
2110
self.assertConverted([u'42'], opt, u'42')
2112
self.assertConverted([u'bar'], opt, u'bar')
2115
class TestRegistryOption(TestOptionConverter):
2117
def get_option(self, registry):
2118
return config.RegistryOption('foo', registry,
2119
help='A registry option.')
2121
def test_convert_invalid(self):
2122
registry = _mod_registry.Registry()
2123
opt = self.get_option(registry)
2124
self.assertConvertInvalid(opt, [1])
2125
self.assertConvertInvalid(opt, u"notregistered")
2127
def test_convert_valid(self):
2128
registry = _mod_registry.Registry()
2129
registry.register("someval", 1234)
2130
opt = self.get_option(registry)
2131
# Using a bare str() just in case
2132
self.assertConverted(1234, opt, "someval")
2133
self.assertConverted(1234, opt, u'someval')
2134
self.assertConverted(None, opt, None)
2136
def test_help(self):
2137
registry = _mod_registry.Registry()
2138
registry.register("someval", 1234, help="some option")
2139
registry.register("dunno", 1234, help="some other option")
2140
opt = self.get_option(registry)
2142
'A registry option.\n'
2144
'The following values are supported:\n'
2145
' dunno - some other option\n'
2146
' someval - some option\n',
2149
def test_get_help_text(self):
2150
registry = _mod_registry.Registry()
2151
registry.register("someval", 1234, help="some option")
2152
registry.register("dunno", 1234, help="some other option")
2153
opt = self.get_option(registry)
2155
'A registry option.\n'
2157
'The following values are supported:\n'
2158
' dunno - some other option\n'
2159
' someval - some option\n',
2160
opt.get_help_text())
2163
class TestOptionRegistry(tests.TestCase):
2166
super(TestOptionRegistry, self).setUp()
2167
# Always start with an empty registry
2168
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2169
self.registry = config.option_registry
2171
def test_register(self):
2172
opt = config.Option('foo')
2173
self.registry.register(opt)
2174
self.assertIs(opt, self.registry.get('foo'))
2176
def test_registered_help(self):
2177
opt = config.Option('foo', help='A simple option')
2178
self.registry.register(opt)
2179
self.assertEqual('A simple option', self.registry.get_help('foo'))
2181
def test_dont_register_illegal_name(self):
2182
self.assertRaises(errors.IllegalOptionName,
2183
self.registry.register, config.Option(' foo'))
2184
self.assertRaises(errors.IllegalOptionName,
2185
self.registry.register, config.Option('bar,'))
2187
lazy_option = config.Option('lazy_foo', help='Lazy help')
2189
def test_register_lazy(self):
2190
self.registry.register_lazy('lazy_foo', self.__module__,
2191
'TestOptionRegistry.lazy_option')
2192
self.assertIs(self.lazy_option, self.registry.get('lazy_foo'))
2194
def test_registered_lazy_help(self):
2195
self.registry.register_lazy('lazy_foo', self.__module__,
2196
'TestOptionRegistry.lazy_option')
2197
self.assertEqual('Lazy help', self.registry.get_help('lazy_foo'))
2199
def test_dont_lazy_register_illegal_name(self):
2200
# This is where the root cause of http://pad.lv/1235099 is better
2201
# understood: 'register_lazy' doc string mentions that key should match
2202
# the option name which indirectly requires that the option name is a
2203
# valid python identifier. We violate that rule here (using a key that
2204
# doesn't match the option name) to test the option name checking.
2205
self.assertRaises(errors.IllegalOptionName,
2206
self.registry.register_lazy, ' foo', self.__module__,
2207
'TestOptionRegistry.lazy_option')
2208
self.assertRaises(errors.IllegalOptionName,
2209
self.registry.register_lazy, '1,2', self.__module__,
2210
'TestOptionRegistry.lazy_option')
2213
class TestRegisteredOptions(tests.TestCase):
2214
"""All registered options should verify some constraints."""
2216
scenarios = [(key, {'option_name': key, 'option': option}) for key, option
2217
in config.option_registry.iteritems()]
2220
super(TestRegisteredOptions, self).setUp()
2221
self.registry = config.option_registry
2223
def test_proper_name(self):
2224
# An option should be registered under its own name, this can't be
2225
# checked at registration time for the lazy ones.
2226
self.assertEqual(self.option_name, self.option.name)
2228
def test_help_is_set(self):
2229
option_help = self.registry.get_help(self.option_name)
2230
# Come on, think about the user, he really wants to know what the
2232
self.assertIsNot(None, option_help)
2233
self.assertNotEqual('', option_help)
2236
class TestSection(tests.TestCase):
2238
# FIXME: Parametrize so that all sections produced by Stores run these
2239
# tests -- vila 2011-04-01
2241
def test_get_a_value(self):
2242
a_dict = dict(foo='bar')
2243
section = config.Section('myID', a_dict)
2244
self.assertEqual('bar', section.get('foo'))
2246
def test_get_unknown_option(self):
2248
section = config.Section(None, a_dict)
2249
self.assertEqual('out of thin air',
2250
section.get('foo', 'out of thin air'))
2252
def test_options_is_shared(self):
2254
section = config.Section(None, a_dict)
2255
self.assertIs(a_dict, section.options)
2258
class TestMutableSection(tests.TestCase):
2260
scenarios = [('mutable',
2262
lambda opts: config.MutableSection('myID', opts)},),
2266
a_dict = dict(foo='bar')
2267
section = self.get_section(a_dict)
2268
section.set('foo', 'new_value')
2269
self.assertEqual('new_value', section.get('foo'))
2270
# The change appears in the shared section
2271
self.assertEqual('new_value', a_dict.get('foo'))
2272
# We keep track of the change
2273
self.assertTrue('foo' in section.orig)
2274
self.assertEqual('bar', section.orig.get('foo'))
2276
def test_set_preserve_original_once(self):
2277
a_dict = dict(foo='bar')
2278
section = self.get_section(a_dict)
2279
section.set('foo', 'first_value')
2280
section.set('foo', 'second_value')
2281
# We keep track of the original value
2282
self.assertTrue('foo' in section.orig)
2283
self.assertEqual('bar', section.orig.get('foo'))
2285
def test_remove(self):
2286
a_dict = dict(foo='bar')
2287
section = self.get_section(a_dict)
2288
section.remove('foo')
2289
# We get None for unknown options via the default value
2290
self.assertEqual(None, section.get('foo'))
2291
# Or we just get the default value
2292
self.assertEqual('unknown', section.get('foo', 'unknown'))
2293
self.assertFalse('foo' in section.options)
2294
# We keep track of the deletion
2295
self.assertTrue('foo' in section.orig)
2296
self.assertEqual('bar', section.orig.get('foo'))
2298
def test_remove_new_option(self):
2300
section = self.get_section(a_dict)
2301
section.set('foo', 'bar')
2302
section.remove('foo')
2303
self.assertFalse('foo' in section.options)
2304
# The option didn't exist initially so it we need to keep track of it
2305
# with a special value
2306
self.assertTrue('foo' in section.orig)
2307
self.assertEqual(config._NewlyCreatedOption, section.orig['foo'])
2310
class TestCommandLineStore(tests.TestCase):
2313
super(TestCommandLineStore, self).setUp()
2314
self.store = config.CommandLineStore()
2315
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2317
def get_section(self):
2318
"""Get the unique section for the command line overrides."""
2319
sections = list(self.store.get_sections())
2320
self.assertLength(1, sections)
2321
store, section = sections[0]
2322
self.assertEqual(self.store, store)
2325
def test_no_override(self):
2326
self.store._from_cmdline([])
2327
section = self.get_section()
2328
self.assertLength(0, list(section.iter_option_names()))
2330
def test_simple_override(self):
2331
self.store._from_cmdline(['a=b'])
2332
section = self.get_section()
2333
self.assertEqual('b', section.get('a'))
2335
def test_list_override(self):
2336
opt = config.ListOption('l')
2337
config.option_registry.register(opt)
2338
self.store._from_cmdline(['l=1,2,3'])
2339
val = self.get_section().get('l')
2340
self.assertEqual('1,2,3', val)
2341
# Reminder: lists should be registered as such explicitely, otherwise
2342
# the conversion needs to be done afterwards.
2343
self.assertEqual(['1', '2', '3'],
2344
opt.convert_from_unicode(self.store, val))
2346
def test_multiple_overrides(self):
2347
self.store._from_cmdline(['a=b', 'x=y'])
2348
section = self.get_section()
2349
self.assertEqual('b', section.get('a'))
2350
self.assertEqual('y', section.get('x'))
2352
def test_wrong_syntax(self):
2353
self.assertRaises(errors.BzrCommandError,
2354
self.store._from_cmdline, ['a=b', 'c'])
2356
class TestStoreMinimalAPI(tests.TestCaseWithTransport):
2358
scenarios = [(key, {'get_store': builder}) for key, builder
2359
in config.test_store_builder_registry.iteritems()] + [
2360
('cmdline', {'get_store': lambda test: config.CommandLineStore()})]
2363
store = self.get_store(self)
2364
if isinstance(store, config.TransportIniFileStore):
2365
raise tests.TestNotApplicable(
2366
"%s is not a concrete Store implementation"
2367
" so it doesn't need an id" % (store.__class__.__name__,))
2368
self.assertIsNot(None, store.id)
2371
class TestStore(tests.TestCaseWithTransport):
2373
def assertSectionContent(self, expected, store_and_section):
2374
"""Assert that some options have the proper values in a section."""
2375
_, section = store_and_section
2376
expected_name, expected_options = expected
2377
self.assertEqual(expected_name, section.id)
2380
dict([(k, section.get(k)) for k in expected_options.keys()]))
2383
class TestReadonlyStore(TestStore):
2385
scenarios = [(key, {'get_store': builder}) for key, builder
2386
in config.test_store_builder_registry.iteritems()]
2388
def test_building_delays_load(self):
2389
store = self.get_store(self)
2390
self.assertEqual(False, store.is_loaded())
2391
store._load_from_string('')
2392
self.assertEqual(True, store.is_loaded())
2394
def test_get_no_sections_for_empty(self):
2395
store = self.get_store(self)
2396
store._load_from_string('')
2397
self.assertEqual([], list(store.get_sections()))
2399
def test_get_default_section(self):
2400
store = self.get_store(self)
2401
store._load_from_string('foo=bar')
2402
sections = list(store.get_sections())
2403
self.assertLength(1, sections)
2404
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2406
def test_get_named_section(self):
2407
store = self.get_store(self)
2408
store._load_from_string('[baz]\nfoo=bar')
2409
sections = list(store.get_sections())
2410
self.assertLength(1, sections)
2411
self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2413
def test_load_from_string_fails_for_non_empty_store(self):
2414
store = self.get_store(self)
2415
store._load_from_string('foo=bar')
2416
self.assertRaises(AssertionError, store._load_from_string, 'bar=baz')
2419
class TestStoreQuoting(TestStore):
2421
scenarios = [(key, {'get_store': builder}) for key, builder
2422
in config.test_store_builder_registry.iteritems()]
2425
super(TestStoreQuoting, self).setUp()
2426
self.store = self.get_store(self)
2427
# We need a loaded store but any content will do
2428
self.store._load_from_string('')
2430
def assertIdempotent(self, s):
2431
"""Assert that quoting an unquoted string is a no-op and vice-versa.
2433
What matters here is that option values, as they appear in a store, can
2434
be safely round-tripped out of the store and back.
2436
:param s: A string, quoted if required.
2438
self.assertEqual(s, self.store.quote(self.store.unquote(s)))
2439
self.assertEqual(s, self.store.unquote(self.store.quote(s)))
2441
def test_empty_string(self):
2442
if isinstance(self.store, config.IniFileStore):
2443
# configobj._quote doesn't handle empty values
2444
self.assertRaises(AssertionError,
2445
self.assertIdempotent, '')
2447
self.assertIdempotent('')
2448
# But quoted empty strings are ok
2449
self.assertIdempotent('""')
2451
def test_embedded_spaces(self):
2452
self.assertIdempotent('" a b c "')
2454
def test_embedded_commas(self):
2455
self.assertIdempotent('" a , b c "')
2457
def test_simple_comma(self):
2458
if isinstance(self.store, config.IniFileStore):
2459
# configobj requires that lists are special-cased
2460
self.assertRaises(AssertionError,
2461
self.assertIdempotent, ',')
2463
self.assertIdempotent(',')
2464
# When a single comma is required, quoting is also required
2465
self.assertIdempotent('","')
2467
def test_list(self):
2468
if isinstance(self.store, config.IniFileStore):
2469
# configobj requires that lists are special-cased
2470
self.assertRaises(AssertionError,
2471
self.assertIdempotent, 'a,b')
2473
self.assertIdempotent('a,b')
2476
class TestDictFromStore(tests.TestCase):
2478
def test_unquote_not_string(self):
2479
conf = config.MemoryStack('x=2\n[a_section]\na=1\n')
2480
value = conf.get('a_section')
2481
# Urgh, despite 'conf' asking for the no-name section, we get the
2482
# content of another section as a dict o_O
2483
self.assertEqual({'a': '1'}, value)
2484
unquoted = conf.store.unquote(value)
2485
# Which cannot be unquoted but shouldn't crash either (the use cases
2486
# are getting the value or displaying it. In the later case, '%s' will
2488
self.assertEqual({'a': '1'}, unquoted)
2489
self.assertEqual("{u'a': u'1'}", '%s' % (unquoted,))
2492
class TestIniFileStoreContent(tests.TestCaseWithTransport):
2493
"""Simulate loading a config store with content of various encodings.
2495
All files produced by bzr are in utf8 content.
2497
Users may modify them manually and end up with a file that can't be
2498
loaded. We need to issue proper error messages in this case.
2501
invalid_utf8_char = '\xff'
2503
def test_load_utf8(self):
2504
"""Ensure we can load an utf8-encoded file."""
2505
t = self.get_transport()
2506
# From http://pad.lv/799212
2507
unicode_user = u'b\N{Euro Sign}ar'
2508
unicode_content = u'user=%s' % (unicode_user,)
2509
utf8_content = unicode_content.encode('utf8')
2510
# Store the raw content in the config file
2511
t.put_bytes('foo.conf', utf8_content)
2512
store = config.TransportIniFileStore(t, 'foo.conf')
2514
stack = config.Stack([store.get_sections], store)
2515
self.assertEqual(unicode_user, stack.get('user'))
2517
def test_load_non_ascii(self):
2518
"""Ensure we display a proper error on non-ascii, non utf-8 content."""
2519
t = self.get_transport()
2520
t.put_bytes('foo.conf', 'user=foo\n#%s\n' % (self.invalid_utf8_char,))
2521
store = config.TransportIniFileStore(t, 'foo.conf')
2522
self.assertRaises(errors.ConfigContentError, store.load)
2524
def test_load_erroneous_content(self):
2525
"""Ensure we display a proper error on content that can't be parsed."""
2526
t = self.get_transport()
2527
t.put_bytes('foo.conf', '[open_section\n')
2528
store = config.TransportIniFileStore(t, 'foo.conf')
2529
self.assertRaises(errors.ParseConfigError, store.load)
2531
def test_load_permission_denied(self):
2532
"""Ensure we get warned when trying to load an inaccessible file."""
2535
warnings.append(args[0] % args[1:])
2536
self.overrideAttr(trace, 'warning', warning)
2538
t = self.get_transport()
2540
def get_bytes(relpath):
2541
raise errors.PermissionDenied(relpath, "")
2542
t.get_bytes = get_bytes
2543
store = config.TransportIniFileStore(t, 'foo.conf')
2544
self.assertRaises(errors.PermissionDenied, store.load)
2547
[u'Permission denied while trying to load configuration store %s.'
2548
% store.external_url()])
2551
class TestIniConfigContent(tests.TestCaseWithTransport):
2552
"""Simulate loading a IniBasedConfig with content of various encodings.
2554
All files produced by bzr are in utf8 content.
2556
Users may modify them manually and end up with a file that can't be
2557
loaded. We need to issue proper error messages in this case.
2560
invalid_utf8_char = '\xff'
2562
def test_load_utf8(self):
2563
"""Ensure we can load an utf8-encoded file."""
2564
# From http://pad.lv/799212
2565
unicode_user = u'b\N{Euro Sign}ar'
2566
unicode_content = u'user=%s' % (unicode_user,)
2567
utf8_content = unicode_content.encode('utf8')
2568
# Store the raw content in the config file
2569
with open('foo.conf', 'wb') as f:
2570
f.write(utf8_content)
2571
conf = config.IniBasedConfig(file_name='foo.conf')
2572
self.assertEqual(unicode_user, conf.get_user_option('user'))
2574
def test_load_badly_encoded_content(self):
2575
"""Ensure we display a proper error on non-ascii, non utf-8 content."""
2576
with open('foo.conf', 'wb') as f:
2577
f.write('user=foo\n#%s\n' % (self.invalid_utf8_char,))
2578
conf = config.IniBasedConfig(file_name='foo.conf')
2579
self.assertRaises(errors.ConfigContentError, conf._get_parser)
2581
def test_load_erroneous_content(self):
2582
"""Ensure we display a proper error on content that can't be parsed."""
2583
with open('foo.conf', 'wb') as f:
2584
f.write('[open_section\n')
2585
conf = config.IniBasedConfig(file_name='foo.conf')
2586
self.assertRaises(errors.ParseConfigError, conf._get_parser)
2589
class TestMutableStore(TestStore):
2591
scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
2592
in config.test_store_builder_registry.iteritems()]
2595
super(TestMutableStore, self).setUp()
2596
self.transport = self.get_transport()
2598
def has_store(self, store):
2599
store_basename = urlutils.relative_url(self.transport.external_url(),
2600
store.external_url())
2601
return self.transport.has(store_basename)
2603
def test_save_empty_creates_no_file(self):
2604
# FIXME: There should be a better way than relying on the test
2605
# parametrization to identify branch.conf -- vila 2011-0526
2606
if self.store_id in ('branch', 'remote_branch'):
2607
raise tests.TestNotApplicable(
2608
'branch.conf is *always* created when a branch is initialized')
2609
store = self.get_store(self)
2611
self.assertEqual(False, self.has_store(store))
2613
def test_mutable_section_shared(self):
2614
store = self.get_store(self)
2615
store._load_from_string('foo=bar\n')
2616
# FIXME: There should be a better way than relying on the test
2617
# parametrization to identify branch.conf -- vila 2011-0526
2618
if self.store_id in ('branch', 'remote_branch'):
2619
# branch stores requires write locked branches
2620
self.addCleanup(store.branch.lock_write().unlock)
2621
section1 = store.get_mutable_section(None)
2622
section2 = store.get_mutable_section(None)
2623
# If we get different sections, different callers won't share the
2625
self.assertIs(section1, section2)
2627
def test_save_emptied_succeeds(self):
2628
store = self.get_store(self)
2629
store._load_from_string('foo=bar\n')
2630
# FIXME: There should be a better way than relying on the test
2631
# parametrization to identify branch.conf -- vila 2011-0526
2632
if self.store_id in ('branch', 'remote_branch'):
2633
# branch stores requires write locked branches
2634
self.addCleanup(store.branch.lock_write().unlock)
2635
section = store.get_mutable_section(None)
2636
section.remove('foo')
2638
self.assertEqual(True, self.has_store(store))
2639
modified_store = self.get_store(self)
2640
sections = list(modified_store.get_sections())
2641
self.assertLength(0, sections)
2643
def test_save_with_content_succeeds(self):
2644
# FIXME: There should be a better way than relying on the test
2645
# parametrization to identify branch.conf -- vila 2011-0526
2646
if self.store_id in ('branch', 'remote_branch'):
2647
raise tests.TestNotApplicable(
2648
'branch.conf is *always* created when a branch is initialized')
2649
store = self.get_store(self)
2650
store._load_from_string('foo=bar\n')
2651
self.assertEqual(False, self.has_store(store))
2653
self.assertEqual(True, self.has_store(store))
2654
modified_store = self.get_store(self)
2655
sections = list(modified_store.get_sections())
2656
self.assertLength(1, sections)
2657
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2659
def test_set_option_in_empty_store(self):
2660
store = self.get_store(self)
2661
# FIXME: There should be a better way than relying on the test
2662
# parametrization to identify branch.conf -- vila 2011-0526
2663
if self.store_id in ('branch', 'remote_branch'):
2664
# branch stores requires write locked branches
2665
self.addCleanup(store.branch.lock_write().unlock)
2666
section = store.get_mutable_section(None)
2667
section.set('foo', 'bar')
2669
modified_store = self.get_store(self)
2670
sections = list(modified_store.get_sections())
2671
self.assertLength(1, sections)
2672
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2674
def test_set_option_in_default_section(self):
2675
store = self.get_store(self)
2676
store._load_from_string('')
2677
# FIXME: There should be a better way than relying on the test
2678
# parametrization to identify branch.conf -- vila 2011-0526
2679
if self.store_id in ('branch', 'remote_branch'):
2680
# branch stores requires write locked branches
2681
self.addCleanup(store.branch.lock_write().unlock)
2682
section = store.get_mutable_section(None)
2683
section.set('foo', 'bar')
2685
modified_store = self.get_store(self)
2686
sections = list(modified_store.get_sections())
2687
self.assertLength(1, sections)
2688
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2690
def test_set_option_in_named_section(self):
2691
store = self.get_store(self)
2692
store._load_from_string('')
2693
# FIXME: There should be a better way than relying on the test
2694
# parametrization to identify branch.conf -- vila 2011-0526
2695
if self.store_id in ('branch', 'remote_branch'):
2696
# branch stores requires write locked branches
2697
self.addCleanup(store.branch.lock_write().unlock)
2698
section = store.get_mutable_section('baz')
2699
section.set('foo', 'bar')
2701
modified_store = self.get_store(self)
2702
sections = list(modified_store.get_sections())
2703
self.assertLength(1, sections)
2704
self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2706
def test_load_hook(self):
2707
# First, we need to ensure that the store exists
2708
store = self.get_store(self)
2709
# FIXME: There should be a better way than relying on the test
2710
# parametrization to identify branch.conf -- vila 2011-0526
2711
if self.store_id in ('branch', 'remote_branch'):
2712
# branch stores requires write locked branches
2713
self.addCleanup(store.branch.lock_write().unlock)
2714
section = store.get_mutable_section('baz')
2715
section.set('foo', 'bar')
2717
# Now we can try to load it
2718
store = self.get_store(self)
2722
config.ConfigHooks.install_named_hook('load', hook, None)
2723
self.assertLength(0, calls)
2725
self.assertLength(1, calls)
2726
self.assertEqual((store,), calls[0])
2728
def test_save_hook(self):
2732
config.ConfigHooks.install_named_hook('save', hook, None)
2733
self.assertLength(0, calls)
2734
store = self.get_store(self)
2735
# FIXME: There should be a better way than relying on the test
2736
# parametrization to identify branch.conf -- vila 2011-0526
2737
if self.store_id in ('branch', 'remote_branch'):
2738
# branch stores requires write locked branches
2739
self.addCleanup(store.branch.lock_write().unlock)
2740
section = store.get_mutable_section('baz')
2741
section.set('foo', 'bar')
2743
self.assertLength(1, calls)
2744
self.assertEqual((store,), calls[0])
2746
def test_set_mark_dirty(self):
2747
stack = config.MemoryStack('')
2748
self.assertLength(0, stack.store.dirty_sections)
2749
stack.set('foo', 'baz')
2750
self.assertLength(1, stack.store.dirty_sections)
2751
self.assertTrue(stack.store._need_saving())
2753
def test_remove_mark_dirty(self):
2754
stack = config.MemoryStack('foo=bar')
2755
self.assertLength(0, stack.store.dirty_sections)
2757
self.assertLength(1, stack.store.dirty_sections)
2758
self.assertTrue(stack.store._need_saving())
2761
class TestStoreSaveChanges(tests.TestCaseWithTransport):
2762
"""Tests that config changes are kept in memory and saved on-demand."""
2765
super(TestStoreSaveChanges, self).setUp()
2766
self.transport = self.get_transport()
2767
# Most of the tests involve two stores pointing to the same persistent
2768
# storage to observe the effects of concurrent changes
2769
self.st1 = config.TransportIniFileStore(self.transport, 'foo.conf')
2770
self.st2 = config.TransportIniFileStore(self.transport, 'foo.conf')
2773
self.warnings.append(args[0] % args[1:])
2774
self.overrideAttr(trace, 'warning', warning)
2776
def has_store(self, store):
2777
store_basename = urlutils.relative_url(self.transport.external_url(),
2778
store.external_url())
2779
return self.transport.has(store_basename)
2781
def get_stack(self, store):
2782
# Any stack will do as long as it uses the right store, just a single
2783
# no-name section is enough
2784
return config.Stack([store.get_sections], store)
2786
def test_no_changes_no_save(self):
2787
s = self.get_stack(self.st1)
2788
s.store.save_changes()
2789
self.assertEqual(False, self.has_store(self.st1))
2791
def test_unrelated_concurrent_update(self):
2792
s1 = self.get_stack(self.st1)
2793
s2 = self.get_stack(self.st2)
2794
s1.set('foo', 'bar')
2795
s2.set('baz', 'quux')
2797
# Changes don't propagate magically
2798
self.assertEqual(None, s1.get('baz'))
2799
s2.store.save_changes()
2800
self.assertEqual('quux', s2.get('baz'))
2801
# Changes are acquired when saving
2802
self.assertEqual('bar', s2.get('foo'))
2803
# Since there is no overlap, no warnings are emitted
2804
self.assertLength(0, self.warnings)
2806
def test_concurrent_update_modified(self):
2807
s1 = self.get_stack(self.st1)
2808
s2 = self.get_stack(self.st2)
2809
s1.set('foo', 'bar')
2810
s2.set('foo', 'baz')
2813
s2.store.save_changes()
2814
self.assertEqual('baz', s2.get('foo'))
2815
# But the user get a warning
2816
self.assertLength(1, self.warnings)
2817
warning = self.warnings[0]
2818
self.assertStartsWith(warning, 'Option foo in section None')
2819
self.assertEndsWith(warning, 'was changed from <CREATED> to bar.'
2820
' The baz value will be saved.')
2822
def test_concurrent_deletion(self):
2823
self.st1._load_from_string('foo=bar')
2825
s1 = self.get_stack(self.st1)
2826
s2 = self.get_stack(self.st2)
2829
s1.store.save_changes()
2831
self.assertLength(0, self.warnings)
2832
s2.store.save_changes()
2834
self.assertLength(1, self.warnings)
2835
warning = self.warnings[0]
2836
self.assertStartsWith(warning, 'Option foo in section None')
2837
self.assertEndsWith(warning, 'was changed from bar to <CREATED>.'
2838
' The <DELETED> value will be saved.')
2841
class TestQuotingIniFileStore(tests.TestCaseWithTransport):
2843
def get_store(self):
2844
return config.TransportIniFileStore(self.get_transport(), 'foo.conf')
2846
def test_get_quoted_string(self):
2847
store = self.get_store()
2848
store._load_from_string('foo= " abc "')
2849
stack = config.Stack([store.get_sections])
2850
self.assertEqual(' abc ', stack.get('foo'))
2852
def test_set_quoted_string(self):
2853
store = self.get_store()
2854
stack = config.Stack([store.get_sections], store)
2855
stack.set('foo', ' a b c ')
2857
self.assertFileEqual('foo = " a b c "' + os.linesep, 'foo.conf')
2860
class TestTransportIniFileStore(TestStore):
2862
def test_loading_unknown_file_fails(self):
2863
store = config.TransportIniFileStore(self.get_transport(),
2865
self.assertRaises(errors.NoSuchFile, store.load)
2867
def test_invalid_content(self):
2868
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
2869
self.assertEqual(False, store.is_loaded())
2870
exc = self.assertRaises(
2871
errors.ParseConfigError, store._load_from_string,
2872
'this is invalid !')
2873
self.assertEndsWith(exc.filename, 'foo.conf')
2874
# And the load failed
2875
self.assertEqual(False, store.is_loaded())
2877
def test_get_embedded_sections(self):
2878
# A more complicated example (which also shows that section names and
2879
# option names share the same name space...)
2880
# FIXME: This should be fixed by forbidding dicts as values ?
2881
# -- vila 2011-04-05
2882
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
2883
store._load_from_string('''
2887
foo_in_DEFAULT=foo_DEFAULT
2895
sections = list(store.get_sections())
2896
self.assertLength(4, sections)
2897
# The default section has no name.
2898
# List values are provided as strings and need to be explicitly
2899
# converted by specifying from_unicode=list_from_store at option
2901
self.assertSectionContent((None, {'foo': 'bar', 'l': u'1,2'}),
2903
self.assertSectionContent(
2904
('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
2905
self.assertSectionContent(
2906
('bar', {'foo_in_bar': 'barbar'}), sections[2])
2907
# sub sections are provided as embedded dicts.
2908
self.assertSectionContent(
2909
('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
2913
class TestLockableIniFileStore(TestStore):
2915
def test_create_store_in_created_dir(self):
2916
self.assertPathDoesNotExist('dir')
2917
t = self.get_transport('dir/subdir')
2918
store = config.LockableIniFileStore(t, 'foo.conf')
2919
store.get_mutable_section(None).set('foo', 'bar')
2921
self.assertPathExists('dir/subdir')
2924
class TestConcurrentStoreUpdates(TestStore):
2925
"""Test that Stores properly handle conccurent updates.
2927
New Store implementation may fail some of these tests but until such
2928
implementations exist it's hard to properly filter them from the scenarios
2929
applied here. If you encounter such a case, contact the bzr devs.
2932
scenarios = [(key, {'get_stack': builder}) for key, builder
2933
in config.test_stack_builder_registry.iteritems()]
2936
super(TestConcurrentStoreUpdates, self).setUp()
2937
self.stack = self.get_stack(self)
2938
if not isinstance(self.stack, config._CompatibleStack):
2939
raise tests.TestNotApplicable(
2940
'%s is not meant to be compatible with the old config design'
2942
self.stack.set('one', '1')
2943
self.stack.set('two', '2')
2945
self.stack.store.save()
2947
def test_simple_read_access(self):
2948
self.assertEqual('1', self.stack.get('one'))
2950
def test_simple_write_access(self):
2951
self.stack.set('one', 'one')
2952
self.assertEqual('one', self.stack.get('one'))
2954
def test_listen_to_the_last_speaker(self):
2956
c2 = self.get_stack(self)
2957
c1.set('one', 'ONE')
2958
c2.set('two', 'TWO')
2959
self.assertEqual('ONE', c1.get('one'))
2960
self.assertEqual('TWO', c2.get('two'))
2961
# The second update respect the first one
2962
self.assertEqual('ONE', c2.get('one'))
2964
def test_last_speaker_wins(self):
2965
# If the same config is not shared, the same variable modified twice
2966
# can only see a single result.
2968
c2 = self.get_stack(self)
2971
self.assertEqual('c2', c2.get('one'))
2972
# The first modification is still available until another refresh
2974
self.assertEqual('c1', c1.get('one'))
2975
c1.set('two', 'done')
2976
self.assertEqual('c2', c1.get('one'))
2978
def test_writes_are_serialized(self):
2980
c2 = self.get_stack(self)
2982
# We spawn a thread that will pause *during* the config saving.
2983
before_writing = threading.Event()
2984
after_writing = threading.Event()
2985
writing_done = threading.Event()
2986
c1_save_without_locking_orig = c1.store.save_without_locking
2987
def c1_save_without_locking():
2988
before_writing.set()
2989
c1_save_without_locking_orig()
2990
# The lock is held. We wait for the main thread to decide when to
2992
after_writing.wait()
2993
c1.store.save_without_locking = c1_save_without_locking
2997
t1 = threading.Thread(target=c1_set)
2998
# Collect the thread after the test
2999
self.addCleanup(t1.join)
3000
# Be ready to unblock the thread if the test goes wrong
3001
self.addCleanup(after_writing.set)
3003
before_writing.wait()
3004
self.assertRaises(errors.LockContention,
3005
c2.set, 'one', 'c2')
3006
self.assertEqual('c1', c1.get('one'))
3007
# Let the lock be released
3011
self.assertEqual('c2', c2.get('one'))
3013
def test_read_while_writing(self):
3015
# We spawn a thread that will pause *during* the write
3016
ready_to_write = threading.Event()
3017
do_writing = threading.Event()
3018
writing_done = threading.Event()
3019
# We override the _save implementation so we know the store is locked
3020
c1_save_without_locking_orig = c1.store.save_without_locking
3021
def c1_save_without_locking():
3022
ready_to_write.set()
3023
# The lock is held. We wait for the main thread to decide when to
3026
c1_save_without_locking_orig()
3028
c1.store.save_without_locking = c1_save_without_locking
3031
t1 = threading.Thread(target=c1_set)
3032
# Collect the thread after the test
3033
self.addCleanup(t1.join)
3034
# Be ready to unblock the thread if the test goes wrong
3035
self.addCleanup(do_writing.set)
3037
# Ensure the thread is ready to write
3038
ready_to_write.wait()
3039
self.assertEqual('c1', c1.get('one'))
3040
# If we read during the write, we get the old value
3041
c2 = self.get_stack(self)
3042
self.assertEqual('1', c2.get('one'))
3043
# Let the writing occur and ensure it occurred
3046
# Now we get the updated value
3047
c3 = self.get_stack(self)
3048
self.assertEqual('c1', c3.get('one'))
3050
# FIXME: It may be worth looking into removing the lock dir when it's not
3051
# needed anymore and look at possible fallouts for concurrent lockers. This
3052
# will matter if/when we use config files outside of bazaar directories
3053
# (.bazaar or .bzr) -- vila 20110-04-111
3056
class TestSectionMatcher(TestStore):
3058
scenarios = [('location', {'matcher': config.LocationMatcher}),
3059
('id', {'matcher': config.NameMatcher}),]
3062
super(TestSectionMatcher, self).setUp()
3063
# Any simple store is good enough
3064
self.get_store = config.test_store_builder_registry.get('configobj')
3066
def test_no_matches_for_empty_stores(self):
3067
store = self.get_store(self)
3068
store._load_from_string('')
3069
matcher = self.matcher(store, '/bar')
3070
self.assertEqual([], list(matcher.get_sections()))
3072
def test_build_doesnt_load_store(self):
3073
store = self.get_store(self)
3074
self.matcher(store, '/bar')
3075
self.assertFalse(store.is_loaded())
3078
class TestLocationSection(tests.TestCase):
3080
def get_section(self, options, extra_path):
3081
section = config.Section('foo', options)
3082
return config.LocationSection(section, extra_path)
3084
def test_simple_option(self):
3085
section = self.get_section({'foo': 'bar'}, '')
3086
self.assertEqual('bar', section.get('foo'))
3088
def test_option_with_extra_path(self):
3089
section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
3091
self.assertEqual('bar/baz', section.get('foo'))
3093
def test_invalid_policy(self):
3094
section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
3096
# invalid policies are ignored
3097
self.assertEqual('bar', section.get('foo'))
3100
class TestLocationMatcher(TestStore):
3103
super(TestLocationMatcher, self).setUp()
3104
# Any simple store is good enough
3105
self.get_store = config.test_store_builder_registry.get('configobj')
3107
def test_unrelated_section_excluded(self):
3108
store = self.get_store(self)
3109
store._load_from_string('''
3117
section=/foo/bar/baz
3121
self.assertEqual(['/foo', '/foo/baz', '/foo/bar', '/foo/bar/baz',
3123
[section.id for _, section in store.get_sections()])
3124
matcher = config.LocationMatcher(store, '/foo/bar/quux')
3125
sections = [section for _, section in matcher.get_sections()]
3126
self.assertEqual(['/foo/bar', '/foo'],
3127
[section.id for section in sections])
3128
self.assertEqual(['quux', 'bar/quux'],
3129
[section.extra_path for section in sections])
3131
def test_more_specific_sections_first(self):
3132
store = self.get_store(self)
3133
store._load_from_string('''
3139
self.assertEqual(['/foo', '/foo/bar'],
3140
[section.id for _, section in store.get_sections()])
3141
matcher = config.LocationMatcher(store, '/foo/bar/baz')
3142
sections = [section for _, section in matcher.get_sections()]
3143
self.assertEqual(['/foo/bar', '/foo'],
3144
[section.id for section in sections])
3145
self.assertEqual(['baz', 'bar/baz'],
3146
[section.extra_path for section in sections])
3148
def test_appendpath_in_no_name_section(self):
3149
# It's a bit weird to allow appendpath in a no-name section, but
3150
# someone may found a use for it
3151
store = self.get_store(self)
3152
store._load_from_string('''
3154
foo:policy = appendpath
3156
matcher = config.LocationMatcher(store, 'dir/subdir')
3157
sections = list(matcher.get_sections())
3158
self.assertLength(1, sections)
3159
self.assertEqual('bar/dir/subdir', sections[0][1].get('foo'))
3161
def test_file_urls_are_normalized(self):
3162
store = self.get_store(self)
3163
if sys.platform == 'win32':
3164
expected_url = 'file:///C:/dir/subdir'
3165
expected_location = 'C:/dir/subdir'
3167
expected_url = 'file:///dir/subdir'
3168
expected_location = '/dir/subdir'
3169
matcher = config.LocationMatcher(store, expected_url)
3170
self.assertEqual(expected_location, matcher.location)
3172
def test_branch_name_colo(self):
3173
store = self.get_store(self)
3174
store._load_from_string(dedent("""\
3176
push_location=my{branchname}
3178
matcher = config.LocationMatcher(store, 'file:///,branch=example%3c')
3179
self.assertEqual('example<', matcher.branch_name)
3180
((_, section),) = matcher.get_sections()
3181
self.assertEqual('example<', section.locals['branchname'])
3183
def test_branch_name_basename(self):
3184
store = self.get_store(self)
3185
store._load_from_string(dedent("""\
3187
push_location=my{branchname}
3189
matcher = config.LocationMatcher(store, 'file:///parent/example%3c')
3190
self.assertEqual('example<', matcher.branch_name)
3191
((_, section),) = matcher.get_sections()
3192
self.assertEqual('example<', section.locals['branchname'])
3195
class TestStartingPathMatcher(TestStore):
3198
super(TestStartingPathMatcher, self).setUp()
3199
# Any simple store is good enough
3200
self.store = config.IniFileStore()
3202
def assertSectionIDs(self, expected, location, content):
3203
self.store._load_from_string(content)
3204
matcher = config.StartingPathMatcher(self.store, location)
3205
sections = list(matcher.get_sections())
3206
self.assertLength(len(expected), sections)
3207
self.assertEqual(expected, [section.id for _, section in sections])
3210
def test_empty(self):
3211
self.assertSectionIDs([], self.get_url(), '')
3213
def test_url_vs_local_paths(self):
3214
# The matcher location is an url and the section names are local paths
3215
self.assertSectionIDs(['/foo/bar', '/foo'],
3216
'file:///foo/bar/baz', '''\
3221
def test_local_path_vs_url(self):
3222
# The matcher location is a local path and the section names are urls
3223
self.assertSectionIDs(['file:///foo/bar', 'file:///foo'],
3224
'/foo/bar/baz', '''\
3230
def test_no_name_section_included_when_present(self):
3231
# Note that other tests will cover the case where the no-name section
3232
# is empty and as such, not included.
3233
sections = self.assertSectionIDs(['/foo/bar', '/foo', None],
3234
'/foo/bar/baz', '''\
3235
option = defined so the no-name section exists
3239
self.assertEqual(['baz', 'bar/baz', '/foo/bar/baz'],
3240
[s.locals['relpath'] for _, s in sections])
3242
def test_order_reversed(self):
3243
self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', '''\
3248
def test_unrelated_section_excluded(self):
3249
self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', '''\
3255
def test_glob_included(self):
3256
sections = self.assertSectionIDs(['/foo/*/baz', '/foo/b*', '/foo'],
3257
'/foo/bar/baz', '''\
3263
# Note that 'baz' as a relpath for /foo/b* is not fully correct, but
3264
# nothing really is... as far using {relpath} to append it to something
3265
# else, this seems good enough though.
3266
self.assertEqual(['', 'baz', 'bar/baz'],
3267
[s.locals['relpath'] for _, s in sections])
3269
def test_respect_order(self):
3270
self.assertSectionIDs(['/foo', '/foo/b*', '/foo/*/baz'],
3271
'/foo/bar/baz', '''\
3279
class TestNameMatcher(TestStore):
3282
super(TestNameMatcher, self).setUp()
3283
self.matcher = config.NameMatcher
3284
# Any simple store is good enough
3285
self.get_store = config.test_store_builder_registry.get('configobj')
3287
def get_matching_sections(self, name):
3288
store = self.get_store(self)
3289
store._load_from_string('''
3297
matcher = self.matcher(store, name)
3298
return list(matcher.get_sections())
3300
def test_matching(self):
3301
sections = self.get_matching_sections('foo')
3302
self.assertLength(1, sections)
3303
self.assertSectionContent(('foo', {'option': 'foo'}), sections[0])
3305
def test_not_matching(self):
3306
sections = self.get_matching_sections('baz')
3307
self.assertLength(0, sections)
3310
class TestBaseStackGet(tests.TestCase):
3313
super(TestBaseStackGet, self).setUp()
3314
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3316
def test_get_first_definition(self):
3317
store1 = config.IniFileStore()
3318
store1._load_from_string('foo=bar')
3319
store2 = config.IniFileStore()
3320
store2._load_from_string('foo=baz')
3321
conf = config.Stack([store1.get_sections, store2.get_sections])
3322
self.assertEqual('bar', conf.get('foo'))
3324
def test_get_with_registered_default_value(self):
3325
config.option_registry.register(config.Option('foo', default='bar'))
3326
conf_stack = config.Stack([])
3327
self.assertEqual('bar', conf_stack.get('foo'))
3329
def test_get_without_registered_default_value(self):
3330
config.option_registry.register(config.Option('foo'))
3331
conf_stack = config.Stack([])
3332
self.assertEqual(None, conf_stack.get('foo'))
3334
def test_get_without_default_value_for_not_registered(self):
3335
conf_stack = config.Stack([])
3336
self.assertEqual(None, conf_stack.get('foo'))
3338
def test_get_for_empty_section_callable(self):
3339
conf_stack = config.Stack([lambda : []])
3340
self.assertEqual(None, conf_stack.get('foo'))
3342
def test_get_for_broken_callable(self):
3343
# Trying to use and invalid callable raises an exception on first use
3344
conf_stack = config.Stack([object])
3345
self.assertRaises(TypeError, conf_stack.get, 'foo')
3348
class TestStackWithSimpleStore(tests.TestCase):
3351
super(TestStackWithSimpleStore, self).setUp()
3352
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3353
self.registry = config.option_registry
3355
def get_conf(self, content=None):
3356
return config.MemoryStack(content)
3358
def test_override_value_from_env(self):
3359
self.overrideEnv('FOO', None)
3360
self.registry.register(
3361
config.Option('foo', default='bar', override_from_env=['FOO']))
3362
self.overrideEnv('FOO', 'quux')
3363
# Env variable provides a default taking over the option one
3364
conf = self.get_conf('foo=store')
3365
self.assertEqual('quux', conf.get('foo'))
3367
def test_first_override_value_from_env_wins(self):
3368
self.overrideEnv('NO_VALUE', None)
3369
self.overrideEnv('FOO', None)
3370
self.overrideEnv('BAZ', None)
3371
self.registry.register(
3372
config.Option('foo', default='bar',
3373
override_from_env=['NO_VALUE', 'FOO', 'BAZ']))
3374
self.overrideEnv('FOO', 'foo')
3375
self.overrideEnv('BAZ', 'baz')
3376
# The first env var set wins
3377
conf = self.get_conf('foo=store')
3378
self.assertEqual('foo', conf.get('foo'))
3381
class TestMemoryStack(tests.TestCase):
3384
conf = config.MemoryStack('foo=bar')
3385
self.assertEqual('bar', conf.get('foo'))
3388
conf = config.MemoryStack('foo=bar')
3389
conf.set('foo', 'baz')
3390
self.assertEqual('baz', conf.get('foo'))
3392
def test_no_content(self):
3393
conf = config.MemoryStack()
3394
# No content means no loading
3395
self.assertFalse(conf.store.is_loaded())
3396
self.assertRaises(NotImplementedError, conf.get, 'foo')
3397
# But a content can still be provided
3398
conf.store._load_from_string('foo=bar')
3399
self.assertEqual('bar', conf.get('foo'))
3402
class TestStackIterSections(tests.TestCase):
3404
def test_empty_stack(self):
3405
conf = config.Stack([])
3406
sections = list(conf.iter_sections())
3407
self.assertLength(0, sections)
3409
def test_empty_store(self):
3410
store = config.IniFileStore()
3411
store._load_from_string('')
3412
conf = config.Stack([store.get_sections])
3413
sections = list(conf.iter_sections())
3414
self.assertLength(0, sections)
3416
def test_simple_store(self):
3417
store = config.IniFileStore()
3418
store._load_from_string('foo=bar')
3419
conf = config.Stack([store.get_sections])
3420
tuples = list(conf.iter_sections())
3421
self.assertLength(1, tuples)
3422
(found_store, found_section) = tuples[0]
3423
self.assertIs(store, found_store)
3425
def test_two_stores(self):
3426
store1 = config.IniFileStore()
3427
store1._load_from_string('foo=bar')
3428
store2 = config.IniFileStore()
3429
store2._load_from_string('bar=qux')
3430
conf = config.Stack([store1.get_sections, store2.get_sections])
3431
tuples = list(conf.iter_sections())
3432
self.assertLength(2, tuples)
3433
self.assertIs(store1, tuples[0][0])
3434
self.assertIs(store2, tuples[1][0])
3437
class TestStackWithTransport(tests.TestCaseWithTransport):
3439
scenarios = [(key, {'get_stack': builder}) for key, builder
3440
in config.test_stack_builder_registry.iteritems()]
3443
class TestConcreteStacks(TestStackWithTransport):
3445
def test_build_stack(self):
3446
# Just a smoke test to help debug builders
3447
self.get_stack(self)
3450
class TestStackGet(TestStackWithTransport):
3453
super(TestStackGet, self).setUp()
3454
self.conf = self.get_stack(self)
3456
def test_get_for_empty_stack(self):
3457
self.assertEqual(None, self.conf.get('foo'))
3459
def test_get_hook(self):
3460
self.conf.set('foo', 'bar')
3464
config.ConfigHooks.install_named_hook('get', hook, None)
3465
self.assertLength(0, calls)
3466
value = self.conf.get('foo')
3467
self.assertEqual('bar', value)
3468
self.assertLength(1, calls)
3469
self.assertEqual((self.conf, 'foo', 'bar'), calls[0])
3472
class TestStackGetWithConverter(tests.TestCase):
3475
super(TestStackGetWithConverter, self).setUp()
3476
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3477
self.registry = config.option_registry
3479
def get_conf(self, content=None):
3480
return config.MemoryStack(content)
3482
def register_bool_option(self, name, default=None, default_from_env=None):
3483
b = config.Option(name, help='A boolean.',
3484
default=default, default_from_env=default_from_env,
3485
from_unicode=config.bool_from_store)
3486
self.registry.register(b)
3488
def test_get_default_bool_None(self):
3489
self.register_bool_option('foo')
3490
conf = self.get_conf('')
3491
self.assertEqual(None, conf.get('foo'))
3493
def test_get_default_bool_True(self):
3494
self.register_bool_option('foo', u'True')
3495
conf = self.get_conf('')
3496
self.assertEqual(True, conf.get('foo'))
3498
def test_get_default_bool_False(self):
3499
self.register_bool_option('foo', False)
3500
conf = self.get_conf('')
3501
self.assertEqual(False, conf.get('foo'))
3503
def test_get_default_bool_False_as_string(self):
3504
self.register_bool_option('foo', u'False')
3505
conf = self.get_conf('')
3506
self.assertEqual(False, conf.get('foo'))
3508
def test_get_default_bool_from_env_converted(self):
3509
self.register_bool_option('foo', u'True', default_from_env=['FOO'])
3510
self.overrideEnv('FOO', 'False')
3511
conf = self.get_conf('')
3512
self.assertEqual(False, conf.get('foo'))
3514
def test_get_default_bool_when_conversion_fails(self):
3515
self.register_bool_option('foo', default='True')
3516
conf = self.get_conf('foo=invalid boolean')
3517
self.assertEqual(True, conf.get('foo'))
3519
def register_integer_option(self, name,
3520
default=None, default_from_env=None):
3521
i = config.Option(name, help='An integer.',
3522
default=default, default_from_env=default_from_env,
3523
from_unicode=config.int_from_store)
3524
self.registry.register(i)
3526
def test_get_default_integer_None(self):
3527
self.register_integer_option('foo')
3528
conf = self.get_conf('')
3529
self.assertEqual(None, conf.get('foo'))
3531
def test_get_default_integer(self):
3532
self.register_integer_option('foo', 42)
3533
conf = self.get_conf('')
3534
self.assertEqual(42, conf.get('foo'))
3536
def test_get_default_integer_as_string(self):
3537
self.register_integer_option('foo', u'42')
3538
conf = self.get_conf('')
3539
self.assertEqual(42, conf.get('foo'))
3541
def test_get_default_integer_from_env(self):
3542
self.register_integer_option('foo', default_from_env=['FOO'])
3543
self.overrideEnv('FOO', '18')
3544
conf = self.get_conf('')
3545
self.assertEqual(18, conf.get('foo'))
3547
def test_get_default_integer_when_conversion_fails(self):
3548
self.register_integer_option('foo', default='12')
3549
conf = self.get_conf('foo=invalid integer')
3550
self.assertEqual(12, conf.get('foo'))
3552
def register_list_option(self, name, default=None, default_from_env=None):
3553
l = config.ListOption(name, help='A list.', default=default,
3554
default_from_env=default_from_env)
3555
self.registry.register(l)
3557
def test_get_default_list_None(self):
3558
self.register_list_option('foo')
3559
conf = self.get_conf('')
3560
self.assertEqual(None, conf.get('foo'))
3562
def test_get_default_list_empty(self):
3563
self.register_list_option('foo', '')
3564
conf = self.get_conf('')
3565
self.assertEqual([], conf.get('foo'))
3567
def test_get_default_list_from_env(self):
3568
self.register_list_option('foo', default_from_env=['FOO'])
3569
self.overrideEnv('FOO', '')
3570
conf = self.get_conf('')
3571
self.assertEqual([], conf.get('foo'))
3573
def test_get_with_list_converter_no_item(self):
3574
self.register_list_option('foo', None)
3575
conf = self.get_conf('foo=,')
3576
self.assertEqual([], conf.get('foo'))
3578
def test_get_with_list_converter_many_items(self):
3579
self.register_list_option('foo', None)
3580
conf = self.get_conf('foo=m,o,r,e')
3581
self.assertEqual(['m', 'o', 'r', 'e'], conf.get('foo'))
3583
def test_get_with_list_converter_embedded_spaces_many_items(self):
3584
self.register_list_option('foo', None)
3585
conf = self.get_conf('foo=" bar", "baz "')
3586
self.assertEqual([' bar', 'baz '], conf.get('foo'))
3588
def test_get_with_list_converter_stripped_spaces_many_items(self):
3589
self.register_list_option('foo', None)
3590
conf = self.get_conf('foo= bar , baz ')
3591
self.assertEqual(['bar', 'baz'], conf.get('foo'))
3594
class TestIterOptionRefs(tests.TestCase):
3595
"""iter_option_refs is a bit unusual, document some cases."""
3597
def assertRefs(self, expected, string):
3598
self.assertEqual(expected, list(config.iter_option_refs(string)))
3600
def test_empty(self):
3601
self.assertRefs([(False, '')], '')
3603
def test_no_refs(self):
3604
self.assertRefs([(False, 'foo bar')], 'foo bar')
3606
def test_single_ref(self):
3607
self.assertRefs([(False, ''), (True, '{foo}'), (False, '')], '{foo}')
3609
def test_broken_ref(self):
3610
self.assertRefs([(False, '{foo')], '{foo')
3612
def test_embedded_ref(self):
3613
self.assertRefs([(False, '{'), (True, '{foo}'), (False, '}')],
3616
def test_two_refs(self):
3617
self.assertRefs([(False, ''), (True, '{foo}'),
3618
(False, ''), (True, '{bar}'),
3622
def test_newline_in_refs_are_not_matched(self):
3623
self.assertRefs([(False, '{\nxx}{xx\n}{{\n}}')], '{\nxx}{xx\n}{{\n}}')
3626
class TestStackExpandOptions(tests.TestCaseWithTransport):
3629
super(TestStackExpandOptions, self).setUp()
3630
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3631
self.registry = config.option_registry
3632
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
3633
self.conf = config.Stack([store.get_sections], store)
3635
def assertExpansion(self, expected, string, env=None):
3636
self.assertEqual(expected, self.conf.expand_options(string, env))
3638
def test_no_expansion(self):
3639
self.assertExpansion('foo', 'foo')
3641
def test_expand_default_value(self):
3642
self.conf.store._load_from_string('bar=baz')
3643
self.registry.register(config.Option('foo', default=u'{bar}'))
3644
self.assertEqual('baz', self.conf.get('foo', expand=True))
3646
def test_expand_default_from_env(self):
3647
self.conf.store._load_from_string('bar=baz')
3648
self.registry.register(config.Option('foo', default_from_env=['FOO']))
3649
self.overrideEnv('FOO', '{bar}')
3650
self.assertEqual('baz', self.conf.get('foo', expand=True))
3652
def test_expand_default_on_failed_conversion(self):
3653
self.conf.store._load_from_string('baz=bogus\nbar=42\nfoo={baz}')
3654
self.registry.register(
3655
config.Option('foo', default=u'{bar}',
3656
from_unicode=config.int_from_store))
3657
self.assertEqual(42, self.conf.get('foo', expand=True))
3659
def test_env_adding_options(self):
3660
self.assertExpansion('bar', '{foo}', {'foo': 'bar'})
3662
def test_env_overriding_options(self):
3663
self.conf.store._load_from_string('foo=baz')
3664
self.assertExpansion('bar', '{foo}', {'foo': 'bar'})
3666
def test_simple_ref(self):
3667
self.conf.store._load_from_string('foo=xxx')
3668
self.assertExpansion('xxx', '{foo}')
3670
def test_unknown_ref(self):
3671
self.assertRaises(errors.ExpandingUnknownOption,
3672
self.conf.expand_options, '{foo}')
3674
def test_illegal_def_is_ignored(self):
3675
self.assertExpansion('{1,2}', '{1,2}')
3676
self.assertExpansion('{ }', '{ }')
3677
self.assertExpansion('${Foo,f}', '${Foo,f}')
3679
def test_indirect_ref(self):
3680
self.conf.store._load_from_string('''
3684
self.assertExpansion('xxx', '{bar}')
3686
def test_embedded_ref(self):
3687
self.conf.store._load_from_string('''
3691
self.assertExpansion('xxx', '{{bar}}')
3693
def test_simple_loop(self):
3694
self.conf.store._load_from_string('foo={foo}')
3695
self.assertRaises(errors.OptionExpansionLoop,
3696
self.conf.expand_options, '{foo}')
3698
def test_indirect_loop(self):
3699
self.conf.store._load_from_string('''
3703
e = self.assertRaises(errors.OptionExpansionLoop,
3704
self.conf.expand_options, '{foo}')
3705
self.assertEqual('foo->bar->baz', e.refs)
3706
self.assertEqual('{foo}', e.string)
3708
def test_list(self):
3709
self.conf.store._load_from_string('''
3713
list={foo},{bar},{baz}
3715
self.registry.register(
3716
config.ListOption('list'))
3717
self.assertEqual(['start', 'middle', 'end'],
3718
self.conf.get('list', expand=True))
3720
def test_cascading_list(self):
3721
self.conf.store._load_from_string('''
3727
self.registry.register(config.ListOption('list'))
3728
# Register an intermediate option as a list to ensure no conversion
3729
# happen while expanding. Conversion should only occur for the original
3730
# option ('list' here).
3731
self.registry.register(config.ListOption('baz'))
3732
self.assertEqual(['start', 'middle', 'end'],
3733
self.conf.get('list', expand=True))
3735
def test_pathologically_hidden_list(self):
3736
self.conf.store._load_from_string('''
3742
hidden={start}{middle}{end}
3744
# What matters is what the registration says, the conversion happens
3745
# only after all expansions have been performed
3746
self.registry.register(config.ListOption('hidden'))
3747
self.assertEqual(['bin', 'go'],
3748
self.conf.get('hidden', expand=True))
3751
class TestStackCrossSectionsExpand(tests.TestCaseWithTransport):
3754
super(TestStackCrossSectionsExpand, self).setUp()
3756
def get_config(self, location, string):
3759
# Since we don't save the config we won't strictly require to inherit
3760
# from TestCaseInTempDir, but an error occurs so quickly...
3761
c = config.LocationStack(location)
3762
c.store._load_from_string(string)
3765
def test_dont_cross_unrelated_section(self):
3766
c = self.get_config('/another/branch/path','''
3771
[/another/branch/path]
3774
self.assertRaises(errors.ExpandingUnknownOption,
3775
c.get, 'bar', expand=True)
3777
def test_cross_related_sections(self):
3778
c = self.get_config('/project/branch/path','''
3782
[/project/branch/path]
3785
self.assertEqual('quux', c.get('bar', expand=True))
3788
class TestStackCrossStoresExpand(tests.TestCaseWithTransport):
3790
def test_cross_global_locations(self):
3791
l_store = config.LocationStore()
3792
l_store._load_from_string('''
3798
g_store = config.GlobalStore()
3799
g_store._load_from_string('''
3805
stack = config.LocationStack('/branch')
3806
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3807
self.assertEqual('loc-foo', stack.get('gfoo', expand=True))
3810
class TestStackExpandSectionLocals(tests.TestCaseWithTransport):
3812
def test_expand_locals_empty(self):
3813
l_store = config.LocationStore()
3814
l_store._load_from_string('''
3815
[/home/user/project]
3820
stack = config.LocationStack('/home/user/project/')
3821
self.assertEqual('', stack.get('base', expand=True))
3822
self.assertEqual('', stack.get('rel', expand=True))
3824
def test_expand_basename_locally(self):
3825
l_store = config.LocationStore()
3826
l_store._load_from_string('''
3827
[/home/user/project]
3831
stack = config.LocationStack('/home/user/project/branch')
3832
self.assertEqual('branch', stack.get('bfoo', expand=True))
3834
def test_expand_basename_locally_longer_path(self):
3835
l_store = config.LocationStore()
3836
l_store._load_from_string('''
3841
stack = config.LocationStack('/home/user/project/dir/branch')
3842
self.assertEqual('branch', stack.get('bfoo', expand=True))
3844
def test_expand_relpath_locally(self):
3845
l_store = config.LocationStore()
3846
l_store._load_from_string('''
3847
[/home/user/project]
3848
lfoo = loc-foo/{relpath}
3851
stack = config.LocationStack('/home/user/project/branch')
3852
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3854
def test_expand_relpath_unknonw_in_global(self):
3855
g_store = config.GlobalStore()
3856
g_store._load_from_string('''
3861
stack = config.LocationStack('/home/user/project/branch')
3862
self.assertRaises(errors.ExpandingUnknownOption,
3863
stack.get, 'gfoo', expand=True)
3865
def test_expand_local_option_locally(self):
3866
l_store = config.LocationStore()
3867
l_store._load_from_string('''
3868
[/home/user/project]
3869
lfoo = loc-foo/{relpath}
3873
g_store = config.GlobalStore()
3874
g_store._load_from_string('''
3880
stack = config.LocationStack('/home/user/project/branch')
3881
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3882
self.assertEqual('loc-foo/branch', stack.get('gfoo', expand=True))
3884
def test_locals_dont_leak(self):
3885
"""Make sure we chose the right local in presence of several sections.
3887
l_store = config.LocationStore()
3888
l_store._load_from_string('''
3890
lfoo = loc-foo/{relpath}
3891
[/home/user/project]
3892
lfoo = loc-foo/{relpath}
3895
stack = config.LocationStack('/home/user/project/branch')
3896
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3897
stack = config.LocationStack('/home/user/bar/baz')
3898
self.assertEqual('loc-foo/bar/baz', stack.get('lfoo', expand=True))
3902
class TestStackSet(TestStackWithTransport):
3904
def test_simple_set(self):
3905
conf = self.get_stack(self)
3906
self.assertEqual(None, conf.get('foo'))
3907
conf.set('foo', 'baz')
3908
# Did we get it back ?
3909
self.assertEqual('baz', conf.get('foo'))
3911
def test_set_creates_a_new_section(self):
3912
conf = self.get_stack(self)
3913
conf.set('foo', 'baz')
3914
self.assertEqual, 'baz', conf.get('foo')
3916
def test_set_hook(self):
3920
config.ConfigHooks.install_named_hook('set', hook, None)
3921
self.assertLength(0, calls)
3922
conf = self.get_stack(self)
3923
conf.set('foo', 'bar')
3924
self.assertLength(1, calls)
3925
self.assertEqual((conf, 'foo', 'bar'), calls[0])
3928
class TestStackRemove(TestStackWithTransport):
3930
def test_remove_existing(self):
3931
conf = self.get_stack(self)
3932
conf.set('foo', 'bar')
3933
self.assertEqual('bar', conf.get('foo'))
3935
# Did we get it back ?
3936
self.assertEqual(None, conf.get('foo'))
3938
def test_remove_unknown(self):
3939
conf = self.get_stack(self)
3940
self.assertRaises(KeyError, conf.remove, 'I_do_not_exist')
3942
def test_remove_hook(self):
3946
config.ConfigHooks.install_named_hook('remove', hook, None)
3947
self.assertLength(0, calls)
3948
conf = self.get_stack(self)
3949
conf.set('foo', 'bar')
3951
self.assertLength(1, calls)
3952
self.assertEqual((conf, 'foo'), calls[0])
3955
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
3958
super(TestConfigGetOptions, self).setUp()
3959
create_configs(self)
3961
def test_no_variable(self):
3962
# Using branch should query branch, locations and bazaar
3963
self.assertOptions([], self.branch_config)
3965
def test_option_in_bazaar(self):
3966
self.bazaar_config.set_user_option('file', 'bazaar')
3967
self.assertOptions([('file', 'bazaar', 'DEFAULT', 'bazaar')],
3970
def test_option_in_locations(self):
3971
self.locations_config.set_user_option('file', 'locations')
3973
[('file', 'locations', self.tree.basedir, 'locations')],
3974
self.locations_config)
3976
def test_option_in_branch(self):
3977
self.branch_config.set_user_option('file', 'branch')
3978
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch')],
3981
def test_option_in_bazaar_and_branch(self):
3982
self.bazaar_config.set_user_option('file', 'bazaar')
3983
self.branch_config.set_user_option('file', 'branch')
3984
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch'),
3985
('file', 'bazaar', 'DEFAULT', 'bazaar'),],
3988
def test_option_in_branch_and_locations(self):
3989
# Hmm, locations override branch :-/
3990
self.locations_config.set_user_option('file', 'locations')
3991
self.branch_config.set_user_option('file', 'branch')
3993
[('file', 'locations', self.tree.basedir, 'locations'),
3994
('file', 'branch', 'DEFAULT', 'branch'),],
3997
def test_option_in_bazaar_locations_and_branch(self):
3998
self.bazaar_config.set_user_option('file', 'bazaar')
3999
self.locations_config.set_user_option('file', 'locations')
4000
self.branch_config.set_user_option('file', 'branch')
4002
[('file', 'locations', self.tree.basedir, 'locations'),
4003
('file', 'branch', 'DEFAULT', 'branch'),
4004
('file', 'bazaar', 'DEFAULT', 'bazaar'),],
4008
class TestConfigRemoveOption(tests.TestCaseWithTransport, TestOptionsMixin):
4011
super(TestConfigRemoveOption, self).setUp()
4012
create_configs_with_file_option(self)
4014
def test_remove_in_locations(self):
4015
self.locations_config.remove_user_option('file', self.tree.basedir)
4017
[('file', 'branch', 'DEFAULT', 'branch'),
4018
('file', 'bazaar', 'DEFAULT', 'bazaar'),],
4021
def test_remove_in_branch(self):
4022
self.branch_config.remove_user_option('file')
4024
[('file', 'locations', self.tree.basedir, 'locations'),
4025
('file', 'bazaar', 'DEFAULT', 'bazaar'),],
4028
def test_remove_in_bazaar(self):
4029
self.bazaar_config.remove_user_option('file')
4031
[('file', 'locations', self.tree.basedir, 'locations'),
4032
('file', 'branch', 'DEFAULT', 'branch'),],
4036
class TestConfigGetSections(tests.TestCaseWithTransport):
4039
super(TestConfigGetSections, self).setUp()
4040
create_configs(self)
4042
def assertSectionNames(self, expected, conf, name=None):
4043
"""Check which sections are returned for a given config.
4045
If fallback configurations exist their sections can be included.
4047
:param expected: A list of section names.
4049
:param conf: The configuration that will be queried.
4051
:param name: An optional section name that will be passed to
4054
sections = list(conf._get_sections(name))
4055
self.assertLength(len(expected), sections)
4056
self.assertEqual(expected, [n for n, _, _ in sections])
4058
def test_bazaar_default_section(self):
4059
self.assertSectionNames(['DEFAULT'], self.bazaar_config)
4061
def test_locations_default_section(self):
4062
# No sections are defined in an empty file
4063
self.assertSectionNames([], self.locations_config)
4065
def test_locations_named_section(self):
4066
self.locations_config.set_user_option('file', 'locations')
4067
self.assertSectionNames([self.tree.basedir], self.locations_config)
4069
def test_locations_matching_sections(self):
4070
loc_config = self.locations_config
4071
loc_config.set_user_option('file', 'locations')
4072
# We need to cheat a bit here to create an option in sections above and
4073
# below the 'location' one.
4074
parser = loc_config._get_parser()
4075
# locations.cong deals with '/' ignoring native os.sep
4076
location_names = self.tree.basedir.split('/')
4077
parent = '/'.join(location_names[:-1])
4078
child = '/'.join(location_names + ['child'])
4080
parser[parent]['file'] = 'parent'
4082
parser[child]['file'] = 'child'
4083
self.assertSectionNames([self.tree.basedir, parent], loc_config)
4085
def test_branch_data_default_section(self):
4086
self.assertSectionNames([None],
4087
self.branch_config._get_branch_data_config())
4089
def test_branch_default_sections(self):
4090
# No sections are defined in an empty locations file
4091
self.assertSectionNames([None, 'DEFAULT'],
4093
# Unless we define an option
4094
self.branch_config._get_location_config().set_user_option(
4095
'file', 'locations')
4096
self.assertSectionNames([self.tree.basedir, None, 'DEFAULT'],
4099
def test_bazaar_named_section(self):
4100
# We need to cheat as the API doesn't give direct access to sections
4101
# other than DEFAULT.
4102
self.bazaar_config.set_alias('bazaar', 'bzr')
4103
self.assertSectionNames(['ALIASES'], self.bazaar_config, 'ALIASES')
4106
class TestSharedStores(tests.TestCaseInTempDir):
4108
def test_bazaar_conf_shared(self):
4109
g1 = config.GlobalStack()
4110
g2 = config.GlobalStack()
4111
# The two stacks share the same store
4112
self.assertIs(g1.store, g2.store)
4115
class TestAuthenticationConfigFile(tests.TestCase):
4116
"""Test the authentication.conf file matching"""
4118
def _got_user_passwd(self, expected_user, expected_password,
4119
config, *args, **kwargs):
4120
credentials = config.get_credentials(*args, **kwargs)
4121
if credentials is None:
4125
user = credentials['user']
4126
password = credentials['password']
4127
self.assertEqual(expected_user, user)
4128
self.assertEqual(expected_password, password)
4130
def test_empty_config(self):
4131
conf = config.AuthenticationConfig(_file=BytesIO())
4132
self.assertEqual({}, conf._get_config())
4133
self._got_user_passwd(None, None, conf, 'http', 'foo.net')
4135
def test_non_utf8_config(self):
4136
conf = config.AuthenticationConfig(_file=BytesIO(b'foo = bar\xff'))
4137
self.assertRaises(errors.ConfigContentError, conf._get_config)
4139
def test_missing_auth_section_header(self):
4140
conf = config.AuthenticationConfig(_file=BytesIO(b'foo = bar'))
4141
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4143
def test_auth_section_header_not_closed(self):
4144
conf = config.AuthenticationConfig(_file=BytesIO(b'[DEF'))
4145
self.assertRaises(errors.ParseConfigError, conf._get_config)
4147
def test_auth_value_not_boolean(self):
4148
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4152
verify_certificates=askme # Error: Not a boolean
4154
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4156
def test_auth_value_not_int(self):
4157
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4161
port=port # Error: Not an int
4163
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4165
def test_unknown_password_encoding(self):
4166
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4170
password_encoding=unknown
4172
self.assertRaises(ValueError, conf.get_password,
4173
'ftp', 'foo.net', 'joe')
4175
def test_credentials_for_scheme_host(self):
4176
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4177
# Identity on foo.net
4182
password=secret-pass
4185
self._got_user_passwd('joe', 'secret-pass', conf, 'ftp', 'foo.net')
4187
self._got_user_passwd(None, None, conf, 'http', 'foo.net')
4189
self._got_user_passwd(None, None, conf, 'ftp', 'bar.net')
4191
def test_credentials_for_host_port(self):
4192
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4193
# Identity on foo.net
4199
password=secret-pass
4202
self._got_user_passwd('joe', 'secret-pass',
4203
conf, 'ftp', 'foo.net', port=10021)
4205
self._got_user_passwd(None, None, conf, 'ftp', 'foo.net')
4207
def test_for_matching_host(self):
4208
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4209
# Identity on foo.net
4215
[sourceforge domain]
4222
self._got_user_passwd('georges', 'bendover',
4223
conf, 'bzr', 'foo.bzr.sf.net')
4225
self._got_user_passwd(None, None,
4226
conf, 'bzr', 'bbzr.sf.net')
4228
def test_for_matching_host_None(self):
4229
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4230
# Identity on foo.net
4240
self._got_user_passwd('joe', 'joepass',
4241
conf, 'bzr', 'quux.net')
4242
# no host but different scheme
4243
self._got_user_passwd('georges', 'bendover',
4244
conf, 'ftp', 'quux.net')
4246
def test_credentials_for_path(self):
4247
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4262
self._got_user_passwd(None, None,
4263
conf, 'http', host='bar.org', path='/dir3')
4265
self._got_user_passwd('georges', 'bendover',
4266
conf, 'http', host='bar.org', path='/dir2')
4268
self._got_user_passwd('jim', 'jimpass',
4269
conf, 'http', host='bar.org',path='/dir1/subdir')
4271
def test_credentials_for_user(self):
4272
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4280
self._got_user_passwd('jim', 'jimpass',
4281
conf, 'http', 'bar.org')
4283
self._got_user_passwd('jim', 'jimpass',
4284
conf, 'http', 'bar.org', user='jim')
4285
# Don't get a different user if one is specified
4286
self._got_user_passwd(None, None,
4287
conf, 'http', 'bar.org', user='georges')
4289
def test_credentials_for_user_without_password(self):
4290
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4296
# Get user but no password
4297
self._got_user_passwd('jim', None,
4298
conf, 'http', 'bar.org')
4300
def test_verify_certificates(self):
4301
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4307
verify_certificates=False
4314
credentials = conf.get_credentials('https', 'bar.org')
4315
self.assertEqual(False, credentials.get('verify_certificates'))
4316
credentials = conf.get_credentials('https', 'foo.net')
4317
self.assertEqual(True, credentials.get('verify_certificates'))
4320
class TestAuthenticationStorage(tests.TestCaseInTempDir):
4322
def test_set_credentials(self):
4323
conf = config.AuthenticationConfig()
4324
conf.set_credentials('name', 'host', 'user', 'scheme', 'password',
4325
99, path='/foo', verify_certificates=False, realm='realm')
4326
credentials = conf.get_credentials(host='host', scheme='scheme',
4327
port=99, path='/foo',
4329
CREDENTIALS = {'name': 'name', 'user': 'user', 'password': 'password',
4330
'verify_certificates': False, 'scheme': 'scheme',
4331
'host': 'host', 'port': 99, 'path': '/foo',
4333
self.assertEqual(CREDENTIALS, credentials)
4334
credentials_from_disk = config.AuthenticationConfig().get_credentials(
4335
host='host', scheme='scheme', port=99, path='/foo', realm='realm')
4336
self.assertEqual(CREDENTIALS, credentials_from_disk)
4338
def test_reset_credentials_different_name(self):
4339
conf = config.AuthenticationConfig()
4340
conf.set_credentials('name', 'host', 'user', 'scheme', 'password'),
4341
conf.set_credentials('name2', 'host', 'user2', 'scheme', 'password'),
4342
self.assertIs(None, conf._get_config().get('name'))
4343
credentials = conf.get_credentials(host='host', scheme='scheme')
4344
CREDENTIALS = {'name': 'name2', 'user': 'user2', 'password':
4345
'password', 'verify_certificates': True,
4346
'scheme': 'scheme', 'host': 'host', 'port': None,
4347
'path': None, 'realm': None}
4348
self.assertEqual(CREDENTIALS, credentials)
4351
class TestAuthenticationConfig(tests.TestCase):
4352
"""Test AuthenticationConfig behaviour"""
4354
def _check_default_password_prompt(self, expected_prompt_format, scheme,
4355
host=None, port=None, realm=None,
4359
user, password = 'jim', 'precious'
4360
expected_prompt = expected_prompt_format % {
4361
'scheme': scheme, 'host': host, 'port': port,
4362
'user': user, 'realm': realm}
4364
ui.ui_factory = tests.TestUIFactory(stdin=password + '\n')
4365
# We use an empty conf so that the user is always prompted
4366
conf = config.AuthenticationConfig()
4367
self.assertEqual(password,
4368
conf.get_password(scheme, host, user, port=port,
4369
realm=realm, path=path))
4370
self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
4371
self.assertEqual('', ui.ui_factory.stdout.getvalue())
4373
def _check_default_username_prompt(self, expected_prompt_format, scheme,
4374
host=None, port=None, realm=None,
4379
expected_prompt = expected_prompt_format % {
4380
'scheme': scheme, 'host': host, 'port': port,
4382
ui.ui_factory = tests.TestUIFactory(stdin=username+ '\n')
4383
# We use an empty conf so that the user is always prompted
4384
conf = config.AuthenticationConfig()
4385
self.assertEqual(username, conf.get_user(scheme, host, port=port,
4386
realm=realm, path=path, ask=True))
4387
self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
4388
self.assertEqual('', ui.ui_factory.stdout.getvalue())
4390
def test_username_defaults_prompts(self):
4391
# HTTP prompts can't be tested here, see test_http.py
4392
self._check_default_username_prompt(u'FTP %(host)s username: ', 'ftp')
4393
self._check_default_username_prompt(
4394
u'FTP %(host)s:%(port)d username: ', 'ftp', port=10020)
4395
self._check_default_username_prompt(
4396
u'SSH %(host)s:%(port)d username: ', 'ssh', port=12345)
4398
def test_username_default_no_prompt(self):
4399
conf = config.AuthenticationConfig()
4400
self.assertEqual(None,
4401
conf.get_user('ftp', 'example.com'))
4402
self.assertEqual("explicitdefault",
4403
conf.get_user('ftp', 'example.com', default="explicitdefault"))
4405
def test_password_default_prompts(self):
4406
# HTTP prompts can't be tested here, see test_http.py
4407
self._check_default_password_prompt(
4408
u'FTP %(user)s@%(host)s password: ', 'ftp')
4409
self._check_default_password_prompt(
4410
u'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp', port=10020)
4411
self._check_default_password_prompt(
4412
u'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh', port=12345)
4413
# SMTP port handling is a bit special (it's handled if embedded in the
4415
# FIXME: should we: forbid that, extend it to other schemes, leave
4416
# things as they are that's fine thank you ?
4417
self._check_default_password_prompt(
4418
u'SMTP %(user)s@%(host)s password: ', 'smtp')
4419
self._check_default_password_prompt(
4420
u'SMTP %(user)s@%(host)s password: ', 'smtp', host='bar.org:10025')
4421
self._check_default_password_prompt(
4422
u'SMTP %(user)s@%(host)s:%(port)d password: ', 'smtp', port=10025)
4424
def test_ssh_password_emits_warning(self):
4425
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4432
entered_password = 'typed-by-hand'
4433
ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n')
4435
# Since the password defined in the authentication config is ignored,
4436
# the user is prompted
4437
self.assertEqual(entered_password,
4438
conf.get_password('ssh', 'bar.org', user='jim'))
4439
self.assertContainsRe(
4441
'password ignored in section \[ssh with password\]')
4443
def test_ssh_without_password_doesnt_emit_warning(self):
4444
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4450
entered_password = 'typed-by-hand'
4451
ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n')
4453
# Since the password defined in the authentication config is ignored,
4454
# the user is prompted
4455
self.assertEqual(entered_password,
4456
conf.get_password('ssh', 'bar.org', user='jim'))
4457
# No warning shoud be emitted since there is no password. We are only
4459
self.assertNotContainsRe(
4461
'password ignored in section \[ssh with password\]')
4463
def test_uses_fallback_stores(self):
4464
self.overrideAttr(config, 'credential_store_registry',
4465
config.CredentialStoreRegistry())
4466
store = StubCredentialStore()
4467
store.add_credentials("http", "example.com", "joe", "secret")
4468
config.credential_store_registry.register("stub", store, fallback=True)
4469
conf = config.AuthenticationConfig(_file=BytesIO())
4470
creds = conf.get_credentials("http", "example.com")
4471
self.assertEqual("joe", creds["user"])
4472
self.assertEqual("secret", creds["password"])
4475
class StubCredentialStore(config.CredentialStore):
4481
def add_credentials(self, scheme, host, user, password=None):
4482
self._username[(scheme, host)] = user
4483
self._password[(scheme, host)] = password
4485
def get_credentials(self, scheme, host, port=None, user=None,
4486
path=None, realm=None):
4487
key = (scheme, host)
4488
if not key in self._username:
4490
return { "scheme": scheme, "host": host, "port": port,
4491
"user": self._username[key], "password": self._password[key]}
4494
class CountingCredentialStore(config.CredentialStore):
4499
def get_credentials(self, scheme, host, port=None, user=None,
4500
path=None, realm=None):
4505
class TestCredentialStoreRegistry(tests.TestCase):
4507
def _get_cs_registry(self):
4508
return config.credential_store_registry
4510
def test_default_credential_store(self):
4511
r = self._get_cs_registry()
4512
default = r.get_credential_store(None)
4513
self.assertIsInstance(default, config.PlainTextCredentialStore)
4515
def test_unknown_credential_store(self):
4516
r = self._get_cs_registry()
4517
# It's hard to imagine someone creating a credential store named
4518
# 'unknown' so we use that as an never registered key.
4519
self.assertRaises(KeyError, r.get_credential_store, 'unknown')
4521
def test_fallback_none_registered(self):
4522
r = config.CredentialStoreRegistry()
4523
self.assertEqual(None,
4524
r.get_fallback_credentials("http", "example.com"))
4526
def test_register(self):
4527
r = config.CredentialStoreRegistry()
4528
r.register("stub", StubCredentialStore(), fallback=False)
4529
r.register("another", StubCredentialStore(), fallback=True)
4530
self.assertEqual(["another", "stub"], r.keys())
4532
def test_register_lazy(self):
4533
r = config.CredentialStoreRegistry()
4534
r.register_lazy("stub", "breezy.tests.test_config",
4535
"StubCredentialStore", fallback=False)
4536
self.assertEqual(["stub"], r.keys())
4537
self.assertIsInstance(r.get_credential_store("stub"),
4538
StubCredentialStore)
4540
def test_is_fallback(self):
4541
r = config.CredentialStoreRegistry()
4542
r.register("stub1", None, fallback=False)
4543
r.register("stub2", None, fallback=True)
4544
self.assertEqual(False, r.is_fallback("stub1"))
4545
self.assertEqual(True, r.is_fallback("stub2"))
4547
def test_no_fallback(self):
4548
r = config.CredentialStoreRegistry()
4549
store = CountingCredentialStore()
4550
r.register("count", store, fallback=False)
4551
self.assertEqual(None,
4552
r.get_fallback_credentials("http", "example.com"))
4553
self.assertEqual(0, store._calls)
4555
def test_fallback_credentials(self):
4556
r = config.CredentialStoreRegistry()
4557
store = StubCredentialStore()
4558
store.add_credentials("http", "example.com",
4559
"somebody", "geheim")
4560
r.register("stub", store, fallback=True)
4561
creds = r.get_fallback_credentials("http", "example.com")
4562
self.assertEqual("somebody", creds["user"])
4563
self.assertEqual("geheim", creds["password"])
4565
def test_fallback_first_wins(self):
4566
r = config.CredentialStoreRegistry()
4567
stub1 = StubCredentialStore()
4568
stub1.add_credentials("http", "example.com",
4569
"somebody", "stub1")
4570
r.register("stub1", stub1, fallback=True)
4571
stub2 = StubCredentialStore()
4572
stub2.add_credentials("http", "example.com",
4573
"somebody", "stub2")
4574
r.register("stub2", stub1, fallback=True)
4575
creds = r.get_fallback_credentials("http", "example.com")
4576
self.assertEqual("somebody", creds["user"])
4577
self.assertEqual("stub1", creds["password"])
4580
class TestPlainTextCredentialStore(tests.TestCase):
4582
def test_decode_password(self):
4583
r = config.credential_store_registry
4584
plain_text = r.get_credential_store()
4585
decoded = plain_text.decode_password(dict(password='secret'))
4586
self.assertEqual('secret', decoded)
4589
class TestBase64CredentialStore(tests.TestCase):
4591
def test_decode_password(self):
4592
r = config.credential_store_registry
4593
plain_text = r.get_credential_store('base64')
4594
decoded = plain_text.decode_password(dict(password='c2VjcmV0'))
4595
self.assertEqual('secret', decoded)
4598
# FIXME: Once we have a way to declare authentication to all test servers, we
4599
# can implement generic tests.
4600
# test_user_password_in_url
4601
# test_user_in_url_password_from_config
4602
# test_user_in_url_password_prompted
4603
# test_user_in_config
4604
# test_user_getpass.getuser
4605
# test_user_prompted ?
4606
class TestAuthenticationRing(tests.TestCaseWithTransport):
4610
class TestAutoUserId(tests.TestCase):
4611
"""Test inferring an automatic user name."""
4613
def test_auto_user_id(self):
4614
"""Automatic inference of user name.
4616
This is a bit hard to test in an isolated way, because it depends on
4617
system functions that go direct to /etc or perhaps somewhere else.
4618
But it's reasonable to say that on Unix, with an /etc/mailname, we ought
4619
to be able to choose a user name with no configuration.
4621
if sys.platform == 'win32':
4622
raise tests.TestSkipped(
4623
"User name inference not implemented on win32")
4624
realname, address = config._auto_user_id()
4625
if os.path.exists('/etc/mailname'):
4626
self.assertIsNot(None, realname)
4627
self.assertIsNot(None, address)
4629
self.assertEqual((None, None), (realname, address))
4632
class TestDefaultMailDomain(tests.TestCaseInTempDir):
4633
"""Test retrieving default domain from mailname file"""
4635
def test_default_mail_domain_simple(self):
4636
f = file('simple', 'w')
4638
f.write("domainname.com\n")
4641
r = config._get_default_mail_domain('simple')
4642
self.assertEqual('domainname.com', r)
4644
def test_default_mail_domain_no_eol(self):
4645
f = file('no_eol', 'w')
4647
f.write("domainname.com")
4650
r = config._get_default_mail_domain('no_eol')
4651
self.assertEqual('domainname.com', r)
4653
def test_default_mail_domain_multiple_lines(self):
4654
f = file('multiple_lines', 'w')
4656
f.write("domainname.com\nsome other text\n")
4659
r = config._get_default_mail_domain('multiple_lines')
4660
self.assertEqual('domainname.com', r)
4663
class EmailOptionTests(tests.TestCase):
4665
def test_default_email_uses_BRZ_EMAIL(self):
4666
conf = config.MemoryStack('email=jelmer@debian.org')
4667
# BRZ_EMAIL takes precedence over EMAIL
4668
self.overrideEnv('BRZ_EMAIL', 'jelmer@samba.org')
4669
self.overrideEnv('EMAIL', 'jelmer@apache.org')
4670
self.assertEqual('jelmer@samba.org', conf.get('email'))
4672
def test_default_email_uses_EMAIL(self):
4673
conf = config.MemoryStack('')
4674
self.overrideEnv('BRZ_EMAIL', None)
4675
self.overrideEnv('EMAIL', 'jelmer@apache.org')
4676
self.assertEqual('jelmer@apache.org', conf.get('email'))
4678
def test_BRZ_EMAIL_overrides(self):
4679
conf = config.MemoryStack('email=jelmer@debian.org')
4680
self.overrideEnv('BRZ_EMAIL', 'jelmer@apache.org')
4681
self.assertEqual('jelmer@apache.org', conf.get('email'))
4682
self.overrideEnv('BRZ_EMAIL', None)
4683
self.overrideEnv('EMAIL', 'jelmer@samba.org')
4684
self.assertEqual('jelmer@debian.org', conf.get('email'))
4687
class MailClientOptionTests(tests.TestCase):
4689
def test_default(self):
4690
conf = config.MemoryStack('')
4691
client = conf.get('mail_client')
4692
self.assertIs(client, mail_client.DefaultMail)
4694
def test_evolution(self):
4695
conf = config.MemoryStack('mail_client=evolution')
4696
client = conf.get('mail_client')
4697
self.assertIs(client, mail_client.Evolution)
4699
def test_kmail(self):
4700
conf = config.MemoryStack('mail_client=kmail')
4701
client = conf.get('mail_client')
4702
self.assertIs(client, mail_client.KMail)
4704
def test_mutt(self):
4705
conf = config.MemoryStack('mail_client=mutt')
4706
client = conf.get('mail_client')
4707
self.assertIs(client, mail_client.Mutt)
4709
def test_thunderbird(self):
4710
conf = config.MemoryStack('mail_client=thunderbird')
4711
client = conf.get('mail_client')
4712
self.assertIs(client, mail_client.Thunderbird)
4714
def test_explicit_default(self):
4715
conf = config.MemoryStack('mail_client=default')
4716
client = conf.get('mail_client')
4717
self.assertIs(client, mail_client.DefaultMail)
4719
def test_editor(self):
4720
conf = config.MemoryStack('mail_client=editor')
4721
client = conf.get('mail_client')
4722
self.assertIs(client, mail_client.Editor)
4724
def test_mapi(self):
4725
conf = config.MemoryStack('mail_client=mapi')
4726
client = conf.get('mail_client')
4727
self.assertIs(client, mail_client.MAPIClient)
4729
def test_xdg_email(self):
4730
conf = config.MemoryStack('mail_client=xdg-email')
4731
client = conf.get('mail_client')
4732
self.assertIs(client, mail_client.XDGEmail)
4734
def test_unknown(self):
4735
conf = config.MemoryStack('mail_client=firebird')
4736
self.assertRaises(errors.ConfigOptionValueError, conf.get,