1312
1648
self.assertIs(None, bzrdir_config.get_default_stack_on())
1651
class TestOldConfigHooks(tests.TestCaseWithTransport):
1654
super(TestOldConfigHooks, self).setUp()
1655
create_configs_with_file_option(self)
1657
def assertGetHook(self, conf, name, value):
1662
config.OldConfigHooks.install_named_hook('get', hook, None)
1664
config.OldConfigHooks.uninstall_named_hook, 'get', None)
1665
self.assertLength(0, calls)
1666
actual_value = conf.get_user_option(name)
1667
self.assertEqual(value, actual_value)
1668
self.assertLength(1, calls)
1669
self.assertEqual((conf, name, value), calls[0])
1671
def test_get_hook_breezy(self):
1672
self.assertGetHook(self.breezy_config, 'file', 'breezy')
1674
def test_get_hook_locations(self):
1675
self.assertGetHook(self.locations_config, 'file', 'locations')
1677
def test_get_hook_branch(self):
1678
# Since locations masks branch, we define a different option
1679
self.branch_config.set_user_option('file2', 'branch')
1680
self.assertGetHook(self.branch_config, 'file2', 'branch')
1682
def assertSetHook(self, conf, name, value):
1687
config.OldConfigHooks.install_named_hook('set', hook, None)
1689
config.OldConfigHooks.uninstall_named_hook, 'set', None)
1690
self.assertLength(0, calls)
1691
conf.set_user_option(name, value)
1692
self.assertLength(1, calls)
1693
# We can't assert the conf object below as different configs use
1694
# different means to implement set_user_option and we care only about
1696
self.assertEqual((name, value), calls[0][1:])
1698
def test_set_hook_breezy(self):
1699
self.assertSetHook(self.breezy_config, 'foo', 'breezy')
1701
def test_set_hook_locations(self):
1702
self.assertSetHook(self.locations_config, 'foo', 'locations')
1704
def test_set_hook_branch(self):
1705
self.assertSetHook(self.branch_config, 'foo', 'branch')
1707
def assertRemoveHook(self, conf, name, section_name=None):
1712
config.OldConfigHooks.install_named_hook('remove', hook, None)
1714
config.OldConfigHooks.uninstall_named_hook, 'remove', None)
1715
self.assertLength(0, calls)
1716
conf.remove_user_option(name, section_name)
1717
self.assertLength(1, calls)
1718
# We can't assert the conf object below as different configs use
1719
# different means to implement remove_user_option and we care only about
1721
self.assertEqual((name,), calls[0][1:])
1723
def test_remove_hook_breezy(self):
1724
self.assertRemoveHook(self.breezy_config, 'file')
1726
def test_remove_hook_locations(self):
1727
self.assertRemoveHook(self.locations_config, 'file',
1728
self.locations_config.location)
1730
def test_remove_hook_branch(self):
1731
self.assertRemoveHook(self.branch_config, 'file')
1733
def assertLoadHook(self, name, conf_class, *conf_args):
1738
config.OldConfigHooks.install_named_hook('load', hook, None)
1740
config.OldConfigHooks.uninstall_named_hook, 'load', None)
1741
self.assertLength(0, calls)
1743
conf = conf_class(*conf_args)
1744
# Access an option to trigger a load
1745
conf.get_user_option(name)
1746
self.assertLength(1, calls)
1747
# Since we can't assert about conf, we just use the number of calls ;-/
1749
def test_load_hook_breezy(self):
1750
self.assertLoadHook('file', config.GlobalConfig)
1752
def test_load_hook_locations(self):
1753
self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)
1755
def test_load_hook_branch(self):
1756
self.assertLoadHook('file', config.BranchConfig, self.tree.branch)
1758
def assertSaveHook(self, conf):
1763
config.OldConfigHooks.install_named_hook('save', hook, None)
1765
config.OldConfigHooks.uninstall_named_hook, 'save', None)
1766
self.assertLength(0, calls)
1767
# Setting an option triggers a save
1768
conf.set_user_option('foo', 'bar')
1769
self.assertLength(1, calls)
1770
# Since we can't assert about conf, we just use the number of calls ;-/
1772
def test_save_hook_breezy(self):
1773
self.assertSaveHook(self.breezy_config)
1775
def test_save_hook_locations(self):
1776
self.assertSaveHook(self.locations_config)
1778
def test_save_hook_branch(self):
1779
self.assertSaveHook(self.branch_config)
1782
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
1783
"""Tests config hooks for remote configs.
1785
No tests for the remove hook as this is not implemented there.
1789
super(TestOldConfigHooksForRemote, self).setUp()
1790
self.transport_server = test_server.SmartTCPServer_for_testing
1791
create_configs_with_file_option(self)
1793
def assertGetHook(self, conf, name, value):
1798
config.OldConfigHooks.install_named_hook('get', hook, None)
1800
config.OldConfigHooks.uninstall_named_hook, 'get', None)
1801
self.assertLength(0, calls)
1802
actual_value = conf.get_option(name)
1803
self.assertEqual(value, actual_value)
1804
self.assertLength(1, calls)
1805
self.assertEqual((conf, name, value), calls[0])
1807
def test_get_hook_remote_branch(self):
1808
remote_branch = branch.Branch.open(self.get_url('tree'))
1809
self.assertGetHook(remote_branch._get_config(), 'file', 'branch')
1811
def test_get_hook_remote_bzrdir(self):
1812
remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
1813
conf = remote_bzrdir._get_config()
1814
conf.set_option('remotedir', 'file')
1815
self.assertGetHook(conf, 'file', 'remotedir')
1817
def assertSetHook(self, conf, name, value):
1822
config.OldConfigHooks.install_named_hook('set', hook, None)
1824
config.OldConfigHooks.uninstall_named_hook, 'set', None)
1825
self.assertLength(0, calls)
1826
conf.set_option(value, name)
1827
self.assertLength(1, calls)
1828
# We can't assert the conf object below as different configs use
1829
# different means to implement set_user_option and we care only about
1831
self.assertEqual((name, value), calls[0][1:])
1833
def test_set_hook_remote_branch(self):
1834
remote_branch = branch.Branch.open(self.get_url('tree'))
1835
self.addCleanup(remote_branch.lock_write().unlock)
1836
self.assertSetHook(remote_branch._get_config(), 'file', 'remote')
1838
def test_set_hook_remote_bzrdir(self):
1839
remote_branch = branch.Branch.open(self.get_url('tree'))
1840
self.addCleanup(remote_branch.lock_write().unlock)
1841
remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
1842
self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')
1844
def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
1849
config.OldConfigHooks.install_named_hook('load', hook, None)
1851
config.OldConfigHooks.uninstall_named_hook, 'load', None)
1852
self.assertLength(0, calls)
1854
conf = conf_class(*conf_args)
1855
# Access an option to trigger a load
1856
conf.get_option(name)
1857
self.assertLength(expected_nb_calls, calls)
1858
# Since we can't assert about conf, we just use the number of calls ;-/
1860
def test_load_hook_remote_branch(self):
1861
remote_branch = branch.Branch.open(self.get_url('tree'))
1862
self.assertLoadHook(
1863
1, 'file', remote.RemoteBranchConfig, remote_branch)
1865
def test_load_hook_remote_bzrdir(self):
1866
remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
1867
# The config file doesn't exist, set an option to force its creation
1868
conf = remote_bzrdir._get_config()
1869
conf.set_option('remotedir', 'file')
1870
# We get one call for the server and one call for the client, this is
1871
# caused by the differences in implementations between
1872
# SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
1873
# SmartServerBranchGetConfigFile (in smart/branch.py)
1874
self.assertLoadHook(
1875
2, 'file', remote.RemoteBzrDirConfig, remote_bzrdir)
1877
def assertSaveHook(self, conf):
1882
config.OldConfigHooks.install_named_hook('save', hook, None)
1884
config.OldConfigHooks.uninstall_named_hook, 'save', None)
1885
self.assertLength(0, calls)
1886
# Setting an option triggers a save
1887
conf.set_option('foo', 'bar')
1888
self.assertLength(1, calls)
1889
# Since we can't assert about conf, we just use the number of calls ;-/
1891
def test_save_hook_remote_branch(self):
1892
remote_branch = branch.Branch.open(self.get_url('tree'))
1893
self.addCleanup(remote_branch.lock_write().unlock)
1894
self.assertSaveHook(remote_branch._get_config())
1896
def test_save_hook_remote_bzrdir(self):
1897
remote_branch = branch.Branch.open(self.get_url('tree'))
1898
self.addCleanup(remote_branch.lock_write().unlock)
1899
remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
1900
self.assertSaveHook(remote_bzrdir._get_config())
1903
class TestOptionNames(tests.TestCase):
    """Check which option names ``config._option_ref_re`` accepts."""

    def is_valid(self, name):
        """Tell whether ``name`` is accepted as an option name.

        :param name: The candidate option name, without enclosing braces.

        :return: True if ``config._option_ref_re`` matches ``{name}`` from
            its first character, False otherwise.
        """
        return config._option_ref_re.match('{%s}' % name) is not None

    def test_valid_names(self):
        self.assertTrue(self.is_valid('foo'))
        self.assertTrue(self.is_valid('foo.bar'))
        self.assertTrue(self.is_valid('f1'))
        self.assertTrue(self.is_valid('_'))
        self.assertTrue(self.is_valid('__bar__'))
        self.assertTrue(self.is_valid('a_'))
        self.assertTrue(self.is_valid('a1'))
        # Don't break bzr-svn for no good reason
        self.assertTrue(self.is_valid('guessed-layout'))

    def test_invalid_names(self):
        self.assertFalse(self.is_valid(' foo'))
        self.assertFalse(self.is_valid('foo '))
        self.assertFalse(self.is_valid('1'))
        self.assertFalse(self.is_valid('1,2'))
        self.assertFalse(self.is_valid('foo$'))
        self.assertFalse(self.is_valid('!foo'))
        self.assertFalse(self.is_valid('foo.'))
        self.assertFalse(self.is_valid('foo..bar'))
        self.assertFalse(self.is_valid('{}'))
        self.assertFalse(self.is_valid('{a}'))
        self.assertFalse(self.is_valid('a\n'))
        self.assertFalse(self.is_valid('-'))
        self.assertFalse(self.is_valid('-a'))
        self.assertFalse(self.is_valid('a-'))
        self.assertFalse(self.is_valid('a--a'))

    def assertSingleGroup(self, reference):
        """Assert that the regexp defines exactly one group for ``reference``.

        :param reference: A string containing an option reference.
        """
        # The regexp is used with split and as such should match the reference
        # *only*; if more groups need to be defined, (?:...) should be used.
        #
        # Use the ``reference`` argument rather than a hard-coded '{a}' so
        # every caller-supplied string is really checked. search() is needed
        # instead of match(): test_invalid_names relies on match() rejecting
        # '{{a}}', where the reference does not start at the first character,
        # whereas split() finds references anywhere in the string.
        m = config._option_ref_re.search(reference)
        self.assertIsNot(None, m)
        self.assertLength(1, m.groups())

    def test_valid_references(self):
        self.assertSingleGroup('{a}')
        self.assertSingleGroup('{{a}}')
1947
class TestOption(tests.TestCase):
1949
def test_default_value(self):
1950
opt = config.Option('foo', default='bar')
1951
self.assertEqual('bar', opt.get_default())
1953
def test_callable_default_value(self):
1954
def bar_as_unicode():
1956
opt = config.Option('foo', default=bar_as_unicode)
1957
self.assertEqual('bar', opt.get_default())
1959
def test_default_value_from_env(self):
1960
opt = config.Option('foo', default='bar', default_from_env=['FOO'])
1961
self.overrideEnv('FOO', 'quux')
1962
# Env variable provides a default taking over the option one
1963
self.assertEqual('quux', opt.get_default())
1965
def test_first_default_value_from_env_wins(self):
1966
opt = config.Option('foo', default='bar',
1967
default_from_env=['NO_VALUE', 'FOO', 'BAZ'])
1968
self.overrideEnv('FOO', 'foo')
1969
self.overrideEnv('BAZ', 'baz')
1970
# The first env var set wins
1971
self.assertEqual('foo', opt.get_default())
1973
def test_not_supported_list_default_value(self):
1974
self.assertRaises(AssertionError, config.Option, 'foo', default=[1])
1976
def test_not_supported_object_default_value(self):
1977
self.assertRaises(AssertionError, config.Option, 'foo',
1980
def test_not_supported_callable_default_value_not_unicode(self):
1981
def bar_not_unicode():
1983
opt = config.Option('foo', default=bar_not_unicode)
1984
self.assertRaises(AssertionError, opt.get_default)
1986
def test_get_help_topic(self):
1987
opt = config.Option('foo')
1988
self.assertEqual('foo', opt.get_help_topic())
1991
class TestOptionConverter(tests.TestCase):
1993
def assertConverted(self, expected, opt, value):
1994
self.assertEqual(expected, opt.convert_from_unicode(None, value))
1996
def assertCallsWarning(self, opt, value):
2000
warnings.append(args[0] % args[1:])
2001
self.overrideAttr(trace, 'warning', warning)
2002
self.assertEqual(None, opt.convert_from_unicode(None, value))
2003
self.assertLength(1, warnings)
2005
'Value "%s" is not valid for "%s"' % (value, opt.name),
2008
def assertCallsError(self, opt, value):
2009
self.assertRaises(config.ConfigOptionValueError,
2010
opt.convert_from_unicode, None, value)
2012
def assertConvertInvalid(self, opt, invalid_value):
2014
self.assertEqual(None, opt.convert_from_unicode(None, invalid_value))
2015
opt.invalid = 'warning'
2016
self.assertCallsWarning(opt, invalid_value)
2017
opt.invalid = 'error'
2018
self.assertCallsError(opt, invalid_value)
2021
class TestOptionWithBooleanConverter(TestOptionConverter):
    """Exercise an option converted with ``config.bool_from_store``."""

    def get_option(self):
        """Build the boolean option used by the tests of this class."""
        return config.Option(
            'foo', from_unicode=config.bool_from_store, help='A boolean.')

    def test_convert_invalid(self):
        boolean_opt = self.get_option()
        rejected_values = (
            # A string that is not recognized as a boolean
            u'invalid-boolean',
            # A list of strings is never recognized as a boolean
            [u'not', u'a', u'boolean'],
        )
        for raw in rejected_values:
            self.assertConvertInvalid(boolean_opt, raw)

    def test_convert_valid(self):
        boolean_opt = self.get_option()
        accepted_values = (
            (True, u'True'),
            (True, u'1'),
            (False, u'False'),
        )
        for expected, raw in accepted_values:
            self.assertConverted(expected, boolean_opt, raw)
2041
class TestOptionWithIntegerConverter(TestOptionConverter):
    """Exercise an option converted with ``config.int_from_store``."""

    def get_option(self):
        """Build the integer option used by the tests of this class."""
        return config.Option(
            'foo', from_unicode=config.int_from_store, help='An integer.')

    def test_convert_invalid(self):
        integer_opt = self.get_option()
        rejected_values = (
            # A string that is not recognized as an integer
            u'forty-two',
            # A list of strings is never recognized as an integer
            [u'a', u'list'],
        )
        for raw in rejected_values:
            self.assertConvertInvalid(integer_opt, raw)

    def test_convert_valid(self):
        integer_opt = self.get_option()
        self.assertConverted(16, integer_opt, u'16')
2059
class TestOptionWithSIUnitConverter(TestOptionConverter):
    """Exercise an option converted with ``config.int_SI_from_store``."""

    def get_option(self):
        """Build the SI-unit integer option used by the tests of this class."""
        return config.Option(
            'foo', from_unicode=config.int_SI_from_store,
            help='An integer in SI units.')

    def test_convert_invalid(self):
        si_opt = self.get_option()
        rejected_values = (
            u'not-a-unit',
            u'Gb',  # Forgot the value
            u'1b',  # Forgot the unit
            u'1GG',
            u'1Mbb',
            u'1MM',
        )
        for raw in rejected_values:
            self.assertConvertInvalid(si_opt, raw)

    def test_convert_valid(self):
        si_opt = self.get_option()
        accepted_values = (
            (5 * 10 ** 3, u'5kb'),
            (5 * 10 ** 6, u'5M'),
            (5 * 10 ** 6, u'5MB'),
            (5 * 10 ** 9, u'5g'),
            (5 * 10 ** 9, u'5gB'),
            (100, u'100'),
        )
        for expected, raw in accepted_values:
            self.assertConverted(expected, si_opt, raw)
2084
class TestListOption(TestOptionConverter):
2086
def get_option(self):
2087
return config.ListOption('foo', help='A list.')
2089
def test_convert_invalid(self):
2090
opt = self.get_option()
2091
# We don't even try to convert a list into a list, we only expect
2093
self.assertConvertInvalid(opt, [1])
2094
# No string is invalid as all forms can be converted to a list
2096
def test_convert_valid(self):
2097
opt = self.get_option()
2098
# An empty string is an empty list
2099
self.assertConverted([], opt, '') # Using a bare str() just in case
2100
self.assertConverted([], opt, u'')
2102
self.assertConverted([u'True'], opt, u'True')
2104
self.assertConverted([u'42'], opt, u'42')
2106
self.assertConverted([u'bar'], opt, u'bar')
2109
class TestRegistryOption(TestOptionConverter):
2111
def get_option(self, registry):
2112
return config.RegistryOption('foo', registry,
2113
help='A registry option.')
2115
def test_convert_invalid(self):
2116
registry = _mod_registry.Registry()
2117
opt = self.get_option(registry)
2118
self.assertConvertInvalid(opt, [1])
2119
self.assertConvertInvalid(opt, u"notregistered")
2121
def test_convert_valid(self):
2122
registry = _mod_registry.Registry()
2123
registry.register("someval", 1234)
2124
opt = self.get_option(registry)
2125
# Using a bare str() just in case
2126
self.assertConverted(1234, opt, "someval")
2127
self.assertConverted(1234, opt, u'someval')
2128
self.assertConverted(None, opt, None)
2130
def test_help(self):
2131
registry = _mod_registry.Registry()
2132
registry.register("someval", 1234, help="some option")
2133
registry.register("dunno", 1234, help="some other option")
2134
opt = self.get_option(registry)
2136
'A registry option.\n'
2138
'The following values are supported:\n'
2139
' dunno - some other option\n'
2140
' someval - some option\n',
2143
def test_get_help_text(self):
2144
registry = _mod_registry.Registry()
2145
registry.register("someval", 1234, help="some option")
2146
registry.register("dunno", 1234, help="some other option")
2147
opt = self.get_option(registry)
2149
'A registry option.\n'
2151
'The following values are supported:\n'
2152
' dunno - some other option\n'
2153
' someval - some option\n',
2154
opt.get_help_text())
2157
class TestOptionRegistry(tests.TestCase):
2160
super(TestOptionRegistry, self).setUp()
2161
# Always start with an empty registry
2162
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2163
self.registry = config.option_registry
2165
def test_register(self):
2166
opt = config.Option('foo')
2167
self.registry.register(opt)
2168
self.assertIs(opt, self.registry.get('foo'))
2170
def test_registered_help(self):
2171
opt = config.Option('foo', help='A simple option')
2172
self.registry.register(opt)
2173
self.assertEqual('A simple option', self.registry.get_help('foo'))
2175
def test_dont_register_illegal_name(self):
2176
self.assertRaises(config.IllegalOptionName,
2177
self.registry.register, config.Option(' foo'))
2178
self.assertRaises(config.IllegalOptionName,
2179
self.registry.register, config.Option('bar,'))
2181
lazy_option = config.Option('lazy_foo', help='Lazy help')
2183
def test_register_lazy(self):
2184
self.registry.register_lazy('lazy_foo', self.__module__,
2185
'TestOptionRegistry.lazy_option')
2186
self.assertIs(self.lazy_option, self.registry.get('lazy_foo'))
2188
def test_registered_lazy_help(self):
2189
self.registry.register_lazy('lazy_foo', self.__module__,
2190
'TestOptionRegistry.lazy_option')
2191
self.assertEqual('Lazy help', self.registry.get_help('lazy_foo'))
2193
def test_dont_lazy_register_illegal_name(self):
2194
# This is where the root cause of http://pad.lv/1235099 is better
2195
# understood: 'register_lazy' doc string mentions that key should match
2196
# the option name which indirectly requires that the option name is a
2197
# valid python identifier. We violate that rule here (using a key that
2198
# doesn't match the option name) to test the option name checking.
2199
self.assertRaises(config.IllegalOptionName,
2200
self.registry.register_lazy, ' foo', self.__module__,
2201
'TestOptionRegistry.lazy_option')
2202
self.assertRaises(config.IllegalOptionName,
2203
self.registry.register_lazy, '1,2', self.__module__,
2204
'TestOptionRegistry.lazy_option')
2207
class TestRegisteredOptions(tests.TestCase):
2208
"""All registered options should verify some constraints."""
2210
scenarios = [(key, {'option_name': key, 'option': option}) for key, option
2211
in config.option_registry.iteritems()]
2214
super(TestRegisteredOptions, self).setUp()
2215
self.registry = config.option_registry
2217
def test_proper_name(self):
2218
# An option should be registered under its own name, this can't be
2219
# checked at registration time for the lazy ones.
2220
self.assertEqual(self.option_name, self.option.name)
2222
def test_help_is_set(self):
2223
option_help = self.registry.get_help(self.option_name)
2224
# Come on, think about the user, he really wants to know what the
2226
self.assertIsNot(None, option_help)
2227
self.assertNotEqual('', option_help)
2230
class TestSection(tests.TestCase):
2232
# FIXME: Parametrize so that all sections produced by Stores run these
2233
# tests -- vila 2011-04-01
2235
def test_get_a_value(self):
2236
a_dict = dict(foo='bar')
2237
section = config.Section('myID', a_dict)
2238
self.assertEqual('bar', section.get('foo'))
2240
def test_get_unknown_option(self):
2242
section = config.Section(None, a_dict)
2243
self.assertEqual('out of thin air',
2244
section.get('foo', 'out of thin air'))
2246
def test_options_is_shared(self):
2248
section = config.Section(None, a_dict)
2249
self.assertIs(a_dict, section.options)
2252
class TestMutableSection(tests.TestCase):
2254
scenarios = [('mutable',
2256
lambda opts: config.MutableSection('myID', opts)},),
2260
a_dict = dict(foo='bar')
2261
section = self.get_section(a_dict)
2262
section.set('foo', 'new_value')
2263
self.assertEqual('new_value', section.get('foo'))
2264
# The change appears in the shared section
2265
self.assertEqual('new_value', a_dict.get('foo'))
2266
# We keep track of the change
2267
self.assertTrue('foo' in section.orig)
2268
self.assertEqual('bar', section.orig.get('foo'))
2270
def test_set_preserve_original_once(self):
2271
a_dict = dict(foo='bar')
2272
section = self.get_section(a_dict)
2273
section.set('foo', 'first_value')
2274
section.set('foo', 'second_value')
2275
# We keep track of the original value
2276
self.assertTrue('foo' in section.orig)
2277
self.assertEqual('bar', section.orig.get('foo'))
2279
def test_remove(self):
2280
a_dict = dict(foo='bar')
2281
section = self.get_section(a_dict)
2282
section.remove('foo')
2283
# We get None for unknown options via the default value
2284
self.assertEqual(None, section.get('foo'))
2285
# Or we just get the default value
2286
self.assertEqual('unknown', section.get('foo', 'unknown'))
2287
self.assertFalse('foo' in section.options)
2288
# We keep track of the deletion
2289
self.assertTrue('foo' in section.orig)
2290
self.assertEqual('bar', section.orig.get('foo'))
2292
def test_remove_new_option(self):
2294
section = self.get_section(a_dict)
2295
section.set('foo', 'bar')
2296
section.remove('foo')
2297
self.assertFalse('foo' in section.options)
2298
# The option didn't exist initially so we need to keep track of it
2299
# with a special value
2300
self.assertTrue('foo' in section.orig)
2301
self.assertEqual(config._NewlyCreatedOption, section.orig['foo'])
2304
class TestCommandLineStore(tests.TestCase):
2307
super(TestCommandLineStore, self).setUp()
2308
self.store = config.CommandLineStore()
2309
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2311
def get_section(self):
2312
"""Get the unique section for the command line overrides."""
2313
sections = list(self.store.get_sections())
2314
self.assertLength(1, sections)
2315
store, section = sections[0]
2316
self.assertEqual(self.store, store)
2319
def test_no_override(self):
2320
self.store._from_cmdline([])
2321
section = self.get_section()
2322
self.assertLength(0, list(section.iter_option_names()))
2324
def test_simple_override(self):
2325
self.store._from_cmdline(['a=b'])
2326
section = self.get_section()
2327
self.assertEqual('b', section.get('a'))
2329
def test_list_override(self):
2330
opt = config.ListOption('l')
2331
config.option_registry.register(opt)
2332
self.store._from_cmdline(['l=1,2,3'])
2333
val = self.get_section().get('l')
2334
self.assertEqual('1,2,3', val)
2335
# Reminder: lists should be registered as such explicitly, otherwise
2336
# the conversion needs to be done afterwards.
2337
self.assertEqual(['1', '2', '3'],
2338
opt.convert_from_unicode(self.store, val))
2340
def test_multiple_overrides(self):
2341
self.store._from_cmdline(['a=b', 'x=y'])
2342
section = self.get_section()
2343
self.assertEqual('b', section.get('a'))
2344
self.assertEqual('y', section.get('x'))
2346
def test_wrong_syntax(self):
2347
self.assertRaises(errors.BzrCommandError,
2348
self.store._from_cmdline, ['a=b', 'c'])
2351
class TestStoreMinimalAPI(tests.TestCaseWithTransport):
2353
scenarios = [(key, {'get_store': builder}) for key, builder
2354
in config.test_store_builder_registry.iteritems()] + [
2355
('cmdline', {'get_store': lambda test: config.CommandLineStore()})]
2358
store = self.get_store(self)
2359
if isinstance(store, config.TransportIniFileStore):
2360
raise tests.TestNotApplicable(
2361
"%s is not a concrete Store implementation"
2362
" so it doesn't need an id" % (store.__class__.__name__,))
2363
self.assertIsNot(None, store.id)
2366
class TestStore(tests.TestCaseWithTransport):
2368
def assertSectionContent(self, expected, store_and_section):
2369
"""Assert that some options have the proper values in a section."""
2370
_, section = store_and_section
2371
expected_name, expected_options = expected
2372
self.assertEqual(expected_name, section.id)
2375
dict([(k, section.get(k)) for k in expected_options.keys()]))
2378
class TestReadonlyStore(TestStore):
    """Read-only checks run against every registered test store builder."""

    scenarios = [
        (key, {'get_store': builder})
        for key, builder in config.test_store_builder_registry.iteritems()]

    def _sections_after_loading(self, content):
        """Load ``content`` into a fresh store and return its sections.

        :param content: The bytes handed to ``_load_from_string``.

        :return: The list built from ``store.get_sections()``.
        """
        store = self.get_store(self)
        store._load_from_string(content)
        return list(store.get_sections())

    def test_building_delays_load(self):
        store = self.get_store(self)
        # A freshly built store reports itself as not loaded...
        self.assertEqual(False, store.is_loaded())
        store._load_from_string(b'')
        # ... and as loaded once some content has been fed to it
        self.assertEqual(True, store.is_loaded())

    def test_get_no_sections_for_empty(self):
        self.assertEqual([], self._sections_after_loading(b''))

    def test_get_default_section(self):
        found = self._sections_after_loading(b'foo=bar')
        self.assertLength(1, found)
        expected = (None, {'foo': 'bar'})
        self.assertSectionContent(expected, found[0])

    def test_get_named_section(self):
        found = self._sections_after_loading(b'[baz]\nfoo=bar')
        self.assertLength(1, found)
        expected = ('baz', {'foo': 'bar'})
        self.assertSectionContent(expected, found[0])

    def test_load_from_string_fails_for_non_empty_store(self):
        store = self.get_store(self)
        store._load_from_string(b'foo=bar')
        # A second load into the same store is expected to be refused
        self.assertRaises(AssertionError, store._load_from_string, b'bar=baz')
2414
class TestStoreQuoting(TestStore):
2416
scenarios = [(key, {'get_store': builder}) for key, builder
2417
in config.test_store_builder_registry.iteritems()]
2420
super(TestStoreQuoting, self).setUp()
2421
self.store = self.get_store(self)
2422
# We need a loaded store but any content will do
2423
self.store._load_from_string(b'')
2425
def assertIdempotent(self, s):
2426
"""Assert that quoting an unquoted string is a no-op and vice-versa.
2428
What matters here is that option values, as they appear in a store, can
2429
be safely round-tripped out of the store and back.
2431
:param s: A string, quoted if required.
2433
self.assertEqual(s, self.store.quote(self.store.unquote(s)))
2434
self.assertEqual(s, self.store.unquote(self.store.quote(s)))
2436
def test_empty_string(self):
2437
if isinstance(self.store, config.IniFileStore):
2438
# configobj._quote doesn't handle empty values
2439
self.assertRaises(AssertionError,
2440
self.assertIdempotent, '')
2442
self.assertIdempotent('')
2443
# But quoted empty strings are ok
2444
self.assertIdempotent('""')
2446
def test_embedded_spaces(self):
2447
self.assertIdempotent('" a b c "')
2449
def test_embedded_commas(self):
2450
self.assertIdempotent('" a , b c "')
2452
def test_simple_comma(self):
2453
if isinstance(self.store, config.IniFileStore):
2454
# configobj requires that lists are special-cased
2455
self.assertRaises(AssertionError,
2456
self.assertIdempotent, ',')
2458
self.assertIdempotent(',')
2459
# When a single comma is required, quoting is also required
2460
self.assertIdempotent('","')
2462
def test_list(self):
2463
if isinstance(self.store, config.IniFileStore):
2464
# configobj requires that lists are special-cased
2465
self.assertRaises(AssertionError,
2466
self.assertIdempotent, 'a,b')
2468
self.assertIdempotent('a,b')
2471
class TestDictFromStore(tests.TestCase):
    """Check that unquoting a non-string (dict) value does not crash."""

    def test_unquote_not_string(self):
        expected = {'a': '1'}
        stack = config.MemoryStack(b'x=2\n[a_section]\na=1\n')
        section_value = stack.get('a_section')
        # Urgh, although the stack is asked about the no-name section, the
        # name of another section yields that section's content as a dict o_O
        self.assertEqual(expected, section_value)
        unquoted = stack.store.unquote(section_value)
        # Such a value cannot be unquoted but it shouldn't crash either: the
        # use cases are getting the value or displaying it and, in the latter
        # case, '%s' formatting is what gets applied.
        self.assertEqual(expected, unquoted)
        rendered = '%s' % (unquoted,)
        self.assertIn(rendered, ("{u'a': u'1'}", "{'a': '1'}"))
2487
class TestIniFileStoreContent(tests.TestCaseWithTransport):
2488
"""Simulate loading a config store with content of various encodings.
2490
All files produced by bzr are in utf8 content.
2492
Users may modify them manually and end up with a file that can't be
2493
loaded. We need to issue proper error messages in this case.
2496
invalid_utf8_char = b'\xff'
2498
def test_load_utf8(self):
2499
"""Ensure we can load an utf8-encoded file."""
2500
t = self.get_transport()
2501
# From http://pad.lv/799212
2502
unicode_user = u'b\N{Euro Sign}ar'
2503
unicode_content = u'user=%s' % (unicode_user,)
2504
utf8_content = unicode_content.encode('utf8')
2505
# Store the raw content in the config file
2506
t.put_bytes('foo.conf', utf8_content)
2507
store = config.TransportIniFileStore(t, 'foo.conf')
2509
stack = config.Stack([store.get_sections], store)
2510
self.assertEqual(unicode_user, stack.get('user'))
2512
def test_load_non_ascii(self):
2513
"""Ensure we display a proper error on non-ascii, non utf-8 content."""
2514
t = self.get_transport()
2515
t.put_bytes('foo.conf', b'user=foo\n#%s\n' % (self.invalid_utf8_char,))
2516
store = config.TransportIniFileStore(t, 'foo.conf')
2517
self.assertRaises(config.ConfigContentError, store.load)
2519
def test_load_erroneous_content(self):
2520
"""Ensure we display a proper error on content that can't be parsed."""
2521
t = self.get_transport()
2522
t.put_bytes('foo.conf', b'[open_section\n')
2523
store = config.TransportIniFileStore(t, 'foo.conf')
2524
self.assertRaises(config.ParseConfigError, store.load)
2526
def test_load_permission_denied(self):
2527
"""Ensure we get warned when trying to load an inaccessible file."""
2531
warnings.append(args[0] % args[1:])
2532
self.overrideAttr(trace, 'warning', warning)
2534
t = self.get_transport()
2536
def get_bytes(relpath):
2537
raise errors.PermissionDenied(relpath, "")
2538
t.get_bytes = get_bytes
2539
store = config.TransportIniFileStore(t, 'foo.conf')
2540
self.assertRaises(errors.PermissionDenied, store.load)
2543
[u'Permission denied while trying to load configuration store %s.'
2544
% store.external_url()])
2547
class TestIniConfigContent(tests.TestCaseWithTransport):
    """Simulate loading an IniBasedConfig with content of various encodings.

    All files produced by bzr are in utf8 content.

    Users may modify them manually and end up with a file that can't be
    loaded. We need to issue proper error messages in this case.
    """

    invalid_utf8_char = b'\xff'

    def _config_with_content(self, raw_content):
        """Write ``raw_content`` to 'foo.conf' and build a config on it.

        :param raw_content: The bytes stored verbatim in the config file.

        :return: A ``config.IniBasedConfig`` using 'foo.conf' as file name.
        """
        with open('foo.conf', 'wb') as conf_file:
            conf_file.write(raw_content)
        return config.IniBasedConfig(file_name='foo.conf')

    def test_load_utf8(self):
        """Ensure we can load an utf8-encoded file."""
        # From http://pad.lv/799212
        unicode_user = u'b\N{Euro Sign}ar'
        as_text = u'user=%s' % (unicode_user,)
        # Store the raw content in the config file
        conf = self._config_with_content(as_text.encode('utf8'))
        self.assertEqual(unicode_user, conf.get_user_option('user'))

    def test_load_badly_encoded_content(self):
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
        broken = b'user=foo\n#%s\n' % (self.invalid_utf8_char,)
        conf = self._config_with_content(broken)
        self.assertRaises(config.ConfigContentError, conf._get_parser)

    def test_load_erroneous_content(self):
        """Ensure we display a proper error on content that can't be parsed."""
        conf = self._config_with_content(b'[open_section\n')
        self.assertRaises(config.ParseConfigError, conf._get_parser)
2585
class TestMutableStore(TestStore):
2587
scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
2588
in config.test_store_builder_registry.iteritems()]
2591
super(TestMutableStore, self).setUp()
2592
self.transport = self.get_transport()
2594
def has_store(self, store):
    """Return what the test transport's has() reports for ``store``.

    The store's external URL is first made relative to the transport's
    external URL.
    """
    base_url = self.transport.external_url()
    relpath = urlutils.relative_url(base_url, store.external_url())
    return self.transport.has(relpath)
2599
def test_save_empty_creates_no_file(self):
2600
# FIXME: There should be a better way than relying on the test
2601
# parametrization to identify branch.conf -- vila 2011-0526
2602
if self.store_id in ('branch', 'remote_branch'):
2603
raise tests.TestNotApplicable(
2604
'branch.conf is *always* created when a branch is initialized')
2605
store = self.get_store(self)
2607
self.assertEqual(False, self.has_store(store))
2609
def test_mutable_section_shared(self):
2610
store = self.get_store(self)
2611
store._load_from_string(b'foo=bar\n')
2612
# FIXME: There should be a better way than relying on the test
2613
# parametrization to identify branch.conf -- vila 2011-0526
2614
if self.store_id in ('branch', 'remote_branch'):
2615
# branch stores requires write locked branches
2616
self.addCleanup(store.branch.lock_write().unlock)
2617
section1 = store.get_mutable_section(None)
2618
section2 = store.get_mutable_section(None)
2619
# If we get different sections, different callers won't share the
2621
self.assertIs(section1, section2)
2623
def test_save_emptied_succeeds(self):
2624
store = self.get_store(self)
2625
store._load_from_string(b'foo=bar\n')
2626
# FIXME: There should be a better way than relying on the test
2627
# parametrization to identify branch.conf -- vila 2011-0526
2628
if self.store_id in ('branch', 'remote_branch'):
2629
# branch stores requires write locked branches
2630
self.addCleanup(store.branch.lock_write().unlock)
2631
section = store.get_mutable_section(None)
2632
section.remove('foo')
2634
self.assertEqual(True, self.has_store(store))
2635
modified_store = self.get_store(self)
2636
sections = list(modified_store.get_sections())
2637
self.assertLength(0, sections)
2639
def test_save_with_content_succeeds(self):
2640
# FIXME: There should be a better way than relying on the test
2641
# parametrization to identify branch.conf -- vila 2011-0526
2642
if self.store_id in ('branch', 'remote_branch'):
2643
raise tests.TestNotApplicable(
2644
'branch.conf is *always* created when a branch is initialized')
2645
store = self.get_store(self)
2646
store._load_from_string(b'foo=bar\n')
2647
self.assertEqual(False, self.has_store(store))
2649
self.assertEqual(True, self.has_store(store))
2650
modified_store = self.get_store(self)
2651
sections = list(modified_store.get_sections())
2652
self.assertLength(1, sections)
2653
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2655
def test_set_option_in_empty_store(self):
2656
store = self.get_store(self)
2657
# FIXME: There should be a better way than relying on the test
2658
# parametrization to identify branch.conf -- vila 2011-0526
2659
if self.store_id in ('branch', 'remote_branch'):
2660
# branch stores requires write locked branches
2661
self.addCleanup(store.branch.lock_write().unlock)
2662
section = store.get_mutable_section(None)
2663
section.set('foo', 'bar')
2665
modified_store = self.get_store(self)
2666
sections = list(modified_store.get_sections())
2667
self.assertLength(1, sections)
2668
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2670
def test_set_option_in_default_section(self):
2671
store = self.get_store(self)
2672
store._load_from_string(b'')
2673
# FIXME: There should be a better way than relying on the test
2674
# parametrization to identify branch.conf -- vila 2011-0526
2675
if self.store_id in ('branch', 'remote_branch'):
2676
# branch stores requires write locked branches
2677
self.addCleanup(store.branch.lock_write().unlock)
2678
section = store.get_mutable_section(None)
2679
section.set('foo', 'bar')
2681
modified_store = self.get_store(self)
2682
sections = list(modified_store.get_sections())
2683
self.assertLength(1, sections)
2684
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2686
def test_set_option_in_named_section(self):
2687
store = self.get_store(self)
2688
store._load_from_string(b'')
2689
# FIXME: There should be a better way than relying on the test
2690
# parametrization to identify branch.conf -- vila 2011-0526
2691
if self.store_id in ('branch', 'remote_branch'):
2692
# branch stores requires write locked branches
2693
self.addCleanup(store.branch.lock_write().unlock)
2694
section = store.get_mutable_section('baz')
2695
section.set('foo', 'bar')
2697
modified_store = self.get_store(self)
2698
sections = list(modified_store.get_sections())
2699
self.assertLength(1, sections)
2700
self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2702
def test_load_hook(self):
2703
# First, we need to ensure that the store exists
2704
store = self.get_store(self)
2705
# FIXME: There should be a better way than relying on the test
2706
# parametrization to identify branch.conf -- vila 2011-0526
2707
if self.store_id in ('branch', 'remote_branch'):
2708
# branch stores requires write locked branches
2709
self.addCleanup(store.branch.lock_write().unlock)
2710
section = store.get_mutable_section('baz')
2711
section.set('foo', 'bar')
2713
# Now we can try to load it
2714
store = self.get_store(self)
2719
config.ConfigHooks.install_named_hook('load', hook, None)
2720
self.assertLength(0, calls)
2722
self.assertLength(1, calls)
2723
self.assertEqual((store,), calls[0])
2725
def test_save_hook(self):
2730
config.ConfigHooks.install_named_hook('save', hook, None)
2731
self.assertLength(0, calls)
2732
store = self.get_store(self)
2733
# FIXME: There should be a better way than relying on the test
2734
# parametrization to identify branch.conf -- vila 2011-0526
2735
if self.store_id in ('branch', 'remote_branch'):
2736
# branch stores requires write locked branches
2737
self.addCleanup(store.branch.lock_write().unlock)
2738
section = store.get_mutable_section('baz')
2739
section.set('foo', 'bar')
2741
self.assertLength(1, calls)
2742
self.assertEqual((store,), calls[0])
2744
def test_set_mark_dirty(self):
    """Setting an option records a dirty section and requests a save."""
    mem = config.MemoryStack(b'')
    # Nothing has been modified yet.
    self.assertLength(0, mem.store.dirty_sections)
    mem.set('foo', 'baz')
    self.assertLength(1, mem.store.dirty_sections)
    self.assertTrue(mem.store._need_saving())
2751
def test_remove_mark_dirty(self):
2752
stack = config.MemoryStack(b'foo=bar')
2753
self.assertLength(0, stack.store.dirty_sections)
2755
self.assertLength(1, stack.store.dirty_sections)
2756
self.assertTrue(stack.store._need_saving())
2759
class TestStoreSaveChanges(tests.TestCaseWithTransport):
2760
"""Tests that config changes are kept in memory and saved on-demand."""
2763
super(TestStoreSaveChanges, self).setUp()
2764
self.transport = self.get_transport()
2765
# Most of the tests involve two stores pointing to the same persistent
2766
# storage to observe the effects of concurrent changes
2767
self.st1 = config.TransportIniFileStore(self.transport, 'foo.conf')
2768
self.st2 = config.TransportIniFileStore(self.transport, 'foo.conf')
2772
self.warnings.append(args[0] % args[1:])
2773
self.overrideAttr(trace, 'warning', warning)
2775
def has_store(self, store):
    """Return what the test transport's has() reports for ``store``.

    The store's external URL is first made relative to the transport's
    external URL.
    """
    base_url = self.transport.external_url()
    relpath = urlutils.relative_url(base_url, store.external_url())
    return self.transport.has(relpath)
2780
def get_stack(self, store):
    """Return a config.Stack built from ``store``'s sections and ``store``."""
    # Any stack will do as long as it uses the right store, just a single
    # no-name section is enough
    section_getters = [store.get_sections]
    return config.Stack(section_getters, store)
2785
def test_no_changes_no_save(self):
    """save_changes() on an unmodified stack does not create the store."""
    untouched = self.get_stack(self.st1)
    untouched.store.save_changes()
    exists = self.has_store(self.st1)
    self.assertEqual(False, exists)
2790
def test_unrelated_concurrent_update(self):
2791
s1 = self.get_stack(self.st1)
2792
s2 = self.get_stack(self.st2)
2793
s1.set('foo', 'bar')
2794
s2.set('baz', 'quux')
2796
# Changes don't propagate magically
2797
self.assertEqual(None, s1.get('baz'))
2798
s2.store.save_changes()
2799
self.assertEqual('quux', s2.get('baz'))
2800
# Changes are acquired when saving
2801
self.assertEqual('bar', s2.get('foo'))
2802
# Since there is no overlap, no warnings are emitted
2803
self.assertLength(0, self.warnings)
2805
def test_concurrent_update_modified(self):
2806
s1 = self.get_stack(self.st1)
2807
s2 = self.get_stack(self.st2)
2808
s1.set('foo', 'bar')
2809
s2.set('foo', 'baz')
2812
s2.store.save_changes()
2813
self.assertEqual('baz', s2.get('foo'))
2814
# But the user gets a warning
2815
self.assertLength(1, self.warnings)
2816
warning = self.warnings[0]
2817
self.assertStartsWith(warning, 'Option foo in section None')
2818
self.assertEndsWith(warning, 'was changed from <CREATED> to bar.'
2819
' The baz value will be saved.')
2821
def test_concurrent_deletion(self):
2822
self.st1._load_from_string(b'foo=bar')
2824
s1 = self.get_stack(self.st1)
2825
s2 = self.get_stack(self.st2)
2828
s1.store.save_changes()
2830
self.assertLength(0, self.warnings)
2831
s2.store.save_changes()
2833
self.assertLength(1, self.warnings)
2834
warning = self.warnings[0]
2835
self.assertStartsWith(warning, 'Option foo in section None')
2836
self.assertEndsWith(warning, 'was changed from bar to <CREATED>.'
2837
' The <DELETED> value will be saved.')
2840
class TestQuotingIniFileStore(tests.TestCaseWithTransport):
2842
def get_store(self):
    """Return a TransportIniFileStore for 'foo.conf' on the test transport."""
    transport = self.get_transport()
    return config.TransportIniFileStore(transport, 'foo.conf')
2845
def test_get_quoted_string(self):
    """A quoted value is returned unquoted, inner whitespace preserved."""
    quoted = self.get_store()
    quoted._load_from_string(b'foo= " abc "')
    reader = config.Stack([quoted.get_sections])
    self.assertEqual(' abc ', reader.get('foo'))
2851
def test_set_quoted_string(self):
2852
store = self.get_store()
2853
stack = config.Stack([store.get_sections], store)
2854
stack.set('foo', ' a b c ')
2856
self.assertFileEqual(b'foo = " a b c "' +
2857
os.linesep.encode('ascii'), 'foo.conf')
2860
class TestTransportIniFileStore(TestStore):
2862
def test_loading_unknown_file_fails(self):
2863
store = config.TransportIniFileStore(self.get_transport(),
2865
self.assertRaises(errors.NoSuchFile, store.load)
2867
def test_invalid_content(self):
    """Loading unparseable content raises and leaves the store unloaded."""
    transport = self.get_transport()
    store = config.TransportIniFileStore(transport, 'foo.conf')
    self.assertEqual(False, store.is_loaded())
    error = self.assertRaises(
        config.ParseConfigError,
        store._load_from_string, b'this is invalid !')
    # The error carries a filename ending with the store's file name.
    self.assertEndsWith(error.filename, 'foo.conf')
    # And the load failed
    self.assertEqual(False, store.is_loaded())
2877
def test_get_embedded_sections(self):
2878
# A more complicated example (which also shows that section names and
2879
# option names share the same name space...)
2880
# FIXME: This should be fixed by forbidding dicts as values ?
2881
# -- vila 2011-04-05
2882
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
2883
store._load_from_string(b'''
2887
foo_in_DEFAULT=foo_DEFAULT
2895
sections = list(store.get_sections())
2896
self.assertLength(4, sections)
2897
# The default section has no name.
2898
# List values are provided as strings and need to be explicitly
2899
# converted by specifying from_unicode=list_from_store at option
2901
self.assertSectionContent((None, {'foo': 'bar', 'l': u'1,2'}),
2903
self.assertSectionContent(
2904
('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
2905
self.assertSectionContent(
2906
('bar', {'foo_in_bar': 'barbar'}), sections[2])
2907
# sub sections are provided as embedded dicts.
2908
self.assertSectionContent(
2909
('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
2913
class TestLockableIniFileStore(TestStore):
2915
def test_create_store_in_created_dir(self):
2916
self.assertPathDoesNotExist('dir')
2917
t = self.get_transport('dir/subdir')
2918
store = config.LockableIniFileStore(t, 'foo.conf')
2919
store.get_mutable_section(None).set('foo', 'bar')
2921
self.assertPathExists('dir/subdir')
2924
class TestConcurrentStoreUpdates(TestStore):
2925
"""Test that Stores properly handle concurrent updates.
2927
New Store implementation may fail some of these tests but until such
2928
implementations exist it's hard to properly filter them from the scenarios
2929
applied here. If you encounter such a case, contact the bzr devs.
2932
scenarios = [(key, {'get_stack': builder}) for key, builder
2933
in config.test_stack_builder_registry.iteritems()]
2936
super(TestConcurrentStoreUpdates, self).setUp()
2937
self.stack = self.get_stack(self)
2938
if not isinstance(self.stack, config._CompatibleStack):
2939
raise tests.TestNotApplicable(
2940
'%s is not meant to be compatible with the old config design'
2942
self.stack.set('one', '1')
2943
self.stack.set('two', '2')
2945
self.stack.store.save()
2947
def test_simple_read_access(self):
    """The 'one' option reads back as '1' from the stack."""
    value = self.stack.get('one')
    self.assertEqual('1', value)
2950
def test_simple_write_access(self):
    """A value written through the stack is visible on the next read."""
    self.stack.set('one', 'one')
    value = self.stack.get('one')
    self.assertEqual('one', value)
2954
def test_listen_to_the_last_speaker(self):
2956
c2 = self.get_stack(self)
2957
c1.set('one', 'ONE')
2958
c2.set('two', 'TWO')
2959
self.assertEqual('ONE', c1.get('one'))
2960
self.assertEqual('TWO', c2.get('two'))
2961
# The second update respects the first one
2962
self.assertEqual('ONE', c2.get('one'))
2964
def test_last_speaker_wins(self):
2965
# If the same config is not shared, the same variable modified twice
2966
# can only see a single result.
2968
c2 = self.get_stack(self)
2971
self.assertEqual('c2', c2.get('one'))
2972
# The first modification is still available until another refresh
2974
self.assertEqual('c1', c1.get('one'))
2975
c1.set('two', 'done')
2976
self.assertEqual('c2', c1.get('one'))
2978
def test_writes_are_serialized(self):
2980
c2 = self.get_stack(self)
2982
# We spawn a thread that will pause *during* the config saving.
2983
before_writing = threading.Event()
2984
after_writing = threading.Event()
2985
writing_done = threading.Event()
2986
c1_save_without_locking_orig = c1.store.save_without_locking
2988
def c1_save_without_locking():
2989
before_writing.set()
2990
c1_save_without_locking_orig()
2991
# The lock is held. We wait for the main thread to decide when to
2993
after_writing.wait()
2994
c1.store.save_without_locking = c1_save_without_locking
2999
t1 = threading.Thread(target=c1_set)
3000
# Collect the thread after the test
3001
self.addCleanup(t1.join)
3002
# Be ready to unblock the thread if the test goes wrong
3003
self.addCleanup(after_writing.set)
3005
before_writing.wait()
3006
self.assertRaises(errors.LockContention,
3007
c2.set, 'one', 'c2')
3008
self.assertEqual('c1', c1.get('one'))
3009
# Let the lock be released
3013
self.assertEqual('c2', c2.get('one'))
3015
def test_read_while_writing(self):
3017
# We spawn a thread that will pause *during* the write
3018
ready_to_write = threading.Event()
3019
do_writing = threading.Event()
3020
writing_done = threading.Event()
3021
# We override the _save implementation so we know the store is locked
3022
c1_save_without_locking_orig = c1.store.save_without_locking
3024
def c1_save_without_locking():
3025
ready_to_write.set()
3026
# The lock is held. We wait for the main thread to decide when to
3029
c1_save_without_locking_orig()
3031
c1.store.save_without_locking = c1_save_without_locking
3035
t1 = threading.Thread(target=c1_set)
3036
# Collect the thread after the test
3037
self.addCleanup(t1.join)
3038
# Be ready to unblock the thread if the test goes wrong
3039
self.addCleanup(do_writing.set)
3041
# Ensure the thread is ready to write
3042
ready_to_write.wait()
3043
self.assertEqual('c1', c1.get('one'))
3044
# If we read during the write, we get the old value
3045
c2 = self.get_stack(self)
3046
self.assertEqual('1', c2.get('one'))
3047
# Let the writing occur and ensure it occurred
3050
# Now we get the updated value
3051
c3 = self.get_stack(self)
3052
self.assertEqual('c1', c3.get('one'))
3054
# FIXME: It may be worth looking into removing the lock dir when it's not
3055
# needed anymore and look at possible fallouts for concurrent lockers. This
3056
# will matter if/when we use config files outside of breezy directories
3057
# (.config/breezy or .bzr) -- vila 20110-04-111
3060
class TestSectionMatcher(TestStore):
3062
scenarios = [('location', {'matcher': config.LocationMatcher}),
3063
('id', {'matcher': config.NameMatcher}), ]
3066
super(TestSectionMatcher, self).setUp()
3067
# Any simple store is good enough
3068
self.get_store = config.test_store_builder_registry.get('configobj')
3070
def test_no_matches_for_empty_stores(self):
    """A matcher over a store loaded from empty content yields nothing."""
    empty = self.get_store(self)
    empty._load_from_string(b'')
    found = list(self.matcher(empty, '/bar').get_sections())
    self.assertEqual([], found)
3076
def test_build_doesnt_load_store(self):
    """Constructing a matcher leaves the store unloaded."""
    lazy = self.get_store(self)
    # Only the construction matters; the matcher itself is not used.
    self.matcher(lazy, '/bar')
    self.assertFalse(lazy.is_loaded())
3082
class TestLocationSection(tests.TestCase):
3084
def get_section(self, options, extra_path):
    """Wrap ``options`` in a 'foo' Section and return a LocationSection.

    :param options: the options handed to config.Section.
    :param extra_path: passed through to config.LocationSection.
    """
    base = config.Section('foo', options)
    return config.LocationSection(base, extra_path)
3088
def test_simple_option(self):
    """With an empty extra path the option value is returned unchanged."""
    located = self.get_section({'foo': 'bar'}, '')
    value = located.get('foo')
    self.assertEqual('bar', value)
3092
def test_option_with_extra_path(self):
3093
section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
3095
self.assertEqual('bar/baz', section.get('foo'))
3097
def test_invalid_policy(self):
3098
section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
3100
# invalid policies are ignored
3101
self.assertEqual('bar', section.get('foo'))
3104
class TestLocationMatcher(TestStore):
3107
super(TestLocationMatcher, self).setUp()
3108
# Any simple store is good enough
3109
self.get_store = config.test_store_builder_registry.get('configobj')
3111
def test_unrelated_section_excluded(self):
3112
store = self.get_store(self)
3113
store._load_from_string(b'''
3121
section=/foo/bar/baz
3125
self.assertEqual(['/foo', '/foo/baz', '/foo/bar', '/foo/bar/baz',
3127
[section.id for _, section in store.get_sections()])
3128
matcher = config.LocationMatcher(store, '/foo/bar/quux')
3129
sections = [section for _, section in matcher.get_sections()]
3130
self.assertEqual(['/foo/bar', '/foo'],
3131
[section.id for section in sections])
3132
self.assertEqual(['quux', 'bar/quux'],
3133
[section.extra_path for section in sections])
3135
def test_more_specific_sections_first(self):
3136
store = self.get_store(self)
3137
store._load_from_string(b'''
3143
self.assertEqual(['/foo', '/foo/bar'],
3144
[section.id for _, section in store.get_sections()])
3145
matcher = config.LocationMatcher(store, '/foo/bar/baz')
3146
sections = [section for _, section in matcher.get_sections()]
3147
self.assertEqual(['/foo/bar', '/foo'],
3148
[section.id for section in sections])
3149
self.assertEqual(['baz', 'bar/baz'],
3150
[section.extra_path for section in sections])
3152
def test_appendpath_in_no_name_section(self):
3153
# It's a bit weird to allow appendpath in a no-name section, but
3154
# someone may find a use for it
3155
store = self.get_store(self)
3156
store._load_from_string(b'''
3158
foo:policy = appendpath
3160
matcher = config.LocationMatcher(store, 'dir/subdir')
3161
sections = list(matcher.get_sections())
3162
self.assertLength(1, sections)
3163
self.assertEqual('bar/dir/subdir', sections[0][1].get('foo'))
3165
def test_file_urls_are_normalized(self):
3166
store = self.get_store(self)
3167
if sys.platform == 'win32':
3168
expected_url = 'file:///C:/dir/subdir'
3169
expected_location = 'C:/dir/subdir'
3171
expected_url = 'file:///dir/subdir'
3172
expected_location = '/dir/subdir'
3173
matcher = config.LocationMatcher(store, expected_url)
3174
self.assertEqual(expected_location, matcher.location)
3176
def test_branch_name_colo(self):
3177
store = self.get_store(self)
3178
store._load_from_string(dedent("""\
3180
push_location=my{branchname}
3181
""").encode('ascii'))
3182
matcher = config.LocationMatcher(store, 'file:///,branch=example%3c')
3183
self.assertEqual('example<', matcher.branch_name)
3184
((_, section),) = matcher.get_sections()
3185
self.assertEqual('example<', section.locals['branchname'])
3187
def test_branch_name_basename(self):
3188
store = self.get_store(self)
3189
store._load_from_string(dedent("""\
3191
push_location=my{branchname}
3192
""").encode('ascii'))
3193
matcher = config.LocationMatcher(store, 'file:///parent/example%3c')
3194
self.assertEqual('example<', matcher.branch_name)
3195
((_, section),) = matcher.get_sections()
3196
self.assertEqual('example<', section.locals['branchname'])
3199
class TestStartingPathMatcher(TestStore):
3202
super(TestStartingPathMatcher, self).setUp()
3203
# Any simple store is good enough
3204
self.store = config.IniFileStore()
3206
def assertSectionIDs(self, expected, location, content):
3207
self.store._load_from_string(content)
3208
matcher = config.StartingPathMatcher(self.store, location)
3209
sections = list(matcher.get_sections())
3210
self.assertLength(len(expected), sections)
3211
self.assertEqual(expected, [section.id for _, section in sections])
3214
def test_empty(self):
    """Empty content gives no section IDs for the test URL."""
    location = self.get_url()
    self.assertSectionIDs([], location, b'')
3217
def test_url_vs_local_paths(self):
3218
# The matcher location is an url and the section names are local paths
3219
self.assertSectionIDs(['/foo/bar', '/foo'],
3220
'file:///foo/bar/baz', b'''\
3225
def test_local_path_vs_url(self):
3226
# The matcher location is a local path and the section names are urls
3227
self.assertSectionIDs(['file:///foo/bar', 'file:///foo'],
3228
'/foo/bar/baz', b'''\
3233
def test_no_name_section_included_when_present(self):
3234
# Note that other tests will cover the case where the no-name section
3235
# is empty and as such, not included.
3236
sections = self.assertSectionIDs(['/foo/bar', '/foo', None],
3237
'/foo/bar/baz', b'''\
3238
option = defined so the no-name section exists
3242
self.assertEqual(['baz', 'bar/baz', '/foo/bar/baz'],
3243
[s.locals['relpath'] for _, s in sections])
3245
def test_order_reversed(self):
3246
self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
3251
def test_unrelated_section_excluded(self):
3252
self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
3258
def test_glob_included(self):
3259
sections = self.assertSectionIDs(['/foo/*/baz', '/foo/b*', '/foo'],
3260
'/foo/bar/baz', b'''\
3266
# Note that 'baz' as a relpath for /foo/b* is not fully correct, but
3267
# nothing really is... as far using {relpath} to append it to something
3268
# else, this seems good enough though.
3269
self.assertEqual(['', 'baz', 'bar/baz'],
3270
[s.locals['relpath'] for _, s in sections])
3272
def test_respect_order(self):
3273
self.assertSectionIDs(['/foo', '/foo/b*', '/foo/*/baz'],
3274
'/foo/bar/baz', b'''\
3282
class TestNameMatcher(TestStore):
3285
super(TestNameMatcher, self).setUp()
3286
self.matcher = config.NameMatcher
3287
# Any simple store is good enough
3288
self.get_store = config.test_store_builder_registry.get('configobj')
3290
def get_matching_sections(self, name):
3291
store = self.get_store(self)
3292
store._load_from_string(b'''
3300
matcher = self.matcher(store, name)
3301
return list(matcher.get_sections())
3303
def test_matching(self):
    """Asking for 'foo' returns exactly one section with the 'foo' option."""
    matched = self.get_matching_sections('foo')
    self.assertLength(1, matched)
    expected = ('foo', {'option': 'foo'})
    self.assertSectionContent(expected, matched[0])
3308
def test_not_matching(self):
    """Asking for 'baz' returns no section."""
    matched = self.get_matching_sections('baz')
    self.assertLength(0, matched)
3313
class TestBaseStackGet(tests.TestCase):
3316
super(TestBaseStackGet, self).setUp()
3317
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3319
def test_get_first_definition(self):
    """When two stores define 'foo', the first one listed provides it."""
    winner = config.IniFileStore()
    winner._load_from_string(b'foo=bar')
    shadowed = config.IniFileStore()
    shadowed._load_from_string(b'foo=baz')
    stack = config.Stack([winner.get_sections, shadowed.get_sections])
    self.assertEqual('bar', stack.get('foo'))
3327
def test_get_with_registered_default_value(self):
    """An empty stack returns the registered option's default."""
    option = config.Option('foo', default='bar')
    config.option_registry.register(option)
    empty_stack = config.Stack([])
    self.assertEqual('bar', empty_stack.get('foo'))
3332
def test_get_without_registered_default_value(self):
    """A registered option without a default reads as None."""
    option = config.Option('foo')
    config.option_registry.register(option)
    empty_stack = config.Stack([])
    self.assertEqual(None, empty_stack.get('foo'))
3337
def test_get_without_default_value_for_not_registered(self):
    """An option that was never registered reads as None."""
    empty_stack = config.Stack([])
    value = empty_stack.get('foo')
    self.assertEqual(None, value)
3341
def test_get_for_empty_section_callable(self):
    """A section callable that returns no sections makes get() give None."""
    def no_sections():
        return []

    stack = config.Stack([no_sections])
    self.assertEqual(None, stack.get('foo'))
3345
def test_get_for_broken_callable(self):
    """An invalid section callable makes get() raise TypeError."""
    # Trying to use an invalid callable raises an exception on first use
    broken_stack = config.Stack([object])
    self.assertRaises(TypeError, broken_stack.get, 'foo')
3351
class TestStackWithSimpleStore(tests.TestCase):
3354
super(TestStackWithSimpleStore, self).setUp()
3355
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3356
self.registry = config.option_registry
3358
def get_conf(self, content=None):
    """Return a config.MemoryStack created with ``content``."""
    stack = config.MemoryStack(content)
    return stack
3361
def test_override_value_from_env(self):
    """A set override_from_env variable beats the stored value."""
    self.overrideEnv('FOO', None)
    option = config.Option(
        'foo', default='bar', override_from_env=['FOO'])
    self.registry.register(option)
    self.overrideEnv('FOO', 'quux')
    # Env variable provides a default taking over the option one
    stack = self.get_conf(b'foo=store')
    self.assertEqual('quux', stack.get('foo'))
3370
def test_first_override_value_from_env_wins(self):
    """Among several override variables, the first one set is used."""
    # Start with all three candidate variables unset.
    self.overrideEnv('NO_VALUE', None)
    self.overrideEnv('FOO', None)
    self.overrideEnv('BAZ', None)
    option = config.Option(
        'foo', default='bar',
        override_from_env=['NO_VALUE', 'FOO', 'BAZ'])
    self.registry.register(option)
    self.overrideEnv('FOO', 'foo')
    self.overrideEnv('BAZ', 'baz')
    # The first env var set wins
    stack = self.get_conf(b'foo=store')
    self.assertEqual('foo', stack.get('foo'))
3384
class TestMemoryStack(tests.TestCase):
3387
conf = config.MemoryStack(b'foo=bar')
3388
self.assertEqual('bar', conf.get('foo'))
3391
conf = config.MemoryStack(b'foo=bar')
3392
conf.set('foo', 'baz')
3393
self.assertEqual('baz', conf.get('foo'))
3395
def test_no_content(self):
    """A MemoryStack built without content is unloaded until fed a string."""
    stack = config.MemoryStack()
    # No content means no loading
    self.assertFalse(stack.store.is_loaded())
    self.assertRaises(NotImplementedError, stack.get, 'foo')
    # But a content can still be provided
    stack.store._load_from_string(b'foo=bar')
    value = stack.get('foo')
    self.assertEqual('bar', value)
3405
class TestStackIterSections(tests.TestCase):
    """Exercise Stack.iter_sections() with zero, one and two stores."""

    def test_empty_stack(self):
        """A stack with no section getters yields nothing."""
        stack = config.Stack([])
        found = list(stack.iter_sections())
        self.assertLength(0, found)

    def test_empty_store(self):
        """A store loaded from empty content yields nothing."""
        empty = config.IniFileStore()
        empty._load_from_string(b'')
        stack = config.Stack([empty.get_sections])
        found = list(stack.iter_sections())
        self.assertLength(0, found)

    def test_simple_store(self):
        """One option yields one (store, section) pair naming its store."""
        ini = config.IniFileStore()
        ini._load_from_string(b'foo=bar')
        stack = config.Stack([ini.get_sections])
        pairs = list(stack.iter_sections())
        self.assertLength(1, pairs)
        (owner, _section) = pairs[0]
        self.assertIs(ini, owner)

    def test_two_stores(self):
        """Pairs are yielded in the order the stores were given."""
        first = config.IniFileStore()
        first._load_from_string(b'foo=bar')
        second = config.IniFileStore()
        second._load_from_string(b'bar=qux')
        stack = config.Stack([first.get_sections, second.get_sections])
        pairs = list(stack.iter_sections())
        self.assertLength(2, pairs)
        self.assertIs(first, pairs[0][0])
        self.assertIs(second, pairs[1][0])
3440
class TestStackWithTransport(tests.TestCaseWithTransport):
    """Base for tests run once per registered test stack builder."""

    # One scenario per entry of the test stack builder registry; the builder
    # is exposed to the test as ``get_stack``.
    scenarios = [
        (key, {'get_stack': builder})
        for key, builder
        in config.test_stack_builder_registry.iteritems()
    ]
3446
class TestConcreteStacks(TestStackWithTransport):
    """Smoke tests for the stack builders."""

    def test_build_stack(self):
        """Calling the builder must not raise."""
        # Just a smoke test to help debug builders
        self.get_stack(self)
3453
class TestStackGet(TestStackWithTransport):
3456
super(TestStackGet, self).setUp()
3457
self.conf = self.get_stack(self)
3459
def test_get_for_empty_stack(self):
    """Reading 'foo' from the freshly built stack gives None."""
    value = self.conf.get('foo')
    self.assertEqual(None, value)
3462
def test_get_hook(self):
3463
self.conf.set('foo', 'bar')
3468
config.ConfigHooks.install_named_hook('get', hook, None)
3469
self.assertLength(0, calls)
3470
value = self.conf.get('foo')
3471
self.assertEqual('bar', value)
3472
self.assertLength(1, calls)
3473
self.assertEqual((self.conf, 'foo', 'bar'), calls[0])
3476
class TestStackGetWithConverter(tests.TestCase):
3479
super(TestStackGetWithConverter, self).setUp()
3480
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3481
self.registry = config.option_registry
3483
def get_conf(self, content=None):
    """Return a config.MemoryStack created with ``content``."""
    stack = config.MemoryStack(content)
    return stack
3486
def register_bool_option(self, name, default=None, default_from_env=None):
    """Register an option converted with config.bool_from_store.

    :param name: the option name.
    :param default: passed to config.Option as ``default``.
    :param default_from_env: passed to config.Option as
        ``default_from_env``.
    """
    option = config.Option(
        name, help='A boolean.', default=default,
        default_from_env=default_from_env,
        from_unicode=config.bool_from_store)
    self.registry.register(option)
3492
def test_get_default_bool_None(self):
    """A boolean option without a default reads as None."""
    self.register_bool_option('foo')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(None, value)
3497
def test_get_default_bool_True(self):
    """The u'True' default is read back as True."""
    self.register_bool_option('foo', u'True')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(True, value)
3502
def test_get_default_bool_False(self):
    """A False default is read back as False."""
    self.register_bool_option('foo', False)
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(False, value)
3507
def test_get_default_bool_False_as_string(self):
    """The u'False' default is read back as False."""
    self.register_bool_option('foo', u'False')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(False, value)
3512
def test_get_default_bool_from_env_converted(self):
    """A default taken from the environment is converted to a boolean."""
    self.register_bool_option('foo', u'True', default_from_env=['FOO'])
    # The environment says False while the option default says True.
    self.overrideEnv('FOO', 'False')
    stack = self.get_conf(b'')
    self.assertEqual(False, stack.get('foo'))
3518
def test_get_default_bool_when_conversion_fails(self):
    """A stored value that is not a boolean falls back to the default."""
    self.register_bool_option('foo', default='True')
    stack = self.get_conf(b'foo=invalid boolean')
    value = stack.get('foo')
    self.assertEqual(True, value)
3523
def register_integer_option(self, name,
                            default=None, default_from_env=None):
    """Register an option converted with config.int_from_store.

    :param name: the option name.
    :param default: passed to config.Option as ``default``.
    :param default_from_env: passed to config.Option as
        ``default_from_env``.
    """
    option = config.Option(
        name, help='An integer.', default=default,
        default_from_env=default_from_env,
        from_unicode=config.int_from_store)
    self.registry.register(option)
3530
def test_get_default_integer_None(self):
    """An integer option without a default reads as None."""
    self.register_integer_option('foo')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(None, value)
3535
def test_get_default_integer(self):
    """An integer default is read back unchanged."""
    self.register_integer_option('foo', 42)
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(42, value)
3540
def test_get_default_integer_as_string(self):
    """The u'42' default is read back as the integer 42."""
    self.register_integer_option('foo', u'42')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(42, value)
3545
def test_get_default_integer_from_env(self):
    """A default taken from the environment is converted to an integer."""
    self.register_integer_option('foo', default_from_env=['FOO'])
    self.overrideEnv('FOO', '18')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(18, value)
3551
def test_get_default_integer_when_conversion_fails(self):
    """A stored value that is not an integer falls back to the default."""
    self.register_integer_option('foo', default='12')
    stack = self.get_conf(b'foo=invalid integer')
    value = stack.get('foo')
    self.assertEqual(12, value)
3556
def register_list_option(self, name, default=None, default_from_env=None):
    """Register a config.ListOption named ``name`` in the test registry.

    :param name: the option name.
    :param default: passed to config.ListOption as ``default``.
    :param default_from_env: passed to config.ListOption as
        ``default_from_env``.
    """
    option = config.ListOption(
        name, help='A list.', default=default,
        default_from_env=default_from_env)
    self.registry.register(option)
3561
def test_get_default_list_None(self):
    # No default registered and an empty store: the lookup gives None.
    self.register_list_option('foo')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(None, value)
3566
def test_get_default_list_empty(self):
    # An empty-string default must come back as an empty list.
    self.register_list_option('foo', default='')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual([], value)
3571
def test_get_default_list_from_env(self):
    # FOO is set to the empty string; the lookup must return an empty list.
    self.register_list_option('foo', default_from_env=['FOO'])
    self.overrideEnv('FOO', '')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual([], value)
3577
def test_get_with_list_converter_no_item(self):
    # A stored value made of a lone comma must come back as an empty list.
    self.register_list_option('foo', default=None)
    stack = self.get_conf(b'foo=,')
    value = stack.get('foo')
    self.assertEqual([], value)
3582
def test_get_with_list_converter_many_items(self):
    # Each comma-separated item must become one element of the list.
    self.register_list_option('foo', default=None)
    stack = self.get_conf(b'foo=m,o,r,e')
    expected = ['m', 'o', 'r', 'e']
    self.assertEqual(expected, stack.get('foo'))
3587
def test_get_with_list_converter_embedded_spaces_many_items(self):
    # Spaces inside the double quotes must survive in the returned items.
    self.register_list_option('foo', default=None)
    stack = self.get_conf(b'foo=" bar", "baz "')
    expected = [' bar', 'baz ']
    self.assertEqual(expected, stack.get('foo'))
3592
def test_get_with_list_converter_stripped_spaces_many_items(self):
    # Unquoted items must come back without their surrounding spaces.
    self.register_list_option('foo', default=None)
    stack = self.get_conf(b'foo= bar , baz ')
    expected = ['bar', 'baz']
    self.assertEqual(expected, stack.get('foo'))
3598
class TestIterOptionRefs(tests.TestCase):
3599
"""iter_option_refs is a bit unusual, document some cases."""
3601
def assertRefs(self, expected, string):
    """Assert that config.iter_option_refs(string) produces ``expected``.

    :param expected: The list the collected iterator output must equal.

    :param string: The text handed to config.iter_option_refs.
    """
    chunks = list(config.iter_option_refs(string))
    self.assertEqual(expected, chunks)
3604
def test_empty(self):
    # An empty input still produces one (False, '') chunk.
    expected = [(False, '')]
    self.assertRefs(expected, '')
3607
def test_no_refs(self):
    # Text without braces comes back as a single (False, text) chunk.
    text = 'foo bar'
    self.assertRefs([(False, text)], text)
3610
def test_single_ref(self):
    # A lone '{foo}' is reported as a (True, ...) chunk framed by two
    # empty (False, '') chunks.
    expected = [
        (False, ''),
        (True, '{foo}'),
        (False, ''),
    ]
    self.assertRefs(expected, '{foo}')
3613
def test_broken_ref(self):
    # Without a closing brace the whole text is one (False, ...) chunk.
    text = '{foo'
    self.assertRefs([(False, text)], text)
3616
def test_embedded_ref(self):
3617
self.assertRefs([(False, '{'), (True, '{foo}'), (False, '}')],
3620
def test_two_refs(self):
3621
self.assertRefs([(False, ''), (True, '{foo}'),
3622
(False, ''), (True, '{bar}'),
3626
def test_newline_in_refs_are_not_matched(self):
    # Braces enclosing a newline are expected to come back as one
    # (False, ...) chunk covering the whole text.
    text = '{\nxx}{xx\n}{{\n}}'
    self.assertRefs([(False, text)], text)
3630
class TestStackExpandOptions(tests.TestCaseWithTransport):
3633
super(TestStackExpandOptions, self).setUp()
3634
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3635
self.registry = config.option_registry
3636
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
3637
self.conf = config.Stack([store.get_sections], store)
3639
def assertExpansion(self, expected, string, env=None):
    """Assert that self.conf.expand_options(string, env) equals expected.

    :param expected: The value the expansion result must equal.

    :param string: The text handed to expand_options.

    :param env: Forwarded to expand_options as its second argument.
    """
    expanded = self.conf.expand_options(string, env)
    self.assertEqual(expected, expanded)
3642
def test_no_expansion(self):
    # Text without any reference must come back unchanged.
    text = 'foo'
    self.assertExpansion(text, text)
3645
def test_expand_default_value(self):
    # The store only defines 'bar'; the default registered for 'foo'
    # references it and must expand to the value of 'bar'.
    self.conf.store._load_from_string(b'bar=baz')
    option = config.Option('foo', default=u'{bar}')
    self.registry.register(option)
    value = self.conf.get('foo', expand=True)
    self.assertEqual('baz', value)
3650
def test_expand_default_from_env(self):
    # The reference comes from the FOO environment variable and must
    # expand to the value of 'bar' found in the store.
    self.conf.store._load_from_string(b'bar=baz')
    option = config.Option('foo', default_from_env=['FOO'])
    self.registry.register(option)
    self.overrideEnv('FOO', '{bar}')
    value = self.conf.get('foo', expand=True)
    self.assertEqual('baz', value)
3656
def test_expand_default_on_failed_conversion(self):
    # The stored 'foo' references 'baz' ('bogus', not an integer) while
    # the registered default references 'bar' ('42'); the lookup is
    # expected to yield the integer 42.
    self.conf.store._load_from_string(b'baz=bogus\nbar=42\nfoo={baz}')
    option = config.Option(
        'foo',
        default=u'{bar}',
        from_unicode=config.int_from_store)
    self.registry.register(option)
    value = self.conf.get('foo', expand=True)
    self.assertEqual(42, value)
3663
def test_env_adding_options(self):
    # 'foo' is supplied only through the env mapping.
    env = {'foo': 'bar'}
    self.assertExpansion('bar', '{foo}', env)
3666
def test_env_overriding_options(self):
    # The store says foo=baz, but the env mapping value must be used.
    self.conf.store._load_from_string(b'foo=baz')
    env = {'foo': 'bar'}
    self.assertExpansion('bar', '{foo}', env)
3670
def test_simple_ref(self):
    # A reference to an option present in the store expands to its value.
    store = self.conf.store
    store._load_from_string(b'foo=xxx')
    self.assertExpansion('xxx', '{foo}')
3674
def test_unknown_ref(self):
    # Nothing was loaded into the store, so expanding '{foo}' must raise.
    expand = self.conf.expand_options
    self.assertRaises(config.ExpandingUnknownOption, expand, '{foo}')
3678
def test_illegal_def_is_ignored(self):
    # Each of these brace constructs must come back exactly as given.
    for text in ('{1,2}', '{ }', '${Foo,f}'):
        self.assertExpansion(text, text)
3683
def test_indirect_ref(self):
3684
self.conf.store._load_from_string(b'''
3688
self.assertExpansion('xxx', '{bar}')
3690
def test_embedded_ref(self):
3691
self.conf.store._load_from_string(b'''
3695
self.assertExpansion('xxx', '{{bar}}')
3697
def test_simple_loop(self):
    # 'foo' is defined as a reference to itself; expanding it must raise.
    self.conf.store._load_from_string(b'foo={foo}')
    expand = self.conf.expand_options
    self.assertRaises(config.OptionExpansionLoop, expand, '{foo}')
3702
def test_indirect_loop(self):
3703
self.conf.store._load_from_string(b'''
3707
e = self.assertRaises(config.OptionExpansionLoop,
3708
self.conf.expand_options, '{foo}')
3709
self.assertEqual('foo->bar->baz', e.refs)
3710
self.assertEqual('{foo}', e.string)
3712
def test_list(self):
3713
self.conf.store._load_from_string(b'''
3717
list={foo},{bar},{baz}
3719
self.registry.register(
3720
config.ListOption('list'))
3721
self.assertEqual(['start', 'middle', 'end'],
3722
self.conf.get('list', expand=True))
3724
def test_cascading_list(self):
3725
self.conf.store._load_from_string(b'''
3731
self.registry.register(config.ListOption('list'))
3732
# Register an intermediate option as a list to ensure no conversion
3733
# happen while expanding. Conversion should only occur for the original
3734
# option ('list' here).
3735
self.registry.register(config.ListOption('baz'))
3736
self.assertEqual(['start', 'middle', 'end'],
3737
self.conf.get('list', expand=True))
3739
def test_pathologically_hidden_list(self):
3740
self.conf.store._load_from_string(b'''
3746
hidden={start}{middle}{end}
3748
# What matters is what the registration says, the conversion happens
3749
# only after all expansions have been performed
3750
self.registry.register(config.ListOption('hidden'))
3751
self.assertEqual(['bin', 'go'],
3752
self.conf.get('hidden', expand=True))
3755
class TestStackCrossSectionsExpand(tests.TestCaseWithTransport):
3758
super(TestStackCrossSectionsExpand, self).setUp()
3760
def get_config(self, location, string):
3763
# Since we don't save the config we won't strictly require to inherit
3764
# from TestCaseInTempDir, but an error occurs so quickly...
3765
c = config.LocationStack(location)
3766
c.store._load_from_string(string)
3769
def test_dont_cross_unrelated_section(self):
3770
c = self.get_config('/another/branch/path', b'''
3775
[/another/branch/path]
3778
self.assertRaises(config.ExpandingUnknownOption,
3779
c.get, 'bar', expand=True)
3781
def test_cross_related_sections(self):
3782
c = self.get_config('/project/branch/path', b'''
3786
[/project/branch/path]
3789
self.assertEqual('quux', c.get('bar', expand=True))
3792
class TestStackCrossStoresExpand(tests.TestCaseWithTransport):
3794
def test_cross_global_locations(self):
3795
l_store = config.LocationStore()
3796
l_store._load_from_string(b'''
3802
g_store = config.GlobalStore()
3803
g_store._load_from_string(b'''
3809
stack = config.LocationStack('/branch')
3810
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3811
self.assertEqual('loc-foo', stack.get('gfoo', expand=True))
3814
class TestStackExpandSectionLocals(tests.TestCaseWithTransport):
3816
def test_expand_locals_empty(self):
3817
l_store = config.LocationStore()
3818
l_store._load_from_string(b'''
3819
[/home/user/project]
3824
stack = config.LocationStack('/home/user/project/')
3825
self.assertEqual('', stack.get('base', expand=True))
3826
self.assertEqual('', stack.get('rel', expand=True))
3828
def test_expand_basename_locally(self):
3829
l_store = config.LocationStore()
3830
l_store._load_from_string(b'''
3831
[/home/user/project]
3835
stack = config.LocationStack('/home/user/project/branch')
3836
self.assertEqual('branch', stack.get('bfoo', expand=True))
3838
def test_expand_basename_locally_longer_path(self):
3839
l_store = config.LocationStore()
3840
l_store._load_from_string(b'''
3845
stack = config.LocationStack('/home/user/project/dir/branch')
3846
self.assertEqual('branch', stack.get('bfoo', expand=True))
3848
def test_expand_relpath_locally(self):
3849
l_store = config.LocationStore()
3850
l_store._load_from_string(b'''
3851
[/home/user/project]
3852
lfoo = loc-foo/{relpath}
3855
stack = config.LocationStack('/home/user/project/branch')
3856
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3858
def test_expand_relpath_unknonw_in_global(self):
3859
g_store = config.GlobalStore()
3860
g_store._load_from_string(b'''
3865
stack = config.LocationStack('/home/user/project/branch')
3866
self.assertRaises(config.ExpandingUnknownOption,
3867
stack.get, 'gfoo', expand=True)
3869
def test_expand_local_option_locally(self):
3870
l_store = config.LocationStore()
3871
l_store._load_from_string(b'''
3872
[/home/user/project]
3873
lfoo = loc-foo/{relpath}
3877
g_store = config.GlobalStore()
3878
g_store._load_from_string(b'''
3884
stack = config.LocationStack('/home/user/project/branch')
3885
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3886
self.assertEqual('loc-foo/branch', stack.get('gfoo', expand=True))
3888
def test_locals_dont_leak(self):
3889
"""Make sure we chose the right local in presence of several sections.
3891
l_store = config.LocationStore()
3892
l_store._load_from_string(b'''
3894
lfoo = loc-foo/{relpath}
3895
[/home/user/project]
3896
lfoo = loc-foo/{relpath}
3899
stack = config.LocationStack('/home/user/project/branch')
3900
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3901
stack = config.LocationStack('/home/user/bar/baz')
3902
self.assertEqual('loc-foo/bar/baz', stack.get('lfoo', expand=True))
3905
class TestStackSet(TestStackWithTransport):
3907
def test_simple_set(self):
    stack = self.get_stack(self)
    # Before anything is set, 'foo' reads as None.
    before = stack.get('foo')
    self.assertEqual(None, before)
    stack.set('foo', 'baz')
    # The value just set must be readable from the same stack.
    after = stack.get('foo')
    self.assertEqual('baz', after)
3914
def test_set_creates_a_new_section(self):
3915
conf = self.get_stack(self)
3916
conf.set('foo', 'baz')
3917
self.assertEqual, 'baz', conf.get('foo')
3919
def test_set_hook(self):
3924
config.ConfigHooks.install_named_hook('set', hook, None)
3925
self.assertLength(0, calls)
3926
conf = self.get_stack(self)
3927
conf.set('foo', 'bar')
3928
self.assertLength(1, calls)
3929
self.assertEqual((conf, 'foo', 'bar'), calls[0])
3932
class TestStackRemove(TestStackWithTransport):
3934
def test_remove_existing(self):
3935
conf = self.get_stack(self)
3936
conf.set('foo', 'bar')
3937
self.assertEqual('bar', conf.get('foo'))
3939
# Did we get it back ?
3940
self.assertEqual(None, conf.get('foo'))
3942
def test_remove_unknown(self):
    # Removing a name that was never set must raise KeyError.
    stack = self.get_stack(self)
    remove = stack.remove
    self.assertRaises(KeyError, remove, 'I_do_not_exist')
3946
def test_remove_hook(self):
3951
config.ConfigHooks.install_named_hook('remove', hook, None)
3952
self.assertLength(0, calls)
3953
conf = self.get_stack(self)
3954
conf.set('foo', 'bar')
3956
self.assertLength(1, calls)
3957
self.assertEqual((conf, 'foo'), calls[0])
3960
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
3963
super(TestConfigGetOptions, self).setUp()
3964
create_configs(self)
3966
def test_no_variable(self):
    # Going through the branch config should query branch, locations and
    # breezy; nothing has been set, so no option is expected.
    expected = []
    self.assertOptions(expected, self.branch_config)
3970
def test_option_in_breezy(self):
3971
self.breezy_config.set_user_option('file', 'breezy')
3972
self.assertOptions([('file', 'breezy', 'DEFAULT', 'breezy')],
3975
def test_option_in_locations(self):
3976
self.locations_config.set_user_option('file', 'locations')
3978
[('file', 'locations', self.tree.basedir, 'locations')],
3979
self.locations_config)
3981
def test_option_in_branch(self):
3982
self.branch_config.set_user_option('file', 'branch')
3983
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch')],
3986
def test_option_in_breezy_and_branch(self):
3987
self.breezy_config.set_user_option('file', 'breezy')
3988
self.branch_config.set_user_option('file', 'branch')
3989
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch'),
3990
('file', 'breezy', 'DEFAULT', 'breezy'), ],
3993
def test_option_in_branch_and_locations(self):
3994
# Hmm, locations override branch :-/
3995
self.locations_config.set_user_option('file', 'locations')
3996
self.branch_config.set_user_option('file', 'branch')
3998
[('file', 'locations', self.tree.basedir, 'locations'),
3999
('file', 'branch', 'DEFAULT', 'branch'), ],
4002
def test_option_in_breezy_locations_and_branch(self):
4003
self.breezy_config.set_user_option('file', 'breezy')
4004
self.locations_config.set_user_option('file', 'locations')
4005
self.branch_config.set_user_option('file', 'branch')
4007
[('file', 'locations', self.tree.basedir, 'locations'),
4008
('file', 'branch', 'DEFAULT', 'branch'),
4009
('file', 'breezy', 'DEFAULT', 'breezy'), ],
4013
class TestConfigRemoveOption(tests.TestCaseWithTransport, TestOptionsMixin):
4016
super(TestConfigRemoveOption, self).setUp()
4017
create_configs_with_file_option(self)
4019
def test_remove_in_locations(self):
4020
self.locations_config.remove_user_option('file', self.tree.basedir)
4022
[('file', 'branch', 'DEFAULT', 'branch'),
4023
('file', 'breezy', 'DEFAULT', 'breezy'), ],
4026
def test_remove_in_branch(self):
4027
self.branch_config.remove_user_option('file')
4029
[('file', 'locations', self.tree.basedir, 'locations'),
4030
('file', 'breezy', 'DEFAULT', 'breezy'), ],
4033
def test_remove_in_breezy(self):
4034
self.breezy_config.remove_user_option('file')
4036
[('file', 'locations', self.tree.basedir, 'locations'),
4037
('file', 'branch', 'DEFAULT', 'branch'), ],
4041
class TestConfigGetSections(tests.TestCaseWithTransport):
4044
super(TestConfigGetSections, self).setUp()
4045
create_configs(self)
4047
def assertSectionNames(self, expected, conf, name=None):
4048
"""Check which sections are returned for a given config.
4050
If fallback configurations exist their sections can be included.
4052
:param expected: A list of section names.
4054
:param conf: The configuration that will be queried.
4056
:param name: An optional section name that will be passed to
4059
sections = list(conf._get_sections(name))
4060
self.assertLength(len(expected), sections)
4061
self.assertEqual(expected, [n for n, _, _ in sections])
4063
def test_breezy_default_section(self):
    # The breezy config is expected to report only the 'DEFAULT' section.
    expected = ['DEFAULT']
    self.assertSectionNames(expected, self.breezy_config)
4066
def test_locations_default_section(self):
    # An empty locations file defines no section at all.
    expected = []
    self.assertSectionNames(expected, self.locations_config)
4070
def test_locations_named_section(self):
    # Once an option is set, the section named after the tree base
    # directory is expected to be reported.
    locations = self.locations_config
    locations.set_user_option('file', 'locations')
    self.assertSectionNames([self.tree.basedir], locations)
4074
def test_locations_matching_sections(self):
4075
loc_config = self.locations_config
4076
loc_config.set_user_option('file', 'locations')
4077
# We need to cheat a bit here to create an option in sections above and
4078
# below the 'location' one.
4079
parser = loc_config._get_parser()
4080
# locations.conf deals with '/' ignoring native os.sep
4081
location_names = self.tree.basedir.split('/')
4082
parent = '/'.join(location_names[:-1])
4083
child = '/'.join(location_names + ['child'])
4085
parser[parent]['file'] = 'parent'
4087
parser[child]['file'] = 'child'
4088
self.assertSectionNames([self.tree.basedir, parent], loc_config)
4090
def test_branch_data_default_section(self):
    # The branch data config is expected to report a single section whose
    # name is None.
    data_config = self.branch_config._get_branch_data_config()
    self.assertSectionNames([None], data_config)
4094
def test_branch_default_sections(self):
4095
# No sections are defined in an empty locations file
4096
self.assertSectionNames([None, 'DEFAULT'],
4098
# Unless we define an option
4099
self.branch_config._get_location_config().set_user_option(
4100
'file', 'locations')
4101
self.assertSectionNames([self.tree.basedir, None, 'DEFAULT'],
4104
def test_breezy_named_section(self):
    # The API gives no direct access to sections other than DEFAULT, so
    # cheat: define an alias first, then ask for 'ALIASES' by name.
    breezy = self.breezy_config
    breezy.set_alias('breezy', 'bzr')
    self.assertSectionNames(['ALIASES'], breezy, 'ALIASES')
4111
class TestSharedStores(tests.TestCaseInTempDir):

    def test_breezy_conf_shared(self):
        # Two separately created global stacks must be backed by the very
        # same store object.
        first = config.GlobalStack()
        second = config.GlobalStack()
        self.assertIs(first.store, second.store)
4120
class TestAuthenticationConfigFilePermissions(tests.TestCaseInTempDir):
4121
"""Test warning for permissions of authentication.conf."""
4124
super(TestAuthenticationConfigFilePermissions, self).setUp()
4125
self.path = osutils.pathjoin(self.test_dir, 'authentication.conf')
4126
with open(self.path, 'wb') as f:
4127
f.write(b"""[broken]
4130
port=port # Error: Not an int
4132
self.overrideAttr(bedding, 'authentication_config_path',
4134
osutils.chmod_if_possible(self.path, 0o755)
4136
def test_check_warning(self):
    # Creating the config must read the file at self.path and leave the
    # saved-passwords warning in the test log.
    auth = config.AuthenticationConfig()
    self.assertEqual(auth._filename, self.path)
    log = self.get_log()
    self.assertContainsRe(
        log, 'Saved passwords may be accessible by other users.')
4142
def test_check_suppressed_warning(self):
    # With 'insecure_permissions' listed in suppress_warnings, the
    # saved-passwords warning must not show up in the test log.
    settings = config.GlobalConfig()
    settings.set_user_option('suppress_warnings', 'insecure_permissions')
    auth = config.AuthenticationConfig()
    self.assertEqual(auth._filename, self.path)
    log = self.get_log()
    self.assertNotContainsRe(
        log, 'Saved passwords may be accessible by other users.')
1315
4152
class TestAuthenticationConfigFile(tests.TestCase):
1316
4153
"""Test the authentication.conf file matching"""