1312
1715
self.assertIs(None, bzrdir_config.get_default_stack_on())
1718
class TestOldConfigHooks(tests.TestCaseWithTransport):
1721
super(TestOldConfigHooks, self).setUp()
1722
create_configs_with_file_option(self)
1724
def assertGetHook(self, conf, name, value):
1729
config.OldConfigHooks.install_named_hook('get', hook, None)
1731
config.OldConfigHooks.uninstall_named_hook, 'get', None)
1732
self.assertLength(0, calls)
1733
actual_value = conf.get_user_option(name)
1734
self.assertEqual(value, actual_value)
1735
self.assertLength(1, calls)
1736
self.assertEqual((conf, name, value), calls[0])
1738
def test_get_hook_breezy(self):
    """Run the 'get' hook check on ``self.breezy_config``."""
    conf = self.breezy_config
    self.assertGetHook(conf, 'file', 'breezy')
1741
def test_get_hook_locations(self):
    """Run the 'get' hook check on ``self.locations_config``."""
    conf = self.locations_config
    self.assertGetHook(conf, 'file', 'locations')
1744
def test_get_hook_branch(self):
    """Run the 'get' hook check on ``self.branch_config``."""
    conf = self.branch_config
    # Locations masks branch, so a dedicated option is defined for this test
    name, value = 'file2', 'branch'
    conf.set_user_option(name, value)
    self.assertGetHook(conf, name, value)
1749
def assertSetHook(self, conf, name, value):
1754
config.OldConfigHooks.install_named_hook('set', hook, None)
1756
config.OldConfigHooks.uninstall_named_hook, 'set', None)
1757
self.assertLength(0, calls)
1758
conf.set_user_option(name, value)
1759
self.assertLength(1, calls)
1760
# We can't assert the conf object below as different configs use
1761
# different means to implement set_user_option and we care only about
1763
self.assertEqual((name, value), calls[0][1:])
1765
def test_set_hook_breezy(self):
    """Run the 'set' hook check on ``self.breezy_config``."""
    conf = self.breezy_config
    self.assertSetHook(conf, 'foo', 'breezy')
1768
def test_set_hook_locations(self):
    """Run the 'set' hook check on ``self.locations_config``."""
    conf = self.locations_config
    self.assertSetHook(conf, 'foo', 'locations')
1771
def test_set_hook_branch(self):
    """Run the 'set' hook check on ``self.branch_config``."""
    conf = self.branch_config
    self.assertSetHook(conf, 'foo', 'branch')
1774
def assertRemoveHook(self, conf, name, section_name=None):
1779
config.OldConfigHooks.install_named_hook('remove', hook, None)
1781
config.OldConfigHooks.uninstall_named_hook, 'remove', None)
1782
self.assertLength(0, calls)
1783
conf.remove_user_option(name, section_name)
1784
self.assertLength(1, calls)
1785
# We can't assert the conf object below as different configs use
1786
# different means to implement remove_user_option and we care only about
1788
self.assertEqual((name,), calls[0][1:])
1790
def test_remove_hook_breezy(self):
    """Run the 'remove' hook check on ``self.breezy_config``."""
    conf = self.breezy_config
    self.assertRemoveHook(conf, 'file')
1793
def test_remove_hook_locations(self):
    """Run the 'remove' hook check on ``self.locations_config``.

    The config's own ``location`` is passed as the section name.
    """
    conf = self.locations_config
    section_name = self.locations_config.location
    self.assertRemoveHook(conf, 'file', section_name)
1797
def test_remove_hook_branch(self):
    """Run the 'remove' hook check on ``self.branch_config``."""
    conf = self.branch_config
    self.assertRemoveHook(conf, 'file')
1800
def assertLoadHook(self, name, conf_class, *conf_args):
1805
config.OldConfigHooks.install_named_hook('load', hook, None)
1807
config.OldConfigHooks.uninstall_named_hook, 'load', None)
1808
self.assertLength(0, calls)
1810
conf = conf_class(*conf_args)
1811
# Access an option to trigger a load
1812
conf.get_user_option(name)
1813
self.assertLength(1, calls)
1814
# Since we can't assert about conf, we just use the number of calls ;-/
1816
def test_load_hook_breezy(self):
    """Run the 'load' hook check with ``config.GlobalConfig``."""
    conf_class = config.GlobalConfig
    self.assertLoadHook('file', conf_class)
1819
def test_load_hook_locations(self):
    """Run the 'load' hook check with ``config.LocationConfig``.

    The config is built from ``self.tree.basedir``.
    """
    conf_class = config.LocationConfig
    self.assertLoadHook('file', conf_class, self.tree.basedir)
1822
def test_load_hook_branch(self):
    """Run the 'load' hook check with ``config.BranchConfig``.

    The config is built from ``self.tree.branch``.
    """
    conf_class = config.BranchConfig
    self.assertLoadHook('file', conf_class, self.tree.branch)
1825
def assertSaveHook(self, conf):
1830
config.OldConfigHooks.install_named_hook('save', hook, None)
1832
config.OldConfigHooks.uninstall_named_hook, 'save', None)
1833
self.assertLength(0, calls)
1834
# Setting an option triggers a save
1835
conf.set_user_option('foo', 'bar')
1836
self.assertLength(1, calls)
1837
# Since we can't assert about conf, we just use the number of calls ;-/
1839
def test_save_hook_breezy(self):
    """Run the 'save' hook check on ``self.breezy_config``."""
    conf = self.breezy_config
    self.assertSaveHook(conf)
1842
def test_save_hook_locations(self):
    """Run the 'save' hook check on ``self.locations_config``."""
    conf = self.locations_config
    self.assertSaveHook(conf)
1845
def test_save_hook_branch(self):
    """Run the 'save' hook check on ``self.branch_config``."""
    conf = self.branch_config
    self.assertSaveHook(conf)
1849
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
1850
"""Tests config hooks for remote configs.
1852
No tests for the remove hook as this is not implemented there.
1856
super(TestOldConfigHooksForRemote, self).setUp()
1857
self.transport_server = test_server.SmartTCPServer_for_testing
1858
create_configs_with_file_option(self)
1860
def assertGetHook(self, conf, name, value):
1865
config.OldConfigHooks.install_named_hook('get', hook, None)
1867
config.OldConfigHooks.uninstall_named_hook, 'get', None)
1868
self.assertLength(0, calls)
1869
actual_value = conf.get_option(name)
1870
self.assertEqual(value, actual_value)
1871
self.assertLength(1, calls)
1872
self.assertEqual((conf, name, value), calls[0])
1874
def test_get_hook_remote_branch(self):
    """Run the 'get' hook check on the config of the branch at 'tree'."""
    opened = branch.Branch.open(self.get_url('tree'))
    conf = opened._get_config()
    self.assertGetHook(conf, 'file', 'branch')
1878
def test_get_hook_remote_bzrdir(self):
    """Run the 'get' hook check on the control dir config at 'tree'."""
    opened = controldir.ControlDir.open(self.get_url('tree'))
    dir_conf = opened._get_config()
    # set_option() is called with the value first and the name second
    dir_conf.set_option('remotedir', 'file')
    self.assertGetHook(dir_conf, 'file', 'remotedir')
1884
def assertSetHook(self, conf, name, value):
1889
config.OldConfigHooks.install_named_hook('set', hook, None)
1891
config.OldConfigHooks.uninstall_named_hook, 'set', None)
1892
self.assertLength(0, calls)
1893
conf.set_option(value, name)
1894
self.assertLength(1, calls)
1895
# We can't assert the conf object below as different configs use
1896
# different means to implement set_user_option and we care only about
1898
self.assertEqual((name, value), calls[0][1:])
1900
def test_set_hook_remote_branch(self):
    """Run the 'set' hook check on a write-locked branch opened at 'tree'."""
    opened = branch.Branch.open(self.get_url('tree'))
    # Take the write lock now and release it when the test is cleaned up
    lock = opened.lock_write()
    self.addCleanup(lock.unlock)
    conf = opened._get_config()
    self.assertSetHook(conf, 'file', 'remote')
1905
def test_set_hook_remote_bzrdir(self):
    """Run the 'set' hook check on the control dir config at 'tree'.

    The branch at the same URL is write-locked for the test's duration.
    """
    opened_branch = branch.Branch.open(self.get_url('tree'))
    lock = opened_branch.lock_write()
    self.addCleanup(lock.unlock)
    opened_dir = controldir.ControlDir.open(self.get_url('tree'))
    conf = opened_dir._get_config()
    self.assertSetHook(conf, 'file', 'remotedir')
1911
def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
1916
config.OldConfigHooks.install_named_hook('load', hook, None)
1918
config.OldConfigHooks.uninstall_named_hook, 'load', None)
1919
self.assertLength(0, calls)
1921
conf = conf_class(*conf_args)
1922
# Access an option to trigger a load
1923
conf.get_option(name)
1924
self.assertLength(expected_nb_calls, calls)
1925
# Since we can't assert about conf, we just use the number of calls ;-/
1927
def test_load_hook_remote_branch(self):
    """Expect one 'load' hook call for ``remote.RemoteBranchConfig``."""
    opened = branch.Branch.open(self.get_url('tree'))
    expected_calls = 1
    self.assertLoadHook(
        expected_calls, 'file', remote.RemoteBranchConfig, opened)
1932
def test_load_hook_remote_bzrdir(self):
    """Expect two 'load' hook calls for ``remote.RemoteBzrDirConfig``."""
    opened = controldir.ControlDir.open(self.get_url('tree'))
    # The config file does not exist yet: setting an option forces its
    # creation
    opened._get_config().set_option('remotedir', 'file')
    # One call comes from the server and one from the client. This is
    # caused by the differences in implementations between
    # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
    # SmartServerBranchGetConfigFile (in smart/branch.py)
    expected_calls = 2
    self.assertLoadHook(
        expected_calls, 'file', remote.RemoteBzrDirConfig, opened)
1944
def assertSaveHook(self, conf):
1949
config.OldConfigHooks.install_named_hook('save', hook, None)
1951
config.OldConfigHooks.uninstall_named_hook, 'save', None)
1952
self.assertLength(0, calls)
1953
# Setting an option triggers a save
1954
conf.set_option('foo', 'bar')
1955
self.assertLength(1, calls)
1956
# Since we can't assert about conf, we just use the number of calls ;-/
1958
def test_save_hook_remote_branch(self):
    """Run the 'save' hook check on a write-locked branch opened at 'tree'."""
    opened = branch.Branch.open(self.get_url('tree'))
    lock = opened.lock_write()
    self.addCleanup(lock.unlock)
    conf = opened._get_config()
    self.assertSaveHook(conf)
1963
def test_save_hook_remote_bzrdir(self):
    """Run the 'save' hook check on the control dir config at 'tree'.

    The branch at the same URL is write-locked for the test's duration.
    """
    opened_branch = branch.Branch.open(self.get_url('tree'))
    lock = opened_branch.lock_write()
    self.addCleanup(lock.unlock)
    opened_dir = controldir.ControlDir.open(self.get_url('tree'))
    conf = opened_dir._get_config()
    self.assertSaveHook(conf)
1970
class TestOptionNames(tests.TestCase):
    """Check which option names ``config._option_ref_re`` accepts."""

    def is_valid(self, name):
        """Tell whether ``{name}`` is matched by ``config._option_ref_re``.

        :param name: The candidate option name (without braces).
        :return: True if the regexp matches at the start of ``'{name}'``.
        """
        return config._option_ref_re.match('{%s}' % name) is not None

    def test_valid_names(self):
        """Every name listed here must be accepted."""
        valid_names = [
            'foo',
            'foo.bar',
            'f1',
            '_',
            '__bar__',
            'a_',
            'a1',
            # Don't break bzr-svn for no good reason
            'guessed-layout',
        ]
        for name in valid_names:
            self.assertTrue(self.is_valid(name))

    def test_invalid_names(self):
        """Every name listed here must be rejected."""
        invalid_names = [
            ' foo',
            'foo ',
            '1',
            '1,2',
            'foo$',
            '!foo',
            'foo.',
            'foo..bar',
            '{}',
            '{a}',
            'a\n',
            '-',
            '-a',
            'a-',
            'a--a',
        ]
        for name in invalid_names:
            self.assertFalse(self.is_valid(name))

    def assertSingleGroup(self, reference):
        """Assert the regexp finds `reference` with exactly one group.

        :param reference: A string containing an option reference.
        """
        # the regexp is used with split and as such should match the reference
        # *only*, if more groups needs to be defined, (?:...) should be used.
        # The reference given by the caller is what gets checked (it used to
        # be ignored in favour of a hard-coded '{a}'). search() is used rather
        # than match() so that a reference that is not at the very start of
        # the string (as in '{{a}}') is still found.
        m = config._option_ref_re.search(reference)
        self.assertIsNot(None, m)
        self.assertLength(1, m.groups())

    def test_valid_references(self):
        """Plain and doubly-braced references each yield a single group."""
        self.assertSingleGroup('{a}')
        self.assertSingleGroup('{{a}}')
2014
class TestOption(tests.TestCase):
2016
def test_default_value(self):
    """An option created with ``default='bar'`` reports 'bar'."""
    option = config.Option('foo', default='bar')
    default = option.get_default()
    self.assertEqual('bar', default)
2020
def test_callable_default_value(self):
2021
def bar_as_unicode():
2023
opt = config.Option('foo', default=bar_as_unicode)
2024
self.assertEqual('bar', opt.get_default())
2026
def test_default_value_from_env(self):
    """The FOO environment variable overrides the declared default."""
    option = config.Option(
        'foo', default='bar', default_from_env=['FOO'])
    self.overrideEnv('FOO', 'quux')
    # The value from the environment takes over the option's own default
    default = option.get_default()
    self.assertEqual('quux', default)
2032
def test_first_default_value_from_env_wins(self):
    """Among several env variables, the first one that is set is used."""
    env_names = ['NO_VALUE', 'FOO', 'BAZ']
    option = config.Option(
        'foo', default='bar', default_from_env=env_names)
    self.overrideEnv('FOO', 'foo')
    self.overrideEnv('BAZ', 'baz')
    # NO_VALUE is not overridden here, FOO comes before BAZ in the list
    default = option.get_default()
    self.assertEqual('foo', default)
2040
def test_not_supported_list_default_value(self):
    """Creating an option with a list default raises AssertionError."""
    unsupported_default = [1]
    self.assertRaises(
        AssertionError, config.Option, 'foo', default=unsupported_default)
2043
def test_not_supported_object_default_value(self):
2044
self.assertRaises(AssertionError, config.Option, 'foo',
2047
def test_not_supported_callable_default_value_not_unicode(self):
2048
def bar_not_unicode():
2050
opt = config.Option('foo', default=bar_not_unicode)
2051
self.assertRaises(AssertionError, opt.get_default)
2053
def test_get_help_topic(self):
    """The help topic of option 'foo' is 'foo'."""
    option = config.Option('foo')
    topic = option.get_help_topic()
    self.assertEqual('foo', topic)
2058
class TestOptionConverter(tests.TestCase):
2060
def assertConverted(self, expected, opt, value):
    """Assert that `opt` converts `value` into `expected`.

    :param expected: The value the conversion should produce.
    :param opt: An object providing ``convert_from_unicode(store, value)``.
    :param value: The raw value to convert; None is passed as the store.
    """
    converted = opt.convert_from_unicode(None, value)
    self.assertEqual(expected, converted)
2063
def assertCallsWarning(self, opt, value):
2067
warnings.append(args[0] % args[1:])
2068
self.overrideAttr(trace, 'warning', warning)
2069
self.assertEqual(None, opt.convert_from_unicode(None, value))
2070
self.assertLength(1, warnings)
2072
'Value "%s" is not valid for "%s"' % (value, opt.name),
2075
def assertCallsError(self, opt, value):
    """Assert that converting `value` raises ConfigOptionValueError.

    :param opt: The option whose ``convert_from_unicode`` is called.
    :param value: The raw value to convert; None is passed as the store.
    """
    expected_error = config.ConfigOptionValueError
    convert = opt.convert_from_unicode
    self.assertRaises(expected_error, convert, None, value)
2079
def assertConvertInvalid(self, opt, invalid_value):
2081
self.assertEqual(None, opt.convert_from_unicode(None, invalid_value))
2082
opt.invalid = 'warning'
2083
self.assertCallsWarning(opt, invalid_value)
2084
opt.invalid = 'error'
2085
self.assertCallsError(opt, invalid_value)
2088
class TestOptionWithBooleanConverter(TestOptionConverter):
    """Conversion tests for an option using ``config.bool_from_store``."""

    def get_option(self):
        """Build option 'foo' converted with ``config.bool_from_store``."""
        return config.Option(
            'foo', help='A boolean.', from_unicode=config.bool_from_store)

    def test_convert_invalid(self):
        """Values that are not booleans are reported as invalid."""
        option = self.get_option()
        invalid_values = (
            # A string that is not recognized as a boolean
            u'invalid-boolean',
            # A list of strings is never recognized as a boolean
            [u'not', u'a', u'boolean'],
        )
        for invalid in invalid_values:
            self.assertConvertInvalid(option, invalid)

    def test_convert_valid(self):
        """Recognized strings convert to the matching boolean."""
        option = self.get_option()
        cases = ((True, u'True'), (True, u'1'), (False, u'False'))
        for expected, text in cases:
            self.assertConverted(expected, option, text)
2108
class TestOptionWithIntegerConverter(TestOptionConverter):
    """Conversion tests for an option using ``config.int_from_store``."""

    def get_option(self):
        """Build option 'foo' converted with ``config.int_from_store``."""
        return config.Option(
            'foo', help='An integer.', from_unicode=config.int_from_store)

    def test_convert_invalid(self):
        """Values that are not integers are reported as invalid."""
        option = self.get_option()
        invalid_values = (
            # A string that is not recognized as an integer
            u'forty-two',
            # A list of strings is never recognized as an integer
            [u'a', u'list'],
        )
        for invalid in invalid_values:
            self.assertConvertInvalid(option, invalid)

    def test_convert_valid(self):
        """The string u'16' converts to the integer 16."""
        option = self.get_option()
        self.assertConverted(16, option, u'16')
2126
class TestOptionWithSIUnitConverter(TestOptionConverter):
    """Conversion tests for an option using ``config.int_SI_from_store``."""

    def get_option(self):
        """Build option 'foo' converted with ``config.int_SI_from_store``."""
        return config.Option(
            'foo', help='An integer in SI units.',
            from_unicode=config.int_SI_from_store)

    def test_convert_invalid(self):
        """Malformed value/unit strings are reported as invalid."""
        option = self.get_option()
        invalid_values = (
            u'not-a-unit',
            u'Gb',  # Forgot the value
            u'1b',  # Forgot the unit
            u'1GG',
            u'1Mbb',
            u'1MM',
        )
        for invalid in invalid_values:
            self.assertConvertInvalid(option, invalid)

    def test_convert_valid(self):
        """Well-formed strings convert to the expected integer."""
        option = self.get_option()
        cases = (
            (int(5e3), u'5kb'),
            (int(5e6), u'5M'),
            (int(5e6), u'5MB'),
            (int(5e9), u'5g'),
            (int(5e9), u'5gB'),
            (100, u'100'),
        )
        for expected, text in cases:
            self.assertConverted(expected, option, text)
2151
class TestListOption(TestOptionConverter):
    """Conversion tests for ``config.ListOption``."""

    def get_option(self):
        """Build the ``config.ListOption`` named 'foo'."""
        return config.ListOption('foo', help='A list.')

    def test_convert_invalid(self):
        """A value that is already a list is reported as invalid."""
        option = self.get_option()
        # No attempt is made to convert a list into a list
        already_a_list = [1]
        self.assertConvertInvalid(option, already_a_list)
        # No string is invalid as all forms can be converted to a list

    def test_convert_valid(self):
        """Strings convert to lists of unicode strings."""
        option = self.get_option()
        cases = (
            # An empty string is an empty list
            ([], ''),  # Using a bare str() just in case
            ([], u''),
            ([u'True'], u'True'),
            ([u'42'], u'42'),
            ([u'bar'], u'bar'),
        )
        for expected, text in cases:
            self.assertConverted(expected, option, text)
2176
class TestRegistryOption(TestOptionConverter):
2178
def get_option(self, registry):
    """Build the ``config.RegistryOption`` named 'foo' over `registry`.

    :param registry: The registry passed to the option constructor.
    """
    return config.RegistryOption(
        'foo', registry, help='A registry option.')
2182
def test_convert_invalid(self):
    """A list and an unregistered key are both reported as invalid."""
    option = self.get_option(_mod_registry.Registry())
    for invalid in ([1], u"notregistered"):
        self.assertConvertInvalid(option, invalid)
2188
def test_convert_valid(self):
    """A registered key converts to its registered object."""
    known = _mod_registry.Registry()
    known.register("someval", 1234)
    option = self.get_option(known)
    cases = (
        # Using a bare str() just in case
        (1234, "someval"),
        (1234, u'someval'),
        (None, None),
    )
    for expected, raw in cases:
        self.assertConverted(expected, option, raw)
2197
def test_help(self):
2198
registry = _mod_registry.Registry()
2199
registry.register("someval", 1234, help="some option")
2200
registry.register("dunno", 1234, help="some other option")
2201
opt = self.get_option(registry)
2203
'A registry option.\n'
2205
'The following values are supported:\n'
2206
' dunno - some other option\n'
2207
' someval - some option\n',
2210
def test_get_help_text(self):
2211
registry = _mod_registry.Registry()
2212
registry.register("someval", 1234, help="some option")
2213
registry.register("dunno", 1234, help="some other option")
2214
opt = self.get_option(registry)
2216
'A registry option.\n'
2218
'The following values are supported:\n'
2219
' dunno - some other option\n'
2220
' someval - some option\n',
2221
opt.get_help_text())
2224
class TestOptionRegistry(tests.TestCase):
2227
super(TestOptionRegistry, self).setUp()
2228
# Always start with an empty registry
2229
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2230
self.registry = config.option_registry
2232
def test_register(self):
    """A registered option is returned by ``get`` under its name."""
    option = config.Option('foo')
    self.registry.register(option)
    found = self.registry.get('foo')
    self.assertIs(option, found)
2237
def test_registered_help(self):
    """The registry reports the help text of a registered option."""
    option = config.Option('foo', help='A simple option')
    self.registry.register(option)
    help_text = self.registry.get_help('foo')
    self.assertEqual('A simple option', help_text)
2242
def test_dont_register_illegal_name(self):
    """Registering an option with an illegal name raises."""
    for illegal_name in (' foo', 'bar,'):
        option = config.Option(illegal_name)
        self.assertRaises(
            config.IllegalOptionName, self.registry.register, option)
2248
lazy_option = config.Option('lazy_foo', help='Lazy help')
2250
def test_register_lazy(self):
    """A lazily registered option resolves to ``self.lazy_option``."""
    member_name = 'TestOptionRegistry.lazy_option'
    self.registry.register_lazy('lazy_foo', self.__module__, member_name)
    found = self.registry.get('lazy_foo')
    self.assertIs(self.lazy_option, found)
2255
def test_registered_lazy_help(self):
    """The registry reports the help text of a lazily registered option."""
    member_name = 'TestOptionRegistry.lazy_option'
    self.registry.register_lazy('lazy_foo', self.__module__, member_name)
    help_text = self.registry.get_help('lazy_foo')
    self.assertEqual('Lazy help', help_text)
2260
def test_dont_lazy_register_illegal_name(self):
    """Lazily registering under an illegal key raises."""
    # This is where the root cause of http://pad.lv/1235099 is better
    # understood: the 'register_lazy' doc string mentions that the key
    # should match the option name, which indirectly requires the option
    # name to be a valid python identifier. That rule is deliberately
    # violated here (the keys do not match the option name) so that the
    # option name checking gets exercised.
    member_name = 'TestOptionRegistry.lazy_option'
    for illegal_key in (' foo', '1,2'):
        self.assertRaises(
            config.IllegalOptionName, self.registry.register_lazy,
            illegal_key, self.__module__, member_name)
2274
class TestRegisteredOptions(tests.TestCase):
2275
"""All registered options should verify some constraints."""
2277
scenarios = [(key, {'option_name': key, 'option': option}) for key, option
2278
in config.option_registry.iteritems()]
2281
super(TestRegisteredOptions, self).setUp()
2282
self.registry = config.option_registry
2284
def test_proper_name(self):
    """The registration key must equal the option's own name."""
    # This can't be checked at registration time for the lazy options,
    # hence this test.
    registered_as = self.option_name
    self.assertEqual(registered_as, self.option.name)
2289
def test_help_is_set(self):
    """The registry has a non-None, non-empty help for the option."""
    help_text = self.registry.get_help(self.option_name)
    # Users rely on the help to find out what an option is about
    self.assertIsNot(None, help_text)
    self.assertNotEqual('', help_text)
2297
class TestSection(tests.TestCase):
2299
# FIXME: Parametrize so that all sections produced by Stores run these
2300
# tests -- vila 2011-04-01
2302
def test_get_a_value(self):
    """A section returns the value stored in its options dict."""
    options = {'foo': 'bar'}
    section = config.Section('myID', options)
    value = section.get('foo')
    self.assertEqual('bar', value)
2307
def test_get_unknown_option(self):
2309
section = config.Section(None, a_dict)
2310
self.assertEqual('out of thin air',
2311
section.get('foo', 'out of thin air'))
2313
def test_options_is_shared(self):
2315
section = config.Section(None, a_dict)
2316
self.assertIs(a_dict, section.options)
2319
class TestMutableSection(tests.TestCase):
2321
scenarios = [('mutable',
2323
lambda opts: config.MutableSection('myID', opts)},),
2327
a_dict = dict(foo='bar')
2328
section = self.get_section(a_dict)
2329
section.set('foo', 'new_value')
2330
self.assertEqual('new_value', section.get('foo'))
2331
# The change appears in the shared section
2332
self.assertEqual('new_value', a_dict.get('foo'))
2333
# We keep track of the change
2334
self.assertTrue('foo' in section.orig)
2335
self.assertEqual('bar', section.orig.get('foo'))
2337
def test_set_preserve_original_once(self):
    """After two ``set`` calls, ``orig`` still holds the initial value."""
    options = {'foo': 'bar'}
    section = self.get_section(options)
    for new_value in ('first_value', 'second_value'):
        section.set('foo', new_value)
    # 'bar', the value present before any set(), is what is remembered
    self.assertTrue('foo' in section.orig)
    self.assertEqual('bar', section.orig.get('foo'))
2346
def test_remove(self):
    """Removing an option drops it and records the old value in ``orig``."""
    options = {'foo': 'bar'}
    section = self.get_section(options)
    section.remove('foo')
    # Without an explicit default, a removed option yields None
    self.assertEqual(None, section.get('foo'))
    # With an explicit default, that default is returned
    self.assertEqual('unknown', section.get('foo', 'unknown'))
    self.assertFalse('foo' in section.options)
    # The deletion is tracked through the original value
    self.assertTrue('foo' in section.orig)
    self.assertEqual('bar', section.orig.get('foo'))
2359
def test_remove_new_option(self):
2361
section = self.get_section(a_dict)
2362
section.set('foo', 'bar')
2363
section.remove('foo')
2364
self.assertFalse('foo' in section.options)
2365
# The option didn't exist initially so it we need to keep track of it
2366
# with a special value
2367
self.assertTrue('foo' in section.orig)
2368
self.assertEqual(config._NewlyCreatedOption, section.orig['foo'])
2371
class TestCommandLineStore(tests.TestCase):
2374
super(TestCommandLineStore, self).setUp()
2375
self.store = config.CommandLineStore()
2376
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2378
def get_section(self):
2379
"""Get the unique section for the command line overrides."""
2380
sections = list(self.store.get_sections())
2381
self.assertLength(1, sections)
2382
store, section = sections[0]
2383
self.assertEqual(self.store, store)
2386
def test_no_override(self):
    """An empty override list yields a section without option names."""
    self.store._from_cmdline([])
    option_names = list(self.get_section().iter_option_names())
    self.assertLength(0, option_names)
2391
def test_simple_override(self):
    """The override 'a=b' makes option 'a' read as 'b'."""
    overrides = ['a=b']
    self.store._from_cmdline(overrides)
    value = self.get_section().get('a')
    self.assertEqual('b', value)
2396
def test_list_override(self):
    """A list override is stored as a string and converted by the option."""
    list_option = config.ListOption('l')
    config.option_registry.register(list_option)
    self.store._from_cmdline(['l=1,2,3'])
    raw = self.get_section().get('l')
    self.assertEqual('1,2,3', raw)
    # Reminder: lists should be registered as such explicitly, otherwise
    # the conversion needs to be done afterwards.
    converted = list_option.convert_from_unicode(self.store, raw)
    self.assertEqual(['1', '2', '3'], converted)
2407
def test_multiple_overrides(self):
    """Each of several overrides is readable from the section."""
    self.store._from_cmdline(['a=b', 'x=y'])
    section = self.get_section()
    for name, expected in (('a', 'b'), ('x', 'y')):
        self.assertEqual(expected, section.get(name))
2413
def test_wrong_syntax(self):
    """An override without '=' raises ``errors.BzrCommandError``."""
    overrides = ['a=b', 'c']
    self.assertRaises(
        errors.BzrCommandError, self.store._from_cmdline, overrides)
2418
class TestStoreMinimalAPI(tests.TestCaseWithTransport):
2420
scenarios = [(key, {'get_store': builder}) for key, builder
2421
in config.test_store_builder_registry.iteritems()] + [
2422
('cmdline', {'get_store': lambda test: config.CommandLineStore()})]
2425
store = self.get_store(self)
2426
if isinstance(store, config.TransportIniFileStore):
2427
raise tests.TestNotApplicable(
2428
"%s is not a concrete Store implementation"
2429
" so it doesn't need an id" % (store.__class__.__name__,))
2430
self.assertIsNot(None, store.id)
2433
class TestStore(tests.TestCaseWithTransport):
2435
def assertSectionContent(self, expected, store_and_section):
2436
"""Assert that some options have the proper values in a section."""
2437
_, section = store_and_section
2438
expected_name, expected_options = expected
2439
self.assertEqual(expected_name, section.id)
2442
dict([(k, section.get(k)) for k in expected_options.keys()]))
2445
class TestReadonlyStore(TestStore):
    """Read-only checks run for each registered test store builder."""

    scenarios = [
        (name, {'get_store': make_store})
        for name, make_store
        in config.test_store_builder_registry.iteritems()]

    def test_building_delays_load(self):
        """A store is not loaded until content is given to it."""
        store = self.get_store(self)
        loaded_before = store.is_loaded()
        self.assertEqual(False, loaded_before)
        store._load_from_string(b'')
        loaded_after = store.is_loaded()
        self.assertEqual(True, loaded_after)

    def test_get_no_sections_for_empty(self):
        """Empty content yields no sections."""
        store = self.get_store(self)
        store._load_from_string(b'')
        found = list(store.get_sections())
        self.assertEqual([], found)

    def test_get_default_section(self):
        """An option outside any section lands in the section with id None."""
        store = self.get_store(self)
        store._load_from_string(b'foo=bar')
        found = list(store.get_sections())
        self.assertLength(1, found)
        self.assertSectionContent((None, {'foo': 'bar'}), found[0])

    def test_get_named_section(self):
        """An option under '[baz]' lands in the section with id 'baz'."""
        store = self.get_store(self)
        store._load_from_string(b'[baz]\nfoo=bar')
        found = list(store.get_sections())
        self.assertLength(1, found)
        self.assertSectionContent(('baz', {'foo': 'bar'}), found[0])

    def test_load_from_string_fails_for_non_empty_store(self):
        """Loading from a string twice raises AssertionError."""
        store = self.get_store(self)
        store._load_from_string(b'foo=bar')
        reload_from_string = store._load_from_string
        self.assertRaises(AssertionError, reload_from_string, b'bar=baz')
2481
class TestStoreQuoting(TestStore):
2483
scenarios = [(key, {'get_store': builder}) for key, builder
2484
in config.test_store_builder_registry.iteritems()]
2487
super(TestStoreQuoting, self).setUp()
2488
self.store = self.get_store(self)
2489
# We need a loaded store but any content will do
2490
self.store._load_from_string(b'')
2492
def assertIdempotent(self, s):
2493
"""Assert that quoting an unquoted string is a no-op and vice-versa.
2495
What matters here is that option values, as they appear in a store, can
2496
be safely round-tripped out of the store and back.
2498
:param s: A string, quoted if required.
2500
self.assertEqual(s, self.store.quote(self.store.unquote(s)))
2501
self.assertEqual(s, self.store.unquote(self.store.quote(s)))
2503
def test_empty_string(self):
2504
if isinstance(self.store, config.IniFileStore):
2505
# configobj._quote doesn't handle empty values
2506
self.assertRaises(AssertionError,
2507
self.assertIdempotent, '')
2509
self.assertIdempotent('')
2510
# But quoted empty strings are ok
2511
self.assertIdempotent('""')
2513
def test_embedded_spaces(self):
    """A quoted string with embedded spaces round-trips unchanged."""
    quoted = '" a b c "'
    self.assertIdempotent(quoted)
2516
def test_embedded_commas(self):
    """A quoted string with an embedded comma round-trips unchanged."""
    quoted = '" a , b c "'
    self.assertIdempotent(quoted)
2519
def test_simple_comma(self):
2520
if isinstance(self.store, config.IniFileStore):
2521
# configobj requires that lists are special-cased
2522
self.assertRaises(AssertionError,
2523
self.assertIdempotent, ',')
2525
self.assertIdempotent(',')
2526
# When a single comma is required, quoting is also required
2527
self.assertIdempotent('","')
2529
def test_list(self):
2530
if isinstance(self.store, config.IniFileStore):
2531
# configobj requires that lists are special-cased
2532
self.assertRaises(AssertionError,
2533
self.assertIdempotent, 'a,b')
2535
self.assertIdempotent('a,b')
2538
class TestDictFromStore(tests.TestCase):
    """Check how a store unquotes a value that is a dict, not a string."""

    def test_unquote_not_string(self):
        """Unquoting a dict value returns it unchanged without crashing."""
        stack = config.MemoryStack(b'x=2\n[a_section]\na=1\n')
        section_as_value = stack.get('a_section')
        # Urgh, although the stack asks for the no-name section, the
        # content of another section comes back as a dict o_O
        expected = {'a': '1'}
        self.assertEqual(expected, section_as_value)
        unquoted = stack.store.unquote(section_as_value)
        # Such a value cannot be unquoted but this should not crash either:
        # the use cases are getting the value or displaying it with '%s'.
        self.assertEqual(expected, unquoted)
        displayed = '%s' % (unquoted,)
        self.assertIn(displayed, ("{u'a': u'1'}", "{'a': '1'}"))
2554
class TestIniFileStoreContent(tests.TestCaseWithTransport):
2555
"""Simulate loading a config store with content of various encodings.
2557
All files produced by bzr are in utf8 content.
2559
Users may modify them manually and end up with a file that can't be
2560
loaded. We need to issue proper error messages in this case.
2563
invalid_utf8_char = b'\xff'
2565
def test_load_utf8(self):
2566
"""Ensure we can load an utf8-encoded file."""
2567
t = self.get_transport()
2568
# From http://pad.lv/799212
2569
unicode_user = u'b\N{Euro Sign}ar'
2570
unicode_content = u'user=%s' % (unicode_user,)
2571
utf8_content = unicode_content.encode('utf8')
2572
# Store the raw content in the config file
2573
t.put_bytes('foo.conf', utf8_content)
2574
store = config.TransportIniFileStore(t, 'foo.conf')
2576
stack = config.Stack([store.get_sections], store)
2577
self.assertEqual(unicode_user, stack.get('user'))
2579
def test_load_non_ascii(self):
    """Ensure we display a proper error on non-ascii, non utf-8 content."""
    transport = self.get_transport()
    content = b'user=foo\n#%s\n' % (self.invalid_utf8_char,)
    transport.put_bytes('foo.conf', content)
    store = config.TransportIniFileStore(transport, 'foo.conf')
    self.assertRaises(config.ConfigContentError, store.load)
2586
def test_load_erroneous_content(self):
    """Ensure we display a proper error on content that can't be parsed."""
    transport = self.get_transport()
    # The section header is never closed
    transport.put_bytes('foo.conf', b'[open_section\n')
    store = config.TransportIniFileStore(transport, 'foo.conf')
    self.assertRaises(config.ParseConfigError, store.load)
2593
def test_load_permission_denied(self):
2594
"""Ensure we get warned when trying to load an inaccessible file."""
2598
warnings.append(args[0] % args[1:])
2599
self.overrideAttr(trace, 'warning', warning)
2601
t = self.get_transport()
2603
def get_bytes(relpath):
2604
raise errors.PermissionDenied(relpath, "")
2605
t.get_bytes = get_bytes
2606
store = config.TransportIniFileStore(t, 'foo.conf')
2607
self.assertRaises(errors.PermissionDenied, store.load)
2610
[u'Permission denied while trying to load configuration store %s.'
2611
% store.external_url()])
2614
class TestIniConfigContent(tests.TestCaseWithTransport):
2615
"""Simulate loading a IniBasedConfig with content of various encodings.
2617
All files produced by bzr are in utf8 content.
2619
Users may modify them manually and end up with a file that can't be
2620
loaded. We need to issue proper error messages in this case.
2623
invalid_utf8_char = b'\xff'
2625
def test_load_utf8(self):
    """Ensure we can load an utf8-encoded file."""
    # From http://pad.lv/799212
    user = u'b\N{Euro Sign}ar'
    encoded = (u'user=%s' % (user,)).encode('utf8')
    # Write the encoded bytes untouched into the config file
    with open('foo.conf', 'wb') as conf_file:
        conf_file.write(encoded)
    conf = config.IniBasedConfig(file_name='foo.conf')
    loaded_user = conf.get_user_option('user')
    self.assertEqual(user, loaded_user)
2637
def test_load_badly_encoded_content(self):
    """Ensure we display a proper error on non-ascii, non utf-8 content."""
    content = b'user=foo\n#%s\n' % (self.invalid_utf8_char,)
    with open('foo.conf', 'wb') as conf_file:
        conf_file.write(content)
    conf = config.IniBasedConfig(file_name='foo.conf')
    self.assertRaises(config.ConfigContentError, conf._get_parser)
2644
def test_load_erroneous_content(self):
    """Ensure we display a proper error on content that can't be parsed."""
    # The section header is never closed
    with open('foo.conf', 'wb') as conf_file:
        conf_file.write(b'[open_section\n')
    conf = config.IniBasedConfig(file_name='foo.conf')
    self.assertRaises(config.ParseConfigError, conf._get_parser)
2652
class TestMutableStore(TestStore):
2654
scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
2655
in config.test_store_builder_registry.iteritems()]
2658
super(TestMutableStore, self).setUp()
2659
self.transport = self.get_transport()
2661
def has_store(self, store):
    """Tell whether ``store`` exists on the transport used by the test.

    The store's external url is made relative to the external url of
    ``self.transport`` and the resulting path is checked with
    ``self.transport.has``.
    """
    transport_url = self.transport.external_url()
    relative_path = urlutils.relative_url(transport_url,
                                          store.external_url())
    return self.transport.has(relative_path)
2666
def test_save_empty_creates_no_file(self):
2667
# FIXME: There should be a better way than relying on the test
2668
# parametrization to identify branch.conf -- vila 2011-0526
2669
if self.store_id in ('branch', 'remote_branch'):
2670
raise tests.TestNotApplicable(
2671
'branch.conf is *always* created when a branch is initialized')
2672
store = self.get_store(self)
2674
self.assertEqual(False, self.has_store(store))
2676
def test_mutable_section_shared(self):
    store = self.get_store(self)
    store._load_from_string(b'foo=bar\n')
    # FIXME: There should be a better way than relying on the test
    # parametrization to identify branch.conf -- vila 2011-0526
    if self.store_id in ('branch', 'remote_branch'):
        # branch stores requires write locked branches
        self.addCleanup(store.branch.lock_write().unlock)
    first_section = store.get_mutable_section(None)
    second_section = store.get_mutable_section(None)
    # Asking twice for the same section must hand back the very same
    # object, otherwise different callers would each modify their own copy.
    self.assertIs(first_section, second_section)
2690
def test_save_emptied_succeeds(self):
2691
store = self.get_store(self)
2692
store._load_from_string(b'foo=bar\n')
2693
# FIXME: There should be a better way than relying on the test
2694
# parametrization to identify branch.conf -- vila 2011-0526
2695
if self.store_id in ('branch', 'remote_branch'):
2696
# branch stores requires write locked branches
2697
self.addCleanup(store.branch.lock_write().unlock)
2698
section = store.get_mutable_section(None)
2699
section.remove('foo')
2701
self.assertEqual(True, self.has_store(store))
2702
modified_store = self.get_store(self)
2703
sections = list(modified_store.get_sections())
2704
self.assertLength(0, sections)
2706
def test_save_with_content_succeeds(self):
2707
# FIXME: There should be a better way than relying on the test
2708
# parametrization to identify branch.conf -- vila 2011-0526
2709
if self.store_id in ('branch', 'remote_branch'):
2710
raise tests.TestNotApplicable(
2711
'branch.conf is *always* created when a branch is initialized')
2712
store = self.get_store(self)
2713
store._load_from_string(b'foo=bar\n')
2714
self.assertEqual(False, self.has_store(store))
2716
self.assertEqual(True, self.has_store(store))
2717
modified_store = self.get_store(self)
2718
sections = list(modified_store.get_sections())
2719
self.assertLength(1, sections)
2720
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2722
def test_set_option_in_empty_store(self):
2723
store = self.get_store(self)
2724
# FIXME: There should be a better way than relying on the test
2725
# parametrization to identify branch.conf -- vila 2011-0526
2726
if self.store_id in ('branch', 'remote_branch'):
2727
# branch stores requires write locked branches
2728
self.addCleanup(store.branch.lock_write().unlock)
2729
section = store.get_mutable_section(None)
2730
section.set('foo', 'bar')
2732
modified_store = self.get_store(self)
2733
sections = list(modified_store.get_sections())
2734
self.assertLength(1, sections)
2735
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2737
def test_set_option_in_default_section(self):
2738
store = self.get_store(self)
2739
store._load_from_string(b'')
2740
# FIXME: There should be a better way than relying on the test
2741
# parametrization to identify branch.conf -- vila 2011-0526
2742
if self.store_id in ('branch', 'remote_branch'):
2743
# branch stores requires write locked branches
2744
self.addCleanup(store.branch.lock_write().unlock)
2745
section = store.get_mutable_section(None)
2746
section.set('foo', 'bar')
2748
modified_store = self.get_store(self)
2749
sections = list(modified_store.get_sections())
2750
self.assertLength(1, sections)
2751
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2753
def test_set_option_in_named_section(self):
2754
store = self.get_store(self)
2755
store._load_from_string(b'')
2756
# FIXME: There should be a better way than relying on the test
2757
# parametrization to identify branch.conf -- vila 2011-0526
2758
if self.store_id in ('branch', 'remote_branch'):
2759
# branch stores requires write locked branches
2760
self.addCleanup(store.branch.lock_write().unlock)
2761
section = store.get_mutable_section('baz')
2762
section.set('foo', 'bar')
2764
modified_store = self.get_store(self)
2765
sections = list(modified_store.get_sections())
2766
self.assertLength(1, sections)
2767
self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2769
def test_load_hook(self):
2770
# First, we need to ensure that the store exists
2771
store = self.get_store(self)
2772
# FIXME: There should be a better way than relying on the test
2773
# parametrization to identify branch.conf -- vila 2011-0526
2774
if self.store_id in ('branch', 'remote_branch'):
2775
# branch stores requires write locked branches
2776
self.addCleanup(store.branch.lock_write().unlock)
2777
section = store.get_mutable_section('baz')
2778
section.set('foo', 'bar')
2780
# Now we can try to load it
2781
store = self.get_store(self)
2786
config.ConfigHooks.install_named_hook('load', hook, None)
2787
self.assertLength(0, calls)
2789
self.assertLength(1, calls)
2790
self.assertEqual((store,), calls[0])
2792
def test_save_hook(self):
2797
config.ConfigHooks.install_named_hook('save', hook, None)
2798
self.assertLength(0, calls)
2799
store = self.get_store(self)
2800
# FIXME: There should be a better way than relying on the test
2801
# parametrization to identify branch.conf -- vila 2011-0526
2802
if self.store_id in ('branch', 'remote_branch'):
2803
# branch stores requires write locked branches
2804
self.addCleanup(store.branch.lock_write().unlock)
2805
section = store.get_mutable_section('baz')
2806
section.set('foo', 'bar')
2808
self.assertLength(1, calls)
2809
self.assertEqual((store,), calls[0])
2811
def test_set_mark_dirty(self):
    # Start from an empty in-memory stack: nothing is dirty yet.
    memory_stack = config.MemoryStack(b'')
    self.assertLength(0, memory_stack.store.dirty_sections)
    memory_stack.set('foo', 'baz')
    # Setting an option records one dirty section and the store now
    # reports that it needs saving.
    self.assertLength(1, memory_stack.store.dirty_sections)
    self.assertTrue(memory_stack.store._need_saving())
2818
def test_remove_mark_dirty(self):
2819
stack = config.MemoryStack(b'foo=bar')
2820
self.assertLength(0, stack.store.dirty_sections)
2822
self.assertLength(1, stack.store.dirty_sections)
2823
self.assertTrue(stack.store._need_saving())
2826
class TestStoreSaveChanges(tests.TestCaseWithTransport):
2827
"""Tests that config changes are kept in memory and saved on-demand."""
2830
super(TestStoreSaveChanges, self).setUp()
2831
self.transport = self.get_transport()
2832
# Most of the tests involve two stores pointing to the same persistent
2833
# storage to observe the effects of concurrent changes
2834
self.st1 = config.TransportIniFileStore(self.transport, 'foo.conf')
2835
self.st2 = config.TransportIniFileStore(self.transport, 'foo.conf')
2839
self.warnings.append(args[0] % args[1:])
2840
self.overrideAttr(trace, 'warning', warning)
2842
def has_store(self, store):
    """Return whether ``store`` is present on ``self.transport``.

    The lookup uses the store's external url expressed relative to the
    transport's external url.
    """
    base_url = self.transport.external_url()
    store_path = urlutils.relative_url(base_url, store.external_url())
    return self.transport.has(store_path)
2847
def get_stack(self, store):
    """Build a ``config.Stack`` that reads from and is bound to ``store``."""
    # Any stack will do as long as it uses the right store, just a single
    # no-name section is enough
    section_getters = [store.get_sections]
    return config.Stack(section_getters, store)
2852
def test_no_changes_no_save(self):
    # Saving changes on an untouched stack must not create the store.
    stack = self.get_stack(self.st1)
    stack.store.save_changes()
    store_exists = self.has_store(self.st1)
    self.assertEqual(False, store_exists)
2857
def test_unrelated_concurrent_update(self):
2858
s1 = self.get_stack(self.st1)
2859
s2 = self.get_stack(self.st2)
2860
s1.set('foo', 'bar')
2861
s2.set('baz', 'quux')
2863
# Changes don't propagate magically
2864
self.assertEqual(None, s1.get('baz'))
2865
s2.store.save_changes()
2866
self.assertEqual('quux', s2.get('baz'))
2867
# Changes are acquired when saving
2868
self.assertEqual('bar', s2.get('foo'))
2869
# Since there is no overlap, no warnings are emitted
2870
self.assertLength(0, self.warnings)
2872
def test_concurrent_update_modified(self):
2873
s1 = self.get_stack(self.st1)
2874
s2 = self.get_stack(self.st2)
2875
s1.set('foo', 'bar')
2876
s2.set('foo', 'baz')
2879
s2.store.save_changes()
2880
self.assertEqual('baz', s2.get('foo'))
2881
# But the user get a warning
2882
self.assertLength(1, self.warnings)
2883
warning = self.warnings[0]
2884
self.assertStartsWith(warning, 'Option foo in section None')
2885
self.assertEndsWith(warning, 'was changed from <CREATED> to bar.'
2886
' The baz value will be saved.')
2888
def test_concurrent_deletion(self):
2889
self.st1._load_from_string(b'foo=bar')
2891
s1 = self.get_stack(self.st1)
2892
s2 = self.get_stack(self.st2)
2895
s1.store.save_changes()
2897
self.assertLength(0, self.warnings)
2898
s2.store.save_changes()
2900
self.assertLength(1, self.warnings)
2901
warning = self.warnings[0]
2902
self.assertStartsWith(warning, 'Option foo in section None')
2903
self.assertEndsWith(warning, 'was changed from bar to <CREATED>.'
2904
' The <DELETED> value will be saved.')
2907
class TestQuotingIniFileStore(tests.TestCaseWithTransport):
2909
def get_store(self):
    """Return a ``TransportIniFileStore`` for 'foo.conf' on the transport."""
    transport = self.get_transport()
    return config.TransportIniFileStore(transport, 'foo.conf')
2912
def test_get_quoted_string(self):
    ini_store = self.get_store()
    ini_store._load_from_string(b'foo= " abc "')
    stack = config.Stack([ini_store.get_sections])
    # The value comes back without its quotes but with the inner spaces.
    self.assertEqual(' abc ', stack.get('foo'))
2918
def test_set_quoted_string(self):
2919
store = self.get_store()
2920
stack = config.Stack([store.get_sections], store)
2921
stack.set('foo', ' a b c ')
2923
self.assertFileEqual(b'foo = " a b c "' +
2924
os.linesep.encode('ascii'), 'foo.conf')
2927
class TestTransportIniFileStore(TestStore):
2929
def test_loading_unknown_file_fails(self):
2930
store = config.TransportIniFileStore(self.get_transport(),
2932
self.assertRaises(errors.NoSuchFile, store.load)
2934
def test_invalid_content(self):
    transport = self.get_transport()
    ini_store = config.TransportIniFileStore(transport, 'foo.conf')
    self.assertEqual(False, ini_store.is_loaded())
    bad_content = b'this is invalid !'
    parse_error = self.assertRaises(
        config.ParseConfigError, ini_store._load_from_string, bad_content)
    # The error names the file backing the store.
    self.assertEndsWith(parse_error.filename, 'foo.conf')
    # And the load failed
    self.assertEqual(False, ini_store.is_loaded())
2944
def test_get_embedded_sections(self):
2945
# A more complicated example (which also shows that section names and
2946
# option names share the same name space...)
2947
# FIXME: This should be fixed by forbidding dicts as values ?
2948
# -- vila 2011-04-05
2949
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
2950
store._load_from_string(b'''
2954
foo_in_DEFAULT=foo_DEFAULT
2962
sections = list(store.get_sections())
2963
self.assertLength(4, sections)
2964
# The default section has no name.
2965
# List values are provided as strings and need to be explicitly
2966
# converted by specifying from_unicode=list_from_store at option
2968
self.assertSectionContent((None, {'foo': 'bar', 'l': u'1,2'}),
2970
self.assertSectionContent(
2971
('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
2972
self.assertSectionContent(
2973
('bar', {'foo_in_bar': 'barbar'}), sections[2])
2974
# sub sections are provided as embedded dicts.
2975
self.assertSectionContent(
2976
('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
2980
class TestLockableIniFileStore(TestStore):
2982
def test_create_store_in_created_dir(self):
2983
self.assertPathDoesNotExist('dir')
2984
t = self.get_transport('dir/subdir')
2985
store = config.LockableIniFileStore(t, 'foo.conf')
2986
store.get_mutable_section(None).set('foo', 'bar')
2988
self.assertPathExists('dir/subdir')
2991
class TestConcurrentStoreUpdates(TestStore):
2992
"""Test that Stores properly handle concurrent updates.
2994
New Store implementation may fail some of these tests but until such
2995
implementations exist it's hard to properly filter them from the scenarios
2996
applied here. If you encounter such a case, contact the bzr devs.
2999
scenarios = [(key, {'get_stack': builder}) for key, builder
3000
in config.test_stack_builder_registry.iteritems()]
3003
super(TestConcurrentStoreUpdates, self).setUp()
3004
self.stack = self.get_stack(self)
3005
if not isinstance(self.stack, config._CompatibleStack):
3006
raise tests.TestNotApplicable(
3007
'%s is not meant to be compatible with the old config design'
3009
self.stack.set('one', '1')
3010
self.stack.set('two', '2')
3012
self.stack.store.save()
3014
def test_simple_read_access(self):
    # Read back the value expected for option 'one' on self.stack.
    current_value = self.stack.get('one')
    self.assertEqual('1', current_value)
3017
def test_simple_write_access(self):
    # A value written through the stack is visible through the same stack.
    self.stack.set('one', 'one')
    updated_value = self.stack.get('one')
    self.assertEqual('one', updated_value)
3021
def test_listen_to_the_last_speaker(self):
3023
c2 = self.get_stack(self)
3024
c1.set('one', 'ONE')
3025
c2.set('two', 'TWO')
3026
self.assertEqual('ONE', c1.get('one'))
3027
self.assertEqual('TWO', c2.get('two'))
3028
# The second update respect the first one
3029
self.assertEqual('ONE', c2.get('one'))
3031
def test_last_speaker_wins(self):
3032
# If the same config is not shared, the same variable modified twice
3033
# can only see a single result.
3035
c2 = self.get_stack(self)
3038
self.assertEqual('c2', c2.get('one'))
3039
# The first modification is still available until another refresh
3041
self.assertEqual('c1', c1.get('one'))
3042
c1.set('two', 'done')
3043
self.assertEqual('c2', c1.get('one'))
3045
def test_writes_are_serialized(self):
3047
c2 = self.get_stack(self)
3049
# We spawn a thread that will pause *during* the config saving.
3050
before_writing = threading.Event()
3051
after_writing = threading.Event()
3052
writing_done = threading.Event()
3053
c1_save_without_locking_orig = c1.store.save_without_locking
3055
def c1_save_without_locking():
3056
before_writing.set()
3057
c1_save_without_locking_orig()
3058
# The lock is held. We wait for the main thread to decide when to
3060
after_writing.wait()
3061
c1.store.save_without_locking = c1_save_without_locking
3066
t1 = threading.Thread(target=c1_set)
3067
# Collect the thread after the test
3068
self.addCleanup(t1.join)
3069
# Be ready to unblock the thread if the test goes wrong
3070
self.addCleanup(after_writing.set)
3072
before_writing.wait()
3073
self.assertRaises(errors.LockContention,
3074
c2.set, 'one', 'c2')
3075
self.assertEqual('c1', c1.get('one'))
3076
# Let the lock be released
3080
self.assertEqual('c2', c2.get('one'))
3082
def test_read_while_writing(self):
3084
# We spawn a thread that will pause *during* the write
3085
ready_to_write = threading.Event()
3086
do_writing = threading.Event()
3087
writing_done = threading.Event()
3088
# We override the _save implementation so we know the store is locked
3089
c1_save_without_locking_orig = c1.store.save_without_locking
3091
def c1_save_without_locking():
3092
ready_to_write.set()
3093
# The lock is held. We wait for the main thread to decide when to
3096
c1_save_without_locking_orig()
3098
c1.store.save_without_locking = c1_save_without_locking
3102
t1 = threading.Thread(target=c1_set)
3103
# Collect the thread after the test
3104
self.addCleanup(t1.join)
3105
# Be ready to unblock the thread if the test goes wrong
3106
self.addCleanup(do_writing.set)
3108
# Ensure the thread is ready to write
3109
ready_to_write.wait()
3110
self.assertEqual('c1', c1.get('one'))
3111
# If we read during the write, we get the old value
3112
c2 = self.get_stack(self)
3113
self.assertEqual('1', c2.get('one'))
3114
# Let the writing occur and ensure it occurred
3117
# Now we get the updated value
3118
c3 = self.get_stack(self)
3119
self.assertEqual('c1', c3.get('one'))
3121
# FIXME: It may be worth looking into removing the lock dir when it's not
3122
# needed anymore and look at possible fallouts for concurrent lockers. This
3123
# will matter if/when we use config files outside of breezy directories
3124
# (.config/breezy or .bzr) -- vila 2011-04-11
3127
class TestSectionMatcher(TestStore):
3129
scenarios = [('location', {'matcher': config.LocationMatcher}),
3130
('id', {'matcher': config.NameMatcher}), ]
3133
super(TestSectionMatcher, self).setUp()
3134
# Any simple store is good enough
3135
self.get_store = config.test_store_builder_registry.get('configobj')
3137
def test_no_matches_for_empty_stores(self):
    empty_store = self.get_store(self)
    empty_store._load_from_string(b'')
    matcher = self.matcher(empty_store, '/bar')
    # A store loaded from empty content yields no section at all.
    matched = list(matcher.get_sections())
    self.assertEqual([], matched)
3143
def test_build_doesnt_load_store(self):
    unloaded_store = self.get_store(self)
    # Only build the matcher; its result is deliberately unused.
    self.matcher(unloaded_store, '/bar')
    self.assertFalse(unloaded_store.is_loaded())
3149
class TestLocationSection(tests.TestCase):
3151
def get_section(self, options, extra_path):
    """Wrap ``options`` in a ``LocationSection`` named 'foo'.

    A ``config.Section`` with id 'foo' is built from ``options`` and handed,
    together with ``extra_path``, to ``config.LocationSection``.
    """
    base_section = config.Section('foo', options)
    return config.LocationSection(base_section, extra_path)
3155
def test_simple_option(self):
    # No policy and an empty extra path: the value is returned unchanged.
    location_section = self.get_section({'foo': 'bar'}, '')
    self.assertEqual('bar', location_section.get('foo'))
3159
def test_option_with_extra_path(self):
3160
section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
3162
self.assertEqual('bar/baz', section.get('foo'))
3164
def test_invalid_policy(self):
3165
section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
3167
# invalid policies are ignored
3168
self.assertEqual('bar', section.get('foo'))
3171
class TestLocationMatcher(TestStore):
3174
super(TestLocationMatcher, self).setUp()
3175
# Any simple store is good enough
3176
self.get_store = config.test_store_builder_registry.get('configobj')
3178
def test_unrelated_section_excluded(self):
3179
store = self.get_store(self)
3180
store._load_from_string(b'''
3188
section=/foo/bar/baz
3192
self.assertEqual(['/foo', '/foo/baz', '/foo/bar', '/foo/bar/baz',
3194
[section.id for _, section in store.get_sections()])
3195
matcher = config.LocationMatcher(store, '/foo/bar/quux')
3196
sections = [section for _, section in matcher.get_sections()]
3197
self.assertEqual(['/foo/bar', '/foo'],
3198
[section.id for section in sections])
3199
self.assertEqual(['quux', 'bar/quux'],
3200
[section.extra_path for section in sections])
3202
def test_more_specific_sections_first(self):
3203
store = self.get_store(self)
3204
store._load_from_string(b'''
3210
self.assertEqual(['/foo', '/foo/bar'],
3211
[section.id for _, section in store.get_sections()])
3212
matcher = config.LocationMatcher(store, '/foo/bar/baz')
3213
sections = [section for _, section in matcher.get_sections()]
3214
self.assertEqual(['/foo/bar', '/foo'],
3215
[section.id for section in sections])
3216
self.assertEqual(['baz', 'bar/baz'],
3217
[section.extra_path for section in sections])
3219
def test_appendpath_in_no_name_section(self):
3220
# It's a bit weird to allow appendpath in a no-name section, but
3221
# someone may found a use for it
3222
store = self.get_store(self)
3223
store._load_from_string(b'''
3225
foo:policy = appendpath
3227
matcher = config.LocationMatcher(store, 'dir/subdir')
3228
sections = list(matcher.get_sections())
3229
self.assertLength(1, sections)
3230
self.assertEqual('bar/dir/subdir', sections[0][1].get('foo'))
3232
def test_file_urls_are_normalized(self):
    # A file:// url given to LocationMatcher is expected to be exposed as
    # a local path through ``matcher.location``.
    store = self.get_store(self)
    if sys.platform == 'win32':
        expected_url = 'file:///C:/dir/subdir'
        expected_location = 'C:/dir/subdir'
    else:
        # Without this branch the values below would always replace the
        # win32 ones.
        expected_url = 'file:///dir/subdir'
        expected_location = '/dir/subdir'
    matcher = config.LocationMatcher(store, expected_url)
    self.assertEqual(expected_location, matcher.location)
3243
def test_branch_name_colo(self):
3244
store = self.get_store(self)
3245
store._load_from_string(dedent("""\
3247
push_location=my{branchname}
3248
""").encode('ascii'))
3249
matcher = config.LocationMatcher(store, 'file:///,branch=example%3c')
3250
self.assertEqual('example<', matcher.branch_name)
3251
((_, section),) = matcher.get_sections()
3252
self.assertEqual('example<', section.locals['branchname'])
3254
def test_branch_name_basename(self):
3255
store = self.get_store(self)
3256
store._load_from_string(dedent("""\
3258
push_location=my{branchname}
3259
""").encode('ascii'))
3260
matcher = config.LocationMatcher(store, 'file:///parent/example%3c')
3261
self.assertEqual('example<', matcher.branch_name)
3262
((_, section),) = matcher.get_sections()
3263
self.assertEqual('example<', section.locals['branchname'])
3266
class TestStartingPathMatcher(TestStore):
3269
super(TestStartingPathMatcher, self).setUp()
3270
# Any simple store is good enough
3271
self.store = config.IniFileStore()
3273
def assertSectionIDs(self, expected, location, content):
    """Check which sections of ``content`` match ``location``.

    ``content`` is loaded into ``self.store`` and a
    ``config.StartingPathMatcher`` is built for ``location``. The ids of the
    sections it yields must equal ``expected``, in the same order.

    :param expected: The expected section ids, in matching order.
    :param location: The location handed to the matcher.
    :param content: The bytes loaded into the store.
    :return: The list of pairs produced by the matcher (the section is the
        second item of each pair), so callers can make further assertions.
    """
    self.store._load_from_string(content)
    matcher = config.StartingPathMatcher(self.store, location)
    sections = list(matcher.get_sections())
    self.assertLength(len(expected), sections)
    self.assertEqual(expected, [section.id for _, section in sections])
    # Several tests iterate over the result of this helper, so it has to
    # hand the matched sections back instead of implicitly returning None.
    return sections
3281
def test_empty(self):
    # Empty content: no section id is expected for the test url.
    location = self.get_url()
    self.assertSectionIDs([], location, b'')
3284
def test_url_vs_local_paths(self):
3285
# The matcher location is an url and the section names are local paths
3286
self.assertSectionIDs(['/foo/bar', '/foo'],
3287
'file:///foo/bar/baz', b'''\
3292
def test_local_path_vs_url(self):
3293
# The matcher location is a local path and the section names are urls
3294
self.assertSectionIDs(['file:///foo/bar', 'file:///foo'],
3295
'/foo/bar/baz', b'''\
3300
def test_no_name_section_included_when_present(self):
3301
# Note that other tests will cover the case where the no-name section
3302
# is empty and as such, not included.
3303
sections = self.assertSectionIDs(['/foo/bar', '/foo', None],
3304
'/foo/bar/baz', b'''\
3305
option = defined so the no-name section exists
3309
self.assertEqual(['baz', 'bar/baz', '/foo/bar/baz'],
3310
[s.locals['relpath'] for _, s in sections])
3312
def test_order_reversed(self):
3313
self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
3318
def test_unrelated_section_excluded(self):
3319
self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
3325
def test_glob_included(self):
3326
sections = self.assertSectionIDs(['/foo/*/baz', '/foo/b*', '/foo'],
3327
'/foo/bar/baz', b'''\
3333
# Note that 'baz' as a relpath for /foo/b* is not fully correct, but
3334
# nothing really is... as far using {relpath} to append it to something
3335
# else, this seems good enough though.
3336
self.assertEqual(['', 'baz', 'bar/baz'],
3337
[s.locals['relpath'] for _, s in sections])
3339
def test_respect_order(self):
3340
self.assertSectionIDs(['/foo', '/foo/b*', '/foo/*/baz'],
3341
'/foo/bar/baz', b'''\
3349
class TestNameMatcher(TestStore):
3352
super(TestNameMatcher, self).setUp()
3353
self.matcher = config.NameMatcher
3354
# Any simple store is good enough
3355
self.get_store = config.test_store_builder_registry.get('configobj')
3357
def get_matching_sections(self, name):
3358
store = self.get_store(self)
3359
store._load_from_string(b'''
3367
matcher = self.matcher(store, name)
3368
return list(matcher.get_sections())
3370
def test_matching(self):
    matched = self.get_matching_sections('foo')
    # Exactly one section is expected, carrying the 'foo' option value.
    self.assertLength(1, matched)
    self.assertSectionContent(('foo', {'option': 'foo'}), matched[0])
3375
def test_not_matching(self):
    # No section is expected for the name 'baz'.
    matched = self.get_matching_sections('baz')
    self.assertLength(0, matched)
3380
class TestBaseStackGet(tests.TestCase):
    """Exercise ``config.Stack.get`` with a replaced option registry."""

    def setUp(self):
        super(TestBaseStackGet, self).setUp()
        # Every test starts with config.option_registry replaced by a newly
        # created OptionRegistry.
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())

    def test_get_first_definition(self):
        # Both stores define 'foo'; the value of the first one is expected.
        first_store = config.IniFileStore()
        first_store._load_from_string(b'foo=bar')
        second_store = config.IniFileStore()
        second_store._load_from_string(b'foo=baz')
        stack = config.Stack(
            [first_store.get_sections, second_store.get_sections])
        self.assertEqual('bar', stack.get('foo'))

    def test_get_with_registered_default_value(self):
        option = config.Option('foo', default='bar')
        config.option_registry.register(option)
        empty_stack = config.Stack([])
        # No section defines 'foo': the registered default is expected.
        self.assertEqual('bar', empty_stack.get('foo'))

    def test_get_without_registered_default_value(self):
        option = config.Option('foo')
        config.option_registry.register(option)
        empty_stack = config.Stack([])
        self.assertEqual(None, empty_stack.get('foo'))

    def test_get_without_default_value_for_not_registered(self):
        # 'foo' is neither registered nor defined anywhere.
        empty_stack = config.Stack([])
        self.assertEqual(None, empty_stack.get('foo'))

    def test_get_for_empty_section_callable(self):
        # The only section callable returns an empty list.
        stack = config.Stack([lambda: []])
        self.assertEqual(None, stack.get('foo'))

    def test_get_for_broken_callable(self):
        # Trying to use an invalid callable raises an exception on first use
        broken_stack = config.Stack([object])
        self.assertRaises(TypeError, broken_stack.get, 'foo')
3418
class TestStackWithSimpleStore(tests.TestCase):
    """Check environment overrides of options read from a MemoryStack."""

    def setUp(self):
        super(TestStackWithSimpleStore, self).setUp()
        # Replace config.option_registry with a newly created registry and
        # keep a shortcut to it for the tests.
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
        self.registry = config.option_registry

    def get_conf(self, content=None):
        """Return a ``config.MemoryStack`` built from ``content``."""
        return config.MemoryStack(content)

    def test_override_value_from_env(self):
        self.overrideEnv('FOO', None)
        option = config.Option('foo', default='bar',
                               override_from_env=['FOO'])
        self.registry.register(option)
        self.overrideEnv('FOO', 'quux')
        # Env variable provides a default taking over the option one
        stack = self.get_conf(b'foo=store')
        self.assertEqual('quux', stack.get('foo'))

    def test_first_override_value_from_env_wins(self):
        env_names = ['NO_VALUE', 'FOO', 'BAZ']
        # All the candidate variables are first overridden with None.
        for env_name in env_names:
            self.overrideEnv(env_name, None)
        option = config.Option('foo', default='bar',
                               override_from_env=env_names)
        self.registry.register(option)
        self.overrideEnv('FOO', 'foo')
        self.overrideEnv('BAZ', 'baz')
        # The first env var set wins
        stack = self.get_conf(b'foo=store')
        self.assertEqual('foo', stack.get('foo'))
3451
class TestMemoryStack(tests.TestCase):
3454
conf = config.MemoryStack(b'foo=bar')
3455
self.assertEqual('bar', conf.get('foo'))
3458
conf = config.MemoryStack(b'foo=bar')
3459
conf.set('foo', 'baz')
3460
self.assertEqual('baz', conf.get('foo'))
3462
def test_no_content(self):
    memory_stack = config.MemoryStack()
    # No content means no loading
    self.assertFalse(memory_stack.store.is_loaded())
    # Reading an option at that point is expected to fail.
    self.assertRaises(NotImplementedError, memory_stack.get, 'foo')
    # But a content can still be provided
    memory_stack.store._load_from_string(b'foo=bar')
    self.assertEqual('bar', memory_stack.get('foo'))
3472
class TestStackIterSections(tests.TestCase):
    """Check what ``config.Stack.iter_sections`` yields for various stacks."""

    def test_empty_stack(self):
        # A stack built without any section callable yields nothing.
        stack = config.Stack([])
        self.assertLength(0, list(stack.iter_sections()))

    def test_empty_store(self):
        empty_store = config.IniFileStore()
        empty_store._load_from_string(b'')
        stack = config.Stack([empty_store.get_sections])
        self.assertLength(0, list(stack.iter_sections()))

    def test_simple_store(self):
        only_store = config.IniFileStore()
        only_store._load_from_string(b'foo=bar')
        stack = config.Stack([only_store.get_sections])
        pairs = list(stack.iter_sections())
        self.assertLength(1, pairs)
        # Each item is a two-item tuple; only the store is checked here.
        (yielded_store, yielded_section) = pairs[0]
        self.assertIs(only_store, yielded_store)

    def test_two_stores(self):
        first_store = config.IniFileStore()
        first_store._load_from_string(b'foo=bar')
        second_store = config.IniFileStore()
        second_store._load_from_string(b'bar=qux')
        stack = config.Stack(
            [first_store.get_sections, second_store.get_sections])
        pairs = list(stack.iter_sections())
        self.assertLength(2, pairs)
        # The stores come back in the order their callables were given.
        self.assertIs(first_store, pairs[0][0])
        self.assertIs(second_store, pairs[1][0])
3507
class TestStackWithTransport(tests.TestCaseWithTransport):
    """Base class parametrized with every registered test stack builder."""

    # One scenario per entry of config.test_stack_builder_registry; the
    # builder is exposed to the tests as ``get_stack``.
    scenarios = [
        (scenario_name, {'get_stack': stack_builder})
        for scenario_name, stack_builder
        in config.test_stack_builder_registry.iteritems()]
3513
class TestConcreteStacks(TestStackWithTransport):
    """Smoke-test the stack builders provided by the scenarios."""

    def test_build_stack(self):
        # Just a smoke test to help debug builders: the builder is only
        # called and its result is not inspected.
        self.get_stack(self)
3520
class TestStackGet(TestStackWithTransport):
3523
super(TestStackGet, self).setUp()
3524
self.conf = self.get_stack(self)
3526
def test_get_for_empty_stack(self):
    # Nothing defines 'foo' on the stack built in setUp.
    found_value = self.conf.get('foo')
    self.assertEqual(None, found_value)
3529
def test_get_hook(self):
3530
self.conf.set('foo', 'bar')
3535
config.ConfigHooks.install_named_hook('get', hook, None)
3536
self.assertLength(0, calls)
3537
value = self.conf.get('foo')
3538
self.assertEqual('bar', value)
3539
self.assertLength(1, calls)
3540
self.assertEqual((self.conf, 'foo', 'bar'), calls[0])
3543
class TestStackGetWithConverter(tests.TestCase):
3546
super(TestStackGetWithConverter, self).setUp()
3547
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3548
self.registry = config.option_registry
3550
def get_conf(self, content=None):
3551
return config.MemoryStack(content)
3553
def register_bool_option(self, name, default=None, default_from_env=None):
3554
b = config.Option(name, help='A boolean.',
3555
default=default, default_from_env=default_from_env,
3556
from_unicode=config.bool_from_store)
3557
self.registry.register(b)
3559
def test_get_default_bool_None(self):
3560
self.register_bool_option('foo')
3561
conf = self.get_conf(b'')
3562
self.assertEqual(None, conf.get('foo'))
3564
def test_get_default_bool_True(self):
3565
self.register_bool_option('foo', u'True')
3566
conf = self.get_conf(b'')
3567
self.assertEqual(True, conf.get('foo'))
3569
def test_get_default_bool_False(self):
3570
self.register_bool_option('foo', False)
3571
conf = self.get_conf(b'')
3572
self.assertEqual(False, conf.get('foo'))
3574
def test_get_default_bool_False_as_string(self):
3575
self.register_bool_option('foo', u'False')
3576
conf = self.get_conf(b'')
3577
self.assertEqual(False, conf.get('foo'))
3579
def test_get_default_bool_from_env_converted(self):
3580
self.register_bool_option('foo', u'True', default_from_env=['FOO'])
3581
self.overrideEnv('FOO', 'False')
3582
conf = self.get_conf(b'')
3583
self.assertEqual(False, conf.get('foo'))
3585
def test_get_default_bool_when_conversion_fails(self):
3586
self.register_bool_option('foo', default='True')
3587
conf = self.get_conf(b'foo=invalid boolean')
3588
self.assertEqual(True, conf.get('foo'))
3590
def register_integer_option(self, name,
3591
default=None, default_from_env=None):
3592
i = config.Option(name, help='An integer.',
3593
default=default, default_from_env=default_from_env,
3594
from_unicode=config.int_from_store)
3595
self.registry.register(i)
3597
def test_get_default_integer_None(self):
3598
self.register_integer_option('foo')
3599
conf = self.get_conf(b'')
3600
self.assertEqual(None, conf.get('foo'))
3602
def test_get_default_integer(self):
3603
self.register_integer_option('foo', 42)
3604
conf = self.get_conf(b'')
3605
self.assertEqual(42, conf.get('foo'))
3607
def test_get_default_integer_as_string(self):
3608
self.register_integer_option('foo', u'42')
3609
conf = self.get_conf(b'')
3610
self.assertEqual(42, conf.get('foo'))
3612
def test_get_default_integer_from_env(self):
3613
self.register_integer_option('foo', default_from_env=['FOO'])
3614
self.overrideEnv('FOO', '18')
3615
conf = self.get_conf(b'')
3616
self.assertEqual(18, conf.get('foo'))
3618
def test_get_default_integer_when_conversion_fails(self):
3619
self.register_integer_option('foo', default='12')
3620
conf = self.get_conf(b'foo=invalid integer')
3621
self.assertEqual(12, conf.get('foo'))
3623
def register_list_option(self, name, default=None, default_from_env=None):
3624
l = config.ListOption(name, help='A list.', default=default,
3625
default_from_env=default_from_env)
3626
self.registry.register(l)
3628
def test_get_default_list_None(self):
    """Without any default, getting the list option gives None."""
    self.register_list_option('foo')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(None, value)
3633
def test_get_default_list_empty(self):
    """An empty-string default is expected back as an empty list."""
    self.register_list_option('foo', default='')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual([], value)
3638
def test_get_default_list_from_env(self):
    """An empty FOO environment value is expected back as an empty list."""
    env_names = ['FOO']
    self.register_list_option('foo', default_from_env=env_names)
    self.overrideEnv('FOO', '')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual([], value)
3644
def test_get_with_list_converter_no_item(self):
    """A stored value made of a lone comma is expected to give []."""
    self.register_list_option('foo', default=None)
    stack = self.get_conf(b'foo=,')
    value = stack.get('foo')
    self.assertEqual([], value)
3649
def test_get_with_list_converter_many_items(self):
    """Comma separated items are expected back as individual strings."""
    self.register_list_option('foo', default=None)
    stack = self.get_conf(b'foo=m,o,r,e')
    expected = ['m', 'o', 'r', 'e']
    self.assertEqual(expected, stack.get('foo'))
3654
def test_get_with_list_converter_embedded_spaces_many_items(self):
    """Spaces inside double quotes are expected to survive in the items."""
    self.register_list_option('foo', default=None)
    stack = self.get_conf(b'foo=" bar", "baz "')
    expected = [' bar', 'baz ']
    self.assertEqual(expected, stack.get('foo'))
3659
def test_get_with_list_converter_stripped_spaces_many_items(self):
    """Spaces around unquoted items are expected to be dropped."""
    self.register_list_option('foo', default=None)
    stack = self.get_conf(b'foo= bar , baz ')
    expected = ['bar', 'baz']
    self.assertEqual(expected, stack.get('foo'))
3665
class TestIterOptionRefs(tests.TestCase):
3666
"""iter_option_refs is a bit unusual, document some cases."""
3668
def assertRefs(self, expected, string):
    """Check what ``config.iter_option_refs`` yields for ``string``.

    :param expected: The list the collected output is compared with.
    :param string: The text handed to ``config.iter_option_refs``.
    """
    chunks = list(config.iter_option_refs(string))
    self.assertEqual(expected, chunks)
3671
def test_empty(self):
    """The empty string is expected to give the single item (False, '')."""
    expected = [(False, '')]
    self.assertRefs(expected, '')
3674
def test_no_refs(self):
    """Text without braces is expected back whole, flagged False."""
    text = 'foo bar'
    self.assertRefs([(False, text)], text)
3677
def test_single_ref(self):
    """'{foo}' is expected flagged True, between two empty False items."""
    expected = [
        (False, ''),
        (True, '{foo}'),
        (False, ''),
    ]
    self.assertRefs(expected, '{foo}')
3680
def test_broken_ref(self):
    """An unclosed '{foo' is expected back whole, flagged False."""
    text = '{foo'
    self.assertRefs([(False, text)], text)
3683
def test_embedded_ref(self):
3684
self.assertRefs([(False, '{'), (True, '{foo}'), (False, '}')],
3687
def test_two_refs(self):
3688
self.assertRefs([(False, ''), (True, '{foo}'),
3689
(False, ''), (True, '{bar}'),
3693
def test_newline_in_refs_are_not_matched(self):
    """Braces enclosing a newline are expected to stay flagged False."""
    text = '{\nxx}{xx\n}{{\n}}'
    self.assertRefs([(False, text)], text)
3697
class TestStackExpandOptions(tests.TestCaseWithTransport):
3700
super(TestStackExpandOptions, self).setUp()
3701
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3702
self.registry = config.option_registry
3703
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
3704
self.conf = config.Stack([store.get_sections], store)
3706
def assertExpansion(self, expected, string, env=None):
    """Check the result of ``self.conf.expand_options``.

    :param expected: The value the expansion is compared with.
    :param string: The text to expand.
    :param env: Forwarded as the second argument of ``expand_options``.
    """
    expanded = self.conf.expand_options(string, env)
    self.assertEqual(expected, expanded)
3709
def test_no_expansion(self):
    """Text without any reference is expected back unchanged."""
    text = 'foo'
    self.assertExpansion(text, text)
3712
def test_expand_default_value(self):
    """A u'{bar}' default is expected to expand to the stored 'baz'."""
    self.conf.store._load_from_string(b'bar=baz')
    option = config.Option('foo', default=u'{bar}')
    self.registry.register(option)
    value = self.conf.get('foo', expand=True)
    self.assertEqual('baz', value)
3717
def test_expand_default_from_env(self):
    """A '{bar}' value read from FOO is expected to expand to 'baz'."""
    self.conf.store._load_from_string(b'bar=baz')
    option = config.Option('foo', default_from_env=['FOO'])
    self.registry.register(option)
    self.overrideEnv('FOO', '{bar}')
    value = self.conf.get('foo', expand=True)
    self.assertEqual('baz', value)
3723
def test_expand_default_on_failed_conversion(self):
    """The expanded default is expected when the stored value is bogus.

    'foo' is stored as '{baz}' with baz=bogus, while the registered default
    is u'{bar}' with bar=42; the test expects the integer 42.
    """
    self.conf.store._load_from_string(b'baz=bogus\nbar=42\nfoo={baz}')
    option = config.Option(
        'foo',
        from_unicode=config.int_from_store,
        default=u'{bar}')
    self.registry.register(option)
    value = self.conf.get('foo', expand=True)
    self.assertEqual(42, value)
3730
def test_env_adding_options(self):
    """A value supplied only through ``env`` is expected to be used."""
    extra = {'foo': 'bar'}
    self.assertExpansion('bar', '{foo}', extra)
3733
def test_env_overriding_options(self):
    """The ``env`` value is expected to win over the stored foo=baz."""
    self.conf.store._load_from_string(b'foo=baz')
    extra = {'foo': 'bar'}
    self.assertExpansion('bar', '{foo}', extra)
3737
def test_simple_ref(self):
    """'{foo}' is expected to expand to the stored value 'xxx'."""
    store = self.conf.store
    store._load_from_string(b'foo=xxx')
    self.assertExpansion('xxx', '{foo}')
3741
def test_unknown_ref(self):
    """Expanding '{foo}' with nothing stored is expected to raise."""
    expand = self.conf.expand_options
    self.assertRaises(config.ExpandingUnknownOption, expand, '{foo}')
3745
def test_illegal_def_is_ignored(self):
    """Each of these brace forms is expected back exactly as written."""
    for text in ('{1,2}', '{ }', '${Foo,f}'):
        self.assertExpansion(text, text)
3750
def test_indirect_ref(self):
3751
self.conf.store._load_from_string(b'''
3755
self.assertExpansion('xxx', '{bar}')
3757
def test_embedded_ref(self):
3758
self.conf.store._load_from_string(b'''
3762
self.assertExpansion('xxx', '{{bar}}')
3764
def test_simple_loop(self):
    """An option defined as a reference to itself is expected to raise."""
    self.conf.store._load_from_string(b'foo={foo}')
    expand = self.conf.expand_options
    self.assertRaises(config.OptionExpansionLoop, expand, '{foo}')
3769
def test_indirect_loop(self):
3770
self.conf.store._load_from_string(b'''
3774
e = self.assertRaises(config.OptionExpansionLoop,
3775
self.conf.expand_options, '{foo}')
3776
self.assertEqual('foo->bar->baz', e.refs)
3777
self.assertEqual('{foo}', e.string)
3779
def test_list(self):
3780
self.conf.store._load_from_string(b'''
3784
list={foo},{bar},{baz}
3786
self.registry.register(
3787
config.ListOption('list'))
3788
self.assertEqual(['start', 'middle', 'end'],
3789
self.conf.get('list', expand=True))
3791
def test_cascading_list(self):
3792
self.conf.store._load_from_string(b'''
3798
self.registry.register(config.ListOption('list'))
3799
# Register an intermediate option as a list to ensure no conversion
3800
# happen while expanding. Conversion should only occur for the original
3801
# option ('list' here).
3802
self.registry.register(config.ListOption('baz'))
3803
self.assertEqual(['start', 'middle', 'end'],
3804
self.conf.get('list', expand=True))
3806
def test_pathologically_hidden_list(self):
3807
self.conf.store._load_from_string(b'''
3813
hidden={start}{middle}{end}
3815
# What matters is what the registration says, the conversion happens
3816
# only after all expansions have been performed
3817
self.registry.register(config.ListOption('hidden'))
3818
self.assertEqual(['bin', 'go'],
3819
self.conf.get('hidden', expand=True))
3822
class TestStackCrossSectionsExpand(tests.TestCaseWithTransport):
3825
super(TestStackCrossSectionsExpand, self).setUp()
3827
def get_config(self, location, string):
3830
# Since we don't save the config we won't strictly require to inherit
3831
# from TestCaseInTempDir, but an error occurs so quickly...
3832
c = config.LocationStack(location)
3833
c.store._load_from_string(string)
3836
def test_dont_cross_unrelated_section(self):
3837
c = self.get_config('/another/branch/path', b'''
3842
[/another/branch/path]
3845
self.assertRaises(config.ExpandingUnknownOption,
3846
c.get, 'bar', expand=True)
3848
def test_cross_related_sections(self):
3849
c = self.get_config('/project/branch/path', b'''
3853
[/project/branch/path]
3856
self.assertEqual('quux', c.get('bar', expand=True))
3859
class TestStackCrossStoresExpand(tests.TestCaseWithTransport):
3861
def test_cross_global_locations(self):
3862
l_store = config.LocationStore()
3863
l_store._load_from_string(b'''
3869
g_store = config.GlobalStore()
3870
g_store._load_from_string(b'''
3876
stack = config.LocationStack('/branch')
3877
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3878
self.assertEqual('loc-foo', stack.get('gfoo', expand=True))
3881
class TestStackExpandSectionLocals(tests.TestCaseWithTransport):
3883
def test_expand_locals_empty(self):
3884
l_store = config.LocationStore()
3885
l_store._load_from_string(b'''
3886
[/home/user/project]
3891
stack = config.LocationStack('/home/user/project/')
3892
self.assertEqual('', stack.get('base', expand=True))
3893
self.assertEqual('', stack.get('rel', expand=True))
3895
def test_expand_basename_locally(self):
3896
l_store = config.LocationStore()
3897
l_store._load_from_string(b'''
3898
[/home/user/project]
3902
stack = config.LocationStack('/home/user/project/branch')
3903
self.assertEqual('branch', stack.get('bfoo', expand=True))
3905
def test_expand_basename_locally_longer_path(self):
3906
l_store = config.LocationStore()
3907
l_store._load_from_string(b'''
3912
stack = config.LocationStack('/home/user/project/dir/branch')
3913
self.assertEqual('branch', stack.get('bfoo', expand=True))
3915
def test_expand_relpath_locally(self):
3916
l_store = config.LocationStore()
3917
l_store._load_from_string(b'''
3918
[/home/user/project]
3919
lfoo = loc-foo/{relpath}
3922
stack = config.LocationStack('/home/user/project/branch')
3923
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3925
def test_expand_relpath_unknonw_in_global(self):
3926
g_store = config.GlobalStore()
3927
g_store._load_from_string(b'''
3932
stack = config.LocationStack('/home/user/project/branch')
3933
self.assertRaises(config.ExpandingUnknownOption,
3934
stack.get, 'gfoo', expand=True)
3936
def test_expand_local_option_locally(self):
3937
l_store = config.LocationStore()
3938
l_store._load_from_string(b'''
3939
[/home/user/project]
3940
lfoo = loc-foo/{relpath}
3944
g_store = config.GlobalStore()
3945
g_store._load_from_string(b'''
3951
stack = config.LocationStack('/home/user/project/branch')
3952
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3953
self.assertEqual('loc-foo/branch', stack.get('gfoo', expand=True))
3955
def test_locals_dont_leak(self):
3956
"""Make sure we chose the right local in presence of several sections.
3958
l_store = config.LocationStore()
3959
l_store._load_from_string(b'''
3961
lfoo = loc-foo/{relpath}
3962
[/home/user/project]
3963
lfoo = loc-foo/{relpath}
3966
stack = config.LocationStack('/home/user/project/branch')
3967
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3968
stack = config.LocationStack('/home/user/bar/baz')
3969
self.assertEqual('loc-foo/bar/baz', stack.get('lfoo', expand=True))
3972
class TestStackSet(TestStackWithTransport):
3974
def test_simple_set(self):
    """A value stored with set() is expected to be returned by get()."""
    stack = self.get_stack(self)
    # Nothing is known about 'foo' before the set.
    self.assertEqual(None, stack.get('foo'))
    stack.set('foo', 'baz')
    # Did we get it back ?
    stored = stack.get('foo')
    self.assertEqual('baz', stored)
3981
def test_set_creates_a_new_section(self):
3982
conf = self.get_stack(self)
3983
conf.set('foo', 'baz')
3984
self.assertEqual, 'baz', conf.get('foo')
3986
def test_set_hook(self):
3991
config.ConfigHooks.install_named_hook('set', hook, None)
3992
self.assertLength(0, calls)
3993
conf = self.get_stack(self)
3994
conf.set('foo', 'bar')
3995
self.assertLength(1, calls)
3996
self.assertEqual((conf, 'foo', 'bar'), calls[0])
3999
class TestStackRemove(TestStackWithTransport):
4001
def test_remove_existing(self):
4002
conf = self.get_stack(self)
4003
conf.set('foo', 'bar')
4004
self.assertEqual('bar', conf.get('foo'))
4006
# Did we get it back ?
4007
self.assertEqual(None, conf.get('foo'))
4009
def test_remove_unknown(self):
    """Removing an option that was never set is expected to raise KeyError."""
    stack = self.get_stack(self)
    missing_name = 'I_do_not_exist'
    self.assertRaises(KeyError, stack.remove, missing_name)
4013
def test_remove_hook(self):
4018
config.ConfigHooks.install_named_hook('remove', hook, None)
4019
self.assertLength(0, calls)
4020
conf = self.get_stack(self)
4021
conf.set('foo', 'bar')
4023
self.assertLength(1, calls)
4024
self.assertEqual((conf, 'foo'), calls[0])
4027
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
4030
super(TestConfigGetOptions, self).setUp()
4031
create_configs(self)
4033
def test_no_variable(self):
    """With nothing set, no option is expected for the branch config."""
    # Using branch should query branch, locations and breezy
    expected_options = []
    self.assertOptions(expected_options, self.branch_config)
4037
def test_option_in_breezy(self):
4038
self.breezy_config.set_user_option('file', 'breezy')
4039
self.assertOptions([('file', 'breezy', 'DEFAULT', 'breezy')],
4042
def test_option_in_locations(self):
4043
self.locations_config.set_user_option('file', 'locations')
4045
[('file', 'locations', self.tree.basedir, 'locations')],
4046
self.locations_config)
4048
def test_option_in_branch(self):
4049
self.branch_config.set_user_option('file', 'branch')
4050
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch')],
4053
def test_option_in_breezy_and_branch(self):
4054
self.breezy_config.set_user_option('file', 'breezy')
4055
self.branch_config.set_user_option('file', 'branch')
4056
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch'),
4057
('file', 'breezy', 'DEFAULT', 'breezy'), ],
4060
def test_option_in_branch_and_locations(self):
4061
# Hmm, locations override branch :-/
4062
self.locations_config.set_user_option('file', 'locations')
4063
self.branch_config.set_user_option('file', 'branch')
4065
[('file', 'locations', self.tree.basedir, 'locations'),
4066
('file', 'branch', 'DEFAULT', 'branch'), ],
4069
def test_option_in_breezy_locations_and_branch(self):
4070
self.breezy_config.set_user_option('file', 'breezy')
4071
self.locations_config.set_user_option('file', 'locations')
4072
self.branch_config.set_user_option('file', 'branch')
4074
[('file', 'locations', self.tree.basedir, 'locations'),
4075
('file', 'branch', 'DEFAULT', 'branch'),
4076
('file', 'breezy', 'DEFAULT', 'breezy'), ],
4080
class TestConfigRemoveOption(tests.TestCaseWithTransport, TestOptionsMixin):
4083
super(TestConfigRemoveOption, self).setUp()
4084
create_configs_with_file_option(self)
4086
def test_remove_in_locations(self):
4087
self.locations_config.remove_user_option('file', self.tree.basedir)
4089
[('file', 'branch', 'DEFAULT', 'branch'),
4090
('file', 'breezy', 'DEFAULT', 'breezy'), ],
4093
def test_remove_in_branch(self):
4094
self.branch_config.remove_user_option('file')
4096
[('file', 'locations', self.tree.basedir, 'locations'),
4097
('file', 'breezy', 'DEFAULT', 'breezy'), ],
4100
def test_remove_in_breezy(self):
4101
self.breezy_config.remove_user_option('file')
4103
[('file', 'locations', self.tree.basedir, 'locations'),
4104
('file', 'branch', 'DEFAULT', 'branch'), ],
4108
class TestConfigGetSections(tests.TestCaseWithTransport):
4111
super(TestConfigGetSections, self).setUp()
4112
create_configs(self)
4114
def assertSectionNames(self, expected, conf, name=None):
4115
"""Check which sections are returned for a given config.
4117
If fallback configurations exist their sections can be included.
4119
:param expected: A list of section names.
4121
:param conf: The configuration that will be queried.
4123
:param name: An optional section name that will be passed to
4126
sections = list(conf._get_sections(name))
4127
self.assertLength(len(expected), sections)
4128
self.assertEqual(expected, [n for n, _, _ in sections])
4130
def test_breezy_default_section(self):
    """Only 'DEFAULT' is expected for the breezy config."""
    expected_names = ['DEFAULT']
    self.assertSectionNames(expected_names, self.breezy_config)
4133
def test_locations_default_section(self):
    """No section name is expected for the locations config."""
    # No sections are defined in an empty file
    expected_names = []
    self.assertSectionNames(expected_names, self.locations_config)
4137
def test_locations_named_section(self):
    """After setting an option, the tree basedir section is expected."""
    locations = self.locations_config
    locations.set_user_option('file', 'locations')
    self.assertSectionNames([self.tree.basedir], locations)
4141
def test_locations_matching_sections(self):
4142
loc_config = self.locations_config
4143
loc_config.set_user_option('file', 'locations')
4144
# We need to cheat a bit here to create an option in sections above and
4145
# below the 'location' one.
4146
parser = loc_config._get_parser()
4147
# locations.conf deals with '/' ignoring native os.sep
4148
location_names = self.tree.basedir.split('/')
4149
parent = '/'.join(location_names[:-1])
4150
child = '/'.join(location_names + ['child'])
4152
parser[parent]['file'] = 'parent'
4154
parser[child]['file'] = 'child'
4155
self.assertSectionNames([self.tree.basedir, parent], loc_config)
4157
def test_branch_data_default_section(self):
    """A single None section name is expected for the branch data config."""
    branch_data = self.branch_config._get_branch_data_config()
    self.assertSectionNames([None], branch_data)
4161
def test_branch_default_sections(self):
4162
# No sections are defined in an empty locations file
4163
self.assertSectionNames([None, 'DEFAULT'],
4165
# Unless we define an option
4166
self.branch_config._get_location_config().set_user_option(
4167
'file', 'locations')
4168
self.assertSectionNames([self.tree.basedir, None, 'DEFAULT'],
4171
def test_breezy_named_section(self):
    """Asking for 'ALIASES' is expected to give only that section name."""
    breezy = self.breezy_config
    # We need to cheat as the API doesn't give direct access to sections
    # other than DEFAULT.
    breezy.set_alias('breezy', 'bzr')
    self.assertSectionNames(['ALIASES'], breezy, name='ALIASES')
4178
class TestSharedStores(tests.TestCaseInTempDir):
    """Check that separately built global stacks use one store object."""

    def test_breezy_conf_shared(self):
        first = config.GlobalStack()
        second = config.GlobalStack()
        # The two stacks share the same store: identity is what is checked,
        # not mere equality.
        self.assertIs(first.store, second.store)
4187
class TestAuthenticationConfigFilePermissions(tests.TestCaseInTempDir):
4188
"""Test warning for permissions of authentication.conf."""
4191
super(TestAuthenticationConfigFilePermissions, self).setUp()
4192
self.path = osutils.pathjoin(self.test_dir, 'authentication.conf')
4193
with open(self.path, 'wb') as f:
4194
f.write(b"""[broken]
4197
port=port # Error: Not an int
4199
self.overrideAttr(config, 'authentication_config_filename',
4201
osutils.chmod_if_possible(self.path, 0o755)
4203
def test_check_warning(self):
    """Creating the authentication config is expected to log a warning."""
    auth = config.AuthenticationConfig()
    self.assertEqual(auth._filename, self.path)
    log_text = self.get_log()
    self.assertContainsRe(
        log_text, 'Saved passwords may be accessible by other users.')
4209
def test_check_suppressed_warning(self):
    """No warning is expected once 'insecure_permissions' is suppressed."""
    # Record the suppression in the global config first.
    settings = config.GlobalConfig()
    settings.set_user_option('suppress_warnings', 'insecure_permissions')
    auth = config.AuthenticationConfig()
    self.assertEqual(auth._filename, self.path)
    log_text = self.get_log()
    self.assertNotContainsRe(
        log_text, 'Saved passwords may be accessible by other users.')
1315
4219
class TestAuthenticationConfigFile(tests.TestCase):
1316
4220
"""Test the authentication.conf file matching"""