1119
396
my_config = self._get_sample_config()
1120
397
self.assertEqual(sample_long_alias, my_config.get_alias('ll'))
1122
def test_get_change_editor(self):
    # The sample config must yield a DiffFromTool editor whose command
    # template, joined with spaces, is the expected vimdiff invocation.
    editor = self._get_sample_config().get_change_editor('old', 'new')
    self.assertIs(diff.DiffFromTool, editor.__class__)
    rendered_command = ' '.join(editor.command_template)
    self.assertEqual('vimdiff -of {new_path} {old_path}', rendered_command)
1129
def test_get_no_change_editor(self):
    # With the empty config, asking for a change editor returns None.
    empty_config = self._get_empty_config()
    self.assertIs(None, empty_config.get_change_editor('old', 'new'))
1134
def test_get_merge_tools(self):
1135
conf = self._get_sample_config()
1136
tools = conf.get_merge_tools()
1137
self.log(repr(tools))
1139
{u'funkytool': u'funkytool "arg with spaces" {this_temp}',
1140
u'sometool': u'sometool {base} {this} {other} -o {result}',
1141
u'newtool': u'"newtool with spaces" {this_temp}'},
1144
def test_get_merge_tools_empty(self):
    # With the empty config the merge tool mapping is an empty dict.
    merge_tools = self._get_empty_config().get_merge_tools()
    self.assertEqual({}, merge_tools)
1149
def test_find_merge_tool(self):
    # Looking up 'sometool' on the sample config returns its command line.
    expected = 'sometool {base} {this} {other} -o {result}'
    sample = self._get_sample_config()
    self.assertEqual(expected, sample.find_merge_tool('sometool'))
1154
def test_find_merge_tool_not_found(self):
    # Looking up a name that is not a merge tool yields None.
    sample = self._get_sample_config()
    self.assertIs(sample.find_merge_tool('DOES NOT EXIST'), None)
1159
def test_find_merge_tool_known(self):
    # Even with the empty config, 'kdiff3' resolves to a command line.
    expected = 'kdiff3 {base} {this} {other} -o {result}'
    empty = self._get_empty_config()
    self.assertEqual(expected, empty.find_merge_tool('kdiff3'))
1164
def test_find_merge_tool_override_known(self):
    # Setting the 'bzr.mergetool.kdiff3' user option changes the command
    # line returned for 'kdiff3'.
    overriding = self._get_empty_config()
    overriding.set_user_option('bzr.mergetool.kdiff3', 'kdiff3 blah')
    self.assertEqual('kdiff3 blah', overriding.find_merge_tool('kdiff3'))
1171
class TestGlobalConfigSavingOptions(tests.TestCaseInTempDir):
    """Tests for alias handling on config.GlobalConfig."""

    def test_empty(self):
        # A newly built GlobalConfig reports no aliases.
        global_config = config.GlobalConfig()
        aliases = global_config.get_aliases()
        self.assertEqual(0, len(aliases))

    def test_set_alias(self):
        # An alias set through one instance is readable from a second one.
        strict_commit = 'commit --strict'
        writer = config.GlobalConfig()
        writer.set_alias('commit', strict_commit)
        reader = config.GlobalConfig()
        self.assertEqual(strict_commit, reader.get_alias('commit'))

    def test_remove_alias(self):
        writer = config.GlobalConfig()
        writer.set_alias('commit', 'commit --strict')
        # Now remove the alias again.
        writer.unset_alias('commit')
        # A second instance must no longer see the alias.
        reader = config.GlobalConfig()
        self.assertIs(None, reader.get_alias('commit'))
1193
class TestLocationConfig(tests.TestCaseInTempDir, TestOptionsMixin):
1195
def test_constructs_valid(self):
    # Building a LocationConfig from a location string must not raise;
    # the instance itself is not inspected.
    config.LocationConfig('http://example.com')
1198
def test_constructs_error(self):
399
class TestLocationConfig(TestCase):
401
def test_constructs(self):
402
my_config = config.LocationConfig('http://example.com')
1199
403
self.assertRaises(TypeError, config.LocationConfig)
1201
405
def test_branch_calls_read_filenames(self):
1202
406
# This is testing the correct file names are provided.
1203
407
# TODO: consolidate with the test for GlobalConfigs filename checks.
1205
# replace the class that is constructed, to check its parameters
409
# replace the class that is constructured, to check its parameters
1206
410
oldparserclass = config.ConfigObj
1207
411
config.ConfigObj = InstrumentedConfigObj
412
my_config = config.LocationConfig('http://www.example.com')
1209
my_config = config.LocationConfig('http://www.example.com')
1210
414
parser = my_config._get_parser()
1212
416
config.ConfigObj = oldparserclass
1213
self.assertIsInstance(parser, InstrumentedConfigObj)
417
self.failUnless(isinstance(parser, InstrumentedConfigObj))
1214
418
self.assertEqual(parser._calls,
1215
[('__init__', bedding.locations_config_path(),
419
[('__init__', config.branches_config_filename())])
1218
421
def test_get_global_config(self):
1219
my_config = config.BranchConfig(FakeBranch('http://example.com'))
422
my_config = config.LocationConfig('http://example.com')
1220
423
global_config = my_config._get_global_config()
1221
self.assertIsInstance(global_config, config.GlobalConfig)
1222
self.assertIs(global_config, my_config._get_global_config())
1224
def assertLocationMatching(self, expected):
    # Materialise the matching sections of the current location config
    # as a list before comparing them with the expected one.
    matching = list(self.my_location_config._get_matching_sections())
    self.assertEqual(expected, matching)
1228
def test__get_matching_sections_no_match(self):
    # The root location matches no section at all.
    location = '/'
    self.get_branch_config(location)
    self.assertLocationMatching([])
1232
def test__get_matching_sections_exact(self):
    # A location equal to the section name matches with no extra path.
    location = 'http://www.example.com'
    self.get_branch_config(location)
    expected = [('http://www.example.com', '')]
    self.assertLocationMatching(expected)
1236
def test__get_matching_sections_suffix_does_not(self):
    # A location that only extends the text 'http://www.example.com'
    # (here with '-com') matches no section.
    location = 'http://www.example.com-com'
    self.get_branch_config(location)
    self.assertLocationMatching([])
1240
def test__get_matching_sections_subdir_recursive(self):
    # A subdirectory matches the parent section; the remaining path
    # component is reported as the extra path.
    location = 'http://www.example.com/com'
    self.get_branch_config(location)
    expected = [('http://www.example.com', 'com')]
    self.assertLocationMatching(expected)
1244
def test__get_matching_sections_ignoreparent(self):
1245
self.get_branch_config('http://www.example.com/ignoreparent')
1246
self.assertLocationMatching([('http://www.example.com/ignoreparent',
1249
def test__get_matching_sections_ignoreparent_subdir(self):
1250
self.get_branch_config(
1251
'http://www.example.com/ignoreparent/childbranch')
1252
self.assertLocationMatching([('http://www.example.com/ignoreparent',
1255
def test__get_matching_sections_subdir_trailing_slash(self):
    # '/b' matches the section named '/b/' with no extra path.
    location = '/b'
    self.get_branch_config(location)
    expected = [('/b/', '')]
    self.assertLocationMatching(expected)
1259
def test__get_matching_sections_subdir_child(self):
    # A direct child of '/a/' matches the '/a/*' glob section first,
    # then '/a/' with the child name as extra path.
    location = '/a/foo'
    self.get_branch_config(location)
    expected = [('/a/*', ''), ('/a/', 'foo')]
    self.assertLocationMatching(expected)
1263
def test__get_matching_sections_subdir_child_child(self):
    # Two levels below '/a/': the glob section reports 'bar' as extra
    # path and the '/a/' section reports 'foo/bar'.
    location = '/a/foo/bar'
    self.get_branch_config(location)
    expected = [('/a/*', 'bar'), ('/a/', 'foo/bar')]
    self.assertLocationMatching(expected)
1267
def test__get_matching_sections_trailing_slash_with_children(self):
    # '/a/' itself matches only the '/a/' section, with no extra path.
    location = '/a/'
    self.get_branch_config(location)
    expected = [('/a/', '')]
    self.assertLocationMatching(expected)
1271
def test__get_matching_sections_explicit_over_glob(self):
1272
# XXX: 2006-09-08 jamesh
1273
# This test only passes because ord('c') > ord('*'). If there
1274
# was a config section for '/a/?', it would get precedence
1276
self.get_branch_config('/a/c')
1277
self.assertLocationMatching([('/a/c', ''), ('/a/*', ''), ('/a/', 'c')])
1279
def test__get_option_policy_normal(self):
1280
self.get_branch_config('http://www.example.com')
1282
self.my_location_config._get_config_policy(
1283
'http://www.example.com', 'normal_option'),
1286
def test__get_option_policy_norecurse(self):
1287
self.get_branch_config('http://www.example.com')
1289
self.my_location_config._get_option_policy(
1290
'http://www.example.com', 'norecurse_option'),
1291
config.POLICY_NORECURSE)
1292
# Test old recurse=False setting:
1294
self.my_location_config._get_option_policy(
1295
'http://www.example.com/norecurse', 'normal_option'),
1296
config.POLICY_NORECURSE)
1298
def test__get_option_policy_normal(self):
1299
self.get_branch_config('http://www.example.com')
1301
self.my_location_config._get_option_policy(
1302
'http://www.example.com', 'appendpath_option'),
1303
config.POLICY_APPENDPATH)
1305
def test__get_options_with_policy(self):
1306
self.get_branch_config('/dir/subdir',
1307
location_config="""\
1309
other_url = /other-dir
1310
other_url:policy = appendpath
1312
other_url = /other-subdir
1315
[(u'other_url', u'/other-subdir', u'/dir/subdir', 'locations'),
1316
(u'other_url', u'/other-dir', u'/dir', 'locations'),
1317
(u'other_url:policy', u'appendpath', u'/dir', 'locations')],
1318
self.my_location_config)
424
self.failUnless(isinstance(global_config, config.GlobalConfig))
425
self.failUnless(global_config is my_config._get_global_config())
427
def test__get_section_no_match(self):
428
self.get_location_config('/')
429
self.assertEqual(None, self.my_config._get_section())
431
def test__get_section_exact(self):
432
self.get_location_config('http://www.example.com')
433
self.assertEqual('http://www.example.com',
434
self.my_config._get_section())
436
def test__get_section_suffix_does_not(self):
437
self.get_location_config('http://www.example.com-com')
438
self.assertEqual(None, self.my_config._get_section())
440
def test__get_section_subdir_recursive(self):
441
self.get_location_config('http://www.example.com/com')
442
self.assertEqual('http://www.example.com',
443
self.my_config._get_section())
445
def test__get_section_subdir_matches(self):
446
self.get_location_config('http://www.example.com/useglobal')
447
self.assertEqual('http://www.example.com/useglobal',
448
self.my_config._get_section())
450
def test__get_section_subdir_nonrecursive(self):
451
self.get_location_config(
452
'http://www.example.com/useglobal/childbranch')
453
self.assertEqual('http://www.example.com',
454
self.my_config._get_section())
456
def test__get_section_subdir_trailing_slash(self):
457
self.get_location_config('/b')
458
self.assertEqual('/b/', self.my_config._get_section())
460
def test__get_section_subdir_child(self):
461
self.get_location_config('/a/foo')
462
self.assertEqual('/a/*', self.my_config._get_section())
464
def test__get_section_subdir_child_child(self):
465
self.get_location_config('/a/foo/bar')
466
self.assertEqual('/a/', self.my_config._get_section())
468
def test__get_section_trailing_slash_with_children(self):
469
self.get_location_config('/a/')
470
self.assertEqual('/a/', self.my_config._get_section())
472
def test__get_section_explicit_over_glob(self):
473
self.get_location_config('/a/c')
474
self.assertEqual('/a/c', self.my_config._get_section())
476
def get_location_config(self, location, global_config=None):
477
if global_config is None:
478
global_file = StringIO(sample_config_text)
480
global_file = StringIO(global_config)
481
branches_file = StringIO(sample_branches_text)
482
self.my_config = config.LocationConfig(location)
483
self.my_config._get_parser(branches_file)
484
self.my_config._get_global_config()._get_parser(global_file)
1320
486
def test_location_without_username(self):
1321
self.get_branch_config('http://www.example.com/ignoreparent')
1322
self.assertEqual(u'Erik B\u00e5gfors <erik@bagfors.nu>',
487
self.get_location_config('http://www.example.com/useglobal')
488
self.assertEqual('Robert Collins <robertc@example.com>',
1323
489
self.my_config.username())
1325
491
def test_location_not_listed(self):
1326
"""Test that the global username is used when no location matches"""
1327
self.get_branch_config('/home/robertc/sources')
1328
self.assertEqual(u'Erik B\u00e5gfors <erik@bagfors.nu>',
492
self.get_location_config('/home/robertc/sources')
493
self.assertEqual('Robert Collins <robertc@example.com>',
1329
494
self.my_config.username())
1331
496
def test_overriding_location(self):
1332
self.get_branch_config('http://www.example.com/foo')
497
self.get_location_config('http://www.example.com/foo')
1333
498
self.assertEqual('Robert Collins <robertc@example.org>',
1334
499
self.my_config.username())
501
def test_signatures_not_set(self):
502
self.get_location_config('http://www.example.com',
503
global_config=sample_ignore_signatures)
504
self.assertEqual(config.CHECK_NEVER,
505
self.my_config.signature_checking())
507
def test_signatures_never(self):
508
self.get_location_config('/a/c')
509
self.assertEqual(config.CHECK_NEVER,
510
self.my_config.signature_checking())
512
def test_signatures_when_available(self):
513
self.get_location_config('/a/', global_config=sample_ignore_signatures)
514
self.assertEqual(config.CHECK_IF_POSSIBLE,
515
self.my_config.signature_checking())
517
def test_signatures_always(self):
518
self.get_location_config('/b')
519
self.assertEqual(config.CHECK_ALWAYS,
520
self.my_config.signature_checking())
522
def test_gpg_signing_command(self):
523
self.get_location_config('/b')
524
self.assertEqual("gnome-gpg", self.my_config.gpg_signing_command())
526
def test_gpg_signing_command_missing(self):
527
self.get_location_config('/a')
528
self.assertEqual("false", self.my_config.gpg_signing_command())
1336
530
def test_get_user_option_global(self):
1337
self.get_branch_config('/a')
531
self.get_location_config('/a')
1338
532
self.assertEqual('something',
1339
533
self.my_config.get_user_option('user_global_option'))
1341
535
def test_get_user_option_local(self):
1342
self.get_branch_config('/a')
536
self.get_location_config('/a')
1343
537
self.assertEqual('local',
1344
538
self.my_config.get_user_option('user_local_option'))
1346
def test_get_user_option_appendpath(self):
1347
# returned as is for the base path:
1348
self.get_branch_config('http://www.example.com')
1349
self.assertEqual('append',
1350
self.my_config.get_user_option('appendpath_option'))
1351
# Extra path components get appended:
1352
self.get_branch_config('http://www.example.com/a/b/c')
1353
self.assertEqual('append/a/b/c',
1354
self.my_config.get_user_option('appendpath_option'))
1355
# Overridden for http://www.example.com/dir, where it is a
1357
self.get_branch_config('http://www.example.com/dir/a/b/c')
1358
self.assertEqual('normal',
1359
self.my_config.get_user_option('appendpath_option'))
1361
def test_get_user_option_norecurse(self):
    def option_at(location, name):
        # Rebuild the branch config for location, then read the option.
        self.get_branch_config(location)
        return self.my_config.get_user_option(name)

    self.assertEqual('norecurse',
                     option_at('http://www.example.com',
                               'norecurse_option'))
    self.assertEqual(None,
                     option_at('http://www.example.com/dir',
                               'norecurse_option'))
    # http://www.example.com/norecurse is a recurse=False section
    # that redefines normal_option.  Subdirectories do not pick up
    # this redefinition.
    self.assertEqual('norecurse',
                     option_at('http://www.example.com/norecurse',
                               'normal_option'))
    self.assertEqual('normal',
                     option_at('http://www.example.com/norecurse/subdir',
                               'normal_option'))
1378
def test_set_user_option_norecurse(self):
1379
self.get_branch_config('http://www.example.com')
1380
self.my_config.set_user_option('foo', 'bar',
1381
store=config.STORE_LOCATION_NORECURSE)
1383
self.my_location_config._get_option_policy(
1384
'http://www.example.com', 'foo'),
1385
config.POLICY_NORECURSE)
1387
def test_set_user_option_appendpath(self):
1388
self.get_branch_config('http://www.example.com')
1389
self.my_config.set_user_option('foo', 'bar',
1390
store=config.STORE_LOCATION_APPENDPATH)
1392
self.my_location_config._get_option_policy(
1393
'http://www.example.com', 'foo'),
1394
config.POLICY_APPENDPATH)
1396
def test_set_user_option_change_policy(self):
1397
self.get_branch_config('http://www.example.com')
1398
self.my_config.set_user_option('norecurse_option', 'normal',
1399
store=config.STORE_LOCATION)
1401
self.my_location_config._get_option_policy(
1402
'http://www.example.com', 'norecurse_option'),
1405
def get_branch_config(self, location, global_config=None,
1406
location_config=None):
1407
my_branch = FakeBranch(location)
540
def test_post_commit_default(self):
541
self.get_location_config('/a/c')
542
self.assertEqual('bzrlib.tests.test_config.post_commit',
543
self.my_config.post_commit())
546
class TestLocationConfig(TestCaseInTempDir):
548
def get_location_config(self, location, global_config=None):
1408
549
if global_config is None:
1409
global_config = sample_config_text
1410
if location_config is None:
1411
location_config = sample_branches_text
1413
config.GlobalConfig.from_string(global_config, save=True)
1414
config.LocationConfig.from_string(location_config, my_branch.base,
1416
my_config = config.BranchConfig(my_branch)
1417
self.my_config = my_config
1418
self.my_location_config = my_config._get_location_config()
1420
def test_set_user_setting_sets_and_saves2(self):
1421
self.get_branch_config('/a/c')
1422
self.assertIs(self.my_config.get_user_option('foo'), None)
1423
self.my_config.set_user_option('foo', 'bar')
1425
self.my_config.branch.control_files.files['branch.conf'].strip(),
1427
self.assertEqual(self.my_config.get_user_option('foo'), 'bar')
1428
self.my_config.set_user_option('foo', 'baz',
1429
store=config.STORE_LOCATION)
1430
self.assertEqual(self.my_config.get_user_option('foo'), 'baz')
1431
self.my_config.set_user_option('foo', 'qux')
1432
self.assertEqual(self.my_config.get_user_option('foo'), 'baz')
1434
def test_get_bzr_remote_path(self):
    # Checked in three steps: the initial value is 'bzr', then the
    # 'bzr_remote_path' user option is returned once set, then the
    # BZR_REMOTE_PATH environment variable is returned once overridden.
    location_config = config.LocationConfig('/a/c')
    self.assertEqual('bzr', location_config.get_bzr_remote_path())
    location_config.set_user_option('bzr_remote_path', '/path-bzr')
    self.assertEqual('/path-bzr', location_config.get_bzr_remote_path())
    self.overrideEnv('BZR_REMOTE_PATH', '/environ-bzr')
    self.assertEqual('/environ-bzr', location_config.get_bzr_remote_path())
1443
precedence_global = b'option = global'
1444
precedence_branch = b'option = branch'
1445
precedence_location = b"""
1449
[http://example.com/specific]
1454
class TestBranchConfigItems(tests.TestCaseInTempDir):
1456
def get_branch_config(self, global_config=None, location=None,
1457
location_config=None, branch_data_config=None):
1458
my_branch = FakeBranch(location)
1459
if global_config is not None:
1460
config.GlobalConfig.from_string(global_config, save=True)
1461
if location_config is not None:
1462
config.LocationConfig.from_string(location_config, my_branch.base,
1464
my_config = config.BranchConfig(my_branch)
1465
if branch_data_config is not None:
1466
my_config.branch.control_files.files['branch.conf'] = \
550
global_file = StringIO(sample_config_text.encode('utf-8'))
552
global_file = StringIO(global_config.encode('utf-8'))
553
branches_file = StringIO(sample_branches_text.encode('utf-8'))
554
self.my_config = config.LocationConfig(location)
555
self.my_config._get_parser(branches_file)
556
self.my_config._get_global_config()._get_parser(global_file)
558
def test_set_user_setting_sets_and_saves(self):
559
self.get_location_config('/a/c')
560
record = InstrumentedConfigObj("foo")
561
self.my_config._parser = record
563
real_mkdir = os.mkdir
565
def checked_mkdir(path, mode=0777):
566
self.log('making directory: %s', path)
567
real_mkdir(path, mode)
570
os.mkdir = checked_mkdir
572
self.my_config.set_user_option('foo', 'bar')
574
os.mkdir = real_mkdir
576
self.failUnless(self.created, 'Failed to create ~/.bazaar')
577
self.assertEqual([('__contains__', '/a/c'),
578
('__contains__', '/a/c/'),
579
('__setitem__', '/a/c', {}),
580
('__getitem__', '/a/c'),
581
('__setitem__', 'foo', 'bar'),
586
class TestBranchConfigItems(TestCase):
1470
588
def test_user_id(self):
1471
589
branch = FakeBranch()
1472
590
my_config = config.BranchConfig(branch)
1473
self.assertIsNot(None, my_config.username())
1474
my_config.branch.control_files.files['email'] = "John"
1475
my_config.set_user_option('email',
1476
"Robert Collins <robertc@example.org>")
1477
self.assertEqual("Robert Collins <robertc@example.org>",
1478
my_config.username())
1480
def test_BRZ_EMAIL_OVERRIDES(self):
    # With BRZ_EMAIL overridden in the environment, the branch config
    # reports that value as the username.
    identity = "Robert Collins <robertc@example.org>"
    self.overrideEnv('BRZ_EMAIL', identity)
    branch_config = config.BranchConfig(FakeBranch())
    self.assertEqual(identity, branch_config.username())
1487
def test_BZR_EMAIL_OVERRIDES(self):
    # Renamed from test_BRZ_EMAIL_OVERRIDES: this case overrides the
    # BZR_EMAIL variable, and sharing its name with the BRZ_EMAIL test
    # defined just before it meant that only one of the two definitions
    # survived in the class, so one of the tests never ran.
    self.overrideEnv('BZR_EMAIL', "Robert Collins <robertc@example.org>")
    branch = FakeBranch()
    my_config = config.BranchConfig(branch)
    # With BZR_EMAIL overridden, username() must report that value.
    self.assertEqual("Robert Collins <robertc@example.org>",
                     my_config.username())
591
self.assertEqual("Robert Collins <robertc@example.net>",
592
my_config._get_user_id())
593
branch.control_files.email = "John"
594
self.assertEqual("John", my_config._get_user_id())
596
def test_not_set_in_branch(self):
597
branch = FakeBranch()
598
my_config = config.BranchConfig(branch)
599
branch.control_files.email = None
600
config_file = StringIO(sample_config_text.encode('utf-8'))
601
(my_config._get_location_config().
602
_get_global_config()._get_parser(config_file))
603
self.assertEqual(u"Erik B\u00e5gfors <erik@bagfors.nu>",
604
my_config._get_user_id())
605
branch.control_files.email = "John"
606
self.assertEqual("John", my_config._get_user_id())
608
def test_BZREMAIL_OVERRIDES(self):
609
os.environ['BZREMAIL'] = "Robert Collins <robertc@example.org>"
610
branch = FakeBranch()
611
my_config = config.BranchConfig(branch)
612
self.assertEqual("Robert Collins <robertc@example.org>",
613
my_config.username())
615
def test_signatures_forced(self):
616
branch = FakeBranch()
617
my_config = config.BranchConfig(branch)
618
config_file = StringIO(sample_always_signatures)
619
(my_config._get_location_config().
620
_get_global_config()._get_parser(config_file))
621
self.assertEqual(config.CHECK_ALWAYS, my_config.signature_checking())
623
def test_gpg_signing_command(self):
624
branch = FakeBranch()
625
my_config = config.BranchConfig(branch)
626
config_file = StringIO(sample_config_text.encode('utf-8'))
627
(my_config._get_location_config().
628
_get_global_config()._get_parser(config_file))
629
self.assertEqual('gnome-gpg', my_config.gpg_signing_command())
1494
631
def test_get_user_option_global(self):
1495
my_config = self.get_branch_config(global_config=sample_config_text)
632
branch = FakeBranch()
633
my_config = config.BranchConfig(branch)
634
config_file = StringIO(sample_config_text.encode('utf-8'))
635
(my_config._get_location_config().
636
_get_global_config()._get_parser(config_file))
1496
637
self.assertEqual('something',
1497
638
my_config.get_user_option('user_global_option'))
1499
def test_config_precedence(self):
1500
# FIXME: eager test, luckily no persistent config file makes it fail
1502
my_config = self.get_branch_config(global_config=precedence_global)
1503
self.assertEqual(my_config.get_user_option('option'), 'global')
1504
my_config = self.get_branch_config(global_config=precedence_global,
1505
branch_data_config=precedence_branch)
1506
self.assertEqual(my_config.get_user_option('option'), 'branch')
1507
my_config = self.get_branch_config(
1508
global_config=precedence_global,
1509
branch_data_config=precedence_branch,
1510
location_config=precedence_location)
1511
self.assertEqual(my_config.get_user_option('option'), 'recurse')
1512
my_config = self.get_branch_config(
1513
global_config=precedence_global,
1514
branch_data_config=precedence_branch,
1515
location_config=precedence_location,
1516
location='http://example.com/specific')
1517
self.assertEqual(my_config.get_user_option('option'), 'exact')
1520
class TestMailAddressExtraction(tests.TestCase):
640
def test_post_commit_default(self):
641
branch = FakeBranch()
643
my_config = config.BranchConfig(branch)
644
config_file = StringIO(sample_config_text.encode('utf-8'))
645
(my_config._get_location_config().
646
_get_global_config()._get_parser(config_file))
647
branch_file = StringIO(sample_branches_text)
648
my_config._get_location_config()._get_parser(branch_file)
649
self.assertEqual('bzrlib.tests.test_config.post_commit',
650
my_config.post_commit())
653
class TestMailAddressExtraction(TestCase):
1522
655
def test_extract_email_address(self):
1523
656
self.assertEqual('jane@test.com',
1524
657
config.extract_email_address('Jane <jane@test.com>'))
1525
self.assertRaises(config.NoEmailInUsername,
658
self.assertRaises(errors.BzrError,
1526
659
config.extract_email_address, 'Jane Tester')
1528
def test_parse_username(self):
    # Each entry pairs the expected (name, email) tuple with the raw
    # username string handed to config.parse_username.
    cases = [
        (('', 'jdoe@example.com'), 'jdoe@example.com'),
        (('', 'jdoe@example.com'), '<jdoe@example.com>'),
        (('John Doe', 'jdoe@example.com'), 'John Doe <jdoe@example.com>'),
        (('John Doe', ''), 'John Doe'),
        (('John Doe', 'jdoe@example.com'), 'John Doe jdoe@example.com'),
    ]
    for expected, raw in cases:
        self.assertEqual(expected, config.parse_username(raw))
1541
class TestTreeConfig(tests.TestCaseWithTransport):
    """Tests for storing and reading options through config.TreeConfig."""

    def test_get_value(self):
        """Test that retrieving a value from a section is possible"""
        conf = config.TreeConfig(self.make_branch('.'))
        # Arguments for set_option, applied in order: value, option name
        # and, where given, the section.
        stored = (
            ('value', 'key', 'SECTION'),
            ('value2', 'key2'),
            ('value3-top', 'key3'),
            ('value3-section', 'key3', 'SECTION'),
        )
        for arguments in stored:
            conf.set_option(*arguments)

        self.assertEqual(conf.get_option('key', 'SECTION'), 'value')
        self.assertEqual(conf.get_option('key2'), 'value2')
        # Unknown options give None unless a default is supplied.
        self.assertEqual(conf.get_option('non-existant'), None)
        self.assertEqual(conf.get_option('non-existant', 'SECTION'), None)
        self.assertEqual(
            conf.get_option('non-existant', default='default'), 'default')
        # The same holds for a known option asked for in an unknown section.
        self.assertEqual(conf.get_option('key2', 'NOSECTION'), None)
        self.assertEqual(
            conf.get_option('key2', 'NOSECTION', default='default'),
            'default')
        # 'key3' was stored both without a section and in SECTION.
        self.assertEqual(conf.get_option('key3'), 'value3-top')
        self.assertEqual(conf.get_option('key3', 'SECTION'),
                         'value3-section')
1569
class TestTransportConfig(tests.TestCaseWithTransport):
1571
def test_load_utf8(self):
    """Ensure we can load an utf8-encoded file."""
    transport = self.get_transport()
    expected_user = u'b\N{Euro Sign}ar'
    raw_bytes = (u'user=%s' % (expected_user,)).encode('utf8')
    # Store the raw content in the config file
    transport.put_bytes('foo.conf', raw_bytes)
    loaded = config.TransportConfig(transport, 'foo.conf')
    self.assertEqual(expected_user, loaded.get_option('user'))
1582
def test_load_non_ascii(self):
    """Ensure we display a proper error on non-ascii, non utf-8 content."""
    transport = self.get_transport()
    # The comment line carries a 0xff byte.
    undecodable = b'user=foo\n#\xff\n'
    transport.put_bytes('foo.conf', undecodable)
    broken = config.TransportConfig(transport, 'foo.conf')
    self.assertRaises(config.ConfigContentError, broken._get_configobj)
1589
def test_load_erroneous_content(self):
    """Ensure we display a proper error on content that can't be parsed."""
    transport = self.get_transport()
    # A section header whose closing bracket is missing.
    unparsable = b'[open_section\n'
    transport.put_bytes('foo.conf', unparsable)
    broken = config.TransportConfig(transport, 'foo.conf')
    self.assertRaises(config.ParseConfigError, broken._get_configobj)
1596
def test_load_permission_denied(self):
1597
"""Ensure we get an empty config file if the file is inaccessible."""
1601
warnings.append(args[0] % args[1:])
1602
self.overrideAttr(trace, 'warning', warning)
1604
class DenyingTransport(object):
1606
def __init__(self, base):
1609
def get_bytes(self, relpath):
1610
raise errors.PermissionDenied(relpath, "")
1612
cfg = config.TransportConfig(
1613
DenyingTransport("nonexisting://"), 'control.conf')
1614
self.assertIs(None, cfg.get_option('non-existant', 'SECTION'))
1617
[u'Permission denied while trying to open configuration file '
1618
u'nonexisting:///control.conf.'])
1620
def test_get_value(self):
1621
"""Test that retreiving a value from a section is possible"""
1622
bzrdir_config = config.TransportConfig(self.get_transport('.'),
1624
bzrdir_config.set_option('value', 'key', 'SECTION')
1625
bzrdir_config.set_option('value2', 'key2')
1626
bzrdir_config.set_option('value3-top', 'key3')
1627
bzrdir_config.set_option('value3-section', 'key3', 'SECTION')
1628
value = bzrdir_config.get_option('key', 'SECTION')
1629
self.assertEqual(value, 'value')
1630
value = bzrdir_config.get_option('key2')
1631
self.assertEqual(value, 'value2')
1632
self.assertEqual(bzrdir_config.get_option('non-existant'), None)
1633
value = bzrdir_config.get_option('non-existant', 'SECTION')
1634
self.assertEqual(value, None)
1635
value = bzrdir_config.get_option('non-existant', default='default')
1636
self.assertEqual(value, 'default')
1637
self.assertEqual(bzrdir_config.get_option('key2', 'NOSECTION'), None)
1638
value = bzrdir_config.get_option('key2', 'NOSECTION',
1640
self.assertEqual(value, 'default')
1641
value = bzrdir_config.get_option('key3')
1642
self.assertEqual(value, 'value3-top')
1643
value = bzrdir_config.get_option('key3', 'SECTION')
1644
self.assertEqual(value, 'value3-section')
1646
def test_set_unset_default_stack_on(self):
    # default_stack_on starts out as None, is stored under the
    # 'default_stack_on' option once set, and reads back as None again
    # after being set to None.
    stack_config = config.BzrDirConfig(self.make_controldir('.'))
    self.assertIs(None, stack_config.get_default_stack_on())
    stack_config.set_default_stack_on('Foo')
    stored = stack_config._config.get_option('default_stack_on')
    self.assertEqual('Foo', stored)
    self.assertEqual('Foo', stack_config.get_default_stack_on())
    stack_config.set_default_stack_on(None)
    self.assertIs(None, stack_config.get_default_stack_on())
1658
class TestOldConfigHooks(tests.TestCaseWithTransport):
1661
super(TestOldConfigHooks, self).setUp()
1662
create_configs_with_file_option(self)
1664
def assertGetHook(self, conf, name, value):
1669
config.OldConfigHooks.install_named_hook('get', hook, None)
1671
config.OldConfigHooks.uninstall_named_hook, 'get', None)
1672
self.assertLength(0, calls)
1673
actual_value = conf.get_user_option(name)
1674
self.assertEqual(value, actual_value)
1675
self.assertLength(1, calls)
1676
self.assertEqual((conf, name, value), calls[0])
1678
def test_get_hook_breezy(self):
    # Reading 'file' from the breezy config must give 'breezy' and be
    # reported to the 'get' hook.
    conf = self.breezy_config
    self.assertGetHook(conf, 'file', 'breezy')
1681
def test_get_hook_locations(self):
    # Reading 'file' from the locations config must give 'locations' and
    # be reported to the 'get' hook.
    conf = self.locations_config
    self.assertGetHook(conf, 'file', 'locations')
1684
def test_get_hook_branch(self):
    conf = self.branch_config
    # Since locations masks branch, we define a different option
    conf.set_user_option('file2', 'branch')
    self.assertGetHook(conf, 'file2', 'branch')
1689
def assertSetHook(self, conf, name, value):
1694
config.OldConfigHooks.install_named_hook('set', hook, None)
1696
config.OldConfigHooks.uninstall_named_hook, 'set', None)
1697
self.assertLength(0, calls)
1698
conf.set_user_option(name, value)
1699
self.assertLength(1, calls)
1700
# We can't assert the conf object below as different configs use
1701
# different means to implement set_user_option and we care only about
1703
self.assertEqual((name, value), calls[0][1:])
1705
def test_set_hook_breezy(self):
    # Setting 'foo' on the breezy config must be reported to the 'set' hook.
    conf = self.breezy_config
    self.assertSetHook(conf, 'foo', 'breezy')
1708
def test_set_hook_locations(self):
    # Setting 'foo' on the locations config must be reported to the
    # 'set' hook.
    conf = self.locations_config
    self.assertSetHook(conf, 'foo', 'locations')
1711
def test_set_hook_branch(self):
    # Setting 'foo' on the branch config must be reported to the 'set' hook.
    conf = self.branch_config
    self.assertSetHook(conf, 'foo', 'branch')
1714
def assertRemoveHook(self, conf, name, section_name=None):
1719
config.OldConfigHooks.install_named_hook('remove', hook, None)
1721
config.OldConfigHooks.uninstall_named_hook, 'remove', None)
1722
self.assertLength(0, calls)
1723
conf.remove_user_option(name, section_name)
1724
self.assertLength(1, calls)
1725
# We can't assert the conf object below as different configs use
1726
# different means to implement remove_user_option and we care only about
1728
self.assertEqual((name,), calls[0][1:])
1730
def test_remove_hook_breezy(self):
    # Removing 'file' from the breezy config must be reported to the
    # 'remove' hook.
    conf = self.breezy_config
    self.assertRemoveHook(conf, 'file')
1733
def test_remove_hook_locations(self):
    # For the locations config the removal is made with the config's own
    # location passed as the section name.
    conf = self.locations_config
    section_name = conf.location
    self.assertRemoveHook(conf, 'file', section_name)
1737
def test_remove_hook_branch(self):
    # Removing 'file' from the branch config must be reported to the
    # 'remove' hook.
    conf = self.branch_config
    self.assertRemoveHook(conf, 'file')
1740
def assertLoadHook(self, name, conf_class, *conf_args):
1745
config.OldConfigHooks.install_named_hook('load', hook, None)
1747
config.OldConfigHooks.uninstall_named_hook, 'load', None)
1748
self.assertLength(0, calls)
1750
conf = conf_class(*conf_args)
1751
# Access an option to trigger a load
1752
conf.get_user_option(name)
1753
self.assertLength(1, calls)
1754
# Since we can't assert about conf, we just use the number of calls ;-/
1756
def test_load_hook_breezy(self):
    # A GlobalConfig built without arguments must report to the 'load'
    # hook when 'file' is read.
    conf_class = config.GlobalConfig
    self.assertLoadHook('file', conf_class)
1759
def test_load_hook_locations(self):
    # A LocationConfig built for the tree's base directory must report to
    # the 'load' hook when 'file' is read.
    location = self.tree.basedir
    self.assertLoadHook('file', config.LocationConfig, location)
1762
def test_load_hook_branch(self):
    """Run the assertLoadHook check with a config.BranchConfig."""
    tree_branch = self.tree.branch
    self.assertLoadHook('file', config.BranchConfig, tree_branch)
1765
def assertSaveHook(self, conf):
1770
config.OldConfigHooks.install_named_hook('save', hook, None)
1772
config.OldConfigHooks.uninstall_named_hook, 'save', None)
1773
self.assertLength(0, calls)
1774
# Setting an option triggers a save
1775
conf.set_user_option('foo', 'bar')
1776
self.assertLength(1, calls)
1777
# Since we can't assert about conf, we just use the number of calls ;-/
1779
def test_save_hook_breezy(self):
    """Run the assertSaveHook check against the breezy config."""
    conf = self.breezy_config
    self.assertSaveHook(conf)
1782
def test_save_hook_locations(self):
    """Run the assertSaveHook check against the locations config."""
    conf = self.locations_config
    self.assertSaveHook(conf)
1785
def test_save_hook_branch(self):
    """Run the assertSaveHook check against the branch config."""
    conf = self.branch_config
    self.assertSaveHook(conf)
1789
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
1790
"""Tests config hooks for remote configs.
1792
No tests for the remove hook as this is not implemented there.
1796
super(TestOldConfigHooksForRemote, self).setUp()
1797
self.transport_server = test_server.SmartTCPServer_for_testing
1798
create_configs_with_file_option(self)
1800
def assertGetHook(self, conf, name, value):
1805
config.OldConfigHooks.install_named_hook('get', hook, None)
1807
config.OldConfigHooks.uninstall_named_hook, 'get', None)
1808
self.assertLength(0, calls)
1809
actual_value = conf.get_option(name)
1810
self.assertEqual(value, actual_value)
1811
self.assertLength(1, calls)
1812
self.assertEqual((conf, name, value), calls[0])
1814
def test_get_hook_remote_branch(self):
    """Run assertGetHook on the config of the branch opened at 'tree'."""
    url = self.get_url('tree')
    conf = branch.Branch.open(url)._get_config()
    self.assertGetHook(conf, 'file', 'branch')
1818
def test_get_hook_remote_bzrdir(self):
    """Run assertGetHook on the config of the control dir opened at 'tree'."""
    url = self.get_url('tree')
    conf = controldir.ControlDir.open(url)._get_config()
    # Set the 'file' option first so that there is a value to get back.
    conf.set_option('remotedir', 'file')
    self.assertGetHook(conf, 'file', 'remotedir')
1824
def assertSetHook(self, conf, name, value):
1829
config.OldConfigHooks.install_named_hook('set', hook, None)
1831
config.OldConfigHooks.uninstall_named_hook, 'set', None)
1832
self.assertLength(0, calls)
1833
conf.set_option(value, name)
1834
self.assertLength(1, calls)
1835
# We can't assert the conf object below as different configs use
1836
# different means to implement set_user_option and we care only about
1838
self.assertEqual((name, value), calls[0][1:])
1840
def test_set_hook_remote_branch(self):
    """Run assertSetHook on the config of a write-locked remote branch."""
    remote_branch = branch.Branch.open(self.get_url('tree'))
    # Take the write lock now; it is released when the test is cleaned up.
    write_lock = remote_branch.lock_write()
    self.addCleanup(write_lock.unlock)
    conf = remote_branch._get_config()
    self.assertSetHook(conf, 'file', 'remote')
1845
def test_set_hook_remote_bzrdir(self):
    """Run assertSetHook on the control dir config, branch write-locked."""
    url = self.get_url('tree')
    remote_branch = branch.Branch.open(url)
    # Take the write lock now; it is released when the test is cleaned up.
    write_lock = remote_branch.lock_write()
    self.addCleanup(write_lock.unlock)
    conf = controldir.ControlDir.open(self.get_url('tree'))._get_config()
    self.assertSetHook(conf, 'file', 'remotedir')
1851
def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
1856
config.OldConfigHooks.install_named_hook('load', hook, None)
1858
config.OldConfigHooks.uninstall_named_hook, 'load', None)
1859
self.assertLength(0, calls)
1861
conf = conf_class(*conf_args)
1862
# Access an option to trigger a load
1863
conf.get_option(name)
1864
self.assertLength(expected_nb_calls, calls)
1865
# Since we can't assert about conf, we just use the number of calls ;-/
1867
def test_load_hook_remote_branch(self):
    """Expect a single 'load' hook call for a remote.RemoteBranchConfig."""
    url = self.get_url('tree')
    remote_branch = branch.Branch.open(url)
    self.assertLoadHook(1, 'file', remote.RemoteBranchConfig, remote_branch)
1872
def test_load_hook_remote_bzrdir(self):
    """Expect two 'load' hook calls for a remote.RemoteBzrDirConfig."""
    url = self.get_url('tree')
    remote_bzrdir = controldir.ControlDir.open(url)
    # The config file doesn't exist, set an option to force its creation
    remote_bzrdir._get_config().set_option('remotedir', 'file')
    # We get one call for the server and one call for the client, this is
    # caused by the differences in implementations between
    # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
    # SmartServerBranchGetConfigFile (in smart/branch.py)
    expected_nb_calls = 2
    self.assertLoadHook(
        expected_nb_calls, 'file', remote.RemoteBzrDirConfig, remote_bzrdir)
1884
def assertSaveHook(self, conf):
1889
config.OldConfigHooks.install_named_hook('save', hook, None)
1891
config.OldConfigHooks.uninstall_named_hook, 'save', None)
1892
self.assertLength(0, calls)
1893
# Setting an option triggers a save
1894
conf.set_option('foo', 'bar')
1895
self.assertLength(1, calls)
1896
# Since we can't assert about conf, we just use the number of calls ;-/
1898
def test_save_hook_remote_branch(self):
    """Run assertSaveHook on the config of a write-locked remote branch."""
    remote_branch = branch.Branch.open(self.get_url('tree'))
    # Take the write lock now; it is released when the test is cleaned up.
    write_lock = remote_branch.lock_write()
    self.addCleanup(write_lock.unlock)
    conf = remote_branch._get_config()
    self.assertSaveHook(conf)
1903
def test_save_hook_remote_bzrdir(self):
    """Run assertSaveHook on the control dir config, branch write-locked."""
    url = self.get_url('tree')
    remote_branch = branch.Branch.open(url)
    # Take the write lock now; it is released when the test is cleaned up.
    write_lock = remote_branch.lock_write()
    self.addCleanup(write_lock.unlock)
    conf = controldir.ControlDir.open(self.get_url('tree'))._get_config()
    self.assertSaveHook(conf)
1910
class TestOptionNames(tests.TestCase):
    """Check which option names config._option_ref_re accepts in '{name}'."""

    def is_valid(self, name):
        """Tell whether '{name}' is matched by config._option_ref_re."""
        reference = '{%s}' % name
        matched = config._option_ref_re.match(reference)
        return matched is not None

    def test_valid_names(self):
        """Names that must be accepted."""
        self.assertTrue(self.is_valid('foo'))
        self.assertTrue(self.is_valid('foo.bar'))
        self.assertTrue(self.is_valid('f1'))
        self.assertTrue(self.is_valid('_'))
        self.assertTrue(self.is_valid('__bar__'))
        self.assertTrue(self.is_valid('a_'))
        self.assertTrue(self.is_valid('a1'))
        # Don't break bzr-svn for no good reason
        self.assertTrue(self.is_valid('guessed-layout'))

    def test_invalid_names(self):
        """Names that must be rejected."""
        self.assertFalse(self.is_valid(' foo'))
        self.assertFalse(self.is_valid('foo '))
        self.assertFalse(self.is_valid('1'))
        self.assertFalse(self.is_valid('1,2'))
        self.assertFalse(self.is_valid('foo$'))
        self.assertFalse(self.is_valid('!foo'))
        self.assertFalse(self.is_valid('foo.'))
        self.assertFalse(self.is_valid('foo..bar'))
        self.assertFalse(self.is_valid('{}'))
        self.assertFalse(self.is_valid('{a}'))
        self.assertFalse(self.is_valid('a\n'))
        self.assertFalse(self.is_valid('-'))
        self.assertFalse(self.is_valid('-a'))
        self.assertFalse(self.is_valid('a-'))
        self.assertFalse(self.is_valid('a--a'))

    def assertSingleGroup(self, reference):
        """Assert that matching config._option_ref_re yields a single group.

        NOTE(review): ``reference`` is not used, the regexp is always matched
        against '{a}' -- confirm whether this is intended.
        """
        # The regexp is used with split, so it should capture the reference
        # *only*; if more groups are needed, (?:...) should be used.
        match = config._option_ref_re.match('{a}')
        groups = match.groups()
        self.assertLength(1, groups)

    def test_valid_references(self):
        """Both plain and doubly-braced references go through the check."""
        self.assertSingleGroup('{a}')
        self.assertSingleGroup('{{a}}')
1954
class TestOption(tests.TestCase):
1956
def test_default_value(self):
    """get_default() gives back the default passed to the constructor."""
    option = config.Option('foo', default='bar')
    default = option.get_default()
    self.assertEqual('bar', default)
1960
def test_callable_default_value(self):
1961
def bar_as_unicode():
1963
opt = config.Option('foo', default=bar_as_unicode)
1964
self.assertEqual('bar', opt.get_default())
1966
def test_default_value_from_env(self):
    """A set environment variable overrides the option default."""
    option = config.Option('foo', default='bar', default_from_env=['FOO'])
    self.overrideEnv('FOO', 'quux')
    # Env variable provides a default taking over the option one
    default = option.get_default()
    self.assertEqual('quux', default)
1972
def test_first_default_value_from_env_wins(self):
    """With several candidate variables, the first one that is set is used."""
    env_names = ['NO_VALUE', 'FOO', 'BAZ']
    option = config.Option('foo', default='bar', default_from_env=env_names)
    self.overrideEnv('FOO', 'foo')
    self.overrideEnv('BAZ', 'baz')
    # The first env var set wins
    default = option.get_default()
    self.assertEqual('foo', default)
1980
def test_not_supported_list_default_value(self):
    """Building an Option with a list default raises AssertionError."""
    bad_default = [1]
    self.assertRaises(
        AssertionError, config.Option, 'foo', default=bad_default)
1983
def test_not_supported_object_default_value(self):
1984
self.assertRaises(AssertionError, config.Option, 'foo',
1987
def test_not_supported_callable_default_value_not_unicode(self):
1988
def bar_not_unicode():
1990
opt = config.Option('foo', default=bar_not_unicode)
1991
self.assertRaises(AssertionError, opt.get_default)
1993
def test_get_help_topic(self):
    """The help topic of option 'foo' is 'foo'."""
    option = config.Option('foo')
    topic = option.get_help_topic()
    self.assertEqual('foo', topic)
1998
class TestOptionConverter(tests.TestCase):
2000
def assertConverted(self, expected, opt, value):
    """Assert that ``opt`` converts ``value`` into ``expected``.

    The conversion is done with ``opt.convert_from_unicode`` and no store.
    """
    converted = opt.convert_from_unicode(None, value)
    self.assertEqual(expected, converted)
2003
def assertCallsWarning(self, opt, value):
2007
warnings.append(args[0] % args[1:])
2008
self.overrideAttr(trace, 'warning', warning)
2009
self.assertEqual(None, opt.convert_from_unicode(None, value))
2010
self.assertLength(1, warnings)
2012
'Value "%s" is not valid for "%s"' % (value, opt.name),
2015
def assertCallsError(self, opt, value):
    """Assert that converting ``value`` raises ConfigOptionValueError."""
    convert = opt.convert_from_unicode
    self.assertRaises(config.ConfigOptionValueError, convert, None, value)
2019
def assertConvertInvalid(self, opt, invalid_value):
2021
self.assertEqual(None, opt.convert_from_unicode(None, invalid_value))
2022
opt.invalid = 'warning'
2023
self.assertCallsWarning(opt, invalid_value)
2024
opt.invalid = 'error'
2025
self.assertCallsError(opt, invalid_value)
2028
class TestOptionWithBooleanConverter(TestOptionConverter):
    """Conversions for an option using config.bool_from_store."""

    def get_option(self):
        """Build the boolean option under test."""
        converter = config.bool_from_store
        return config.Option('foo', help='A boolean.', from_unicode=converter)

    def test_convert_invalid(self):
        option = self.get_option()
        # A string that is not recognized as a boolean
        self.assertConvertInvalid(option, u'invalid-boolean')
        # A list of strings is never recognized as a boolean
        self.assertConvertInvalid(option, [u'not', u'a', u'boolean'])

    def test_convert_valid(self):
        option = self.get_option()
        self.assertConverted(True, option, u'True')
        self.assertConverted(True, option, u'1')
        self.assertConverted(False, option, u'False')
2048
class TestOptionWithIntegerConverter(TestOptionConverter):
    """Conversions for an option using config.int_from_store."""

    def get_option(self):
        """Build the integer option under test."""
        converter = config.int_from_store
        return config.Option('foo', help='An integer.',
                             from_unicode=converter)

    def test_convert_invalid(self):
        option = self.get_option()
        # A string that is not recognized as an integer
        self.assertConvertInvalid(option, u'forty-two')
        # A list of strings is never recognized as an integer
        self.assertConvertInvalid(option, [u'a', u'list'])

    def test_convert_valid(self):
        option = self.get_option()
        self.assertConverted(16, option, u'16')
2066
class TestOptionWithSIUnitConverter(TestOptionConverter):
    """Conversions for an option using config.int_SI_from_store."""

    def get_option(self):
        """Build the SI-unit integer option under test."""
        converter = config.int_SI_from_store
        return config.Option('foo', help='An integer in SI units.',
                             from_unicode=converter)

    def test_convert_invalid(self):
        option = self.get_option()
        self.assertConvertInvalid(option, u'not-a-unit')
        self.assertConvertInvalid(option, u'Gb')  # Forgot the value
        self.assertConvertInvalid(option, u'1b')  # Forgot the unit
        self.assertConvertInvalid(option, u'1GG')
        self.assertConvertInvalid(option, u'1Mbb')
        self.assertConvertInvalid(option, u'1MM')

    def test_convert_valid(self):
        option = self.get_option()
        self.assertConverted(int(5e3), option, u'5kb')
        self.assertConverted(int(5e6), option, u'5M')
        self.assertConverted(int(5e6), option, u'5MB')
        self.assertConverted(int(5e9), option, u'5g')
        self.assertConverted(int(5e9), option, u'5gB')
        self.assertConverted(100, option, u'100')
2091
class TestListOption(TestOptionConverter):
    """Conversions for config.ListOption."""

    def get_option(self):
        """Build the list option under test."""
        return config.ListOption('foo', help='A list.')

    def test_convert_invalid(self):
        option = self.get_option()
        # A list is not accepted as input: no attempt is made to convert a
        # list into a list.
        self.assertConvertInvalid(option, [1])
        # No string is invalid as all forms can be converted to a list

    def test_convert_valid(self):
        option = self.get_option()
        # An empty string is an empty list
        self.assertConverted([], option, '')  # Using a bare str() just in case
        self.assertConverted([], option, u'')
        # Any other string gives a one-item list
        self.assertConverted([u'True'], option, u'True')
        self.assertConverted([u'42'], option, u'42')
        self.assertConverted([u'bar'], option, u'bar')
2116
class TestRegistryOption(TestOptionConverter):
2118
def get_option(self, registry):
    """Build a config.RegistryOption named 'foo' on top of ``registry``."""
    help_text = 'A registry option.'
    return config.RegistryOption('foo', registry, help=help_text)
2122
def test_convert_invalid(self):
    """Neither a list nor an unregistered key converts with an empty registry."""
    option = self.get_option(_mod_registry.Registry())
    self.assertConvertInvalid(option, [1])
    self.assertConvertInvalid(option, u"notregistered")
2128
def test_convert_valid(self):
    """A registered key converts to the object registered under it."""
    values = _mod_registry.Registry()
    values.register("someval", 1234)
    option = self.get_option(values)
    # Using a bare str() just in case
    self.assertConverted(1234, option, "someval")
    self.assertConverted(1234, option, u'someval')
    # None is passed through unchanged
    self.assertConverted(None, option, None)
2137
def test_help(self):
2138
registry = _mod_registry.Registry()
2139
registry.register("someval", 1234, help="some option")
2140
registry.register("dunno", 1234, help="some other option")
2141
opt = self.get_option(registry)
2143
'A registry option.\n'
2145
'The following values are supported:\n'
2146
' dunno - some other option\n'
2147
' someval - some option\n',
2150
def test_get_help_text(self):
2151
registry = _mod_registry.Registry()
2152
registry.register("someval", 1234, help="some option")
2153
registry.register("dunno", 1234, help="some other option")
2154
opt = self.get_option(registry)
2156
'A registry option.\n'
2158
'The following values are supported:\n'
2159
' dunno - some other option\n'
2160
' someval - some option\n',
2161
opt.get_help_text())
2164
class TestOptionRegistry(tests.TestCase):
2167
super(TestOptionRegistry, self).setUp()
2168
# Always start with an empty registry
2169
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2170
self.registry = config.option_registry
2172
def test_register(self):
    """A registered option is retrieved by its name."""
    option = config.Option('foo')
    self.registry.register(option)
    registered = self.registry.get('foo')
    self.assertIs(option, registered)
2177
def test_registered_help(self):
    """The registry gives back the help of a registered option."""
    option = config.Option('foo', help='A simple option')
    self.registry.register(option)
    help_text = self.registry.get_help('foo')
    self.assertEqual('A simple option', help_text)
2182
def test_dont_register_illegal_name(self):
    """Registering an option with an illegal name raises IllegalOptionName."""
    for illegal_name in (' foo', 'bar,'):
        option = config.Option(illegal_name)
        self.assertRaises(config.IllegalOptionName,
                          self.registry.register, option)
2188
lazy_option = config.Option('lazy_foo', help='Lazy help')
2190
def test_register_lazy(self):
    """A lazily registered option resolves to the class attribute."""
    member_name = 'TestOptionRegistry.lazy_option'
    self.registry.register_lazy('lazy_foo', self.__module__, member_name)
    registered = self.registry.get('lazy_foo')
    self.assertIs(self.lazy_option, registered)
2195
def test_registered_lazy_help(self):
    """The registry gives back the help of a lazily registered option."""
    member_name = 'TestOptionRegistry.lazy_option'
    self.registry.register_lazy('lazy_foo', self.__module__, member_name)
    help_text = self.registry.get_help('lazy_foo')
    self.assertEqual('Lazy help', help_text)
2200
def test_dont_lazy_register_illegal_name(self):
    """Lazy registration under an illegal key raises IllegalOptionName."""
    # See http://pad.lv/1235099 for the root cause: the 'register_lazy' doc
    # string mentions that the key should match the option name, which
    # indirectly requires the option name to be a valid python identifier.
    # The keys used here deliberately do not match the option name so that
    # the option name checking is exercised.
    member_name = 'TestOptionRegistry.lazy_option'
    for illegal_key in (' foo', '1,2'):
        self.assertRaises(config.IllegalOptionName,
                          self.registry.register_lazy, illegal_key,
                          self.__module__, member_name)
2214
class TestRegisteredOptions(tests.TestCase):
2215
"""All registered options should verify some constraints."""
2217
scenarios = [(key, {'option_name': key, 'option': option}) for key, option
2218
in config.option_registry.iteritems()]
2221
super(TestRegisteredOptions, self).setUp()
2222
self.registry = config.option_registry
2224
def test_proper_name(self):
    """The registration key and the option name are the same."""
    # This can't be checked at registration time for the lazy options, so
    # it is checked here for every registered option.
    registered_under = self.option_name
    self.assertEqual(registered_under, self.option.name)
2229
def test_help_is_set(self):
2230
option_help = self.registry.get_help(self.option_name)
2231
# Come on, think about the user, he really wants to know what the
2233
self.assertIsNot(None, option_help)
2234
self.assertNotEqual('', option_help)
2237
class TestSection(tests.TestCase):
2239
# FIXME: Parametrize so that all sections produced by Stores run these
2240
# tests -- vila 2011-04-01
2242
def test_get_a_value(self):
    """A section gives access to the values of its options dict."""
    options = dict(foo='bar')
    section = config.Section('myID', options)
    value = section.get('foo')
    self.assertEqual('bar', value)
2247
def test_get_unknown_option(self):
2249
section = config.Section(None, a_dict)
2250
self.assertEqual('out of thin air',
2251
section.get('foo', 'out of thin air'))
2253
def test_options_is_shared(self):
2255
section = config.Section(None, a_dict)
2256
self.assertIs(a_dict, section.options)
2259
class TestMutableSection(tests.TestCase):
2261
scenarios = [('mutable',
2263
lambda opts: config.MutableSection('myID', opts)},),
2267
a_dict = dict(foo='bar')
2268
section = self.get_section(a_dict)
2269
section.set('foo', 'new_value')
2270
self.assertEqual('new_value', section.get('foo'))
2271
# The change appears in the shared section
2272
self.assertEqual('new_value', a_dict.get('foo'))
2273
# We keep track of the change
2274
self.assertTrue('foo' in section.orig)
2275
self.assertEqual('bar', section.orig.get('foo'))
2277
def test_set_preserve_original_once(self):
    """Setting an option twice still records the initial value."""
    options = dict(foo='bar')
    section = self.get_section(options)
    for new_value in ('first_value', 'second_value'):
        section.set('foo', new_value)
    # We keep track of the original value
    self.assertTrue('foo' in section.orig)
    self.assertEqual('bar', section.orig.get('foo'))
2286
def test_remove(self):
    """Removing an option drops it but records its original value."""
    options = dict(foo='bar')
    section = self.get_section(options)
    section.remove('foo')
    # We get None for unknown options via the default value
    value = section.get('foo')
    self.assertEqual(None, value)
    # Or we just get the default value
    value_with_default = section.get('foo', 'unknown')
    self.assertEqual('unknown', value_with_default)
    self.assertFalse('foo' in section.options)
    # We keep track of the deletion
    self.assertTrue('foo' in section.orig)
    self.assertEqual('bar', section.orig.get('foo'))
2299
def test_remove_new_option(self):
2301
section = self.get_section(a_dict)
2302
section.set('foo', 'bar')
2303
section.remove('foo')
2304
self.assertFalse('foo' in section.options)
2305
# The option didn't exist initially so we need to keep track of it
2306
# with a special value
2307
self.assertTrue('foo' in section.orig)
2308
self.assertEqual(config._NewlyCreatedOption, section.orig['foo'])
2311
class TestCommandLineStore(tests.TestCase):
2314
super(TestCommandLineStore, self).setUp()
2315
self.store = config.CommandLineStore()
2316
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2318
def get_section(self):
2319
"""Get the unique section for the command line overrides."""
2320
sections = list(self.store.get_sections())
2321
self.assertLength(1, sections)
2322
store, section = sections[0]
2323
self.assertEqual(self.store, store)
2326
def test_no_override(self):
    """An empty command line gives a section without any option."""
    self.store._from_cmdline([])
    option_names = list(self.get_section().iter_option_names())
    self.assertLength(0, option_names)
2331
def test_simple_override(self):
    """A single 'name=value' override is available in the section."""
    self.store._from_cmdline(['a=b'])
    value = self.get_section().get('a')
    self.assertEqual('b', value)
2336
def test_list_override(self):
    """A list override is stored as a string and converted by the option."""
    list_opt = config.ListOption('l')
    config.option_registry.register(list_opt)
    self.store._from_cmdline(['l=1,2,3'])
    raw_value = self.get_section().get('l')
    self.assertEqual('1,2,3', raw_value)
    # Reminder: lists should be registered as such explicitly, otherwise
    # the conversion needs to be done afterwards.
    converted = list_opt.convert_from_unicode(self.store, raw_value)
    self.assertEqual(['1', '2', '3'], converted)
2347
def test_multiple_overrides(self):
    """Several overrides all end up in the same section."""
    overrides = ['a=b', 'x=y']
    self.store._from_cmdline(overrides)
    section = self.get_section()
    value_a = section.get('a')
    self.assertEqual('b', value_a)
    value_x = section.get('x')
    self.assertEqual('y', value_x)
2353
def test_wrong_syntax(self):
    """An override without '=value' raises BzrCommandError."""
    overrides = ['a=b', 'c']
    self.assertRaises(
        errors.BzrCommandError, self.store._from_cmdline, overrides)
2358
class TestStoreMinimalAPI(tests.TestCaseWithTransport):
2360
scenarios = [(key, {'get_store': builder}) for key, builder
2361
in config.test_store_builder_registry.iteritems()] + [
2362
('cmdline', {'get_store': lambda test: config.CommandLineStore()})]
2365
store = self.get_store(self)
2366
if isinstance(store, config.TransportIniFileStore):
2367
raise tests.TestNotApplicable(
2368
"%s is not a concrete Store implementation"
2369
" so it doesn't need an id" % (store.__class__.__name__,))
2370
self.assertIsNot(None, store.id)
2373
class TestStore(tests.TestCaseWithTransport):
2375
def assertSectionContent(self, expected, store_and_section):
2376
"""Assert that some options have the proper values in a section."""
2377
_, section = store_and_section
2378
expected_name, expected_options = expected
2379
self.assertEqual(expected_name, section.id)
2382
dict([(k, section.get(k)) for k in expected_options.keys()]))
2385
class TestReadonlyStore(TestStore):
    """Read-only store checks, run once per registered test store builder."""

    scenarios = [(key, {'get_store': builder}) for key, builder
                 in config.test_store_builder_registry.iteritems()]

    def test_building_delays_load(self):
        """A store is not loaded until content is given to it."""
        store = self.get_store(self)
        loaded_before = store.is_loaded()
        self.assertEqual(False, loaded_before)
        store._load_from_string(b'')
        loaded_after = store.is_loaded()
        self.assertEqual(True, loaded_after)

    def test_get_no_sections_for_empty(self):
        """Empty content gives no section at all."""
        store = self.get_store(self)
        store._load_from_string(b'')
        found = list(store.get_sections())
        self.assertEqual([], found)

    def test_get_default_section(self):
        """Options outside any section header are in the section with id None."""
        store = self.get_store(self)
        store._load_from_string(b'foo=bar')
        found = list(store.get_sections())
        self.assertLength(1, found)
        self.assertSectionContent((None, {'foo': 'bar'}), found[0])

    def test_get_named_section(self):
        """Options below a '[baz]' header are in the section with id 'baz'."""
        store = self.get_store(self)
        store._load_from_string(b'[baz]\nfoo=bar')
        found = list(store.get_sections())
        self.assertLength(1, found)
        self.assertSectionContent(('baz', {'foo': 'bar'}), found[0])

    def test_load_from_string_fails_for_non_empty_store(self):
        """Loading from a string twice raises AssertionError."""
        store = self.get_store(self)
        store._load_from_string(b'foo=bar')
        load_again = store._load_from_string
        self.assertRaises(AssertionError, load_again, b'bar=baz')
2421
class TestStoreQuoting(TestStore):
2423
scenarios = [(key, {'get_store': builder}) for key, builder
2424
in config.test_store_builder_registry.iteritems()]
2427
super(TestStoreQuoting, self).setUp()
2428
self.store = self.get_store(self)
2429
# We need a loaded store but any content will do
2430
self.store._load_from_string(b'')
2432
def assertIdempotent(self, s):
2433
"""Assert that quoting an unquoted string is a no-op and vice-versa.
2435
What matters here is that option values, as they appear in a store, can
2436
be safely round-tripped out of the store and back.
2438
:param s: A string, quoted if required.
2440
self.assertEqual(s, self.store.quote(self.store.unquote(s)))
2441
self.assertEqual(s, self.store.unquote(self.store.quote(s)))
2443
def test_empty_string(self):
2444
if isinstance(self.store, config.IniFileStore):
2445
# configobj._quote doesn't handle empty values
2446
self.assertRaises(AssertionError,
2447
self.assertIdempotent, '')
2449
self.assertIdempotent('')
2450
# But quoted empty strings are ok
2451
self.assertIdempotent('""')
2453
def test_embedded_spaces(self):
    """A quoted string with embedded spaces goes through assertIdempotent."""
    quoted = '" a b c "'
    self.assertIdempotent(quoted)
2456
def test_embedded_commas(self):
    """A quoted string with an embedded comma goes through assertIdempotent."""
    quoted = '" a , b c "'
    self.assertIdempotent(quoted)
2459
def test_simple_comma(self):
2460
if isinstance(self.store, config.IniFileStore):
2461
# configobj requires that lists are special-cased
2462
self.assertRaises(AssertionError,
2463
self.assertIdempotent, ',')
2465
self.assertIdempotent(',')
2466
# When a single comma is required, quoting is also required
2467
self.assertIdempotent('","')
2469
def test_list(self):
2470
if isinstance(self.store, config.IniFileStore):
2471
# configobj requires that lists are special-cased
2472
self.assertRaises(AssertionError,
2473
self.assertIdempotent, 'a,b')
2475
self.assertIdempotent('a,b')
2478
class TestDictFromStore(tests.TestCase):
    """Unquoting a value that turns out to be a dict, not a string."""

    def test_unquote_not_string(self):
        stack = config.MemoryStack(b'x=2\n[a_section]\na=1\n')
        section_value = stack.get('a_section')
        # Urgh, despite the stack asking for the no-name section, we get the
        # content of another section as a dict o_O
        self.assertEqual({'a': '1'}, section_value)
        unquoted = stack.store.unquote(section_value)
        # Such a value cannot be unquoted but this shouldn't crash either:
        # the use cases are getting the value or displaying it, in the
        # latter case via '%s'.
        self.assertEqual({'a': '1'}, unquoted)
        displayed = '%s' % (unquoted,)
        self.assertIn(displayed, ("{u'a': u'1'}", "{'a': '1'}"))
2494
class TestIniFileStoreContent(tests.TestCaseWithTransport):
2495
"""Simulate loading a config store with content of various encodings.
2497
All files produced by bzr are in utf8 content.
2499
Users may modify them manually and end up with a file that can't be
2500
loaded. We need to issue proper error messages in this case.
2503
invalid_utf8_char = b'\xff'
2505
def test_load_utf8(self):
2506
"""Ensure we can load an utf8-encoded file."""
2507
t = self.get_transport()
2508
# From http://pad.lv/799212
2509
unicode_user = u'b\N{Euro Sign}ar'
2510
unicode_content = u'user=%s' % (unicode_user,)
2511
utf8_content = unicode_content.encode('utf8')
2512
# Store the raw content in the config file
2513
t.put_bytes('foo.conf', utf8_content)
2514
store = config.TransportIniFileStore(t, 'foo.conf')
2516
stack = config.Stack([store.get_sections], store)
2517
self.assertEqual(unicode_user, stack.get('user'))
2519
def test_load_non_ascii(self):
    """Ensure we display a proper error on non-ascii, non utf-8 content."""
    transport = self.get_transport()
    # A comment line carrying a byte that is not valid utf-8
    bad_content = b'user=foo\n#%s\n' % (self.invalid_utf8_char,)
    transport.put_bytes('foo.conf', bad_content)
    store = config.TransportIniFileStore(transport, 'foo.conf')
    self.assertRaises(config.ConfigContentError, store.load)
2526
def test_load_erroneous_content(self):
    """Ensure we display a proper error on content that can't be parsed."""
    transport = self.get_transport()
    # A section header that is never closed
    transport.put_bytes('foo.conf', b'[open_section\n')
    store = config.TransportIniFileStore(transport, 'foo.conf')
    self.assertRaises(config.ParseConfigError, store.load)
2533
def test_load_permission_denied(self):
2534
"""Ensure we get warned when trying to load an inaccessible file."""
2538
warnings.append(args[0] % args[1:])
2539
self.overrideAttr(trace, 'warning', warning)
2541
t = self.get_transport()
2543
def get_bytes(relpath):
2544
raise errors.PermissionDenied(relpath, "")
2545
t.get_bytes = get_bytes
2546
store = config.TransportIniFileStore(t, 'foo.conf')
2547
self.assertRaises(errors.PermissionDenied, store.load)
2550
[u'Permission denied while trying to load configuration store %s.'
2551
% store.external_url()])
2554
class TestIniConfigContent(tests.TestCaseWithTransport):
2555
"""Simulate loading a IniBasedConfig with content of various encodings.
2557
All files produced by bzr are in utf8 content.
2559
Users may modify them manually and end up with a file that can't be
2560
loaded. We need to issue proper error messages in this case.
2563
invalid_utf8_char = b'\xff'
2565
def test_load_utf8(self):
    """Ensure we can load an utf8-encoded file."""
    # From http://pad.lv/799212
    unicode_user = u'b\N{Euro Sign}ar'
    raw_content = (u'user=%s' % (unicode_user,)).encode('utf8')
    # Store the raw content in the config file
    with open('foo.conf', 'wb') as conf_file:
        conf_file.write(raw_content)
    conf = config.IniBasedConfig(file_name='foo.conf')
    loaded_user = conf.get_user_option('user')
    self.assertEqual(unicode_user, loaded_user)
2577
def test_load_badly_encoded_content(self):
    """Ensure we display a proper error on non-ascii, non utf-8 content."""
    # A comment line carrying a byte that is not valid utf-8
    bad_content = b'user=foo\n#%s\n' % (self.invalid_utf8_char,)
    with open('foo.conf', 'wb') as conf_file:
        conf_file.write(bad_content)
    conf = config.IniBasedConfig(file_name='foo.conf')
    self.assertRaises(config.ConfigContentError, conf._get_parser)
2584
def test_load_erroneous_content(self):
    """Ensure we display a proper error on content that can't be parsed."""
    # A section header that is never closed
    with open('foo.conf', 'wb') as conf_file:
        conf_file.write(b'[open_section\n')
    conf = config.IniBasedConfig(file_name='foo.conf')
    self.assertRaises(config.ParseConfigError, conf._get_parser)
2592
class TestMutableStore(TestStore):
2594
scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
2595
in config.test_store_builder_registry.iteritems()]
2598
super(TestMutableStore, self).setUp()
2599
self.transport = self.get_transport()
2601
def has_store(self, store):
    """Tell whether the test transport has a file for ``store``.

    The store url is made relative to the transport url before asking.
    """
    base_url = self.transport.external_url()
    store_relpath = urlutils.relative_url(base_url, store.external_url())
    return self.transport.has(store_relpath)
2606
def test_save_empty_creates_no_file(self):
2607
# FIXME: There should be a better way than relying on the test
2608
# parametrization to identify branch.conf -- vila 2011-0526
2609
if self.store_id in ('branch', 'remote_branch'):
2610
raise tests.TestNotApplicable(
2611
'branch.conf is *always* created when a branch is initialized')
2612
store = self.get_store(self)
2614
self.assertEqual(False, self.has_store(store))
2616
def test_mutable_section_shared(self):
    """Asking twice for a mutable section gives the same object."""
    store = self.get_store(self)
    store._load_from_string(b'foo=bar\n')
    # FIXME: There should be a better way than relying on the test
    # parametrization to identify branch.conf -- vila 2011-0526
    needs_write_lock = self.store_id in ('branch', 'remote_branch')
    if needs_write_lock:
        # branch stores require write-locked branches
        self.addCleanup(store.branch.lock_write().unlock)
    first = store.get_mutable_section(None)
    second = store.get_mutable_section(None)
    # Different callers must get the very same section object
    self.assertIs(first, second)
2630
def test_save_emptied_succeeds(self):
2631
store = self.get_store(self)
2632
store._load_from_string(b'foo=bar\n')
2633
# FIXME: There should be a better way than relying on the test
2634
# parametrization to identify branch.conf -- vila 2011-0526
2635
if self.store_id in ('branch', 'remote_branch'):
2636
# branch stores require write-locked branches
2637
self.addCleanup(store.branch.lock_write().unlock)
2638
section = store.get_mutable_section(None)
2639
section.remove('foo')
2641
self.assertEqual(True, self.has_store(store))
2642
modified_store = self.get_store(self)
2643
sections = list(modified_store.get_sections())
2644
self.assertLength(0, sections)
2646
def test_save_with_content_succeeds(self):
2647
# FIXME: There should be a better way than relying on the test
2648
# parametrization to identify branch.conf -- vila 2011-0526
2649
if self.store_id in ('branch', 'remote_branch'):
2650
raise tests.TestNotApplicable(
2651
'branch.conf is *always* created when a branch is initialized')
2652
store = self.get_store(self)
2653
store._load_from_string(b'foo=bar\n')
2654
self.assertEqual(False, self.has_store(store))
2656
self.assertEqual(True, self.has_store(store))
2657
modified_store = self.get_store(self)
2658
sections = list(modified_store.get_sections())
2659
self.assertLength(1, sections)
2660
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2662
def test_set_option_in_empty_store(self):
2663
store = self.get_store(self)
2664
# FIXME: There should be a better way than relying on the test
2665
# parametrization to identify branch.conf -- vila 2011-0526
2666
if self.store_id in ('branch', 'remote_branch'):
2667
# branch stores require write-locked branches
2668
self.addCleanup(store.branch.lock_write().unlock)
2669
section = store.get_mutable_section(None)
2670
section.set('foo', 'bar')
2672
modified_store = self.get_store(self)
2673
sections = list(modified_store.get_sections())
2674
self.assertLength(1, sections)
2675
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2677
def test_set_option_in_default_section(self):
    # Same as the empty store case, but the store is explicitly loaded
    # with empty content first.
    store = self.get_store(self)
    store._load_from_string(b'')
    # FIXME: There should be a better way than relying on the test
    # parametrization to identify branch.conf -- vila 2011-0526
    if self.store_id in ('branch', 'remote_branch'):
        # branch stores requires write locked branches
        self.addCleanup(store.branch.lock_write().unlock)
    section = store.get_mutable_section(None)
    section.set('foo', 'bar')
    # The second store only sees what has been written out
    store.save()
    modified_store = self.get_store(self)
    sections = list(modified_store.get_sections())
    self.assertLength(1, sections)
    self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2693
def test_set_option_in_named_section(self):
    # An option set in the 'baz' section must come back under that
    # section name when read through another store object.
    store = self.get_store(self)
    store._load_from_string(b'')
    # FIXME: There should be a better way than relying on the test
    # parametrization to identify branch.conf -- vila 2011-0526
    if self.store_id in ('branch', 'remote_branch'):
        # branch stores requires write locked branches
        self.addCleanup(store.branch.lock_write().unlock)
    section = store.get_mutable_section('baz')
    section.set('foo', 'bar')
    # The second store only sees what has been written out
    store.save()
    modified_store = self.get_store(self)
    sections = list(modified_store.get_sections())
    self.assertLength(1, sections)
    self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2709
def test_load_hook(self):
    # The 'load' hook must fire exactly once, with the store as its only
    # argument, when the store is loaded.
    # First, we need to ensure that the store exists
    store = self.get_store(self)
    # FIXME: There should be a better way than relying on the test
    # parametrization to identify branch.conf -- vila 2011-0526
    if self.store_id in ('branch', 'remote_branch'):
        # branch stores requires write locked branches
        self.addCleanup(store.branch.lock_write().unlock)
    section = store.get_mutable_section('baz')
    section.set('foo', 'bar')
    store.save()
    # Now we can try to load it
    store = self.get_store(self)
    # Each hook invocation is recorded as its tuple of positional args
    calls = []

    def hook(*args):
        calls.append(args)

    config.ConfigHooks.install_named_hook('load', hook, None)
    self.assertLength(0, calls)
    store.load()
    self.assertLength(1, calls)
    self.assertEqual((store,), calls[0])
2732
def test_save_hook(self):
    # The 'save' hook must fire exactly once, with the store as its only
    # argument, when the store is saved.
    # Each hook invocation is recorded as its tuple of positional args
    calls = []

    def hook(*args):
        calls.append(args)

    config.ConfigHooks.install_named_hook('save', hook, None)
    self.assertLength(0, calls)
    store = self.get_store(self)
    # FIXME: There should be a better way than relying on the test
    # parametrization to identify branch.conf -- vila 2011-0526
    if self.store_id in ('branch', 'remote_branch'):
        # branch stores requires write locked branches
        self.addCleanup(store.branch.lock_write().unlock)
    section = store.get_mutable_section('baz')
    section.set('foo', 'bar')
    store.save()
    self.assertLength(1, calls)
    self.assertEqual((store,), calls[0])
2751
def test_set_mark_dirty(self):
    # Setting an option on a clean in-memory stack flags one section as
    # dirty and makes the store report that it needs saving.
    memory_stack = config.MemoryStack(b'')
    backing_store = memory_stack.store
    self.assertLength(0, backing_store.dirty_sections)
    memory_stack.set('foo', 'baz')
    self.assertLength(1, backing_store.dirty_sections)
    needs_saving = backing_store._need_saving()
    self.assertTrue(needs_saving)
2758
def test_remove_mark_dirty(self):
    # Removing an existing option flags its section as dirty and makes the
    # store report that it needs saving.
    stack = config.MemoryStack(b'foo=bar')
    self.assertLength(0, stack.store.dirty_sections)
    # Something has to change between the two length assertions
    stack.remove('foo')
    self.assertLength(1, stack.store.dirty_sections)
    self.assertTrue(stack.store._need_saving())
2766
class TestStoreSaveChanges(tests.TestCaseWithTransport):
    """Tests that config changes are kept in memory and saved on-demand."""

    def setUp(self):
        super(TestStoreSaveChanges, self).setUp()
        self.transport = self.get_transport()
        # Most of the tests involve two stores pointing to the same persistent
        # storage to observe the effects of concurrent changes
        self.st1 = config.TransportIniFileStore(self.transport, 'foo.conf')
        self.st2 = config.TransportIniFileStore(self.transport, 'foo.conf')
        # Formatted warning messages are collected here instead of emitted
        self.warnings = []

        def warning(*args):
            self.warnings.append(args[0] % args[1:])
        self.overrideAttr(trace, 'warning', warning)

    def has_store(self, store):
        """Tell whether ``store`` exists on the test transport."""
        store_basename = urlutils.relative_url(self.transport.external_url(),
                                               store.external_url())
        return self.transport.has(store_basename)

    def get_stack(self, store):
        """Build a minimal stack reading from and writing to ``store``."""
        # Any stack will do as long as it uses the right store, just a single
        # no-name section is enough
        return config.Stack([store.get_sections], store)

    def test_no_changes_no_save(self):
        s = self.get_stack(self.st1)
        s.store.save_changes()
        self.assertEqual(False, self.has_store(self.st1))

    def test_unrelated_concurrent_update(self):
        s1 = self.get_stack(self.st1)
        s2 = self.get_stack(self.st2)
        s1.set('foo', 'bar')
        s2.set('baz', 'quux')
        s1.store.save_changes()
        # Changes don't propagate magically
        self.assertEqual(None, s1.get('baz'))
        s2.store.save_changes()
        self.assertEqual('quux', s2.get('baz'))
        # Changes are acquired when saving
        self.assertEqual('bar', s2.get('foo'))
        # Since there is no overlap, no warnings are emitted
        self.assertLength(0, self.warnings)

    def test_concurrent_update_modified(self):
        s1 = self.get_stack(self.st1)
        s2 = self.get_stack(self.st2)
        s1.set('foo', 'bar')
        s2.set('foo', 'baz')
        s1.store.save_changes()
        # Last speaker wins
        s2.store.save_changes()
        self.assertEqual('baz', s2.get('foo'))
        # But the user get a warning
        self.assertLength(1, self.warnings)
        warning = self.warnings[0]
        self.assertStartsWith(warning, 'Option foo in section None')
        self.assertEndsWith(warning, 'was changed from <CREATED> to bar.'
                            ' The baz value will be saved.')

    def test_concurrent_deletion(self):
        self.st1._load_from_string(b'foo=bar')
        # Make the initial content visible to the second store
        self.st1.save()
        s1 = self.get_stack(self.st1)
        s2 = self.get_stack(self.st2)
        s1.remove('foo')
        s2.remove('foo')
        s1.store.save_changes()
        # The first deletion goes through silently
        self.assertLength(0, self.warnings)
        s2.store.save_changes()
        # The second one conflicts with it and is reported
        self.assertLength(1, self.warnings)
        warning = self.warnings[0]
        self.assertStartsWith(warning, 'Option foo in section None')
        self.assertEndsWith(warning, 'was changed from bar to <CREATED>.'
                            ' The <DELETED> value will be saved.')
2847
class TestQuotingIniFileStore(tests.TestCaseWithTransport):
    """Check that values with surrounding spaces are quoted in ini files."""

    def get_store(self):
        return config.TransportIniFileStore(self.get_transport(), 'foo.conf')

    def test_get_quoted_string(self):
        # Quotes are stripped on read, the embedded spaces are kept
        store = self.get_store()
        store._load_from_string(b'foo= " abc "')
        stack = config.Stack([store.get_sections])
        self.assertEqual(' abc ', stack.get('foo'))

    def test_set_quoted_string(self):
        store = self.get_store()
        stack = config.Stack([store.get_sections], store)
        stack.set('foo', ' a b c ')
        # The file content can only be checked once it has been written
        store.save()
        self.assertFileEqual(b'foo = " a b c "' +
                             os.linesep.encode('ascii'), 'foo.conf')
2867
class TestTransportIniFileStore(TestStore):
    """Loading and section listing for TransportIniFileStore."""

    def test_loading_unknown_file_fails(self):
        store = config.TransportIniFileStore(self.get_transport(),
                                             'I-do-not-exist')
        self.assertRaises(errors.NoSuchFile, store.load)

    def test_invalid_content(self):
        store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
        self.assertEqual(False, store.is_loaded())
        exc = self.assertRaises(
            config.ParseConfigError, store._load_from_string,
            b'this is invalid !')
        self.assertEndsWith(exc.filename, 'foo.conf')
        # And the load failed
        self.assertEqual(False, store.is_loaded())

    def test_get_embedded_sections(self):
        # A more complicated example (which also shows that section names and
        # option names share the same name space...)
        # FIXME: This should be fixed by forbidding dicts as values ?
        # -- vila 2011-04-05
        store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
        store._load_from_string(b'''
foo=bar
l=1,2
[DEFAULT]
foo_in_DEFAULT=foo_DEFAULT
[bar]
foo_in_bar=barbar
[baz]
foo_in_baz=barbaz
[[qux]]
foo_in_qux=quux
''')
        sections = list(store.get_sections())
        self.assertLength(4, sections)
        # The default section has no name.
        # List values are provided as strings and need to be explicitly
        # converted by specifying from_unicode=list_from_store at option
        # registration
        self.assertSectionContent((None, {'foo': 'bar', 'l': u'1,2'}),
                                  sections[0])
        self.assertSectionContent(
            ('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
        self.assertSectionContent(
            ('bar', {'foo_in_bar': 'barbar'}), sections[2])
        # sub sections are provided as embedded dicts.
        self.assertSectionContent(
            ('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
            sections[3])
2920
class TestLockableIniFileStore(TestStore):
    """Checks specific to LockableIniFileStore."""

    def test_create_store_in_created_dir(self):
        # Saving a store whose directory does not exist yet must create
        # the missing directories.
        self.assertPathDoesNotExist('dir')
        t = self.get_transport('dir/subdir')
        store = config.LockableIniFileStore(t, 'foo.conf')
        store.get_mutable_section(None).set('foo', 'bar')
        # Nothing reaches the transport before the store is saved
        store.save()
        self.assertPathExists('dir/subdir')
2931
class TestConcurrentStoreUpdates(TestStore):
    """Test that Stores properly handle concurrent updates.

    New Store implementation may fail some of these tests but until such
    implementations exist it's hard to properly filter them from the scenarios
    applied here. If you encounter such a case, contact the bzr devs.
    """

    scenarios = [(key, {'get_stack': builder}) for key, builder
                 in config.test_stack_builder_registry.iteritems()]

    def setUp(self):
        super(TestConcurrentStoreUpdates, self).setUp()
        self.stack = self.get_stack(self)
        if not isinstance(self.stack, config._CompatibleStack):
            raise tests.TestNotApplicable(
                '%s is not meant to be compatible with the old config design'
                % (self.stack,))
        self.stack.set('one', '1')
        self.stack.set('two', '2')
        # Flush the store
        self.stack.store.save()

    def test_simple_read_access(self):
        self.assertEqual('1', self.stack.get('one'))

    def test_simple_write_access(self):
        self.stack.set('one', 'one')
        self.assertEqual('one', self.stack.get('one'))

    def test_listen_to_the_last_speaker(self):
        c1 = self.stack
        c2 = self.get_stack(self)
        c1.set('one', 'ONE')
        c2.set('two', 'TWO')
        self.assertEqual('ONE', c1.get('one'))
        self.assertEqual('TWO', c2.get('two'))
        # The second update respect the first one
        self.assertEqual('ONE', c2.get('one'))

    def test_last_speaker_wins(self):
        # If the same config is not shared, the same variable modified twice
        # can only see a single result.
        c1 = self.stack
        c2 = self.get_stack(self)
        c1.set('one', 'c1')
        c2.set('one', 'c2')
        self.assertEqual('c2', c2.get('one'))
        # The first modification is still available until another refresh
        # occur
        self.assertEqual('c1', c1.get('one'))
        c1.set('two', 'done')
        self.assertEqual('c2', c1.get('one'))

    def test_writes_are_serialized(self):
        c1 = self.stack
        c2 = self.get_stack(self)

        # We spawn a thread that will pause *during* the config saving.
        before_writing = threading.Event()
        after_writing = threading.Event()
        writing_done = threading.Event()
        c1_save_without_locking_orig = c1.store.save_without_locking

        def c1_save_without_locking():
            before_writing.set()
            c1_save_without_locking_orig()
            # The lock is held. We wait for the main thread to decide when to
            # continue
            after_writing.wait()
        c1.store.save_without_locking = c1_save_without_locking

        def c1_set():
            c1.set('one', 'c1')
            writing_done.set()
        t1 = threading.Thread(target=c1_set)
        # Collect the thread after the test
        self.addCleanup(t1.join)
        # Be ready to unblock the thread if the test goes wrong
        self.addCleanup(after_writing.set)
        t1.start()
        before_writing.wait()
        self.assertRaises(errors.LockContention,
                          c2.set, 'one', 'c2')
        self.assertEqual('c1', c1.get('one'))
        # Let the lock be released
        after_writing.set()
        writing_done.wait()
        c2.set('one', 'c2')
        self.assertEqual('c2', c2.get('one'))

    def test_read_while_writing(self):
        c1 = self.stack
        # We spawn a thread that will pause *during* the write
        ready_to_write = threading.Event()
        do_writing = threading.Event()
        writing_done = threading.Event()
        # We override the _save implementation so we know the store is locked
        c1_save_without_locking_orig = c1.store.save_without_locking

        def c1_save_without_locking():
            ready_to_write.set()
            # The lock is held. We wait for the main thread to decide when to
            # continue
            do_writing.wait()
            c1_save_without_locking_orig()
            writing_done.set()
        c1.store.save_without_locking = c1_save_without_locking

        def c1_set():
            c1.set('one', 'c1')
        t1 = threading.Thread(target=c1_set)
        # Collect the thread after the test
        self.addCleanup(t1.join)
        # Be ready to unblock the thread if the test goes wrong
        self.addCleanup(do_writing.set)
        t1.start()
        # Ensure the thread is ready to write
        ready_to_write.wait()
        self.assertEqual('c1', c1.get('one'))
        # If we read during the write, we get the old value
        c2 = self.get_stack(self)
        self.assertEqual('1', c2.get('one'))
        # Let the writing occur and ensure it occurred
        do_writing.set()
        writing_done.wait()
        # Now we get the updated value
        c3 = self.get_stack(self)
        self.assertEqual('c1', c3.get('one'))

    # FIXME: It may be worth looking into removing the lock dir when it's not
    # needed anymore and look at possible fallouts for concurrent lockers. This
    # will matter if/when we use config files outside of breezy directories
    # (.config/breezy or .bzr) -- vila 20110-04-111
3067
class TestSectionMatcher(TestStore):
    """Behaviour shared by the section matcher implementations."""

    scenarios = [('location', {'matcher': config.LocationMatcher}),
                 ('id', {'matcher': config.NameMatcher}), ]

    def setUp(self):
        super(TestSectionMatcher, self).setUp()
        # Any simple store is good enough
        self.get_store = config.test_store_builder_registry.get('configobj')

    def test_no_matches_for_empty_stores(self):
        store = self.get_store(self)
        store._load_from_string(b'')
        matcher = self.matcher(store, '/bar')
        self.assertEqual([], list(matcher.get_sections()))

    def test_build_doesnt_load_store(self):
        # Building a matcher must not trigger a load of the store
        store = self.get_store(self)
        self.matcher(store, '/bar')
        self.assertFalse(store.is_loaded())
3089
class TestLocationSection(tests.TestCase):
    """Check how LocationSection applies per-option policies."""

    def get_section(self, options, extra_path):
        """Wrap ``options`` in a 'foo' section located at ``extra_path``."""
        section = config.Section('foo', options)
        return config.LocationSection(section, extra_path)

    def test_simple_option(self):
        section = self.get_section({'foo': 'bar'}, '')
        self.assertEqual('bar', section.get('foo'))

    def test_option_with_extra_path(self):
        # The extra path is appended to the value under 'appendpath'
        section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
                                   'baz')
        self.assertEqual('bar/baz', section.get('foo'))

    def test_invalid_policy(self):
        section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
                                   'baz')
        # invalid policies are ignored
        self.assertEqual('bar', section.get('foo'))
3111
class TestLocationMatcher(TestStore):
    """Check section selection and ordering done by LocationMatcher."""

    def setUp(self):
        super(TestLocationMatcher, self).setUp()
        # Any simple store is good enough
        self.get_store = config.test_store_builder_registry.get('configobj')

    def test_unrelated_section_excluded(self):
        store = self.get_store(self)
        store._load_from_string(b'''
[/foo]
section=/foo
[/foo/baz]
section=/foo/baz
[/foo/bar]
section=/foo/bar
[/foo/bar/baz]
section=/foo/bar/baz
[/quux/quux]
section=/quux/quux
''')
        self.assertEqual(['/foo', '/foo/baz', '/foo/bar', '/foo/bar/baz',
                          '/quux/quux'],
                         [section.id for _, section in store.get_sections()])
        matcher = config.LocationMatcher(store, '/foo/bar/quux')
        sections = [section for _, section in matcher.get_sections()]
        self.assertEqual(['/foo/bar', '/foo'],
                         [section.id for section in sections])
        self.assertEqual(['quux', 'bar/quux'],
                         [section.extra_path for section in sections])

    def test_more_specific_sections_first(self):
        store = self.get_store(self)
        store._load_from_string(b'''
[/foo]
section=/foo
[/foo/bar]
section=/foo/bar
''')
        self.assertEqual(['/foo', '/foo/bar'],
                         [section.id for _, section in store.get_sections()])
        matcher = config.LocationMatcher(store, '/foo/bar/baz')
        sections = [section for _, section in matcher.get_sections()]
        self.assertEqual(['/foo/bar', '/foo'],
                         [section.id for section in sections])
        self.assertEqual(['baz', 'bar/baz'],
                         [section.extra_path for section in sections])

    def test_appendpath_in_no_name_section(self):
        # It's a bit weird to allow appendpath in a no-name section, but
        # someone may found a use for it
        store = self.get_store(self)
        store._load_from_string(b'''
foo=bar
foo:policy = appendpath
''')
        matcher = config.LocationMatcher(store, 'dir/subdir')
        sections = list(matcher.get_sections())
        self.assertLength(1, sections)
        self.assertEqual('bar/dir/subdir', sections[0][1].get('foo'))

    def test_file_urls_are_normalized(self):
        store = self.get_store(self)
        if sys.platform == 'win32':
            expected_url = 'file:///C:/dir/subdir'
            expected_location = 'C:/dir/subdir'
        else:
            expected_url = 'file:///dir/subdir'
            expected_location = '/dir/subdir'
        matcher = config.LocationMatcher(store, expected_url)
        self.assertEqual(expected_location, matcher.location)

    def test_branch_name_colo(self):
        # The branch name comes from the ',branch=' url segment
        store = self.get_store(self)
        store._load_from_string(dedent("""\
            [/]
            push_location=my{branchname}
        """).encode('ascii'))
        matcher = config.LocationMatcher(store, 'file:///,branch=example%3c')
        self.assertEqual('example<', matcher.branch_name)
        ((_, section),) = matcher.get_sections()
        self.assertEqual('example<', section.locals['branchname'])

    def test_branch_name_basename(self):
        # Without a ',branch=' segment the last path element is used
        store = self.get_store(self)
        store._load_from_string(dedent("""\
            [/]
            push_location=my{branchname}
        """).encode('ascii'))
        matcher = config.LocationMatcher(store, 'file:///parent/example%3c')
        self.assertEqual('example<', matcher.branch_name)
        ((_, section),) = matcher.get_sections()
        self.assertEqual('example<', section.locals['branchname'])
3206
class TestStartingPathMatcher(TestStore):
    """Check section selection and ordering done by StartingPathMatcher."""

    def setUp(self):
        super(TestStartingPathMatcher, self).setUp()
        # Any simple store is good enough
        self.store = config.IniFileStore()

    def assertSectionIDs(self, expected, location, content):
        """Load ``content`` and check the ids matched for ``location``.

        :param expected: The section ids expected, in matching order.
        :param location: The location given to the matcher.
        :param content: The bytes loaded into the store.
        :return: The list of (store, section) tuples produced by the matcher,
            so callers can make further assertions on them.
        """
        self.store._load_from_string(content)
        matcher = config.StartingPathMatcher(self.store, location)
        sections = list(matcher.get_sections())
        self.assertLength(len(expected), sections)
        self.assertEqual(expected, [section.id for _, section in sections])
        return sections

    def test_empty(self):
        self.assertSectionIDs([], self.get_url(), b'')

    def test_url_vs_local_paths(self):
        # The matcher location is an url and the section names are local paths
        self.assertSectionIDs(['/foo/bar', '/foo'],
                              'file:///foo/bar/baz', b'''\
[/foo]
[/foo/bar]
''')

    def test_local_path_vs_url(self):
        # The matcher location is a local path and the section names are urls
        self.assertSectionIDs(['file:///foo/bar', 'file:///foo'],
                              '/foo/bar/baz', b'''\
[file:///foo]
[file:///foo/bar]
''')

    def test_no_name_section_included_when_present(self):
        # Note that other tests will cover the case where the no-name section
        # is empty and as such, not included.
        sections = self.assertSectionIDs(['/foo/bar', '/foo', None],
                                         '/foo/bar/baz', b'''\
option = defined so the no-name section exists
[/foo]
[/foo/bar]
''')
        self.assertEqual(['baz', 'bar/baz', '/foo/bar/baz'],
                         [s.locals['relpath'] for _, s in sections])

    def test_order_reversed(self):
        self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
[/foo]
[/foo/bar]
''')

    def test_unrelated_section_excluded(self):
        self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
[/foo]
[/foo/qux]
[/foo/bar]
''')

    def test_glob_included(self):
        sections = self.assertSectionIDs(['/foo/*/baz', '/foo/b*', '/foo'],
                                         '/foo/bar/baz', b'''\
[/foo]
[/foo/b*]
[/foo/*/baz]
''')
        # Note that 'baz' as a relpath for /foo/b* is not fully correct, but
        # nothing really is... as far using {relpath} to append it to something
        # else, this seems good enough though.
        self.assertEqual(['', 'baz', 'bar/baz'],
                         [s.locals['relpath'] for _, s in sections])

    def test_respect_order(self):
        self.assertSectionIDs(['/foo', '/foo/b*', '/foo/*/baz'],
                              '/foo/bar/baz', b'''\
[/foo/*/baz]
[/foo/qux/*]
[/foo/b*]
[/foo]
''')
3289
class TestNameMatcher(TestStore):
    """Check that NameMatcher selects sections by exact name."""

    def setUp(self):
        super(TestNameMatcher, self).setUp()
        self.matcher = config.NameMatcher
        # Any simple store is good enough
        self.get_store = config.test_store_builder_registry.get('configobj')

    def get_matching_sections(self, name):
        """Return the sections the matcher selects for ``name``."""
        store = self.get_store(self)
        store._load_from_string(b'''
[foo]
option=foo
[foo/baz]
option=foo/baz
[bar]
option=bar
''')
        matcher = self.matcher(store, name)
        return list(matcher.get_sections())

    def test_matching(self):
        sections = self.get_matching_sections('foo')
        self.assertLength(1, sections)
        self.assertSectionContent(('foo', {'option': 'foo'}), sections[0])

    def test_not_matching(self):
        sections = self.get_matching_sections('baz')
        self.assertLength(0, sections)
3320
class TestBaseStackGet(tests.TestCase):
    """Check Stack.get() against a private, empty option registry."""

    def setUp(self):
        super(TestBaseStackGet, self).setUp()
        # Isolate the tests from the options registered by the library
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())

    def test_get_first_definition(self):
        store1 = config.IniFileStore()
        store1._load_from_string(b'foo=bar')
        store2 = config.IniFileStore()
        store2._load_from_string(b'foo=baz')
        conf = config.Stack([store1.get_sections, store2.get_sections])
        self.assertEqual('bar', conf.get('foo'))

    def test_get_with_registered_default_value(self):
        config.option_registry.register(config.Option('foo', default='bar'))
        conf_stack = config.Stack([])
        self.assertEqual('bar', conf_stack.get('foo'))

    def test_get_without_registered_default_value(self):
        config.option_registry.register(config.Option('foo'))
        conf_stack = config.Stack([])
        self.assertEqual(None, conf_stack.get('foo'))

    def test_get_without_default_value_for_not_registered(self):
        conf_stack = config.Stack([])
        self.assertEqual(None, conf_stack.get('foo'))

    def test_get_for_empty_section_callable(self):
        conf_stack = config.Stack([lambda: []])
        self.assertEqual(None, conf_stack.get('foo'))

    def test_get_for_broken_callable(self):
        # Trying to use and invalid callable raises an exception on first use
        conf_stack = config.Stack([object])
        self.assertRaises(TypeError, conf_stack.get, 'foo')
3358
class TestStackWithSimpleStore(tests.TestCase):
    """Check environment overrides on a MemoryStack."""

    def setUp(self):
        super(TestStackWithSimpleStore, self).setUp()
        # Isolate the tests from the options registered by the library
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
        self.registry = config.option_registry

    def get_conf(self, content=None):
        return config.MemoryStack(content)

    def test_override_value_from_env(self):
        self.overrideEnv('FOO', None)
        self.registry.register(
            config.Option('foo', default='bar', override_from_env=['FOO']))
        self.overrideEnv('FOO', 'quux')
        # Env variable provides a default taking over the option one
        conf = self.get_conf(b'foo=store')
        self.assertEqual('quux', conf.get('foo'))

    def test_first_override_value_from_env_wins(self):
        self.overrideEnv('NO_VALUE', None)
        self.overrideEnv('FOO', None)
        self.overrideEnv('BAZ', None)
        self.registry.register(
            config.Option('foo', default='bar',
                          override_from_env=['NO_VALUE', 'FOO', 'BAZ']))
        self.overrideEnv('FOO', 'foo')
        self.overrideEnv('BAZ', 'baz')
        # The first env var set wins
        conf = self.get_conf(b'foo=store')
        self.assertEqual('foo', conf.get('foo'))
3391
class TestMemoryStack(tests.TestCase):
    """Basic get/set behaviour of MemoryStack."""

    def test_get(self):
        conf = config.MemoryStack(b'foo=bar')
        self.assertEqual('bar', conf.get('foo'))

    def test_set(self):
        conf = config.MemoryStack(b'foo=bar')
        conf.set('foo', 'baz')
        self.assertEqual('baz', conf.get('foo'))

    def test_no_content(self):
        conf = config.MemoryStack()
        # No content means no loading
        self.assertFalse(conf.store.is_loaded())
        self.assertRaises(NotImplementedError, conf.get, 'foo')
        # But a content can still be provided
        conf.store._load_from_string(b'foo=bar')
        self.assertEqual('bar', conf.get('foo'))
3412
class TestStackIterSections(tests.TestCase):
    """Check what Stack.iter_sections() yields for various setups."""

    def test_empty_stack(self):
        # No section provider at all: nothing to iterate
        stack = config.Stack([])
        found = list(stack.iter_sections())
        self.assertLength(0, found)

    def test_empty_store(self):
        # A store loaded with empty content provides no section
        empty_store = config.IniFileStore()
        empty_store._load_from_string(b'')
        stack = config.Stack([empty_store.get_sections])
        found = list(stack.iter_sections())
        self.assertLength(0, found)

    def test_simple_store(self):
        # Each item is a (store, section) pair
        only_store = config.IniFileStore()
        only_store._load_from_string(b'foo=bar')
        stack = config.Stack([only_store.get_sections])
        found = list(stack.iter_sections())
        self.assertLength(1, found)
        (found_store, found_section) = found[0]
        self.assertIs(only_store, found_store)

    def test_two_stores(self):
        # Sections come out in the order the stores were given to the stack
        first = config.IniFileStore()
        first._load_from_string(b'foo=bar')
        second = config.IniFileStore()
        second._load_from_string(b'bar=qux')
        stack = config.Stack([first.get_sections, second.get_sections])
        found = list(stack.iter_sections())
        self.assertLength(2, found)
        for position, expected_store in enumerate((first, second)):
            self.assertIs(expected_store, found[position][0])
3447
class TestStackWithTransport(tests.TestCaseWithTransport):
    """Base class run once per entry of the test stack builder registry."""

    # Each scenario exposes the registered builder as self.get_stack
    scenarios = [
        (key, {'get_stack': builder})
        for key, builder in config.test_stack_builder_registry.iteritems()
    ]
3453
class TestConcreteStacks(TestStackWithTransport):
    """Smoke tests for the stack builders."""

    def test_build_stack(self):
        # Only checks that the builder can be called without raising, which
        # helps when debugging builders
        build = self.get_stack
        build(self)
3460
class TestStackGet(TestStackWithTransport):
    """Check Stack.get() on every registered stack builder."""

    def setUp(self):
        super(TestStackGet, self).setUp()
        self.conf = self.get_stack(self)

    def test_get_for_empty_stack(self):
        self.assertEqual(None, self.conf.get('foo'))

    def test_get_hook(self):
        # The 'get' hook receives the stack, the option name and the value
        self.conf.set('foo', 'bar')
        # Each hook invocation is recorded as its tuple of positional args
        calls = []

        def hook(*args):
            calls.append(args)

        config.ConfigHooks.install_named_hook('get', hook, None)
        self.assertLength(0, calls)
        value = self.conf.get('foo')
        self.assertEqual('bar', value)
        self.assertLength(1, calls)
        self.assertEqual((self.conf, 'foo', 'bar'), calls[0])
3483
class TestStackGetWithConverter(tests.TestCase):
    """Check how converters and defaults combine in Stack.get()."""

    def setUp(self):
        super(TestStackGetWithConverter, self).setUp()
        # Isolate the tests from the options registered by the library
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
        self.registry = config.option_registry

    def get_conf(self, content=None):
        return config.MemoryStack(content)

    def register_bool_option(self, name, default=None, default_from_env=None):
        """Register option ``name`` converted with bool_from_store."""
        b = config.Option(name, help='A boolean.',
                          default=default, default_from_env=default_from_env,
                          from_unicode=config.bool_from_store)
        self.registry.register(b)

    def test_get_default_bool_None(self):
        self.register_bool_option('foo')
        conf = self.get_conf(b'')
        self.assertEqual(None, conf.get('foo'))

    def test_get_default_bool_True(self):
        self.register_bool_option('foo', u'True')
        conf = self.get_conf(b'')
        self.assertEqual(True, conf.get('foo'))

    def test_get_default_bool_False(self):
        self.register_bool_option('foo', False)
        conf = self.get_conf(b'')
        self.assertEqual(False, conf.get('foo'))

    def test_get_default_bool_False_as_string(self):
        self.register_bool_option('foo', u'False')
        conf = self.get_conf(b'')
        self.assertEqual(False, conf.get('foo'))

    def test_get_default_bool_from_env_converted(self):
        self.register_bool_option('foo', u'True', default_from_env=['FOO'])
        self.overrideEnv('FOO', 'False')
        conf = self.get_conf(b'')
        self.assertEqual(False, conf.get('foo'))

    def test_get_default_bool_when_conversion_fails(self):
        # An invalid stored value falls back to the registered default
        self.register_bool_option('foo', default='True')
        conf = self.get_conf(b'foo=invalid boolean')
        self.assertEqual(True, conf.get('foo'))

    def register_integer_option(self, name,
                                default=None, default_from_env=None):
        """Register option ``name`` converted with int_from_store."""
        i = config.Option(name, help='An integer.',
                          default=default, default_from_env=default_from_env,
                          from_unicode=config.int_from_store)
        self.registry.register(i)

    def test_get_default_integer_None(self):
        self.register_integer_option('foo')
        conf = self.get_conf(b'')
        self.assertEqual(None, conf.get('foo'))

    def test_get_default_integer(self):
        self.register_integer_option('foo', 42)
        conf = self.get_conf(b'')
        self.assertEqual(42, conf.get('foo'))

    def test_get_default_integer_as_string(self):
        self.register_integer_option('foo', u'42')
        conf = self.get_conf(b'')
        self.assertEqual(42, conf.get('foo'))

    def test_get_default_integer_from_env(self):
        self.register_integer_option('foo', default_from_env=['FOO'])
        self.overrideEnv('FOO', '18')
        conf = self.get_conf(b'')
        self.assertEqual(18, conf.get('foo'))

    def test_get_default_integer_when_conversion_fails(self):
        # An invalid stored value falls back to the registered default
        self.register_integer_option('foo', default='12')
        conf = self.get_conf(b'foo=invalid integer')
        self.assertEqual(12, conf.get('foo'))

    def register_list_option(self, name, default=None, default_from_env=None):
        """Register ``name`` as a config.ListOption."""
        list_option = config.ListOption(name, help='A list.', default=default,
                                        default_from_env=default_from_env)
        self.registry.register(list_option)

    def test_get_default_list_None(self):
        self.register_list_option('foo')
        conf = self.get_conf(b'')
        self.assertEqual(None, conf.get('foo'))

    def test_get_default_list_empty(self):
        self.register_list_option('foo', '')
        conf = self.get_conf(b'')
        self.assertEqual([], conf.get('foo'))

    def test_get_default_list_from_env(self):
        self.register_list_option('foo', default_from_env=['FOO'])
        self.overrideEnv('FOO', '')
        conf = self.get_conf(b'')
        self.assertEqual([], conf.get('foo'))

    def test_get_with_list_converter_no_item(self):
        self.register_list_option('foo', None)
        conf = self.get_conf(b'foo=,')
        self.assertEqual([], conf.get('foo'))

    def test_get_with_list_converter_many_items(self):
        self.register_list_option('foo', None)
        conf = self.get_conf(b'foo=m,o,r,e')
        self.assertEqual(['m', 'o', 'r', 'e'], conf.get('foo'))

    def test_get_with_list_converter_embedded_spaces_many_items(self):
        self.register_list_option('foo', None)
        conf = self.get_conf(b'foo=" bar", "baz "')
        self.assertEqual([' bar', 'baz '], conf.get('foo'))

    def test_get_with_list_converter_stripped_spaces_many_items(self):
        self.register_list_option('foo', None)
        conf = self.get_conf(b'foo= bar , baz ')
        self.assertEqual(['bar', 'baz'], conf.get('foo'))
3605
class TestIterOptionRefs(tests.TestCase):
    """iter_option_refs is a bit unusual, document some cases."""

    def assertRefs(self, expected, string):
        """Check the (is_ref, chunk) pairs produced for ``string``."""
        self.assertEqual(expected, list(config.iter_option_refs(string)))

    def test_empty(self):
        self.assertRefs([(False, '')], '')

    def test_no_refs(self):
        self.assertRefs([(False, 'foo bar')], 'foo bar')

    def test_single_ref(self):
        self.assertRefs([(False, ''), (True, '{foo}'), (False, '')], '{foo}')

    def test_broken_ref(self):
        self.assertRefs([(False, '{foo')], '{foo')

    def test_embedded_ref(self):
        # The chunks always concatenate back to the input string
        self.assertRefs([(False, '{'), (True, '{foo}'), (False, '}')],
                        '{{foo}}')

    def test_two_refs(self):
        self.assertRefs([(False, ''), (True, '{foo}'),
                         (False, ''), (True, '{bar}'),
                         (False, ''),
                         ],
                        '{foo}{bar}')

    def test_newline_in_refs_are_not_matched(self):
        self.assertRefs([(False, '{\nxx}{xx\n}{{\n}}')], '{\nxx}{xx\n}{{\n}}')
3637
class TestStackExpandOptions(tests.TestCaseWithTransport):
3640
def setUp(self):
    # Give each test a private option registry and a stack backed by a
    # single 'foo.conf' transport store.
    super(TestStackExpandOptions, self).setUp()
    self.overrideAttr(config, 'option_registry', config.OptionRegistry())
    self.registry = config.option_registry
    store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
    self.conf = config.Stack([store.get_sections], store)
3646
def assertExpansion(self, expected, string, env=None):
    """Assert that expanding ``string`` through self.conf gives ``expected``.

    ``env`` is handed to expand_options() unchanged.
    """
    expanded = self.conf.expand_options(string, env)
    self.assertEqual(expected, expanded)
3649
def test_no_expansion(self):
    # A string without any reference comes back untouched
    plain = 'foo'
    self.assertExpansion(plain, plain)
3652
def test_expand_default_value(self):
    # 'foo' is absent from the store, so its registered default is used,
    # and that default refers to 'bar'.
    self.conf.store._load_from_string(b'bar=baz')
    foo_option = config.Option('foo', default=u'{bar}')
    self.registry.register(foo_option)
    expanded = self.conf.get('foo', expand=True)
    self.assertEqual('baz', expanded)
3657
def test_expand_default_from_env(self):
    # The default comes from the FOO environment variable and holds a
    # reference to 'bar'.
    self.conf.store._load_from_string(b'bar=baz')
    foo_option = config.Option('foo', default_from_env=['FOO'])
    self.registry.register(foo_option)
    self.overrideEnv('FOO', '{bar}')
    expanded = self.conf.get('foo', expand=True)
    self.assertEqual('baz', expanded)
3663
def test_expand_default_on_failed_conversion(self):
    # The stored 'foo' expands to 'bogus', which is not an integer; the
    # default '{bar}' is then used and expands to 42.
    content = b'baz=bogus\nbar=42\nfoo={baz}'
    self.conf.store._load_from_string(content)
    foo_option = config.Option('foo', default=u'{bar}',
                               from_unicode=config.int_from_store)
    self.registry.register(foo_option)
    value = self.conf.get('foo', expand=True)
    self.assertEqual(42, value)
3670
def test_env_adding_options(self):
    # 'foo' is only defined by the env mapping handed to the expansion.
    extra = {'foo': 'bar'}
    self.assertExpansion('bar', '{foo}', extra)
3673
def test_env_overriding_options(self):
    # The store says foo=baz while the env mapping says foo=bar; 'bar' is
    # the expected expansion.
    self.conf.store._load_from_string(b'foo=baz')
    override = {'foo': 'bar'}
    self.assertExpansion('bar', '{foo}', override)
3677
def test_simple_ref(self):
    # '{foo}' should expand to the value the store holds for foo.
    store = self.conf.store
    store._load_from_string(b'foo=xxx')
    self.assertExpansion('xxx', '{foo}')
3681
def test_unknown_ref(self):
    # Nothing defines foo here, so expanding '{foo}' must raise.
    expand = self.conf.expand_options
    self.assertRaises(config.ExpandingUnknownOption, expand, '{foo}')
3685
def test_illegal_def_is_ignored(self):
    # Each of these brace groups is expected to come back untouched.
    for text in ('{1,2}', '{ }', '${Foo,f}'):
        self.assertExpansion(text, text)
3690
def test_indirect_ref(self):
3691
self.conf.store._load_from_string(b'''
3695
self.assertExpansion('xxx', '{bar}')
3697
def test_embedded_ref(self):
3698
self.conf.store._load_from_string(b'''
3702
self.assertExpansion('xxx', '{{bar}}')
3704
def test_simple_loop(self):
    # foo is defined in terms of itself.
    self.conf.store._load_from_string(b'foo={foo}')
    expand = self.conf.expand_options
    self.assertRaises(config.OptionExpansionLoop, expand, '{foo}')
3709
def test_indirect_loop(self):
3710
self.conf.store._load_from_string(b'''
3714
e = self.assertRaises(config.OptionExpansionLoop,
3715
self.conf.expand_options, '{foo}')
3716
self.assertEqual('foo->bar->baz', e.refs)
3717
self.assertEqual('{foo}', e.string)
3719
def test_list(self):
3720
self.conf.store._load_from_string(b'''
3724
list={foo},{bar},{baz}
3726
self.registry.register(
3727
config.ListOption('list'))
3728
self.assertEqual(['start', 'middle', 'end'],
3729
self.conf.get('list', expand=True))
3731
def test_cascading_list(self):
3732
self.conf.store._load_from_string(b'''
3738
self.registry.register(config.ListOption('list'))
3739
# Register an intermediate option as a list to ensure no conversion
3740
# happen while expanding. Conversion should only occur for the original
3741
# option ('list' here).
3742
self.registry.register(config.ListOption('baz'))
3743
self.assertEqual(['start', 'middle', 'end'],
3744
self.conf.get('list', expand=True))
3746
def test_pathologically_hidden_list(self):
3747
self.conf.store._load_from_string(b'''
3753
hidden={start}{middle}{end}
3755
# What matters is what the registration says, the conversion happens
3756
# only after all expansions have been performed
3757
self.registry.register(config.ListOption('hidden'))
3758
self.assertEqual(['bin', 'go'],
3759
self.conf.get('hidden', expand=True))
3762
class TestStackCrossSectionsExpand(tests.TestCaseWithTransport):
3765
super(TestStackCrossSectionsExpand, self).setUp()
3767
def get_config(self, location, string):
3770
# Since we don't save the config we won't strictly require to inherit
3771
# from TestCaseInTempDir, but an error occurs so quickly...
3772
c = config.LocationStack(location)
3773
c.store._load_from_string(string)
3776
def test_dont_cross_unrelated_section(self):
3777
c = self.get_config('/another/branch/path', b'''
3782
[/another/branch/path]
3785
self.assertRaises(config.ExpandingUnknownOption,
3786
c.get, 'bar', expand=True)
3788
def test_cross_related_sections(self):
3789
c = self.get_config('/project/branch/path', b'''
3793
[/project/branch/path]
3796
self.assertEqual('quux', c.get('bar', expand=True))
3799
class TestStackCrossStoresExpand(tests.TestCaseWithTransport):
3801
def test_cross_global_locations(self):
3802
l_store = config.LocationStore()
3803
l_store._load_from_string(b'''
3809
g_store = config.GlobalStore()
3810
g_store._load_from_string(b'''
3816
stack = config.LocationStack('/branch')
3817
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3818
self.assertEqual('loc-foo', stack.get('gfoo', expand=True))
3821
class TestStackExpandSectionLocals(tests.TestCaseWithTransport):
3823
def test_expand_locals_empty(self):
3824
l_store = config.LocationStore()
3825
l_store._load_from_string(b'''
3826
[/home/user/project]
3831
stack = config.LocationStack('/home/user/project/')
3832
self.assertEqual('', stack.get('base', expand=True))
3833
self.assertEqual('', stack.get('rel', expand=True))
3835
def test_expand_basename_locally(self):
3836
l_store = config.LocationStore()
3837
l_store._load_from_string(b'''
3838
[/home/user/project]
3842
stack = config.LocationStack('/home/user/project/branch')
3843
self.assertEqual('branch', stack.get('bfoo', expand=True))
3845
def test_expand_basename_locally_longer_path(self):
3846
l_store = config.LocationStore()
3847
l_store._load_from_string(b'''
3852
stack = config.LocationStack('/home/user/project/dir/branch')
3853
self.assertEqual('branch', stack.get('bfoo', expand=True))
3855
def test_expand_relpath_locally(self):
3856
l_store = config.LocationStore()
3857
l_store._load_from_string(b'''
3858
[/home/user/project]
3859
lfoo = loc-foo/{relpath}
3862
stack = config.LocationStack('/home/user/project/branch')
3863
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3865
def test_expand_relpath_unknonw_in_global(self):
3866
g_store = config.GlobalStore()
3867
g_store._load_from_string(b'''
3872
stack = config.LocationStack('/home/user/project/branch')
3873
self.assertRaises(config.ExpandingUnknownOption,
3874
stack.get, 'gfoo', expand=True)
3876
def test_expand_local_option_locally(self):
3877
l_store = config.LocationStore()
3878
l_store._load_from_string(b'''
3879
[/home/user/project]
3880
lfoo = loc-foo/{relpath}
3884
g_store = config.GlobalStore()
3885
g_store._load_from_string(b'''
3891
stack = config.LocationStack('/home/user/project/branch')
3892
self.assertEqual('glob-bar', stack.get('lbar', expand=True))
3893
self.assertEqual('loc-foo/branch', stack.get('gfoo', expand=True))
3895
def test_locals_dont_leak(self):
3896
"""Make sure we chose the right local in presence of several sections.
3898
l_store = config.LocationStore()
3899
l_store._load_from_string(b'''
3901
lfoo = loc-foo/{relpath}
3902
[/home/user/project]
3903
lfoo = loc-foo/{relpath}
3906
stack = config.LocationStack('/home/user/project/branch')
3907
self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
3908
stack = config.LocationStack('/home/user/bar/baz')
3909
self.assertEqual('loc-foo/bar/baz', stack.get('lfoo', expand=True))
3912
class TestStackSet(TestStackWithTransport):
3914
def test_simple_set(self):
    stack = self.get_stack(self)
    # Nothing has been set yet.
    self.assertEqual(None, stack.get('foo'))
    stack.set('foo', 'baz')
    # The value just set must be readable back.
    current = stack.get('foo')
    self.assertEqual('baz', current)
3921
def test_set_creates_a_new_section(self):
    conf = self.get_stack(self)
    conf.set('foo', 'baz')
    # This has to be a real call: writing ``self.assertEqual, 'baz', ...``
    # only builds a tuple that is thrown away, so nothing gets checked.
    self.assertEqual('baz', conf.get('foo'))
3926
def test_set_hook(self):
3931
config.ConfigHooks.install_named_hook('set', hook, None)
3932
self.assertLength(0, calls)
3933
conf = self.get_stack(self)
3934
conf.set('foo', 'bar')
3935
self.assertLength(1, calls)
3936
self.assertEqual((conf, 'foo', 'bar'), calls[0])
3939
class TestStackRemove(TestStackWithTransport):
3941
def test_remove_existing(self):
3942
conf = self.get_stack(self)
3943
conf.set('foo', 'bar')
3944
self.assertEqual('bar', conf.get('foo'))
3946
# Did we get it back ?
3947
self.assertEqual(None, conf.get('foo'))
3949
def test_remove_unknown(self):
    # Removing an option that was never set is expected to raise KeyError.
    stack = self.get_stack(self)
    missing = 'I_do_not_exist'
    self.assertRaises(KeyError, stack.remove, missing)
3953
def test_remove_hook(self):
3958
config.ConfigHooks.install_named_hook('remove', hook, None)
3959
self.assertLength(0, calls)
3960
conf = self.get_stack(self)
3961
conf.set('foo', 'bar')
3963
self.assertLength(1, calls)
3964
self.assertEqual((conf, 'foo'), calls[0])
3967
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
3970
super(TestConfigGetOptions, self).setUp()
3971
create_configs(self)
3973
def test_no_variable(self):
    # Using branch should query branch, locations and breezy; since nothing
    # has been set, no option is expected.
    nothing = []
    self.assertOptions(nothing, self.branch_config)
3977
def test_option_in_breezy(self):
3978
self.breezy_config.set_user_option('file', 'breezy')
3979
self.assertOptions([('file', 'breezy', 'DEFAULT', 'breezy')],
3982
def test_option_in_locations(self):
3983
self.locations_config.set_user_option('file', 'locations')
3985
[('file', 'locations', self.tree.basedir, 'locations')],
3986
self.locations_config)
3988
def test_option_in_branch(self):
3989
self.branch_config.set_user_option('file', 'branch')
3990
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch')],
3993
def test_option_in_breezy_and_branch(self):
3994
self.breezy_config.set_user_option('file', 'breezy')
3995
self.branch_config.set_user_option('file', 'branch')
3996
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch'),
3997
('file', 'breezy', 'DEFAULT', 'breezy'), ],
4000
def test_option_in_branch_and_locations(self):
4001
# Hmm, locations override branch :-/
4002
self.locations_config.set_user_option('file', 'locations')
4003
self.branch_config.set_user_option('file', 'branch')
4005
[('file', 'locations', self.tree.basedir, 'locations'),
4006
('file', 'branch', 'DEFAULT', 'branch'), ],
4009
def test_option_in_breezy_locations_and_branch(self):
4010
self.breezy_config.set_user_option('file', 'breezy')
4011
self.locations_config.set_user_option('file', 'locations')
4012
self.branch_config.set_user_option('file', 'branch')
4014
[('file', 'locations', self.tree.basedir, 'locations'),
4015
('file', 'branch', 'DEFAULT', 'branch'),
4016
('file', 'breezy', 'DEFAULT', 'breezy'), ],
4020
class TestConfigRemoveOption(tests.TestCaseWithTransport, TestOptionsMixin):
4023
super(TestConfigRemoveOption, self).setUp()
4024
create_configs_with_file_option(self)
4026
def test_remove_in_locations(self):
4027
self.locations_config.remove_user_option('file', self.tree.basedir)
4029
[('file', 'branch', 'DEFAULT', 'branch'),
4030
('file', 'breezy', 'DEFAULT', 'breezy'), ],
4033
def test_remove_in_branch(self):
4034
self.branch_config.remove_user_option('file')
4036
[('file', 'locations', self.tree.basedir, 'locations'),
4037
('file', 'breezy', 'DEFAULT', 'breezy'), ],
4040
def test_remove_in_breezy(self):
4041
self.breezy_config.remove_user_option('file')
4043
[('file', 'locations', self.tree.basedir, 'locations'),
4044
('file', 'branch', 'DEFAULT', 'branch'), ],
4048
class TestConfigGetSections(tests.TestCaseWithTransport):
4051
super(TestConfigGetSections, self).setUp()
4052
create_configs(self)
4054
def assertSectionNames(self, expected, conf, name=None):
4055
"""Check which sections are returned for a given config.
4057
If fallback configurations exist their sections can be included.
4059
:param expected: A list of section names.
4061
:param conf: The configuration that will be queried.
4063
:param name: An optional section name that will be passed to
4066
sections = list(conf._get_sections(name))
4067
self.assertLength(len(expected), sections)
4068
self.assertEqual(expected, [n for n, _, _ in sections])
4070
def test_breezy_default_section(self):
    # Only the DEFAULT section is expected for the breezy config.
    expected_names = ['DEFAULT']
    self.assertSectionNames(expected_names, self.breezy_config)
4073
def test_locations_default_section(self):
    # An empty file defines no sections at all.
    no_sections = []
    self.assertSectionNames(no_sections, self.locations_config)
4077
def test_locations_named_section(self):
    # Once an option is set, a section named after the tree's base
    # directory is expected.
    loc_config = self.locations_config
    loc_config.set_user_option('file', 'locations')
    self.assertSectionNames([self.tree.basedir], loc_config)
4081
def test_locations_matching_sections(self):
4082
loc_config = self.locations_config
4083
loc_config.set_user_option('file', 'locations')
4084
# We need to cheat a bit here to create an option in sections above and
4085
# below the 'location' one.
4086
parser = loc_config._get_parser()
4087
# locations.conf deals with '/' ignoring native os.sep
4088
location_names = self.tree.basedir.split('/')
4089
parent = '/'.join(location_names[:-1])
4090
child = '/'.join(location_names + ['child'])
4092
parser[parent]['file'] = 'parent'
4094
parser[child]['file'] = 'child'
4095
self.assertSectionNames([self.tree.basedir, parent], loc_config)
4097
def test_branch_data_default_section(self):
    # The branch data config is expected to expose one section named None.
    data_config = self.branch_config._get_branch_data_config()
    self.assertSectionNames([None], data_config)
4101
def test_branch_default_sections(self):
4102
# No sections are defined in an empty locations file
4103
self.assertSectionNames([None, 'DEFAULT'],
4105
# Unless we define an option
4106
self.branch_config._get_location_config().set_user_option(
4107
'file', 'locations')
4108
self.assertSectionNames([self.tree.basedir, None, 'DEFAULT'],
4111
def test_breezy_named_section(self):
    # We need to cheat as the API doesn't give direct access to sections
    # other than DEFAULT: an alias is set so that ALIASES can be queried.
    section = 'ALIASES'
    self.breezy_config.set_alias('breezy', 'bzr')
    self.assertSectionNames([section], self.breezy_config, section)
4118
class TestSharedStores(tests.TestCaseInTempDir):
    """Check that separately created global stacks reuse one store."""

    def test_breezy_conf_shared(self):
        first = config.GlobalStack()
        second = config.GlobalStack()
        # Identity, not mere equality: both stacks must hold the very same
        # store object.
        self.assertIs(first.store, second.store)
4127
class TestAuthenticationConfigFilePermissions(tests.TestCaseInTempDir):
4128
"""Test warning for permissions of authentication.conf."""
4131
super(TestAuthenticationConfigFilePermissions, self).setUp()
4132
self.path = osutils.pathjoin(self.test_dir, 'authentication.conf')
4133
with open(self.path, 'wb') as f:
4134
f.write(b"""[broken]
4137
port=port # Error: Not an int
4139
self.overrideAttr(bedding, 'authentication_config_path',
4141
osutils.chmod_if_possible(self.path, 0o755)
4143
def test_check_warning(self):
    auth_conf = config.AuthenticationConfig()
    self.assertEqual(auth_conf._filename, self.path)
    # The permissions warning is expected in the log once the config
    # has been created.
    warning = 'Saved passwords may be accessible by other users.'
    self.assertContainsRe(self.get_log(), warning)
4149
def test_check_suppressed_warning(self):
    # Ask for the warning to be suppressed before the authentication
    # config gets created.
    suppressor = config.GlobalConfig()
    suppressor.set_user_option('suppress_warnings', 'insecure_permissions')
    auth_conf = config.AuthenticationConfig()
    self.assertEqual(auth_conf._filename, self.path)
    warning = 'Saved passwords may be accessible by other users.'
    self.assertNotContainsRe(self.get_log(), warning)
4159
class TestAuthenticationConfigFile(tests.TestCase):
4160
"""Test the authentication.conf file matching"""
4162
def _got_user_passwd(self, expected_user, expected_password,
4163
config, *args, **kwargs):
4164
credentials = config.get_credentials(*args, **kwargs)
4165
if credentials is None:
4169
user = credentials['user']
4170
password = credentials['password']
4171
self.assertEqual(expected_user, user)
4172
self.assertEqual(expected_password, password)
4174
def test_empty_config(self):
    # An empty file is expected to give an empty config and no credentials.
    empty = config.AuthenticationConfig(_file=BytesIO())
    parsed = empty._get_config()
    self.assertEqual({}, parsed)
    self._got_user_passwd(None, None, empty, 'http', 'foo.net')
4179
def test_non_utf8_config(self):
    # The trailing 0xff byte makes the content invalid as UTF-8.
    content = BytesIO(b'foo = bar\xff')
    conf = config.AuthenticationConfig(_file=content)
    self.assertRaises(config.ConfigContentError, conf._get_config)
4183
def test_missing_auth_section_header(self):
    # The option is not preceded by any [section] header.
    content = BytesIO(b'foo = bar')
    conf = config.AuthenticationConfig(_file=content)
    self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4187
def test_auth_section_header_not_closed(self):
    # '[DEF' lacks its closing bracket.
    content = BytesIO(b'[DEF')
    conf = config.AuthenticationConfig(_file=content)
    self.assertRaises(config.ParseConfigError, conf._get_config)
4191
def test_auth_value_not_boolean(self):
4192
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4196
verify_certificates=askme # Error: Not a boolean
4198
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4200
def test_auth_value_not_int(self):
4201
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4205
port=port # Error: Not an int
4207
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4209
def test_unknown_password_encoding(self):
4210
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4214
password_encoding=unknown
4216
self.assertRaises(ValueError, conf.get_password,
4217
'ftp', 'foo.net', 'joe')
4219
def test_credentials_for_scheme_host(self):
4220
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4221
# Identity on foo.net
4226
password=secret-pass
4229
self._got_user_passwd('joe', 'secret-pass', conf, 'ftp', 'foo.net')
4231
self._got_user_passwd(None, None, conf, 'http', 'foo.net')
4233
self._got_user_passwd(None, None, conf, 'ftp', 'bar.net')
4235
def test_credentials_for_host_port(self):
4236
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4237
# Identity on foo.net
4243
password=secret-pass
4246
self._got_user_passwd('joe', 'secret-pass',
4247
conf, 'ftp', 'foo.net', port=10021)
4249
self._got_user_passwd(None, None, conf, 'ftp', 'foo.net')
4251
def test_for_matching_host(self):
4252
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4253
# Identity on foo.net
4259
[sourceforge domain]
4266
self._got_user_passwd('georges', 'bendover',
4267
conf, 'bzr', 'foo.bzr.sf.net')
4269
self._got_user_passwd(None, None,
4270
conf, 'bzr', 'bbzr.sf.net')
4272
def test_for_matching_host_None(self):
4273
conf = config.AuthenticationConfig(_file=BytesIO(b"""\
4274
# Identity on foo.net
4284
self._got_user_passwd('joe', 'joepass',
4285
conf, 'bzr', 'quux.net')
4286
# no host but different scheme
4287
self._got_user_passwd('georges', 'bendover',
4288
conf, 'ftp', 'quux.net')
4290
def test_credentials_for_path(self):
4291
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4306
self._got_user_passwd(None, None,
4307
conf, 'http', host='bar.org', path='/dir3')
4309
self._got_user_passwd('georges', 'bendover',
4310
conf, 'http', host='bar.org', path='/dir2')
4312
self._got_user_passwd('jim', 'jimpass',
4313
conf, 'http', host='bar.org', path='/dir1/subdir')
4315
def test_credentials_for_user(self):
4316
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4324
self._got_user_passwd('jim', 'jimpass',
4325
conf, 'http', 'bar.org')
4327
self._got_user_passwd('jim', 'jimpass',
4328
conf, 'http', 'bar.org', user='jim')
4329
# Don't get a different user if one is specified
4330
self._got_user_passwd(None, None,
4331
conf, 'http', 'bar.org', user='georges')
4333
def test_credentials_for_user_without_password(self):
4334
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4340
# Get user but no password
4341
self._got_user_passwd('jim', None,
4342
conf, 'http', 'bar.org')
4344
def test_verify_certificates(self):
4345
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4351
verify_certificates=False
4358
credentials = conf.get_credentials('https', 'bar.org')
4359
self.assertEqual(False, credentials.get('verify_certificates'))
4360
credentials = conf.get_credentials('https', 'foo.net')
4361
self.assertEqual(True, credentials.get('verify_certificates'))
4364
class TestAuthenticationStorage(tests.TestCaseInTempDir):
4366
def test_set_credentials(self):
4367
conf = config.AuthenticationConfig()
4368
conf.set_credentials('name', 'host', 'user', 'scheme', 'password',
4369
99, path='/foo', verify_certificates=False, realm='realm')
4370
credentials = conf.get_credentials(host='host', scheme='scheme',
4371
port=99, path='/foo',
4373
CREDENTIALS = {'name': 'name', 'user': 'user', 'password': 'password',
4374
'verify_certificates': False, 'scheme': 'scheme',
4375
'host': 'host', 'port': 99, 'path': '/foo',
4377
self.assertEqual(CREDENTIALS, credentials)
4378
credentials_from_disk = config.AuthenticationConfig().get_credentials(
4379
host='host', scheme='scheme', port=99, path='/foo', realm='realm')
4380
self.assertEqual(CREDENTIALS, credentials_from_disk)
4382
def test_reset_credentials_different_name(self):
    conf = config.AuthenticationConfig()
    # Two entries for the same host and scheme under different names. The
    # assertions below expect the first one ('name') to be gone afterwards.
    # (No trailing commas here: they would turn each call into a discarded
    # one-element tuple.)
    conf.set_credentials('name', 'host', 'user', 'scheme', 'password')
    conf.set_credentials('name2', 'host', 'user2', 'scheme', 'password')
    self.assertIs(None, conf._get_config().get('name'))
    credentials = conf.get_credentials(host='host', scheme='scheme')
    CREDENTIALS = {'name': 'name2', 'user': 'user2', 'password':
                   'password', 'verify_certificates': True,
                   'scheme': 'scheme', 'host': 'host', 'port': None,
                   'path': None, 'realm': None}
    self.assertEqual(CREDENTIALS, credentials)
4395
class TestAuthenticationConfig(tests.TestCaseInTempDir):
4396
"""Test AuthenticationConfig behaviour"""
4398
def _check_default_password_prompt(self, expected_prompt_format, scheme,
4399
host=None, port=None, realm=None,
4403
user, password = 'jim', 'precious'
4404
expected_prompt = expected_prompt_format % {
4405
'scheme': scheme, 'host': host, 'port': port,
4406
'user': user, 'realm': realm}
4408
ui.ui_factory = tests.TestUIFactory(stdin=password + '\n')
4409
# We use an empty conf so that the user is always prompted
4410
conf = config.AuthenticationConfig()
4411
self.assertEqual(password,
4412
conf.get_password(scheme, host, user, port=port,
4413
realm=realm, path=path))
4414
self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
4415
self.assertEqual('', ui.ui_factory.stdout.getvalue())
4417
def _check_default_username_prompt(self, expected_prompt_format, scheme,
4418
host=None, port=None, realm=None,
4423
expected_prompt = expected_prompt_format % {
4424
'scheme': scheme, 'host': host, 'port': port,
4426
ui.ui_factory = tests.TestUIFactory(stdin=username + '\n')
4427
# We use an empty conf so that the user is always prompted
4428
conf = config.AuthenticationConfig()
4429
self.assertEqual(username, conf.get_user(scheme, host, port=port,
4430
realm=realm, path=path, ask=True))
4431
self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
4432
self.assertEqual('', ui.ui_factory.stdout.getvalue())
4434
def test_username_defaults_prompts(self):
    # HTTP prompts can't be tested here, see test_http.py
    check = self._check_default_username_prompt
    check(u'FTP %(host)s username: ', 'ftp')
    check(u'FTP %(host)s:%(port)d username: ', 'ftp', port=10020)
    check(u'SSH %(host)s:%(port)d username: ', 'ssh', port=12345)
4442
def test_username_default_no_prompt(self):
    conf = config.AuthenticationConfig()
    # Without a default, no user is expected.
    nobody = conf.get_user('ftp', 'example.com')
    self.assertEqual(None, nobody)
    # An explicit default is expected to be handed back as is.
    fallback = conf.get_user('ftp', 'example.com',
                             default="explicitdefault")
    self.assertEqual("explicitdefault", fallback)
4449
def test_password_default_prompts(self):
4450
# HTTP prompts can't be tested here, see test_http.py
4451
self._check_default_password_prompt(
4452
u'FTP %(user)s@%(host)s password: ', 'ftp')
4453
self._check_default_password_prompt(
4454
u'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp', port=10020)
4455
self._check_default_password_prompt(
4456
u'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh', port=12345)
4457
# SMTP port handling is a bit special (it's handled if embedded in the
4459
# FIXME: should we: forbid that, extend it to other schemes, leave
4460
# things as they are that's fine thank you ?
4461
self._check_default_password_prompt(
4462
u'SMTP %(user)s@%(host)s password: ', 'smtp')
4463
self._check_default_password_prompt(
4464
u'SMTP %(user)s@%(host)s password: ', 'smtp', host='bar.org:10025')
4465
self._check_default_password_prompt(
4466
u'SMTP %(user)s@%(host)s:%(port)d password: ', 'smtp', port=10025)
4468
def test_ssh_password_emits_warning(self):
4469
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4476
entered_password = 'typed-by-hand'
4477
ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n')
4479
# Since the password defined in the authentication config is ignored,
4480
# the user is prompted
4481
self.assertEqual(entered_password,
4482
conf.get_password('ssh', 'bar.org', user='jim'))
4483
self.assertContainsRe(
4485
'password ignored in section \\[ssh with password\\]')
4487
def test_ssh_without_password_doesnt_emit_warning(self):
4488
conf = config.AuthenticationConfig(_file=BytesIO(b"""
4494
entered_password = 'typed-by-hand'
4495
ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n')
4497
# Since the password defined in the authentication config is ignored,
4498
# the user is prompted
4499
self.assertEqual(entered_password,
4500
conf.get_password('ssh', 'bar.org', user='jim'))
4501
# No warning should be emitted since there is no password. We are only
4503
self.assertNotContainsRe(
4505
'password ignored in section \\[ssh with password\\]')
4507
def test_uses_fallback_stores(self):
    # Swap in a fresh registry for the duration of the test, then register
    # a single stub store as a fallback.
    fresh_registry = config.CredentialStoreRegistry()
    self.overrideAttr(config, 'credential_store_registry', fresh_registry)
    stub = StubCredentialStore()
    stub.add_credentials("http", "example.com", "joe", "secret")
    config.credential_store_registry.register("stub", stub, fallback=True)
    # The authentication file itself is empty, yet the stub's credentials
    # are expected to be returned.
    empty_conf = config.AuthenticationConfig(_file=BytesIO())
    found = empty_conf.get_credentials("http", "example.com")
    self.assertEqual("joe", found["user"])
    self.assertEqual("secret", found["password"])
4519
class StubCredentialStore(config.CredentialStore):
4525
def add_credentials(self, scheme, host, user, password=None):
    """Record ``user`` and ``password`` under the ``(scheme, host)`` pair.

    :param scheme: First half of the lookup key.

    :param host: Second half of the lookup key.

    :param user: Value stored in ``self._username`` for that key.

    :param password: Value stored in ``self._password`` for that key
        (None when omitted).
    """
    key = (scheme, host)
    self._username[key] = user
    self._password[key] = password
4529
def get_credentials(self, scheme, host, port=None, user=None,
4530
path=None, realm=None):
4531
key = (scheme, host)
4532
if key not in self._username:
4534
return {"scheme": scheme, "host": host, "port": port,
4535
"user": self._username[key], "password": self._password[key]}
4538
class CountingCredentialStore(config.CredentialStore):
4543
def get_credentials(self, scheme, host, port=None, user=None,
4544
path=None, realm=None):
4549
class TestCredentialStoreRegistry(tests.TestCase):
    """Exercise CredentialStoreRegistry registration and fallback lookup."""

    def _get_cs_registry(self):
        # The tests using this helper look at the module-wide registry
        # rather than at a freshly created one.
        return config.credential_store_registry

    def test_default_credential_store(self):
        r = self._get_cs_registry()
        # Asking for the store named None is expected to give the plain
        # text one.
        default = r.get_credential_store(None)
        self.assertIsInstance(default, config.PlainTextCredentialStore)

    def test_unknown_credential_store(self):
        r = self._get_cs_registry()
        # It's hard to imagine someone creating a credential store named
        # 'unknown' so we use that as a never-registered key.
        self.assertRaises(KeyError, r.get_credential_store, 'unknown')

    def test_fallback_none_registered(self):
        # A brand new registry has no store to fall back on.
        r = config.CredentialStoreRegistry()
        self.assertEqual(None,
                         r.get_fallback_credentials("http", "example.com"))

    def test_register(self):
        r = config.CredentialStoreRegistry()
        r.register("stub", StubCredentialStore(), fallback=False)
        r.register("another", StubCredentialStore(), fallback=True)
        # Both names must be listed, whatever their fallback flag.
        self.assertEqual(["another", "stub"], r.keys())

    def test_register_lazy(self):
        r = config.CredentialStoreRegistry()
        # The store is designated by module and attribute names only.
        r.register_lazy("stub", "breezy.tests.test_config",
                        "StubCredentialStore", fallback=False)
        self.assertEqual(["stub"], r.keys())
        self.assertIsInstance(r.get_credential_store("stub"),
                              StubCredentialStore)

    def test_is_fallback(self):
        r = config.CredentialStoreRegistry()
        r.register("stub1", None, fallback=False)
        r.register("stub2", None, fallback=True)
        # The flag given at registration time must be reported back.
        self.assertEqual(False, r.is_fallback("stub1"))
        self.assertEqual(True, r.is_fallback("stub2"))

    def test_no_fallback(self):
        r = config.CredentialStoreRegistry()
        store = CountingCredentialStore()
        r.register("count", store, fallback=False)
        self.assertEqual(None,
                         r.get_fallback_credentials("http", "example.com"))
        # A store that is not a fallback must not even be queried.
        self.assertEqual(0, store._calls)

    def test_fallback_credentials(self):
        r = config.CredentialStoreRegistry()
        store = StubCredentialStore()
        store.add_credentials("http", "example.com",
                              "somebody", "geheim")
        r.register("stub", store, fallback=True)
        creds = r.get_fallback_credentials("http", "example.com")
        self.assertEqual("somebody", creds["user"])
        self.assertEqual("geheim", creds["password"])

    def test_fallback_first_wins(self):
        r = config.CredentialStoreRegistry()
        stub1 = StubCredentialStore()
        stub1.add_credentials("http", "example.com",
                              "somebody", "stub1")
        r.register("stub1", stub1, fallback=True)
        stub2 = StubCredentialStore()
        stub2.add_credentials("http", "example.com",
                              "somebody", "stub2")
        # Register stub2 itself. Registering stub1 a second time under the
        # name "stub2" would leave only one distinct store in play, so the
        # password check below could never fail.
        r.register("stub2", stub2, fallback=True)
        creds = r.get_fallback_credentials("http", "example.com")
        self.assertEqual("somebody", creds["user"])
        # The password tells which of the two stores answered.
        self.assertEqual("stub1", creds["password"])
4624
class TestPlainTextCredentialStore(tests.TestCase):
    """Check password decoding by the store the registry returns by default."""

    def test_decode_password(self):
        registry = config.credential_store_registry
        # No name given: take whatever store the registry hands out.
        store = registry.get_credential_store()
        credentials = dict(password='secret')
        # The password is expected back unchanged.
        self.assertEqual('secret', store.decode_password(credentials))
4633
class TestBase64CredentialStore(tests.TestCase):
    """Check password decoding by the store registered as 'base64'."""

    def test_decode_password(self):
        registry = config.credential_store_registry
        base64_store = registry.get_credential_store('base64')
        # 'c2VjcmV0' is the base64 encoding of b'secret'.
        encoded = dict(password='c2VjcmV0')
        self.assertEqual(b'secret', base64_store.decode_password(encoded))
4642
# FIXME: Once we have a way to declare authentication to all test servers, we
4643
# can implement generic tests.
4644
# test_user_password_in_url
4645
# test_user_in_url_password_from_config
4646
# test_user_in_url_password_prompted
4647
# test_user_in_config
4648
# test_user_getpass.getuser
4649
# test_user_prompted ?
4650
class TestAuthenticationRing(tests.TestCaseWithTransport):
4654
class EmailOptionTests(tests.TestCase):
    """Check how environment variables interact with the 'email' option."""

    def test_default_email_uses_BRZ_EMAIL(self):
        stack = config.MemoryStack(b'email=jelmer@debian.org')
        # BRZ_EMAIL takes precedence over BZR_EMAIL and EMAIL
        for variable, address in (('BRZ_EMAIL', 'jelmer@samba.org'),
                                  ('BZR_EMAIL', 'jelmer@jelmer.uk'),
                                  ('EMAIL', 'jelmer@apache.org')):
            self.overrideEnv(variable, address)
        self.assertEqual('jelmer@samba.org', stack.get('email'))

    def test_default_email_uses_BZR_EMAIL(self):
        stack = config.MemoryStack(b'email=jelmer@debian.org')
        # BZR_EMAIL takes precedence over EMAIL
        for variable, address in (('BZR_EMAIL', 'jelmer@samba.org'),
                                  ('EMAIL', 'jelmer@apache.org')):
            self.overrideEnv(variable, address)
        self.assertEqual('jelmer@samba.org', stack.get('email'))

    def test_default_email_uses_EMAIL(self):
        # Nothing in the stack and BRZ_EMAIL unset: EMAIL is expected to
        # provide the value.
        stack = config.MemoryStack(b'')
        self.overrideEnv('BRZ_EMAIL', None)
        self.overrideEnv('EMAIL', 'jelmer@apache.org')
        self.assertEqual('jelmer@apache.org', stack.get('email'))

    def test_BRZ_EMAIL_overrides(self):
        stack = config.MemoryStack(b'email=jelmer@debian.org')
        # While BRZ_EMAIL is set, it is expected instead of the stored value.
        self.overrideEnv('BRZ_EMAIL', 'jelmer@apache.org')
        self.assertEqual('jelmer@apache.org', stack.get('email'))
        # Once it is unset, the stored value is expected even though EMAIL
        # is set.
        self.overrideEnv('BRZ_EMAIL', None)
        self.overrideEnv('EMAIL', 'jelmer@samba.org')
        self.assertEqual('jelmer@debian.org', stack.get('email'))
4686
class MailClientOptionTests(tests.TestCase):
4688
def test_default(self):
    # With no mail_client setting at all, DefaultMail is expected.
    stack = config.MemoryStack(b'')
    self.assertIs(stack.get('mail_client'), mail_client.DefaultMail)
4693
def test_evolution(self):
    # 'evolution' is expected to select the Evolution client class.
    stack = config.MemoryStack(b'mail_client=evolution')
    self.assertIs(stack.get('mail_client'), mail_client.Evolution)
4698
def test_kmail(self):
    # 'kmail' is expected to select the KMail client class.
    stack = config.MemoryStack(b'mail_client=kmail')
    self.assertIs(stack.get('mail_client'), mail_client.KMail)
4703
def test_mutt(self):
    # 'mutt' is expected to select the Mutt client class.
    stack = config.MemoryStack(b'mail_client=mutt')
    self.assertIs(stack.get('mail_client'), mail_client.Mutt)
4708
def test_thunderbird(self):
    # 'thunderbird' is expected to select the Thunderbird client class.
    stack = config.MemoryStack(b'mail_client=thunderbird')
    self.assertIs(stack.get('mail_client'), mail_client.Thunderbird)
4713
def test_explicit_default(self):
    # Spelling out 'default' is expected to select DefaultMail.
    stack = config.MemoryStack(b'mail_client=default')
    self.assertIs(stack.get('mail_client'), mail_client.DefaultMail)
4718
def test_editor(self):
    # 'editor' is expected to select the Editor client class.
    stack = config.MemoryStack(b'mail_client=editor')
    self.assertIs(stack.get('mail_client'), mail_client.Editor)
4723
def test_mapi(self):
    # 'mapi' is expected to select the MAPIClient class.
    stack = config.MemoryStack(b'mail_client=mapi')
    self.assertIs(stack.get('mail_client'), mail_client.MAPIClient)
4728
def test_xdg_email(self):
    # 'xdg-email' is expected to select the XDGEmail client class.
    stack = config.MemoryStack(b'mail_client=xdg-email')
    self.assertIs(stack.get('mail_client'), mail_client.XDGEmail)
4733
def test_unknown(self):
4734
conf = config.MemoryStack(b'mail_client=firebird')
4735
self.assertRaises(config.ConfigOptionValueError, conf.get,