1
# Copyright (C) 2005-2014, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for finding and reading the bzr config file[s]."""
19
from textwrap import dedent
25
from testtools import matchers
38
registry as _mod_registry,
45
from ..sixish import (
48
from ..transport import remote as transport_remote
56
def lockable_config_scenarios():
59
{'config_class': config.GlobalConfig,
61
'config_section': 'DEFAULT'}),
63
{'config_class': config.LocationConfig,
65
'config_section': '.'}),]
68
# Module-level ``load_tests`` hook. Presumably this makes the loader expand
# each TestCase's ``scenarios`` attribute into parametrized tests -- confirm
# against the ``scenarios`` helper module.
load_tests = scenarios.load_tests_apply_scenarios
70
# Register helpers to build stores. Each entry maps a name to a callable that
# receives the running test and returns a freshly built store.
config.test_store_builder_registry.register(
    'configobj',
    lambda test: config.TransportIniFileStore(
        test.get_transport(), 'configobj.conf'))
config.test_store_builder_registry.register(
    'breezy', lambda test: config.GlobalStore())
config.test_store_builder_registry.register(
    'location', lambda test: config.LocationStore())
80
def build_backing_branch(test, relpath,
                         transport_class=None, server_class=None):
    """Test helper to create a backing branch only once.

    Some tests need multiple stores/stacks to check concurrent update
    behaviours. As such, they need to build different branch *objects* even
    if they share the branch on disk.

    :param test: The running test. It must provide ``make_branch``; the
        created branch is remembered in its ``backing_branch`` attribute.

    :param relpath: The relative path to the branch. (Note that the helper
        should always specify the same relpath).

    :param transport_class: The Transport class the test needs to use.

    :param server_class: The server associated with the ``transport_class``.

    Either both or neither of ``transport_class`` and ``server_class`` should
    be specified.

    :raises AssertionError: If only one of ``transport_class`` and
        ``server_class`` is given.
    """
    has_transport = transport_class is not None
    has_server = server_class is not None
    if has_transport != has_server:
        raise AssertionError('Specify both ``transport_class`` and '
                             '``server_class`` or neither of them')
    if has_transport:
        test.transport_class = transport_class
        test.transport_server = server_class
    if getattr(test, 'backing_branch', None) is None:
        # First call, let's build the branch on disk
        test.backing_branch = test.make_branch(relpath)
110
def build_branch_store(test):
    """Return a ``BranchStore`` on a newly opened object for 'branch'.

    The backing branch is created through ``build_backing_branch`` before the
    branch is opened.
    """
    build_backing_branch(test, 'branch')
    b = branch.Branch.open('branch')
    return config.BranchStore(b)


config.test_store_builder_registry.register('branch', build_branch_store)
117
def build_control_store(test):
    """Return a ``ControlStore`` for the control dir opened at 'branch'.

    The backing branch is created through ``build_backing_branch`` before the
    control dir is opened.
    """
    build_backing_branch(test, 'branch')
    b = controldir.ControlDir.open('branch')
    return config.ControlStore(b)


config.test_store_builder_registry.register('control', build_control_store)
124
def build_remote_branch_store(test):
    """Return a ``BranchStore`` for 'branch' opened through ``test.get_url``.

    The transport and server classes come from the first entry of
    ``transport_remote.get_test_permutations()``.
    """
    # There is only one permutation (but we won't be able to handle more with
    # this design anyway)
    (transport_class,
     server_class) = transport_remote.get_test_permutations()[0]
    build_backing_branch(test, 'branch', transport_class, server_class)
    b = branch.Branch.open(test.get_url('branch'))
    return config.BranchStore(b)


config.test_store_builder_registry.register('remote_branch',
                                            build_remote_branch_store)
136
# Stack builders that need no backing branch: each callable receives the
# running test and returns a freshly built stack.
config.test_stack_builder_registry.register(
    'breezy', lambda test: config.GlobalStack())
config.test_stack_builder_registry.register(
    'location', lambda test: config.LocationStack('.'))
142
def build_branch_stack(test):
    """Return a ``BranchStack`` on a newly opened object for 'branch'.

    The backing branch is created through ``build_backing_branch`` before the
    branch is opened.
    """
    build_backing_branch(test, 'branch')
    b = branch.Branch.open('branch')
    return config.BranchStack(b)


config.test_stack_builder_registry.register('branch', build_branch_stack)
149
def build_branch_only_stack(test):
    """Return a ``BranchOnlyStack`` for 'branch' opened via ``test.get_url``.

    The transport and server classes come from the first entry of
    ``transport_remote.get_test_permutations()``.
    """
    # There is only one permutation (but we won't be able to handle more with
    # this design anyway)
    (transport_class,
     server_class) = transport_remote.get_test_permutations()[0]
    build_backing_branch(test, 'branch', transport_class, server_class)
    b = branch.Branch.open(test.get_url('branch'))
    return config.BranchOnlyStack(b)


config.test_stack_builder_registry.register('branch_only',
                                            build_branch_only_stack)
160
def build_remote_control_stack(test):
    """Return a ``RemoteControlStack`` for the control dir of 'branch'.

    The branch is opened through ``test.get_url`` and its ``controldir``
    attribute is handed to the stack. The transport and server classes come
    from the first entry of ``transport_remote.get_test_permutations()``.
    """
    # There is only one permutation (but we won't be able to handle more with
    # this design anyway)
    (transport_class,
     server_class) = transport_remote.get_test_permutations()[0]
    # We need only a bzrdir for this, not a full branch, but it's not worth
    # creating a dedicated helper to create only the bzrdir
    build_backing_branch(test, 'branch', transport_class, server_class)
    b = branch.Branch.open(test.get_url('branch'))
    return config.RemoteControlStack(b.controldir)


config.test_stack_builder_registry.register('remote_control',
                                            build_remote_control_stack)
174
# Command line appended to ``sample_config_text`` as the value of ``ll``.
sample_long_alias = "log -r-15..-1 --line"
175
sample_config_text = u"""
177
email=Erik B\u00e5gfors <erik@bagfors.nu>
179
change_editor=vimdiff -of @new_path @old_path
180
gpg_signing_key=DD4D5088
182
validate_signatures_in_log=true
184
user_global_option=something
185
bzr.mergetool.sometool=sometool {base} {this} {other} -o {result}
186
bzr.mergetool.funkytool=funkytool "arg with spaces" {this_temp}
187
bzr.mergetool.newtool='"newtool with spaces" {this_temp}'
188
bzr.default_mergetool=sometool
191
ll=""" + sample_long_alias + "\n"
194
sample_always_signatures = """
196
check_signatures=ignore
197
create_signatures=always
200
sample_ignore_signatures = """
202
check_signatures=require
203
create_signatures=never
206
sample_maybe_signatures = """
208
check_signatures=ignore
209
create_signatures=when-required
212
sample_branches_text = """
213
[http://www.example.com]
215
email=Robert Collins <robertc@example.org>
216
normal_option = normal
217
appendpath_option = append
218
appendpath_option:policy = appendpath
219
norecurse_option = norecurse
220
norecurse_option:policy = norecurse
221
[http://www.example.com/ignoreparent]
222
# different project: ignore parent dir config
224
[http://www.example.com/norecurse]
225
# configuration items that only apply to this dir
227
normal_option = norecurse
228
[http://www.example.com/dir]
229
appendpath_option = normal
231
check_signatures=require
232
# test trailing / matching with no children
234
check_signatures=check-available
235
gpg_signing_key=default
236
user_local_option=local
237
# test trailing / matching
239
#subdirs will match but not the parent
241
check_signatures=ignore
242
post_commit=breezy.tests.test_config.post_commit
243
#testing explicit beats globs
247
def create_configs(test):
248
"""Create configuration files for a given test.
250
This requires creating a tree (and populate the ``test.tree`` attribute)
251
and its associated branch and will populate the following attributes:
253
- branch_config: A BranchConfig for the associated branch.
255
- locations_config : A LocationConfig for the associated branch
257
- breezy_config: A GlobalConfig.
259
The tree and branch are created in a 'tree' subdirectory so the tests can
260
still use the test directory to stay outside of the branch.
262
tree = test.make_branch_and_tree('tree')
264
test.branch_config = config.BranchConfig(tree.branch)
265
test.locations_config = config.LocationConfig(tree.basedir)
266
test.breezy_config = config.GlobalConfig()
269
def create_configs_with_file_option(test):
270
"""Create configuration files with a ``file`` option set in each.
272
This builds on ``create_configs`` and add one ``file`` option in each
273
configuration with a value which allows identifying the configuration file.
276
test.breezy_config.set_user_option('file', 'breezy')
277
test.locations_config.set_user_option('file', 'locations')
278
test.branch_config.set_user_option('file', 'branch')
281
class TestOptionsMixin:
    """Mixin adding an assertion about the options a config exposes."""

    def assertOptions(self, expected, conf):
        """Check ``conf._get_options()`` against ``expected``.

        Only the first four items of each option are compared.
        """
        # We don't care about the parser (as it will make tests hard to write
        # and error-prone anyway)
        self.assertThat([opt[:4] for opt in conf._get_options()],
                        matchers.Equals(expected))
290
class InstrumentedConfigObj(object):
291
"""A config obj look-enough-alike to record calls made to it."""
293
def __contains__(self, thing):
294
self._calls.append(('__contains__', thing))
297
def __getitem__(self, key):
298
self._calls.append(('__getitem__', key))
301
def __init__(self, input, encoding=None):
    """Start the call log with the constructor arguments."""
    self._calls = [('__init__', input, encoding)]
304
def __setitem__(self, key, value):
    """Record the assignment in the call log; nothing is stored."""
    self._calls.append(('__setitem__', key, value))
307
def __delitem__(self, key):
    """Record the deletion in the call log; nothing is removed."""
    self._calls.append(('__delitem__', key))
311
self._calls.append(('keys',))
315
self._calls.append(('reload',))
317
def write(self, arg):
    """Record that ``write`` was called; ``arg`` is ignored."""
    self._calls.append(('write',))
320
def as_bool(self, value):
321
self._calls.append(('as_bool', value))
324
def get_value(self, section, name):
325
self._calls.append(('get_value', section, name))
329
class FakeBranch(object):
331
def __init__(self, base=None):
333
self.base = "http://example.com/branches/demo"
336
self._transport = self.control_files = \
337
FakeControlFilesAndTransport()
339
def _get_config(self):
    """Return a ``TransportConfig`` for 'branch.conf' on our transport."""
    return config.TransportConfig(self._transport, 'branch.conf')
342
def lock_write(self):
    """Return a ``LogicalLockResult`` built from ``self.unlock``."""
    return lock.LogicalLockResult(self.unlock)
349
class FakeControlFilesAndTransport(object):
353
self._transport = self
355
def get(self, filename):
358
return BytesIO(self.files[filename])
360
raise errors.NoSuchFile(filename)
362
def get_bytes(self, filename):
365
return self.files[filename]
367
raise errors.NoSuchFile(filename)
369
def put(self, filename, fileobj):
    """Store everything read from ``fileobj`` under ``filename``."""
    self.files[filename] = fileobj.read()
372
def put_file(self, filename, fileobj):
    """Delegate to ``put`` and return its result."""
    return self.put(filename, fileobj)
376
class InstrumentedConfig(config.Config):
377
"""An instrumented config that supplies stubs for template methods."""
380
super(InstrumentedConfig, self).__init__()
382
self._signatures = config.CHECK_NEVER
384
def _get_user_id(self):
    """Record the call and return a fixed user id."""
    self._calls.append('_get_user_id')
    return "Robert Collins <robert.collins@example.org>"
388
def _get_signature_checking(self):
    """Record the call and return the ``_signatures`` attribute."""
    self._calls.append('_get_signature_checking')
    return self._signatures
392
def _get_change_editor(self):
    """Record the call and return a fixed change-editor command line."""
    self._calls.append('_get_change_editor')
    return 'vimdiff -fo @new_path @old_path'
397
bool_config = """[DEFAULT]
406
class TestConfigObj(tests.TestCase):
408
def test_get_bool(self):
    """``get_bool`` returns the real ``True``/``False`` objects."""
    co = config.ConfigObj(BytesIO(bool_config))
    self.assertIs(co.get_bool('DEFAULT', 'active'), True)
    self.assertIs(co.get_bool('DEFAULT', 'inactive'), False)
    self.assertIs(co.get_bool('UPPERCASE', 'active'), True)
    self.assertIs(co.get_bool('UPPERCASE', 'nonactive'), False)
415
def test_hash_sign_in_value(self):
417
Before 4.5.0, ConfigObj did not quote # signs in values, so they'd be
418
treated as comments when read in again. (#86838)
420
co = config.ConfigObj()
421
co['test'] = 'foo#bar'
423
co.write(outfile=outfile)
424
lines = outfile.getvalue().splitlines()
425
self.assertEqual(lines, ['test = "foo#bar"'])
426
co2 = config.ConfigObj(lines)
427
self.assertEqual(co2['test'], 'foo#bar')
429
def test_triple_quotes(self):
430
# Bug #710410: if the value string has triple quotes
431
# then ConfigObj versions up to 4.7.2 will quote them wrong
432
# and won't able to read them back
433
triple_quotes_value = '''spam
434
""" that's my spam """
436
co = config.ConfigObj()
437
co['test'] = triple_quotes_value
438
# While writing this test another bug in ConfigObj has been found:
439
# method co.write() without arguments produces list of lines
440
# one option per line, and multiline values are not split
441
# across multiple lines,
442
# and that breaks the parsing these lines back by ConfigObj.
443
# This issue only affects test, but it's better to avoid
444
# `co.write()` construct at all.
445
# [bialix 20110222] bug report sent to ConfigObj's author
447
co.write(outfile=outfile)
448
output = outfile.getvalue()
449
# now we're trying to read it back
450
co2 = config.ConfigObj(BytesIO(output))
451
self.assertEqual(triple_quotes_value, co2['test'])
454
erroneous_config = """[section] # line 1
457
whocares=notme # line 4
461
class TestConfigObjErrors(tests.TestCase):
463
def test_duplicate_section_name_error_line(self):
465
co = configobj.ConfigObj(BytesIO(erroneous_config),
467
except config.configobj.DuplicateError as e:
468
self.assertEqual(3, e.line_number)
470
self.fail('Error in config file not detected')
473
class TestConfig(tests.TestCase):
475
def test_constructs(self):
478
def test_user_email(self):
    """``user_email`` returns only the address part of the user id."""
    my_config = InstrumentedConfig()
    self.assertEqual('robert.collins@example.org', my_config.user_email())
    # Only the template method supplying the id may have been called.
    self.assertEqual(['_get_user_id'], my_config._calls)
483
def test_username(self):
    """``username`` returns the whole user id supplied by the stub."""
    my_config = InstrumentedConfig()
    self.assertEqual('Robert Collins <robert.collins@example.org>',
                     my_config.username())
    self.assertEqual(['_get_user_id'], my_config._calls)
489
def test_get_user_option_default(self):
    """A bare ``Config`` yields ``None`` for an option it does not have."""
    my_config = config.Config()
    self.assertIs(None, my_config.get_user_option('no_option'))
493
def test_validate_signatures_in_log_default(self):
    """A bare ``Config`` reports ``False`` for signature validation."""
    my_config = config.Config()
    self.assertEqual(False, my_config.validate_signatures_in_log())
497
def test_get_change_editor(self):
    """The change editor is a ``DiffFromTool`` with the split command."""
    my_config = InstrumentedConfig()
    change_editor = my_config.get_change_editor('old_tree', 'new_tree')
    self.assertEqual(['_get_change_editor'], my_config._calls)
    self.assertIs(diff.DiffFromTool, change_editor.__class__)
    self.assertEqual(['vimdiff', '-fo', '@new_path', '@old_path'],
                     change_editor.command_template)
506
class TestConfigPath(tests.TestCase):
509
super(TestConfigPath, self).setUp()
510
self.overrideEnv('HOME', '/home/bogus')
511
self.overrideEnv('XDG_CACHE_HOME', '')
512
if sys.platform == 'win32':
515
r'C:\Documents and Settings\bogus\Application Data')
517
'C:/Documents and Settings/bogus/Application Data/breezy'
519
self.brz_home = '/home/bogus/.config/breezy'
521
def test_config_dir(self):
    """``config_dir`` is the ``brz_home`` computed in ``setUp``."""
    self.assertEqual(config.config_dir(), self.brz_home)
524
def test_config_dir_is_unicode(self):
    """``config_dir`` returns a ``unicode`` object."""
    self.assertIsInstance(config.config_dir(), unicode)
527
def test_config_filename(self):
    """The main config file is 'breezy.conf' inside ``brz_home``."""
    self.assertEqual(config.config_filename(),
                     self.brz_home + '/breezy.conf')
531
def test_locations_config_filename(self):
    """The locations file is 'locations.conf' inside ``brz_home``."""
    self.assertEqual(config.locations_config_filename(),
                     self.brz_home + '/locations.conf')
535
def test_authentication_config_filename(self):
    """The auth file is 'authentication.conf' inside ``brz_home``."""
    self.assertEqual(config.authentication_config_filename(),
                     self.brz_home + '/authentication.conf')
539
def test_xdg_cache_dir(self):
    """The cache dir is expected to be '/home/bogus/.cache'."""
    self.assertEqual(config.xdg_cache_dir(),
                     '/home/bogus/.cache')
544
class TestConfigPathFallback(tests.TestCaseInTempDir):
547
super(TestConfigPathFallback, self).setUp()
548
self.overrideEnv('HOME', self.test_dir)
549
self.overrideEnv('XDG_CACHE_HOME', '')
550
self.bzr_home = os.path.join(self.test_dir, '.bazaar')
551
os.mkdir(self.bzr_home)
553
def test_config_dir(self):
    """``config_dir`` is the '.bazaar' directory made in ``setUp``."""
    self.assertEqual(config.config_dir(), self.bzr_home)
556
def test_config_dir_is_unicode(self):
    """``config_dir`` returns a ``unicode`` object."""
    self.assertIsInstance(config.config_dir(), unicode)
559
def test_config_filename(self):
    """The main config file is 'bazaar.conf' inside ``bzr_home``."""
    self.assertEqual(config.config_filename(),
                     self.bzr_home + '/bazaar.conf')
563
def test_locations_config_filename(self):
    """The locations file is 'locations.conf' inside ``bzr_home``."""
    self.assertEqual(config.locations_config_filename(),
                     self.bzr_home + '/locations.conf')
567
def test_authentication_config_filename(self):
    """The auth file is 'authentication.conf' inside ``bzr_home``."""
    self.assertEqual(config.authentication_config_filename(),
                     self.bzr_home + '/authentication.conf')
571
def test_xdg_cache_dir(self):
    """The cache dir is '.cache' under the test directory."""
    self.assertEqual(config.xdg_cache_dir(),
                     os.path.join(self.test_dir, '.cache'))
576
class TestXDGConfigDir(tests.TestCaseInTempDir):
577
# must be in temp dir because config tests for the existence of the bazaar
578
# subdirectory of $XDG_CONFIG_HOME
581
if sys.platform == 'win32':
582
raise tests.TestNotApplicable(
583
'XDG config dir not used on this platform')
584
super(TestXDGConfigDir, self).setUp()
585
self.overrideEnv('HOME', self.test_home_dir)
586
# BRZ_HOME overrides everything we want to test so unset it.
587
self.overrideEnv('BRZ_HOME', None)
589
def test_xdg_config_dir_exists(self):
590
"""When ~/.config/bazaar exists, use it as the config dir."""
591
newdir = osutils.pathjoin(self.test_home_dir, '.config', 'bazaar')
593
self.assertEqual(config.config_dir(), newdir)
595
def test_xdg_config_home(self):
596
"""When XDG_CONFIG_HOME is set, use it."""
597
xdgconfigdir = osutils.pathjoin(self.test_home_dir, 'xdgconfig')
598
self.overrideEnv('XDG_CONFIG_HOME', xdgconfigdir)
599
newdir = osutils.pathjoin(xdgconfigdir, 'bazaar')
601
self.assertEqual(config.config_dir(), newdir)
604
class TestIniConfig(tests.TestCaseInTempDir):
    """Base class for tests working on an ``IniBasedConfig``."""

    def make_config_parser(self, s):
        """Build a config from the string ``s``.

        :return: A ``(config, parser)`` tuple where ``parser`` is what the
            config's ``_get_parser()`` returns.
        """
        conf = config.IniBasedConfig.from_string(s)
        return conf, conf._get_parser()
611
class TestIniConfigBuilding(TestIniConfig):
    """Tests about creating ``IniBasedConfig`` objects."""

    def test_contructs(self):
        """The class can be instantiated without arguments."""
        config.IniBasedConfig()

    def test_from_fp(self):
        """A config built from a string exposes a ``ConfigObj`` parser."""
        my_config = config.IniBasedConfig.from_string(sample_config_text)
        self.assertIsInstance(my_config._get_parser(), configobj.ConfigObj)

    def test_cached(self):
        """``_get_parser`` hands back the same object on every call."""
        my_config = config.IniBasedConfig.from_string(sample_config_text)
        parser = my_config._get_parser()
        self.assertIs(my_config._get_parser(), parser)

    def _dummy_chown(self, path, uid, gid):
        """Stand-in for ``os.chown`` remembering its arguments."""
        self.path, self.uid, self.gid = path, uid, gid

    def test_ini_config_ownership(self):
        """Ensure that chown is happening during _write_config_file"""
        self.requireFeature(features.chown_feature)
        self.overrideAttr(os, 'chown', self._dummy_chown)
        # Reset the recorder so the assertions only see this write.
        self.path = self.uid = self.gid = None
        conf = config.IniBasedConfig(file_name='./foo.conf')
        conf._write_config_file()
        self.assertEqual(self.path, './foo.conf')
        self.assertIsInstance(self.uid, int)
        self.assertIsInstance(self.gid, int)
640
class TestIniConfigSaving(tests.TestCaseInTempDir):
642
def test_cant_save_without_a_file_name(self):
    """Writing a config that has no file name raises AssertionError."""
    conf = config.IniBasedConfig()
    self.assertRaises(AssertionError, conf._write_config_file)
646
def test_saved_with_content(self):
647
content = 'foo = bar\n'
648
config.IniBasedConfig.from_string(content, file_name='./test.conf',
650
self.assertFileEqual(content, 'test.conf')
653
class TestIniConfigOptionExpansion(tests.TestCase):
654
"""Test option expansion from the IniConfig level.
656
What we really want here is to test the Config level, but the class being
657
abstract as far as storing values is concerned, this can't be done
660
# FIXME: This should be rewritten when all configs share a storage
661
# implementation -- vila 2011-02-18
663
def get_config(self, string=None):
666
c = config.IniBasedConfig.from_string(string)
669
def assertExpansion(self, expected, conf, string, env=None):
    """Check that ``conf.expand_options(string, env)`` gives ``expected``."""
    self.assertEqual(expected, conf.expand_options(string, env))
672
def test_no_expansion(self):
673
c = self.get_config('')
674
self.assertExpansion('foo', c, 'foo')
676
def test_env_adding_options(self):
677
c = self.get_config('')
678
self.assertExpansion('bar', c, '{foo}', {'foo': 'bar'})
680
def test_env_overriding_options(self):
681
c = self.get_config('foo=baz')
682
self.assertExpansion('bar', c, '{foo}', {'foo': 'bar'})
684
def test_simple_ref(self):
685
c = self.get_config('foo=xxx')
686
self.assertExpansion('xxx', c, '{foo}')
688
def test_unknown_ref(self):
    """Expanding an undefined reference raises ExpandingUnknownOption."""
    c = self.get_config('')
    self.assertRaises(config.ExpandingUnknownOption,
                      c.expand_options, '{foo}')
693
def test_indirect_ref(self):
694
c = self.get_config('''
698
self.assertExpansion('xxx', c, '{bar}')
700
def test_embedded_ref(self):
701
c = self.get_config('''
705
self.assertExpansion('xxx', c, '{{bar}}')
707
def test_simple_loop(self):
708
c = self.get_config('foo={foo}')
709
self.assertRaises(config.OptionExpansionLoop, c.expand_options,
712
def test_indirect_loop(self):
713
c = self.get_config('''
717
e = self.assertRaises(config.OptionExpansionLoop,
718
c.expand_options, '{foo}')
719
self.assertEqual('foo->bar->baz', e.refs)
720
self.assertEqual('{foo}', e.string)
723
conf = self.get_config('''
727
list={foo},{bar},{baz}
729
self.assertEqual(['start', 'middle', 'end'],
730
conf.get_user_option('list', expand=True))
732
def test_cascading_list(self):
733
conf = self.get_config('''
739
self.assertEqual(['start', 'middle', 'end'],
740
conf.get_user_option('list', expand=True))
742
def test_pathological_hidden_list(self):
743
conf = self.get_config('''
749
hidden={start}{middle}{end}
751
# Nope, it's either a string or a list, and the list wins as soon as a
752
# ',' appears, so the string concatenation never occur.
753
self.assertEqual(['{foo', '}', '{', 'bar}'],
754
conf.get_user_option('hidden', expand=True))
757
class TestLocationConfigOptionExpansion(tests.TestCaseInTempDir):
759
def get_config(self, location, string=None):
762
# Since we don't save the config we won't strictly require to inherit
763
# from TestCaseInTempDir, but an error occurs so quickly...
764
c = config.LocationConfig.from_string(string, location)
767
def test_dont_cross_unrelated_section(self):
768
c = self.get_config('/another/branch/path','''
773
[/another/branch/path]
776
self.assertRaises(config.ExpandingUnknownOption,
777
c.get_user_option, 'bar', expand=True)
779
def test_cross_related_sections(self):
780
c = self.get_config('/project/branch/path','''
784
[/project/branch/path]
787
self.assertEqual('quux', c.get_user_option('bar', expand=True))
790
class TestIniBaseConfigOnDisk(tests.TestCaseInTempDir):
792
def test_cannot_reload_without_name(self):
    """Reloading a config built without a file name raises."""
    conf = config.IniBasedConfig.from_string(sample_config_text)
    self.assertRaises(AssertionError, conf.reload)
796
def test_reload_see_new_value(self):
797
c1 = config.IniBasedConfig.from_string('editor=vim\n',
798
file_name='./test/conf')
799
c1._write_config_file()
800
c2 = config.IniBasedConfig.from_string('editor=emacs\n',
801
file_name='./test/conf')
802
c2._write_config_file()
803
self.assertEqual('vim', c1.get_user_option('editor'))
804
self.assertEqual('emacs', c2.get_user_option('editor'))
805
# Make sure we get the Right value
807
self.assertEqual('emacs', c1.get_user_option('editor'))
810
class TestLockableConfig(tests.TestCaseInTempDir):
812
scenarios = lockable_config_scenarios()
817
config_section = None
820
super(TestLockableConfig, self).setUp()
821
self._content = '[%s]\none=1\ntwo=2\n' % (self.config_section,)
822
self.config = self.create_config(self._content)
824
def get_existing_config(self):
    """Return a new ``config_class`` object built from ``config_args``."""
    return self.config_class(*self.config_args)
827
def create_config(self, content):
828
kwargs = dict(save=True)
829
c = self.config_class.from_string(content, *self.config_args, **kwargs)
832
def test_simple_read_access(self):
833
self.assertEqual('1', self.config.get_user_option('one'))
835
def test_simple_write_access(self):
836
self.config.set_user_option('one', 'one')
837
self.assertEqual('one', self.config.get_user_option('one'))
839
def test_listen_to_the_last_speaker(self):
841
c2 = self.get_existing_config()
842
c1.set_user_option('one', 'ONE')
843
c2.set_user_option('two', 'TWO')
844
self.assertEqual('ONE', c1.get_user_option('one'))
845
self.assertEqual('TWO', c2.get_user_option('two'))
846
# The second update respect the first one
847
self.assertEqual('ONE', c2.get_user_option('one'))
849
def test_last_speaker_wins(self):
850
# If the same config is not shared, the same variable modified twice
851
# can only see a single result.
853
c2 = self.get_existing_config()
854
c1.set_user_option('one', 'c1')
855
c2.set_user_option('one', 'c2')
856
self.assertEqual('c2', c2._get_user_option('one'))
857
# The first modification is still available until another refresh
859
self.assertEqual('c1', c1._get_user_option('one'))
860
c1.set_user_option('two', 'done')
861
self.assertEqual('c2', c1._get_user_option('one'))
863
def test_writes_are_serialized(self):
865
c2 = self.get_existing_config()
867
# We spawn a thread that will pause *during* the write
868
before_writing = threading.Event()
869
after_writing = threading.Event()
870
writing_done = threading.Event()
871
c1_orig = c1._write_config_file
872
def c1_write_config_file():
875
# The lock is held. We wait for the main thread to decide when to
878
c1._write_config_file = c1_write_config_file
880
c1.set_user_option('one', 'c1')
882
t1 = threading.Thread(target=c1_set_option)
883
# Collect the thread after the test
884
self.addCleanup(t1.join)
885
# Be ready to unblock the thread if the test goes wrong
886
self.addCleanup(after_writing.set)
888
before_writing.wait()
889
self.assertTrue(c1._lock.is_held)
890
self.assertRaises(errors.LockContention,
891
c2.set_user_option, 'one', 'c2')
892
self.assertEqual('c1', c1.get_user_option('one'))
893
# Let the lock be released
896
c2.set_user_option('one', 'c2')
897
self.assertEqual('c2', c2.get_user_option('one'))
899
def test_read_while_writing(self):
901
# We spawn a thread that will pause *during* the write
902
ready_to_write = threading.Event()
903
do_writing = threading.Event()
904
writing_done = threading.Event()
905
c1_orig = c1._write_config_file
906
def c1_write_config_file():
908
# The lock is held. We wait for the main thread to decide when to
913
c1._write_config_file = c1_write_config_file
915
c1.set_user_option('one', 'c1')
916
t1 = threading.Thread(target=c1_set_option)
917
# Collect the thread after the test
918
self.addCleanup(t1.join)
919
# Be ready to unblock the thread if the test goes wrong
920
self.addCleanup(do_writing.set)
922
# Ensure the thread is ready to write
923
ready_to_write.wait()
924
self.assertTrue(c1._lock.is_held)
925
self.assertEqual('c1', c1.get_user_option('one'))
926
# If we read during the write, we get the old value
927
c2 = self.get_existing_config()
928
self.assertEqual('1', c2.get_user_option('one'))
929
# Let the writing occur and ensure it occurred
932
# Now we get the updated value
933
c3 = self.get_existing_config()
934
self.assertEqual('c1', c3.get_user_option('one'))
937
class TestGetUserOptionAs(TestIniConfig):
939
def test_get_user_option_as_bool(self):
940
conf, parser = self.make_config_parser("""
943
an_invalid_bool = maybe
944
a_list = hmm, who knows ? # This is interpreted as a list !
946
get_bool = conf.get_user_option_as_bool
947
self.assertEqual(True, get_bool('a_true_bool'))
948
self.assertEqual(False, get_bool('a_false_bool'))
951
warnings.append(args[0] % args[1:])
952
self.overrideAttr(trace, 'warning', warning)
953
msg = 'Value "%s" is not a boolean for "%s"'
954
self.assertIs(None, get_bool('an_invalid_bool'))
955
self.assertEqual(msg % ('maybe', 'an_invalid_bool'), warnings[0])
957
self.assertIs(None, get_bool('not_defined_in_this_config'))
958
self.assertEqual([], warnings)
960
def test_get_user_option_as_list(self):
961
conf, parser = self.make_config_parser("""
966
get_list = conf.get_user_option_as_list
967
self.assertEqual(['a', 'b', 'c'], get_list('a_list'))
968
self.assertEqual(['1'], get_list('length_1'))
969
self.assertEqual('x', conf.get_user_option('one_item'))
970
# automatically cast to list
971
self.assertEqual(['x'], get_list('one_item'))
974
class TestSupressWarning(TestIniConfig):
    """Tests for ``suppress_warning`` on an ini based config."""

    def make_warnings_config(self, s):
        """Return the ``suppress_warning`` method of a config built from s."""
        conf, parser = self.make_config_parser(s)
        return conf.suppress_warning

    def test_suppress_warning_unknown(self):
        """Nothing is suppressed when the config is empty."""
        suppress_warning = self.make_warnings_config('')
        self.assertEqual(False, suppress_warning('unknown_warning'))

    def test_suppress_warning_known(self):
        """Only the names listed in ``suppress_warnings`` are suppressed."""
        suppress_warning = self.make_warnings_config('suppress_warnings=a,b')
        self.assertEqual(False, suppress_warning('c'))
        self.assertEqual(True, suppress_warning('a'))
        self.assertEqual(True, suppress_warning('b'))
991
class TestGetConfig(tests.TestCaseInTempDir):
993
def test_constructs(self):
    """``GlobalConfig`` can be instantiated without arguments."""
    config.GlobalConfig()
996
def test_calls_read_filenames(self):
997
# replace the class that is constructed, to check its parameters
998
oldparserclass = config.ConfigObj
999
config.ConfigObj = InstrumentedConfigObj
1000
my_config = config.GlobalConfig()
1002
parser = my_config._get_parser()
1004
config.ConfigObj = oldparserclass
1005
self.assertIsInstance(parser, InstrumentedConfigObj)
1006
self.assertEqual(parser._calls, [('__init__', config.config_filename(),
1010
class TestBranchConfig(tests.TestCaseWithTransport):
1012
def test_constructs_valid(self):
    """A ``BranchConfig`` can be built from a ``FakeBranch``."""
    branch = FakeBranch()
    my_config = config.BranchConfig(branch)
    self.assertIsNot(None, my_config)
1017
def test_constructs_error(self):
    """Building a ``BranchConfig`` without a branch raises TypeError."""
    self.assertRaises(TypeError, config.BranchConfig)
1020
def test_get_location_config(self):
    """The location config uses the branch base and is reused."""
    branch = FakeBranch()
    my_config = config.BranchConfig(branch)
    location_config = my_config._get_location_config()
    self.assertEqual(branch.base, location_config.location)
    # A second call must hand back the very same object.
    self.assertIs(location_config, my_config._get_location_config())
1027
def test_get_config(self):
    """The Branch.get_config method works properly"""
    b = controldir.ControlDir.create_standalone_workingtree('.').branch
    my_config = b.get_config()
    self.assertIs(my_config.get_user_option('wacky'), None)
    my_config.set_user_option('wacky', 'unlikely')
    self.assertEqual(my_config.get_user_option('wacky'), 'unlikely')

    # Ensure we get the same thing if we start again
    b2 = branch.Branch.open('.')
    my_config2 = b2.get_config()
    self.assertEqual(my_config2.get_user_option('wacky'), 'unlikely')
1040
def test_has_explicit_nickname(self):
1041
b = self.make_branch('.')
1042
self.assertFalse(b.get_config().has_explicit_nickname())
1044
self.assertTrue(b.get_config().has_explicit_nickname())
1046
def test_config_url(self):
    """The Branch.get_config will use section that uses a local url"""
    branch = self.make_branch('branch')
    self.assertEqual('branch', branch.nick)

    local_url = urlutils.local_path_to_url('branch')
    conf = config.LocationConfig.from_string(
        '[%s]\nnickname = foobar' % (local_url,),
        local_url, save=True)
    self.assertIsNot(None, conf)
    # The saved section is keyed by the url; the nick must now come from it.
    self.assertEqual('foobar', branch.nick)
1058
def test_config_local_path(self):
    """The Branch.get_config will use a local system path"""
    branch = self.make_branch('branch')
    self.assertEqual('branch', branch.nick)

    local_path = osutils.getcwd().encode('utf8')
    config.LocationConfig.from_string(
        '[%s/branch]\nnickname = barry' % (local_path,),
        'branch', save=True)
    # Now the branch will find its nick via the location config
    self.assertEqual('barry', branch.nick)
1070
def test_config_creates_local(self):
1071
"""Creating a new entry in config uses a local path."""
1072
branch = self.make_branch('branch', format='knit')
1073
branch.set_push_location('http://foobar')
1074
local_path = osutils.getcwd().encode('utf8')
1075
# Surprisingly ConfigObj doesn't create a trailing newline
1076
self.check_file_contents(config.locations_config_filename(),
1078
'push_location = http://foobar\n'
1079
'push_location:policy = norecurse\n'
1082
def test_autonick_urlencoded(self):
    """The nickname of a branch made at '!repo' is '!repo'."""
    b = self.make_branch('!repo')
    self.assertEqual('!repo', b.get_config().get_nickname())
1086
def test_autonick_uses_branch_name(self):
    """A branch made with ``name='bar'`` gets 'bar' as its nickname."""
    b = self.make_branch('foo', name='bar')
    self.assertEqual('bar', b.get_config().get_nickname())
1090
def test_warn_if_masked(self):
    """Setting an option warns when another store masks the new value."""
    warnings = []

    def warning(*args):
        # Record the formatted message instead of emitting it.
        warnings.append(args[0] % args[1:])
    self.overrideAttr(trace, 'warning', warning)

    def set_option(store, warn_masked=True):
        # Forget warnings from earlier steps so each check is isolated.
        warnings[:] = []
        conf.set_user_option('example_option', repr(store), store=store,
                             warn_masked=warn_masked)

    def assertWarning(warning):
        # ``None`` means the last set_option() must not have warned.
        if warning is None:
            self.assertEqual(0, len(warnings))
        else:
            self.assertEqual(1, len(warnings))
            self.assertEqual(warning, warnings[0])
    branch = self.make_branch('.')
    conf = branch.get_config()
    set_option(config.STORE_GLOBAL)
    assertWarning(None)
    set_option(config.STORE_BRANCH)
    assertWarning(None)
    set_option(config.STORE_GLOBAL)
    assertWarning('Value "4" is masked by "3" from branch.conf')
    set_option(config.STORE_GLOBAL, warn_masked=False)
    assertWarning(None)
    set_option(config.STORE_LOCATION)
    assertWarning(None)
    set_option(config.STORE_BRANCH)
    assertWarning('Value "3" is masked by "0" from locations.conf')
    set_option(config.STORE_BRANCH, warn_masked=False)
    assertWarning(None)
1124
class TestGlobalConfigItems(tests.TestCaseInTempDir):
    """Check the values ``GlobalConfig`` reports for individual settings.

    Each test uses either a ``GlobalConfig`` created without content or one
    parsed from the module-level ``sample_config_text``.
    """

    def _get_empty_config(self):
        """Return a ``GlobalConfig`` created without explicit content."""
        return config.GlobalConfig()

    def _get_sample_config(self):
        """Return a ``GlobalConfig`` parsed from ``sample_config_text``."""
        return config.GlobalConfig.from_string(sample_config_text)

    def test_user_id(self):
        my_config = self._get_sample_config()
        self.assertEqual(u"Erik B\u00e5gfors <erik@bagfors.nu>",
                         my_config._get_user_id())

    def test_absent_user_id(self):
        my_config = self._get_empty_config()
        self.assertIs(None, my_config._get_user_id())

    def test_get_user_option_default(self):
        my_config = self._get_empty_config()
        self.assertIs(None, my_config.get_user_option('no_option'))

    def test_get_user_option_global(self):
        my_config = self._get_sample_config()
        self.assertEqual("something",
                         my_config.get_user_option('user_global_option'))

    def test_configured_validate_signatures_in_log(self):
        my_config = self._get_sample_config()
        self.assertEqual(True, my_config.validate_signatures_in_log())

    def test_get_alias(self):
        my_config = self._get_sample_config()
        self.assertEqual('help', my_config.get_alias('h'))

    def test_get_aliases(self):
        my_config = self._get_sample_config()
        aliases = my_config.get_aliases()
        self.assertEqual(2, len(aliases))
        # Sort the alias names so the two values are checked in a stable
        # order.
        sorted_keys = sorted(aliases)
        self.assertEqual('help', aliases[sorted_keys[0]])
        self.assertEqual(sample_long_alias, aliases[sorted_keys[1]])

    def test_get_no_alias(self):
        my_config = self._get_sample_config()
        self.assertIs(None, my_config.get_alias('foo'))

    def test_get_long_alias(self):
        my_config = self._get_sample_config()
        self.assertEqual(sample_long_alias, my_config.get_alias('ll'))

    def test_get_change_editor(self):
        my_config = self._get_sample_config()
        change_editor = my_config.get_change_editor('old', 'new')
        self.assertIs(diff.DiffFromTool, change_editor.__class__)
        self.assertEqual('vimdiff -of @new_path @old_path',
                         ' '.join(change_editor.command_template))

    def test_get_no_change_editor(self):
        my_config = self._get_empty_config()
        change_editor = my_config.get_change_editor('old', 'new')
        self.assertIs(None, change_editor)

    def test_get_merge_tools(self):
        conf = self._get_sample_config()
        tools = conf.get_merge_tools()
        # Log the actual value to ease debugging when the comparison fails.
        self.log(repr(tools))
        self.assertEqual(
            {u'funkytool': u'funkytool "arg with spaces" {this_temp}',
             u'sometool': u'sometool {base} {this} {other} -o {result}',
             u'newtool': u'"newtool with spaces" {this_temp}'},
            tools)

    def test_get_merge_tools_empty(self):
        conf = self._get_empty_config()
        tools = conf.get_merge_tools()
        self.assertEqual({}, tools)

    def test_find_merge_tool(self):
        conf = self._get_sample_config()
        cmdline = conf.find_merge_tool('sometool')
        self.assertEqual('sometool {base} {this} {other} -o {result}', cmdline)

    def test_find_merge_tool_not_found(self):
        conf = self._get_sample_config()
        cmdline = conf.find_merge_tool('DOES NOT EXIST')
        self.assertIs(cmdline, None)

    def test_find_merge_tool_known(self):
        # 'kdiff3' resolves to a command line even with an empty config.
        conf = self._get_empty_config()
        cmdline = conf.find_merge_tool('kdiff3')
        self.assertEqual('kdiff3 {base} {this} {other} -o {result}', cmdline)

    def test_find_merge_tool_override_known(self):
        # A user-defined 'bzr.mergetool.kdiff3' replaces the known command.
        conf = self._get_empty_config()
        conf.set_user_option('bzr.mergetool.kdiff3', 'kdiff3 blah')
        cmdline = conf.find_merge_tool('kdiff3')
        self.assertEqual('kdiff3 blah', cmdline)
1225
class TestGlobalConfigSavingOptions(tests.TestCaseInTempDir):
    """Aliases set on one ``GlobalConfig`` are seen by a fresh instance."""

    def test_empty(self):
        aliases = config.GlobalConfig().get_aliases()
        self.assertEqual(0, len(aliases))

    def test_set_alias(self):
        expected = 'commit --strict'
        writer = config.GlobalConfig()
        writer.set_alias('commit', expected)
        # Read back through a second object rather than the writer.
        reader = config.GlobalConfig()
        self.assertEqual(expected, reader.get_alias('commit'))

    def test_remove_alias(self):
        writer = config.GlobalConfig()
        writer.set_alias('commit', 'commit --strict')
        # Now remove the alias again.
        writer.unset_alias('commit')
        reader = config.GlobalConfig()
        self.assertIs(None, reader.get_alias('commit'))
1247
class TestLocationConfig(tests.TestCaseInTempDir, TestOptionsMixin):
    """Tests for section matching and option policies of LocationConfig.

    Most tests call ``get_branch_config()`` first; it stores the resulting
    branch config in ``self.my_config`` and its location config in
    ``self.my_location_config``.
    """

    def test_constructs_valid(self):
        config.LocationConfig('http://example.com')

    def test_constructs_error(self):
        self.assertRaises(TypeError, config.LocationConfig)

    def test_branch_calls_read_filenames(self):
        # This is testing the correct file names are provided.
        # TODO: consolidate with the test for GlobalConfigs filename checks.
        #
        # replace the class that is constructed, to check its parameters;
        # overrideAttr restores the original class during test cleanup.
        self.overrideAttr(config, 'ConfigObj', InstrumentedConfigObj)
        my_config = config.LocationConfig('http://www.example.com')
        parser = my_config._get_parser()
        self.assertIsInstance(parser, InstrumentedConfigObj)
        self.assertEqual(parser._calls,
                         [('__init__', config.locations_config_filename(),
                           'utf-8')])

    def test_get_global_config(self):
        my_config = config.BranchConfig(FakeBranch('http://example.com'))
        global_config = my_config._get_global_config()
        self.assertIsInstance(global_config, config.GlobalConfig)
        # A second call must hand back the very same object.
        self.assertIs(global_config, my_config._get_global_config())

    def assertLocationMatching(self, expected):
        """Assert which sections match the current location config.

        :param expected: List of the items ``_get_matching_sections()`` is
            expected to yield, in order.
        """
        self.assertEqual(expected,
                         list(self.my_location_config._get_matching_sections()))

    def test__get_matching_sections_no_match(self):
        self.get_branch_config('/')
        self.assertLocationMatching([])

    def test__get_matching_sections_exact(self):
        self.get_branch_config('http://www.example.com')
        self.assertLocationMatching([('http://www.example.com', '')])

    def test__get_matching_sections_suffix_does_not(self):
        self.get_branch_config('http://www.example.com-com')
        self.assertLocationMatching([])

    def test__get_matching_sections_subdir_recursive(self):
        self.get_branch_config('http://www.example.com/com')
        self.assertLocationMatching([('http://www.example.com', 'com')])

    def test__get_matching_sections_ignoreparent(self):
        self.get_branch_config('http://www.example.com/ignoreparent')
        self.assertLocationMatching([('http://www.example.com/ignoreparent',
                                      '')])

    def test__get_matching_sections_ignoreparent_subdir(self):
        self.get_branch_config(
            'http://www.example.com/ignoreparent/childbranch')
        self.assertLocationMatching([('http://www.example.com/ignoreparent',
                                      'childbranch')])

    def test__get_matching_sections_subdir_trailing_slash(self):
        self.get_branch_config('/b')
        self.assertLocationMatching([('/b/', '')])

    def test__get_matching_sections_subdir_child(self):
        self.get_branch_config('/a/foo')
        self.assertLocationMatching([('/a/*', ''), ('/a/', 'foo')])

    def test__get_matching_sections_subdir_child_child(self):
        self.get_branch_config('/a/foo/bar')
        self.assertLocationMatching([('/a/*', 'bar'), ('/a/', 'foo/bar')])

    def test__get_matching_sections_trailing_slash_with_children(self):
        self.get_branch_config('/a/')
        self.assertLocationMatching([('/a/', '')])

    def test__get_matching_sections_explicit_over_glob(self):
        # XXX: 2006-09-08 jamesh
        # This test only passes because ord('c') > ord('*').  If there
        # was a config section for '/a/?', it would get precedence
        # over '/a/c'.
        self.get_branch_config('/a/c')
        self.assertLocationMatching([('/a/c', ''), ('/a/*', ''), ('/a/', 'c')])

    def test__get_option_policy_normal(self):
        self.get_branch_config('http://www.example.com')
        self.assertEqual(
            self.my_location_config._get_option_policy(
                'http://www.example.com', 'normal_option'),
            config.POLICY_NONE)

    def test__get_option_policy_norecurse(self):
        self.get_branch_config('http://www.example.com')
        self.assertEqual(
            self.my_location_config._get_option_policy(
                'http://www.example.com', 'norecurse_option'),
            config.POLICY_NORECURSE)
        # Test old recurse=False setting:
        self.assertEqual(
            self.my_location_config._get_option_policy(
                'http://www.example.com/norecurse', 'normal_option'),
            config.POLICY_NORECURSE)

    def test__get_option_policy_appendpath(self):
        # This test used to reuse the name test__get_option_policy_normal,
        # which silently replaced the test of that name defined above.
        self.get_branch_config('http://www.example.com')
        self.assertEqual(
            self.my_location_config._get_option_policy(
                'http://www.example.com', 'appendpath_option'),
            config.POLICY_APPENDPATH)

    def test__get_options_with_policy(self):
        self.get_branch_config('/dir/subdir',
                               location_config="""\
[/dir]
other_url = /other-dir
other_url:policy = appendpath
[/dir/subdir]
other_url = /other-subdir
""")
        self.assertOptions(
            [(u'other_url', u'/other-subdir', u'/dir/subdir', 'locations'),
             (u'other_url', u'/other-dir', u'/dir', 'locations'),
             (u'other_url:policy', u'appendpath', u'/dir', 'locations')],
            self.my_location_config)

    def test_location_without_username(self):
        self.get_branch_config('http://www.example.com/ignoreparent')
        self.assertEqual(u'Erik B\u00e5gfors <erik@bagfors.nu>',
                         self.my_config.username())

    def test_location_not_listed(self):
        """Test that the global username is used when no location matches"""
        self.get_branch_config('/home/robertc/sources')
        self.assertEqual(u'Erik B\u00e5gfors <erik@bagfors.nu>',
                         self.my_config.username())

    def test_overriding_location(self):
        self.get_branch_config('http://www.example.com/foo')
        self.assertEqual('Robert Collins <robertc@example.org>',
                         self.my_config.username())

    def test_get_user_option_global(self):
        self.get_branch_config('/a')
        self.assertEqual('something',
                         self.my_config.get_user_option('user_global_option'))

    def test_get_user_option_local(self):
        self.get_branch_config('/a')
        self.assertEqual('local',
                         self.my_config.get_user_option('user_local_option'))

    def test_get_user_option_appendpath(self):
        # returned as is for the base path:
        self.get_branch_config('http://www.example.com')
        self.assertEqual('append',
                         self.my_config.get_user_option('appendpath_option'))
        # Extra path components get appended:
        self.get_branch_config('http://www.example.com/a/b/c')
        self.assertEqual('append/a/b/c',
                         self.my_config.get_user_option('appendpath_option'))
        # Overridden for http://www.example.com/dir, where it is a
        # normal option.
        self.get_branch_config('http://www.example.com/dir/a/b/c')
        self.assertEqual('normal',
                         self.my_config.get_user_option('appendpath_option'))

    def test_get_user_option_norecurse(self):
        self.get_branch_config('http://www.example.com')
        self.assertEqual('norecurse',
                         self.my_config.get_user_option('norecurse_option'))
        self.get_branch_config('http://www.example.com/dir')
        self.assertEqual(None,
                         self.my_config.get_user_option('norecurse_option'))
        # http://www.example.com/norecurse is a recurse=False section
        # that redefines normal_option.  Subdirectories do not pick up
        # this redefinition.
        self.get_branch_config('http://www.example.com/norecurse')
        self.assertEqual('norecurse',
                         self.my_config.get_user_option('normal_option'))
        self.get_branch_config('http://www.example.com/norecurse/subdir')
        self.assertEqual('normal',
                         self.my_config.get_user_option('normal_option'))

    def test_set_user_option_norecurse(self):
        self.get_branch_config('http://www.example.com')
        self.my_config.set_user_option('foo', 'bar',
                                       store=config.STORE_LOCATION_NORECURSE)
        self.assertEqual(
            self.my_location_config._get_option_policy(
                'http://www.example.com', 'foo'),
            config.POLICY_NORECURSE)

    def test_set_user_option_appendpath(self):
        self.get_branch_config('http://www.example.com')
        self.my_config.set_user_option('foo', 'bar',
                                       store=config.STORE_LOCATION_APPENDPATH)
        self.assertEqual(
            self.my_location_config._get_option_policy(
                'http://www.example.com', 'foo'),
            config.POLICY_APPENDPATH)

    def test_set_user_option_change_policy(self):
        self.get_branch_config('http://www.example.com')
        self.my_config.set_user_option('norecurse_option', 'normal',
                                       store=config.STORE_LOCATION)
        self.assertEqual(
            self.my_location_config._get_option_policy(
                'http://www.example.com', 'norecurse_option'),
            config.POLICY_NONE)

    def get_branch_config(self, location, global_config=None,
                          location_config=None):
        """Save global/location configs and build a BranchConfig on them.

        :param location: Location handed to ``FakeBranch``.
        :param global_config: Text of the global config; defaults to
            ``sample_config_text``.
        :param location_config: Text of the locations config; defaults to
            ``sample_branches_text``.

        Sets ``self.my_config`` and ``self.my_location_config``.
        """
        my_branch = FakeBranch(location)
        if global_config is None:
            global_config = sample_config_text
        if location_config is None:
            location_config = sample_branches_text

        config.GlobalConfig.from_string(global_config, save=True)
        config.LocationConfig.from_string(location_config, my_branch.base,
                                          save=True)
        my_config = config.BranchConfig(my_branch)
        self.my_config = my_config
        self.my_location_config = my_config._get_location_config()

    def test_set_user_setting_sets_and_saves2(self):
        self.get_branch_config('/a/c')
        self.assertIs(self.my_config.get_user_option('foo'), None)
        self.my_config.set_user_option('foo', 'bar')
        self.assertEqual(
            self.my_config.branch.control_files.files['branch.conf'].strip(),
            'foo = bar')
        self.assertEqual(self.my_config.get_user_option('foo'), 'bar')
        self.my_config.set_user_option('foo', 'baz',
                                       store=config.STORE_LOCATION)
        self.assertEqual(self.my_config.get_user_option('foo'), 'baz')
        # A later set without an explicit store does not change the value
        # read back: it is still 'baz'.
        self.my_config.set_user_option('foo', 'qux')
        self.assertEqual(self.my_config.get_user_option('foo'), 'baz')

    def test_get_bzr_remote_path(self):
        my_config = config.LocationConfig('/a/c')
        self.assertEqual('bzr', my_config.get_bzr_remote_path())
        my_config.set_user_option('bzr_remote_path', '/path-bzr')
        self.assertEqual('/path-bzr', my_config.get_bzr_remote_path())
        # Once set, the environment variable is what gets reported.
        self.overrideEnv('BZR_REMOTE_PATH', '/environ-bzr')
        self.assertEqual('/environ-bzr', my_config.get_bzr_remote_path())
1497
# Config texts used by TestBranchConfigItems.test_config_precedence: the same
# option is given a different value in the global, branch and location
# configs so the test can tell which one is returned.
precedence_global = 'option = global'
precedence_branch = 'option = branch'
precedence_location = """
[http://]
recurse = true
option = recurse
[http://example.com/specific]
option = exact
"""
1507
class TestBranchConfigItems(tests.TestCaseInTempDir):
    """Tests for values read through ``config.BranchConfig``."""

    def get_branch_config(self, global_config=None, location=None,
                          location_config=None, branch_data_config=None):
        """Build a BranchConfig for a FakeBranch with the given config texts.

        :param global_config: Text saved as the global config, if not None.
        :param location: Location handed to ``FakeBranch``.
        :param location_config: Text saved as the locations config, if not
            None.
        :param branch_data_config: Text installed as the fake branch's
            'branch.conf' control file, if not None.
        :return: The ``config.BranchConfig`` for the fake branch.
        """
        my_branch = FakeBranch(location)
        if global_config is not None:
            config.GlobalConfig.from_string(global_config, save=True)
        if location_config is not None:
            config.LocationConfig.from_string(location_config, my_branch.base,
                                              save=True)
        my_config = config.BranchConfig(my_branch)
        if branch_data_config is not None:
            my_config.branch.control_files.files['branch.conf'] = \
                branch_data_config
        return my_config

    def test_user_id(self):
        branch = FakeBranch()
        my_config = config.BranchConfig(branch)
        self.assertIsNot(None, my_config.username())
        my_config.branch.control_files.files['email'] = "John"
        my_config.set_user_option('email',
                                  "Robert Collins <robertc@example.org>")
        # The 'email' option set above is what username() reports, not the
        # "John" stored in the 'email' control file.
        self.assertEqual("Robert Collins <robertc@example.org>",
                         my_config.username())

    def test_BRZ_EMAIL_OVERRIDES(self):
        self.overrideEnv('BRZ_EMAIL', "Robert Collins <robertc@example.org>")
        branch = FakeBranch()
        my_config = config.BranchConfig(branch)
        self.assertEqual("Robert Collins <robertc@example.org>",
                         my_config.username())

    def test_get_user_option_global(self):
        my_config = self.get_branch_config(global_config=sample_config_text)
        self.assertEqual('something',
                         my_config.get_user_option('user_global_option'))

    def test_config_precedence(self):
        # FIXME: eager test, luckily no persistent config file makes it fail
        my_config = self.get_branch_config(global_config=precedence_global)
        self.assertEqual(my_config.get_user_option('option'), 'global')
        my_config = self.get_branch_config(global_config=precedence_global,
                                           branch_data_config=precedence_branch)
        self.assertEqual(my_config.get_user_option('option'), 'branch')
        my_config = self.get_branch_config(
            global_config=precedence_global,
            branch_data_config=precedence_branch,
            location_config=precedence_location)
        self.assertEqual(my_config.get_user_option('option'), 'recurse')
        my_config = self.get_branch_config(
            global_config=precedence_global,
            branch_data_config=precedence_branch,
            location_config=precedence_location,
            location='http://example.com/specific')
        self.assertEqual(my_config.get_user_option('option'), 'exact')
1566
class TestMailAddressExtraction(tests.TestCase):
    """Tests for ``extract_email_address`` and ``parse_username``."""

    def test_extract_email_address(self):
        address = config.extract_email_address('Jane <jane@test.com>')
        self.assertEqual('jane@test.com', address)
        # A username without any address part is rejected.
        self.assertRaises(config.NoEmailInUsername,
                          config.extract_email_address, 'Jane Tester')

    def test_parse_username(self):
        # (expected (name, email) pair, raw username), checked in order.
        cases = [
            (('', 'jdoe@example.com'), 'jdoe@example.com'),
            (('', 'jdoe@example.com'), '<jdoe@example.com>'),
            (('John Doe', 'jdoe@example.com'), 'John Doe <jdoe@example.com>'),
            (('John Doe', ''), 'John Doe'),
            (('John Doe', 'jdoe@example.com'), 'John Doe jdoe@example.com'),
        ]
        for expected, username in cases:
            self.assertEqual(expected, config.parse_username(username))
1586
class TestTreeConfig(tests.TestCaseWithTransport):
    """Tests for setting and getting options through ``config.TreeConfig``."""

    def test_get_value(self):
        """Test that retrieving a value from a section is possible"""
        tree_config = config.TreeConfig(self.make_branch('.'))
        # set_option() takes the value first, then the option name and an
        # optional section name.
        for stored in [('value', 'key', 'SECTION'),
                       ('value2', 'key2'),
                       ('value3-top', 'key3'),
                       ('value3-section', 'key3', 'SECTION')]:
            tree_config.set_option(*stored)
        # (expected value, positional args, keyword args) for get_option().
        lookups = [
            ('value', ('key', 'SECTION'), {}),
            ('value2', ('key2',), {}),
            (None, ('non-existant',), {}),
            (None, ('non-existant', 'SECTION'), {}),
            ('default', ('non-existant',), {'default': 'default'}),
            (None, ('key2', 'NOSECTION'), {}),
            ('default', ('key2', 'NOSECTION'), {'default': 'default'}),
            ('value3-top', ('key3',), {}),
            ('value3-section', ('key3', 'SECTION'), {}),
        ]
        for expected, args, kwargs in lookups:
            found = tree_config.get_option(*args, **kwargs)
            self.assertEqual(found, expected)
1614
class TestTransportConfig(tests.TestCaseWithTransport):
    """Tests for config files accessed through a transport."""

    def test_load_utf8(self):
        """Ensure we can load an utf8-encoded file."""
        t = self.get_transport()
        unicode_user = u'b\N{Euro Sign}ar'
        unicode_content = u'user=%s' % (unicode_user,)
        utf8_content = unicode_content.encode('utf8')
        # Store the raw content in the config file
        t.put_bytes('foo.conf', utf8_content)
        conf = config.TransportConfig(t, 'foo.conf')
        self.assertEqual(unicode_user, conf.get_option('user'))

    def test_load_non_ascii(self):
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
        t = self.get_transport()
        # put_bytes() stores raw bytes, so spell the content as a bytes
        # literal; \xff makes it invalid utf-8.
        t.put_bytes('foo.conf', b'user=foo\n#\xff\n')
        conf = config.TransportConfig(t, 'foo.conf')
        self.assertRaises(config.ConfigContentError, conf._get_configobj)

    def test_load_erroneous_content(self):
        """Ensure we display a proper error on content that can't be parsed."""
        t = self.get_transport()
        t.put_bytes('foo.conf', b'[open_section\n')
        conf = config.TransportConfig(t, 'foo.conf')
        self.assertRaises(config.ParseConfigError, conf._get_configobj)

    def test_load_permission_denied(self):
        """Ensure we get an empty config file if the file is inaccessible."""
        warnings = []

        def warning(*args):
            # Record the formatted message instead of emitting it.
            warnings.append(args[0] % args[1:])
        self.overrideAttr(trace, 'warning', warning)

        class DenyingTransport(object):
            """Minimal transport stand-in whose reads are always denied."""

            def __init__(self, base):
                self.base = base

            def get_bytes(self, relpath):
                raise errors.PermissionDenied(relpath, "")

        cfg = config.TransportConfig(
            DenyingTransport("nonexisting://"), 'control.conf')
        self.assertIs(None, cfg.get_option('non-existant', 'SECTION'))
        self.assertEqual(
            warnings,
            [u'Permission denied while trying to open configuration file '
             u'nonexisting:///control.conf.'])

    def test_get_value(self):
        """Test that retrieving a value from a section is possible"""
        bzrdir_config = config.TransportConfig(self.get_transport('.'),
                                               'control.conf')
        bzrdir_config.set_option('value', 'key', 'SECTION')
        bzrdir_config.set_option('value2', 'key2')
        bzrdir_config.set_option('value3-top', 'key3')
        bzrdir_config.set_option('value3-section', 'key3', 'SECTION')
        value = bzrdir_config.get_option('key', 'SECTION')
        self.assertEqual(value, 'value')
        value = bzrdir_config.get_option('key2')
        self.assertEqual(value, 'value2')
        self.assertEqual(bzrdir_config.get_option('non-existant'), None)
        value = bzrdir_config.get_option('non-existant', 'SECTION')
        self.assertEqual(value, None)
        value = bzrdir_config.get_option('non-existant', default='default')
        self.assertEqual(value, 'default')
        self.assertEqual(bzrdir_config.get_option('key2', 'NOSECTION'), None)
        value = bzrdir_config.get_option('key2', 'NOSECTION',
                                         default='default')
        self.assertEqual(value, 'default')
        value = bzrdir_config.get_option('key3')
        self.assertEqual(value, 'value3-top')
        value = bzrdir_config.get_option('key3', 'SECTION')
        self.assertEqual(value, 'value3-section')

    def test_set_unset_default_stack_on(self):
        my_dir = self.make_controldir('.')
        bzrdir_config = config.BzrDirConfig(my_dir)
        self.assertIs(None, bzrdir_config.get_default_stack_on())
        bzrdir_config.set_default_stack_on('Foo')
        self.assertEqual('Foo', bzrdir_config._config.get_option(
                         'default_stack_on'))
        self.assertEqual('Foo', bzrdir_config.get_default_stack_on())
        # Setting None makes get_default_stack_on() report None again.
        bzrdir_config.set_default_stack_on(None)
        self.assertIs(None, bzrdir_config.get_default_stack_on())
1702
class TestOldConfigHooks(tests.TestCaseWithTransport):
    """Check the OldConfigHooks hook points with local config objects.

    The tests use ``self.breezy_config``, ``self.locations_config``,
    ``self.branch_config`` and ``self.tree``; these are presumably set up
    by ``create_configs_with_file_option()`` -- confirm against that helper.
    """

    def setUp(self):
        super(TestOldConfigHooks, self).setUp()
        create_configs_with_file_option(self)

    def _record_hook_calls(self, hook_name):
        """Install a recording hook and return the list it appends to.

        :param hook_name: Name of the ``OldConfigHooks`` hook point.
        :return: A list that receives one argument tuple per hook call.

        The hook is uninstalled again during test cleanup.
        """
        calls = []

        def hook(*args):
            calls.append(args)
        config.OldConfigHooks.install_named_hook(hook_name, hook, None)
        self.addCleanup(
            config.OldConfigHooks.uninstall_named_hook, hook_name, None)
        return calls

    def assertGetHook(self, conf, name, value):
        """Assert that reading ``name`` from ``conf`` fires 'get' once."""
        calls = self._record_hook_calls('get')
        self.assertLength(0, calls)
        actual_value = conf.get_user_option(name)
        self.assertEqual(value, actual_value)
        self.assertLength(1, calls)
        self.assertEqual((conf, name, value), calls[0])

    def test_get_hook_breezy(self):
        self.assertGetHook(self.breezy_config, 'file', 'breezy')

    def test_get_hook_locations(self):
        self.assertGetHook(self.locations_config, 'file', 'locations')

    def test_get_hook_branch(self):
        # Since locations masks branch, we define a different option
        self.branch_config.set_user_option('file2', 'branch')
        self.assertGetHook(self.branch_config, 'file2', 'branch')

    def assertSetHook(self, conf, name, value):
        """Assert that setting ``name`` on ``conf`` fires 'set' once."""
        calls = self._record_hook_calls('set')
        self.assertLength(0, calls)
        conf.set_user_option(name, value)
        self.assertLength(1, calls)
        # We can't assert the conf object below as different configs use
        # different means to implement set_user_option and we care only about
        # coverage here.
        self.assertEqual((name, value), calls[0][1:])

    def test_set_hook_breezy(self):
        self.assertSetHook(self.breezy_config, 'foo', 'breezy')

    def test_set_hook_locations(self):
        self.assertSetHook(self.locations_config, 'foo', 'locations')

    def test_set_hook_branch(self):
        self.assertSetHook(self.branch_config, 'foo', 'branch')

    def assertRemoveHook(self, conf, name, section_name=None):
        """Assert that removing ``name`` from ``conf`` fires 'remove' once."""
        calls = self._record_hook_calls('remove')
        self.assertLength(0, calls)
        conf.remove_user_option(name, section_name)
        self.assertLength(1, calls)
        # We can't assert the conf object below as different configs use
        # different means to implement remove_user_option and we care only
        # about coverage here.
        self.assertEqual((name,), calls[0][1:])

    def test_remove_hook_breezy(self):
        self.assertRemoveHook(self.breezy_config, 'file')

    def test_remove_hook_locations(self):
        self.assertRemoveHook(self.locations_config, 'file',
                              self.locations_config.location)

    def test_remove_hook_branch(self):
        self.assertRemoveHook(self.branch_config, 'file')

    def assertLoadHook(self, name, conf_class, *conf_args):
        """Assert that a fresh ``conf_class`` fires 'load' once when read."""
        calls = self._record_hook_calls('load')
        self.assertLength(0, calls)
        # Build a config
        conf = conf_class(*conf_args)
        # Access an option to trigger a load
        conf.get_user_option(name)
        self.assertLength(1, calls)
        # Since we can't assert about conf, we just use the number of calls ;-/

    def test_load_hook_breezy(self):
        self.assertLoadHook('file', config.GlobalConfig)

    def test_load_hook_locations(self):
        self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)

    def test_load_hook_branch(self):
        self.assertLoadHook('file', config.BranchConfig, self.tree.branch)

    def assertSaveHook(self, conf):
        """Assert that setting an option on ``conf`` fires 'save' once."""
        calls = self._record_hook_calls('save')
        self.assertLength(0, calls)
        # Setting an option triggers a save
        conf.set_user_option('foo', 'bar')
        self.assertLength(1, calls)
        # Since we can't assert about conf, we just use the number of calls ;-/

    def test_save_hook_breezy(self):
        self.assertSaveHook(self.breezy_config)

    def test_save_hook_locations(self):
        self.assertSaveHook(self.locations_config)

    def test_save_hook_branch(self):
        self.assertSaveHook(self.branch_config)
1828
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
    """Tests config hooks for remote configs.

    No tests for the remove hook as this is not implemented there.
    """

    def setUp(self):
        super(TestOldConfigHooksForRemote, self).setUp()
        self.transport_server = test_server.SmartTCPServer_for_testing
        create_configs_with_file_option(self)

    def _record_hook_calls(self, hook_name):
        """Install a recording hook and return the list it appends to.

        :param hook_name: Name of the ``OldConfigHooks`` hook point.
        :return: A list that receives one argument tuple per hook call.

        The hook is uninstalled again during test cleanup.
        """
        calls = []

        def hook(*args):
            calls.append(args)
        config.OldConfigHooks.install_named_hook(hook_name, hook, None)
        self.addCleanup(
            config.OldConfigHooks.uninstall_named_hook, hook_name, None)
        return calls

    def assertGetHook(self, conf, name, value):
        """Assert that reading ``name`` from ``conf`` fires 'get' once."""
        calls = self._record_hook_calls('get')
        self.assertLength(0, calls)
        actual_value = conf.get_option(name)
        self.assertEqual(value, actual_value)
        self.assertLength(1, calls)
        self.assertEqual((conf, name, value), calls[0])

    def test_get_hook_remote_branch(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.assertGetHook(remote_branch._get_config(), 'file', 'branch')

    def test_get_hook_remote_bzrdir(self):
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
        conf = remote_bzrdir._get_config()
        # set_option() takes the value first: store 'remotedir' under 'file'.
        conf.set_option('remotedir', 'file')
        self.assertGetHook(conf, 'file', 'remotedir')

    def assertSetHook(self, conf, name, value):
        """Assert that setting ``name`` on ``conf`` fires 'set' once."""
        calls = self._record_hook_calls('set')
        self.assertLength(0, calls)
        conf.set_option(value, name)
        self.assertLength(1, calls)
        # We can't assert the conf object below as different configs use
        # different means to implement set_user_option and we care only about
        # coverage here.
        self.assertEqual((name, value), calls[0][1:])

    def test_set_hook_remote_branch(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.addCleanup(remote_branch.lock_write().unlock)
        self.assertSetHook(remote_branch._get_config(), 'file', 'remote')

    def test_set_hook_remote_bzrdir(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.addCleanup(remote_branch.lock_write().unlock)
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
        self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')

    def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
        """Assert how many 'load' calls reading a fresh config triggers."""
        calls = self._record_hook_calls('load')
        self.assertLength(0, calls)
        # Build a config
        conf = conf_class(*conf_args)
        # Access an option to trigger a load
        conf.get_option(name)
        self.assertLength(expected_nb_calls, calls)
        # Since we can't assert about conf, we just use the number of calls ;-/

    def test_load_hook_remote_branch(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.assertLoadHook(1, 'file', remote.RemoteBranchConfig, remote_branch)

    def test_load_hook_remote_bzrdir(self):
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
        # The config file doesn't exist, set an option to force its creation
        conf = remote_bzrdir._get_config()
        conf.set_option('remotedir', 'file')
        # We get one call for the server and one call for the client, this is
        # caused by the differences in implementations between
        # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
        # SmartServerBranchGetConfigFile (in smart/branch.py)
        self.assertLoadHook(2, 'file', remote.RemoteBzrDirConfig, remote_bzrdir)

    def assertSaveHook(self, conf):
        """Assert that setting an option on ``conf`` fires 'save' once."""
        calls = self._record_hook_calls('save')
        self.assertLength(0, calls)
        # Setting an option triggers a save
        conf.set_option('foo', 'bar')
        self.assertLength(1, calls)
        # Since we can't assert about conf, we just use the number of calls ;-/

    def test_save_hook_remote_branch(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.addCleanup(remote_branch.lock_write().unlock)
        self.assertSaveHook(remote_branch._get_config())

    def test_save_hook_remote_bzrdir(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.addCleanup(remote_branch.lock_write().unlock)
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
        self.assertSaveHook(remote_bzrdir._get_config())
1943
class TestOptionNames(tests.TestCase):
    """Tests for the names accepted by ``config._option_ref_re``."""

    def is_valid(self, name):
        """Return True if ``{name}`` matches the option reference regexp."""
        return config._option_ref_re.match('{%s}' % name) is not None

    def test_valid_names(self):
        self.assertTrue(self.is_valid('foo'))
        self.assertTrue(self.is_valid('foo.bar'))
        self.assertTrue(self.is_valid('f1'))
        self.assertTrue(self.is_valid('_'))
        self.assertTrue(self.is_valid('__bar__'))
        self.assertTrue(self.is_valid('a_'))
        self.assertTrue(self.is_valid('a1'))
        # Don't break bzr-svn for no good reason
        self.assertTrue(self.is_valid('guessed-layout'))

    def test_invalid_names(self):
        self.assertFalse(self.is_valid(' foo'))
        self.assertFalse(self.is_valid('foo '))
        self.assertFalse(self.is_valid('1'))
        self.assertFalse(self.is_valid('1,2'))
        self.assertFalse(self.is_valid('foo$'))
        self.assertFalse(self.is_valid('!foo'))
        self.assertFalse(self.is_valid('foo.'))
        self.assertFalse(self.is_valid('foo..bar'))
        self.assertFalse(self.is_valid('{}'))
        self.assertFalse(self.is_valid('{a}'))
        self.assertFalse(self.is_valid('a\n'))
        self.assertFalse(self.is_valid('-'))
        self.assertFalse(self.is_valid('-a'))
        self.assertFalse(self.is_valid('a-'))
        self.assertFalse(self.is_valid('a--a'))

    def assertSingleGroup(self, reference):
        """Assert the regexp finds ``reference`` and defines one group.

        :param reference: Text containing an option reference.
        """
        # the regexp is used with split and as such should match the reference
        # *only*, if more groups needs to be defined, (?:...) should be used.
        # search() rather than match(), so that a reference which is not at
        # the very start of the text (as in '{{a}}') is still found.
        m = config._option_ref_re.search(reference)
        self.assertIsNot(None, m)
        self.assertLength(1, m.groups())

    def test_valid_references(self):
        self.assertSingleGroup('{a}')
        self.assertSingleGroup('{{a}}')
1987
class TestOption(tests.TestCase):
1989
def test_default_value(self):
    # A plain string default is what get_default() gives back.
    option = config.Option('foo', default='bar')
    default = option.get_default()
    self.assertEqual('bar', default)
1993
def test_callable_default_value(self):
1994
def bar_as_unicode():
1996
opt = config.Option('foo', default=bar_as_unicode)
1997
self.assertEqual('bar', opt.get_default())
1999
def test_default_value_from_env(self):
    option = config.Option('foo', default_from_env=['FOO'], default='bar')
    self.overrideEnv('FOO', 'quux')
    # Env variable provides a default taking over the option one
    from_env = option.get_default()
    self.assertEqual('quux', from_env)
2005
def test_first_default_value_from_env_wins(self):
    env_names = ['NO_VALUE', 'FOO', 'BAZ']
    option = config.Option('foo', default='bar', default_from_env=env_names)
    for name, value in (('FOO', 'foo'), ('BAZ', 'baz')):
        self.overrideEnv(name, value)
    # NO_VALUE is not set by this test, FOO comes before BAZ in the list:
    # the first env var set wins
    self.assertEqual('foo', option.get_default())
2013
def test_not_supported_list_default_value(self):
    # Building an option with a list as its default is expected to raise.
    list_default = [1]
    self.assertRaises(AssertionError, config.Option, 'foo',
                      default=list_default)
2016
def test_not_supported_object_default_value(self):
2017
self.assertRaises(AssertionError, config.Option, 'foo',
2020
def test_not_supported_callable_default_value_not_unicode(self):
2021
def bar_not_unicode():
2023
opt = config.Option('foo', default=bar_not_unicode)
2024
self.assertRaises(AssertionError, opt.get_default)
2026
def test_get_help_topic(self):
    # The help topic is expected to be the option name.
    option = config.Option('foo')
    topic = option.get_help_topic()
    self.assertEqual('foo', topic)
2031
class TestOptionConverter(tests.TestCase):
2033
def assertConverted(self, expected, opt, value):
    """Assert that ``opt`` converts ``value`` into ``expected``.

    The conversion is called with ``None`` as its first argument.
    """
    converted = opt.convert_from_unicode(None, value)
    self.assertEqual(expected, converted)
2036
def assertCallsWarning(self, opt, value):
2040
warnings.append(args[0] % args[1:])
2041
self.overrideAttr(trace, 'warning', warning)
2042
self.assertEqual(None, opt.convert_from_unicode(None, value))
2043
self.assertLength(1, warnings)
2045
'Value "%s" is not valid for "%s"' % (value, opt.name),
2048
def assertCallsError(self, opt, value):
    """Assert that converting ``value`` with ``opt`` raises.

    The expected exception is ``config.ConfigOptionValueError``.
    """
    expected_error = config.ConfigOptionValueError
    convert = opt.convert_from_unicode
    self.assertRaises(expected_error, convert, None, value)
2052
def assertConvertInvalid(self, opt, invalid_value):
2054
self.assertEqual(None, opt.convert_from_unicode(None, invalid_value))
2055
opt.invalid = 'warning'
2056
self.assertCallsWarning(opt, invalid_value)
2057
opt.invalid = 'error'
2058
self.assertCallsError(opt, invalid_value)
2061
class TestOptionWithBooleanConverter(TestOptionConverter):
    """Conversions for an option using ``config.bool_from_store``."""

    def get_option(self):
        """Build the boolean option under test."""
        return config.Option('foo', from_unicode=config.bool_from_store,
                             help='A boolean.')

    def test_convert_invalid(self):
        boolean_opt = self.get_option()
        rejected = (
            # A string that is not recognized as a boolean
            u'invalid-boolean',
            # A list of strings is never recognized as a boolean
            [u'not', u'a', u'boolean'],
        )
        for value in rejected:
            self.assertConvertInvalid(boolean_opt, value)

    def test_convert_valid(self):
        boolean_opt = self.get_option()
        accepted = ((True, u'True'), (True, u'1'), (False, u'False'))
        for expected, text in accepted:
            self.assertConverted(expected, boolean_opt, text)
2081
class TestOptionWithIntegerConverter(TestOptionConverter):
    """Conversions for an option using ``config.int_from_store``."""

    def get_option(self):
        """Build the integer option under test."""
        return config.Option('foo', from_unicode=config.int_from_store,
                             help='An integer.')

    def test_convert_invalid(self):
        integer_opt = self.get_option()
        rejected = (
            # A string that is not recognized as an integer
            u'forty-two',
            # A list of strings is never recognized as an integer
            [u'a', u'list'],
        )
        for value in rejected:
            self.assertConvertInvalid(integer_opt, value)

    def test_convert_valid(self):
        integer_opt = self.get_option()
        text = u'16'
        self.assertConverted(16, integer_opt, text)
2099
class TestOptionWithSIUnitConverter(TestOptionConverter):
    """Conversions for an option using ``config.int_SI_from_store``."""

    def get_option(self):
        """Build the SI-unit integer option under test."""
        return config.Option('foo', from_unicode=config.int_SI_from_store,
                             help='An integer in SI units.')

    def test_convert_invalid(self):
        si_opt = self.get_option()
        rejected = (
            u'not-a-unit',
            u'Gb',  # Forgot the value
            u'1b',  # Forgot the unit
            u'1GG',
            u'1Mbb',
            u'1MM',
        )
        for value in rejected:
            self.assertConvertInvalid(si_opt, value)

    def test_convert_valid(self):
        si_opt = self.get_option()
        accepted = (
            (int(5e3), u'5kb'),
            (int(5e6), u'5M'),
            (int(5e6), u'5MB'),
            (int(5e9), u'5g'),
            (int(5e9), u'5gB'),
            (100, u'100'),
        )
        for expected, text in accepted:
            self.assertConverted(expected, si_opt, text)
2124
class TestListOption(TestOptionConverter):
2126
def get_option(self):
    """Build the list option under test."""
    list_opt = config.ListOption('foo', help='A list.')
    return list_opt
2129
def test_convert_invalid(self):
2130
opt = self.get_option()
2131
# We don't even try to convert a list into a list, we only expect
2133
self.assertConvertInvalid(opt, [1])
2134
# No string is invalid as all forms can be converted to a list
2136
def test_convert_valid(self):
2137
opt = self.get_option()
2138
# An empty string is an empty list
2139
self.assertConverted([], opt, '') # Using a bare str() just in case
2140
self.assertConverted([], opt, u'')
2142
self.assertConverted([u'True'], opt, u'True')
2144
self.assertConverted([u'42'], opt, u'42')
2146
self.assertConverted([u'bar'], opt, u'bar')
2149
class TestRegistryOption(TestOptionConverter):
2151
def get_option(self, registry):
    """Build a registry option named 'foo' on top of ``registry``."""
    registry_opt = config.RegistryOption(
        'foo', registry, help='A registry option.')
    return registry_opt
2155
def test_convert_invalid(self):
    # Nothing is registered in the registry used here.
    empty_registry = _mod_registry.Registry()
    option = self.get_option(empty_registry)
    for bad_value in ([1], u"notregistered"):
        self.assertConvertInvalid(option, bad_value)
2161
def test_convert_valid(self):
    known_values = _mod_registry.Registry()
    known_values.register("someval", 1234)
    option = self.get_option(known_values)
    # Using a bare str() just in case
    self.assertConverted(1234, option, "someval")
    self.assertConverted(1234, option, u'someval')
    # None is expected to come back as None
    self.assertConverted(None, option, None)
2170
def test_help(self):
2171
registry = _mod_registry.Registry()
2172
registry.register("someval", 1234, help="some option")
2173
registry.register("dunno", 1234, help="some other option")
2174
opt = self.get_option(registry)
2176
'A registry option.\n'
2178
'The following values are supported:\n'
2179
' dunno - some other option\n'
2180
' someval - some option\n',
2183
def test_get_help_text(self):
2184
registry = _mod_registry.Registry()
2185
registry.register("someval", 1234, help="some option")
2186
registry.register("dunno", 1234, help="some other option")
2187
opt = self.get_option(registry)
2189
'A registry option.\n'
2191
'The following values are supported:\n'
2192
' dunno - some other option\n'
2193
' someval - some option\n',
2194
opt.get_help_text())
2197
class TestOptionRegistry(tests.TestCase):
2200
super(TestOptionRegistry, self).setUp()
2201
# Always start with an empty registry
2202
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2203
self.registry = config.option_registry
2205
def test_register(self):
    # A registered option is retrieved, as the very same object, by name.
    option = config.Option('foo')
    self.registry.register(option)
    found = self.registry.get('foo')
    self.assertIs(option, found)
2210
def test_registered_help(self):
    # The registry gives back the help the option was built with.
    help_text = 'A simple option'
    self.registry.register(config.Option('foo', help=help_text))
    self.assertEqual('A simple option', self.registry.get_help('foo'))
2215
def test_dont_register_illegal_name(self):
    # Each of these names is expected to be refused at registration time.
    for illegal_name in (' foo', 'bar,'):
        self.assertRaises(config.IllegalOptionName,
                          self.registry.register,
                          config.Option(illegal_name))
2221
lazy_option = config.Option('lazy_foo', help='Lazy help')
2223
def test_register_lazy(self):
    # The lazy registration points at the 'lazy_option' class attribute, so
    # that very object is expected back from the registry.
    member_name = 'TestOptionRegistry.lazy_option'
    self.registry.register_lazy('lazy_foo', self.__module__, member_name)
    found = self.registry.get('lazy_foo')
    self.assertIs(self.lazy_option, found)
2228
def test_registered_lazy_help(self):
    # The help of a lazily registered option is available from the registry.
    member_name = 'TestOptionRegistry.lazy_option'
    self.registry.register_lazy('lazy_foo', self.__module__, member_name)
    lazy_help = self.registry.get_help('lazy_foo')
    self.assertEqual('Lazy help', lazy_help)
2233
def test_dont_lazy_register_illegal_name(self):
    # This is where the root cause of http://pad.lv/1235099 is better
    # understood: 'register_lazy' doc string mentions that key should match
    # the option name which indirectly requires that the option name is a
    # valid python identifier. We violate that rule here (using a key that
    # doesn't match the option name) to test the option name checking.
    member_name = 'TestOptionRegistry.lazy_option'
    for illegal_key in (' foo', '1,2'):
        self.assertRaises(config.IllegalOptionName,
                          self.registry.register_lazy, illegal_key,
                          self.__module__, member_name)
2247
class TestRegisteredOptions(tests.TestCase):
2248
"""All registered options should verify some constraints."""
2250
scenarios = [(key, {'option_name': key, 'option': option}) for key, option
2251
in config.option_registry.iteritems()]
2254
super(TestRegisteredOptions, self).setUp()
2255
self.registry = config.option_registry
2257
def test_proper_name(self):
    # An option should be registered under its own name, this can't be
    # checked at registration time for the lazy ones.
    registered_as = self.option_name
    actual_name = self.option.name
    self.assertEqual(registered_as, actual_name)
2262
def test_help_is_set(self):
2263
option_help = self.registry.get_help(self.option_name)
2264
# Come on, think about the user, he really wants to know what the
2266
self.assertIsNot(None, option_help)
2267
self.assertNotEqual('', option_help)
2270
class TestSection(tests.TestCase):
2272
# FIXME: Parametrize so that all sections produced by Stores run these
2273
# tests -- vila 2011-04-01
2275
def test_get_a_value(self):
    # An option present in the dict is available through the section.
    options = {'foo': 'bar'}
    section = config.Section('myID', options)
    value = section.get('foo')
    self.assertEqual('bar', value)
2280
def test_get_unknown_option(self):
2282
section = config.Section(None, a_dict)
2283
self.assertEqual('out of thin air',
2284
section.get('foo', 'out of thin air'))
2286
def test_options_is_shared(self):
2288
section = config.Section(None, a_dict)
2289
self.assertIs(a_dict, section.options)
2292
class TestMutableSection(tests.TestCase):
2294
scenarios = [('mutable',
2296
lambda opts: config.MutableSection('myID', opts)},),
2300
a_dict = dict(foo='bar')
2301
section = self.get_section(a_dict)
2302
section.set('foo', 'new_value')
2303
self.assertEqual('new_value', section.get('foo'))
2304
# The change appears in the shared section
2305
self.assertEqual('new_value', a_dict.get('foo'))
2306
# We keep track of the change
2307
self.assertTrue('foo' in section.orig)
2308
self.assertEqual('bar', section.orig.get('foo'))
2310
def test_set_preserve_original_once(self):
    options = {'foo': 'bar'}
    section = self.get_section(options)
    for new_value in ('first_value', 'second_value'):
        section.set('foo', new_value)
    # We keep track of the original value
    original = section.orig
    self.assertTrue('foo' in original)
    self.assertEqual('bar', original.get('foo'))
2319
def test_remove(self):
    options = {'foo': 'bar'}
    section = self.get_section(options)
    section.remove('foo')
    # We get None for unknown options via the default value
    without_default = section.get('foo')
    self.assertEqual(None, without_default)
    # Or we just get the default value
    with_default = section.get('foo', 'unknown')
    self.assertEqual('unknown', with_default)
    still_there = 'foo' in section.options
    self.assertFalse(still_there)
    # We keep track of the deletion
    tracked = 'foo' in section.orig
    self.assertTrue(tracked)
    self.assertEqual('bar', section.orig.get('foo'))
2332
def test_remove_new_option(self):
2334
section = self.get_section(a_dict)
2335
section.set('foo', 'bar')
2336
section.remove('foo')
2337
self.assertFalse('foo' in section.options)
2338
# The option didn't exist initially so we need to keep track of it
2339
# with a special value
2340
self.assertTrue('foo' in section.orig)
2341
self.assertEqual(config._NewlyCreatedOption, section.orig['foo'])
2344
class TestCommandLineStore(tests.TestCase):
2347
super(TestCommandLineStore, self).setUp()
2348
self.store = config.CommandLineStore()
2349
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2351
def get_section(self):
2352
"""Get the unique section for the command line overrides."""
2353
sections = list(self.store.get_sections())
2354
self.assertLength(1, sections)
2355
store, section = sections[0]
2356
self.assertEqual(self.store, store)
2359
def test_no_override(self):
    # Without any override, the command line section has no option names.
    self.store._from_cmdline([])
    cmdline_section = self.get_section()
    option_names = list(cmdline_section.iter_option_names())
    self.assertLength(0, option_names)
2364
def test_simple_override(self):
    # A single 'name=value' override is available from the section.
    overrides = ['a=b']
    self.store._from_cmdline(overrides)
    cmdline_section = self.get_section()
    self.assertEqual('b', cmdline_section.get('a'))
2369
def test_list_override(self):
    list_opt = config.ListOption('l')
    config.option_registry.register(list_opt)
    self.store._from_cmdline(['l=1,2,3'])
    raw_value = self.get_section().get('l')
    # The section holds the raw string
    self.assertEqual('1,2,3', raw_value)
    # Reminder: lists should be registered as such explicitly, otherwise
    # the conversion needs to be done afterwards.
    converted = list_opt.convert_from_unicode(self.store, raw_value)
    self.assertEqual(['1', '2', '3'], converted)
2380
def test_multiple_overrides(self):
    # Several overrides end up in the same section.
    self.store._from_cmdline(['a=b', 'x=y'])
    cmdline_section = self.get_section()
    for name, expected in (('a', 'b'), ('x', 'y')):
        self.assertEqual(expected, cmdline_section.get(name))
2386
def test_wrong_syntax(self):
    # 'c' carries no '=value' part and is expected to be refused.
    malformed = ['a=b', 'c']
    self.assertRaises(errors.BzrCommandError,
                      self.store._from_cmdline, malformed)
2390
class TestStoreMinimalAPI(tests.TestCaseWithTransport):
2392
scenarios = [(key, {'get_store': builder}) for key, builder
2393
in config.test_store_builder_registry.iteritems()] + [
2394
('cmdline', {'get_store': lambda test: config.CommandLineStore()})]
2397
store = self.get_store(self)
2398
if isinstance(store, config.TransportIniFileStore):
2399
raise tests.TestNotApplicable(
2400
"%s is not a concrete Store implementation"
2401
" so it doesn't need an id" % (store.__class__.__name__,))
2402
self.assertIsNot(None, store.id)
2405
class TestStore(tests.TestCaseWithTransport):
2407
def assertSectionContent(self, expected, store_and_section):
2408
"""Assert that some options have the proper values in a section."""
2409
_, section = store_and_section
2410
expected_name, expected_options = expected
2411
self.assertEqual(expected_name, section.id)
2414
dict([(k, section.get(k)) for k in expected_options.keys()]))
2417
class TestReadonlyStore(TestStore):
    """Read-only checks run against every registered test store builder."""

    scenarios = [(key, {'get_store': builder}) for key, builder
                 in config.test_store_builder_registry.iteritems()]

    def test_building_delays_load(self):
        fresh_store = self.get_store(self)
        loaded_before = fresh_store.is_loaded()
        self.assertEqual(False, loaded_before)
        fresh_store._load_from_string('')
        loaded_after = fresh_store.is_loaded()
        self.assertEqual(True, loaded_after)

    def test_get_no_sections_for_empty(self):
        empty_store = self.get_store(self)
        empty_store._load_from_string('')
        found = list(empty_store.get_sections())
        self.assertEqual([], found)

    def test_get_default_section(self):
        store = self.get_store(self)
        store._load_from_string('foo=bar')
        found = list(store.get_sections())
        self.assertLength(1, found)
        # The option was given outside of any section: the id is None
        expected = (None, {'foo': 'bar'})
        self.assertSectionContent(expected, found[0])

    def test_get_named_section(self):
        store = self.get_store(self)
        store._load_from_string('[baz]\nfoo=bar')
        found = list(store.get_sections())
        self.assertLength(1, found)
        expected = ('baz', {'foo': 'bar'})
        self.assertSectionContent(expected, found[0])

    def test_load_from_string_fails_for_non_empty_store(self):
        store = self.get_store(self)
        store._load_from_string('foo=bar')
        # A second load on the same store object is expected to be refused
        reload_store = store._load_from_string
        self.assertRaises(AssertionError, reload_store, 'bar=baz')
2453
class TestStoreQuoting(TestStore):
2455
scenarios = [(key, {'get_store': builder}) for key, builder
2456
in config.test_store_builder_registry.iteritems()]
2459
super(TestStoreQuoting, self).setUp()
2460
self.store = self.get_store(self)
2461
# We need a loaded store but any content will do
2462
self.store._load_from_string('')
2464
def assertIdempotent(self, s):
2465
"""Assert that quoting an unquoted string is a no-op and vice-versa.
2467
What matters here is that option values, as they appear in a store, can
2468
be safely round-tripped out of the store and back.
2470
:param s: A string, quoted if required.
2472
self.assertEqual(s, self.store.quote(self.store.unquote(s)))
2473
self.assertEqual(s, self.store.unquote(self.store.quote(s)))
2475
def test_empty_string(self):
2476
if isinstance(self.store, config.IniFileStore):
2477
# configobj._quote doesn't handle empty values
2478
self.assertRaises(AssertionError,
2479
self.assertIdempotent, '')
2481
self.assertIdempotent('')
2482
# But quoted empty strings are ok
2483
self.assertIdempotent('""')
2485
def test_embedded_spaces(self):
    # A quoted value with leading, inner and trailing spaces.
    quoted = '" a b c "'
    self.assertIdempotent(quoted)
2488
def test_embedded_commas(self):
    # A quoted value containing a comma surrounded by spaces.
    quoted = '" a , b c "'
    self.assertIdempotent(quoted)
2491
def test_simple_comma(self):
2492
if isinstance(self.store, config.IniFileStore):
2493
# configobj requires that lists are special-cased
2494
self.assertRaises(AssertionError,
2495
self.assertIdempotent, ',')
2497
self.assertIdempotent(',')
2498
# When a single comma is required, quoting is also required
2499
self.assertIdempotent('","')
2501
def test_list(self):
2502
if isinstance(self.store, config.IniFileStore):
2503
# configobj requires that lists are special-cased
2504
self.assertRaises(AssertionError,
2505
self.assertIdempotent, 'a,b')
2507
self.assertIdempotent('a,b')
2510
class TestDictFromStore(tests.TestCase):
2512
def test_unquote_not_string(self):
2513
conf = config.MemoryStack('x=2\n[a_section]\na=1\n')
2514
value = conf.get('a_section')
2515
# Urgh, despite 'conf' asking for the no-name section, we get the
2516
# content of another section as a dict o_O
2517
self.assertEqual({'a': '1'}, value)
2518
unquoted = conf.store.unquote(value)
2519
# Which cannot be unquoted but shouldn't crash either (the use cases
2520
# are getting the value or displaying it. In the later case, '%s' will
2522
self.assertEqual({'a': '1'}, unquoted)
2523
self.assertEqual("{u'a': u'1'}", '%s' % (unquoted,))
2526
class TestIniFileStoreContent(tests.TestCaseWithTransport):
2527
"""Simulate loading a config store with content of various encodings.
2529
All files produced by bzr are in utf8 content.
2531
Users may modify them manually and end up with a file that can't be
2532
loaded. We need to issue proper error messages in this case.
2535
invalid_utf8_char = '\xff'
2537
def test_load_utf8(self):
2538
"""Ensure we can load an utf8-encoded file."""
2539
t = self.get_transport()
2540
# From http://pad.lv/799212
2541
unicode_user = u'b\N{Euro Sign}ar'
2542
unicode_content = u'user=%s' % (unicode_user,)
2543
utf8_content = unicode_content.encode('utf8')
2544
# Store the raw content in the config file
2545
t.put_bytes('foo.conf', utf8_content)
2546
store = config.TransportIniFileStore(t, 'foo.conf')
2548
stack = config.Stack([store.get_sections], store)
2549
self.assertEqual(unicode_user, stack.get('user'))
2551
def test_load_non_ascii(self):
    """Ensure we display a proper error on non-ascii, non utf-8 content."""
    transport = self.get_transport()
    # The invalid character sits in a comment line of the file
    content = 'user=foo\n#%s\n' % (self.invalid_utf8_char,)
    transport.put_bytes('foo.conf', content)
    store = config.TransportIniFileStore(transport, 'foo.conf')
    self.assertRaises(config.ConfigContentError, store.load)
2558
def test_load_erroneous_content(self):
    """Ensure we display a proper error on content that can't be parsed."""
    transport = self.get_transport()
    # The section header is never closed
    unparsable = '[open_section\n'
    transport.put_bytes('foo.conf', unparsable)
    store = config.TransportIniFileStore(transport, 'foo.conf')
    self.assertRaises(config.ParseConfigError, store.load)
2565
def test_load_permission_denied(self):
2566
"""Ensure we get warned when trying to load an inaccessible file."""
2569
warnings.append(args[0] % args[1:])
2570
self.overrideAttr(trace, 'warning', warning)
2572
t = self.get_transport()
2574
def get_bytes(relpath):
2575
raise errors.PermissionDenied(relpath, "")
2576
t.get_bytes = get_bytes
2577
store = config.TransportIniFileStore(t, 'foo.conf')
2578
self.assertRaises(errors.PermissionDenied, store.load)
2581
[u'Permission denied while trying to load configuration store %s.'
2582
% store.external_url()])
2585
class TestIniConfigContent(tests.TestCaseWithTransport):
2586
"""Simulate loading a IniBasedConfig with content of various encodings.
2588
All files produced by bzr are in utf8 content.
2590
Users may modify them manually and end up with a file that can't be
2591
loaded. We need to issue proper error messages in this case.
2594
invalid_utf8_char = '\xff'
2596
def test_load_utf8(self):
    """Ensure we can load an utf8-encoded file."""
    # From http://pad.lv/799212
    user = u'b\N{Euro Sign}ar'
    raw_content = (u'user=%s' % (user,)).encode('utf8')
    # Store the raw content in the config file
    with open('foo.conf', 'wb') as conf_file:
        conf_file.write(raw_content)
    loaded = config.IniBasedConfig(file_name='foo.conf')
    self.assertEqual(user, loaded.get_user_option('user'))
2608
def test_load_badly_encoded_content(self):
    """Ensure we display a proper error on non-ascii, non utf-8 content."""
    # The invalid character sits in a comment line of the file
    bad_content = 'user=foo\n#%s\n' % (self.invalid_utf8_char,)
    with open('foo.conf', 'wb') as conf_file:
        conf_file.write(bad_content)
    loaded = config.IniBasedConfig(file_name='foo.conf')
    self.assertRaises(config.ConfigContentError, loaded._get_parser)
2615
def test_load_erroneous_content(self):
    """Ensure we display a proper error on content that can't be parsed."""
    # The section header is never closed
    unparsable = '[open_section\n'
    with open('foo.conf', 'wb') as conf_file:
        conf_file.write(unparsable)
    loaded = config.IniBasedConfig(file_name='foo.conf')
    self.assertRaises(config.ParseConfigError, loaded._get_parser)
2623
class TestMutableStore(TestStore):
2625
scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
2626
in config.test_store_builder_registry.iteritems()]
2629
super(TestMutableStore, self).setUp()
2630
self.transport = self.get_transport()
2632
def has_store(self, store):
    """Tell whether ``self.transport`` has the path ``store`` points to.

    The store URL is made relative to the transport URL before asking the
    transport.
    """
    base_url = self.transport.external_url()
    relpath = urlutils.relative_url(base_url, store.external_url())
    return self.transport.has(relpath)
2637
def test_save_empty_creates_no_file(self):
2638
# FIXME: There should be a better way than relying on the test
2639
# parametrization to identify branch.conf -- vila 2011-0526
2640
if self.store_id in ('branch', 'remote_branch'):
2641
raise tests.TestNotApplicable(
2642
'branch.conf is *always* created when a branch is initialized')
2643
store = self.get_store(self)
2645
self.assertEqual(False, self.has_store(store))
2647
def test_mutable_section_shared(self):
2648
store = self.get_store(self)
2649
store._load_from_string('foo=bar\n')
2650
# FIXME: There should be a better way than relying on the test
2651
# parametrization to identify branch.conf -- vila 2011-0526
2652
if self.store_id in ('branch', 'remote_branch'):
2653
# branch stores requires write locked branches
2654
self.addCleanup(store.branch.lock_write().unlock)
2655
section1 = store.get_mutable_section(None)
2656
section2 = store.get_mutable_section(None)
2657
# If we get different sections, different callers won't share the
2659
self.assertIs(section1, section2)
2661
def test_save_emptied_succeeds(self):
2662
store = self.get_store(self)
2663
store._load_from_string('foo=bar\n')
2664
# FIXME: There should be a better way than relying on the test
2665
# parametrization to identify branch.conf -- vila 2011-0526
2666
if self.store_id in ('branch', 'remote_branch'):
2667
# branch stores requires write locked branches
2668
self.addCleanup(store.branch.lock_write().unlock)
2669
section = store.get_mutable_section(None)
2670
section.remove('foo')
2672
self.assertEqual(True, self.has_store(store))
2673
modified_store = self.get_store(self)
2674
sections = list(modified_store.get_sections())
2675
self.assertLength(0, sections)
2677
def test_save_with_content_succeeds(self):
2678
# FIXME: There should be a better way than relying on the test
2679
# parametrization to identify branch.conf -- vila 2011-0526
2680
if self.store_id in ('branch', 'remote_branch'):
2681
raise tests.TestNotApplicable(
2682
'branch.conf is *always* created when a branch is initialized')
2683
store = self.get_store(self)
2684
store._load_from_string('foo=bar\n')
2685
self.assertEqual(False, self.has_store(store))
2687
self.assertEqual(True, self.has_store(store))
2688
modified_store = self.get_store(self)
2689
sections = list(modified_store.get_sections())
2690
self.assertLength(1, sections)
2691
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2693
def test_set_option_in_empty_store(self):
2694
store = self.get_store(self)
2695
# FIXME: There should be a better way than relying on the test
2696
# parametrization to identify branch.conf -- vila 2011-0526
2697
if self.store_id in ('branch', 'remote_branch'):
2698
# branch stores requires write locked branches
2699
self.addCleanup(store.branch.lock_write().unlock)
2700
section = store.get_mutable_section(None)
2701
section.set('foo', 'bar')
2703
modified_store = self.get_store(self)
2704
sections = list(modified_store.get_sections())
2705
self.assertLength(1, sections)
2706
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2708
def test_set_option_in_default_section(self):
2709
store = self.get_store(self)
2710
store._load_from_string('')
2711
# FIXME: There should be a better way than relying on the test
2712
# parametrization to identify branch.conf -- vila 2011-0526
2713
if self.store_id in ('branch', 'remote_branch'):
2714
# branch stores requires write locked branches
2715
self.addCleanup(store.branch.lock_write().unlock)
2716
section = store.get_mutable_section(None)
2717
section.set('foo', 'bar')
2719
modified_store = self.get_store(self)
2720
sections = list(modified_store.get_sections())
2721
self.assertLength(1, sections)
2722
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2724
def test_set_option_in_named_section(self):
2725
store = self.get_store(self)
2726
store._load_from_string('')
2727
# FIXME: There should be a better way than relying on the test
2728
# parametrization to identify branch.conf -- vila 2011-0526
2729
if self.store_id in ('branch', 'remote_branch'):
2730
# branch stores requires write locked branches
2731
self.addCleanup(store.branch.lock_write().unlock)
2732
section = store.get_mutable_section('baz')
2733
section.set('foo', 'bar')
2735
modified_store = self.get_store(self)
2736
sections = list(modified_store.get_sections())
2737
self.assertLength(1, sections)
2738
self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2740
def test_load_hook(self):
2741
# First, we need to ensure that the store exists
2742
store = self.get_store(self)
2743
# FIXME: There should be a better way than relying on the test
2744
# parametrization to identify branch.conf -- vila 2011-0526
2745
if self.store_id in ('branch', 'remote_branch'):
2746
# branch stores requires write locked branches
2747
self.addCleanup(store.branch.lock_write().unlock)
2748
section = store.get_mutable_section('baz')
2749
section.set('foo', 'bar')
2751
# Now we can try to load it
2752
store = self.get_store(self)
2756
config.ConfigHooks.install_named_hook('load', hook, None)
2757
self.assertLength(0, calls)
2759
self.assertLength(1, calls)
2760
self.assertEqual((store,), calls[0])
2762
def test_save_hook(self):
2766
config.ConfigHooks.install_named_hook('save', hook, None)
2767
self.assertLength(0, calls)
2768
store = self.get_store(self)
2769
# FIXME: There should be a better way than relying on the test
2770
# parametrization to identify branch.conf -- vila 2011-0526
2771
if self.store_id in ('branch', 'remote_branch'):
2772
# branch stores requires write locked branches
2773
self.addCleanup(store.branch.lock_write().unlock)
2774
section = store.get_mutable_section('baz')
2775
section.set('foo', 'bar')
2777
self.assertLength(1, calls)
2778
self.assertEqual((store,), calls[0])
2780
def test_set_mark_dirty(self):
    memory_stack = config.MemoryStack('')
    store = memory_stack.store
    # Nothing is dirty before the first change
    self.assertLength(0, store.dirty_sections)
    memory_stack.set('foo', 'baz')
    # Setting an option leaves one dirty section and a store to save
    self.assertLength(1, store.dirty_sections)
    self.assertTrue(store._need_saving())
2787
def test_remove_mark_dirty(self):
2788
stack = config.MemoryStack('foo=bar')
2789
self.assertLength(0, stack.store.dirty_sections)
2791
self.assertLength(1, stack.store.dirty_sections)
2792
self.assertTrue(stack.store._need_saving())
2795
class TestStoreSaveChanges(tests.TestCaseWithTransport):
2796
"""Tests that config changes are kept in memory and saved on-demand."""
2799
super(TestStoreSaveChanges, self).setUp()
2800
self.transport = self.get_transport()
2801
# Most of the tests involve two stores pointing to the same persistent
2802
# storage to observe the effects of concurrent changes
2803
self.st1 = config.TransportIniFileStore(self.transport, 'foo.conf')
2804
self.st2 = config.TransportIniFileStore(self.transport, 'foo.conf')
2807
self.warnings.append(args[0] % args[1:])
2808
self.overrideAttr(trace, 'warning', warning)
2810
def has_store(self, store):
    """Tell whether ``self.transport`` has the path ``store`` points to.

    The store URL is made relative to the transport URL before asking the
    transport.
    """
    base_url = self.transport.external_url()
    relpath = urlutils.relative_url(base_url, store.external_url())
    return self.transport.has(relpath)
2815
def get_stack(self, store):
    """Build a stack reading its sections from ``store`` and using it."""
    # Any stack will do as long as it uses the right store, just a single
    # no-name section is enough
    section_getters = [store.get_sections]
    return config.Stack(section_getters, store)
2820
def test_no_changes_no_save(self):
    # Saving changes when nothing was changed is not expected to create
    # the store.
    stack = self.get_stack(self.st1)
    stack.store.save_changes()
    created = self.has_store(self.st1)
    self.assertEqual(False, created)
2825
def test_unrelated_concurrent_update(self):
2826
s1 = self.get_stack(self.st1)
2827
s2 = self.get_stack(self.st2)
2828
s1.set('foo', 'bar')
2829
s2.set('baz', 'quux')
2831
# Changes don't propagate magically
2832
self.assertEqual(None, s1.get('baz'))
2833
s2.store.save_changes()
2834
self.assertEqual('quux', s2.get('baz'))
2835
# Changes are acquired when saving
2836
self.assertEqual('bar', s2.get('foo'))
2837
# Since there is no overlap, no warnings are emitted
2838
self.assertLength(0, self.warnings)
2840
def test_concurrent_update_modified(self):
2841
s1 = self.get_stack(self.st1)
2842
s2 = self.get_stack(self.st2)
2843
s1.set('foo', 'bar')
2844
s2.set('foo', 'baz')
2847
s2.store.save_changes()
2848
self.assertEqual('baz', s2.get('foo'))
2849
# But the user gets a warning
2850
self.assertLength(1, self.warnings)
2851
warning = self.warnings[0]
2852
self.assertStartsWith(warning, 'Option foo in section None')
2853
self.assertEndsWith(warning, 'was changed from <CREATED> to bar.'
2854
' The baz value will be saved.')
2856
def test_concurrent_deletion(self):
2857
self.st1._load_from_string('foo=bar')
2859
s1 = self.get_stack(self.st1)
2860
s2 = self.get_stack(self.st2)
2863
s1.store.save_changes()
2865
self.assertLength(0, self.warnings)
2866
s2.store.save_changes()
2868
self.assertLength(1, self.warnings)
2869
warning = self.warnings[0]
2870
self.assertStartsWith(warning, 'Option foo in section None')
2871
self.assertEndsWith(warning, 'was changed from bar to <CREATED>.'
2872
' The <DELETED> value will be saved.')
2875
class TestQuotingIniFileStore(tests.TestCaseWithTransport):
2877
def get_store(self):
    """Build a store for 'foo.conf' on the test transport."""
    transport = self.get_transport()
    return config.TransportIniFileStore(transport, 'foo.conf')
2880
def test_get_quoted_string(self):
    # The quotes are dropped, the spaces they protect are kept.
    quoting_store = self.get_store()
    quoting_store._load_from_string('foo= " abc "')
    stack = config.Stack([quoting_store.get_sections])
    value = stack.get('foo')
    self.assertEqual(' abc ', value)
2886
def test_set_quoted_string(self):
2887
store = self.get_store()
2888
stack = config.Stack([store.get_sections], store)
2889
stack.set('foo', ' a b c ')
2891
self.assertFileEqual('foo = " a b c "' + os.linesep, 'foo.conf')
2894
class TestTransportIniFileStore(TestStore):
2896
def test_loading_unknown_file_fails(self):
2897
store = config.TransportIniFileStore(self.get_transport(),
2899
self.assertRaises(errors.NoSuchFile, store.load)
2901
def test_invalid_content(self):
    # Unparsable content raises ParseConfigError naming the file, and the
    # store stays unloaded.
    ini_store = config.TransportIniFileStore(
        self.get_transport(), 'foo.conf')
    self.assertEqual(False, ini_store.is_loaded())
    bad_content = 'this is invalid !'
    error = self.assertRaises(
        config.ParseConfigError, ini_store._load_from_string, bad_content)
    self.assertEndsWith(error.filename, 'foo.conf')
    # The failed load must not flip the loaded state.
    self.assertEqual(False, ini_store.is_loaded())
2911
def test_get_embedded_sections(self):
2912
# A more complicated example (which also shows that section names and
2913
# option names share the same name space...)
2914
# FIXME: This should be fixed by forbidding dicts as values ?
2915
# -- vila 2011-04-05
2916
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
2917
store._load_from_string('''
2921
foo_in_DEFAULT=foo_DEFAULT
2929
sections = list(store.get_sections())
2930
self.assertLength(4, sections)
2931
# The default section has no name.
2932
# List values are provided as strings and need to be explicitly
2933
# converted by specifying from_unicode=list_from_store at option
2935
self.assertSectionContent((None, {'foo': 'bar', 'l': u'1,2'}),
2937
self.assertSectionContent(
2938
('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
2939
self.assertSectionContent(
2940
('bar', {'foo_in_bar': 'barbar'}), sections[2])
2941
# sub sections are provided as embedded dicts.
2942
self.assertSectionContent(
2943
('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
2947
class TestLockableIniFileStore(TestStore):
2949
def test_create_store_in_created_dir(self):
2950
self.assertPathDoesNotExist('dir')
2951
t = self.get_transport('dir/subdir')
2952
store = config.LockableIniFileStore(t, 'foo.conf')
2953
store.get_mutable_section(None).set('foo', 'bar')
2955
self.assertPathExists('dir/subdir')
2958
class TestConcurrentStoreUpdates(TestStore):
2959
"""Test that Stores properly handle concurrent updates.
2961
New Store implementation may fail some of these tests but until such
2962
implementations exist it's hard to properly filter them from the scenarios
2963
applied here. If you encounter such a case, contact the bzr devs.
2966
scenarios = [(key, {'get_stack': builder}) for key, builder
2967
in config.test_stack_builder_registry.iteritems()]
2970
super(TestConcurrentStoreUpdates, self).setUp()
2971
self.stack = self.get_stack(self)
2972
if not isinstance(self.stack, config._CompatibleStack):
2973
raise tests.TestNotApplicable(
2974
'%s is not meant to be compatible with the old config design'
2976
self.stack.set('one', '1')
2977
self.stack.set('two', '2')
2979
self.stack.store.save()
2981
def test_simple_read_access(self):
2982
self.assertEqual('1', self.stack.get('one'))
2984
def test_simple_write_access(self):
2985
self.stack.set('one', 'one')
2986
self.assertEqual('one', self.stack.get('one'))
2988
def test_listen_to_the_last_speaker(self):
2990
c2 = self.get_stack(self)
2991
c1.set('one', 'ONE')
2992
c2.set('two', 'TWO')
2993
self.assertEqual('ONE', c1.get('one'))
2994
self.assertEqual('TWO', c2.get('two'))
2995
# The second update respects the first one
2996
self.assertEqual('ONE', c2.get('one'))
2998
def test_last_speaker_wins(self):
2999
# If the same config is not shared, the same variable modified twice
3000
# can only see a single result.
3002
c2 = self.get_stack(self)
3005
self.assertEqual('c2', c2.get('one'))
3006
# The first modification is still available until another refresh
3008
self.assertEqual('c1', c1.get('one'))
3009
c1.set('two', 'done')
3010
self.assertEqual('c2', c1.get('one'))
3012
def test_writes_are_serialized(self):
3014
c2 = self.get_stack(self)
3016
# We spawn a thread that will pause *during* the config saving.
3017
before_writing = threading.Event()
3018
after_writing = threading.Event()
3019
writing_done = threading.Event()
3020
c1_save_without_locking_orig = c1.store.save_without_locking
3021
def c1_save_without_locking():
3022
before_writing.set()
3023
c1_save_without_locking_orig()
3024
# The lock is held. We wait for the main thread to decide when to
3026
after_writing.wait()
3027
c1.store.save_without_locking = c1_save_without_locking
3031
t1 = threading.Thread(target=c1_set)
3032
# Collect the thread after the test
3033
self.addCleanup(t1.join)
3034
# Be ready to unblock the thread if the test goes wrong
3035
self.addCleanup(after_writing.set)
3037
before_writing.wait()
3038
self.assertRaises(errors.LockContention,
3039
c2.set, 'one', 'c2')
3040
self.assertEqual('c1', c1.get('one'))
3041
# Let the lock be released
3045
self.assertEqual('c2', c2.get('one'))
3047
def test_read_while_writing(self):
3049
# We spawn a thread that will pause *during* the write
3050
ready_to_write = threading.Event()
3051
do_writing = threading.Event()
3052
writing_done = threading.Event()
3053
# We override the _save implementation so we know the store is locked
3054
c1_save_without_locking_orig = c1.store.save_without_locking
3055
def c1_save_without_locking():
3056
ready_to_write.set()
3057
# The lock is held. We wait for the main thread to decide when to
3060
c1_save_without_locking_orig()
3062
c1.store.save_without_locking = c1_save_without_locking
3065
t1 = threading.Thread(target=c1_set)
3066
# Collect the thread after the test
3067
self.addCleanup(t1.join)
3068
# Be ready to unblock the thread if the test goes wrong
3069
self.addCleanup(do_writing.set)
3071
# Ensure the thread is ready to write
3072
ready_to_write.wait()
3073
self.assertEqual('c1', c1.get('one'))
3074
# If we read during the write, we get the old value
3075
c2 = self.get_stack(self)
3076
self.assertEqual('1', c2.get('one'))
3077
# Let the writing occur and ensure it occurred
3080
# Now we get the updated value
3081
c3 = self.get_stack(self)
3082
self.assertEqual('c1', c3.get('one'))
3084
# FIXME: It may be worth looking into removing the lock dir when it's not
3085
# needed anymore and look at possible fallouts for concurrent lockers. This
3086
# will matter if/when we use config files outside of breezy directories
3087
# (.config/breezy or .bzr) -- vila 2011-04-11
3090
class TestSectionMatcher(TestStore):
3092
scenarios = [('location', {'matcher': config.LocationMatcher}),
3093
('id', {'matcher': config.NameMatcher}),]
3096
super(TestSectionMatcher, self).setUp()
3097
# Any simple store is good enough
3098
self.get_store = config.test_store_builder_registry.get('configobj')
3100
def test_no_matches_for_empty_stores(self):
3101
store = self.get_store(self)
3102
store._load_from_string('')
3103
matcher = self.matcher(store, '/bar')
3104
self.assertEqual([], list(matcher.get_sections()))
3106
def test_build_doesnt_load_store(self):
3107
store = self.get_store(self)
3108
self.matcher(store, '/bar')
3109
self.assertFalse(store.is_loaded())
3112
class TestLocationSection(tests.TestCase):
3114
def get_section(self, options, extra_path):
    # Wrap a plain section named 'foo' holding ``options`` into a
    # LocationSection carrying ``extra_path``.
    return config.LocationSection(
        config.Section('foo', options), extra_path)
3118
def test_simple_option(self):
3119
section = self.get_section({'foo': 'bar'}, '')
3120
self.assertEqual('bar', section.get('foo'))
3122
def test_option_with_extra_path(self):
3123
section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
3125
self.assertEqual('bar/baz', section.get('foo'))
3127
def test_invalid_policy(self):
3128
section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
3130
# invalid policies are ignored
3131
self.assertEqual('bar', section.get('foo'))
3134
class TestLocationMatcher(TestStore):
3137
super(TestLocationMatcher, self).setUp()
3138
# Any simple store is good enough
3139
self.get_store = config.test_store_builder_registry.get('configobj')
3141
def test_unrelated_section_excluded(self):
3142
store = self.get_store(self)
3143
store._load_from_string('''
3151
section=/foo/bar/baz
3155
self.assertEqual(['/foo', '/foo/baz', '/foo/bar', '/foo/bar/baz',
3157
[section.id for _, section in store.get_sections()])
3158
matcher = config.LocationMatcher(store, '/foo/bar/quux')
3159
sections = [section for _, section in matcher.get_sections()]
3160
self.assertEqual(['/foo/bar', '/foo'],
3161
[section.id for section in sections])
3162
self.assertEqual(['quux', 'bar/quux'],
3163
[section.extra_path for section in sections])
3165
def test_more_specific_sections_first(self):
3166
store = self.get_store(self)
3167
store._load_from_string('''
3173
self.assertEqual(['/foo', '/foo/bar'],
3174
[section.id for _, section in store.get_sections()])
3175
matcher = config.LocationMatcher(store, '/foo/bar/baz')
3176
sections = [section for _, section in matcher.get_sections()]
3177
self.assertEqual(['/foo/bar', '/foo'],
3178
[section.id for section in sections])
3179
self.assertEqual(['baz', 'bar/baz'],
3180
[section.extra_path for section in sections])
3182
def test_appendpath_in_no_name_section(self):
3183
# It's a bit weird to allow appendpath in a no-name section, but
3184
# someone may find a use for it
3185
store = self.get_store(self)
3186
store._load_from_string('''
3188
foo:policy = appendpath
3190
matcher = config.LocationMatcher(store, 'dir/subdir')
3191
sections = list(matcher.get_sections())
3192
self.assertLength(1, sections)
3193
self.assertEqual('bar/dir/subdir', sections[0][1].get('foo'))
3195
def test_file_urls_are_normalized(self):
3196
store = self.get_store(self)
3197
if sys.platform == 'win32':
3198
expected_url = 'file:///C:/dir/subdir'
3199
expected_location = 'C:/dir/subdir'
3201
expected_url = 'file:///dir/subdir'
3202
expected_location = '/dir/subdir'
3203
matcher = config.LocationMatcher(store, expected_url)
3204
self.assertEqual(expected_location, matcher.location)
3206
def test_branch_name_colo(self):
3207
store = self.get_store(self)
3208
store._load_from_string(dedent("""\
3210
push_location=my{branchname}
3212
matcher = config.LocationMatcher(store, 'file:///,branch=example%3c')
3213
self.assertEqual('example<', matcher.branch_name)
3214
((_, section),) = matcher.get_sections()
3215
self.assertEqual('example<', section.locals['branchname'])
3217
def test_branch_name_basename(self):
3218
store = self.get_store(self)
3219
store._load_from_string(dedent("""\
3221
push_location=my{branchname}
3223
matcher = config.LocationMatcher(store, 'file:///parent/example%3c')
3224
self.assertEqual('example<', matcher.branch_name)
3225
((_, section),) = matcher.get_sections()
3226
self.assertEqual('example<', section.locals['branchname'])
3229
class TestStartingPathMatcher(TestStore):
3232
super(TestStartingPathMatcher, self).setUp()
3233
# Any simple store is good enough
3234
self.store = config.IniFileStore()
3236
def assertSectionIDs(self, expected, location, content):
3237
self.store._load_from_string(content)
3238
matcher = config.StartingPathMatcher(self.store, location)
3239
sections = list(matcher.get_sections())
3240
self.assertLength(len(expected), sections)
3241
self.assertEqual(expected, [section.id for _, section in sections])
3244
def test_empty(self):
3245
self.assertSectionIDs([], self.get_url(), '')
3247
def test_url_vs_local_paths(self):
3248
# The matcher location is an url and the section names are local paths
3249
self.assertSectionIDs(['/foo/bar', '/foo'],
3250
'file:///foo/bar/baz', '''\
3255
def test_local_path_vs_url(self):
3256
# The matcher location is a local path and the section names are urls
3257
self.assertSectionIDs(['file:///foo/bar', 'file:///foo'],
3258
'/foo/bar/baz', '''\
3264
def test_no_name_section_included_when_present(self):
3265
# Note that other tests will cover the case where the no-name section
3266
# is empty and as such, not included.
3267
sections = self.assertSectionIDs(['/foo/bar', '/foo', None],
3268
'/foo/bar/baz', '''\
3269
option = defined so the no-name section exists
3273
self.assertEqual(['baz', 'bar/baz', '/foo/bar/baz'],
3274
[s.locals['relpath'] for _, s in sections])
3276
def test_order_reversed(self):
3277
self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', '''\
3282
def test_unrelated_section_excluded(self):
3283
self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', '''\
3289
def test_glob_included(self):
3290
sections = self.assertSectionIDs(['/foo/*/baz', '/foo/b*', '/foo'],
3291
'/foo/bar/baz', '''\
3297
# Note that 'baz' as a relpath for /foo/b* is not fully correct, but
3298
# nothing really is... as far as using {relpath} to append it to something
3299
# else, this seems good enough though.
3300
self.assertEqual(['', 'baz', 'bar/baz'],
3301
[s.locals['relpath'] for _, s in sections])
3303
def test_respect_order(self):
3304
self.assertSectionIDs(['/foo', '/foo/b*', '/foo/*/baz'],
3305
'/foo/bar/baz', '''\
3313
class TestNameMatcher(TestStore):
3316
super(TestNameMatcher, self).setUp()
3317
self.matcher = config.NameMatcher
3318
# Any simple store is good enough
3319
self.get_store = config.test_store_builder_registry.get('configobj')
3321
def get_matching_sections(self, name):
3322
store = self.get_store(self)
3323
store._load_from_string('''
3331
matcher = self.matcher(store, name)
3332
return list(matcher.get_sections())
3334
def test_matching(self):
3335
sections = self.get_matching_sections('foo')
3336
self.assertLength(1, sections)
3337
self.assertSectionContent(('foo', {'option': 'foo'}), sections[0])
3339
def test_not_matching(self):
3340
sections = self.get_matching_sections('baz')
3341
self.assertLength(0, sections)
3344
class TestBaseStackGet(tests.TestCase):
3347
super(TestBaseStackGet, self).setUp()
3348
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3350
def test_get_first_definition(self):
    # Both stores define 'foo'; the value from the store listed first in
    # the stack is the one returned.
    first, second = config.IniFileStore(), config.IniFileStore()
    first._load_from_string('foo=bar')
    second._load_from_string('foo=baz')
    stack = config.Stack([first.get_sections, second.get_sections])
    self.assertEqual('bar', stack.get('foo'))
3358
def test_get_with_registered_default_value(self):
    # With no sections at all, the registered default is returned.
    option = config.Option('foo', default='bar')
    config.option_registry.register(option)
    empty_stack = config.Stack([])
    self.assertEqual('bar', empty_stack.get('foo'))
3363
def test_get_without_registered_default_value(self):
    # Registered without a default and defined nowhere: the result is None.
    option = config.Option('foo')
    config.option_registry.register(option)
    empty_stack = config.Stack([])
    self.assertEqual(None, empty_stack.get('foo'))
3368
def test_get_without_default_value_for_not_registered(self):
    # An option that is not even registered reads as None.
    value = config.Stack([]).get('foo')
    self.assertEqual(None, value)
3372
def test_get_for_empty_section_callable(self):
    # A section getter that returns nothing leaves the option undefined.
    def no_sections():
        return []
    stack = config.Stack([no_sections])
    self.assertEqual(None, stack.get('foo'))
3376
def test_get_for_broken_callable(self):
    # Trying to use an invalid callable raises an exception on first use
    broken_getters = [object]
    stack = config.Stack(broken_getters)
    self.assertRaises(TypeError, stack.get, 'foo')
3382
class TestStackWithSimpleStore(tests.TestCase):
3385
super(TestStackWithSimpleStore, self).setUp()
3386
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3387
self.registry = config.option_registry
3389
def get_conf(self, content=None):
    # Build a MemoryStack from ``content`` (None is passed through as is).
    stack = config.MemoryStack(content)
    return stack
3392
def test_override_value_from_env(self):
    # Start from a clean environment before registering the option.
    self.overrideEnv('FOO', None)
    option = config.Option(
        'foo', default='bar', override_from_env=['FOO'])
    self.registry.register(option)
    self.overrideEnv('FOO', 'quux')
    # The environment value is expected instead of the stored 'store'.
    stack = self.get_conf('foo=store')
    self.assertEqual('quux', stack.get('foo'))
3401
def test_first_override_value_from_env_wins(self):
    # Clear every candidate variable before registering the option.
    for variable in ('NO_VALUE', 'FOO', 'BAZ'):
        self.overrideEnv(variable, None)
    option = config.Option(
        'foo', default='bar',
        override_from_env=['NO_VALUE', 'FOO', 'BAZ'])
    self.registry.register(option)
    self.overrideEnv('FOO', 'foo')
    self.overrideEnv('BAZ', 'baz')
    # NO_VALUE stays unset, so FOO is the first variable with a value.
    stack = self.get_conf('foo=store')
    self.assertEqual('foo', stack.get('foo'))
3415
class TestMemoryStack(tests.TestCase):
3418
conf = config.MemoryStack('foo=bar')
3419
self.assertEqual('bar', conf.get('foo'))
3422
conf = config.MemoryStack('foo=bar')
3423
conf.set('foo', 'baz')
3424
self.assertEqual('baz', conf.get('foo'))
3426
def test_no_content(self):
    stack = config.MemoryStack()
    # Without content nothing has been loaded...
    self.assertFalse(stack.store.is_loaded())
    # ...and reading an option raises NotImplementedError.
    self.assertRaises(NotImplementedError, stack.get, 'foo')
    # Content can still be supplied afterwards.
    stack.store._load_from_string('foo=bar')
    value = stack.get('foo')
    self.assertEqual('bar', value)
3436
class TestStackIterSections(tests.TestCase):

    def test_empty_stack(self):
        # A stack built without any section getter iterates over nothing.
        stack = config.Stack([])
        self.assertLength(0, list(stack.iter_sections()))

    def test_empty_store(self):
        # A store loaded with empty content contributes no section.
        empty = config.IniFileStore()
        empty._load_from_string('')
        stack = config.Stack([empty.get_sections])
        self.assertLength(0, list(stack.iter_sections()))

    def test_simple_store(self):
        # One option gives a single (store, section) pair whose store is
        # the very object the section came from.
        ini = config.IniFileStore()
        ini._load_from_string('foo=bar')
        stack = config.Stack([ini.get_sections])
        pairs = list(stack.iter_sections())
        self.assertLength(1, pairs)
        found_store, _ = pairs[0]
        self.assertIs(ini, found_store)

    def test_two_stores(self):
        # Pairs come back in the order the stores were given to the stack.
        first, second = config.IniFileStore(), config.IniFileStore()
        first._load_from_string('foo=bar')
        second._load_from_string('bar=qux')
        stack = config.Stack([first.get_sections, second.get_sections])
        pairs = list(stack.iter_sections())
        self.assertLength(2, pairs)
        self.assertIs(first, pairs[0][0])
        self.assertIs(second, pairs[1][0])
3471
class TestStackWithTransport(tests.TestCaseWithTransport):
    # One scenario per entry of config.test_stack_builder_registry; each
    # scenario hands the builder to the tests as ``get_stack``.

    scenarios = [
        (key, {'get_stack': builder})
        for key, builder in config.test_stack_builder_registry.iteritems()]
3477
class TestConcreteStacks(TestStackWithTransport):

    def test_build_stack(self):
        # Smoke test only: building the stack must not raise, which makes
        # broken builders easier to debug.
        self.get_stack(self)
3484
class TestStackGet(TestStackWithTransport):
3487
super(TestStackGet, self).setUp()
3488
self.conf = self.get_stack(self)
3490
def test_get_for_empty_stack(self):
3491
self.assertEqual(None, self.conf.get('foo'))
3493
def test_get_hook(self):
3494
self.conf.set('foo', 'bar')
3498
config.ConfigHooks.install_named_hook('get', hook, None)
3499
self.assertLength(0, calls)
3500
value = self.conf.get('foo')
3501
self.assertEqual('bar', value)
3502
self.assertLength(1, calls)
3503
self.assertEqual((self.conf, 'foo', 'bar'), calls[0])
3506
class TestStackGetWithConverter(tests.TestCase):
3509
super(TestStackGetWithConverter, self).setUp()
3510
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3511
self.registry = config.option_registry
3513
def get_conf(self, content=None):
    # Build a MemoryStack from ``content`` (None is passed through as is).
    stack = config.MemoryStack(content)
    return stack
3516
def register_bool_option(self, name, default=None, default_from_env=None):
    # Register ``name`` as an option converted by config.bool_from_store.
    option_kwargs = dict(help='A boolean.', default=default,
                         default_from_env=default_from_env,
                         from_unicode=config.bool_from_store)
    self.registry.register(config.Option(name, **option_kwargs))
3522
def test_get_default_bool_None(self):
3523
self.register_bool_option('foo')
3524
conf = self.get_conf('')
3525
self.assertEqual(None, conf.get('foo'))
3527
def test_get_default_bool_True(self):
3528
self.register_bool_option('foo', u'True')
3529
conf = self.get_conf('')
3530
self.assertEqual(True, conf.get('foo'))
3532
def test_get_default_bool_False(self):
3533
self.register_bool_option('foo', False)
3534
conf = self.get_conf('')
3535
self.assertEqual(False, conf.get('foo'))
3537
def test_get_default_bool_False_as_string(self):
3538
self.register_bool_option('foo', u'False')
3539
conf = self.get_conf('')
3540
self.assertEqual(False, conf.get('foo'))
3542
def test_get_default_bool_from_env_converted(self):
3543
self.register_bool_option('foo', u'True', default_from_env=['FOO'])
3544
self.overrideEnv('FOO', 'False')
3545
conf = self.get_conf('')
3546
self.assertEqual(False, conf.get('foo'))
3548
def test_get_default_bool_when_conversion_fails(self):
3549
self.register_bool_option('foo', default='True')
3550
conf = self.get_conf('foo=invalid boolean')
3551
self.assertEqual(True, conf.get('foo'))
3553
def register_integer_option(self, name,
                            default=None, default_from_env=None):
    # Register ``name`` as an option converted by config.int_from_store.
    option_kwargs = dict(help='An integer.', default=default,
                         default_from_env=default_from_env,
                         from_unicode=config.int_from_store)
    self.registry.register(config.Option(name, **option_kwargs))
3560
def test_get_default_integer_None(self):
3561
self.register_integer_option('foo')
3562
conf = self.get_conf('')
3563
self.assertEqual(None, conf.get('foo'))
3565
def test_get_default_integer(self):
3566
self.register_integer_option('foo', 42)
3567
conf = self.get_conf('')
3568
self.assertEqual(42, conf.get('foo'))
3570
def test_get_default_integer_as_string(self):
3571
self.register_integer_option('foo', u'42')
3572
conf = self.get_conf('')
3573
self.assertEqual(42, conf.get('foo'))
3575
def test_get_default_integer_from_env(self):
3576
self.register_integer_option('foo', default_from_env=['FOO'])
3577
self.overrideEnv('FOO', '18')
3578
conf = self.get_conf('')
3579
self.assertEqual(18, conf.get('foo'))
3581
def test_get_default_integer_when_conversion_fails(self):
3582
self.register_integer_option('foo', default='12')
3583
conf = self.get_conf('foo=invalid integer')
3584
self.assertEqual(12, conf.get('foo'))
3586
def register_list_option(self, name, default=None, default_from_env=None):
    # Register ``name`` as a config.ListOption with the given defaults.
    option_kwargs = dict(help='A list.', default=default,
                         default_from_env=default_from_env)
    self.registry.register(config.ListOption(name, **option_kwargs))
3591
def test_get_default_list_None(self):
3592
self.register_list_option('foo')
3593
conf = self.get_conf('')
3594
self.assertEqual(None, conf.get('foo'))
3596
def test_get_default_list_empty(self):
3597
self.register_list_option('foo', '')
3598
conf = self.get_conf('')
3599
self.assertEqual([], conf.get('foo'))
3601
def test_get_default_list_from_env(self):
3602
self.register_list_option('foo', default_from_env=['FOO'])
3603
self.overrideEnv('FOO', '')
3604
conf = self.get_conf('')
3605
self.assertEqual([], conf.get('foo'))
3607
def test_get_with_list_converter_no_item(self):
3608
self.register_list_option('foo', None)
3609
conf = self.get_conf('foo=,')
3610
self.assertEqual([], conf.get('foo'))
3612
def test_get_with_list_converter_many_items(self):
3613
self.register_list_option('foo', None)
3614
conf = self.get_conf('foo=m,o,r,e')
3615
self.assertEqual(['m', 'o', 'r', 'e'], conf.get('foo'))
3617
def test_get_with_list_converter_embedded_spaces_many_items(self):
3618
self.register_list_option('foo', None)
3619
conf = self.get_conf('foo=" bar", "baz "')
3620
self.assertEqual([' bar', 'baz '], conf.get('foo'))
3622
def test_get_with_list_converter_stripped_spaces_many_items(self):
3623
self.register_list_option('foo', None)
3624
conf = self.get_conf('foo= bar , baz ')
3625
self.assertEqual(['bar', 'baz'], conf.get('foo'))
3628
class TestIterOptionRefs(tests.TestCase):
3629
"""iter_option_refs is a bit unusual, document some cases."""
3631
def assertRefs(self, expected, string):
    # Compare ``expected`` with everything config.iter_option_refs yields
    # for ``string``.
    chunks = list(config.iter_option_refs(string))
    self.assertEqual(expected, chunks)
3634
def test_empty(self):
3635
self.assertRefs([(False, '')], '')
3637
def test_no_refs(self):
3638
self.assertRefs([(False, 'foo bar')], 'foo bar')
3640
def test_single_ref(self):
3641
self.assertRefs([(False, ''), (True, '{foo}'), (False, '')], '{foo}')
3643
def test_broken_ref(self):
3644
self.assertRefs([(False, '{foo')], '{foo')
3646
def test_embedded_ref(self):
3647
self.assertRefs([(False, '{'), (True, '{foo}'), (False, '}')],
3650
def test_two_refs(self):
3651
self.assertRefs([(False, ''), (True, '{foo}'),
3652
(False, ''), (True, '{bar}'),
3656
def test_newline_in_refs_are_not_matched(self):
3657
self.assertRefs([(False, '{\nxx}{xx\n}{{\n}}')], '{\nxx}{xx\n}{{\n}}')
3660
class TestStackExpandOptions(tests.TestCaseWithTransport):
3663
super(TestStackExpandOptions, self).setUp()
3664
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3665
self.registry = config.option_registry
3666
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
3667
self.conf = config.Stack([store.get_sections], store)
3669
def assertExpansion(self, expected, string, env=None):
    # Expand ``string`` through the stack (with the optional ``env``
    # mapping) and compare the result with ``expected``.
    expanded = self.conf.expand_options(string, env)
    self.assertEqual(expected, expanded)
3672
def test_no_expansion(self):
3673
self.assertExpansion('foo', 'foo')
3675
def test_expand_default_value(self):
    # The registered default '{bar}' is expanded using the stored 'bar'.
    self.conf.store._load_from_string('bar=baz')
    option = config.Option('foo', default=u'{bar}')
    self.registry.register(option)
    value = self.conf.get('foo', expand=True)
    self.assertEqual('baz', value)
3680
def test_expand_default_from_env(self):
    # A default taken from the environment is expanded as well.
    self.conf.store._load_from_string('bar=baz')
    option = config.Option('foo', default_from_env=['FOO'])
    self.registry.register(option)
    self.overrideEnv('FOO', '{bar}')
    value = self.conf.get('foo', expand=True)
    self.assertEqual('baz', value)
3686
def test_expand_default_on_failed_conversion(self):
    # The stored 'foo' refers to 'baz', whose value is 'bogus'; the
    # expected result is 42, the value of 'bar' that the registered
    # default '{bar}' refers to.
    self.conf.store._load_from_string('baz=bogus\nbar=42\nfoo={baz}')
    option = config.Option('foo', default=u'{bar}',
                           from_unicode=config.int_from_store)
    self.registry.register(option)
    value = self.conf.get('foo', expand=True)
    self.assertEqual(42, value)
3693
def test_env_adding_options(self):
3694
self.assertExpansion('bar', '{foo}', {'foo': 'bar'})
3696
def test_env_overriding_options(self):
3697
self.conf.store._load_from_string('foo=baz')
3698
self.assertExpansion('bar', '{foo}', {'foo': 'bar'})
3700
def test_simple_ref(self):
3701
self.conf.store._load_from_string('foo=xxx')
3702
self.assertExpansion('xxx', '{foo}')
3704
def test_unknown_ref(self):
    # Referring to an option defined nowhere raises ExpandingUnknownOption.
    expand = self.conf.expand_options
    self.assertRaises(config.ExpandingUnknownOption, expand, '{foo}')
3708
def test_illegal_def_is_ignored(self):
3709
self.assertExpansion('{1,2}', '{1,2}')
3710
self.assertExpansion('{ }', '{ }')
3711
self.assertExpansion('${Foo,f}', '${Foo,f}')
3713
def test_indirect_ref(self):
3714
self.conf.store._load_from_string('''
3718
self.assertExpansion('xxx', '{bar}')
3720
def test_embedded_ref(self):
3721
self.conf.store._load_from_string('''
3725
self.assertExpansion('xxx', '{{bar}}')
3727
def test_simple_loop(self):
    # An option referring to itself raises OptionExpansionLoop.
    self.conf.store._load_from_string('foo={foo}')
    expand = self.conf.expand_options
    self.assertRaises(config.OptionExpansionLoop, expand, '{foo}')
3732
def test_indirect_loop(self):
3733
self.conf.store._load_from_string('''
3737
e = self.assertRaises(config.OptionExpansionLoop,
3738
self.conf.expand_options, '{foo}')
3739
self.assertEqual('foo->bar->baz', e.refs)
3740
self.assertEqual('{foo}', e.string)
3742
def test_list(self):
3743
self.conf.store._load_from_string('''
3747
list={foo},{bar},{baz}
3749
self.registry.register(
3750
config.ListOption('list'))
3751
self.assertEqual(['start', 'middle', 'end'],
3752
self.conf.get('list', expand=True))
3754
def test_cascading_list(self):
3755
self.conf.store._load_from_string('''
3761
self.registry.register(config.ListOption('list'))
3762
# Register an intermediate option as a list to ensure no conversion
3763
# happens while expanding. Conversion should only occur for the original
3764
# option ('list' here).
3765
self.registry.register(config.ListOption('baz'))
3766
self.assertEqual(['start', 'middle', 'end'],
3767
self.conf.get('list', expand=True))
3769
def test_pathologically_hidden_list(self):
3770
self.conf.store._load_from_string('''
3776
hidden={start}{middle}{end}
3778
# What matters is what the registration says, the conversion happens
3779
# only after all expansions have been performed
3780
self.registry.register(config.ListOption('hidden'))
3781
self.assertEqual(['bin', 'go'],
3782
self.conf.get('hidden', expand=True))
3785
class TestStackCrossSectionsExpand(tests.TestCaseWithTransport):
3788
super(TestStackCrossSectionsExpand, self).setUp()
3790
def get_config(self, location, string):
3793
# Since we don't save the config we won't strictly require to inherit
3794
# from TestCaseInTempDir, but an error occurs so quickly...
3795
c = config.LocationStack(location)
3796
c.store._load_from_string(string)
3799
def test_dont_cross_unrelated_section(self):
3800
c = self.get_config('/another/branch/path','''
3805
[/another/branch/path]
3808
self.assertRaises(config.ExpandingUnknownOption,
3809
c.get, 'bar', expand=True)
3811
def test_cross_related_sections(self):
3812
c = self.get_config('/project/branch/path','''
3816
[/project/branch/path]
3819
self.assertEqual('quux', c.get('bar', expand=True))
3822
class TestStackCrossStoresExpand(tests.TestCaseWithTransport):
3824
def test_cross_global_locations(self):
3825
l_store = config.LocationStore()
3826
l_store._load_from_string('''
3832
g_store = config.GlobalStore()
3833
g_store._load_from_string('''
3839
stack = config.LocationStack('/branch')
3840
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3841
self.assertEqual('loc-foo', stack.get('gfoo', expand=True))
3844
class TestStackExpandSectionLocals(tests.TestCaseWithTransport):
3846
def test_expand_locals_empty(self):
3847
l_store = config.LocationStore()
3848
l_store._load_from_string('''
3849
[/home/user/project]
3854
stack = config.LocationStack('/home/user/project/')
3855
self.assertEqual('', stack.get('base', expand=True))
3856
self.assertEqual('', stack.get('rel', expand=True))
3858
def test_expand_basename_locally(self):
3859
l_store = config.LocationStore()
3860
l_store._load_from_string('''
3861
[/home/user/project]
3865
stack = config.LocationStack('/home/user/project/branch')
3866
self.assertEqual('branch', stack.get('bfoo', expand=True))
3868
def test_expand_basename_locally_longer_path(self):
3869
l_store = config.LocationStore()
3870
l_store._load_from_string('''
3875
stack = config.LocationStack('/home/user/project/dir/branch')
3876
self.assertEqual('branch', stack.get('bfoo', expand=True))
3878
def test_expand_relpath_locally(self):
3879
l_store = config.LocationStore()
3880
l_store._load_from_string('''
3881
[/home/user/project]
3882
lfoo = loc-foo/{relpath}
3885
stack = config.LocationStack('/home/user/project/branch')
3886
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3888
def test_expand_relpath_unknonw_in_global(self):
3889
g_store = config.GlobalStore()
3890
g_store._load_from_string('''
3895
stack = config.LocationStack('/home/user/project/branch')
3896
self.assertRaises(config.ExpandingUnknownOption,
3897
stack.get, 'gfoo', expand=True)
3899
def test_expand_local_option_locally(self):
3900
l_store = config.LocationStore()
3901
l_store._load_from_string('''
3902
[/home/user/project]
3903
lfoo = loc-foo/{relpath}
3907
g_store = config.GlobalStore()
3908
g_store._load_from_string('''
3914
stack = config.LocationStack('/home/user/project/branch')
3915
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3916
self.assertEqual('loc-foo/branch', stack.get('gfoo', expand=True))
3918
def test_locals_dont_leak(self):
3919
"""Make sure we choose the right local in the presence of several sections.
3921
l_store = config.LocationStore()
3922
l_store._load_from_string('''
3924
lfoo = loc-foo/{relpath}
3925
[/home/user/project]
3926
lfoo = loc-foo/{relpath}
3929
stack = config.LocationStack('/home/user/project/branch')
3930
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3931
stack = config.LocationStack('/home/user/bar/baz')
3932
self.assertEqual('loc-foo/bar/baz', stack.get('lfoo', expand=True))
3936
class TestStackSet(TestStackWithTransport):
    """Check that options written through a stack can be read back."""

    def test_simple_set(self):
        conf = self.get_stack(self)
        self.assertEqual(None, conf.get('foo'))
        conf.set('foo', 'baz')
        # Did we get it back ?
        self.assertEqual('baz', conf.get('foo'))

    def test_set_creates_a_new_section(self):
        conf = self.get_stack(self)
        conf.set('foo', 'baz')
        # This has to be a real call: a bare ``self.assertEqual, ...`` is a
        # tuple expression and silently checks nothing.
        self.assertEqual('baz', conf.get('foo'))

    def test_set_hook(self):
        # Record every invocation of the hook so the arguments it received
        # can be compared below.
        calls = []

        def hook(*args):
            calls.append(args)
        config.ConfigHooks.install_named_hook('set', hook, None)
        self.assertLength(0, calls)
        conf = self.get_stack(self)
        conf.set('foo', 'bar')
        self.assertLength(1, calls)
        self.assertEqual((conf, 'foo', 'bar'), calls[0])
3962
class TestStackRemove(TestStackWithTransport):
    """Check that options can be removed through a stack."""

    def test_remove_existing(self):
        conf = self.get_stack(self)
        conf.set('foo', 'bar')
        self.assertEqual('bar', conf.get('foo'))
        # Without this call the final assertion contradicts the previous one.
        conf.remove('foo')
        # Did we get it back ?
        self.assertEqual(None, conf.get('foo'))

    def test_remove_unknown(self):
        conf = self.get_stack(self)
        self.assertRaises(KeyError, conf.remove, 'I_do_not_exist')

    def test_remove_hook(self):
        # Record every invocation of the hook so the arguments it received
        # can be compared below.
        calls = []

        def hook(*args):
            calls.append(args)
        config.ConfigHooks.install_named_hook('remove', hook, None)
        self.assertLength(0, calls)
        conf = self.get_stack(self)
        # Setting the option must not trigger the 'remove' hook; only the
        # removal itself accounts for the single recorded call.
        conf.set('foo', 'bar')
        conf.remove('foo')
        self.assertLength(1, calls)
        self.assertEqual((conf, 'foo'), calls[0])
3989
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
3992
super(TestConfigGetOptions, self).setUp()
3993
create_configs(self)
3995
def test_no_variable(self):
    # Using branch should query branch, locations and breezy; with nothing
    # set anywhere, no option is expected.
    nothing_expected = []
    self.assertOptions(nothing_expected, self.branch_config)
3999
def test_option_in_breezy(self):
4000
self.breezy_config.set_user_option('file', 'breezy')
4001
self.assertOptions([('file', 'breezy', 'DEFAULT', 'breezy')],
4004
def test_option_in_locations(self):
4005
self.locations_config.set_user_option('file', 'locations')
4007
[('file', 'locations', self.tree.basedir, 'locations')],
4008
self.locations_config)
4010
def test_option_in_branch(self):
4011
self.branch_config.set_user_option('file', 'branch')
4012
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch')],
4015
def test_option_in_breezy_and_branch(self):
4016
self.breezy_config.set_user_option('file', 'breezy')
4017
self.branch_config.set_user_option('file', 'branch')
4018
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch'),
4019
('file', 'breezy', 'DEFAULT', 'breezy'),],
4022
def test_option_in_branch_and_locations(self):
4023
# Hmm, locations override branch :-/
4024
self.locations_config.set_user_option('file', 'locations')
4025
self.branch_config.set_user_option('file', 'branch')
4027
[('file', 'locations', self.tree.basedir, 'locations'),
4028
('file', 'branch', 'DEFAULT', 'branch'),],
4031
def test_option_in_breezy_locations_and_branch(self):
4032
self.breezy_config.set_user_option('file', 'breezy')
4033
self.locations_config.set_user_option('file', 'locations')
4034
self.branch_config.set_user_option('file', 'branch')
4036
[('file', 'locations', self.tree.basedir, 'locations'),
4037
('file', 'branch', 'DEFAULT', 'branch'),
4038
('file', 'breezy', 'DEFAULT', 'breezy'),],
4042
class TestConfigRemoveOption(tests.TestCaseWithTransport, TestOptionsMixin):
4045
super(TestConfigRemoveOption, self).setUp()
4046
create_configs_with_file_option(self)
4048
def test_remove_in_locations(self):
4049
self.locations_config.remove_user_option('file', self.tree.basedir)
4051
[('file', 'branch', 'DEFAULT', 'branch'),
4052
('file', 'breezy', 'DEFAULT', 'breezy'),],
4055
def test_remove_in_branch(self):
4056
self.branch_config.remove_user_option('file')
4058
[('file', 'locations', self.tree.basedir, 'locations'),
4059
('file', 'breezy', 'DEFAULT', 'breezy'),],
4062
def test_remove_in_breezy(self):
4063
self.breezy_config.remove_user_option('file')
4065
[('file', 'locations', self.tree.basedir, 'locations'),
4066
('file', 'branch', 'DEFAULT', 'branch'),],
4070
class TestConfigGetSections(tests.TestCaseWithTransport):
4073
super(TestConfigGetSections, self).setUp()
4074
create_configs(self)
4076
def assertSectionNames(self, expected, conf, name=None):
4077
"""Check which sections are returned for a given config.
4079
If fallback configurations exist their sections can be included.
4081
:param expected: A list of section names.
4083
:param conf: The configuration that will be queried.
4085
:param name: An optional section name that will be passed to
4088
sections = list(conf._get_sections(name))
4089
self.assertLength(len(expected), sections)
4090
self.assertEqual(expected, [n for n, _, _ in sections])
4092
def test_breezy_default_section(self):
    # The breezy config is expected to report a single 'DEFAULT' section.
    expected_names = ['DEFAULT']
    self.assertSectionNames(expected_names, self.breezy_config)
4095
def test_locations_default_section(self):
    # No sections are defined in an empty file
    expected_names = []
    self.assertSectionNames(expected_names, self.locations_config)
4099
def test_locations_named_section(self):
    # Once an option is set, the only section reported is the one named
    # after the tree's base directory.
    loc_config = self.locations_config
    loc_config.set_user_option('file', 'locations')
    self.assertSectionNames([self.tree.basedir], loc_config)
4103
def test_locations_matching_sections(self):
4104
loc_config = self.locations_config
4105
loc_config.set_user_option('file', 'locations')
4106
# We need to cheat a bit here to create an option in sections above and
4107
# below the 'location' one.
4108
parser = loc_config._get_parser()
4109
# locations.conf deals with '/' ignoring native os.sep
4110
location_names = self.tree.basedir.split('/')
4111
parent = '/'.join(location_names[:-1])
4112
child = '/'.join(location_names + ['child'])
4114
parser[parent]['file'] = 'parent'
4116
parser[child]['file'] = 'child'
4117
self.assertSectionNames([self.tree.basedir, parent], loc_config)
4119
def test_branch_data_default_section(self):
    # The branch data config is expected to expose one unnamed section.
    data_config = self.branch_config._get_branch_data_config()
    self.assertSectionNames([None], data_config)
4123
def test_branch_default_sections(self):
4124
# No sections are defined in an empty locations file
4125
self.assertSectionNames([None, 'DEFAULT'],
4127
# Unless we define an option
4128
self.branch_config._get_location_config().set_user_option(
4129
'file', 'locations')
4130
self.assertSectionNames([self.tree.basedir, None, 'DEFAULT'],
4133
def test_breezy_named_section(self):
    # We need to cheat as the API doesn't give direct access to sections
    # other than DEFAULT: defining an alias is used to get an 'ALIASES'
    # section to query by name.
    section = 'ALIASES'
    self.breezy_config.set_alias('breezy', 'bzr')
    self.assertSectionNames([section], self.breezy_config, section)
4140
class TestSharedStores(tests.TestCaseInTempDir):
    """Check that separately created global stacks use one store object."""

    def test_breezy_conf_shared(self):
        first, second = config.GlobalStack(), config.GlobalStack()
        # The two stacks share the same store (identity, not mere equality)
        self.assertIs(first.store, second.store)
4149
class TestAuthenticationConfigFilePermissions(tests.TestCaseInTempDir):
4150
"""Test warning for permissions of authentication.conf."""
4153
super(TestAuthenticationConfigFilePermissions, self).setUp()
4154
self.path = osutils.pathjoin(self.test_dir, 'authentication.conf')
4155
with open(self.path, 'w') as f:
4156
f.write(b"""[broken]
4159
port=port # Error: Not an int
4161
self.overrideAttr(config, 'authentication_config_filename',
4163
osutils.chmod_if_possible(self.path, 0o755)
4165
def test_check_warning(self):
    # Creating the config must pick up the file prepared for this test and
    # leave the permissions warning in the log.
    auth_conf = config.AuthenticationConfig()
    self.assertEqual(auth_conf._filename, self.path)
    logged = self.get_log()
    self.assertContainsRe(
        logged, 'Saved passwords may be accessible by other users.')
4171
def test_check_suppressed_warning(self):
    # Ask for the 'insecure_permissions' warning to be suppressed before
    # the authentication config is created.
    config.GlobalConfig().set_user_option(
        'suppress_warnings', 'insecure_permissions')
    auth_conf = config.AuthenticationConfig()
    self.assertEqual(auth_conf._filename, self.path)
    logged = self.get_log()
    self.assertNotContainsRe(
        logged, 'Saved passwords may be accessible by other users.')
4181
class TestAuthenticationConfigFile(tests.TestCase):
4182
"""Test the authentication.conf file matching"""
4184
def _got_user_passwd(self, expected_user, expected_password,
4185
config, *args, **kwargs):
4186
credentials = config.get_credentials(*args, **kwargs)
4187
if credentials is None:
4191
user = credentials['user']
4192
password = credentials['password']
4193
self.assertEqual(expected_user, user)
4194
self.assertEqual(expected_password, password)
4196
def test_empty_config(self):
    # An empty file gives an empty config and therefore no credentials.
    empty_file = BytesIO()
    conf = config.AuthenticationConfig(_file=empty_file)
    self.assertEqual({}, conf._get_config())
    self._got_user_passwd(None, None, conf, 'http', 'foo.net')
4201
def test_non_utf8_config(self):
    # The trailing 0xff byte makes the content undecodable; loading it is
    # expected to raise ConfigContentError.
    bad_content = BytesIO(b'foo = bar\xff')
    conf = config.AuthenticationConfig(_file=bad_content)
    self.assertRaises(config.ConfigContentError, conf._get_config)
4205
def test_missing_auth_section_header(self):
    # An option outside of any section makes the credential lookup fail
    # with ValueError.
    headerless = BytesIO(b'foo = bar')
    conf = config.AuthenticationConfig(_file=headerless)
    self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4209
def test_auth_section_header_not_closed(self):
    # A section header lacking its closing bracket is a parse error.
    unterminated = BytesIO(b'[DEF')
    conf = config.AuthenticationConfig(_file=unterminated)
    self.assertRaises(config.ParseConfigError, conf._get_config)
4213
def test_auth_value_not_boolean(self):
4214
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4218
verify_certificates=askme # Error: Not a boolean
4220
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4222
def test_auth_value_not_int(self):
4223
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4227
port=port # Error: Not an int
4229
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4231
def test_unknown_password_encoding(self):
4232
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4236
password_encoding=unknown
4238
self.assertRaises(ValueError, conf.get_password,
4239
'ftp', 'foo.net', 'joe')
4241
def test_credentials_for_scheme_host(self):
4242
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4243
# Identity on foo.net
4248
password=secret-pass
4251
self._got_user_passwd('joe', 'secret-pass', conf, 'ftp', 'foo.net')
4253
self._got_user_passwd(None, None, conf, 'http', 'foo.net')
4255
self._got_user_passwd(None, None, conf, 'ftp', 'bar.net')
4257
def test_credentials_for_host_port(self):
4258
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4259
# Identity on foo.net
4265
password=secret-pass
4268
self._got_user_passwd('joe', 'secret-pass',
4269
conf, 'ftp', 'foo.net', port=10021)
4271
self._got_user_passwd(None, None, conf, 'ftp', 'foo.net')
4273
def test_for_matching_host(self):
4274
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4275
# Identity on foo.net
4281
[sourceforge domain]
4288
self._got_user_passwd('georges', 'bendover',
4289
conf, 'bzr', 'foo.bzr.sf.net')
4291
self._got_user_passwd(None, None,
4292
conf, 'bzr', 'bbzr.sf.net')
4294
def test_for_matching_host_None(self):
4295
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4296
# Identity on foo.net
4306
self._got_user_passwd('joe', 'joepass',
4307
conf, 'bzr', 'quux.net')
4308
# no host but different scheme
4309
self._got_user_passwd('georges', 'bendover',
4310
conf, 'ftp', 'quux.net')
4312
def test_credentials_for_path(self):
4313
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4328
self._got_user_passwd(None, None,
4329
conf, 'http', host='bar.org', path='/dir3')
4331
self._got_user_passwd('georges', 'bendover',
4332
conf, 'http', host='bar.org', path='/dir2')
4334
self._got_user_passwd('jim', 'jimpass',
4335
conf, 'http', host='bar.org',path='/dir1/subdir')
4337
def test_credentials_for_user(self):
4338
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4346
self._got_user_passwd('jim', 'jimpass',
4347
conf, 'http', 'bar.org')
4349
self._got_user_passwd('jim', 'jimpass',
4350
conf, 'http', 'bar.org', user='jim')
4351
# Don't get a different user if one is specified
4352
self._got_user_passwd(None, None,
4353
conf, 'http', 'bar.org', user='georges')
4355
def test_credentials_for_user_without_password(self):
4356
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4362
# Get user but no password
4363
self._got_user_passwd('jim', None,
4364
conf, 'http', 'bar.org')
4366
def test_verify_certificates(self):
4367
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4373
verify_certificates=False
4380
credentials = conf.get_credentials('https', 'bar.org')
4381
self.assertEqual(False, credentials.get('verify_certificates'))
4382
credentials = conf.get_credentials('https', 'foo.net')
4383
self.assertEqual(True, credentials.get('verify_certificates'))
4386
class TestAuthenticationStorage(tests.TestCaseInTempDir):
    """Check that credentials saved with set_credentials() are found again."""

    def test_set_credentials(self):
        conf = config.AuthenticationConfig()
        conf.set_credentials('name', 'host', 'user', 'scheme', 'password',
                             99, path='/foo', verify_certificates=False,
                             realm='realm')
        credentials = conf.get_credentials(host='host', scheme='scheme',
                                           port=99, path='/foo',
                                           realm='realm')
        CREDENTIALS = {'name': 'name', 'user': 'user', 'password': 'password',
                       'verify_certificates': False, 'scheme': 'scheme',
                       'host': 'host', 'port': 99, 'path': '/foo',
                       'realm': 'realm'}
        self.assertEqual(CREDENTIALS, credentials)
        # A fresh config object has to find the same credentials, i.e. they
        # were really written out and not only kept in memory.
        credentials_from_disk = config.AuthenticationConfig().get_credentials(
            host='host', scheme='scheme', port=99, path='/foo', realm='realm')
        self.assertEqual(CREDENTIALS, credentials_from_disk)

    def test_reset_credentials_different_name(self):
        conf = config.AuthenticationConfig()
        # Plain statements: a trailing comma here would only wrap each call's
        # result in a throwaway tuple.
        conf.set_credentials('name', 'host', 'user', 'scheme', 'password')
        conf.set_credentials('name2', 'host', 'user2', 'scheme', 'password')
        # The second definition for the same host/scheme replaces the first
        # one, even though it uses a different section name.
        self.assertIs(None, conf._get_config().get('name'))
        credentials = conf.get_credentials(host='host', scheme='scheme')
        CREDENTIALS = {'name': 'name2', 'user': 'user2', 'password':
                       'password', 'verify_certificates': True,
                       'scheme': 'scheme', 'host': 'host', 'port': None,
                       'path': None, 'realm': None}
        self.assertEqual(CREDENTIALS, credentials)
4417
class TestAuthenticationConfig(tests.TestCaseInTempDir):
4418
"""Test AuthenticationConfig behaviour"""
4420
def _check_default_password_prompt(self, expected_prompt_format, scheme,
4421
host=None, port=None, realm=None,
4425
user, password = 'jim', 'precious'
4426
expected_prompt = expected_prompt_format % {
4427
'scheme': scheme, 'host': host, 'port': port,
4428
'user': user, 'realm': realm}
4430
ui.ui_factory = tests.TestUIFactory(stdin=password + '\n')
4431
# We use an empty conf so that the user is always prompted
4432
conf = config.AuthenticationConfig()
4433
self.assertEqual(password,
4434
conf.get_password(scheme, host, user, port=port,
4435
realm=realm, path=path))
4436
self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
4437
self.assertEqual('', ui.ui_factory.stdout.getvalue())
4439
def _check_default_username_prompt(self, expected_prompt_format, scheme,
4440
host=None, port=None, realm=None,
4445
expected_prompt = expected_prompt_format % {
4446
'scheme': scheme, 'host': host, 'port': port,
4448
ui.ui_factory = tests.TestUIFactory(stdin=username+ '\n')
4449
# We use an empty conf so that the user is always prompted
4450
conf = config.AuthenticationConfig()
4451
self.assertEqual(username, conf.get_user(scheme, host, port=port,
4452
realm=realm, path=path, ask=True))
4453
self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
4454
self.assertEqual('', ui.ui_factory.stdout.getvalue())
4456
def test_username_defaults_prompts(self):
    # HTTP prompts can't be tested here, see test_http.py
    # Each entry: (expected prompt format, scheme, extra keyword arguments).
    cases = [
        (u'FTP %(host)s username: ', 'ftp', {}),
        (u'FTP %(host)s:%(port)d username: ', 'ftp', {'port': 10020}),
        (u'SSH %(host)s:%(port)d username: ', 'ssh', {'port': 12345}),
    ]
    for prompt_format, scheme, extra in cases:
        self._check_default_username_prompt(prompt_format, scheme, **extra)
4464
def test_username_default_no_prompt(self):
    # Without 'ask', an unknown user is not prompted for: the caller gets
    # None, or the default it supplied.
    conf = config.AuthenticationConfig()
    without_default = conf.get_user('ftp', 'example.com')
    self.assertEqual(None, without_default)
    with_default = conf.get_user('ftp', 'example.com',
                                 default="explicitdefault")
    self.assertEqual("explicitdefault", with_default)
4471
def test_password_default_prompts(self):
    # HTTP prompts can't be tested here, see test_http.py
    # Each entry: (expected prompt format, scheme, extra keyword arguments).
    cases = [
        (u'FTP %(user)s@%(host)s password: ', 'ftp', {}),
        (u'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp',
         {'port': 10020}),
        (u'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh',
         {'port': 12345}),
        # SMTP port handling is a bit special: one of the cases below
        # passes the port embedded in the host string.
        # FIXME: should we: forbid that, extend it to other schemes, leave
        # things as they are that's fine thank you ?
        (u'SMTP %(user)s@%(host)s password: ', 'smtp', {}),
        (u'SMTP %(user)s@%(host)s password: ', 'smtp',
         {'host': 'bar.org:10025'}),
        (u'SMTP %(user)s@%(host)s:%(port)d password: ', 'smtp',
         {'port': 10025}),
    ]
    for prompt_format, scheme, extra in cases:
        self._check_default_password_prompt(prompt_format, scheme, **extra)
4490
def test_ssh_password_emits_warning(self):
4491
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4498
entered_password = 'typed-by-hand'
4499
ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n')
4501
# Since the password defined in the authentication config is ignored,
4502
# the user is prompted
4503
self.assertEqual(entered_password,
4504
conf.get_password('ssh', 'bar.org', user='jim'))
4505
self.assertContainsRe(
4507
'password ignored in section \\[ssh with password\\]')
4509
def test_ssh_without_password_doesnt_emit_warning(self):
4510
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4516
entered_password = 'typed-by-hand'
4517
ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n')
4519
# Since the password defined in the authentication config is ignored,
4520
# the user is prompted
4521
self.assertEqual(entered_password,
4522
conf.get_password('ssh', 'bar.org', user='jim'))
4523
# No warning should be emitted since there is no password. We are only
4525
self.assertNotContainsRe(
4527
'password ignored in section \\[ssh with password\\]')
4529
def test_uses_fallback_stores(self):
    # Swap in a fresh registry for the duration of the test so that only
    # the stub store registered below is known.
    self.overrideAttr(config, 'credential_store_registry',
                      config.CredentialStoreRegistry())
    stub = StubCredentialStore()
    stub.add_credentials("http", "example.com", "joe", "secret")
    config.credential_store_registry.register("stub", stub, fallback=True)
    # The authentication file itself is empty, so whatever comes back was
    # provided by the fallback store.
    empty_conf = config.AuthenticationConfig(_file=BytesIO())
    found = empty_conf.get_credentials("http", "example.com")
    self.assertEqual("joe", found["user"])
    self.assertEqual("secret", found["password"])
4541
class StubCredentialStore(config.CredentialStore):
4547
def add_credentials(self, scheme, host, user, password=None):
4548
self._username[(scheme, host)] = user
4549
self._password[(scheme, host)] = password
4551
def get_credentials(self, scheme, host, port=None, user=None,
4552
path=None, realm=None):
4553
key = (scheme, host)
4554
if not key in self._username:
4556
return { "scheme": scheme, "host": host, "port": port,
4557
"user": self._username[key], "password": self._password[key]}
4560
class CountingCredentialStore(config.CredentialStore):
4565
def get_credentials(self, scheme, host, port=None, user=None,
4566
path=None, realm=None):
4571
class TestCredentialStoreRegistry(tests.TestCase):
    """Check registration and fallback lookup in CredentialStoreRegistry."""

    def _get_cs_registry(self):
        """Return the module-wide credential store registry."""
        return config.credential_store_registry

    def test_default_credential_store(self):
        r = self._get_cs_registry()
        default = r.get_credential_store(None)
        self.assertIsInstance(default, config.PlainTextCredentialStore)

    def test_unknown_credential_store(self):
        r = self._get_cs_registry()
        # It's hard to imagine someone creating a credential store named
        # 'unknown' so we use that as a never registered key.
        self.assertRaises(KeyError, r.get_credential_store, 'unknown')

    def test_fallback_none_registered(self):
        r = config.CredentialStoreRegistry()
        self.assertEqual(None,
                         r.get_fallback_credentials("http", "example.com"))

    def test_register(self):
        r = config.CredentialStoreRegistry()
        r.register("stub", StubCredentialStore(), fallback=False)
        r.register("another", StubCredentialStore(), fallback=True)
        self.assertEqual(["another", "stub"], r.keys())

    def test_register_lazy(self):
        r = config.CredentialStoreRegistry()
        r.register_lazy("stub", "breezy.tests.test_config",
                        "StubCredentialStore", fallback=False)
        self.assertEqual(["stub"], r.keys())
        self.assertIsInstance(r.get_credential_store("stub"),
                              StubCredentialStore)

    def test_is_fallback(self):
        r = config.CredentialStoreRegistry()
        r.register("stub1", None, fallback=False)
        r.register("stub2", None, fallback=True)
        self.assertEqual(False, r.is_fallback("stub1"))
        self.assertEqual(True, r.is_fallback("stub2"))

    def test_no_fallback(self):
        r = config.CredentialStoreRegistry()
        store = CountingCredentialStore()
        r.register("count", store, fallback=False)
        self.assertEqual(None,
                         r.get_fallback_credentials("http", "example.com"))
        # A store not registered as fallback must not even be queried.
        self.assertEqual(0, store._calls)

    def test_fallback_credentials(self):
        r = config.CredentialStoreRegistry()
        store = StubCredentialStore()
        store.add_credentials("http", "example.com",
                              "somebody", "geheim")
        r.register("stub", store, fallback=True)
        creds = r.get_fallback_credentials("http", "example.com")
        self.assertEqual("somebody", creds["user"])
        self.assertEqual("geheim", creds["password"])

    def test_fallback_first_wins(self):
        r = config.CredentialStoreRegistry()
        stub1 = StubCredentialStore()
        stub1.add_credentials("http", "example.com",
                              "somebody", "stub1")
        r.register("stub1", stub1, fallback=True)
        stub2 = StubCredentialStore()
        stub2.add_credentials("http", "example.com",
                              "somebody", "stub2")
        # Register the second store itself; registering stub1 twice would
        # leave nothing for the first store to win against.
        r.register("stub2", stub2, fallback=True)
        creds = r.get_fallback_credentials("http", "example.com")
        self.assertEqual("somebody", creds["user"])
        self.assertEqual("stub1", creds["password"])
4646
class TestPlainTextCredentialStore(tests.TestCase):
    """Check the credential store returned when no name is given."""

    def test_decode_password(self):
        registry = config.credential_store_registry
        store = registry.get_credential_store()
        # The stored password is expected to come back unchanged.
        credentials = dict(password='secret')
        self.assertEqual('secret', store.decode_password(credentials))
4655
class TestBase64CredentialStore(tests.TestCase):
    """Check the credential store registered under the name 'base64'."""

    def test_decode_password(self):
        registry = config.credential_store_registry
        store = registry.get_credential_store('base64')
        # 'c2VjcmV0' is the base64 encoding of 'secret'.
        credentials = dict(password='c2VjcmV0')
        self.assertEqual('secret', store.decode_password(credentials))
4664
# FIXME: Once we have a way to declare authentication to all test servers, we
4665
# can implement generic tests.
4666
# test_user_password_in_url
4667
# test_user_in_url_password_from_config
4668
# test_user_in_url_password_prompted
4669
# test_user_in_config
4670
# test_user_getpass.getuser
4671
# test_user_prompted ?
4672
class TestAuthenticationRing(tests.TestCaseWithTransport):
    # Intentionally empty: a class statement needs a body to be valid
    # Python, and no tests are defined here yet.
    pass
4676
class TestAutoUserId(tests.TestCase):
    """Test inferring an automatic user name."""

    def test_auto_user_id(self):
        """Automatic inference of user name.

        This is a bit hard to test in an isolated way, because it depends on
        system functions that go direct to /etc or perhaps somewhere else.
        But it's reasonable to say that on Unix, with an /etc/mailname, we ought
        to be able to choose a user name with no configuration.
        """
        if sys.platform == 'win32':
            raise tests.TestSkipped(
                "User name inference not implemented on win32")
        realname, address = config._auto_user_id()
        if os.path.exists('/etc/mailname'):
            self.assertIsNot(None, realname)
            self.assertIsNot(None, address)
        else:
            # The two outcomes are mutually exclusive: without a mailname
            # file nothing is expected to be inferred.
            self.assertEqual((None, None), (realname, address))
4698
class TestDefaultMailDomain(tests.TestCaseInTempDir):
    """Test retrieving default domain from mailname file"""

    # The fixture files are written with ``with open(...)``: the ``file``
    # builtin only exists on Python 2, and the context manager guarantees the
    # file is closed before it is read back.

    def test_default_mail_domain_simple(self):
        with open('simple', 'w') as f:
            f.write("domainname.com\n")
        r = config._get_default_mail_domain('simple')
        self.assertEqual('domainname.com', r)

    def test_default_mail_domain_no_eol(self):
        with open('no_eol', 'w') as f:
            f.write("domainname.com")
        r = config._get_default_mail_domain('no_eol')
        self.assertEqual('domainname.com', r)

    def test_default_mail_domain_multiple_lines(self):
        with open('multiple_lines', 'w') as f:
            f.write("domainname.com\nsome other text\n")
        # Only the first line of the file is expected to be used.
        r = config._get_default_mail_domain('multiple_lines')
        self.assertEqual('domainname.com', r)
4729
class EmailOptionTests(tests.TestCase):
    """Check how the 'email' option combines with BRZ_EMAIL and EMAIL."""

    def test_default_email_uses_BRZ_EMAIL(self):
        stack = config.MemoryStack('email=jelmer@debian.org')
        # BRZ_EMAIL takes precedence over EMAIL
        for variable, address in (('BRZ_EMAIL', 'jelmer@samba.org'),
                                  ('EMAIL', 'jelmer@apache.org')):
            self.overrideEnv(variable, address)
        self.assertEqual('jelmer@samba.org', stack.get('email'))

    def test_default_email_uses_EMAIL(self):
        stack = config.MemoryStack('')
        # With BRZ_EMAIL cleared and nothing configured, EMAIL is used.
        for variable, address in (('BRZ_EMAIL', None),
                                  ('EMAIL', 'jelmer@apache.org')):
            self.overrideEnv(variable, address)
        self.assertEqual('jelmer@apache.org', stack.get('email'))

    def test_BRZ_EMAIL_overrides(self):
        stack = config.MemoryStack('email=jelmer@debian.org')
        # While BRZ_EMAIL is set it wins over the configured value ...
        self.overrideEnv('BRZ_EMAIL', 'jelmer@apache.org')
        self.assertEqual('jelmer@apache.org', stack.get('email'))
        # ... but once it is cleared the configured value wins over EMAIL.
        for variable, address in (('BRZ_EMAIL', None),
                                  ('EMAIL', 'jelmer@samba.org')):
            self.overrideEnv(variable, address)
        self.assertEqual('jelmer@debian.org', stack.get('email'))
4753
class MailClientOptionTests(tests.TestCase):
4755
def test_default(self):
    # Nothing configured: the option resolves to DefaultMail.
    stack = config.MemoryStack('')
    self.assertIs(stack.get('mail_client'), mail_client.DefaultMail)
4760
def test_evolution(self):
    # 'evolution' resolves to the Evolution client class.
    stack = config.MemoryStack('mail_client=evolution')
    self.assertIs(stack.get('mail_client'), mail_client.Evolution)
4765
def test_kmail(self):
    # 'kmail' resolves to the KMail client class.
    stack = config.MemoryStack('mail_client=kmail')
    self.assertIs(stack.get('mail_client'), mail_client.KMail)
4770
def test_mutt(self):
    # 'mutt' resolves to the Mutt client class.
    stack = config.MemoryStack('mail_client=mutt')
    self.assertIs(stack.get('mail_client'), mail_client.Mutt)
4775
def test_thunderbird(self):
    # 'thunderbird' resolves to the Thunderbird client class.
    stack = config.MemoryStack('mail_client=thunderbird')
    self.assertIs(stack.get('mail_client'), mail_client.Thunderbird)
4780
def test_explicit_default(self):
    # Spelling out 'default' gives the same class as configuring nothing.
    stack = config.MemoryStack('mail_client=default')
    self.assertIs(stack.get('mail_client'), mail_client.DefaultMail)
4785
def test_editor(self):
    # 'editor' resolves to the Editor client class.
    stack = config.MemoryStack('mail_client=editor')
    self.assertIs(stack.get('mail_client'), mail_client.Editor)
4790
def test_mapi(self):
    # 'mapi' resolves to the MAPIClient class.
    stack = config.MemoryStack('mail_client=mapi')
    self.assertIs(stack.get('mail_client'), mail_client.MAPIClient)
4795
def test_xdg_email(self):
    # 'xdg-email' resolves to the XDGEmail client class.
    stack = config.MemoryStack('mail_client=xdg-email')
    self.assertIs(stack.get('mail_client'), mail_client.XDGEmail)
4800
def test_unknown(self):
4801
conf = config.MemoryStack('mail_client=firebird')
4802
self.assertRaises(config.ConfigOptionValueError, conf.get,