/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_config.py

  • Committer: Jelmer Vernooij
  • Date: 2018-11-16 23:15:15 UTC
  • mfrom: (7180 work)
  • mto: This revision was merged to the branch mainline in revision 7183.
  • Revision ID: jelmer@jelmer.uk-20181116231515-zqd2yn6kj8lfydyp
Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
63
63
        ('locations',
64
64
         {'config_class': config.LocationConfig,
65
65
          'config_args': ['.'],
66
 
          'config_section': '.'}),]
 
66
          'config_section': '.'}), ]
67
67
 
68
68
 
69
69
load_tests = scenarios.load_tests_apply_scenarios
112
112
    build_backing_branch(test, 'branch')
113
113
    b = branch.Branch.open('branch')
114
114
    return config.BranchStore(b)
 
115
 
 
116
 
115
117
config.test_store_builder_registry.register('branch', build_branch_store)
116
118
 
117
119
 
119
121
    build_backing_branch(test, 'branch')
120
122
    b = controldir.ControlDir.open('branch')
121
123
    return config.ControlStore(b)
 
124
 
 
125
 
122
126
config.test_store_builder_registry.register('control', build_control_store)
123
127
 
124
128
 
130
134
    build_backing_branch(test, 'branch', transport_class, server_class)
131
135
    b = branch.Branch.open(test.get_url('branch'))
132
136
    return config.BranchStore(b)
 
137
 
 
138
 
133
139
config.test_store_builder_registry.register('remote_branch',
134
140
                                            build_remote_branch_store)
135
141
 
144
150
    build_backing_branch(test, 'branch')
145
151
    b = branch.Branch.open('branch')
146
152
    return config.BranchStack(b)
 
153
 
 
154
 
147
155
config.test_stack_builder_registry.register('branch', build_branch_stack)
148
156
 
149
157
 
155
163
    build_backing_branch(test, 'branch', transport_class, server_class)
156
164
    b = branch.Branch.open(test.get_url('branch'))
157
165
    return config.BranchOnlyStack(b)
 
166
 
 
167
 
158
168
config.test_stack_builder_registry.register('branch_only',
159
169
                                            build_branch_only_stack)
160
170
 
 
171
 
161
172
def build_remote_control_stack(test):
162
173
    # There is only one permutation (but we won't be able to handle more with
163
174
    # this design anyway)
168
179
    build_backing_branch(test, 'branch', transport_class, server_class)
169
180
    b = branch.Branch.open(test.get_url('branch'))
170
181
    return config.RemoteControlStack(b.controldir)
 
182
 
 
183
 
171
184
config.test_stack_builder_registry.register('remote_control',
172
185
                                            build_remote_control_stack)
173
186
 
174
187
 
175
 
sample_long_alias="log -r-15..-1 --line"
 
188
sample_long_alias = "log -r-15..-1 --line"
176
189
sample_config_text = u"""
177
190
[DEFAULT]
178
191
email=Erik B\u00e5gfors <erik@bagfors.nu>
539
552
 
540
553
    def test_xdg_cache_dir(self):
541
554
        self.assertEqual(config.xdg_cache_dir(),
542
 
            '/home/bogus/.cache')
 
555
                         '/home/bogus/.cache')
543
556
 
544
557
 
545
558
class TestConfigPathFallback(tests.TestCaseInTempDir):
571
584
 
572
585
    def test_xdg_cache_dir(self):
573
586
        self.assertEqual(config.xdg_cache_dir(),
574
 
            os.path.join(self.test_dir, '.cache'))
 
587
                         os.path.join(self.test_dir, '.cache'))
575
588
 
576
589
 
577
590
class TestXDGConfigDir(tests.TestCaseInTempDir):
728
741
list={foo},{bar},{baz}
729
742
''')
730
743
        self.assertEqual(['start', 'middle', 'end'],
731
 
                           conf.get_user_option('list', expand=True))
 
744
                         conf.get_user_option('list', expand=True))
732
745
 
733
746
    def test_cascading_list(self):
734
747
        conf = self.get_config('''
738
751
list={foo}
739
752
''')
740
753
        self.assertEqual(['start', 'middle', 'end'],
741
 
                           conf.get_user_option('list', expand=True))
 
754
                         conf.get_user_option('list', expand=True))
742
755
 
743
756
    def test_pathological_hidden_list(self):
744
757
        conf = self.get_config('''
752
765
        # Nope, it's either a string or a list, and the list wins as soon as a
753
766
        # ',' appears, so the string concatenation never occurs.
754
767
        self.assertEqual(['{foo', '}', '{', 'bar}'],
755
 
                          conf.get_user_option('hidden', expand=True))
 
768
                         conf.get_user_option('hidden', expand=True))
756
769
 
757
770
 
758
771
class TestLocationConfigOptionExpansion(tests.TestCaseInTempDir):
870
883
        after_writing = threading.Event()
871
884
        writing_done = threading.Event()
872
885
        c1_orig = c1._write_config_file
 
886
 
873
887
        def c1_write_config_file():
874
888
            before_writing.set()
875
889
            c1_orig()
877
891
            # continue
878
892
            after_writing.wait()
879
893
        c1._write_config_file = c1_write_config_file
 
894
 
880
895
        def c1_set_option():
881
896
            c1.set_user_option('one', 'c1')
882
897
            writing_done.set()
898
913
        self.assertEqual('c2', c2.get_user_option('one'))
899
914
 
900
915
    def test_read_while_writing(self):
901
 
       c1 = self.config
902
 
       # We spawn a thread that will pause *during* the write
903
 
       ready_to_write = threading.Event()
904
 
       do_writing = threading.Event()
905
 
       writing_done = threading.Event()
906
 
       c1_orig = c1._write_config_file
907
 
       def c1_write_config_file():
908
 
           ready_to_write.set()
909
 
           # The lock is held. We wait for the main thread to decide when to
910
 
           # continue
911
 
           do_writing.wait()
912
 
           c1_orig()
913
 
           writing_done.set()
914
 
       c1._write_config_file = c1_write_config_file
915
 
       def c1_set_option():
916
 
           c1.set_user_option('one', 'c1')
917
 
       t1 = threading.Thread(target=c1_set_option)
918
 
       # Collect the thread after the test
919
 
       self.addCleanup(t1.join)
920
 
       # Be ready to unblock the thread if the test goes wrong
921
 
       self.addCleanup(do_writing.set)
922
 
       t1.start()
923
 
       # Ensure the thread is ready to write
924
 
       ready_to_write.wait()
925
 
       self.assertTrue(c1._lock.is_held)
926
 
       self.assertEqual('c1', c1.get_user_option('one'))
927
 
       # If we read during the write, we get the old value
928
 
       c2 = self.get_existing_config()
929
 
       self.assertEqual('1', c2.get_user_option('one'))
930
 
       # Let the writing occur and ensure it occurred
931
 
       do_writing.set()
932
 
       writing_done.wait()
933
 
       # Now we get the updated value
934
 
       c3 = self.get_existing_config()
935
 
       self.assertEqual('c1', c3.get_user_option('one'))
 
916
        c1 = self.config
 
917
        # We spawn a thread that will pause *during* the write
 
918
        ready_to_write = threading.Event()
 
919
        do_writing = threading.Event()
 
920
        writing_done = threading.Event()
 
921
        c1_orig = c1._write_config_file
 
922
 
 
923
        def c1_write_config_file():
 
924
            ready_to_write.set()
 
925
            # The lock is held. We wait for the main thread to decide when to
 
926
            # continue
 
927
            do_writing.wait()
 
928
            c1_orig()
 
929
            writing_done.set()
 
930
        c1._write_config_file = c1_write_config_file
 
931
 
 
932
        def c1_set_option():
 
933
            c1.set_user_option('one', 'c1')
 
934
        t1 = threading.Thread(target=c1_set_option)
 
935
        # Collect the thread after the test
 
936
        self.addCleanup(t1.join)
 
937
        # Be ready to unblock the thread if the test goes wrong
 
938
        self.addCleanup(do_writing.set)
 
939
        t1.start()
 
940
        # Ensure the thread is ready to write
 
941
        ready_to_write.wait()
 
942
        self.assertTrue(c1._lock.is_held)
 
943
        self.assertEqual('c1', c1.get_user_option('one'))
 
944
        # If we read during the write, we get the old value
 
945
        c2 = self.get_existing_config()
 
946
        self.assertEqual('1', c2.get_user_option('one'))
 
947
        # Let the writing occur and ensure it occurred
 
948
        do_writing.set()
 
949
        writing_done.wait()
 
950
        # Now we get the updated value
 
951
        c3 = self.get_existing_config()
 
952
        self.assertEqual('c1', c3.get_user_option('one'))
936
953
 
937
954
 
938
955
class TestGetUserOptionAs(TestIniConfig):
948
965
        self.assertEqual(True, get_bool('a_true_bool'))
949
966
        self.assertEqual(False, get_bool('a_false_bool'))
950
967
        warnings = []
 
968
 
951
969
        def warning(*args):
952
970
            warnings.append(args[0] % args[1:])
953
971
        self.overrideAttr(trace, 'warning', warning)
1064
1082
        local_path = osutils.getcwd().encode('utf8')
1065
1083
        config.LocationConfig.from_string(
1066
1084
            b'[%s/branch]\nnickname = barry' % (local_path,),
1067
 
            'branch',  save=True)
 
1085
            'branch', save=True)
1068
1086
        # Now the branch will find its nick via the location config
1069
1087
        self.assertEqual('barry', branch.nick)
1070
1088
 
1090
1108
 
1091
1109
    def test_warn_if_masked(self):
1092
1110
        warnings = []
 
1111
 
1093
1112
        def warning(*args):
1094
1113
            warnings.append(args[0] % args[1:])
1095
1114
        self.overrideAttr(trace, 'warning', warning)
1098
1117
            warnings[:] = []
1099
1118
            conf.set_user_option('example_option', repr(store), store=store,
1100
1119
                                 warn_masked=warn_masked)
 
1120
 
1101
1121
        def assertWarning(warning):
1102
1122
            if warning is None:
1103
1123
                self.assertEqual(0, len(warnings))
1191
1211
        tools = conf.get_merge_tools()
1192
1212
        self.log(repr(tools))
1193
1213
        self.assertEqual(
1194
 
            {u'funkytool' : u'funkytool "arg with spaces" {this_temp}',
1195
 
            u'sometool' : u'sometool {base} {this} {other} -o {result}',
1196
 
            u'newtool' : u'"newtool with spaces" {this_temp}'},
 
1214
            {u'funkytool': u'funkytool "arg with spaces" {this_temp}',
 
1215
             u'sometool': u'sometool {base} {this} {other} -o {result}',
 
1216
             u'newtool': u'"newtool with spaces" {this_temp}'},
1197
1217
            tools)
1198
1218
 
1199
1219
    def test_get_merge_tools_empty(self):
1335
1355
        self.get_branch_config('http://www.example.com')
1336
1356
        self.assertEqual(
1337
1357
            self.my_location_config._get_config_policy(
1338
 
            'http://www.example.com', 'normal_option'),
 
1358
                'http://www.example.com', 'normal_option'),
1339
1359
            config.POLICY_NONE)
1340
1360
 
1341
1361
    def test__get_option_policy_norecurse(self):
1342
1362
        self.get_branch_config('http://www.example.com')
1343
1363
        self.assertEqual(
1344
1364
            self.my_location_config._get_option_policy(
1345
 
            'http://www.example.com', 'norecurse_option'),
 
1365
                'http://www.example.com', 'norecurse_option'),
1346
1366
            config.POLICY_NORECURSE)
1347
1367
        # Test old recurse=False setting:
1348
1368
        self.assertEqual(
1349
1369
            self.my_location_config._get_option_policy(
1350
 
            'http://www.example.com/norecurse', 'normal_option'),
 
1370
                'http://www.example.com/norecurse', 'normal_option'),
1351
1371
            config.POLICY_NORECURSE)
1352
1372
 
1353
1373
    def test__get_option_policy_normal(self):
1354
1374
        self.get_branch_config('http://www.example.com')
1355
1375
        self.assertEqual(
1356
1376
            self.my_location_config._get_option_policy(
1357
 
            'http://www.example.com', 'appendpath_option'),
 
1377
                'http://www.example.com', 'appendpath_option'),
1358
1378
            config.POLICY_APPENDPATH)
1359
1379
 
1360
1380
    def test__get_options_with_policy(self):
1436
1456
                                       store=config.STORE_LOCATION_NORECURSE)
1437
1457
        self.assertEqual(
1438
1458
            self.my_location_config._get_option_policy(
1439
 
            'http://www.example.com', 'foo'),
 
1459
                'http://www.example.com', 'foo'),
1440
1460
            config.POLICY_NORECURSE)
1441
1461
 
1442
1462
    def test_set_user_option_appendpath(self):
1445
1465
                                       store=config.STORE_LOCATION_APPENDPATH)
1446
1466
        self.assertEqual(
1447
1467
            self.my_location_config._get_option_policy(
1448
 
            'http://www.example.com', 'foo'),
 
1468
                'http://www.example.com', 'foo'),
1449
1469
            config.POLICY_APPENDPATH)
1450
1470
 
1451
1471
    def test_set_user_option_change_policy(self):
1454
1474
                                       store=config.STORE_LOCATION)
1455
1475
        self.assertEqual(
1456
1476
            self.my_location_config._get_option_policy(
1457
 
            'http://www.example.com', 'norecurse_option'),
 
1477
                'http://www.example.com', 'norecurse_option'),
1458
1478
            config.POLICY_NONE)
1459
1479
 
1460
1480
    def get_branch_config(self, location, global_config=None,
1505
1525
option = exact
1506
1526
"""
1507
1527
 
 
1528
 
1508
1529
class TestBranchConfigItems(tests.TestCaseInTempDir):
1509
1530
 
1510
1531
    def get_branch_config(self, global_config=None, location=None,
1529
1550
        my_config.set_user_option('email',
1530
1551
                                  "Robert Collins <robertc@example.org>")
1531
1552
        self.assertEqual("Robert Collins <robertc@example.org>",
1532
 
                        my_config.username())
 
1553
                         my_config.username())
1533
1554
 
1534
1555
    def test_BRZ_EMAIL_OVERRIDES(self):
1535
1556
        self.overrideEnv('BRZ_EMAIL', "Robert Collins <robertc@example.org>")
1584
1605
        self.assertEqual(('John Doe', 'jdoe@example.com'),
1585
1606
                         config.parse_username('John Doe jdoe@example.com'))
1586
1607
 
 
1608
 
1587
1609
class TestTreeConfig(tests.TestCaseWithTransport):
1588
1610
 
1589
1611
    def test_get_value(self):
1642
1664
    def test_load_permission_denied(self):
1643
1665
        """Ensure we get an empty config file if the file is inaccessible."""
1644
1666
        warnings = []
 
1667
 
1645
1668
        def warning(*args):
1646
1669
            warnings.append(args[0] % args[1:])
1647
1670
        self.overrideAttr(trace, 'warning', warning)
1708
1731
 
1709
1732
    def assertGetHook(self, conf, name, value):
1710
1733
        calls = []
 
1734
 
1711
1735
        def hook(*args):
1712
1736
            calls.append(args)
1713
1737
        config.OldConfigHooks.install_named_hook('get', hook, None)
1732
1756
 
1733
1757
    def assertSetHook(self, conf, name, value):
1734
1758
        calls = []
 
1759
 
1735
1760
        def hook(*args):
1736
1761
            calls.append(args)
1737
1762
        config.OldConfigHooks.install_named_hook('set', hook, None)
1756
1781
 
1757
1782
    def assertRemoveHook(self, conf, name, section_name=None):
1758
1783
        calls = []
 
1784
 
1759
1785
        def hook(*args):
1760
1786
            calls.append(args)
1761
1787
        config.OldConfigHooks.install_named_hook('remove', hook, None)
1781
1807
 
1782
1808
    def assertLoadHook(self, name, conf_class, *conf_args):
1783
1809
        calls = []
 
1810
 
1784
1811
        def hook(*args):
1785
1812
            calls.append(args)
1786
1813
        config.OldConfigHooks.install_named_hook('load', hook, None)
1805
1832
 
1806
1833
    def assertSaveHook(self, conf):
1807
1834
        calls = []
 
1835
 
1808
1836
        def hook(*args):
1809
1837
            calls.append(args)
1810
1838
        config.OldConfigHooks.install_named_hook('save', hook, None)
1839
1867
 
1840
1868
    def assertGetHook(self, conf, name, value):
1841
1869
        calls = []
 
1870
 
1842
1871
        def hook(*args):
1843
1872
            calls.append(args)
1844
1873
        config.OldConfigHooks.install_named_hook('get', hook, None)
1862
1891
 
1863
1892
    def assertSetHook(self, conf, name, value):
1864
1893
        calls = []
 
1894
 
1865
1895
        def hook(*args):
1866
1896
            calls.append(args)
1867
1897
        config.OldConfigHooks.install_named_hook('set', hook, None)
1888
1918
 
1889
1919
    def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
1890
1920
        calls = []
 
1921
 
1891
1922
        def hook(*args):
1892
1923
            calls.append(args)
1893
1924
        config.OldConfigHooks.install_named_hook('load', hook, None)
1903
1934
 
1904
1935
    def test_load_hook_remote_branch(self):
1905
1936
        remote_branch = branch.Branch.open(self.get_url('tree'))
1906
 
        self.assertLoadHook(1, 'file', remote.RemoteBranchConfig, remote_branch)
 
1937
        self.assertLoadHook(
 
1938
            1, 'file', remote.RemoteBranchConfig, remote_branch)
1907
1939
 
1908
1940
    def test_load_hook_remote_bzrdir(self):
1909
1941
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
1914
1946
        # caused by the differences in implementations between
1915
1947
        # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
1916
1948
        # SmartServerBranchGetConfigFile (in smart/branch.py)
1917
 
        self.assertLoadHook(2, 'file', remote.RemoteBzrDirConfig, remote_bzrdir)
 
1949
        self.assertLoadHook(
 
1950
            2, 'file', remote.RemoteBzrDirConfig, remote_bzrdir)
1918
1951
 
1919
1952
    def assertSaveHook(self, conf):
1920
1953
        calls = []
 
1954
 
1921
1955
        def hook(*args):
1922
1956
            calls.append(args)
1923
1957
        config.OldConfigHooks.install_named_hook('save', hook, None)
2282
2316
        a_dict = dict()
2283
2317
        section = config.Section(None, a_dict)
2284
2318
        self.assertEqual('out of thin air',
2285
 
                          section.get('foo', 'out of thin air'))
 
2319
                         section.get('foo', 'out of thin air'))
2286
2320
 
2287
2321
    def test_options_is_shared(self):
2288
2322
        a_dict = dict()
2294
2328
 
2295
2329
    scenarios = [('mutable',
2296
2330
                  {'get_section':
2297
 
                       lambda opts: config.MutableSection('myID', opts)},),
2298
 
        ]
 
2331
                   lambda opts: config.MutableSection('myID', opts)},),
 
2332
                 ]
2299
2333
 
2300
2334
    def test_set(self):
2301
2335
        a_dict = dict(foo='bar')
2388
2422
        self.assertRaises(errors.BzrCommandError,
2389
2423
                          self.store._from_cmdline, ['a=b', 'c'])
2390
2424
 
 
2425
 
2391
2426
class TestStoreMinimalAPI(tests.TestCaseWithTransport):
2392
2427
 
2393
2428
    scenarios = [(key, {'get_store': builder}) for key, builder
2492
2527
    def test_simple_comma(self):
2493
2528
        if isinstance(self.store, config.IniFileStore):
2494
2529
            # configobj requires that lists are special-cased
2495
 
           self.assertRaises(AssertionError,
2496
 
                             self.assertIdempotent, ',')
 
2530
            self.assertRaises(AssertionError,
 
2531
                              self.assertIdempotent, ',')
2497
2532
        else:
2498
2533
            self.assertIdempotent(',')
2499
2534
        # When a single comma is required, quoting is also required
2566
2601
    def test_load_permission_denied(self):
2567
2602
        """Ensure we get warned when trying to load an inaccessible file."""
2568
2603
        warnings = []
 
2604
 
2569
2605
        def warning(*args):
2570
2606
            warnings.append(args[0] % args[1:])
2571
2607
        self.overrideAttr(trace, 'warning', warning)
2752
2788
        # Now we can try to load it
2753
2789
        store = self.get_store(self)
2754
2790
        calls = []
 
2791
 
2755
2792
        def hook(*args):
2756
2793
            calls.append(args)
2757
2794
        config.ConfigHooks.install_named_hook('load', hook, None)
2762
2799
 
2763
2800
    def test_save_hook(self):
2764
2801
        calls = []
 
2802
 
2765
2803
        def hook(*args):
2766
2804
            calls.append(args)
2767
2805
        config.ConfigHooks.install_named_hook('save', hook, None)
2804
2842
        self.st1 = config.TransportIniFileStore(self.transport, 'foo.conf')
2805
2843
        self.st2 = config.TransportIniFileStore(self.transport, 'foo.conf')
2806
2844
        self.warnings = []
 
2845
 
2807
2846
        def warning(*args):
2808
2847
            self.warnings.append(args[0] % args[1:])
2809
2848
        self.overrideAttr(trace, 'warning', warning)
2889
2928
        stack = config.Stack([store.get_sections], store)
2890
2929
        stack.set('foo', ' a b c ')
2891
2930
        store.save()
2892
 
        self.assertFileEqual(b'foo = " a b c "' + os.linesep.encode('ascii'), 'foo.conf')
 
2931
        self.assertFileEqual(b'foo = " a b c "' +
 
2932
                             os.linesep.encode('ascii'), 'foo.conf')
2893
2933
 
2894
2934
 
2895
2935
class TestTransportIniFileStore(TestStore):
2896
2936
 
2897
2937
    def test_loading_unknown_file_fails(self):
2898
2938
        store = config.TransportIniFileStore(self.get_transport(),
2899
 
            'I-do-not-exist')
 
2939
                                             'I-do-not-exist')
2900
2940
        self.assertRaises(errors.NoSuchFile, store.load)
2901
2941
 
2902
2942
    def test_invalid_content(self):
3019
3059
        after_writing = threading.Event()
3020
3060
        writing_done = threading.Event()
3021
3061
        c1_save_without_locking_orig = c1.store.save_without_locking
 
3062
 
3022
3063
        def c1_save_without_locking():
3023
3064
            before_writing.set()
3024
3065
            c1_save_without_locking_orig()
3026
3067
            # continue
3027
3068
            after_writing.wait()
3028
3069
        c1.store.save_without_locking = c1_save_without_locking
 
3070
 
3029
3071
        def c1_set():
3030
3072
            c1.set('one', 'c1')
3031
3073
            writing_done.set()
3046
3088
        self.assertEqual('c2', c2.get('one'))
3047
3089
 
3048
3090
    def test_read_while_writing(self):
3049
 
       c1 = self.stack
3050
 
       # We spawn a thread that will pause *during* the write
3051
 
       ready_to_write = threading.Event()
3052
 
       do_writing = threading.Event()
3053
 
       writing_done = threading.Event()
3054
 
       # We override the _save implementation so we know the store is locked
3055
 
       c1_save_without_locking_orig = c1.store.save_without_locking
3056
 
       def c1_save_without_locking():
3057
 
           ready_to_write.set()
3058
 
           # The lock is held. We wait for the main thread to decide when to
3059
 
           # continue
3060
 
           do_writing.wait()
3061
 
           c1_save_without_locking_orig()
3062
 
           writing_done.set()
3063
 
       c1.store.save_without_locking = c1_save_without_locking
3064
 
       def c1_set():
3065
 
           c1.set('one', 'c1')
3066
 
       t1 = threading.Thread(target=c1_set)
3067
 
       # Collect the thread after the test
3068
 
       self.addCleanup(t1.join)
3069
 
       # Be ready to unblock the thread if the test goes wrong
3070
 
       self.addCleanup(do_writing.set)
3071
 
       t1.start()
3072
 
       # Ensure the thread is ready to write
3073
 
       ready_to_write.wait()
3074
 
       self.assertEqual('c1', c1.get('one'))
3075
 
       # If we read during the write, we get the old value
3076
 
       c2 = self.get_stack(self)
3077
 
       self.assertEqual('1', c2.get('one'))
3078
 
       # Let the writing occur and ensure it occurred
3079
 
       do_writing.set()
3080
 
       writing_done.wait()
3081
 
       # Now we get the updated value
3082
 
       c3 = self.get_stack(self)
3083
 
       self.assertEqual('c1', c3.get('one'))
 
3091
        c1 = self.stack
 
3092
        # We spawn a thread that will pause *during* the write
 
3093
        ready_to_write = threading.Event()
 
3094
        do_writing = threading.Event()
 
3095
        writing_done = threading.Event()
 
3096
        # We override the _save implementation so we know the store is locked
 
3097
        c1_save_without_locking_orig = c1.store.save_without_locking
 
3098
 
 
3099
        def c1_save_without_locking():
 
3100
            ready_to_write.set()
 
3101
            # The lock is held. We wait for the main thread to decide when to
 
3102
            # continue
 
3103
            do_writing.wait()
 
3104
            c1_save_without_locking_orig()
 
3105
            writing_done.set()
 
3106
        c1.store.save_without_locking = c1_save_without_locking
 
3107
 
 
3108
        def c1_set():
 
3109
            c1.set('one', 'c1')
 
3110
        t1 = threading.Thread(target=c1_set)
 
3111
        # Collect the thread after the test
 
3112
        self.addCleanup(t1.join)
 
3113
        # Be ready to unblock the thread if the test goes wrong
 
3114
        self.addCleanup(do_writing.set)
 
3115
        t1.start()
 
3116
        # Ensure the thread is ready to write
 
3117
        ready_to_write.wait()
 
3118
        self.assertEqual('c1', c1.get('one'))
 
3119
        # If we read during the write, we get the old value
 
3120
        c2 = self.get_stack(self)
 
3121
        self.assertEqual('1', c2.get('one'))
 
3122
        # Let the writing occur and ensure it occurred
 
3123
        do_writing.set()
 
3124
        writing_done.wait()
 
3125
        # Now we get the updated value
 
3126
        c3 = self.get_stack(self)
 
3127
        self.assertEqual('c1', c3.get('one'))
3084
3128
 
3085
3129
    # FIXME: It may be worth looking into removing the lock dir when it's not
3086
3130
    # needed anymore and look at possible fallouts for concurrent lockers. This
3091
3135
class TestSectionMatcher(TestStore):
3092
3136
 
3093
3137
    scenarios = [('location', {'matcher': config.LocationMatcher}),
3094
 
                 ('id', {'matcher': config.NameMatcher}),]
 
3138
                 ('id', {'matcher': config.NameMatcher}), ]
3095
3139
 
3096
3140
    def setUp(self):
3097
3141
        super(TestSectionMatcher, self).setUp()
3154
3198
section=/quux/quux
3155
3199
''')
3156
3200
        self.assertEqual(['/foo', '/foo/baz', '/foo/bar', '/foo/bar/baz',
3157
 
                           '/quux/quux'],
3158
 
                          [section.id for _, section in store.get_sections()])
 
3201
                          '/quux/quux'],
 
3202
                         [section.id for _, section in store.get_sections()])
3159
3203
        matcher = config.LocationMatcher(store, '/foo/bar/quux')
3160
3204
        sections = [section for _, section in matcher.get_sections()]
3161
3205
        self.assertEqual(['/foo/bar', '/foo'],
3162
 
                          [section.id for section in sections])
 
3206
                         [section.id for section in sections])
3163
3207
        self.assertEqual(['quux', 'bar/quux'],
3164
 
                          [section.extra_path for section in sections])
 
3208
                         [section.extra_path for section in sections])
3165
3209
 
3166
3210
    def test_more_specific_sections_first(self):
3167
3211
        store = self.get_store(self)
3172
3216
section=/foo/bar
3173
3217
''')
3174
3218
        self.assertEqual(['/foo', '/foo/bar'],
3175
 
                          [section.id for _, section in store.get_sections()])
 
3219
                         [section.id for _, section in store.get_sections()])
3176
3220
        matcher = config.LocationMatcher(store, '/foo/bar/baz')
3177
3221
        sections = [section for _, section in matcher.get_sections()]
3178
3222
        self.assertEqual(['/foo/bar', '/foo'],
3179
 
                          [section.id for section in sections])
 
3223
                         [section.id for section in sections])
3180
3224
        self.assertEqual(['baz', 'bar/baz'],
3181
 
                          [section.extra_path for section in sections])
 
3225
                         [section.extra_path for section in sections])
3182
3226
 
3183
3227
    def test_appendpath_in_no_name_section(self):
3184
3228
        # It's a bit weird to allow appendpath in a no-name section, but
3261
3305
[file:///foo/bar]
3262
3306
''')
3263
3307
 
3264
 
 
3265
3308
    def test_no_name_section_included_when_present(self):
3266
3309
        # Note that other tests will cover the case where the no-name section
3267
3310
        # is empty and as such, not included.
3272
3315
[/foo/bar]
3273
3316
''')
3274
3317
        self.assertEqual(['baz', 'bar/baz', '/foo/bar/baz'],
3275
 
                          [s.locals['relpath'] for _, s in sections])
 
3318
                         [s.locals['relpath'] for _, s in sections])
3276
3319
 
3277
3320
    def test_order_reversed(self):
3278
3321
        self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
3299
3342
        # nothing really is... as far as using {relpath} to append it to something
3300
3343
        # else, this seems good enough though.
3301
3344
        self.assertEqual(['', 'baz', 'bar/baz'],
3302
 
                          [s.locals['relpath'] for _, s in sections])
 
3345
                         [s.locals['relpath'] for _, s in sections])
3303
3346
 
3304
3347
    def test_respect_order(self):
3305
3348
        self.assertSectionIDs(['/foo', '/foo/b*', '/foo/*/baz'],
3371
3414
        self.assertEqual(None, conf_stack.get('foo'))
3372
3415
 
3373
3416
    def test_get_for_empty_section_callable(self):
3374
 
        conf_stack = config.Stack([lambda : []])
 
3417
        conf_stack = config.Stack([lambda: []])
3375
3418
        self.assertEqual(None, conf_stack.get('foo'))
3376
3419
 
3377
3420
    def test_get_for_broken_callable(self):
3494
3537
    def test_get_hook(self):
3495
3538
        self.conf.set('foo', 'bar')
3496
3539
        calls = []
 
3540
 
3497
3541
        def hook(*args):
3498
3542
            calls.append(args)
3499
3543
        config.ConfigHooks.install_named_hook('get', hook, None)
3651
3695
    def test_two_refs(self):
3652
3696
        self.assertRefs([(False, ''), (True, '{foo}'),
3653
3697
                         (False, ''), (True, '{bar}'),
3654
 
                         (False, ''),],
 
3698
                         (False, ''), ],
3655
3699
                        '{foo}{bar}')
3656
3700
 
3657
3701
    def test_newline_in_refs_are_not_matched(self):
3750
3794
        self.registry.register(
3751
3795
            config.ListOption('list'))
3752
3796
        self.assertEqual(['start', 'middle', 'end'],
3753
 
                           self.conf.get('list', expand=True))
 
3797
                         self.conf.get('list', expand=True))
3754
3798
 
3755
3799
    def test_cascading_list(self):
3756
3800
        self.conf.store._load_from_string(b'''
3765
3809
        # option ('list' here).
3766
3810
        self.registry.register(config.ListOption('baz'))
3767
3811
        self.assertEqual(['start', 'middle', 'end'],
3768
 
                           self.conf.get('list', expand=True))
 
3812
                         self.conf.get('list', expand=True))
3769
3813
 
3770
3814
    def test_pathologically_hidden_list(self):
3771
3815
        self.conf.store._load_from_string(b'''
3780
3824
        # only after all expansions have been performed
3781
3825
        self.registry.register(config.ListOption('hidden'))
3782
3826
        self.assertEqual(['bin', 'go'],
3783
 
                          self.conf.get('hidden', expand=True))
 
3827
                         self.conf.get('hidden', expand=True))
3784
3828
 
3785
3829
 
3786
3830
class TestStackCrossSectionsExpand(tests.TestCaseWithTransport):
3933
3977
        self.assertEqual('loc-foo/bar/baz', stack.get('lfoo', expand=True))
3934
3978
 
3935
3979
 
3936
 
 
3937
3980
class TestStackSet(TestStackWithTransport):
3938
3981
 
3939
3982
    def test_simple_set(self):
3950
3993
 
3951
3994
    def test_set_hook(self):
3952
3995
        calls = []
 
3996
 
3953
3997
        def hook(*args):
3954
3998
            calls.append(args)
3955
3999
        config.ConfigHooks.install_named_hook('set', hook, None)
3976
4020
 
3977
4021
    def test_remove_hook(self):
3978
4022
        calls = []
 
4023
 
3979
4024
        def hook(*args):
3980
4025
            calls.append(args)
3981
4026
        config.ConfigHooks.install_named_hook('remove', hook, None)
4017
4062
        self.breezy_config.set_user_option('file', 'breezy')
4018
4063
        self.branch_config.set_user_option('file', 'branch')
4019
4064
        self.assertOptions([('file', 'branch', 'DEFAULT', 'branch'),
4020
 
                            ('file', 'breezy', 'DEFAULT', 'breezy'),],
 
4065
                            ('file', 'breezy', 'DEFAULT', 'breezy'), ],
4021
4066
                           self.branch_config)
4022
4067
 
4023
4068
    def test_option_in_branch_and_locations(self):
4026
4071
        self.branch_config.set_user_option('file', 'branch')
4027
4072
        self.assertOptions(
4028
4073
            [('file', 'locations', self.tree.basedir, 'locations'),
4029
 
             ('file', 'branch', 'DEFAULT', 'branch'),],
 
4074
             ('file', 'branch', 'DEFAULT', 'branch'), ],
4030
4075
            self.branch_config)
4031
4076
 
4032
4077
    def test_option_in_breezy_locations_and_branch(self):
4036
4081
        self.assertOptions(
4037
4082
            [('file', 'locations', self.tree.basedir, 'locations'),
4038
4083
             ('file', 'branch', 'DEFAULT', 'branch'),
4039
 
             ('file', 'breezy', 'DEFAULT', 'breezy'),],
 
4084
             ('file', 'breezy', 'DEFAULT', 'breezy'), ],
4040
4085
            self.branch_config)
4041
4086
 
4042
4087
 
4050
4095
        self.locations_config.remove_user_option('file', self.tree.basedir)
4051
4096
        self.assertOptions(
4052
4097
            [('file', 'branch', 'DEFAULT', 'branch'),
4053
 
             ('file', 'breezy', 'DEFAULT', 'breezy'),],
 
4098
             ('file', 'breezy', 'DEFAULT', 'breezy'), ],
4054
4099
            self.branch_config)
4055
4100
 
4056
4101
    def test_remove_in_branch(self):
4057
4102
        self.branch_config.remove_user_option('file')
4058
4103
        self.assertOptions(
4059
4104
            [('file', 'locations', self.tree.basedir, 'locations'),
4060
 
             ('file', 'breezy', 'DEFAULT', 'breezy'),],
 
4105
             ('file', 'breezy', 'DEFAULT', 'breezy'), ],
4061
4106
            self.branch_config)
4062
4107
 
4063
4108
    def test_remove_in_breezy(self):
4064
4109
        self.breezy_config.remove_user_option('file')
4065
4110
        self.assertOptions(
4066
4111
            [('file', 'locations', self.tree.basedir, 'locations'),
4067
 
             ('file', 'branch', 'DEFAULT', 'branch'),],
 
4112
             ('file', 'branch', 'DEFAULT', 'branch'), ],
4068
4113
            self.branch_config)
4069
4114
 
4070
4115
 
4160
4205
port=port # Error: Not an int
4161
4206
""")
4162
4207
        self.overrideAttr(config, 'authentication_config_filename',
4163
 
            lambda: self.path)
 
4208
                          lambda: self.path)
4164
4209
        osutils.chmod_if_possible(self.path, 0o755)
4165
4210
 
4166
4211
    def test_check_warning(self):
4167
4212
        conf = config.AuthenticationConfig()
4168
4213
        self.assertEqual(conf._filename, self.path)
4169
4214
        self.assertContainsRe(self.get_log(),
4170
 
            'Saved passwords may be accessible by other users.')
 
4215
                              'Saved passwords may be accessible by other users.')
4171
4216
 
4172
4217
    def test_check_suppressed_warning(self):
4173
4218
        global_config = config.GlobalConfig()
4174
4219
        global_config.set_user_option('suppress_warnings',
4175
 
            'insecure_permissions')
 
4220
                                      'insecure_permissions')
4176
4221
        conf = config.AuthenticationConfig()
4177
4222
        self.assertEqual(conf._filename, self.path)
4178
4223
        self.assertNotContainsRe(self.get_log(),
4179
 
            'Saved passwords may be accessible by other users.')
 
4224
                                 'Saved passwords may be accessible by other users.')
4180
4225
 
4181
4226
 
4182
4227
class TestAuthenticationConfigFile(tests.TestCase):
4389
4434
    def test_set_credentials(self):
4390
4435
        conf = config.AuthenticationConfig()
4391
4436
        conf.set_credentials('name', 'host', 'user', 'scheme', 'password',
4392
 
        99, path='/foo', verify_certificates=False, realm='realm')
 
4437
                             99, path='/foo', verify_certificates=False, realm='realm')
4393
4438
        credentials = conf.get_credentials(host='host', scheme='scheme',
4394
4439
                                           port=99, path='/foo',
4395
4440
                                           realm='realm')
4432
4477
        # We use an empty conf so that the user is always prompted
4433
4478
        conf = config.AuthenticationConfig()
4434
4479
        self.assertEqual(password,
4435
 
                          conf.get_password(scheme, host, user, port=port,
4436
 
                                            realm=realm, path=path))
 
4480
                         conf.get_password(scheme, host, user, port=port,
 
4481
                                           realm=realm, path=path))
4437
4482
        self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
4438
4483
        self.assertEqual('', ui.ui_factory.stdout.getvalue())
4439
4484
 
4446
4491
        expected_prompt = expected_prompt_format % {
4447
4492
            'scheme': scheme, 'host': host, 'port': port,
4448
4493
            'realm': realm}
4449
 
        ui.ui_factory = tests.TestUIFactory(stdin=username+ '\n')
 
4494
        ui.ui_factory = tests.TestUIFactory(stdin=username + '\n')
4450
4495
        # We use an empty conf so that the user is always prompted
4451
4496
        conf = config.AuthenticationConfig()
4452
4497
        self.assertEqual(username, conf.get_user(scheme, host, port=port,
4453
 
                          realm=realm, path=path, ask=True))
 
4498
                                                 realm=realm, path=path, ask=True))
4454
4499
        self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
4455
4500
        self.assertEqual('', ui.ui_factory.stdout.getvalue())
4456
4501
 
4465
4510
    def test_username_default_no_prompt(self):
4466
4511
        conf = config.AuthenticationConfig()
4467
4512
        self.assertEqual(None,
4468
 
            conf.get_user('ftp', 'example.com'))
 
4513
                         conf.get_user('ftp', 'example.com'))
4469
4514
        self.assertEqual("explicitdefault",
4470
 
            conf.get_user('ftp', 'example.com', default="explicitdefault"))
 
4515
                         conf.get_user('ftp', 'example.com', default="explicitdefault"))
4471
4516
 
4472
4517
    def test_password_default_prompts(self):
4473
4518
        # HTTP prompts can't be tested here, see test_http.py
4502
4547
        # Since the password defined in the authentication config is ignored,
4503
4548
        # the user is prompted
4504
4549
        self.assertEqual(entered_password,
4505
 
                          conf.get_password('ssh', 'bar.org', user='jim'))
 
4550
                         conf.get_password('ssh', 'bar.org', user='jim'))
4506
4551
        self.assertContainsRe(
4507
4552
            self.get_log(),
4508
4553
            'password ignored in section \\[ssh with password\\]')
4520
4565
        # Since the password defined in the authentication config is ignored,
4521
4566
        # the user is prompted
4522
4567
        self.assertEqual(entered_password,
4523
 
                          conf.get_password('ssh', 'bar.org', user='jim'))
 
4568
                         conf.get_password('ssh', 'bar.org', user='jim'))
4524
4569
        # No warning should be emitted since there is no password. We are only
4525
4570
        # providing "user".
4526
4571
        self.assertNotContainsRe(
4550
4595
        self._password[(scheme, host)] = password
4551
4596
 
4552
4597
    def get_credentials(self, scheme, host, port=None, user=None,
4553
 
        path=None, realm=None):
 
4598
                        path=None, realm=None):
4554
4599
        key = (scheme, host)
4555
4600
        if not key in self._username:
4556
4601
            return None
4557
 
        return { "scheme": scheme, "host": host, "port": port,
 
4602
        return {"scheme": scheme, "host": host, "port": port,
4558
4603
                "user": self._username[key], "password": self._password[key]}
4559
4604
 
4560
4605
 
4564
4609
        self._calls = 0
4565
4610
 
4566
4611
    def get_credentials(self, scheme, host, port=None, user=None,
4567
 
        path=None, realm=None):
 
4612
                        path=None, realm=None):
4568
4613
        self._calls += 1
4569
4614
        return None
4570
4615
 
4588
4633
    def test_fallback_none_registered(self):
4589
4634
        r = config.CredentialStoreRegistry()
4590
4635
        self.assertEqual(None,
4591
 
                          r.get_fallback_credentials("http", "example.com"))
 
4636
                         r.get_fallback_credentials("http", "example.com"))
4592
4637
 
4593
4638
    def test_register(self):
4594
4639
        r = config.CredentialStoreRegistry()
4616
4661
        store = CountingCredentialStore()
4617
4662
        r.register("count", store, fallback=False)
4618
4663
        self.assertEqual(None,
4619
 
                          r.get_fallback_credentials("http", "example.com"))
 
4664
                         r.get_fallback_credentials("http", "example.com"))
4620
4665
        self.assertEqual(0, store._calls)
4621
4666
 
4622
4667
    def test_fallback_credentials(self):
4792
4837
    def test_unknown(self):
4793
4838
        conf = config.MemoryStack(b'mail_client=firebird')
4794
4839
        self.assertRaises(config.ConfigOptionValueError, conf.get,
4795
 
                'mail_client')
 
4840
                          'mail_client')