1312
1626
self.assertIs(None, bzrdir_config.get_default_stack_on())
1629
class TestOldConfigHooks(tests.TestCaseWithTransport):
1632
super(TestOldConfigHooks, self).setUp()
1633
create_configs_with_file_option(self)
1635
def assertGetHook(self, conf, name, value):
1640
config.OldConfigHooks.install_named_hook('get', hook, None)
1642
config.OldConfigHooks.uninstall_named_hook, 'get', None)
1643
self.assertLength(0, calls)
1644
actual_value = conf.get_user_option(name)
1645
self.assertEqual(value, actual_value)
1646
self.assertLength(1, calls)
1647
self.assertEqual((conf, name, value), calls[0])
1649
def test_get_hook_breezy(self):
1650
self.assertGetHook(self.breezy_config, 'file', 'breezy')
1652
def test_get_hook_locations(self):
1653
self.assertGetHook(self.locations_config, 'file', 'locations')
1655
def test_get_hook_branch(self):
1656
# Since locations masks branch, we define a different option
1657
self.branch_config.set_user_option('file2', 'branch')
1658
self.assertGetHook(self.branch_config, 'file2', 'branch')
1660
def assertSetHook(self, conf, name, value):
1665
config.OldConfigHooks.install_named_hook('set', hook, None)
1667
config.OldConfigHooks.uninstall_named_hook, 'set', None)
1668
self.assertLength(0, calls)
1669
conf.set_user_option(name, value)
1670
self.assertLength(1, calls)
1671
# We can't assert the conf object below as different configs use
1672
# different means to implement set_user_option and we care only about
1674
self.assertEqual((name, value), calls[0][1:])
1676
def test_set_hook_breezy(self):
1677
self.assertSetHook(self.breezy_config, 'foo', 'breezy')
1679
def test_set_hook_locations(self):
1680
self.assertSetHook(self.locations_config, 'foo', 'locations')
1682
def test_set_hook_branch(self):
1683
self.assertSetHook(self.branch_config, 'foo', 'branch')
1685
def assertRemoveHook(self, conf, name, section_name=None):
1690
config.OldConfigHooks.install_named_hook('remove', hook, None)
1692
config.OldConfigHooks.uninstall_named_hook, 'remove', None)
1693
self.assertLength(0, calls)
1694
conf.remove_user_option(name, section_name)
1695
self.assertLength(1, calls)
1696
# We can't assert the conf object below as different configs use
1697
# different means to implement remove_user_option and we care only about
1699
self.assertEqual((name,), calls[0][1:])
1701
def test_remove_hook_breezy(self):
1702
self.assertRemoveHook(self.breezy_config, 'file')
1704
def test_remove_hook_locations(self):
1705
self.assertRemoveHook(self.locations_config, 'file',
1706
self.locations_config.location)
1708
def test_remove_hook_branch(self):
1709
self.assertRemoveHook(self.branch_config, 'file')
1711
def assertLoadHook(self, name, conf_class, *conf_args):
1716
config.OldConfigHooks.install_named_hook('load', hook, None)
1718
config.OldConfigHooks.uninstall_named_hook, 'load', None)
1719
self.assertLength(0, calls)
1721
conf = conf_class(*conf_args)
1722
# Access an option to trigger a load
1723
conf.get_user_option(name)
1724
self.assertLength(1, calls)
1725
# Since we can't assert about conf, we just use the number of calls ;-/
1727
def test_load_hook_breezy(self):
    # A GlobalConfig is built without arguments by the helper, which then
    # reads 'file' to trigger the load.
    self.assertLoadHook('file', config.GlobalConfig)
1730
def test_load_hook_locations(self):
    # The LocationConfig is built by the helper from the tree's base
    # directory.
    self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)
1733
def test_load_hook_branch(self):
    # The BranchConfig is built by the helper from the tree's branch.
    self.assertLoadHook('file', config.BranchConfig, self.tree.branch)
1736
def assertSaveHook(self, conf):
1741
config.OldConfigHooks.install_named_hook('save', hook, None)
1743
config.OldConfigHooks.uninstall_named_hook, 'save', None)
1744
self.assertLength(0, calls)
1745
# Setting an option triggers a save
1746
conf.set_user_option('foo', 'bar')
1747
self.assertLength(1, calls)
1748
# Since we can't assert about conf, we just use the number of calls ;-/
1750
def test_save_hook_breezy(self):
1751
self.assertSaveHook(self.breezy_config)
1753
def test_save_hook_locations(self):
1754
self.assertSaveHook(self.locations_config)
1756
def test_save_hook_branch(self):
1757
self.assertSaveHook(self.branch_config)
1760
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
1761
"""Tests config hooks for remote configs.
1763
No tests for the remove hook as this is not implemented there.
1767
super(TestOldConfigHooksForRemote, self).setUp()
1768
self.transport_server = test_server.SmartTCPServer_for_testing
1769
create_configs_with_file_option(self)
1771
def assertGetHook(self, conf, name, value):
1776
config.OldConfigHooks.install_named_hook('get', hook, None)
1778
config.OldConfigHooks.uninstall_named_hook, 'get', None)
1779
self.assertLength(0, calls)
1780
actual_value = conf.get_option(name)
1781
self.assertEqual(value, actual_value)
1782
self.assertLength(1, calls)
1783
self.assertEqual((conf, name, value), calls[0])
1785
def test_get_hook_remote_branch(self):
    # Open the branch through its URL and check the 'get' hook on the
    # config object that branch hands out.
    remote_branch = branch.Branch.open(self.get_url('tree'))
    self.assertGetHook(remote_branch._get_config(), 'file', 'branch')
1789
def test_get_hook_remote_bzrdir(self):
    remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
    conf = remote_bzrdir._get_config()
    # set_option() is called with the value first and the option name
    # second; the check below reads 'file' back and expects 'remotedir'.
    conf.set_option('remotedir', 'file')
    self.assertGetHook(conf, 'file', 'remotedir')
1795
def assertSetHook(self, conf, name, value):
1800
config.OldConfigHooks.install_named_hook('set', hook, None)
1802
config.OldConfigHooks.uninstall_named_hook, 'set', None)
1803
self.assertLength(0, calls)
1804
conf.set_option(value, name)
1805
self.assertLength(1, calls)
1806
# We can't assert the conf object below as different configs use
1807
# different means to implement set_user_option and we care only about
1809
self.assertEqual((name, value), calls[0][1:])
1811
def test_set_hook_remote_branch(self):
    remote_branch = branch.Branch.open(self.get_url('tree'))
    # Write-lock the branch now; the lock is released at cleanup time.
    self.addCleanup(remote_branch.lock_write().unlock)
    self.assertSetHook(remote_branch._get_config(), 'file', 'remote')
1816
def test_set_hook_remote_bzrdir(self):
    # The branch is opened only to hold a write lock (released at cleanup)
    # while the control dir config is modified.
    remote_branch = branch.Branch.open(self.get_url('tree'))
    self.addCleanup(remote_branch.lock_write().unlock)
    remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
    self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')
1822
def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
1827
config.OldConfigHooks.install_named_hook('load', hook, None)
1829
config.OldConfigHooks.uninstall_named_hook, 'load', None)
1830
self.assertLength(0, calls)
1832
conf = conf_class(*conf_args)
1833
# Access an option to trigger a load
1834
conf.get_option(name)
1835
self.assertLength(expected_nb_calls, calls)
1836
# Since we can't assert about conf, we just use the number of calls ;-/
1838
def test_load_hook_remote_branch(self):
    remote_branch = branch.Branch.open(self.get_url('tree'))
    # Exactly one 'load' hook call is expected for a remote branch config.
    self.assertLoadHook(
        1, 'file', remote.RemoteBranchConfig, remote_branch)
1843
def test_load_hook_remote_bzrdir(self):
    remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
    # The config file doesn't exist, set an option to force its creation
    conf = remote_bzrdir._get_config()
    conf.set_option('remotedir', 'file')
    # We get one call for the server and one call for the client, this is
    # caused by the differences in implementations between
    # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
    # SmartServerBranchGetConfigFile (in smart/branch.py)
    self.assertLoadHook(
        2, 'file', remote.RemoteBzrDirConfig, remote_bzrdir)
1855
def assertSaveHook(self, conf):
1860
config.OldConfigHooks.install_named_hook('save', hook, None)
1862
config.OldConfigHooks.uninstall_named_hook, 'save', None)
1863
self.assertLength(0, calls)
1864
# Setting an option triggers a save
1865
conf.set_option('foo', 'bar')
1866
self.assertLength(1, calls)
1867
# Since we can't assert about conf, we just use the number of calls ;-/
1869
def test_save_hook_remote_branch(self):
    remote_branch = branch.Branch.open(self.get_url('tree'))
    # Write-lock the branch now; the lock is released at cleanup time.
    self.addCleanup(remote_branch.lock_write().unlock)
    self.assertSaveHook(remote_branch._get_config())
1874
def test_save_hook_remote_bzrdir(self):
    # The branch is opened only to hold a write lock (released at cleanup)
    # while the control dir config is saved.
    remote_branch = branch.Branch.open(self.get_url('tree'))
    self.addCleanup(remote_branch.lock_write().unlock)
    remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
    self.assertSaveHook(remote_bzrdir._get_config())
1881
class TestOptionNames(tests.TestCase):
    """Check which names ``config._option_ref_re`` accepts as references."""

    def is_valid(self, name):
        """Return True when ``{name}`` matches the option reference regexp."""
        return config._option_ref_re.match('{%s}' % name) is not None

    def test_valid_names(self):
        self.assertTrue(self.is_valid('foo'))
        self.assertTrue(self.is_valid('foo.bar'))
        self.assertTrue(self.is_valid('f1'))
        self.assertTrue(self.is_valid('_'))
        self.assertTrue(self.is_valid('__bar__'))
        self.assertTrue(self.is_valid('a_'))
        self.assertTrue(self.is_valid('a1'))
        # Don't break bzr-svn for no good reason
        self.assertTrue(self.is_valid('guessed-layout'))

    def test_invalid_names(self):
        self.assertFalse(self.is_valid(' foo'))
        self.assertFalse(self.is_valid('foo '))
        self.assertFalse(self.is_valid('1'))
        self.assertFalse(self.is_valid('1,2'))
        self.assertFalse(self.is_valid('foo$'))
        self.assertFalse(self.is_valid('!foo'))
        self.assertFalse(self.is_valid('foo.'))
        self.assertFalse(self.is_valid('foo..bar'))
        self.assertFalse(self.is_valid('{}'))
        self.assertFalse(self.is_valid('{a}'))
        self.assertFalse(self.is_valid('a\n'))
        self.assertFalse(self.is_valid('-'))
        self.assertFalse(self.is_valid('-a'))
        self.assertFalse(self.is_valid('a-'))
        self.assertFalse(self.is_valid('a--a'))

    def assertSingleGroup(self, reference):
        """Assert the regexp finds a reference in `reference` with one group.

        :param reference: A string containing an option reference.
        """
        # the regexp is used with split and as such should match the reference
        # *only*, if more groups need to be defined, (?:...) should be used.
        # The `reference` argument is what gets checked (it used to be
        # ignored in favour of a hard-coded '{a}'). search() is used rather
        # than match() because test_invalid_names establishes that
        # match('{{a}}') is None, while '{{a}}' is one of the references
        # checked here.
        m = config._option_ref_re.search(reference)
        self.assertIsNot(None, m)
        self.assertLength(1, m.groups())

    def test_valid_references(self):
        self.assertSingleGroup('{a}')
        self.assertSingleGroup('{{a}}')
1925
class TestOption(tests.TestCase):
1927
def test_default_value(self):
    # A plain string default is what get_default() returns.
    opt = config.Option('foo', default='bar')
    self.assertEqual('bar', opt.get_default())
1931
def test_callable_default_value(self):
1932
def bar_as_unicode():
1934
opt = config.Option('foo', default=bar_as_unicode)
1935
self.assertEqual('bar', opt.get_default())
1937
def test_default_value_from_env(self):
    opt = config.Option('foo', default='bar', default_from_env=['FOO'])
    self.overrideEnv('FOO', 'quux')
    # Env variable provides a default taking over the option one
    self.assertEqual('quux', opt.get_default())
1943
def test_first_default_value_from_env_wins(self):
    # 'NO_VALUE' is not overridden by this test; 'FOO' and 'BAZ' both are.
    opt = config.Option('foo', default='bar',
                        default_from_env=['NO_VALUE', 'FOO', 'BAZ'])
    self.overrideEnv('FOO', 'foo')
    self.overrideEnv('BAZ', 'baz')
    # The first env var set wins
    self.assertEqual('foo', opt.get_default())
1951
def test_not_supported_list_default_value(self):
    # Building an Option with a list as default must raise AssertionError.
    self.assertRaises(AssertionError, config.Option, 'foo', default=[1])
1954
def test_not_supported_object_default_value(self):
1955
self.assertRaises(AssertionError, config.Option, 'foo',
1958
def test_not_supported_callable_default_value_not_unicode(self):
1959
def bar_not_unicode():
1961
opt = config.Option('foo', default=bar_not_unicode)
1962
self.assertRaises(AssertionError, opt.get_default)
1964
def test_get_help_topic(self):
    # The help topic of an option is expected to equal its name.
    opt = config.Option('foo')
    self.assertEqual('foo', opt.get_help_topic())
1969
class TestOptionConverter(tests.TestCase):
1971
def assertConverted(self, expected, opt, value):
    """Assert that `opt` converts `value` into `expected`.

    The conversion is done with ``opt.convert_from_unicode`` and no store
    (None is passed as its first argument).
    """
    self.assertEqual(expected, opt.convert_from_unicode(None, value))
1974
def assertCallsWarning(self, opt, value):
1978
warnings.append(args[0] % args[1:])
1979
self.overrideAttr(trace, 'warning', warning)
1980
self.assertEqual(None, opt.convert_from_unicode(None, value))
1981
self.assertLength(1, warnings)
1983
'Value "%s" is not valid for "%s"' % (value, opt.name),
1986
def assertCallsError(self, opt, value):
    """Assert that converting `value` raises ConfigOptionValueError.

    The conversion is done with ``opt.convert_from_unicode`` and no store.
    """
    self.assertRaises(config.ConfigOptionValueError,
                      opt.convert_from_unicode, None, value)
1990
def assertConvertInvalid(self, opt, invalid_value):
1992
self.assertEqual(None, opt.convert_from_unicode(None, invalid_value))
1993
opt.invalid = 'warning'
1994
self.assertCallsWarning(opt, invalid_value)
1995
opt.invalid = 'error'
1996
self.assertCallsError(opt, invalid_value)
1999
class TestOptionWithBooleanConverter(TestOptionConverter):
    """Conversion checks for an option using ``config.bool_from_store``."""

    def get_option(self):
        """Build the boolean option under test."""
        return config.Option('foo', help='A boolean.',
                             from_unicode=config.bool_from_store)

    def test_convert_invalid(self):
        opt = self.get_option()
        # A string that is not recognized as a boolean
        self.assertConvertInvalid(opt, u'invalid-boolean')
        # A list of strings is never recognized as a boolean
        self.assertConvertInvalid(opt, [u'not', u'a', u'boolean'])

    def test_convert_valid(self):
        opt = self.get_option()
        self.assertConverted(True, opt, u'True')
        self.assertConverted(True, opt, u'1')
        self.assertConverted(False, opt, u'False')
2019
class TestOptionWithIntegerConverter(TestOptionConverter):
    """Conversion checks for an option using ``config.int_from_store``."""

    def get_option(self):
        """Build the integer option under test."""
        return config.Option('foo', help='An integer.',
                             from_unicode=config.int_from_store)

    def test_convert_invalid(self):
        opt = self.get_option()
        # A string that is not recognized as an integer
        self.assertConvertInvalid(opt, u'forty-two')
        # A list of strings is never recognized as an integer
        self.assertConvertInvalid(opt, [u'a', u'list'])

    def test_convert_valid(self):
        opt = self.get_option()
        self.assertConverted(16, opt, u'16')
2037
class TestOptionWithSIUnitConverter(TestOptionConverter):
    """Conversion checks for an option using ``config.int_SI_from_store``."""

    def get_option(self):
        """Build the SI-unit integer option under test."""
        return config.Option('foo', help='An integer in SI units.',
                             from_unicode=config.int_SI_from_store)

    def test_convert_invalid(self):
        opt = self.get_option()
        self.assertConvertInvalid(opt, u'not-a-unit')
        self.assertConvertInvalid(opt, u'Gb')  # Forgot the value
        self.assertConvertInvalid(opt, u'1b')  # Forgot the unit
        self.assertConvertInvalid(opt, u'1GG')
        self.assertConvertInvalid(opt, u'1Mbb')
        self.assertConvertInvalid(opt, u'1MM')

    def test_convert_valid(self):
        # Unit letters are accepted in either case, with or without a
        # trailing 'b'/'B'; a bare number is accepted as is.
        opt = self.get_option()
        self.assertConverted(int(5e3), opt, u'5kb')
        self.assertConverted(int(5e6), opt, u'5M')
        self.assertConverted(int(5e6), opt, u'5MB')
        self.assertConverted(int(5e9), opt, u'5g')
        self.assertConverted(int(5e9), opt, u'5gB')
        self.assertConverted(100, opt, u'100')
2062
class TestListOption(TestOptionConverter):
2064
def get_option(self):
    """Build the list option under test."""
    return config.ListOption('foo', help='A list.')
2067
def test_convert_invalid(self):
2068
opt = self.get_option()
2069
# We don't even try to convert a list into a list, we only expect
2071
self.assertConvertInvalid(opt, [1])
2072
# No string is invalid as all forms can be converted to a list
2074
def test_convert_valid(self):
2075
opt = self.get_option()
2076
# An empty string is an empty list
2077
self.assertConverted([], opt, '') # Using a bare str() just in case
2078
self.assertConverted([], opt, u'')
2080
self.assertConverted([u'True'], opt, u'True')
2082
self.assertConverted([u'42'], opt, u'42')
2084
self.assertConverted([u'bar'], opt, u'bar')
2087
class TestRegistryOption(TestOptionConverter):
2089
def get_option(self, registry):
    """Build a RegistryOption named 'foo' backed by `registry`."""
    return config.RegistryOption('foo', registry,
                                 help='A registry option.')
2093
def test_convert_invalid(self):
    # The registry is left empty, so neither a list nor an unregistered
    # name can be converted.
    registry = _mod_registry.Registry()
    opt = self.get_option(registry)
    self.assertConvertInvalid(opt, [1])
    self.assertConvertInvalid(opt, u"notregistered")
2099
def test_convert_valid(self):
    registry = _mod_registry.Registry()
    registry.register("someval", 1234)
    opt = self.get_option(registry)
    # Using a bare str() just in case
    self.assertConverted(1234, opt, "someval")
    self.assertConverted(1234, opt, u'someval')
    # None is expected to convert to None
    self.assertConverted(None, opt, None)
2108
def test_help(self):
2109
registry = _mod_registry.Registry()
2110
registry.register("someval", 1234, help="some option")
2111
registry.register("dunno", 1234, help="some other option")
2112
opt = self.get_option(registry)
2114
'A registry option.\n'
2116
'The following values are supported:\n'
2117
' dunno - some other option\n'
2118
' someval - some option\n',
2121
def test_get_help_text(self):
2122
registry = _mod_registry.Registry()
2123
registry.register("someval", 1234, help="some option")
2124
registry.register("dunno", 1234, help="some other option")
2125
opt = self.get_option(registry)
2127
'A registry option.\n'
2129
'The following values are supported:\n'
2130
' dunno - some other option\n'
2131
' someval - some option\n',
2132
opt.get_help_text())
2135
class TestOptionRegistry(tests.TestCase):
2138
super(TestOptionRegistry, self).setUp()
2139
# Always start with an empty registry
2140
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2141
self.registry = config.option_registry
2143
def test_register(self):
    # A registered option is retrieved, as the same object, under its name.
    opt = config.Option('foo')
    self.registry.register(opt)
    self.assertIs(opt, self.registry.get('foo'))
2148
def test_registered_help(self):
    # The registry exposes the help string given to the option.
    opt = config.Option('foo', help='A simple option')
    self.registry.register(opt)
    self.assertEqual('A simple option', self.registry.get_help('foo'))
2153
def test_dont_register_illegal_name(self):
    # Names with a leading space or a comma must be refused by register().
    self.assertRaises(config.IllegalOptionName,
                      self.registry.register, config.Option(' foo'))
    self.assertRaises(config.IllegalOptionName,
                      self.registry.register, config.Option('bar,'))
2159
lazy_option = config.Option('lazy_foo', help='Lazy help')
2161
def test_register_lazy(self):
    # The option is referenced by module and member name; get() must
    # return the very object stored in the `lazy_option` class attribute.
    self.registry.register_lazy('lazy_foo', self.__module__,
                                'TestOptionRegistry.lazy_option')
    self.assertIs(self.lazy_option, self.registry.get('lazy_foo'))
2166
def test_registered_lazy_help(self):
    # Help for a lazily registered option comes from the referenced option.
    self.registry.register_lazy('lazy_foo', self.__module__,
                                'TestOptionRegistry.lazy_option')
    self.assertEqual('Lazy help', self.registry.get_help('lazy_foo'))
2171
def test_dont_lazy_register_illegal_name(self):
    # This is where the root cause of http://pad.lv/1235099 is better
    # understood: 'register_lazy' doc string mentions that key should match
    # the option name which indirectly requires that the option name is a
    # valid python identifier. We violate that rule here (using a key that
    # doesn't match the option name) to test the option name checking.
    self.assertRaises(config.IllegalOptionName,
                      self.registry.register_lazy, ' foo', self.__module__,
                      'TestOptionRegistry.lazy_option')
    self.assertRaises(config.IllegalOptionName,
                      self.registry.register_lazy, '1,2', self.__module__,
                      'TestOptionRegistry.lazy_option')
2185
class TestRegisteredOptions(tests.TestCase):
2186
"""All registered options should verify some constraints."""
2188
scenarios = [(key, {'option_name': key, 'option': option}) for key, option
2189
in config.option_registry.iteritems()]
2192
super(TestRegisteredOptions, self).setUp()
2193
self.registry = config.option_registry
2195
def test_proper_name(self):
2196
# An option should be registered under its own name, this can't be
2197
# checked at registration time for the lazy ones.
2198
self.assertEqual(self.option_name, self.option.name)
2200
def test_help_is_set(self):
2201
option_help = self.registry.get_help(self.option_name)
2202
# Come on, think about the user, he really wants to know what the
2204
self.assertIsNot(None, option_help)
2205
self.assertNotEqual('', option_help)
2208
class TestSection(tests.TestCase):
2210
# FIXME: Parametrize so that all sections produced by Stores run these
2211
# tests -- vila 2011-04-01
2213
def test_get_a_value(self):
    # A value present in the dict given to the Section is returned by get().
    a_dict = dict(foo='bar')
    section = config.Section('myID', a_dict)
    self.assertEqual('bar', section.get('foo'))
2218
def test_get_unknown_option(self):
2220
section = config.Section(None, a_dict)
2221
self.assertEqual('out of thin air',
2222
section.get('foo', 'out of thin air'))
2224
def test_options_is_shared(self):
2226
section = config.Section(None, a_dict)
2227
self.assertIs(a_dict, section.options)
2230
class TestMutableSection(tests.TestCase):
2232
scenarios = [('mutable',
2234
lambda opts: config.MutableSection('myID', opts)},),
2238
a_dict = dict(foo='bar')
2239
section = self.get_section(a_dict)
2240
section.set('foo', 'new_value')
2241
self.assertEqual('new_value', section.get('foo'))
2242
# The change appears in the shared section
2243
self.assertEqual('new_value', a_dict.get('foo'))
2244
# We keep track of the change
2245
self.assertTrue('foo' in section.orig)
2246
self.assertEqual('bar', section.orig.get('foo'))
2248
def test_set_preserve_original_once(self):
2249
a_dict = dict(foo='bar')
2250
section = self.get_section(a_dict)
2251
section.set('foo', 'first_value')
2252
section.set('foo', 'second_value')
2253
# We keep track of the original value
2254
self.assertTrue('foo' in section.orig)
2255
self.assertEqual('bar', section.orig.get('foo'))
2257
def test_remove(self):
2258
a_dict = dict(foo='bar')
2259
section = self.get_section(a_dict)
2260
section.remove('foo')
2261
# We get None for unknown options via the default value
2262
self.assertEqual(None, section.get('foo'))
2263
# Or we just get the default value
2264
self.assertEqual('unknown', section.get('foo', 'unknown'))
2265
self.assertFalse('foo' in section.options)
2266
# We keep track of the deletion
2267
self.assertTrue('foo' in section.orig)
2268
self.assertEqual('bar', section.orig.get('foo'))
2270
def test_remove_new_option(self):
2272
section = self.get_section(a_dict)
2273
section.set('foo', 'bar')
2274
section.remove('foo')
2275
self.assertFalse('foo' in section.options)
2276
# The option didn't exist initially so we need to keep track of it
2277
# with a special value
2278
self.assertTrue('foo' in section.orig)
2279
self.assertEqual(config._NewlyCreatedOption, section.orig['foo'])
2282
class TestCommandLineStore(tests.TestCase):
2285
super(TestCommandLineStore, self).setUp()
2286
self.store = config.CommandLineStore()
2287
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2289
def get_section(self):
2290
"""Get the unique section for the command line overrides."""
2291
sections = list(self.store.get_sections())
2292
self.assertLength(1, sections)
2293
store, section = sections[0]
2294
self.assertEqual(self.store, store)
2297
def test_no_override(self):
2298
self.store._from_cmdline([])
2299
section = self.get_section()
2300
self.assertLength(0, list(section.iter_option_names()))
2302
def test_simple_override(self):
2303
self.store._from_cmdline(['a=b'])
2304
section = self.get_section()
2305
self.assertEqual('b', section.get('a'))
2307
def test_list_override(self):
    opt = config.ListOption('l')
    config.option_registry.register(opt)
    self.store._from_cmdline(['l=1,2,3'])
    # The section hands back the raw string given on the command line.
    val = self.get_section().get('l')
    self.assertEqual('1,2,3', val)
    # Reminder: lists should be registered as such explicitly, otherwise
    # the conversion needs to be done afterwards.
    self.assertEqual(['1', '2', '3'],
                     opt.convert_from_unicode(self.store, val))
2318
def test_multiple_overrides(self):
2319
self.store._from_cmdline(['a=b', 'x=y'])
2320
section = self.get_section()
2321
self.assertEqual('b', section.get('a'))
2322
self.assertEqual('y', section.get('x'))
2324
def test_wrong_syntax(self):
    # An override without '=' ('c') must be rejected with BzrCommandError.
    self.assertRaises(errors.BzrCommandError,
                      self.store._from_cmdline, ['a=b', 'c'])
2329
class TestStoreMinimalAPI(tests.TestCaseWithTransport):
2331
scenarios = [(key, {'get_store': builder}) for key, builder
2332
in config.test_store_builder_registry.iteritems()] + [
2333
('cmdline', {'get_store': lambda test: config.CommandLineStore()})]
2336
store = self.get_store(self)
2337
if isinstance(store, config.TransportIniFileStore):
2338
raise tests.TestNotApplicable(
2339
"%s is not a concrete Store implementation"
2340
" so it doesn't need an id" % (store.__class__.__name__,))
2341
self.assertIsNot(None, store.id)
2344
class TestStore(tests.TestCaseWithTransport):
2346
def assertSectionContent(self, expected, store_and_section):
2347
"""Assert that some options have the proper values in a section."""
2348
_, section = store_and_section
2349
expected_name, expected_options = expected
2350
self.assertEqual(expected_name, section.id)
2353
dict([(k, section.get(k)) for k in expected_options.keys()]))
2356
class TestReadonlyStore(TestStore):
    """Read-only checks for stores.

    The tests are parametrized through `scenarios`, which provides a
    `get_store` builder for each entry of
    ``config.test_store_builder_registry``.
    """

    scenarios = [(key, {'get_store': builder}) for key, builder
                 in config.test_store_builder_registry.iteritems()]

    def test_building_delays_load(self):
        # Building the store must not load it; loading is explicit.
        store = self.get_store(self)
        self.assertEqual(False, store.is_loaded())
        store._load_from_string(b'')
        self.assertEqual(True, store.is_loaded())

    def test_get_no_sections_for_empty(self):
        store = self.get_store(self)
        store._load_from_string(b'')
        self.assertEqual([], list(store.get_sections()))

    def test_get_default_section(self):
        # Options defined before any section header end up in a section
        # whose id is None.
        store = self.get_store(self)
        store._load_from_string(b'foo=bar')
        sections = list(store.get_sections())
        self.assertLength(1, sections)
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])

    def test_get_named_section(self):
        store = self.get_store(self)
        store._load_from_string(b'[baz]\nfoo=bar')
        sections = list(store.get_sections())
        self.assertLength(1, sections)
        self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])

    def test_load_from_string_fails_for_non_empty_store(self):
        # A second _load_from_string() on an already loaded store must
        # raise AssertionError.
        store = self.get_store(self)
        store._load_from_string(b'foo=bar')
        self.assertRaises(AssertionError, store._load_from_string, b'bar=baz')
2392
class TestStoreQuoting(TestStore):
2394
scenarios = [(key, {'get_store': builder}) for key, builder
2395
in config.test_store_builder_registry.iteritems()]
2398
super(TestStoreQuoting, self).setUp()
2399
self.store = self.get_store(self)
2400
# We need a loaded store but any content will do
2401
self.store._load_from_string(b'')
2403
def assertIdempotent(self, s):
2404
"""Assert that quoting an unquoted string is a no-op and vice-versa.
2406
What matters here is that option values, as they appear in a store, can
2407
be safely round-tripped out of the store and back.
2409
:param s: A string, quoted if required.
2411
self.assertEqual(s, self.store.quote(self.store.unquote(s)))
2412
self.assertEqual(s, self.store.unquote(self.store.quote(s)))
2414
def test_empty_string(self):
2415
if isinstance(self.store, config.IniFileStore):
2416
# configobj._quote doesn't handle empty values
2417
self.assertRaises(AssertionError,
2418
self.assertIdempotent, '')
2420
self.assertIdempotent('')
2421
# But quoted empty strings are ok
2422
self.assertIdempotent('""')
2424
def test_embedded_spaces(self):
2425
self.assertIdempotent('" a b c "')
2427
def test_embedded_commas(self):
2428
self.assertIdempotent('" a , b c "')
2430
def test_simple_comma(self):
2431
if isinstance(self.store, config.IniFileStore):
2432
# configobj requires that lists are special-cased
2433
self.assertRaises(AssertionError,
2434
self.assertIdempotent, ',')
2436
self.assertIdempotent(',')
2437
# When a single comma is required, quoting is also required
2438
self.assertIdempotent('","')
2440
def test_list(self):
2441
if isinstance(self.store, config.IniFileStore):
2442
# configobj requires that lists are special-cased
2443
self.assertRaises(AssertionError,
2444
self.assertIdempotent, 'a,b')
2446
self.assertIdempotent('a,b')
2449
class TestDictFromStore(tests.TestCase):
2451
def test_unquote_not_string(self):
2452
conf = config.MemoryStack(b'x=2\n[a_section]\na=1\n')
2453
value = conf.get('a_section')
2454
# Urgh, despite 'conf' asking for the no-name section, we get the
2455
# content of another section as a dict o_O
2456
self.assertEqual({'a': '1'}, value)
2457
unquoted = conf.store.unquote(value)
2458
# Which cannot be unquoted but shouldn't crash either (the use cases
2459
# are getting the value or displaying it. In the later case, '%s' will
2461
self.assertEqual({'a': '1'}, unquoted)
2462
self.assertIn('%s' % (unquoted,), ("{u'a': u'1'}", "{'a': '1'}"))
2465
class TestIniFileStoreContent(tests.TestCaseWithTransport):
2466
"""Simulate loading a config store with content of various encodings.
2468
All files produced by bzr are in utf8 content.
2470
Users may modify them manually and end up with a file that can't be
2471
loaded. We need to issue proper error messages in this case.
2474
invalid_utf8_char = b'\xff'
2476
def test_load_utf8(self):
2477
"""Ensure we can load an utf8-encoded file."""
2478
t = self.get_transport()
2479
# From http://pad.lv/799212
2480
unicode_user = u'b\N{Euro Sign}ar'
2481
unicode_content = u'user=%s' % (unicode_user,)
2482
utf8_content = unicode_content.encode('utf8')
2483
# Store the raw content in the config file
2484
t.put_bytes('foo.conf', utf8_content)
2485
store = config.TransportIniFileStore(t, 'foo.conf')
2487
stack = config.Stack([store.get_sections], store)
2488
self.assertEqual(unicode_user, stack.get('user'))
2490
def test_load_non_ascii(self):
    """Ensure we display a proper error on non-ascii, non utf-8 content."""
    t = self.get_transport()
    # The invalid byte is placed in a comment line of the file.
    t.put_bytes('foo.conf', b'user=foo\n#%s\n' % (self.invalid_utf8_char,))
    store = config.TransportIniFileStore(t, 'foo.conf')
    self.assertRaises(config.ConfigContentError, store.load)
2497
def test_load_erroneous_content(self):
    """Ensure we display a proper error on content that can't be parsed."""
    t = self.get_transport()
    # A section header that is never closed.
    t.put_bytes('foo.conf', b'[open_section\n')
    store = config.TransportIniFileStore(t, 'foo.conf')
    self.assertRaises(config.ParseConfigError, store.load)
2504
def test_load_permission_denied(self):
2505
"""Ensure we get warned when trying to load an inaccessible file."""
2509
warnings.append(args[0] % args[1:])
2510
self.overrideAttr(trace, 'warning', warning)
2512
t = self.get_transport()
2514
def get_bytes(relpath):
2515
raise errors.PermissionDenied(relpath, "")
2516
t.get_bytes = get_bytes
2517
store = config.TransportIniFileStore(t, 'foo.conf')
2518
self.assertRaises(errors.PermissionDenied, store.load)
2521
[u'Permission denied while trying to load configuration store %s.'
2522
% store.external_url()])
2525
class TestIniConfigContent(tests.TestCaseWithTransport):
2526
"""Simulate loading a IniBasedConfig with content of various encodings.
2528
All files produced by bzr are in utf8 content.
2530
Users may modify them manually and end up with a file that can't be
2531
loaded. We need to issue proper error messages in this case.
2534
invalid_utf8_char = b'\xff'
2536
def test_load_utf8(self):
    """Ensure we can load an utf8-encoded file."""
    # From http://pad.lv/799212
    unicode_user = u'b\N{Euro Sign}ar'
    unicode_content = u'user=%s' % (unicode_user,)
    utf8_content = unicode_content.encode('utf8')
    # Store the raw content in the config file
    with open('foo.conf', 'wb') as f:
        f.write(utf8_content)
    conf = config.IniBasedConfig(file_name='foo.conf')
    # The option must come back as the original unicode string.
    self.assertEqual(unicode_user, conf.get_user_option('user'))
2548
def test_load_badly_encoded_content(self):
    """Ensure we display a proper error on non-ascii, non utf-8 content."""
    # The invalid byte is placed in a comment line of the file.
    with open('foo.conf', 'wb') as f:
        f.write(b'user=foo\n#%s\n' % (self.invalid_utf8_char,))
    conf = config.IniBasedConfig(file_name='foo.conf')
    self.assertRaises(config.ConfigContentError, conf._get_parser)
2555
def test_load_erroneous_content(self):
    """Ensure we display a proper error on content that can't be parsed."""
    # A section header that is never closed.
    with open('foo.conf', 'wb') as f:
        f.write(b'[open_section\n')
    conf = config.IniBasedConfig(file_name='foo.conf')
    self.assertRaises(config.ParseConfigError, conf._get_parser)
2563
class TestMutableStore(TestStore):
    """Check saving and mutating stores for every registered test builder.

    Each scenario supplies ``store_id`` (the registry key) and ``get_store``
    (the builder called with the test instance).
    """

    scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
                 in config.test_store_builder_registry.iteritems()]

    def setUp(self):
        super(TestMutableStore, self).setUp()
        self.transport = self.get_transport()

    def has_store(self, store):
        """Tell whether the transport has the url ``store`` points to."""
        store_basename = urlutils.relative_url(self.transport.external_url(),
                                               store.external_url())
        return self.transport.has(store_basename)

    def _write_lock_if_branch_store(self, store):
        """Write-lock the branch behind ``store`` for branch-based scenarios.

        The lock is released through a cleanup when the test ends.
        """
        # FIXME: There should be a better way than relying on the test
        # parametrization to identify branch.conf -- vila 2011-0526
        if self.store_id in ('branch', 'remote_branch'):
            # branch stores require write locked branches
            self.addCleanup(store.branch.lock_write().unlock)

    def test_save_empty_creates_no_file(self):
        # FIXME: There should be a better way than relying on the test
        # parametrization to identify branch.conf -- vila 2011-0526
        if self.store_id in ('branch', 'remote_branch'):
            raise tests.TestNotApplicable(
                'branch.conf is *always* created when a branch is initialized')
        store = self.get_store(self)
        # NOTE(review): restored -- missing from the listing; confirm.
        store.save()
        self.assertEqual(False, self.has_store(store))

    def test_mutable_section_shared(self):
        store = self.get_store(self)
        store._load_from_string(b'foo=bar\n')
        self._write_lock_if_branch_store(store)
        section1 = store.get_mutable_section(None)
        section2 = store.get_mutable_section(None)
        # If we get different sections, different callers won't share the
        # modification
        self.assertIs(section1, section2)

    def test_save_emptied_succeeds(self):
        store = self.get_store(self)
        store._load_from_string(b'foo=bar\n')
        self._write_lock_if_branch_store(store)
        section = store.get_mutable_section(None)
        section.remove('foo')
        # NOTE(review): restored -- missing from the listing; confirm.
        store.save()
        self.assertEqual(True, self.has_store(store))
        modified_store = self.get_store(self)
        sections = list(modified_store.get_sections())
        self.assertLength(0, sections)

    def test_save_with_content_succeeds(self):
        # FIXME: There should be a better way than relying on the test
        # parametrization to identify branch.conf -- vila 2011-0526
        if self.store_id in ('branch', 'remote_branch'):
            raise tests.TestNotApplicable(
                'branch.conf is *always* created when a branch is initialized')
        store = self.get_store(self)
        store._load_from_string(b'foo=bar\n')
        self.assertEqual(False, self.has_store(store))
        # NOTE(review): restored -- the two opposite has_store() assertions
        # need a save in between; confirm.
        store.save()
        self.assertEqual(True, self.has_store(store))
        modified_store = self.get_store(self)
        sections = list(modified_store.get_sections())
        self.assertLength(1, sections)
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])

    def test_set_option_in_empty_store(self):
        store = self.get_store(self)
        self._write_lock_if_branch_store(store)
        section = store.get_mutable_section(None)
        section.set('foo', 'bar')
        # NOTE(review): restored -- missing from the listing; confirm.
        store.save()
        modified_store = self.get_store(self)
        sections = list(modified_store.get_sections())
        self.assertLength(1, sections)
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])

    def test_set_option_in_default_section(self):
        store = self.get_store(self)
        store._load_from_string(b'')
        self._write_lock_if_branch_store(store)
        section = store.get_mutable_section(None)
        section.set('foo', 'bar')
        # NOTE(review): restored -- missing from the listing; confirm.
        store.save()
        modified_store = self.get_store(self)
        sections = list(modified_store.get_sections())
        self.assertLength(1, sections)
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])

    def test_set_option_in_named_section(self):
        store = self.get_store(self)
        store._load_from_string(b'')
        self._write_lock_if_branch_store(store)
        section = store.get_mutable_section('baz')
        section.set('foo', 'bar')
        # NOTE(review): restored -- missing from the listing; confirm.
        store.save()
        modified_store = self.get_store(self)
        sections = list(modified_store.get_sections())
        self.assertLength(1, sections)
        self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])

    def test_load_hook(self):
        # First, we need to ensure that the store exists
        store = self.get_store(self)
        self._write_lock_if_branch_store(store)
        section = store.get_mutable_section('baz')
        section.set('foo', 'bar')
        # NOTE(review): restored -- missing from the listing; confirm.
        store.save()
        # Now we can try to load it
        store = self.get_store(self)
        # NOTE(review): 'calls' and 'hook' definitions restored from their
        # uses below; confirm.
        calls = []

        def hook(*args):
            calls.append(args)
        config.ConfigHooks.install_named_hook('load', hook, None)
        self.assertLength(0, calls)
        # NOTE(review): restored -- missing from the listing; confirm.
        store.load()
        self.assertLength(1, calls)
        self.assertEqual((store,), calls[0])

    def test_save_hook(self):
        # NOTE(review): 'calls' and 'hook' definitions restored from their
        # uses below; confirm.
        calls = []

        def hook(*args):
            calls.append(args)
        config.ConfigHooks.install_named_hook('save', hook, None)
        self.assertLength(0, calls)
        store = self.get_store(self)
        self._write_lock_if_branch_store(store)
        section = store.get_mutable_section('baz')
        section.set('foo', 'bar')
        # NOTE(review): restored -- missing from the listing; confirm.
        store.save()
        self.assertLength(1, calls)
        self.assertEqual((store,), calls[0])

    def test_set_mark_dirty(self):
        stack = config.MemoryStack(b'')
        self.assertLength(0, stack.store.dirty_sections)
        stack.set('foo', 'baz')
        self.assertLength(1, stack.store.dirty_sections)
        self.assertTrue(stack.store._need_saving())

    def test_remove_mark_dirty(self):
        stack = config.MemoryStack(b'foo=bar')
        self.assertLength(0, stack.store.dirty_sections)
        # NOTE(review): restored -- implied by the test name; confirm.
        stack.remove('foo')
        self.assertLength(1, stack.store.dirty_sections)
        self.assertTrue(stack.store._need_saving())
2737
class TestStoreSaveChanges(tests.TestCaseWithTransport):
    """Tests that config changes are kept in memory and saved on-demand."""

    def setUp(self):
        super(TestStoreSaveChanges, self).setUp()
        self.transport = self.get_transport()
        # Most of the tests involve two stores pointing to the same persistent
        # storage to observe the effects of concurrent changes
        self.st1 = config.TransportIniFileStore(self.transport, 'foo.conf')
        self.st2 = config.TransportIniFileStore(self.transport, 'foo.conf')
        # NOTE(review): the list and the 'def' line are restored from their
        # uses; confirm.
        self.warnings = []

        def warning(*args):
            # Record the formatted message instead of emitting it.
            self.warnings.append(args[0] % args[1:])
        self.overrideAttr(trace, 'warning', warning)

    def has_store(self, store):
        """Tell whether the transport has the url ``store`` points to."""
        store_basename = urlutils.relative_url(self.transport.external_url(),
                                               store.external_url())
        return self.transport.has(store_basename)

    def get_stack(self, store):
        """Build a stack reading from and writing to ``store``."""
        # Any stack will do as long as it uses the right store, just a single
        # no-name section is enough
        return config.Stack([store.get_sections], store)

    def test_no_changes_no_save(self):
        s = self.get_stack(self.st1)
        s.store.save_changes()
        self.assertEqual(False, self.has_store(self.st1))

    def test_unrelated_concurrent_update(self):
        s1 = self.get_stack(self.st1)
        s2 = self.get_stack(self.st2)
        s1.set('foo', 'bar')
        s2.set('baz', 'quux')
        # NOTE(review): restored -- s2 can only see 'foo' below if s1 was
        # saved first; confirm the exact call.
        s1.store.save()
        # Changes don't propagate magically
        self.assertEqual(None, s1.get('baz'))
        s2.store.save_changes()
        self.assertEqual('quux', s2.get('baz'))
        # Changes are acquired when saving
        self.assertEqual('bar', s2.get('foo'))
        # Since there is no overlap, no warnings are emitted
        self.assertLength(0, self.warnings)

    def test_concurrent_update_modified(self):
        s1 = self.get_stack(self.st1)
        s2 = self.get_stack(self.st2)
        s1.set('foo', 'bar')
        s2.set('foo', 'baz')
        # NOTE(review): restored -- the warning below reports the persisted
        # value 'bar', so s1 must have been saved; confirm the exact call.
        s1.store.save()
        # Last speaker wins
        s2.store.save_changes()
        self.assertEqual('baz', s2.get('foo'))
        # But the user gets a warning
        self.assertLength(1, self.warnings)
        warning = self.warnings[0]
        self.assertStartsWith(warning, 'Option foo in section None')
        self.assertEndsWith(warning, 'was changed from <CREATED> to bar.'
                            ' The baz value will be saved.')

    def test_concurrent_deletion(self):
        self.st1._load_from_string(b'foo=bar')
        # NOTE(review): the save and both remove() calls are restored; they
        # are implied by the '<DELETED>' warning asserted below. Confirm.
        self.st1.save()
        s1 = self.get_stack(self.st1)
        s2 = self.get_stack(self.st2)
        s1.remove('foo')
        s2.remove('foo')
        s1.store.save_changes()
        # No warning yet
        self.assertLength(0, self.warnings)
        s2.store.save_changes()
        # Now we get one
        self.assertLength(1, self.warnings)
        warning = self.warnings[0]
        self.assertStartsWith(warning, 'Option foo in section None')
        self.assertEndsWith(warning, 'was changed from bar to <CREATED>.'
                            ' The <DELETED> value will be saved.')
2818
class TestQuotingIniFileStore(tests.TestCaseWithTransport):
    """Check how values with surrounding spaces are quoted in ini files."""

    def get_store(self):
        return config.TransportIniFileStore(self.get_transport(), 'foo.conf')

    def test_get_quoted_string(self):
        store = self.get_store()
        store._load_from_string(b'foo= " abc "')
        stack = config.Stack([store.get_sections])
        self.assertEqual(' abc ', stack.get('foo'))

    def test_set_quoted_string(self):
        store = self.get_store()
        stack = config.Stack([store.get_sections], store)
        stack.set('foo', ' a b c ')
        # NOTE(review): restored -- the file content is checked right after,
        # so the store has to be written; confirm.
        store.save()
        self.assertFileEqual(b'foo = " a b c "' +
                             os.linesep.encode('ascii'), 'foo.conf')
2838
class TestTransportIniFileStore(TestStore):
    """Check loading and section parsing of TransportIniFileStore."""

    def test_loading_unknown_file_fails(self):
        # NOTE(review): the file name argument was missing from the listing;
        # any name absent from the transport fits. Confirm.
        store = config.TransportIniFileStore(self.get_transport(),
                                             'I-do-not-exist')
        self.assertRaises(errors.NoSuchFile, store.load)

    def test_invalid_content(self):
        store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
        self.assertEqual(False, store.is_loaded())
        exc = self.assertRaises(
            config.ParseConfigError, store._load_from_string,
            b'this is invalid !')
        self.assertEndsWith(exc.filename, 'foo.conf')
        # And the load failed
        self.assertEqual(False, store.is_loaded())

    def test_get_embedded_sections(self):
        # A more complicated example (which also shows that section names and
        # option names share the same name space...)
        # FIXME: This should be fixed by forbidding dicts as values ?
        # -- vila 2011-04-05
        store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
        # NOTE(review): only the 'foo_in_DEFAULT' line survived in the
        # listing; the rest of the content is rebuilt from the assertions
        # below. Confirm.
        store._load_from_string(b'''
foo=bar
l=1,2
[DEFAULT]
foo_in_DEFAULT=foo_DEFAULT
[bar]
foo_in_bar=barbar
[baz]
foo_in_baz=barbaz
[[qux]]
foo_in_qux=quux
''')
        sections = list(store.get_sections())
        self.assertLength(4, sections)
        # The default section has no name.
        # List values are provided as strings and need to be explicitly
        # converted by specifying from_unicode=list_from_store at option
        # registration
        self.assertSectionContent((None, {'foo': 'bar', 'l': u'1,2'}),
                                  sections[0])
        self.assertSectionContent(
            ('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
        self.assertSectionContent(
            ('bar', {'foo_in_bar': 'barbar'}), sections[2])
        # sub sections are provided as embedded dicts.
        self.assertSectionContent(
            ('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
            sections[3])
2891
class TestLockableIniFileStore(TestStore):
    """Check that a LockableIniFileStore creates its missing directories."""

    def test_create_store_in_created_dir(self):
        self.assertPathDoesNotExist('dir')
        t = self.get_transport('dir/subdir')
        store = config.LockableIniFileStore(t, 'foo.conf')
        store.get_mutable_section(None).set('foo', 'bar')
        # NOTE(review): restored -- missing from the listing; the directory
        # is expected to exist only once the store is written. Confirm.
        store.save()
        self.assertPathExists('dir/subdir')
2902
class TestConcurrentStoreUpdates(TestStore):
    """Test that Stores properly handle concurrent updates.

    New Store implementation may fail some of these tests but until such
    implementations exist it's hard to properly filter them from the scenarios
    applied here. If you encounter such a case, contact the bzr devs.
    """

    scenarios = [(key, {'get_stack': builder}) for key, builder
                 in config.test_stack_builder_registry.iteritems()]

    def setUp(self):
        super(TestConcurrentStoreUpdates, self).setUp()
        self.stack = self.get_stack(self)
        if not isinstance(self.stack, config._CompatibleStack):
            raise tests.TestNotApplicable(
                '%s is not meant to be compatible with the old config design'
                % (self.stack,))
        self.stack.set('one', '1')
        self.stack.set('two', '2')
        # Flush the store
        self.stack.store.save()

    def test_simple_read_access(self):
        self.assertEqual('1', self.stack.get('one'))

    def test_simple_write_access(self):
        self.stack.set('one', 'one')
        self.assertEqual('one', self.stack.get('one'))

    def test_listen_to_the_last_speaker(self):
        # NOTE(review): the 'c1' binding is restored in every test of this
        # class from its later uses; confirm.
        c1 = self.stack
        c2 = self.get_stack(self)
        c1.set('one', 'ONE')
        c2.set('two', 'TWO')
        self.assertEqual('ONE', c1.get('one'))
        self.assertEqual('TWO', c2.get('two'))
        # The second update respects the first one
        self.assertEqual('ONE', c2.get('one'))

    def test_last_speaker_wins(self):
        # If the same config is not shared, the same variable modified twice
        # can only see a single result.
        c1 = self.stack
        c2 = self.get_stack(self)
        # NOTE(review): both set() calls restored from the values asserted
        # below; confirm.
        c1.set('one', 'c1')
        c2.set('one', 'c2')
        self.assertEqual('c2', c2.get('one'))
        # The first modification is still available until another refresh
        # occurs
        self.assertEqual('c1', c1.get('one'))
        c1.set('two', 'done')
        self.assertEqual('c2', c1.get('one'))

    def test_writes_are_serialized(self):
        c1 = self.stack
        c2 = self.get_stack(self)

        # We spawn a thread that will pause *during* the config saving.
        before_writing = threading.Event()
        after_writing = threading.Event()
        writing_done = threading.Event()
        c1_save_without_locking_orig = c1.store.save_without_locking

        def c1_save_without_locking():
            before_writing.set()
            c1_save_without_locking_orig()
            # The lock is held. We wait for the main thread to decide when to
            # continue
            after_writing.wait()
        c1.store.save_without_locking = c1_save_without_locking

        # NOTE(review): the thread target and the event signalling below
        # (start/set/wait calls) are restored from the names used by the
        # surviving lines; confirm.
        def c1_set():
            c1.set('one', 'c1')
            writing_done.set()
        t1 = threading.Thread(target=c1_set)
        # Collect the thread after the test
        self.addCleanup(t1.join)
        # Be ready to unblock the thread if the test goes wrong
        self.addCleanup(after_writing.set)
        t1.start()
        before_writing.wait()
        self.assertRaises(errors.LockContention,
                          c2.set, 'one', 'c2')
        self.assertEqual('c1', c1.get('one'))
        # Let the lock be released
        after_writing.set()
        writing_done.wait()
        c2.set('one', 'c2')
        self.assertEqual('c2', c2.get('one'))

    def test_read_while_writing(self):
        c1 = self.stack
        # We spawn a thread that will pause *during* the write
        ready_to_write = threading.Event()
        do_writing = threading.Event()
        writing_done = threading.Event()
        # We override the _save implementation so we know the store is locked
        c1_save_without_locking_orig = c1.store.save_without_locking

        def c1_save_without_locking():
            ready_to_write.set()
            # The lock is held. We wait for the main thread to decide when to
            # continue
            # NOTE(review): the wait()/set() calls around the real save are
            # restored; confirm.
            do_writing.wait()
            c1_save_without_locking_orig()
            writing_done.set()
        c1.store.save_without_locking = c1_save_without_locking

        # NOTE(review): thread target restored from 'target=c1_set'; confirm.
        def c1_set():
            c1.set('one', 'c1')
        t1 = threading.Thread(target=c1_set)
        # Collect the thread after the test
        self.addCleanup(t1.join)
        # Be ready to unblock the thread if the test goes wrong
        self.addCleanup(do_writing.set)
        t1.start()
        # Ensure the thread is ready to write
        ready_to_write.wait()
        self.assertEqual('c1', c1.get('one'))
        # If we read during the write, we get the old value
        c2 = self.get_stack(self)
        self.assertEqual('1', c2.get('one'))
        # Let the writing occur and ensure it occurred
        do_writing.set()
        writing_done.wait()
        # Now we get the updated value
        c3 = self.get_stack(self)
        self.assertEqual('c1', c3.get('one'))
3032
# FIXME: It may be worth looking into removing the lock dir when it's not
3033
# needed anymore and look at possible fallouts for concurrent lockers. This
3034
# will matter if/when we use config files outside of breezy directories
3035
# (.config/breezy or .bzr) -- vila 2011-04-11
3038
class TestSectionMatcher(TestStore):
    """Checks shared by the location and name based section matchers."""

    scenarios = [('location', {'matcher': config.LocationMatcher}),
                 ('id', {'matcher': config.NameMatcher}), ]

    def setUp(self):
        super(TestSectionMatcher, self).setUp()
        # Any simple store is good enough
        self.get_store = config.test_store_builder_registry.get('configobj')

    def test_no_matches_for_empty_stores(self):
        store = self.get_store(self)
        store._load_from_string(b'')
        matcher = self.matcher(store, '/bar')
        self.assertEqual([], list(matcher.get_sections()))

    def test_build_doesnt_load_store(self):
        store = self.get_store(self)
        self.matcher(store, '/bar')
        self.assertFalse(store.is_loaded())
3060
class TestLocationSection(tests.TestCase):
    """Check option policies applied by LocationSection."""

    def get_section(self, options, extra_path):
        """Wrap ``options`` in a 'foo' section seen from ``extra_path``."""
        section = config.Section('foo', options)
        return config.LocationSection(section, extra_path)

    def test_simple_option(self):
        section = self.get_section({'foo': 'bar'}, '')
        self.assertEqual('bar', section.get('foo'))

    def test_option_with_extra_path(self):
        # NOTE(review): the extra_path argument was missing from the listing;
        # 'baz' is implied by the expected 'bar/baz'. Confirm.
        section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
                                   'baz')
        self.assertEqual('bar/baz', section.get('foo'))

    def test_invalid_policy(self):
        # NOTE(review): the extra_path argument was missing from the listing;
        # its value does not affect the assertion. Confirm.
        section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
                                   'baz')
        # invalid policies are ignored
        self.assertEqual('bar', section.get('foo'))
3082
class TestLocationMatcher(TestStore):
    """Check which sections LocationMatcher selects and in which order."""

    def setUp(self):
        super(TestLocationMatcher, self).setUp()
        # Any simple store is good enough
        self.get_store = config.test_store_builder_registry.get('configobj')

    def test_unrelated_section_excluded(self):
        store = self.get_store(self)
        # NOTE(review): only 'section=/foo/bar/baz' survived in the listing.
        # The first four sections are rebuilt from the ids asserted below;
        # the fifth, unrelated one ('/quux/quux') is a guess. Confirm.
        store._load_from_string(b'''
[/foo]
section=/foo
[/foo/baz]
section=/foo/baz
[/foo/bar]
section=/foo/bar
[/foo/bar/baz]
section=/foo/bar/baz
[/quux/quux]
section=/quux/quux
''')
        self.assertEqual(['/foo', '/foo/baz', '/foo/bar', '/foo/bar/baz',
                          '/quux/quux'],
                         [section.id for _, section in store.get_sections()])
        matcher = config.LocationMatcher(store, '/foo/bar/quux')
        sections = [section for _, section in matcher.get_sections()]
        self.assertEqual(['/foo/bar', '/foo'],
                         [section.id for section in sections])
        self.assertEqual(['quux', 'bar/quux'],
                         [section.extra_path for section in sections])

    def test_more_specific_sections_first(self):
        store = self.get_store(self)
        # NOTE(review): content missing from the listing; section names are
        # rebuilt from the ids asserted below, option lines are a guess.
        store._load_from_string(b'''
[/foo]
section=/foo
[/foo/bar]
section=/foo/bar
''')
        self.assertEqual(['/foo', '/foo/bar'],
                         [section.id for _, section in store.get_sections()])
        matcher = config.LocationMatcher(store, '/foo/bar/baz')
        sections = [section for _, section in matcher.get_sections()]
        self.assertEqual(['/foo/bar', '/foo'],
                         [section.id for section in sections])
        self.assertEqual(['baz', 'bar/baz'],
                         [section.extra_path for section in sections])

    def test_appendpath_in_no_name_section(self):
        # It's a bit weird to allow appendpath in a no-name section, but
        # someone may find a use for it
        store = self.get_store(self)
        # NOTE(review): 'foo=bar' restored from the expected 'bar/dir/subdir';
        # confirm.
        store._load_from_string(b'''
foo=bar
foo:policy = appendpath
''')
        matcher = config.LocationMatcher(store, 'dir/subdir')
        sections = list(matcher.get_sections())
        self.assertLength(1, sections)
        self.assertEqual('bar/dir/subdir', sections[0][1].get('foo'))

    def test_file_urls_are_normalized(self):
        store = self.get_store(self)
        if sys.platform == 'win32':
            expected_url = 'file:///C:/dir/subdir'
            expected_location = 'C:/dir/subdir'
        else:
            expected_url = 'file:///dir/subdir'
            expected_location = '/dir/subdir'
        matcher = config.LocationMatcher(store, expected_url)
        self.assertEqual(expected_location, matcher.location)

    def test_branch_name_colo(self):
        store = self.get_store(self)
        # NOTE(review): the '[/]' section header is a guess for the line
        # missing from the listing; confirm.
        store._load_from_string(dedent("""\
            [/]
            push_location=my{branchname}
        """).encode('ascii'))
        matcher = config.LocationMatcher(store, 'file:///,branch=example%3c')
        self.assertEqual('example<', matcher.branch_name)
        ((_, section),) = matcher.get_sections()
        self.assertEqual('example<', section.locals['branchname'])

    def test_branch_name_basename(self):
        store = self.get_store(self)
        # NOTE(review): the '[/]' section header is a guess for the line
        # missing from the listing; confirm.
        store._load_from_string(dedent("""\
            [/]
            push_location=my{branchname}
        """).encode('ascii'))
        matcher = config.LocationMatcher(store, 'file:///parent/example%3c')
        self.assertEqual('example<', matcher.branch_name)
        ((_, section),) = matcher.get_sections()
        self.assertEqual('example<', section.locals['branchname'])
3177
class TestStartingPathMatcher(TestStore):
    """Check which sections StartingPathMatcher selects and in which order.

    NOTE(review): in every test below the section headers inside the byte
    strings were missing from the listing. Matching ones are rebuilt from the
    expected ids; unrelated ones ('/foo/qux') are guesses. Confirm.
    """

    def setUp(self):
        super(TestStartingPathMatcher, self).setUp()
        # Any simple store is good enough
        self.store = config.IniFileStore()

    def assertSectionIDs(self, expected, location, content):
        """Load ``content`` and check the ids of sections matching ``location``.

        :return: The ``(store, section)`` pairs produced by the matcher.
        """
        self.store._load_from_string(content)
        matcher = config.StartingPathMatcher(self.store, location)
        sections = list(matcher.get_sections())
        self.assertLength(len(expected), sections)
        self.assertEqual(expected, [section.id for _, section in sections])
        # NOTE(review): restored -- callers below use the returned value.
        return sections

    def test_empty(self):
        self.assertSectionIDs([], self.get_url(), b'')

    def test_url_vs_local_paths(self):
        # The matcher location is an url and the section names are local paths
        self.assertSectionIDs(['/foo/bar', '/foo'],
                              'file:///foo/bar/baz', b'''\
[/foo]
[/foo/bar]
''')

    def test_local_path_vs_url(self):
        # The matcher location is a local path and the section names are urls
        self.assertSectionIDs(['file:///foo/bar', 'file:///foo'],
                              '/foo/bar/baz', b'''\
[file:///foo]
[file:///foo/bar]
''')

    def test_no_name_section_included_when_present(self):
        # Note that other tests will cover the case where the no-name section
        # is empty and as such, not included.
        sections = self.assertSectionIDs(['/foo/bar', '/foo', None],
                                         '/foo/bar/baz', b'''\
option = defined so the no-name section exists
[/foo]
[/foo/bar]
''')
        self.assertEqual(['baz', 'bar/baz', '/foo/bar/baz'],
                         [s.locals['relpath'] for _, s in sections])

    def test_order_reversed(self):
        self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
[/foo]
[/foo/bar]
''')

    def test_unrelated_section_excluded(self):
        self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
[/foo]
[/foo/qux]
[/foo/bar]
''')

    def test_glob_included(self):
        sections = self.assertSectionIDs(['/foo/*/baz', '/foo/b*', '/foo'],
                                         '/foo/bar/baz', b'''\
[/foo]
[/foo/qux]
[/foo/b*]
[/foo/*/baz]
''')
        # Note that 'baz' as a relpath for /foo/b* is not fully correct, but
        # nothing really is... as far using {relpath} to append it to something
        # else, this seems good enough though.
        self.assertEqual(['', 'baz', 'bar/baz'],
                         [s.locals['relpath'] for _, s in sections])

    def test_respect_order(self):
        self.assertSectionIDs(['/foo', '/foo/b*', '/foo/*/baz'],
                              '/foo/bar/baz', b'''\
[/foo/*/baz]
[/foo/qux]
[/foo/b*]
[/foo]
''')
3260
class TestNameMatcher(TestStore):
    """Check that NameMatcher selects sections by exact name."""

    def setUp(self):
        super(TestNameMatcher, self).setUp()
        self.matcher = config.NameMatcher
        # Any simple store is good enough
        self.get_store = config.test_store_builder_registry.get('configobj')

    def get_matching_sections(self, name):
        """Return the sections of a fixed sample store matching ``name``."""
        store = self.get_store(self)
        # NOTE(review): the whole content was missing from the listing. The
        # '[foo]' section is implied by test_matching; the other two sections
        # are guesses. Confirm.
        store._load_from_string(b'''
[foo]
option=foo
[foo/baz]
option=foo/baz
[bar]
option=bar
''')
        matcher = self.matcher(store, name)
        return list(matcher.get_sections())

    def test_matching(self):
        sections = self.get_matching_sections('foo')
        self.assertLength(1, sections)
        self.assertSectionContent(('foo', {'option': 'foo'}), sections[0])

    def test_not_matching(self):
        sections = self.get_matching_sections('baz')
        self.assertLength(0, sections)
3291
class TestBaseStackGet(tests.TestCase):
    """Check Stack.get() against an empty, test-local option registry."""

    def setUp(self):
        super(TestBaseStackGet, self).setUp()
        # Isolate the tests from the options registered by the library.
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())

    def test_get_first_definition(self):
        store1 = config.IniFileStore()
        store1._load_from_string(b'foo=bar')
        store2 = config.IniFileStore()
        store2._load_from_string(b'foo=baz')
        conf = config.Stack([store1.get_sections, store2.get_sections])
        self.assertEqual('bar', conf.get('foo'))

    def test_get_with_registered_default_value(self):
        config.option_registry.register(config.Option('foo', default='bar'))
        conf_stack = config.Stack([])
        self.assertEqual('bar', conf_stack.get('foo'))

    def test_get_without_registered_default_value(self):
        config.option_registry.register(config.Option('foo'))
        conf_stack = config.Stack([])
        self.assertEqual(None, conf_stack.get('foo'))

    def test_get_without_default_value_for_not_registered(self):
        conf_stack = config.Stack([])
        self.assertEqual(None, conf_stack.get('foo'))

    def test_get_for_empty_section_callable(self):
        conf_stack = config.Stack([lambda: []])
        self.assertEqual(None, conf_stack.get('foo'))

    def test_get_for_broken_callable(self):
        # Trying to use an invalid callable raises an exception on first use
        conf_stack = config.Stack([object])
        self.assertRaises(TypeError, conf_stack.get, 'foo')
3329
class TestStackWithSimpleStore(tests.TestCase):
    """Check environment overrides of options read from a MemoryStack."""

    def setUp(self):
        super(TestStackWithSimpleStore, self).setUp()
        # Isolate the tests from the options registered by the library.
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
        self.registry = config.option_registry

    def get_conf(self, content=None):
        return config.MemoryStack(content)

    def test_override_value_from_env(self):
        self.overrideEnv('FOO', None)
        self.registry.register(
            config.Option('foo', default='bar', override_from_env=['FOO']))
        self.overrideEnv('FOO', 'quux')
        # Env variable provides a default taking over the option one
        conf = self.get_conf(b'foo=store')
        self.assertEqual('quux', conf.get('foo'))

    def test_first_override_value_from_env_wins(self):
        self.overrideEnv('NO_VALUE', None)
        self.overrideEnv('FOO', None)
        self.overrideEnv('BAZ', None)
        self.registry.register(
            config.Option('foo', default='bar',
                          override_from_env=['NO_VALUE', 'FOO', 'BAZ']))
        self.overrideEnv('FOO', 'foo')
        self.overrideEnv('BAZ', 'baz')
        # The first env var set wins
        conf = self.get_conf(b'foo=store')
        self.assertEqual('foo', conf.get('foo'))
3362
class TestMemoryStack(tests.TestCase):
    """Check reading and writing options through a MemoryStack."""

    # NOTE(review): the 'def' lines of the first two tests were missing from
    # the listing; the names below are guesses based on what each body does.
    def test_get(self):
        conf = config.MemoryStack(b'foo=bar')
        self.assertEqual('bar', conf.get('foo'))

    def test_set(self):
        conf = config.MemoryStack(b'foo=bar')
        conf.set('foo', 'baz')
        self.assertEqual('baz', conf.get('foo'))

    def test_no_content(self):
        conf = config.MemoryStack()
        # No content means no loading
        self.assertFalse(conf.store.is_loaded())
        self.assertRaises(NotImplementedError, conf.get, 'foo')
        # But a content can still be provided
        conf.store._load_from_string(b'foo=bar')
        self.assertEqual('bar', conf.get('foo'))
3383
class TestStackIterSections(tests.TestCase):
    """Check the (store, section) pairs produced by Stack.iter_sections()."""

    def test_empty_stack(self):
        conf = config.Stack([])
        sections = list(conf.iter_sections())
        self.assertLength(0, sections)

    def test_empty_store(self):
        store = config.IniFileStore()
        store._load_from_string(b'')
        conf = config.Stack([store.get_sections])
        sections = list(conf.iter_sections())
        self.assertLength(0, sections)

    def test_simple_store(self):
        store = config.IniFileStore()
        store._load_from_string(b'foo=bar')
        conf = config.Stack([store.get_sections])
        tuples = list(conf.iter_sections())
        self.assertLength(1, tuples)
        (found_store, found_section) = tuples[0]
        self.assertIs(store, found_store)

    def test_two_stores(self):
        store1 = config.IniFileStore()
        store1._load_from_string(b'foo=bar')
        store2 = config.IniFileStore()
        store2._load_from_string(b'bar=qux')
        conf = config.Stack([store1.get_sections, store2.get_sections])
        tuples = list(conf.iter_sections())
        self.assertLength(2, tuples)
        # Stores are reported in the order their callables were given.
        self.assertIs(store1, tuples[0][0])
        self.assertIs(store2, tuples[1][0])
3418
class TestStackWithTransport(tests.TestCaseWithTransport):
    """Base class running its tests once per registered stack builder."""

    # Each scenario supplies the 'get_stack' builder under its registry key.
    scenarios = [(key, {'get_stack': builder}) for key, builder
                 in config.test_stack_builder_registry.iteritems()]
3424
class TestConcreteStacks(TestStackWithTransport):
    """Check that every registered stack builder can build a stack."""

    def test_build_stack(self):
        # Just a smoke test to help debug builders
        self.get_stack(self)
3431
class TestStackGet(TestStackWithTransport):
    """Check Stack.get() and its 'get' hook for every stack builder."""

    def setUp(self):
        super(TestStackGet, self).setUp()
        self.conf = self.get_stack(self)

    def test_get_for_empty_stack(self):
        self.assertEqual(None, self.conf.get('foo'))

    def test_get_hook(self):
        self.conf.set('foo', 'bar')
        # NOTE(review): 'calls' and 'hook' definitions restored from their
        # uses below; confirm.
        calls = []

        def hook(*args):
            calls.append(args)
        config.ConfigHooks.install_named_hook('get', hook, None)
        self.assertLength(0, calls)
        value = self.conf.get('foo')
        self.assertEqual('bar', value)
        self.assertLength(1, calls)
        self.assertEqual((self.conf, 'foo', 'bar'), calls[0])
3454
class TestStackGetWithConverter(tests.TestCase):
3457
super(TestStackGetWithConverter, self).setUp()
3458
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3459
self.registry = config.option_registry
3461
def get_conf(self, content=None):
3462
return config.MemoryStack(content)
3464
def register_bool_option(self, name, default=None, default_from_env=None):
3465
b = config.Option(name, help='A boolean.',
3466
default=default, default_from_env=default_from_env,
3467
from_unicode=config.bool_from_store)
3468
self.registry.register(b)
3470
def test_get_default_bool_None(self):
3471
self.register_bool_option('foo')
3472
conf = self.get_conf(b'')
3473
self.assertEqual(None, conf.get('foo'))
3475
def test_get_default_bool_True(self):
3476
self.register_bool_option('foo', u'True')
3477
conf = self.get_conf(b'')
3478
self.assertEqual(True, conf.get('foo'))
3480
def test_get_default_bool_False(self):
3481
self.register_bool_option('foo', False)
3482
conf = self.get_conf(b'')
3483
self.assertEqual(False, conf.get('foo'))
3485
def test_get_default_bool_False_as_string(self):
3486
self.register_bool_option('foo', u'False')
3487
conf = self.get_conf(b'')
3488
self.assertEqual(False, conf.get('foo'))
3490
def test_get_default_bool_from_env_converted(self):
3491
self.register_bool_option('foo', u'True', default_from_env=['FOO'])
3492
self.overrideEnv('FOO', 'False')
3493
conf = self.get_conf(b'')
3494
self.assertEqual(False, conf.get('foo'))
3496
def test_get_default_bool_when_conversion_fails(self):
3497
self.register_bool_option('foo', default='True')
3498
conf = self.get_conf(b'foo=invalid boolean')
3499
self.assertEqual(True, conf.get('foo'))
3501
def register_integer_option(self, name,
3502
default=None, default_from_env=None):
3503
i = config.Option(name, help='An integer.',
3504
default=default, default_from_env=default_from_env,
3505
from_unicode=config.int_from_store)
3506
self.registry.register(i)
3508
def test_get_default_integer_None(self):
3509
self.register_integer_option('foo')
3510
conf = self.get_conf(b'')
3511
self.assertEqual(None, conf.get('foo'))
3513
def test_get_default_integer(self):
3514
self.register_integer_option('foo', 42)
3515
conf = self.get_conf(b'')
3516
self.assertEqual(42, conf.get('foo'))
3518
def test_get_default_integer_as_string(self):
3519
self.register_integer_option('foo', u'42')
3520
conf = self.get_conf(b'')
3521
self.assertEqual(42, conf.get('foo'))
3523
def test_get_default_integer_from_env(self):
3524
self.register_integer_option('foo', default_from_env=['FOO'])
3525
self.overrideEnv('FOO', '18')
3526
conf = self.get_conf(b'')
3527
self.assertEqual(18, conf.get('foo'))
3529
def test_get_default_integer_when_conversion_fails(self):
3530
self.register_integer_option('foo', default='12')
3531
conf = self.get_conf(b'foo=invalid integer')
3532
self.assertEqual(12, conf.get('foo'))
3534
def register_list_option(self, name, default=None, default_from_env=None):
3535
l = config.ListOption(name, help='A list.', default=default,
3536
default_from_env=default_from_env)
3537
self.registry.register(l)
3539
def test_get_default_list_None(self):
    # Nothing stored and no default registered: None is expected.
    self.register_list_option('foo')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual(None, value)
3544
def test_get_default_list_empty(self):
    # An empty-string default is expected back as an empty list.
    self.register_list_option('foo', '')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual([], value)
3549
def test_get_default_list_from_env(self):
    # An empty FOO environment variable is expected to give an empty list.
    self.register_list_option('foo', default_from_env=['FOO'])
    self.overrideEnv('FOO', '')
    stack = self.get_conf(b'')
    value = stack.get('foo')
    self.assertEqual([], value)
3555
def test_get_with_list_converter_no_item(self):
    # A stored value made of a lone comma is expected to give no items.
    self.register_list_option('foo', None)
    stack = self.get_conf(b'foo=,')
    items = stack.get('foo')
    self.assertEqual([], items)
3560
def test_get_with_list_converter_many_items(self):
    # Each comma-separated piece is expected as its own list item.
    self.register_list_option('foo', None)
    stack = self.get_conf(b'foo=m,o,r,e')
    expected = ['m', 'o', 'r', 'e']
    self.assertEqual(expected, stack.get('foo'))
3565
def test_get_with_list_converter_embedded_spaces_many_items(self):
    # Spaces inside double quotes are expected to survive in the items.
    self.register_list_option('foo', None)
    stack = self.get_conf(b'foo=" bar", "baz "')
    expected = [' bar', 'baz ']
    self.assertEqual(expected, stack.get('foo'))
3570
def test_get_with_list_converter_stripped_spaces_many_items(self):
    # Unquoted spaces around items are expected to be stripped.
    self.register_list_option('foo', None)
    stack = self.get_conf(b'foo= bar , baz ')
    expected = ['bar', 'baz']
    self.assertEqual(expected, stack.get('foo'))
3576
class TestIterOptionRefs(tests.TestCase):
3577
"""iter_option_refs is a bit unusual, document some cases."""
3579
def assertRefs(self, expected, string):
    """Check that ``config.iter_option_refs(string)`` yields ``expected``.

    The iterator is consumed into a list before the comparison.
    """
    chunks = list(config.iter_option_refs(string))
    self.assertEqual(expected, chunks)
3582
def test_empty(self):
    # Even an empty string is expected to yield one non-reference chunk.
    expected = [(False, '')]
    self.assertRefs(expected, '')
3585
def test_no_refs(self):
    # Text without braces is expected back as one non-reference chunk.
    text = 'foo bar'
    self.assertRefs([(False, text)], text)
3588
def test_single_ref(self):
    # A lone reference is expected with empty non-reference chunks on
    # both sides.
    expected = [(False, ''), (True, '{foo}'), (False, '')]
    self.assertRefs(expected, '{foo}')
3591
def test_broken_ref(self):
    # An unclosed brace is not a reference: one plain chunk is expected.
    text = '{foo'
    self.assertRefs([(False, text)], text)
3594
def test_embedded_ref(self):
3595
self.assertRefs([(False, '{'), (True, '{foo}'), (False, '}')],
3598
def test_two_refs(self):
3599
self.assertRefs([(False, ''), (True, '{foo}'),
3600
(False, ''), (True, '{bar}'),
3604
def test_newline_in_refs_are_not_matched(self):
    # Braces that span a newline are expected back as one plain chunk.
    text = '{\nxx}{xx\n}{{\n}}'
    self.assertRefs([(False, text)], text)
3608
class TestStackExpandOptions(tests.TestCaseWithTransport):
3611
super(TestStackExpandOptions, self).setUp()
3612
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3613
self.registry = config.option_registry
3614
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
3615
self.conf = config.Stack([store.get_sections], store)
3617
def assertExpansion(self, expected, string, env=None):
    """Check that ``self.conf.expand_options(string, env)`` gives ``expected``.

    ``env`` is passed straight through to ``expand_options``.
    """
    expanded = self.conf.expand_options(string, env)
    self.assertEqual(expected, expanded)
3620
def test_no_expansion(self):
    # Text without any reference is expected back unchanged.
    text = 'foo'
    self.assertExpansion(text, text)
3623
def test_expand_default_value(self):
    # 'foo' is not stored; its default refers to 'bar', which the store
    # defines, so the expanded default is expected to be 'baz'.
    self.conf.store._load_from_string(b'bar=baz')
    option = config.Option('foo', default=u'{bar}')
    self.registry.register(option)
    expanded = self.conf.get('foo', expand=True)
    self.assertEqual('baz', expanded)
3628
def test_expand_default_from_env(self):
    # The default comes from FOO in the environment and itself contains a
    # reference to 'bar'; the expanded result is expected to be 'baz'.
    self.conf.store._load_from_string(b'bar=baz')
    option = config.Option('foo', default_from_env=['FOO'])
    self.registry.register(option)
    self.overrideEnv('FOO', '{bar}')
    expanded = self.conf.get('foo', expand=True)
    self.assertEqual('baz', expanded)
3634
def test_expand_default_on_failed_conversion(self):
    # The stored 'foo' expands to 'bogus', which is not an integer; the
    # test expects the default '{bar}' to be expanded and converted to 42.
    self.conf.store._load_from_string(b'baz=bogus\nbar=42\nfoo={baz}')
    option = config.Option('foo', default=u'{bar}',
                           from_unicode=config.int_from_store)
    self.registry.register(option)
    value = self.conf.get('foo', expand=True)
    self.assertEqual(42, value)
3641
def test_env_adding_options(self):
    # 'foo' is not in the store; the value supplied through env is used.
    env = {'foo': 'bar'}
    self.assertExpansion('bar', '{foo}', env)
3644
def test_env_overriding_options(self):
    # The store says foo=baz, but the env value 'bar' is expected to win.
    self.conf.store._load_from_string(b'foo=baz')
    env = {'foo': 'bar'}
    self.assertExpansion('bar', '{foo}', env)
3648
def test_simple_ref(self):
    # A reference to a stored option is replaced by the stored value.
    store = self.conf.store
    store._load_from_string(b'foo=xxx')
    self.assertExpansion('xxx', '{foo}')
3652
def test_unknown_ref(self):
    # Nothing defines 'foo', so expanding a reference to it must raise.
    expand = self.conf.expand_options
    self.assertRaises(config.ExpandingUnknownOption, expand, '{foo}')
3656
def test_illegal_def_is_ignored(self):
    # Each of these strings is expected to come back unchanged from the
    # expansion.
    for text in ('{1,2}', '{ }', '${Foo,f}'):
        self.assertExpansion(text, text)
3661
def test_indirect_ref(self):
3662
self.conf.store._load_from_string(b'''
3666
self.assertExpansion('xxx', '{bar}')
3668
def test_embedded_ref(self):
3669
self.conf.store._load_from_string(b'''
3673
self.assertExpansion('xxx', '{{bar}}')
3675
def test_simple_loop(self):
    # 'foo' refers to itself, so expanding it must be reported as a loop.
    self.conf.store._load_from_string(b'foo={foo}')
    expand = self.conf.expand_options
    self.assertRaises(config.OptionExpansionLoop, expand, '{foo}')
3680
def test_indirect_loop(self):
3681
self.conf.store._load_from_string(b'''
3685
e = self.assertRaises(config.OptionExpansionLoop,
3686
self.conf.expand_options, '{foo}')
3687
self.assertEqual('foo->bar->baz', e.refs)
3688
self.assertEqual('{foo}', e.string)
3690
def test_list(self):
3691
self.conf.store._load_from_string(b'''
3695
list={foo},{bar},{baz}
3697
self.registry.register(
3698
config.ListOption('list'))
3699
self.assertEqual(['start', 'middle', 'end'],
3700
self.conf.get('list', expand=True))
3702
def test_cascading_list(self):
3703
self.conf.store._load_from_string(b'''
3709
self.registry.register(config.ListOption('list'))
3710
# Register an intermediate option as a list to ensure no conversion
3711
# happens while expanding. Conversion should only occur for the original
3712
# option ('list' here).
3713
self.registry.register(config.ListOption('baz'))
3714
self.assertEqual(['start', 'middle', 'end'],
3715
self.conf.get('list', expand=True))
3717
def test_pathologically_hidden_list(self):
3718
self.conf.store._load_from_string(b'''
3724
hidden={start}{middle}{end}
3726
# What matters is what the registration says, the conversion happens
3727
# only after all expansions have been performed
3728
self.registry.register(config.ListOption('hidden'))
3729
self.assertEqual(['bin', 'go'],
3730
self.conf.get('hidden', expand=True))
3733
class TestStackCrossSectionsExpand(tests.TestCaseWithTransport):
3736
super(TestStackCrossSectionsExpand, self).setUp()
3738
def get_config(self, location, string):
3741
# Since we don't save the config we won't strictly require to inherit
3742
# from TestCaseInTempDir, but an error occurs so quickly...
3743
c = config.LocationStack(location)
3744
c.store._load_from_string(string)
3747
def test_dont_cross_unrelated_section(self):
3748
c = self.get_config('/another/branch/path', b'''
3753
[/another/branch/path]
3756
self.assertRaises(config.ExpandingUnknownOption,
3757
c.get, 'bar', expand=True)
3759
def test_cross_related_sections(self):
3760
c = self.get_config('/project/branch/path', b'''
3764
[/project/branch/path]
3767
self.assertEqual('quux', c.get('bar', expand=True))
3770
class TestStackCrossStoresExpand(tests.TestCaseWithTransport):
3772
def test_cross_global_locations(self):
3773
l_store = config.LocationStore()
3774
l_store._load_from_string(b'''
3780
g_store = config.GlobalStore()
3781
g_store._load_from_string(b'''
3787
stack = config.LocationStack('/branch')
3788
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3789
self.assertEqual('loc-foo', stack.get('gfoo', expand=True))
3792
class TestStackExpandSectionLocals(tests.TestCaseWithTransport):
3794
def test_expand_locals_empty(self):
3795
l_store = config.LocationStore()
3796
l_store._load_from_string(b'''
3797
[/home/user/project]
3802
stack = config.LocationStack('/home/user/project/')
3803
self.assertEqual('', stack.get('base', expand=True))
3804
self.assertEqual('', stack.get('rel', expand=True))
3806
def test_expand_basename_locally(self):
3807
l_store = config.LocationStore()
3808
l_store._load_from_string(b'''
3809
[/home/user/project]
3813
stack = config.LocationStack('/home/user/project/branch')
3814
self.assertEqual('branch', stack.get('bfoo', expand=True))
3816
def test_expand_basename_locally_longer_path(self):
3817
l_store = config.LocationStore()
3818
l_store._load_from_string(b'''
3823
stack = config.LocationStack('/home/user/project/dir/branch')
3824
self.assertEqual('branch', stack.get('bfoo', expand=True))
3826
def test_expand_relpath_locally(self):
3827
l_store = config.LocationStore()
3828
l_store._load_from_string(b'''
3829
[/home/user/project]
3830
lfoo = loc-foo/{relpath}
3833
stack = config.LocationStack('/home/user/project/branch')
3834
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3836
def test_expand_relpath_unknonw_in_global(self):
3837
g_store = config.GlobalStore()
3838
g_store._load_from_string(b'''
3843
stack = config.LocationStack('/home/user/project/branch')
3844
self.assertRaises(config.ExpandingUnknownOption,
3845
stack.get, 'gfoo', expand=True)
3847
def test_expand_local_option_locally(self):
3848
l_store = config.LocationStore()
3849
l_store._load_from_string(b'''
3850
[/home/user/project]
3851
lfoo = loc-foo/{relpath}
3855
g_store = config.GlobalStore()
3856
g_store._load_from_string(b'''
3862
stack = config.LocationStack('/home/user/project/branch')
3863
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3864
self.assertEqual('loc-foo/branch', stack.get('gfoo', expand=True))
3866
def test_locals_dont_leak(self):
3867
"""Make sure we chose the right local in presence of several sections.
3869
l_store = config.LocationStore()
3870
l_store._load_from_string(b'''
3872
lfoo = loc-foo/{relpath}
3873
[/home/user/project]
3874
lfoo = loc-foo/{relpath}
3877
stack = config.LocationStack('/home/user/project/branch')
3878
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3879
stack = config.LocationStack('/home/user/bar/baz')
3880
self.assertEqual('loc-foo/bar/baz', stack.get('lfoo', expand=True))
3883
class TestStackSet(TestStackWithTransport):
3885
def test_simple_set(self):
    stack = self.get_stack(self)
    # Nothing has been set yet.
    before = stack.get('foo')
    self.assertEqual(None, before)
    stack.set('foo', 'baz')
    # Did we get it back ?
    after = stack.get('foo')
    self.assertEqual('baz', after)
3892
def test_set_creates_a_new_section(self):
    # Set an option on a freshly built stack and check it can be read
    # back (per the test name, set() has to create the section itself).
    conf = self.get_stack(self)
    conf.set('foo', 'baz')
    # The assertion must actually be *called*: writing
    # ``self.assertEqual, 'baz', conf.get('foo')`` only builds a tuple and
    # throws it away, so the test could never fail on a wrong value.
    self.assertEqual('baz', conf.get('foo'))
3897
def test_set_hook(self):
3902
config.ConfigHooks.install_named_hook('set', hook, None)
3903
self.assertLength(0, calls)
3904
conf = self.get_stack(self)
3905
conf.set('foo', 'bar')
3906
self.assertLength(1, calls)
3907
self.assertEqual((conf, 'foo', 'bar'), calls[0])
3910
class TestStackRemove(TestStackWithTransport):
3912
def test_remove_existing(self):
3913
conf = self.get_stack(self)
3914
conf.set('foo', 'bar')
3915
self.assertEqual('bar', conf.get('foo'))
3917
# Did we get it back ?
3918
self.assertEqual(None, conf.get('foo'))
3920
def test_remove_unknown(self):
    # Removing an option that was never set is expected to raise KeyError.
    stack = self.get_stack(self)
    remove = stack.remove
    self.assertRaises(KeyError, remove, 'I_do_not_exist')
3924
def test_remove_hook(self):
3929
config.ConfigHooks.install_named_hook('remove', hook, None)
3930
self.assertLength(0, calls)
3931
conf = self.get_stack(self)
3932
conf.set('foo', 'bar')
3934
self.assertLength(1, calls)
3935
self.assertEqual((conf, 'foo'), calls[0])
3938
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
3941
super(TestConfigGetOptions, self).setUp()
3942
create_configs(self)
3944
def test_no_variable(self):
    # Using branch should query branch, locations and breezy
    expected = []
    self.assertOptions(expected, self.branch_config)
3948
def test_option_in_breezy(self):
3949
self.breezy_config.set_user_option('file', 'breezy')
3950
self.assertOptions([('file', 'breezy', 'DEFAULT', 'breezy')],
3953
def test_option_in_locations(self):
3954
self.locations_config.set_user_option('file', 'locations')
3956
[('file', 'locations', self.tree.basedir, 'locations')],
3957
self.locations_config)
3959
def test_option_in_branch(self):
3960
self.branch_config.set_user_option('file', 'branch')
3961
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch')],
3964
def test_option_in_breezy_and_branch(self):
3965
self.breezy_config.set_user_option('file', 'breezy')
3966
self.branch_config.set_user_option('file', 'branch')
3967
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch'),
3968
('file', 'breezy', 'DEFAULT', 'breezy'), ],
3971
def test_option_in_branch_and_locations(self):
3972
# Hmm, locations override branch :-/
3973
self.locations_config.set_user_option('file', 'locations')
3974
self.branch_config.set_user_option('file', 'branch')
3976
[('file', 'locations', self.tree.basedir, 'locations'),
3977
('file', 'branch', 'DEFAULT', 'branch'), ],
3980
def test_option_in_breezy_locations_and_branch(self):
3981
self.breezy_config.set_user_option('file', 'breezy')
3982
self.locations_config.set_user_option('file', 'locations')
3983
self.branch_config.set_user_option('file', 'branch')
3985
[('file', 'locations', self.tree.basedir, 'locations'),
3986
('file', 'branch', 'DEFAULT', 'branch'),
3987
('file', 'breezy', 'DEFAULT', 'breezy'), ],
3991
class TestConfigRemoveOption(tests.TestCaseWithTransport, TestOptionsMixin):
3994
super(TestConfigRemoveOption, self).setUp()
3995
create_configs_with_file_option(self)
3997
def test_remove_in_locations(self):
3998
self.locations_config.remove_user_option('file', self.tree.basedir)
4000
[('file', 'branch', 'DEFAULT', 'branch'),
4001
('file', 'breezy', 'DEFAULT', 'breezy'), ],
4004
def test_remove_in_branch(self):
4005
self.branch_config.remove_user_option('file')
4007
[('file', 'locations', self.tree.basedir, 'locations'),
4008
('file', 'breezy', 'DEFAULT', 'breezy'), ],
4011
def test_remove_in_breezy(self):
4012
self.breezy_config.remove_user_option('file')
4014
[('file', 'locations', self.tree.basedir, 'locations'),
4015
('file', 'branch', 'DEFAULT', 'branch'), ],
4019
class TestConfigGetSections(tests.TestCaseWithTransport):
4022
super(TestConfigGetSections, self).setUp()
4023
create_configs(self)
4025
def assertSectionNames(self, expected, conf, name=None):
4026
"""Check which sections are returned for a given config.
4028
If fallback configurations exist their sections can be included.
4030
:param expected: A list of section names.
4032
:param conf: The configuration that will be queried.
4034
:param name: An optional section name that will be passed to
4037
sections = list(conf._get_sections(name))
4038
self.assertLength(len(expected), sections)
4039
self.assertEqual(expected, [n for n, _, _ in sections])
4041
def test_breezy_default_section(self):
    # Only the DEFAULT section is expected for the breezy config.
    expected = ['DEFAULT']
    self.assertSectionNames(expected, self.breezy_config)
4044
def test_locations_default_section(self):
    # No sections are defined in an empty file
    expected = []
    self.assertSectionNames(expected, self.locations_config)
4048
def test_locations_named_section(self):
    # After setting an option, the only section expected is the one named
    # after the tree's base directory.
    loc_config = self.locations_config
    loc_config.set_user_option('file', 'locations')
    expected = [self.tree.basedir]
    self.assertSectionNames(expected, loc_config)
4052
def test_locations_matching_sections(self):
4053
loc_config = self.locations_config
4054
loc_config.set_user_option('file', 'locations')
4055
# We need to cheat a bit here to create an option in sections above and
4056
# below the 'location' one.
4057
parser = loc_config._get_parser()
4058
# locations.conf deals with '/' ignoring native os.sep
4059
location_names = self.tree.basedir.split('/')
4060
parent = '/'.join(location_names[:-1])
4061
child = '/'.join(location_names + ['child'])
4063
parser[parent]['file'] = 'parent'
4065
parser[child]['file'] = 'child'
4066
self.assertSectionNames([self.tree.basedir, parent], loc_config)
4068
def test_branch_data_default_section(self):
    # The branch data config is expected to expose one unnamed section.
    data_config = self.branch_config._get_branch_data_config()
    self.assertSectionNames([None], data_config)
4072
def test_branch_default_sections(self):
4073
# No sections are defined in an empty locations file
4074
self.assertSectionNames([None, 'DEFAULT'],
4076
# Unless we define an option
4077
self.branch_config._get_location_config().set_user_option(
4078
'file', 'locations')
4079
self.assertSectionNames([self.tree.basedir, None, 'DEFAULT'],
4082
def test_breezy_named_section(self):
    # We need to cheat as the API doesn't give direct access to sections
    # other than DEFAULT.
    breezy_conf = self.breezy_config
    breezy_conf.set_alias('breezy', 'bzr')
    section = 'ALIASES'
    self.assertSectionNames([section], breezy_conf, section)
4089
class TestSharedStores(tests.TestCaseInTempDir):
    """Check that separately built global stacks use a single store."""

    def test_breezy_conf_shared(self):
        first = config.GlobalStack()
        second = config.GlobalStack()
        # The two stacks share the same store
        shared = first.store
        self.assertIs(shared, second.store)
4098
class TestAuthenticationConfigFilePermissions(tests.TestCaseInTempDir):
4099
"""Test warning for permissions of authentication.conf."""
4102
super(TestAuthenticationConfigFilePermissions, self).setUp()
4103
self.path = osutils.pathjoin(self.test_dir, 'authentication.conf')
4104
with open(self.path, 'wb') as f:
4105
f.write(b"""[broken]
4108
port=port # Error: Not an int
4110
self.overrideAttr(bedding, 'authentication_config_path',
4112
osutils.chmod_if_possible(self.path, 0o755)
4114
def test_check_warning(self):
    # Building the authentication config is expected to leave a warning
    # about the file's permissions in the log.
    auth_conf = config.AuthenticationConfig()
    self.assertEqual(auth_conf._filename, self.path)
    log = self.get_log()
    self.assertContainsRe(
        log, 'Saved passwords may be accessible by other users.')
4120
def test_check_suppressed_warning(self):
    # With 'insecure_permissions' listed in suppress_warnings, the
    # permissions warning must not show up in the log.
    global_conf = config.GlobalConfig()
    global_conf.set_user_option(
        'suppress_warnings', 'insecure_permissions')
    auth_conf = config.AuthenticationConfig()
    self.assertEqual(auth_conf._filename, self.path)
    log = self.get_log()
    self.assertNotContainsRe(
        log, 'Saved passwords may be accessible by other users.')
1315
4130
class TestAuthenticationConfigFile(tests.TestCase):
1316
4131
"""Test the authentication.conf file matching"""