1312
2082
self.assertIs(None, bzrdir_config.get_default_stack_on())
2085
class TestOldConfigHooks(tests.TestCaseWithTransport):
    """Check that ``config.OldConfigHooks`` fire for the local config objects.

    ``self.bazaar_config``, ``self.locations_config``, ``self.branch_config``
    and ``self.tree`` are not assigned in this class; they are presumably
    provided by ``create_configs_with_file_option`` -- confirm against that
    helper.
    """

    def setUp(self):
        super(TestOldConfigHooks, self).setUp()
        create_configs_with_file_option(self)

    def _record_hook_calls(self, hook_name):
        """Install a recording hook and return the list it appends to.

        :param hook_name: Name of the ``OldConfigHooks`` hook point.
        :return: A list receiving one tuple of positional arguments per hook
            invocation. It is asserted to be empty before being returned.
        """
        calls = []

        def hook(*args):
            calls.append(args)

        config.OldConfigHooks.install_named_hook(hook_name, hook, None)
        # Uninstall at cleanup so the hook cannot leak into other tests.
        self.addCleanup(
            config.OldConfigHooks.uninstall_named_hook, hook_name, None)
        self.assertLength(0, calls)
        return calls

    def assertGetHook(self, conf, name, value):
        """Assert that reading ``name`` fires the 'get' hook exactly once.

        The hook must be called with ``(conf, name, value)``.
        """
        calls = self._record_hook_calls('get')
        actual_value = conf.get_user_option(name)
        self.assertEquals(value, actual_value)
        self.assertLength(1, calls)
        self.assertEquals((conf, name, value), calls[0])

    def test_get_hook_bazaar(self):
        self.assertGetHook(self.bazaar_config, 'file', 'bazaar')

    def test_get_hook_locations(self):
        self.assertGetHook(self.locations_config, 'file', 'locations')

    def test_get_hook_branch(self):
        # Since locations masks branch, we define a different option
        self.branch_config.set_user_option('file2', 'branch')
        self.assertGetHook(self.branch_config, 'file2', 'branch')

    def assertSetHook(self, conf, name, value):
        """Assert that setting ``name`` fires the 'set' hook exactly once."""
        calls = self._record_hook_calls('set')
        conf.set_user_option(name, value)
        self.assertLength(1, calls)
        # We can't assert the conf object below as different configs use
        # different means to implement set_user_option and we care only about
        # coherent hook calls
        self.assertEquals((name, value), calls[0][1:])

    def test_set_hook_bazaar(self):
        self.assertSetHook(self.bazaar_config, 'foo', 'bazaar')

    def test_set_hook_locations(self):
        self.assertSetHook(self.locations_config, 'foo', 'locations')

    def test_set_hook_branch(self):
        self.assertSetHook(self.branch_config, 'foo', 'branch')

    def assertRemoveHook(self, conf, name, section_name=None):
        """Assert that removing ``name`` fires the 'remove' hook once."""
        calls = self._record_hook_calls('remove')
        conf.remove_user_option(name, section_name)
        self.assertLength(1, calls)
        # We can't assert the conf object below as different configs use
        # different means to implement remove_user_option and we care only
        # about coherent hook calls
        self.assertEquals((name,), calls[0][1:])

    def test_remove_hook_bazaar(self):
        self.assertRemoveHook(self.bazaar_config, 'file')

    def test_remove_hook_locations(self):
        self.assertRemoveHook(self.locations_config, 'file',
                              self.locations_config.location)

    def test_remove_hook_branch(self):
        self.assertRemoveHook(self.branch_config, 'file')

    def assertLoadHook(self, name, conf_class, *conf_args):
        """Assert that a fresh ``conf_class`` fires the 'load' hook once.

        :param name: Option read to trigger the load.
        :param conf_class: Callable building the config object.
        :param conf_args: Positional arguments passed to ``conf_class``.
        """
        calls = self._record_hook_calls('load')
        # Build a config
        conf = conf_class(*conf_args)
        # Access an option to trigger a load
        conf.get_user_option(name)
        self.assertLength(1, calls)
        # Since we can't assert about conf, we just use the number of calls ;-/

    def test_load_hook_bazaar(self):
        self.assertLoadHook('file', config.GlobalConfig)

    def test_load_hook_locations(self):
        self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)

    def test_load_hook_branch(self):
        self.assertLoadHook('file', config.BranchConfig, self.tree.branch)

    def assertSaveHook(self, conf):
        """Assert that setting an option on ``conf`` fires 'save' once."""
        calls = self._record_hook_calls('save')
        # Setting an option triggers a save
        conf.set_user_option('foo', 'bar')
        self.assertLength(1, calls)
        # Since we can't assert about conf, we just use the number of calls ;-/

    def test_save_hook_bazaar(self):
        self.assertSaveHook(self.bazaar_config)

    def test_save_hook_locations(self):
        self.assertSaveHook(self.locations_config)

    def test_save_hook_branch(self):
        self.assertSaveHook(self.branch_config)
2211
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
    """Tests config hooks for remote configs.

    No tests for the remove hook as this is not implemented there.
    """

    def setUp(self):
        super(TestOldConfigHooksForRemote, self).setUp()
        self.transport_server = test_server.SmartTCPServer_for_testing
        create_configs_with_file_option(self)

    def _record_hook_calls(self, hook_name):
        """Install a recording hook and return the list it appends to.

        :param hook_name: Name of the ``OldConfigHooks`` hook point.
        :return: A list receiving one tuple of positional arguments per hook
            invocation. It is asserted to be empty before being returned.
        """
        calls = []

        def hook(*args):
            calls.append(args)

        config.OldConfigHooks.install_named_hook(hook_name, hook, None)
        # Uninstall at cleanup so the hook cannot leak into other tests.
        self.addCleanup(
            config.OldConfigHooks.uninstall_named_hook, hook_name, None)
        self.assertLength(0, calls)
        return calls

    def assertGetHook(self, conf, name, value):
        """Assert that ``conf.get_option(name)`` fires 'get' exactly once."""
        calls = self._record_hook_calls('get')
        actual_value = conf.get_option(name)
        self.assertEquals(value, actual_value)
        self.assertLength(1, calls)
        self.assertEquals((conf, name, value), calls[0])

    def test_get_hook_remote_branch(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.assertGetHook(remote_branch._get_config(), 'file', 'branch')

    def test_get_hook_remote_bzrdir(self):
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
        conf = remote_bzrdir._get_config()
        # Note the argument order used throughout this class: value first,
        # option name second.
        conf.set_option('remotedir', 'file')
        self.assertGetHook(conf, 'file', 'remotedir')

    def assertSetHook(self, conf, name, value):
        """Assert that setting ``name`` on ``conf`` fires 'set' once."""
        calls = self._record_hook_calls('set')
        conf.set_option(value, name)
        self.assertLength(1, calls)
        # We can't assert the conf object below as different configs use
        # different means to implement set_user_option and we care only about
        # coherent hook calls
        self.assertEquals((name, value), calls[0][1:])

    def test_set_hook_remote_branch(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.addCleanup(remote_branch.lock_write().unlock)
        self.assertSetHook(remote_branch._get_config(), 'file', 'remote')

    def test_set_hook_remote_bzrdir(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.addCleanup(remote_branch.lock_write().unlock)
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
        self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')

    def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
        """Assert how many times 'load' fires for a fresh ``conf_class``.

        :param expected_nb_calls: Expected number of hook invocations.
        :param name: Option read to trigger the load.
        :param conf_class: Callable building the config object.
        :param conf_args: Positional arguments passed to ``conf_class``.
        """
        calls = self._record_hook_calls('load')
        # Build a config
        conf = conf_class(*conf_args)
        # Access an option to trigger a load
        conf.get_option(name)
        self.assertLength(expected_nb_calls, calls)
        # Since we can't assert about conf, we just use the number of calls ;-/

    def test_load_hook_remote_branch(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.assertLoadHook(1, 'file', remote.RemoteBranchConfig,
                            remote_branch)

    def test_load_hook_remote_bzrdir(self):
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
        # The config file doesn't exist, set an option to force its creation
        conf = remote_bzrdir._get_config()
        conf.set_option('remotedir', 'file')
        # We get one call for the server and one call for the client, this is
        # caused by the differences in implementations between
        # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
        # SmartServerBranchGetConfigFile (in smart/branch.py)
        self.assertLoadHook(2, 'file', remote.RemoteBzrDirConfig,
                            remote_bzrdir)

    def assertSaveHook(self, conf):
        """Assert that setting an option on ``conf`` fires 'save' once."""
        calls = self._record_hook_calls('save')
        # Setting an option triggers a save
        conf.set_option('foo', 'bar')
        self.assertLength(1, calls)
        # Since we can't assert about conf, we just use the number of calls ;-/

    def test_save_hook_remote_branch(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.addCleanup(remote_branch.lock_write().unlock)
        self.assertSaveHook(remote_branch._get_config())

    def test_save_hook_remote_bzrdir(self):
        remote_branch = branch.Branch.open(self.get_url('tree'))
        self.addCleanup(remote_branch.lock_write().unlock)
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
        self.assertSaveHook(remote_bzrdir._get_config())
2326
class TestOption(tests.TestCase):
    """Tests for the default value handling of ``config.Option``."""

    def test_default_value(self):
        opt = config.Option('foo', default='bar')
        self.assertEquals('bar', opt.get_default())

    def test_callable_default_value(self):
        def bar_as_unicode():
            return u'bar'
        opt = config.Option('foo', default=bar_as_unicode)
        self.assertEquals('bar', opt.get_default())

    def test_default_value_from_env(self):
        opt = config.Option('foo', default='bar', default_from_env=['FOO'])
        self.overrideEnv('FOO', 'quux')
        # Env variable provides a default taking over the option one
        self.assertEquals('quux', opt.get_default())

    def test_first_default_value_from_env_wins(self):
        opt = config.Option('foo', default='bar',
                            default_from_env=['NO_VALUE', 'FOO', 'BAZ'])
        self.overrideEnv('FOO', 'foo')
        self.overrideEnv('BAZ', 'baz')
        # The first env var set wins
        self.assertEquals('foo', opt.get_default())

    def test_not_supported_list_default_value(self):
        self.assertRaises(AssertionError, config.Option, 'foo', default=[1])

    def test_not_supported_object_default_value(self):
        self.assertRaises(AssertionError, config.Option, 'foo',
                          default=object())

    def test_not_supported_callable_default_value_not_unicode(self):
        def bar_not_unicode():
            # A plain (byte) string: this file targets Python 2, where
            # 'bar' is not a unicode object.
            return 'bar'
        opt = config.Option('foo', default=bar_not_unicode)
        self.assertRaises(AssertionError, opt.get_default)
2366
class TestOptionConverterMixin(object):
    """Assertion helpers shared by the option converter test cases.

    The host class must provide ``assertEquals``, ``assertLength``,
    ``assertRaises`` and ``overrideAttr``.
    """

    def assertConverted(self, expected, opt, value):
        """Assert that ``opt`` converts ``value`` into ``expected``."""
        self.assertEquals(expected, opt.convert_from_unicode(None, value))

    def assertWarns(self, opt, value):
        """Assert that converting ``value`` yields None and one warning."""
        warnings = []

        def warning(*args):
            # Render the message the same way trace.warning would receive it.
            warnings.append(args[0] % args[1:])

        self.overrideAttr(trace, 'warning', warning)
        self.assertEquals(None, opt.convert_from_unicode(None, value))
        self.assertLength(1, warnings)
        self.assertEquals(
            'Value "%s" is not valid for "%s"' % (value, opt.name),
            warnings[0])

    def assertErrors(self, opt, value):
        """Assert that converting ``value`` raises ConfigOptionValueError."""
        self.assertRaises(errors.ConfigOptionValueError,
                          opt.convert_from_unicode, None, value)

    def assertConvertInvalid(self, opt, invalid_value):
        """Check ``invalid_value`` under each ``opt.invalid`` policy.

        Note that ``opt.invalid`` is left set to 'error' on return.
        """
        # No policy: the invalid value is silently converted to None.
        opt.invalid = None
        self.assertEquals(None, opt.convert_from_unicode(None, invalid_value))
        opt.invalid = 'warning'
        self.assertWarns(opt, invalid_value)
        opt.invalid = 'error'
        self.assertErrors(opt, invalid_value)
2395
class TestOptionWithBooleanConverter(tests.TestCase, TestOptionConverterMixin):
    """Conversion tests for an option using ``config.bool_from_store``."""

    def get_option(self):
        """Build the boolean option under test."""
        boolean_option = config.Option(
            'foo', help='A boolean.', from_unicode=config.bool_from_store)
        return boolean_option

    def test_convert_invalid(self):
        opt = self.get_option()
        rejected_values = [
            # A string that is not recognized as a boolean
            u'invalid-boolean',
            # A list of strings is never recognized as a boolean
            [u'not', u'a', u'boolean'],
        ]
        for rejected in rejected_values:
            self.assertConvertInvalid(opt, rejected)

    def test_convert_valid(self):
        opt = self.get_option()
        accepted_values = [
            (True, u'True'),
            (True, u'1'),
            (False, u'False'),
        ]
        for expected, raw in accepted_values:
            self.assertConverted(expected, opt, raw)
2415
class TestOptionWithIntegerConverter(tests.TestCase, TestOptionConverterMixin):
    """Conversion tests for an option using ``config.int_from_store``."""

    def get_option(self):
        """Build the integer option under test."""
        integer_option = config.Option(
            'foo', help='An integer.', from_unicode=config.int_from_store)
        return integer_option

    def test_convert_invalid(self):
        opt = self.get_option()
        rejected_values = [
            # A string that is not recognized as an integer
            u'forty-two',
            # A list of strings is never recognized as an integer
            [u'a', u'list'],
        ]
        for rejected in rejected_values:
            self.assertConvertInvalid(opt, rejected)

    def test_convert_valid(self):
        integer_option = self.get_option()
        self.assertConverted(16, integer_option, u'16')
2433
class TestOptionWithSIUnitConverter(tests.TestCase, TestOptionConverterMixin):
    """Conversion tests for an option using ``config.int_SI_from_store``."""

    def get_option(self):
        """Build the SI-unit integer option under test."""
        si_option = config.Option(
            'foo', help='An integer in SI units.',
            from_unicode=config.int_SI_from_store)
        return si_option

    def test_convert_invalid(self):
        opt = self.get_option()
        rejected_values = (
            u'not-a-unit',
            u'Gb',  # Forgot the int
            u'1b',  # Forgot the unit
            u'1GG',
            u'1Mbb',
            u'1MM',
        )
        for rejected in rejected_values:
            self.assertConvertInvalid(opt, rejected)

    def test_convert_valid(self):
        opt = self.get_option()
        accepted_values = (
            (int(5e3), u'5kb'),
            (int(5e6), u'5M'),
            (int(5e6), u'5MB'),
            (int(5e9), u'5g'),
            (int(5e9), u'5gB'),
            (100, u'100'),
        )
        for expected, raw in accepted_values:
            self.assertConverted(expected, opt, raw)
2458
class TestListOption(tests.TestCase, TestOptionConverterMixin):
    """Conversion tests for ``config.ListOption``."""

    def get_option(self):
        """Build the list option under test."""
        list_option = config.ListOption('foo', help='A list.')
        return list_option

    def test_convert_invalid(self):
        list_option = self.get_option()
        # We don't even try to convert a list into a list, we only expect
        # strings
        not_a_string = [1]
        self.assertConvertInvalid(list_option, not_a_string)
        # No string is invalid as all forms can be converted to a list

    def test_convert_valid(self):
        opt = self.get_option()
        accepted_values = (
            # An empty string is an empty list
            ([], ''),  # Using a bare str() just in case
            ([], u''),
            # A boolean
            ([u'True'], u'True'),
            # An integer
            ([u'42'], u'42'),
            # A single string
            ([u'bar'], u'bar'),
        )
        for expected, raw in accepted_values:
            self.assertConverted(expected, opt, raw)
2483
class TestOptionRegistry(tests.TestCase):
    """Tests for registering options in a ``config.OptionRegistry``."""

    def setUp(self):
        super(TestOptionRegistry, self).setUp()
        # Always start with an empty registry
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
        self.registry = config.option_registry

    def test_register(self):
        opt = config.Option('foo')
        self.registry.register(opt)
        self.assertIs(opt, self.registry.get('foo'))

    def test_registered_help(self):
        opt = config.Option('foo', help='A simple option')
        self.registry.register(opt)
        self.assertEquals('A simple option', self.registry.get_help('foo'))

    # Class attribute on purpose: the lazy registrations below refer to it by
    # its dotted name, 'TestOptionRegistry.lazy_option'.
    lazy_option = config.Option('lazy_foo', help='Lazy help')

    def test_register_lazy(self):
        self.registry.register_lazy('lazy_foo', self.__module__,
                                    'TestOptionRegistry.lazy_option')
        self.assertIs(self.lazy_option, self.registry.get('lazy_foo'))

    def test_registered_lazy_help(self):
        self.registry.register_lazy('lazy_foo', self.__module__,
                                    'TestOptionRegistry.lazy_option')
        self.assertEquals('Lazy help', self.registry.get_help('lazy_foo'))
2514
class TestRegisteredOptions(tests.TestCase):
    """All registered options should verify some constraints."""

    # One scenario per registered option; each provides the 'option_name'
    # and 'option' attributes read by the tests below.
    scenarios = [(key, {'option_name': key, 'option': option})
                 for key, option in config.option_registry.iteritems()]

    def setUp(self):
        super(TestRegisteredOptions, self).setUp()
        self.registry = config.option_registry

    def test_proper_name(self):
        # An option should be registered under its own name, this can't be
        # checked at registration time for the lazy ones.
        self.assertEquals(self.option_name, self.option.name)

    def test_help_is_set(self):
        option_help = self.registry.get_help(self.option_name)
        # Come on, think about the user, he really wants to know what the
        # option is about
        self.assertIsNot(None, option_help)
        self.assertNotEquals('', option_help)
2538
class TestSection(tests.TestCase):
    """Tests for the read-only ``config.Section``."""

    # FIXME: Parametrize so that all sections produced by Stores run these
    # tests -- vila 2011-04-01

    def test_get_a_value(self):
        a_dict = dict(foo='bar')
        section = config.Section('myID', a_dict)
        self.assertEquals('bar', section.get('foo'))

    def test_get_unknown_option(self):
        # No options at all, so 'foo' is unknown and the default is returned.
        a_dict = dict()
        section = config.Section(None, a_dict)
        self.assertEquals('out of thin air',
                          section.get('foo', 'out of thin air'))

    def test_options_is_shared(self):
        a_dict = dict()
        section = config.Section(None, a_dict)
        # The section keeps a reference to the dict, not a copy.
        self.assertIs(a_dict, section.options)
2560
class TestMutableSection(tests.TestCase):
    """Tests for ``config.MutableSection`` change tracking.

    ``self.get_section`` comes from the scenario: it builds the section
    under test from a dict of options.
    """

    scenarios = [
        ('mutable',
         {'get_section':
          lambda opts: config.MutableSection('myID', opts)}),
    ]

    def test_set(self):
        a_dict = dict(foo='bar')
        section = self.get_section(a_dict)
        section.set('foo', 'new_value')
        self.assertEquals('new_value', section.get('foo'))
        # The change appears in the shared section
        self.assertEquals('new_value', a_dict.get('foo'))
        # We keep track of the change
        self.assertTrue('foo' in section.orig)
        self.assertEquals('bar', section.orig.get('foo'))

    def test_set_preserve_original_once(self):
        a_dict = dict(foo='bar')
        section = self.get_section(a_dict)
        section.set('foo', 'first_value')
        section.set('foo', 'second_value')
        # We keep track of the original value
        self.assertTrue('foo' in section.orig)
        self.assertEquals('bar', section.orig.get('foo'))

    def test_remove(self):
        a_dict = dict(foo='bar')
        section = self.get_section(a_dict)
        section.remove('foo')
        # We get None for unknown options via the default value
        self.assertEquals(None, section.get('foo'))
        # Or we just get the default value
        self.assertEquals('unknown', section.get('foo', 'unknown'))
        self.assertFalse('foo' in section.options)
        # We keep track of the deletion
        self.assertTrue('foo' in section.orig)
        self.assertEquals('bar', section.orig.get('foo'))

    def test_remove_new_option(self):
        a_dict = dict()
        section = self.get_section(a_dict)
        section.set('foo', 'bar')
        section.remove('foo')
        self.assertFalse('foo' in section.options)
        # The option didn't exist initially so we need to keep track of it
        # with a special value
        self.assertTrue('foo' in section.orig)
        self.assertEquals(config._NewlyCreatedOption, section.orig['foo'])
2612
class TestCommandLineStore(tests.TestCase):
    """Tests for option overrides held by ``config.CommandLineStore``."""

    def setUp(self):
        super(TestCommandLineStore, self).setUp()
        self.store = config.CommandLineStore()
        # Start from an empty registry so tests can register their own options.
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())

    def get_section(self):
        """Get the unique section for the command line overrides."""
        sections = list(self.store.get_sections())
        self.assertLength(1, sections)
        store, section = sections[0]
        self.assertEquals(self.store, store)
        return section

    def test_no_override(self):
        self.store._from_cmdline([])
        section = self.get_section()
        self.assertLength(0, list(section.iter_option_names()))

    def test_simple_override(self):
        self.store._from_cmdline(['a=b'])
        section = self.get_section()
        self.assertEqual('b', section.get('a'))

    def test_list_override(self):
        opt = config.ListOption('l')
        config.option_registry.register(opt)
        self.store._from_cmdline(['l=1,2,3'])
        val = self.get_section().get('l')
        self.assertEqual('1,2,3', val)
        # Reminder: lists should be registered as such explicitly, otherwise
        # the conversion needs to be done afterwards.
        self.assertEqual(['1', '2', '3'],
                         opt.convert_from_unicode(self.store, val))

    def test_multiple_overrides(self):
        self.store._from_cmdline(['a=b', 'x=y'])
        section = self.get_section()
        self.assertEquals('b', section.get('a'))
        self.assertEquals('y', section.get('x'))

    def test_wrong_syntax(self):
        # 'c' has no '=value' part.
        self.assertRaises(errors.BzrCommandError,
                          self.store._from_cmdline, ['a=b', 'c'])
2659
class TestStore(tests.TestCaseWithTransport):
    """Base class providing assertions shared by the store tests."""

    def assertSectionContent(self, expected, store_and_section):
        """Assert that some options have the proper values in a section.

        :param expected: A ``(section_id, options_dict)`` pair.
        :param store_and_section: A ``(store, section)`` pair, as the visible
            callers obtain from ``get_sections()``. Only the section is
            inspected.
        """
        # Unpacked here rather than in the signature: tuple parameters were
        # removed from the language by PEP 3113.
        store, section = store_and_section
        expected_name, expected_options = expected
        self.assertEquals(expected_name, section.id)
        self.assertEquals(
            expected_options,
            dict((k, section.get(k)) for k in expected_options))
2670
class TestReadonlyStore(TestStore):
    """Read-side tests run against every registered store builder.

    ``self.get_store`` is supplied by the scenario and called with the test
    itself as argument.
    """

    scenarios = [
        (key, {'get_store': builder})
        for key, builder in config.test_store_builder_registry.iteritems()]

    def test_building_delays_load(self):
        store = self.get_store(self)
        loaded_before = store.is_loaded()
        self.assertEquals(False, loaded_before)
        store._load_from_string('')
        loaded_after = store.is_loaded()
        self.assertEquals(True, loaded_after)

    def test_get_no_sections_for_empty(self):
        store = self.get_store(self)
        store._load_from_string('')
        found = list(store.get_sections())
        self.assertEquals([], found)

    def test_get_default_section(self):
        store = self.get_store(self)
        store._load_from_string('foo=bar')
        sections = list(store.get_sections())
        self.assertLength(1, sections)
        # Options outside any [section] end up in the section with id None.
        expected = (None, {'foo': 'bar'})
        self.assertSectionContent(expected, sections[0])

    def test_get_named_section(self):
        store = self.get_store(self)
        store._load_from_string('[baz]\nfoo=bar')
        sections = list(store.get_sections())
        self.assertLength(1, sections)
        expected = ('baz', {'foo': 'bar'})
        self.assertSectionContent(expected, sections[0])

    def test_load_from_string_fails_for_non_empty_store(self):
        store = self.get_store(self)
        store._load_from_string('foo=bar')
        reload_from_string = store._load_from_string
        self.assertRaises(AssertionError, reload_from_string, 'bar=baz')
2706
class TestStoreQuoting(TestStore):
    """Check that store quoting and unquoting round-trip option values."""

    scenarios = [(key, {'get_store': builder}) for key, builder
                 in config.test_store_builder_registry.iteritems()]

    def setUp(self):
        super(TestStoreQuoting, self).setUp()
        self.store = self.get_store(self)
        # We need a loaded store but any content will do
        self.store._load_from_string('')

    def assertIdempotent(self, s):
        """Assert that quoting an unquoted string is a no-op and vice-versa.

        What matters here is that option values, as they appear in a store, can
        be safely round-tripped out of the store and back.

        :param s: A string, quoted if required.
        """
        self.assertEquals(s, self.store.quote(self.store.unquote(s)))
        self.assertEquals(s, self.store.unquote(self.store.quote(s)))

    def test_empty_string(self):
        if isinstance(self.store, config.IniFileStore):
            # configobj._quote doesn't handle empty values
            self.assertRaises(AssertionError,
                              self.assertIdempotent, '')
        else:
            self.assertIdempotent('')
        # But quoted empty strings are ok
        self.assertIdempotent('""')

    def test_embedded_spaces(self):
        self.assertIdempotent('" a b c "')

    def test_embedded_commas(self):
        self.assertIdempotent('" a , b c "')

    def test_simple_comma(self):
        if isinstance(self.store, config.IniFileStore):
            # configobj requires that lists are special-cased
            self.assertRaises(AssertionError,
                              self.assertIdempotent, ',')
        else:
            self.assertIdempotent(',')
        # When a single comma is required, quoting is also required
        self.assertIdempotent('","')

    def test_list(self):
        if isinstance(self.store, config.IniFileStore):
            # configobj requires that lists are special-cased
            self.assertRaises(AssertionError,
                              self.assertIdempotent, 'a,b')
        else:
            self.assertIdempotent('a,b')
2763
class TestIniFileStoreContent(tests.TestCaseWithTransport):
    """Simulate loading a config store with content of various encodings.

    All files produced by bzr are in utf8 content.

    Users may modify them manually and end up with a file that can't be
    loaded. We need to issue proper error messages in this case.
    """

    # A byte that can never appear in valid utf8.
    invalid_utf8_char = '\xff'

    def test_load_utf8(self):
        """Ensure we can load an utf8-encoded file."""
        t = self.get_transport()
        # From http://pad.lv/799212
        unicode_user = u'b\N{Euro Sign}ar'
        unicode_content = u'user=%s' % (unicode_user,)
        utf8_content = unicode_content.encode('utf8')
        # Store the raw content in the config file
        t.put_bytes('foo.conf', utf8_content)
        store = config.TransportIniFileStore(t, 'foo.conf')
        store.load()
        stack = config.Stack([store.get_sections], store)
        self.assertEquals(unicode_user, stack.get('user'))

    def test_load_non_ascii(self):
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
        t = self.get_transport()
        t.put_bytes('foo.conf', 'user=foo\n#%s\n' % (self.invalid_utf8_char,))
        store = config.TransportIniFileStore(t, 'foo.conf')
        self.assertRaises(errors.ConfigContentError, store.load)

    def test_load_erroneous_content(self):
        """Ensure we display a proper error on content that can't be parsed."""
        t = self.get_transport()
        t.put_bytes('foo.conf', '[open_section\n')
        store = config.TransportIniFileStore(t, 'foo.conf')
        self.assertRaises(errors.ParseConfigError, store.load)

    def test_load_permission_denied(self):
        """Ensure we get warned when trying to load an inaccessible file."""
        warnings = []

        def warning(*args):
            # Render the message the same way trace.warning would receive it.
            warnings.append(args[0] % args[1:])

        self.overrideAttr(trace, 'warning', warning)

        t = self.get_transport()

        # Make every read on this transport fail with PermissionDenied.
        def get_bytes(relpath):
            raise errors.PermissionDenied(relpath, "")
        t.get_bytes = get_bytes
        store = config.TransportIniFileStore(t, 'foo.conf')
        self.assertRaises(errors.PermissionDenied, store.load)
        self.assertEquals(
            warnings,
            [u'Permission denied while trying to load configuration store %s.'
             % store.external_url()])
2822
class TestIniConfigContent(tests.TestCaseWithTransport):
    """Simulate loading a IniBasedConfig with content of various encodings.

    All files produced by bzr are in utf8 content.

    Users may modify them manually and end up with a file that can't be
    loaded. We need to issue proper error messages in this case.
    """

    # A byte that can never appear in valid utf8.
    invalid_utf8_char = '\xff'

    def test_load_utf8(self):
        """Ensure we can load an utf8-encoded file."""
        # From http://pad.lv/799212
        unicode_user = u'b\N{Euro Sign}ar'
        unicode_content = u'user=%s' % (unicode_user,)
        utf8_content = unicode_content.encode('utf8')
        # Store the raw content in the config file
        with open('foo.conf', 'wb') as f:
            f.write(utf8_content)
        conf = config.IniBasedConfig(file_name='foo.conf')
        self.assertEquals(unicode_user, conf.get_user_option('user'))

    def test_load_badly_encoded_content(self):
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
        with open('foo.conf', 'wb') as f:
            f.write('user=foo\n#%s\n' % (self.invalid_utf8_char,))
        conf = config.IniBasedConfig(file_name='foo.conf')
        self.assertRaises(errors.ConfigContentError, conf._get_parser)

    def test_load_erroneous_content(self):
        """Ensure we display a proper error on content that can't be parsed."""
        with open('foo.conf', 'wb') as f:
            f.write('[open_section\n')
        conf = config.IniBasedConfig(file_name='foo.conf')
        self.assertRaises(errors.ParseConfigError, conf._get_parser)
2860
class TestMutableStore(TestStore):
2862
scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
2863
in config.test_store_builder_registry.iteritems()]
2866
super(TestMutableStore, self).setUp()
2867
self.transport = self.get_transport()
2869
def has_store(self, store):
2870
store_basename = urlutils.relative_url(self.transport.external_url(),
2871
store.external_url())
2872
return self.transport.has(store_basename)
2874
def test_save_empty_creates_no_file(self):
2875
# FIXME: There should be a better way than relying on the test
2876
# parametrization to identify branch.conf -- vila 2011-0526
2877
if self.store_id in ('branch', 'remote_branch'):
2878
raise tests.TestNotApplicable(
2879
'branch.conf is *always* created when a branch is initialized')
2880
store = self.get_store(self)
2882
self.assertEquals(False, self.has_store(store))
2884
def test_save_emptied_succeeds(self):
2885
store = self.get_store(self)
2886
store._load_from_string('foo=bar\n')
2887
# FIXME: There should be a better way than relying on the test
2888
# parametrization to identify branch.conf -- vila 2011-0526
2889
if self.store_id in ('branch', 'remote_branch'):
2890
# branch stores requires write locked branches
2891
self.addCleanup(store.branch.lock_write().unlock)
2892
section = store.get_mutable_section(None)
2893
section.remove('foo')
2895
self.assertEquals(True, self.has_store(store))
2896
modified_store = self.get_store(self)
2897
sections = list(modified_store.get_sections())
2898
self.assertLength(0, sections)
2900
def test_save_with_content_succeeds(self):
2901
# FIXME: There should be a better way than relying on the test
2902
# parametrization to identify branch.conf -- vila 2011-0526
2903
if self.store_id in ('branch', 'remote_branch'):
2904
raise tests.TestNotApplicable(
2905
'branch.conf is *always* created when a branch is initialized')
2906
store = self.get_store(self)
2907
store._load_from_string('foo=bar\n')
2908
self.assertEquals(False, self.has_store(store))
2910
self.assertEquals(True, self.has_store(store))
2911
modified_store = self.get_store(self)
2912
sections = list(modified_store.get_sections())
2913
self.assertLength(1, sections)
2914
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2916
def test_set_option_in_empty_store(self):
2917
store = self.get_store(self)
2918
# FIXME: There should be a better way than relying on the test
2919
# parametrization to identify branch.conf -- vila 2011-0526
2920
if self.store_id in ('branch', 'remote_branch'):
2921
# branch stores requires write locked branches
2922
self.addCleanup(store.branch.lock_write().unlock)
2923
section = store.get_mutable_section(None)
2924
section.set('foo', 'bar')
2926
modified_store = self.get_store(self)
2927
sections = list(modified_store.get_sections())
2928
self.assertLength(1, sections)
2929
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2931
def test_set_option_in_default_section(self):
2932
store = self.get_store(self)
2933
store._load_from_string('')
2934
# FIXME: There should be a better way than relying on the test
2935
# parametrization to identify branch.conf -- vila 2011-0526
2936
if self.store_id in ('branch', 'remote_branch'):
2937
# branch stores requires write locked branches
2938
self.addCleanup(store.branch.lock_write().unlock)
2939
section = store.get_mutable_section(None)
2940
section.set('foo', 'bar')
2942
modified_store = self.get_store(self)
2943
sections = list(modified_store.get_sections())
2944
self.assertLength(1, sections)
2945
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2947
def test_set_option_in_named_section(self):
2948
store = self.get_store(self)
2949
store._load_from_string('')
2950
# FIXME: There should be a better way than relying on the test
2951
# parametrization to identify branch.conf -- vila 2011-0526
2952
if self.store_id in ('branch', 'remote_branch'):
2953
# branch stores requires write locked branches
2954
self.addCleanup(store.branch.lock_write().unlock)
2955
section = store.get_mutable_section('baz')
2956
section.set('foo', 'bar')
2958
modified_store = self.get_store(self)
2959
sections = list(modified_store.get_sections())
2960
self.assertLength(1, sections)
2961
self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2963
def test_load_hook(self):
2964
# First, we need to ensure that the store exists
2965
store = self.get_store(self)
2966
# FIXME: There should be a better way than relying on the test
2967
# parametrization to identify branch.conf -- vila 2011-0526
2968
if self.store_id in ('branch', 'remote_branch'):
2969
# branch stores requires write locked branches
2970
self.addCleanup(store.branch.lock_write().unlock)
2971
section = store.get_mutable_section('baz')
2972
section.set('foo', 'bar')
2974
# Now we can try to load it
2975
store = self.get_store(self)
2979
config.ConfigHooks.install_named_hook('load', hook, None)
2980
self.assertLength(0, calls)
2982
self.assertLength(1, calls)
2983
self.assertEquals((store,), calls[0])
2985
def test_save_hook(self):
2989
config.ConfigHooks.install_named_hook('save', hook, None)
2990
self.assertLength(0, calls)
2991
store = self.get_store(self)
2992
# FIXME: There should be a better way than relying on the test
2993
# parametrization to identify branch.conf -- vila 2011-0526
2994
if self.store_id in ('branch', 'remote_branch'):
2995
# branch stores requires write locked branches
2996
self.addCleanup(store.branch.lock_write().unlock)
2997
section = store.get_mutable_section('baz')
2998
section.set('foo', 'bar')
3000
self.assertLength(1, calls)
3001
self.assertEquals((store,), calls[0])
3003
def test_set_mark_dirty(self):
    """Setting an option records a dirty section and requires saving."""
    stack = config.MemoryStack('')
    self.assertLength(0, stack.store.dirty_sections)
    stack.set('foo', 'baz')
    self.assertLength(1, stack.store.dirty_sections)
    self.assertTrue(stack.store._need_saving())

def test_remove_mark_dirty(self):
    """Removing an option records a dirty section and requires saving."""
    stack = config.MemoryStack('foo=bar')
    self.assertLength(0, stack.store.dirty_sections)
    stack.remove('foo')
    self.assertLength(1, stack.store.dirty_sections)
    self.assertTrue(stack.store._need_saving())


class TestStoreSaveChanges(tests.TestCaseWithTransport):
    """Tests that config changes are kept in memory and saved on-demand."""

    def setUp(self):
        super(TestStoreSaveChanges, self).setUp()
        self.transport = self.get_transport()
        # Most of the tests involve two stores pointing to the same persistent
        # storage to observe the effects of concurrent changes
        self.st1 = config.TransportIniFileStore(self.transport, 'foo.conf')
        self.st2 = config.TransportIniFileStore(self.transport, 'foo.conf')
        # Collect the formatted warnings so the tests can inspect them
        self.warnings = []
        def warning(*args):
            self.warnings.append(args[0] % args[1:])
        self.overrideAttr(trace, 'warning', warning)

    def has_store(self, store):
        """Tell whether the file backing ``store`` exists on the transport."""
        store_basename = urlutils.relative_url(self.transport.external_url(),
                                               store.external_url())
        return self.transport.has(store_basename)

    def get_stack(self, store):
        """Build a minimal stack reading from and writing to ``store``."""
        # Any stack will do as long as it uses the right store, just a single
        # no-name section is enough
        return config.Stack([store.get_sections], store)

    def test_no_changes_no_save(self):
        s = self.get_stack(self.st1)
        s.store.save_changes()
        self.assertEquals(False, self.has_store(self.st1))

    def test_unrelated_concurrent_update(self):
        s1 = self.get_stack(self.st1)
        s2 = self.get_stack(self.st2)
        s1.set('foo', 'bar')
        s2.set('baz', 'quux')
        s1.store.save()
        # Changes don't propagate magically
        self.assertEquals(None, s1.get('baz'))
        s2.store.save_changes()
        self.assertEquals('quux', s2.get('baz'))
        # Changes are acquired when saving
        self.assertEquals('bar', s2.get('foo'))
        # Since there is no overlap, no warnings are emitted
        self.assertLength(0, self.warnings)

    def test_concurrent_update_modified(self):
        s1 = self.get_stack(self.st1)
        s2 = self.get_stack(self.st2)
        s1.set('foo', 'bar')
        s2.set('foo', 'baz')
        s1.store.save()
        # Last speaker wins
        s2.store.save_changes()
        self.assertEquals('baz', s2.get('foo'))
        # But the user get a warning
        self.assertLength(1, self.warnings)
        warning = self.warnings[0]
        self.assertStartsWith(warning, 'Option foo in section None')
        self.assertEndsWith(warning, 'was changed from <CREATED> to bar.'
                            ' The baz value will be saved.')

    def test_concurrent_deletion(self):
        self.st1._load_from_string('foo=bar')
        self.st1.save()
        s1 = self.get_stack(self.st1)
        s2 = self.get_stack(self.st2)
        s1.remove('foo')
        s2.remove('foo')
        s1.store.save_changes()
        # No warning yet
        self.assertLength(0, self.warnings)
        s2.store.save_changes()
        # Now we get one
        self.assertLength(1, self.warnings)
        warning = self.warnings[0]
        self.assertStartsWith(warning, 'Option foo in section None')
        self.assertEndsWith(warning, 'was changed from bar to <CREATED>.'
                            ' The <DELETED> value will be saved.')


class TestQuotingIniFileStore(tests.TestCaseWithTransport):
    """Check how values with significant spaces are quoted in ini files."""

    def get_store(self):
        return config.TransportIniFileStore(self.get_transport(), 'foo.conf')

    def test_get_quoted_string(self):
        store = self.get_store()
        store._load_from_string('foo= " abc "')
        stack = config.Stack([store.get_sections])
        self.assertEquals(' abc ', stack.get('foo'))

    def test_set_quoted_string(self):
        store = self.get_store()
        stack = config.Stack([store.get_sections], store)
        stack.set('foo', ' a b c ')
        # The file content is only observable once the store is saved
        store.save()
        self.assertFileEqual('foo = " a b c "\n', 'foo.conf')


class TestTransportIniFileStore(TestStore):
    """Tests specific to ini-file stores accessed through a transport."""
    # NOTE(review): the file name in test_loading_unknown_file_fails and the
    # multi-line content in test_get_embedded_sections were rebuilt from the
    # assertions -- confirm against upstream.

    def test_loading_unknown_file_fails(self):
        store = config.TransportIniFileStore(self.get_transport(),
                                             'I-do-not-exist')
        self.assertRaises(errors.NoSuchFile, store.load)

    def test_invalid_content(self):
        store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
        self.assertEquals(False, store.is_loaded())
        exc = self.assertRaises(
            errors.ParseConfigError, store._load_from_string,
            'this is invalid !')
        self.assertEndsWith(exc.filename, 'foo.conf')
        # And the load failed
        self.assertEquals(False, store.is_loaded())

    def test_get_embedded_sections(self):
        # A more complicated example (which also shows that section names and
        # option names share the same name space...)
        # FIXME: This should be fixed by forbidding dicts as values ?
        # -- vila 2011-04-05
        store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
        store._load_from_string('''
foo=bar
l=1,2
[DEFAULT]
foo_in_DEFAULT=foo_DEFAULT
[bar]
foo_in_bar=barbar
[baz]
foo_in_baz=barbaz
[[qux]]
foo_in_qux=quux
''')
        sections = list(store.get_sections())
        self.assertLength(4, sections)
        # The default section has no name.
        # List values are provided as strings and need to be explicitly
        # converted by specifying from_unicode=list_from_store at option
        # registration.
        self.assertSectionContent((None, {'foo': 'bar', 'l': u'1,2'}),
                                  sections[0])
        self.assertSectionContent(
            ('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
        self.assertSectionContent(
            ('bar', {'foo_in_bar': 'barbar'}), sections[2])
        # sub sections are provided as embedded dicts.
        self.assertSectionContent(
            ('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
            sections[3])


class TestLockableIniFileStore(TestStore):
    """Tests specific to the lockable ini-file store."""

    def test_create_store_in_created_dir(self):
        self.assertPathDoesNotExist('dir')
        t = self.get_transport('dir/subdir')
        store = config.LockableIniFileStore(t, 'foo.conf')
        store.get_mutable_section(None).set('foo', 'bar')
        # The directories are only expected to exist after saving
        store.save()
        self.assertPathExists('dir/subdir')


class TestConcurrentStoreUpdates(TestStore):
    """Test that Stores properly handle concurrent updates.

    New Store implementation may fail some of these tests but until such
    implementations exist it's hard to properly filter them from the scenarios
    applied here. If you encounter such a case, contact the bzr devs.
    """

    scenarios = [(key, {'get_stack': builder}) for key, builder
                 in config.test_stack_builder_registry.iteritems()]

    def setUp(self):
        super(TestConcurrentStoreUpdates, self).setUp()
        self.stack = self.get_stack(self)
        if not isinstance(self.stack, config._CompatibleStack):
            raise tests.TestNotApplicable(
                '%s is not meant to be compatible with the old config design'
                % (self.stack,))
        self.stack.set('one', '1')
        self.stack.set('two', '2')
        # Flush the store
        self.stack.store.save()

    def test_simple_read_access(self):
        self.assertEquals('1', self.stack.get('one'))

    def test_simple_write_access(self):
        self.stack.set('one', 'one')
        self.assertEquals('one', self.stack.get('one'))

    def test_listen_to_the_last_speaker(self):
        c1 = self.stack
        c2 = self.get_stack(self)
        c1.set('one', 'ONE')
        c2.set('two', 'TWO')
        self.assertEquals('ONE', c1.get('one'))
        self.assertEquals('TWO', c2.get('two'))
        # The second update respect the first one
        self.assertEquals('ONE', c2.get('one'))

    def test_last_speaker_wins(self):
        # If the same config is not shared, the same variable modified twice
        # can only see a single result.
        c1 = self.stack
        c2 = self.get_stack(self)
        c1.set('one', 'c1')
        c2.set('one', 'c2')
        self.assertEquals('c2', c2.get('one'))
        # The first modification is still available until another refresh
        # occur
        self.assertEquals('c1', c1.get('one'))
        c1.set('two', 'done')
        self.assertEquals('c2', c1.get('one'))

    def test_writes_are_serialized(self):
        c1 = self.stack
        c2 = self.get_stack(self)

        # We spawn a thread that will pause *during* the config saving.
        before_writing = threading.Event()
        after_writing = threading.Event()
        writing_done = threading.Event()
        c1_save_without_locking_orig = c1.store.save_without_locking
        def c1_save_without_locking():
            before_writing.set()
            c1_save_without_locking_orig()
            # The lock is held. We wait for the main thread to decide when to
            # continue
            after_writing.wait()
        c1.store.save_without_locking = c1_save_without_locking
        def c1_set():
            c1.set('one', 'c1')
            writing_done.set()
        t1 = threading.Thread(target=c1_set)
        # Collect the thread after the test
        self.addCleanup(t1.join)
        # Be ready to unblock the thread if the test goes wrong
        self.addCleanup(after_writing.set)
        t1.start()
        before_writing.wait()
        self.assertRaises(errors.LockContention,
                          c2.set, 'one', 'c2')
        self.assertEquals('c1', c1.get('one'))
        # Let the lock be released
        after_writing.set()
        writing_done.wait()
        c2.set('one', 'c2')
        self.assertEquals('c2', c2.get('one'))

    def test_read_while_writing(self):
        c1 = self.stack
        # We spawn a thread that will pause *during* the write
        ready_to_write = threading.Event()
        do_writing = threading.Event()
        writing_done = threading.Event()
        # We override the _save implementation so we know the store is locked
        c1_save_without_locking_orig = c1.store.save_without_locking
        def c1_save_without_locking():
            ready_to_write.set()
            # The lock is held. We wait for the main thread to decide when to
            # continue
            do_writing.wait()
            c1_save_without_locking_orig()
            writing_done.set()
        c1.store.save_without_locking = c1_save_without_locking
        def c1_set():
            c1.set('one', 'c1')
        t1 = threading.Thread(target=c1_set)
        # Collect the thread after the test
        self.addCleanup(t1.join)
        # Be ready to unblock the thread if the test goes wrong
        self.addCleanup(do_writing.set)
        t1.start()
        # Ensure the thread is ready to write
        ready_to_write.wait()
        self.assertEquals('c1', c1.get('one'))
        # If we read during the write, we get the old value
        c2 = self.get_stack(self)
        self.assertEquals('1', c2.get('one'))
        # Let the writing occur and ensure it occurred
        do_writing.set()
        writing_done.wait()
        # Now we get the updated value
        c3 = self.get_stack(self)
        self.assertEquals('c1', c3.get('one'))

    # FIXME: It may be worth looking into removing the lock dir when it's not
    # needed anymore and look at possible fallouts for concurrent lockers. This
    # will matter if/when we use config files outside of bazaar directories
    # (.bazaar or .bzr) -- vila 20110-04-111


class TestSectionMatcher(TestStore):
    """Behaviour shared by the section matchers listed in ``scenarios``."""

    scenarios = [('location', {'matcher': config.LocationMatcher}),
                 ('id', {'matcher': config.NameMatcher}),
                 ]

    def setUp(self):
        super(TestSectionMatcher, self).setUp()
        # Any simple store is good enough
        self.get_store = config.test_store_builder_registry.get('configobj')

    def test_no_matches_for_empty_stores(self):
        store = self.get_store(self)
        store._load_from_string('')
        matcher = self.matcher(store, '/bar')
        self.assertEquals([], list(matcher.get_sections()))

    def test_build_doesnt_load_store(self):
        store = self.get_store(self)
        # Only the construction matters here, the matcher itself is unused
        self.matcher(store, '/bar')
        self.assertFalse(store.is_loaded())


class TestLocationSection(tests.TestCase):
    """Check how LocationSection applies per-option policies."""
    # NOTE(review): the 'baz' extra_path arguments below were rebuilt from
    # the expected 'bar/baz' value -- confirm against upstream.

    def get_section(self, options, extra_path):
        """Wrap ``options`` into a LocationSection using ``extra_path``."""
        section = config.Section('foo', options)
        # We don't care about the length so we use '0'
        return config.LocationSection(section, 0, extra_path)

    def test_simple_option(self):
        section = self.get_section({'foo': 'bar'}, '')
        self.assertEquals('bar', section.get('foo'))

    def test_option_with_extra_path(self):
        section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
                                   'baz')
        self.assertEquals('bar/baz', section.get('foo'))

    def test_invalid_policy(self):
        section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
                                   'baz')
        # invalid policies are ignored
        self.assertEquals('bar', section.get('foo'))


class TestLocationMatcher(TestStore):
    """Check which sections LocationMatcher selects and in which order."""
    # NOTE(review): the multi-line store contents below were rebuilt from the
    # section ids and values asserted by each test -- confirm against upstream.

    def setUp(self):
        super(TestLocationMatcher, self).setUp()
        # Any simple store is good enough
        self.get_store = config.test_store_builder_registry.get('configobj')

    def test_unrelated_section_excluded(self):
        store = self.get_store(self)
        store._load_from_string('''
[/foo]
section=/foo
[/foo/baz]
section=/foo/baz
[/foo/bar]
section=/foo/bar
[/foo/bar/baz]
section=/foo/bar/baz
[/quux/quux]
section=/quux/quux
''')
        self.assertEquals(['/foo', '/foo/baz', '/foo/bar', '/foo/bar/baz',
                           '/quux/quux'],
                          [section.id for _, section in store.get_sections()])
        matcher = config.LocationMatcher(store, '/foo/bar/quux')
        sections = [section for s, section in matcher.get_sections()]
        self.assertEquals([3, 2],
                          [section.length for section in sections])
        self.assertEquals(['/foo/bar', '/foo'],
                          [section.id for section in sections])
        self.assertEquals(['quux', 'bar/quux'],
                          [section.extra_path for section in sections])

    def test_more_specific_sections_first(self):
        store = self.get_store(self)
        store._load_from_string('''
[/foo]
section=/foo
[/foo/bar]
section=/foo/bar
''')
        self.assertEquals(['/foo', '/foo/bar'],
                          [section.id for _, section in store.get_sections()])
        matcher = config.LocationMatcher(store, '/foo/bar/baz')
        sections = [section for s, section in matcher.get_sections()]
        self.assertEquals([3, 2],
                          [section.length for section in sections])
        self.assertEquals(['/foo/bar', '/foo'],
                          [section.id for section in sections])
        self.assertEquals(['baz', 'bar/baz'],
                          [section.extra_path for section in sections])

    def test_appendpath_in_no_name_section(self):
        # It's a bit weird to allow appendpath in a no-name section, but
        # someone may found a use for it
        store = self.get_store(self)
        store._load_from_string('''
foo=bar
foo:policy = appendpath
''')
        matcher = config.LocationMatcher(store, 'dir/subdir')
        sections = list(matcher.get_sections())
        self.assertLength(1, sections)
        self.assertEquals('bar/dir/subdir', sections[0][1].get('foo'))

    def test_file_urls_are_normalized(self):
        store = self.get_store(self)
        if sys.platform == 'win32':
            expected_url = 'file:///C:/dir/subdir'
            expected_location = 'C:/dir/subdir'
        else:
            expected_url = 'file:///dir/subdir'
            expected_location = '/dir/subdir'
        matcher = config.LocationMatcher(store, expected_url)
        self.assertEquals(expected_location, matcher.location)


class TestNameMatcher(TestStore):
    """Check that NameMatcher selects sections by their exact name."""

    def setUp(self):
        super(TestNameMatcher, self).setUp()
        self.matcher = config.NameMatcher
        # Any simple store is good enough
        self.get_store = config.test_store_builder_registry.get('configobj')

    def get_matching_sections(self, name):
        """Return the sections the matcher selects for ``name``."""
        store = self.get_store(self)
        # NOTE(review): this content was rebuilt so that only the 'foo'
        # section matches 'foo' -- confirm against upstream.
        store._load_from_string('''
[foo]
option=foo
[foo/baz]
option=foo/baz
[bar]
option=bar
''')
        matcher = self.matcher(store, name)
        return list(matcher.get_sections())

    def test_matching(self):
        sections = self.get_matching_sections('foo')
        self.assertLength(1, sections)
        self.assertSectionContent(('foo', {'option': 'foo'}), sections[0])

    def test_not_matching(self):
        sections = self.get_matching_sections('baz')
        self.assertLength(0, sections)


class TestBaseStackGet(tests.TestCase):
    """Check Stack.get() against hand-built lists of section callables."""

    def setUp(self):
        super(TestBaseStackGet, self).setUp()
        # Use a fresh registry so registrations don't leak between tests
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())

    def test_get_first_definition(self):
        store1 = config.IniFileStore()
        store1._load_from_string('foo=bar')
        store2 = config.IniFileStore()
        store2._load_from_string('foo=baz')
        conf = config.Stack([store1.get_sections, store2.get_sections])
        self.assertEquals('bar', conf.get('foo'))

    def test_get_with_registered_default_value(self):
        config.option_registry.register(config.Option('foo', default='bar'))
        conf_stack = config.Stack([])
        self.assertEquals('bar', conf_stack.get('foo'))

    def test_get_without_registered_default_value(self):
        config.option_registry.register(config.Option('foo'))
        conf_stack = config.Stack([])
        self.assertEquals(None, conf_stack.get('foo'))

    def test_get_without_default_value_for_not_registered(self):
        conf_stack = config.Stack([])
        self.assertEquals(None, conf_stack.get('foo'))

    def test_get_for_empty_section_callable(self):
        conf_stack = config.Stack([lambda: []])
        self.assertEquals(None, conf_stack.get('foo'))

    def test_get_for_broken_callable(self):
        # Trying to use and invalid callable raises an exception on first use
        conf_stack = config.Stack([object])
        self.assertRaises(TypeError, conf_stack.get, 'foo')


class TestMemoryStack(tests.TestCase):
    """Check the basic get/set behaviour of MemoryStack."""
    # NOTE(review): the names of the first two test methods were lost from
    # the source; test_get/test_set are inferred -- confirm against upstream.

    def test_get(self):
        conf = config.MemoryStack('foo=bar')
        self.assertEquals('bar', conf.get('foo'))

    def test_set(self):
        conf = config.MemoryStack('foo=bar')
        conf.set('foo', 'baz')
        self.assertEquals('baz', conf.get('foo'))

    def test_no_content(self):
        conf = config.MemoryStack()
        # No content means no loading
        self.assertFalse(conf.store.is_loaded())
        self.assertRaises(NotImplementedError, conf.get, 'foo')
        # But a content can still be provided
        conf.store._load_from_string('foo=bar')
        self.assertEquals('bar', conf.get('foo'))


class TestStackWithTransport(tests.TestCaseWithTransport):
    """Base class parametrized with every registered test stack builder."""

    # Each scenario provides a 'get_stack' attribute building one stack kind
    scenarios = [(key, {'get_stack': builder}) for key, builder
                 in config.test_stack_builder_registry.iteritems()]


class TestConcreteStacks(TestStackWithTransport):
    """Smoke tests for the registered stack builders."""

    def test_build_stack(self):
        # Just a smoke test to help debug builders
        self.get_stack(self)


class TestStackGet(TestStackWithTransport):
    """Check Stack.get() for every registered stack builder."""

    def setUp(self):
        super(TestStackGet, self).setUp()
        self.conf = self.get_stack(self)

    def test_get_for_empty_stack(self):
        self.assertEquals(None, self.conf.get('foo'))

    def test_get_hook(self):
        self.conf.set('foo', 'bar')
        # Record every hook invocation so it can be inspected below
        calls = []
        def hook(*args):
            calls.append(args)
        config.ConfigHooks.install_named_hook('get', hook, None)
        self.assertLength(0, calls)
        value = self.conf.get('foo')
        self.assertEquals('bar', value)
        self.assertLength(1, calls)
        self.assertEquals((self.conf, 'foo', 'bar'), calls[0])


class TestStackGetWithConverter(tests.TestCase):
    """Check Stack.get() for options registered with a value converter."""

    def setUp(self):
        super(TestStackGetWithConverter, self).setUp()
        # Use a fresh registry so registrations don't leak between tests
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
        self.registry = config.option_registry

    def get_conf(self, content=None):
        """Build an in-memory stack holding ``content``."""
        return config.MemoryStack(content)

    def register_bool_option(self, name, default=None, default_from_env=None):
        """Register ``name`` as an option converted by bool_from_store."""
        b = config.Option(name, help='A boolean.',
                          default=default, default_from_env=default_from_env,
                          from_unicode=config.bool_from_store)
        self.registry.register(b)

    def test_get_default_bool_None(self):
        self.register_bool_option('foo')
        conf = self.get_conf('')
        self.assertEquals(None, conf.get('foo'))

    def test_get_default_bool_True(self):
        self.register_bool_option('foo', u'True')
        conf = self.get_conf('')
        self.assertEquals(True, conf.get('foo'))

    def test_get_default_bool_False(self):
        self.register_bool_option('foo', False)
        conf = self.get_conf('')
        self.assertEquals(False, conf.get('foo'))

    def test_get_default_bool_False_as_string(self):
        self.register_bool_option('foo', u'False')
        conf = self.get_conf('')
        self.assertEquals(False, conf.get('foo'))

    def test_get_default_bool_from_env_converted(self):
        self.register_bool_option('foo', u'True', default_from_env=['FOO'])
        self.overrideEnv('FOO', 'False')
        conf = self.get_conf('')
        self.assertEquals(False, conf.get('foo'))

    def test_get_default_bool_when_conversion_fails(self):
        self.register_bool_option('foo', default='True')
        conf = self.get_conf('foo=invalid boolean')
        self.assertEquals(True, conf.get('foo'))

    def register_integer_option(self, name,
                                default=None, default_from_env=None):
        """Register ``name`` as an option converted by int_from_store."""
        i = config.Option(name, help='An integer.',
                          default=default, default_from_env=default_from_env,
                          from_unicode=config.int_from_store)
        self.registry.register(i)

    def test_get_default_integer_None(self):
        self.register_integer_option('foo')
        conf = self.get_conf('')
        self.assertEquals(None, conf.get('foo'))

    def test_get_default_integer(self):
        self.register_integer_option('foo', 42)
        conf = self.get_conf('')
        self.assertEquals(42, conf.get('foo'))

    def test_get_default_integer_as_string(self):
        self.register_integer_option('foo', u'42')
        conf = self.get_conf('')
        self.assertEquals(42, conf.get('foo'))

    def test_get_default_integer_from_env(self):
        self.register_integer_option('foo', default_from_env=['FOO'])
        self.overrideEnv('FOO', '18')
        conf = self.get_conf('')
        self.assertEquals(18, conf.get('foo'))

    def test_get_default_integer_when_conversion_fails(self):
        self.register_integer_option('foo', default='12')
        conf = self.get_conf('foo=invalid integer')
        self.assertEquals(12, conf.get('foo'))

    def register_list_option(self, name, default=None, default_from_env=None):
        """Register ``name`` as a ListOption."""
        l = config.ListOption(name, help='A list.', default=default,
                              default_from_env=default_from_env)
        self.registry.register(l)

    def test_get_default_list_None(self):
        self.register_list_option('foo')
        conf = self.get_conf('')
        self.assertEquals(None, conf.get('foo'))

    def test_get_default_list_empty(self):
        self.register_list_option('foo', '')
        conf = self.get_conf('')
        self.assertEquals([], conf.get('foo'))

    def test_get_default_list_from_env(self):
        self.register_list_option('foo', default_from_env=['FOO'])
        self.overrideEnv('FOO', '')
        conf = self.get_conf('')
        self.assertEquals([], conf.get('foo'))

    def test_get_with_list_converter_no_item(self):
        self.register_list_option('foo', None)
        conf = self.get_conf('foo=,')
        self.assertEquals([], conf.get('foo'))

    def test_get_with_list_converter_many_items(self):
        self.register_list_option('foo', None)
        conf = self.get_conf('foo=m,o,r,e')
        self.assertEquals(['m', 'o', 'r', 'e'], conf.get('foo'))

    def test_get_with_list_converter_embedded_spaces_many_items(self):
        self.register_list_option('foo', None)
        conf = self.get_conf('foo=" bar", "baz "')
        self.assertEquals([' bar', 'baz '], conf.get('foo'))

    def test_get_with_list_converter_stripped_spaces_many_items(self):
        self.register_list_option('foo', None)
        conf = self.get_conf('foo= bar , baz ')
        self.assertEquals(['bar', 'baz'], conf.get('foo'))


class TestIterOptionRefs(tests.TestCase):
    """iter_option_refs is a bit unusual, document some cases."""
    # NOTE(review): the input strings of test_embedded_ref and test_two_refs
    # (and the last expected chunk of the latter) were rebuilt from the
    # expected chunks -- confirm against upstream.

    def assertRefs(self, expected, string):
        """Check the (is_ref, chunk) pairs produced for ``string``."""
        self.assertEquals(expected, list(config.iter_option_refs(string)))

    def test_empty(self):
        self.assertRefs([(False, '')], '')

    def test_no_refs(self):
        self.assertRefs([(False, 'foo bar')], 'foo bar')

    def test_single_ref(self):
        self.assertRefs([(False, ''), (True, '{foo}'), (False, '')], '{foo}')

    def test_broken_ref(self):
        self.assertRefs([(False, '{foo')], '{foo')

    def test_embedded_ref(self):
        self.assertRefs([(False, '{'), (True, '{foo}'), (False, '}')],
                        '{{foo}}')

    def test_two_refs(self):
        self.assertRefs([(False, ''), (True, '{foo}'),
                         (False, ''), (True, '{bar}'),
                         (False, ''),
                         ],
                        '{foo}{bar}')

    def test_newline_in_refs_are_not_matched(self):
        self.assertRefs([(False, '{\nxx}{xx\n}{{\n}}')], '{\nxx}{xx\n}{{\n}}')


class TestStackExpandOptions(tests.TestCaseWithTransport):
    """Check option reference expansion within a single store."""
    # NOTE(review): the multi-line store contents below were rebuilt from the
    # values each test expects -- confirm against upstream.

    def setUp(self):
        super(TestStackExpandOptions, self).setUp()
        # Use a fresh registry so registrations don't leak between tests
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
        self.registry = config.option_registry
        store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
        self.conf = config.Stack([store.get_sections], store)

    def assertExpansion(self, expected, string, env=None):
        """Check that ``string`` expands to ``expected`` (given ``env``)."""
        self.assertEquals(expected, self.conf.expand_options(string, env))

    def test_no_expansion(self):
        self.assertExpansion('foo', 'foo')

    def test_expand_default_value(self):
        self.conf.store._load_from_string('bar=baz')
        self.registry.register(config.Option('foo', default=u'{bar}'))
        self.assertEquals('baz', self.conf.get('foo', expand=True))

    def test_expand_default_from_env(self):
        self.conf.store._load_from_string('bar=baz')
        self.registry.register(config.Option('foo', default_from_env=['FOO']))
        self.overrideEnv('FOO', '{bar}')
        self.assertEquals('baz', self.conf.get('foo', expand=True))

    def test_expand_default_on_failed_conversion(self):
        self.conf.store._load_from_string('baz=bogus\nbar=42\nfoo={baz}')
        self.registry.register(
            config.Option('foo', default=u'{bar}',
                          from_unicode=config.int_from_store))
        self.assertEquals(42, self.conf.get('foo', expand=True))

    def test_env_adding_options(self):
        self.assertExpansion('bar', '{foo}', {'foo': 'bar'})

    def test_env_overriding_options(self):
        self.conf.store._load_from_string('foo=baz')
        self.assertExpansion('bar', '{foo}', {'foo': 'bar'})

    def test_simple_ref(self):
        self.conf.store._load_from_string('foo=xxx')
        self.assertExpansion('xxx', '{foo}')

    def test_unknown_ref(self):
        self.assertRaises(errors.ExpandingUnknownOption,
                          self.conf.expand_options, '{foo}')

    def test_indirect_ref(self):
        self.conf.store._load_from_string('''
foo=xxx
bar={foo}
''')
        self.assertExpansion('xxx', '{bar}')

    def test_embedded_ref(self):
        self.conf.store._load_from_string('''
foo=xxx
bar=foo
''')
        self.assertExpansion('xxx', '{{bar}}')

    def test_simple_loop(self):
        self.conf.store._load_from_string('foo={foo}')
        self.assertRaises(errors.OptionExpansionLoop,
                          self.conf.expand_options, '{foo}')

    def test_indirect_loop(self):
        self.conf.store._load_from_string('''
foo={bar}
bar={baz}
baz={foo}''')
        e = self.assertRaises(errors.OptionExpansionLoop,
                              self.conf.expand_options, '{foo}')
        self.assertEquals('foo->bar->baz', e.refs)
        self.assertEquals('{foo}', e.string)

    def test_list(self):
        self.conf.store._load_from_string('''
foo=start
bar=middle
baz=end
list={foo},{bar},{baz}
''')
        self.registry.register(
            config.ListOption('list'))
        self.assertEquals(['start', 'middle', 'end'],
                          self.conf.get('list', expand=True))

    def test_cascading_list(self):
        self.conf.store._load_from_string('''
foo=start,{bar}
bar=middle,{baz}
baz=end
list={foo}
''')
        self.registry.register(
            config.ListOption('list'))
        self.assertEquals(['start', 'middle', 'end'],
                          self.conf.get('list', expand=True))

    def test_pathologically_hidden_list(self):
        self.conf.store._load_from_string('''
foo=bin
bar=go
start={foo
middle=},{
end=bar}
hidden={start}{middle}{end}
''')
        # What matters is what the registration says, the conversion happens
        # only after all expansions have been performed
        self.registry.register(config.ListOption('hidden'))
        self.assertEquals(['bin', 'go'],
                          self.conf.get('hidden', expand=True))


class TestStackCrossSectionsExpand(tests.TestCaseWithTransport):
    """Check option expansion across sections of a location store."""
    # NOTE(review): the ``if string is None`` guard in get_config and the
    # multi-line store contents were rebuilt -- confirm against upstream.

    def setUp(self):
        super(TestStackCrossSectionsExpand, self).setUp()

    def get_config(self, location, string):
        """Build a LocationStack for ``location`` loaded with ``string``."""
        if string is None:
            string = ''
        # Since we don't save the config we won't strictly require to inherit
        # from TestCaseInTempDir, but an error occurs so quickly...
        c = config.LocationStack(location)
        c.store._load_from_string(string)
        return c

    def test_dont_cross_unrelated_section(self):
        c = self.get_config('/another/branch/path', '''
[/one/branch/path]
foo = hello
bar = {foo}/2

[/another/branch/path]
bar = {foo}/2
''')
        self.assertRaises(errors.ExpandingUnknownOption,
                          c.get, 'bar', expand=True)

    def test_cross_related_sections(self):
        c = self.get_config('/project/branch/path', '''
[/project]
foo = qu

[/project/branch/path]
bar = {foo}ux
''')
        self.assertEquals('quux', c.get('bar', expand=True))


class TestStackCrossStoresExpand(tests.TestCaseWithTransport):
    """Check option expansion across the location and global stores."""

    def test_cross_global_locations(self):
        # NOTE(review): both store contents and the save() calls were rebuilt
        # from the values asserted below -- confirm against upstream.
        l_store = config.LocationStore()
        l_store._load_from_string('''
[/branch]
lfoo = loc-foo
lbar = {gbar}
''')
        l_store.save()
        g_store = config.GlobalStore()
        g_store._load_from_string('''
[DEFAULT]
gfoo = {lfoo}
gbar = glob-bar
''')
        g_store.save()
        stack = config.LocationStack('/branch')
        self.assertEquals('glob-bar', stack.get('lbar', expand=True))
        self.assertEquals('loc-foo', stack.get('gfoo', expand=True))


class TestStackExpandSectionLocals(tests.TestCaseWithTransport):
    """Check expansion of the section-local {basename} and {relpath}."""
    # NOTE(review): parts of the store contents and the save() calls below
    # were rebuilt from the asserted values -- confirm against upstream.

    def test_expand_locals_empty(self):
        l_store = config.LocationStore()
        l_store._load_from_string('''
[/home/user/project]
base = {basename}
rel = {relpath}
''')
        l_store.save()
        stack = config.LocationStack('/home/user/project/')
        self.assertEquals('', stack.get('base', expand=True))
        self.assertEquals('', stack.get('rel', expand=True))

    def test_expand_basename_locally(self):
        l_store = config.LocationStore()
        l_store._load_from_string('''
[/home/user/project]
bfoo = {basename}
''')
        l_store.save()
        stack = config.LocationStack('/home/user/project/branch')
        self.assertEquals('branch', stack.get('bfoo', expand=True))

    def test_expand_basename_locally_longer_path(self):
        l_store = config.LocationStore()
        l_store._load_from_string('''
[/home/user]
bfoo = {basename}
''')
        l_store.save()
        stack = config.LocationStack('/home/user/project/dir/branch')
        self.assertEquals('branch', stack.get('bfoo', expand=True))

    def test_expand_relpath_locally(self):
        l_store = config.LocationStore()
        l_store._load_from_string('''
[/home/user/project]
lfoo = loc-foo/{relpath}
''')
        l_store.save()
        stack = config.LocationStack('/home/user/project/branch')
        self.assertEquals('loc-foo/branch', stack.get('lfoo', expand=True))

    def test_expand_relpath_unknonw_in_global(self):
        g_store = config.GlobalStore()
        g_store._load_from_string('''
[DEFAULT]
gfoo = {relpath}
''')
        g_store.save()
        stack = config.LocationStack('/home/user/project/branch')
        self.assertRaises(errors.ExpandingUnknownOption,
                          stack.get, 'gfoo', expand=True)

    def test_expand_local_option_locally(self):
        l_store = config.LocationStore()
        l_store._load_from_string('''
[/home/user/project]
lfoo = loc-foo/{relpath}
lbar = {gbar}
''')
        l_store.save()
        g_store = config.GlobalStore()
        g_store._load_from_string('''
[DEFAULT]
gfoo = {lfoo}
gbar = glob-bar
''')
        g_store.save()
        stack = config.LocationStack('/home/user/project/branch')
        self.assertEquals('glob-bar', stack.get('lbar', expand=True))
        self.assertEquals('loc-foo/branch', stack.get('gfoo', expand=True))

    def test_locals_dont_leak(self):
        """Make sure we chose the right local in presence of several sections.
        """
        l_store = config.LocationStore()
        l_store._load_from_string('''
[/home/user]
lfoo = loc-foo/{relpath}
[/home/user/project]
lfoo = loc-foo/{relpath}
''')
        l_store.save()
        stack = config.LocationStack('/home/user/project/branch')
        self.assertEquals('loc-foo/branch', stack.get('lfoo', expand=True))
        stack = config.LocationStack('/home/user/bar/baz')
        self.assertEquals('loc-foo/bar/baz', stack.get('lfoo', expand=True))


class TestStackSet(TestStackWithTransport):
    """Check Stack.set() for every registered stack builder."""

    def test_simple_set(self):
        conf = self.get_stack(self)
        self.assertEquals(None, conf.get('foo'))
        conf.set('foo', 'baz')
        # Did we get it back ?
        self.assertEquals('baz', conf.get('foo'))

    def test_set_creates_a_new_section(self):
        conf = self.get_stack(self)
        conf.set('foo', 'baz')
        # This used to be a bare tuple expression
        # (``self.assertEquals, 'baz', ...``) which asserted nothing.
        self.assertEquals('baz', conf.get('foo'))

    def test_set_hook(self):
        # Record every hook invocation so it can be inspected below
        calls = []
        def hook(*args):
            calls.append(args)
        config.ConfigHooks.install_named_hook('set', hook, None)
        self.assertLength(0, calls)
        conf = self.get_stack(self)
        conf.set('foo', 'bar')
        self.assertLength(1, calls)
        self.assertEquals((conf, 'foo', 'bar'), calls[0])


class TestStackRemove(TestStackWithTransport):
    """Check Stack.remove() for every registered stack builder."""

    def test_remove_existing(self):
        conf = self.get_stack(self)
        conf.set('foo', 'bar')
        self.assertEquals('bar', conf.get('foo'))
        conf.remove('foo')
        # Did we get it back ?
        self.assertEquals(None, conf.get('foo'))

    def test_remove_unknown(self):
        conf = self.get_stack(self)
        self.assertRaises(KeyError, conf.remove, 'I_do_not_exist')

    def test_remove_hook(self):
        # Record every hook invocation so it can be inspected below
        calls = []
        def hook(*args):
            calls.append(args)
        config.ConfigHooks.install_named_hook('remove', hook, None)
        self.assertLength(0, calls)
        conf = self.get_stack(self)
        conf.set('foo', 'bar')
        conf.remove('foo')
        self.assertLength(1, calls)
        self.assertEquals((conf, 'foo'), calls[0])


class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
    """Check which options each old-style config object reports."""
    # NOTE(review): the config object passed as the last assertOptions()
    # argument was lost in several tests and has been rebuilt from the
    # expected option lists -- confirm against upstream.

    def setUp(self):
        super(TestConfigGetOptions, self).setUp()
        create_configs(self)

    def test_no_variable(self):
        # Using branch should query branch, locations and bazaar
        self.assertOptions([], self.branch_config)

    def test_option_in_bazaar(self):
        self.bazaar_config.set_user_option('file', 'bazaar')
        self.assertOptions([('file', 'bazaar', 'DEFAULT', 'bazaar')],
                           self.bazaar_config)

    def test_option_in_locations(self):
        self.locations_config.set_user_option('file', 'locations')
        self.assertOptions(
            [('file', 'locations', self.tree.basedir, 'locations')],
            self.locations_config)

    def test_option_in_branch(self):
        self.branch_config.set_user_option('file', 'branch')
        self.assertOptions([('file', 'branch', 'DEFAULT', 'branch')],
                           self.branch_config)

    def test_option_in_bazaar_and_branch(self):
        self.bazaar_config.set_user_option('file', 'bazaar')
        self.branch_config.set_user_option('file', 'branch')
        self.assertOptions([('file', 'branch', 'DEFAULT', 'branch'),
                            ('file', 'bazaar', 'DEFAULT', 'bazaar'),
                            ],
                           self.branch_config)

    def test_option_in_branch_and_locations(self):
        # Hmm, locations override branch :-/
        self.locations_config.set_user_option('file', 'locations')
        self.branch_config.set_user_option('file', 'branch')
        self.assertOptions(
            [('file', 'locations', self.tree.basedir, 'locations'),
             ('file', 'branch', 'DEFAULT', 'branch'),
             ],
            self.branch_config)

    def test_option_in_bazaar_locations_and_branch(self):
        self.bazaar_config.set_user_option('file', 'bazaar')
        self.locations_config.set_user_option('file', 'locations')
        self.branch_config.set_user_option('file', 'branch')
        self.assertOptions(
            [('file', 'locations', self.tree.basedir, 'locations'),
             ('file', 'branch', 'DEFAULT', 'branch'),
             ('file', 'bazaar', 'DEFAULT', 'bazaar'),
             ],
            self.branch_config)


class TestConfigRemoveOption(tests.TestCaseWithTransport, TestOptionsMixin):
    """Check which 'file' options remain after removing one of them.

    setUp() calls create_configs_with_file_option(); presumably this defines
    a 'file' option in each of the bazaar, locations and branch configs, since
    every test expects the other two entries to remain -- confirm.
    """

    # NOTE(review): the 'def setUp' header, the 'self.assertOptions(' openers
    # and the queried config were missing from the reviewed copy.
    # branch_config is inferred because the expected lists span several
    # configs -- confirm against upstream.

    def setUp(self):
        super(TestConfigRemoveOption, self).setUp()
        create_configs_with_file_option(self)

    def test_remove_in_locations(self):
        # The locations option is removed from the tree base directory
        # section.
        self.locations_config.remove_user_option('file', self.tree.basedir)
        self.assertOptions(
            [('file', 'branch', 'DEFAULT', 'branch'),
             ('file', 'bazaar', 'DEFAULT', 'bazaar')],
            self.branch_config)

    def test_remove_in_branch(self):
        self.branch_config.remove_user_option('file')
        self.assertOptions(
            [('file', 'locations', self.tree.basedir, 'locations'),
             ('file', 'bazaar', 'DEFAULT', 'bazaar')],
            self.branch_config)

    def test_remove_in_bazaar(self):
        self.bazaar_config.remove_user_option('file')
        self.assertOptions(
            [('file', 'locations', self.tree.basedir, 'locations'),
             ('file', 'branch', 'DEFAULT', 'branch')],
            self.branch_config)
4116
class TestConfigGetSections(tests.TestCaseWithTransport):
    """Check the section names returned by ``_get_sections()`` per config."""

    # NOTE(review): the 'def setUp' header, the end of the assertSectionNames
    # docstring, the section creation in test_locations_matching_sections and
    # the config argument of the first call in test_branch_default_sections
    # were missing from the reviewed copy and are restored by inference --
    # confirm against upstream.

    def setUp(self):
        super(TestConfigGetSections, self).setUp()
        create_configs(self)

    def assertSectionNames(self, expected, conf, name=None):
        """Check which sections are returned for a given config.

        If fallback configurations exist their sections can be included.

        :param expected: A list of section names.

        :param conf: The configuration that will be queried.

        :param name: An optional section name that will be passed to
            conf._get_sections().
        """
        sections = list(conf._get_sections(name))
        self.assertLength(len(expected), sections)
        # Each item is a 3-tuple whose first element is the section name. A
        # distinct loop variable avoids rebinding the 'name' parameter.
        self.assertEqual(expected,
                         [section_name for section_name, _, _ in sections])

    def test_bazaar_default_section(self):
        self.assertSectionNames(['DEFAULT'], self.bazaar_config)

    def test_locations_default_section(self):
        # No sections are defined in an empty file
        self.assertSectionNames([], self.locations_config)

    def test_locations_named_section(self):
        self.locations_config.set_user_option('file', 'locations')
        self.assertSectionNames([self.tree.basedir], self.locations_config)

    def test_locations_matching_sections(self):
        loc_config = self.locations_config
        loc_config.set_user_option('file', 'locations')
        # We need to cheat a bit here to create an option in sections above and
        # below the 'location' one.
        parser = loc_config._get_parser()
        # locations.conf deals with '/' ignoring native os.sep
        location_names = self.tree.basedir.split('/')
        parent = '/'.join(location_names[:-1])
        child = '/'.join(location_names + ['child'])
        # NOTE(review): the sections are created before being filled; the
        # reviewed copy assigned into them without any visible creation --
        # confirm that an empty mapping is what the parser expects.
        parser[parent] = {}
        parser[parent]['file'] = 'parent'
        parser[child] = {}
        parser[child]['file'] = 'child'
        # Only the tree's own section and its parent are expected, not the
        # child.
        self.assertSectionNames([self.tree.basedir, parent], loc_config)

    def test_branch_data_default_section(self):
        self.assertSectionNames([None],
                                self.branch_config._get_branch_data_config())

    def test_branch_default_sections(self):
        # No sections are defined in an empty locations file
        self.assertSectionNames([None, 'DEFAULT'],
                                self.branch_config)
        # Unless we define an option
        self.branch_config._get_location_config().set_user_option(
            'file', 'locations')
        self.assertSectionNames([self.tree.basedir, None, 'DEFAULT'],
                                self.branch_config)

    def test_bazaar_named_section(self):
        # We need to cheat as the API doesn't give direct access to sections
        # other than DEFAULT.
        self.bazaar_config.set_alias('bazaar', 'bzr')
        self.assertSectionNames(['ALIASES'], self.bazaar_config, 'ALIASES')
1315
4186
class TestAuthenticationConfigFile(tests.TestCase):
1316
4187
"""Test the authentication.conf file matching"""