/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_config.py

  • Committer: Jelmer Vernooij
  • Date: 2020-05-24 00:39:50 UTC
  • mto: This revision was merged to the branch mainline in revision 7504.
  • Revision ID: jelmer@jelmer.uk-20200524003950-bbc545r76vc5yajg
Add github action.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2014, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for finding and reading the bzr config file[s]."""
 
18
 
 
19
from textwrap import dedent
 
20
from io import BytesIO
 
21
import os
 
22
import sys
 
23
import threading
 
24
 
 
25
import configobj
 
26
from testtools import matchers
 
27
 
 
28
from .. import (
 
29
    branch,
 
30
    config,
 
31
    bedding,
 
32
    controldir,
 
33
    diff,
 
34
    errors,
 
35
    lock,
 
36
    osutils,
 
37
    mail_client,
 
38
    ui,
 
39
    urlutils,
 
40
    registry as _mod_registry,
 
41
    tests,
 
42
    trace,
 
43
    )
 
44
from ..bzr import (
 
45
    remote,
 
46
    )
 
47
from ..transport import remote as transport_remote
 
48
from . import (
 
49
    features,
 
50
    scenarios,
 
51
    test_server,
 
52
    )
 
53
 
 
54
 
 
55
def lockable_config_scenarios():
 
56
    return [
 
57
        ('global',
 
58
         {'config_class': config.GlobalConfig,
 
59
          'config_args': [],
 
60
          'config_section': 'DEFAULT'}),
 
61
        ('locations',
 
62
         {'config_class': config.LocationConfig,
 
63
          'config_args': ['.'],
 
64
          'config_section': '.'}), ]
 
65
 
 
66
 
 
67
load_tests = scenarios.load_tests_apply_scenarios
 
68
 
 
69
# Register helpers to build stores
 
70
config.test_store_builder_registry.register(
 
71
    'configobj', lambda test: config.TransportIniFileStore(
 
72
        test.get_transport(), 'configobj.conf'))
 
73
config.test_store_builder_registry.register(
 
74
    'breezy', lambda test: config.GlobalStore())
 
75
config.test_store_builder_registry.register(
 
76
    'location', lambda test: config.LocationStore())
 
77
 
 
78
 
 
79
def build_backing_branch(test, relpath,
 
80
                         transport_class=None, server_class=None):
 
81
    """Test helper to create a backing branch only once.
 
82
 
 
83
    Some tests needs multiple stores/stacks to check concurrent update
 
84
    behaviours. As such, they need to build different branch *objects* even if
 
85
    they share the branch on disk.
 
86
 
 
87
    :param relpath: The relative path to the branch. (Note that the helper
 
88
        should always specify the same relpath).
 
89
 
 
90
    :param transport_class: The Transport class the test needs to use.
 
91
 
 
92
    :param server_class: The server associated with the ``transport_class``
 
93
        above.
 
94
 
 
95
    Either both or neither of ``transport_class`` and ``server_class`` should
 
96
    be specified.
 
97
    """
 
98
    if transport_class is not None and server_class is not None:
 
99
        test.transport_class = transport_class
 
100
        test.transport_server = server_class
 
101
    elif not (transport_class is None and server_class is None):
 
102
        raise AssertionError('Specify both ``transport_class`` and '
 
103
                             '``server_class`` or neither of them')
 
104
    if getattr(test, 'backing_branch', None) is None:
 
105
        # First call, let's build the branch on disk
 
106
        test.backing_branch = test.make_branch(relpath)
 
107
 
 
108
 
 
109
def build_branch_store(test):
 
110
    build_backing_branch(test, 'branch')
 
111
    b = branch.Branch.open('branch')
 
112
    return config.BranchStore(b)
 
113
 
 
114
 
 
115
config.test_store_builder_registry.register('branch', build_branch_store)
 
116
 
 
117
 
 
118
def build_control_store(test):
 
119
    build_backing_branch(test, 'branch')
 
120
    b = controldir.ControlDir.open('branch')
 
121
    return config.ControlStore(b)
 
122
 
 
123
 
 
124
config.test_store_builder_registry.register('control', build_control_store)
 
125
 
 
126
 
 
127
def build_remote_branch_store(test):
 
128
    # There is only one permutation (but we won't be able to handle more with
 
129
    # this design anyway)
 
130
    (transport_class,
 
131
     server_class) = transport_remote.get_test_permutations()[0]
 
132
    build_backing_branch(test, 'branch', transport_class, server_class)
 
133
    b = branch.Branch.open(test.get_url('branch'))
 
134
    return config.BranchStore(b)
 
135
 
 
136
 
 
137
config.test_store_builder_registry.register('remote_branch',
 
138
                                            build_remote_branch_store)
 
139
 
 
140
 
 
141
config.test_stack_builder_registry.register(
 
142
    'breezy', lambda test: config.GlobalStack())
 
143
config.test_stack_builder_registry.register(
 
144
    'location', lambda test: config.LocationStack('.'))
 
145
 
 
146
 
 
147
def build_branch_stack(test):
 
148
    build_backing_branch(test, 'branch')
 
149
    b = branch.Branch.open('branch')
 
150
    return config.BranchStack(b)
 
151
 
 
152
 
 
153
config.test_stack_builder_registry.register('branch', build_branch_stack)
 
154
 
 
155
 
 
156
def build_branch_only_stack(test):
 
157
    # There is only one permutation (but we won't be able to handle more with
 
158
    # this design anyway)
 
159
    (transport_class,
 
160
     server_class) = transport_remote.get_test_permutations()[0]
 
161
    build_backing_branch(test, 'branch', transport_class, server_class)
 
162
    b = branch.Branch.open(test.get_url('branch'))
 
163
    return config.BranchOnlyStack(b)
 
164
 
 
165
 
 
166
config.test_stack_builder_registry.register('branch_only',
 
167
                                            build_branch_only_stack)
 
168
 
 
169
 
 
170
def build_remote_control_stack(test):
 
171
    # There is only one permutation (but we won't be able to handle more with
 
172
    # this design anyway)
 
173
    (transport_class,
 
174
     server_class) = transport_remote.get_test_permutations()[0]
 
175
    # We need only a bzrdir for this, not a full branch, but it's not worth
 
176
    # creating a dedicated helper to create only the bzrdir
 
177
    build_backing_branch(test, 'branch', transport_class, server_class)
 
178
    b = branch.Branch.open(test.get_url('branch'))
 
179
    return config.RemoteControlStack(b.controldir)
 
180
 
 
181
 
 
182
config.test_stack_builder_registry.register('remote_control',
 
183
                                            build_remote_control_stack)
 
184
 
 
185
 
 
186
sample_long_alias = "log -r-15..-1 --line"
 
187
sample_config_text = u"""
 
188
[DEFAULT]
 
189
email=Erik B\u00e5gfors <erik@bagfors.nu>
 
190
editor=vim
 
191
change_editor=vimdiff -of {new_path} {old_path}
 
192
gpg_signing_key=DD4D5088
 
193
log_format=short
 
194
validate_signatures_in_log=true
 
195
acceptable_keys=amy
 
196
user_global_option=something
 
197
bzr.mergetool.sometool=sometool {base} {this} {other} -o {result}
 
198
bzr.mergetool.funkytool=funkytool "arg with spaces" {this_temp}
 
199
bzr.mergetool.newtool='"newtool with spaces" {this_temp}'
 
200
bzr.default_mergetool=sometool
 
201
[ALIASES]
 
202
h=help
 
203
ll=""".encode('utf-8') + sample_long_alias.encode('utf-8') + b"\n"
 
204
 
 
205
 
 
206
sample_always_signatures = b"""
 
207
[DEFAULT]
 
208
check_signatures=ignore
 
209
create_signatures=always
 
210
"""
 
211
 
 
212
sample_ignore_signatures = b"""
 
213
[DEFAULT]
 
214
check_signatures=require
 
215
create_signatures=never
 
216
"""
 
217
 
 
218
sample_maybe_signatures = b"""
 
219
[DEFAULT]
 
220
check_signatures=ignore
 
221
create_signatures=when-required
 
222
"""
 
223
 
 
224
sample_branches_text = b"""
 
225
[http://www.example.com]
 
226
# Top level policy
 
227
email=Robert Collins <robertc@example.org>
 
228
normal_option = normal
 
229
appendpath_option = append
 
230
appendpath_option:policy = appendpath
 
231
norecurse_option = norecurse
 
232
norecurse_option:policy = norecurse
 
233
[http://www.example.com/ignoreparent]
 
234
# different project: ignore parent dir config
 
235
ignore_parents=true
 
236
[http://www.example.com/norecurse]
 
237
# configuration items that only apply to this dir
 
238
recurse=false
 
239
normal_option = norecurse
 
240
[http://www.example.com/dir]
 
241
appendpath_option = normal
 
242
[/b/]
 
243
check_signatures=require
 
244
# test trailing / matching with no children
 
245
[/a/]
 
246
check_signatures=check-available
 
247
gpg_signing_key=default
 
248
user_local_option=local
 
249
# test trailing / matching
 
250
[/a/*]
 
251
#subdirs will match but not the parent
 
252
[/a/c]
 
253
check_signatures=ignore
 
254
post_commit=breezy.tests.test_config.post_commit
 
255
#testing explicit beats globs
 
256
"""
 
257
 
 
258
 
 
259
def create_configs(test):
 
260
    """Create configuration files for a given test.
 
261
 
 
262
    This requires creating a tree (and populate the ``test.tree`` attribute)
 
263
    and its associated branch and will populate the following attributes:
 
264
 
 
265
    - branch_config: A BranchConfig for the associated branch.
 
266
 
 
267
    - locations_config : A LocationConfig for the associated branch
 
268
 
 
269
    - breezy_config: A GlobalConfig.
 
270
 
 
271
    The tree and branch are created in a 'tree' subdirectory so the tests can
 
272
    still use the test directory to stay outside of the branch.
 
273
    """
 
274
    tree = test.make_branch_and_tree('tree')
 
275
    test.tree = tree
 
276
    test.branch_config = config.BranchConfig(tree.branch)
 
277
    test.locations_config = config.LocationConfig(tree.basedir)
 
278
    test.breezy_config = config.GlobalConfig()
 
279
 
 
280
 
 
281
def create_configs_with_file_option(test):
 
282
    """Create configuration files with a ``file`` option set in each.
 
283
 
 
284
    This builds on ``create_configs`` and add one ``file`` option in each
 
285
    configuration with a value which allows identifying the configuration file.
 
286
    """
 
287
    create_configs(test)
 
288
    test.breezy_config.set_user_option('file', 'breezy')
 
289
    test.locations_config.set_user_option('file', 'locations')
 
290
    test.branch_config.set_user_option('file', 'branch')
 
291
 
 
292
 
 
293
class TestOptionsMixin:
 
294
 
 
295
    def assertOptions(self, expected, conf):
 
296
        # We don't care about the parser (as it will make tests hard to write
 
297
        # and error-prone anyway)
 
298
        self.assertThat([opt[:4] for opt in conf._get_options()],
 
299
                        matchers.Equals(expected))
 
300
 
 
301
 
 
302
class InstrumentedConfigObj(object):
 
303
    """A config obj look-enough-alike to record calls made to it."""
 
304
 
 
305
    def __contains__(self, thing):
 
306
        self._calls.append(('__contains__', thing))
 
307
        return False
 
308
 
 
309
    def __getitem__(self, key):
 
310
        self._calls.append(('__getitem__', key))
 
311
        return self
 
312
 
 
313
    def __init__(self, input, encoding=None):
 
314
        self._calls = [('__init__', input, encoding)]
 
315
 
 
316
    def __setitem__(self, key, value):
 
317
        self._calls.append(('__setitem__', key, value))
 
318
 
 
319
    def __delitem__(self, key):
 
320
        self._calls.append(('__delitem__', key))
 
321
 
 
322
    def keys(self):
 
323
        self._calls.append(('keys',))
 
324
        return []
 
325
 
 
326
    def reload(self):
 
327
        self._calls.append(('reload',))
 
328
 
 
329
    def write(self, arg):
 
330
        self._calls.append(('write',))
 
331
 
 
332
    def as_bool(self, value):
 
333
        self._calls.append(('as_bool', value))
 
334
        return False
 
335
 
 
336
    def get_value(self, section, name):
 
337
        self._calls.append(('get_value', section, name))
 
338
        return None
 
339
 
 
340
 
 
341
class FakeBranch(object):
 
342
 
 
343
    def __init__(self, base=None):
 
344
        if base is None:
 
345
            self.base = "http://example.com/branches/demo"
 
346
        else:
 
347
            self.base = base
 
348
        self._transport = self.control_files = \
 
349
            FakeControlFilesAndTransport()
 
350
 
 
351
    def _get_config(self):
 
352
        return config.TransportConfig(self._transport, 'branch.conf')
 
353
 
 
354
    def lock_write(self):
 
355
        return lock.LogicalLockResult(self.unlock)
 
356
 
 
357
    def unlock(self):
 
358
        pass
 
359
 
 
360
 
 
361
class FakeControlFilesAndTransport(object):
 
362
 
 
363
    def __init__(self):
 
364
        self.files = {}
 
365
        self._transport = self
 
366
 
 
367
    def get(self, filename):
 
368
        # from Transport
 
369
        try:
 
370
            return BytesIO(self.files[filename])
 
371
        except KeyError:
 
372
            raise errors.NoSuchFile(filename)
 
373
 
 
374
    def get_bytes(self, filename):
 
375
        # from Transport
 
376
        try:
 
377
            return self.files[filename]
 
378
        except KeyError:
 
379
            raise errors.NoSuchFile(filename)
 
380
 
 
381
    def put(self, filename, fileobj):
 
382
        self.files[filename] = fileobj.read()
 
383
 
 
384
    def put_file(self, filename, fileobj):
 
385
        return self.put(filename, fileobj)
 
386
 
 
387
 
 
388
class InstrumentedConfig(config.Config):
 
389
    """An instrumented config that supplies stubs for template methods."""
 
390
 
 
391
    def __init__(self):
 
392
        super(InstrumentedConfig, self).__init__()
 
393
        self._calls = []
 
394
        self._signatures = config.CHECK_NEVER
 
395
        self._change_editor = 'vimdiff -fo {new_path} {old_path}'
 
396
 
 
397
    def _get_user_id(self):
 
398
        self._calls.append('_get_user_id')
 
399
        return "Robert Collins <robert.collins@example.org>"
 
400
 
 
401
    def _get_signature_checking(self):
 
402
        self._calls.append('_get_signature_checking')
 
403
        return self._signatures
 
404
 
 
405
    def _get_change_editor(self):
 
406
        self._calls.append('_get_change_editor')
 
407
        return self._change_editor
 
408
 
 
409
 
 
410
bool_config = b"""[DEFAULT]
 
411
active = true
 
412
inactive = false
 
413
[UPPERCASE]
 
414
active = True
 
415
nonactive = False
 
416
"""
 
417
 
 
418
 
 
419
class TestConfigObj(tests.TestCase):
 
420
 
 
421
    def test_get_bool(self):
 
422
        co = config.ConfigObj(BytesIO(bool_config))
 
423
        self.assertIs(co.get_bool('DEFAULT', 'active'), True)
 
424
        self.assertIs(co.get_bool('DEFAULT', 'inactive'), False)
 
425
        self.assertIs(co.get_bool('UPPERCASE', 'active'), True)
 
426
        self.assertIs(co.get_bool('UPPERCASE', 'nonactive'), False)
 
427
 
 
428
    def test_hash_sign_in_value(self):
 
429
        """
 
430
        Before 4.5.0, ConfigObj did not quote # signs in values, so they'd be
 
431
        treated as comments when read in again. (#86838)
 
432
        """
 
433
        co = config.ConfigObj()
 
434
        co['test'] = 'foo#bar'
 
435
        outfile = BytesIO()
 
436
        co.write(outfile=outfile)
 
437
        lines = outfile.getvalue().splitlines()
 
438
        self.assertEqual(lines, [b'test = "foo#bar"'])
 
439
        co2 = config.ConfigObj(lines)
 
440
        self.assertEqual(co2['test'], 'foo#bar')
 
441
 
 
442
    def test_triple_quotes(self):
 
443
        # Bug #710410: if the value string has triple quotes
 
444
        # then ConfigObj versions up to 4.7.2 will quote them wrong
 
445
        # and won't able to read them back
 
446
        triple_quotes_value = '''spam
 
447
""" that's my spam """
 
448
eggs'''
 
449
        co = config.ConfigObj()
 
450
        co['test'] = triple_quotes_value
 
451
        # While writing this test another bug in ConfigObj has been found:
 
452
        # method co.write() without arguments produces list of lines
 
453
        # one option per line, and multiline values are not split
 
454
        # across multiple lines,
 
455
        # and that breaks the parsing these lines back by ConfigObj.
 
456
        # This issue only affects test, but it's better to avoid
 
457
        # `co.write()` construct at all.
 
458
        # [bialix 20110222] bug report sent to ConfigObj's author
 
459
        outfile = BytesIO()
 
460
        co.write(outfile=outfile)
 
461
        output = outfile.getvalue()
 
462
        # now we're trying to read it back
 
463
        co2 = config.ConfigObj(BytesIO(output))
 
464
        self.assertEqual(triple_quotes_value, co2['test'])
 
465
 
 
466
 
 
467
erroneous_config = b"""[section] # line 1
 
468
good=good # line 2
 
469
[section] # line 3
 
470
whocares=notme # line 4
 
471
"""
 
472
 
 
473
 
 
474
class TestConfigObjErrors(tests.TestCase):
 
475
 
 
476
    def test_duplicate_section_name_error_line(self):
 
477
        try:
 
478
            co = configobj.ConfigObj(BytesIO(erroneous_config),
 
479
                                     raise_errors=True)
 
480
        except config.configobj.DuplicateError as e:
 
481
            self.assertEqual(3, e.line_number)
 
482
        else:
 
483
            self.fail('Error in config file not detected')
 
484
 
 
485
 
 
486
class TestConfig(tests.TestCase):
 
487
 
 
488
    def test_constructs(self):
 
489
        config.Config()
 
490
 
 
491
    def test_user_email(self):
 
492
        my_config = InstrumentedConfig()
 
493
        self.assertEqual('robert.collins@example.org', my_config.user_email())
 
494
        self.assertEqual(['_get_user_id'], my_config._calls)
 
495
 
 
496
    def test_username(self):
 
497
        my_config = InstrumentedConfig()
 
498
        self.assertEqual('Robert Collins <robert.collins@example.org>',
 
499
                         my_config.username())
 
500
        self.assertEqual(['_get_user_id'], my_config._calls)
 
501
 
 
502
    def test_get_user_option_default(self):
 
503
        my_config = config.Config()
 
504
        self.assertEqual(None, my_config.get_user_option('no_option'))
 
505
 
 
506
    def test_validate_signatures_in_log_default(self):
 
507
        my_config = config.Config()
 
508
        self.assertEqual(False, my_config.validate_signatures_in_log())
 
509
 
 
510
    def test_get_change_editor(self):
 
511
        my_config = InstrumentedConfig()
 
512
        change_editor = my_config.get_change_editor('old_tree', 'new_tree')
 
513
        self.assertEqual(['_get_change_editor'], my_config._calls)
 
514
        self.assertIs(diff.DiffFromTool, change_editor.__class__)
 
515
        self.assertEqual(['vimdiff', '-fo', '{new_path}', '{old_path}'],
 
516
                         change_editor.command_template)
 
517
 
 
518
    def test_get_change_editor_implicit_args(self):
 
519
        # If there are no substitution variables, then assume the
 
520
        # old and new path are the last arguments.
 
521
        my_config = InstrumentedConfig()
 
522
        my_config._change_editor = 'vimdiff -o'
 
523
        change_editor = my_config.get_change_editor('old_tree', 'new_tree')
 
524
        self.assertEqual(['_get_change_editor'], my_config._calls)
 
525
        self.assertIs(diff.DiffFromTool, change_editor.__class__)
 
526
        self.assertEqual(['vimdiff', '-o', '{old_path}', '{new_path}'],
 
527
                         change_editor.command_template)
 
528
 
 
529
    def test_get_change_editor_old_style(self):
 
530
        # Test the old style format for the change_editor setting.
 
531
        my_config = InstrumentedConfig()
 
532
        my_config._change_editor = 'vimdiff -o @old_path @new_path'
 
533
        change_editor = my_config.get_change_editor('old_tree', 'new_tree')
 
534
        self.assertEqual(['_get_change_editor'], my_config._calls)
 
535
        self.assertIs(diff.DiffFromTool, change_editor.__class__)
 
536
        self.assertEqual(['vimdiff', '-o', '{old_path}', '{new_path}'],
 
537
                         change_editor.command_template)
 
538
 
 
539
 
 
540
class TestIniConfig(tests.TestCaseInTempDir):
 
541
 
 
542
    def make_config_parser(self, s):
 
543
        conf = config.IniBasedConfig.from_string(s)
 
544
        return conf, conf._get_parser()
 
545
 
 
546
 
 
547
class TestIniConfigBuilding(TestIniConfig):
 
548
 
 
549
    def test_contructs(self):
 
550
        config.IniBasedConfig()
 
551
 
 
552
    def test_from_fp(self):
 
553
        my_config = config.IniBasedConfig.from_string(sample_config_text)
 
554
        self.assertIsInstance(my_config._get_parser(), configobj.ConfigObj)
 
555
 
 
556
    def test_cached(self):
 
557
        my_config = config.IniBasedConfig.from_string(sample_config_text)
 
558
        parser = my_config._get_parser()
 
559
        self.assertTrue(my_config._get_parser() is parser)
 
560
 
 
561
    def _dummy_chown(self, path, uid, gid):
 
562
        self.path, self.uid, self.gid = path, uid, gid
 
563
 
 
564
    def test_ini_config_ownership(self):
 
565
        """Ensure that chown is happening during _write_config_file"""
 
566
        self.requireFeature(features.chown_feature)
 
567
        self.overrideAttr(os, 'chown', self._dummy_chown)
 
568
        self.path = self.uid = self.gid = None
 
569
        conf = config.IniBasedConfig(file_name='./foo.conf')
 
570
        conf._write_config_file()
 
571
        self.assertEqual(self.path, './foo.conf')
 
572
        self.assertTrue(isinstance(self.uid, int))
 
573
        self.assertTrue(isinstance(self.gid, int))
 
574
 
 
575
 
 
576
class TestIniConfigSaving(tests.TestCaseInTempDir):
 
577
 
 
578
    def test_cant_save_without_a_file_name(self):
 
579
        conf = config.IniBasedConfig()
 
580
        self.assertRaises(AssertionError, conf._write_config_file)
 
581
 
 
582
    def test_saved_with_content(self):
 
583
        content = b'foo = bar\n'
 
584
        config.IniBasedConfig.from_string(content, file_name='./test.conf',
 
585
                                          save=True)
 
586
        self.assertFileEqual(content, 'test.conf')
 
587
 
 
588
 
 
589
class TestIniConfigOptionExpansion(tests.TestCase):
 
590
    """Test option expansion from the IniConfig level.
 
591
 
 
592
    What we really want here is to test the Config level, but the class being
 
593
    abstract as far as storing values is concerned, this can't be done
 
594
    properly (yet).
 
595
    """
 
596
    # FIXME: This should be rewritten when all configs share a storage
 
597
    # implementation -- vila 2011-02-18
 
598
 
 
599
    def get_config(self, string=None):
 
600
        if string is None:
 
601
            string = b''
 
602
        c = config.IniBasedConfig.from_string(string)
 
603
        return c
 
604
 
 
605
    def assertExpansion(self, expected, conf, string, env=None):
 
606
        self.assertEqual(expected, conf.expand_options(string, env))
 
607
 
 
608
    def test_no_expansion(self):
 
609
        c = self.get_config('')
 
610
        self.assertExpansion('foo', c, 'foo')
 
611
 
 
612
    def test_env_adding_options(self):
 
613
        c = self.get_config('')
 
614
        self.assertExpansion('bar', c, '{foo}', {'foo': 'bar'})
 
615
 
 
616
    def test_env_overriding_options(self):
 
617
        c = self.get_config('foo=baz')
 
618
        self.assertExpansion('bar', c, '{foo}', {'foo': 'bar'})
 
619
 
 
620
    def test_simple_ref(self):
 
621
        c = self.get_config('foo=xxx')
 
622
        self.assertExpansion('xxx', c, '{foo}')
 
623
 
 
624
    def test_unknown_ref(self):
 
625
        c = self.get_config('')
 
626
        self.assertRaises(config.ExpandingUnknownOption,
 
627
                          c.expand_options, '{foo}')
 
628
 
 
629
    def test_indirect_ref(self):
 
630
        c = self.get_config('''
 
631
foo=xxx
 
632
bar={foo}
 
633
''')
 
634
        self.assertExpansion('xxx', c, '{bar}')
 
635
 
 
636
    def test_embedded_ref(self):
 
637
        c = self.get_config('''
 
638
foo=xxx
 
639
bar=foo
 
640
''')
 
641
        self.assertExpansion('xxx', c, '{{bar}}')
 
642
 
 
643
    def test_simple_loop(self):
 
644
        c = self.get_config('foo={foo}')
 
645
        self.assertRaises(config.OptionExpansionLoop, c.expand_options,
 
646
                          '{foo}')
 
647
 
 
648
    def test_indirect_loop(self):
 
649
        c = self.get_config('''
 
650
foo={bar}
 
651
bar={baz}
 
652
baz={foo}''')
 
653
        e = self.assertRaises(config.OptionExpansionLoop,
 
654
                              c.expand_options, '{foo}')
 
655
        self.assertEqual('foo->bar->baz', e.refs)
 
656
        self.assertEqual('{foo}', e.string)
 
657
 
 
658
    def test_list(self):
 
659
        conf = self.get_config('''
 
660
foo=start
 
661
bar=middle
 
662
baz=end
 
663
list={foo},{bar},{baz}
 
664
''')
 
665
        self.assertEqual(['start', 'middle', 'end'],
 
666
                         conf.get_user_option('list', expand=True))
 
667
 
 
668
    def test_cascading_list(self):
 
669
        conf = self.get_config('''
 
670
foo=start,{bar}
 
671
bar=middle,{baz}
 
672
baz=end
 
673
list={foo}
 
674
''')
 
675
        self.assertEqual(['start', 'middle', 'end'],
 
676
                         conf.get_user_option('list', expand=True))
 
677
 
 
678
    def test_pathological_hidden_list(self):
 
679
        conf = self.get_config('''
 
680
foo=bin
 
681
bar=go
 
682
start={foo
 
683
middle=},{
 
684
end=bar}
 
685
hidden={start}{middle}{end}
 
686
''')
 
687
        # Nope, it's either a string or a list, and the list wins as soon as a
 
688
        # ',' appears, so the string concatenation never occur.
 
689
        self.assertEqual(['{foo', '}', '{', 'bar}'],
 
690
                         conf.get_user_option('hidden', expand=True))
 
691
 
 
692
 
 
693
class TestLocationConfigOptionExpansion(tests.TestCaseInTempDir):
 
694
 
 
695
    def get_config(self, location, string=None):
 
696
        if string is None:
 
697
            string = ''
 
698
        # Since we don't save the config we won't strictly require to inherit
 
699
        # from TestCaseInTempDir, but an error occurs so quickly...
 
700
        c = config.LocationConfig.from_string(string, location)
 
701
        return c
 
702
 
 
703
    def test_dont_cross_unrelated_section(self):
 
704
        c = self.get_config('/another/branch/path', '''
 
705
[/one/branch/path]
 
706
foo = hello
 
707
bar = {foo}/2
 
708
 
 
709
[/another/branch/path]
 
710
bar = {foo}/2
 
711
''')
 
712
        self.assertRaises(config.ExpandingUnknownOption,
 
713
                          c.get_user_option, 'bar', expand=True)
 
714
 
 
715
    def test_cross_related_sections(self):
 
716
        c = self.get_config('/project/branch/path', '''
 
717
[/project]
 
718
foo = qu
 
719
 
 
720
[/project/branch/path]
 
721
bar = {foo}ux
 
722
''')
 
723
        self.assertEqual('quux', c.get_user_option('bar', expand=True))
 
724
 
 
725
 
 
726
class TestIniBaseConfigOnDisk(tests.TestCaseInTempDir):
 
727
 
 
728
    def test_cannot_reload_without_name(self):
 
729
        conf = config.IniBasedConfig.from_string(sample_config_text)
 
730
        self.assertRaises(AssertionError, conf.reload)
 
731
 
 
732
    def test_reload_see_new_value(self):
 
733
        c1 = config.IniBasedConfig.from_string('editor=vim\n',
 
734
                                               file_name='./test/conf')
 
735
        c1._write_config_file()
 
736
        c2 = config.IniBasedConfig.from_string('editor=emacs\n',
 
737
                                               file_name='./test/conf')
 
738
        c2._write_config_file()
 
739
        self.assertEqual('vim', c1.get_user_option('editor'))
 
740
        self.assertEqual('emacs', c2.get_user_option('editor'))
 
741
        # Make sure we get the Right value
 
742
        c1.reload()
 
743
        self.assertEqual('emacs', c1.get_user_option('editor'))
 
744
 
 
745
 
 
746
class TestLockableConfig(tests.TestCaseInTempDir):
 
747
 
 
748
    scenarios = lockable_config_scenarios()
 
749
 
 
750
    # Set by load_tests
 
751
    config_class = None
 
752
    config_args = None
 
753
    config_section = None
 
754
 
 
755
    def setUp(self):
 
756
        super(TestLockableConfig, self).setUp()
 
757
        self._content = '[%s]\none=1\ntwo=2\n' % (self.config_section,)
 
758
        self.config = self.create_config(self._content)
 
759
 
 
760
    def get_existing_config(self):
 
761
        return self.config_class(*self.config_args)
 
762
 
 
763
    def create_config(self, content):
 
764
        kwargs = dict(save=True)
 
765
        c = self.config_class.from_string(content, *self.config_args, **kwargs)
 
766
        return c
 
767
 
 
768
    def test_simple_read_access(self):
 
769
        self.assertEqual('1', self.config.get_user_option('one'))
 
770
 
 
771
    def test_simple_write_access(self):
 
772
        self.config.set_user_option('one', 'one')
 
773
        self.assertEqual('one', self.config.get_user_option('one'))
 
774
 
 
775
    def test_listen_to_the_last_speaker(self):
 
776
        c1 = self.config
 
777
        c2 = self.get_existing_config()
 
778
        c1.set_user_option('one', 'ONE')
 
779
        c2.set_user_option('two', 'TWO')
 
780
        self.assertEqual('ONE', c1.get_user_option('one'))
 
781
        self.assertEqual('TWO', c2.get_user_option('two'))
 
782
        # The second update respect the first one
 
783
        self.assertEqual('ONE', c2.get_user_option('one'))
 
784
 
 
785
    def test_last_speaker_wins(self):
 
786
        # If the same config is not shared, the same variable modified twice
 
787
        # can only see a single result.
 
788
        c1 = self.config
 
789
        c2 = self.get_existing_config()
 
790
        c1.set_user_option('one', 'c1')
 
791
        c2.set_user_option('one', 'c2')
 
792
        self.assertEqual('c2', c2._get_user_option('one'))
 
793
        # The first modification is still available until another refresh
 
794
        # occur
 
795
        self.assertEqual('c1', c1._get_user_option('one'))
 
796
        c1.set_user_option('two', 'done')
 
797
        self.assertEqual('c2', c1._get_user_option('one'))
 
798
 
 
799
    def test_writes_are_serialized(self):
 
800
        c1 = self.config
 
801
        c2 = self.get_existing_config()
 
802
 
 
803
        # We spawn a thread that will pause *during* the write
 
804
        before_writing = threading.Event()
 
805
        after_writing = threading.Event()
 
806
        writing_done = threading.Event()
 
807
        c1_orig = c1._write_config_file
 
808
 
 
809
        def c1_write_config_file():
 
810
            before_writing.set()
 
811
            c1_orig()
 
812
            # The lock is held. We wait for the main thread to decide when to
 
813
            # continue
 
814
            after_writing.wait()
 
815
        c1._write_config_file = c1_write_config_file
 
816
 
 
817
        def c1_set_option():
 
818
            c1.set_user_option('one', 'c1')
 
819
            writing_done.set()
 
820
        t1 = threading.Thread(target=c1_set_option)
 
821
        # Collect the thread after the test
 
822
        self.addCleanup(t1.join)
 
823
        # Be ready to unblock the thread if the test goes wrong
 
824
        self.addCleanup(after_writing.set)
 
825
        t1.start()
 
826
        before_writing.wait()
 
827
        self.assertTrue(c1._lock.is_held)
 
828
        self.assertRaises(errors.LockContention,
 
829
                          c2.set_user_option, 'one', 'c2')
 
830
        self.assertEqual('c1', c1.get_user_option('one'))
 
831
        # Let the lock be released
 
832
        after_writing.set()
 
833
        writing_done.wait()
 
834
        c2.set_user_option('one', 'c2')
 
835
        self.assertEqual('c2', c2.get_user_option('one'))
 
836
 
 
837
    def test_read_while_writing(self):
 
838
        c1 = self.config
 
839
        # We spawn a thread that will pause *during* the write
 
840
        ready_to_write = threading.Event()
 
841
        do_writing = threading.Event()
 
842
        writing_done = threading.Event()
 
843
        c1_orig = c1._write_config_file
 
844
 
 
845
        def c1_write_config_file():
 
846
            ready_to_write.set()
 
847
            # The lock is held. We wait for the main thread to decide when to
 
848
            # continue
 
849
            do_writing.wait()
 
850
            c1_orig()
 
851
            writing_done.set()
 
852
        c1._write_config_file = c1_write_config_file
 
853
 
 
854
        def c1_set_option():
 
855
            c1.set_user_option('one', 'c1')
 
856
        t1 = threading.Thread(target=c1_set_option)
 
857
        # Collect the thread after the test
 
858
        self.addCleanup(t1.join)
 
859
        # Be ready to unblock the thread if the test goes wrong
 
860
        self.addCleanup(do_writing.set)
 
861
        t1.start()
 
862
        # Ensure the thread is ready to write
 
863
        ready_to_write.wait()
 
864
        self.assertTrue(c1._lock.is_held)
 
865
        self.assertEqual('c1', c1.get_user_option('one'))
 
866
        # If we read during the write, we get the old value
 
867
        c2 = self.get_existing_config()
 
868
        self.assertEqual('1', c2.get_user_option('one'))
 
869
        # Let the writing occur and ensure it occurred
 
870
        do_writing.set()
 
871
        writing_done.wait()
 
872
        # Now we get the updated value
 
873
        c3 = self.get_existing_config()
 
874
        self.assertEqual('c1', c3.get_user_option('one'))
 
875
 
 
876
 
 
877
class TestGetUserOptionAs(TestIniConfig):
 
878
 
 
879
    def test_get_user_option_as_bool(self):
 
880
        conf, parser = self.make_config_parser("""
 
881
a_true_bool = true
 
882
a_false_bool = 0
 
883
an_invalid_bool = maybe
 
884
a_list = hmm, who knows ? # This is interpreted as a list !
 
885
""")
 
886
        get_bool = conf.get_user_option_as_bool
 
887
        self.assertEqual(True, get_bool('a_true_bool'))
 
888
        self.assertEqual(False, get_bool('a_false_bool'))
 
889
        warnings = []
 
890
 
 
891
        def warning(*args):
 
892
            warnings.append(args[0] % args[1:])
 
893
        self.overrideAttr(trace, 'warning', warning)
 
894
        msg = 'Value "%s" is not a boolean for "%s"'
 
895
        self.assertIs(None, get_bool('an_invalid_bool'))
 
896
        self.assertEqual(msg % ('maybe', 'an_invalid_bool'), warnings[0])
 
897
        warnings = []
 
898
        self.assertIs(None, get_bool('not_defined_in_this_config'))
 
899
        self.assertEqual([], warnings)
 
900
 
 
901
    def test_get_user_option_as_list(self):
 
902
        conf, parser = self.make_config_parser("""
 
903
a_list = a,b,c
 
904
length_1 = 1,
 
905
one_item = x
 
906
""")
 
907
        get_list = conf.get_user_option_as_list
 
908
        self.assertEqual(['a', 'b', 'c'], get_list('a_list'))
 
909
        self.assertEqual(['1'], get_list('length_1'))
 
910
        self.assertEqual('x', conf.get_user_option('one_item'))
 
911
        # automatically cast to list
 
912
        self.assertEqual(['x'], get_list('one_item'))
 
913
 
 
914
 
 
915
class TestSupressWarning(TestIniConfig):
 
916
 
 
917
    def make_warnings_config(self, s):
 
918
        conf, parser = self.make_config_parser(s)
 
919
        return conf.suppress_warning
 
920
 
 
921
    def test_suppress_warning_unknown(self):
 
922
        suppress_warning = self.make_warnings_config('')
 
923
        self.assertEqual(False, suppress_warning('unknown_warning'))
 
924
 
 
925
    def test_suppress_warning_known(self):
 
926
        suppress_warning = self.make_warnings_config('suppress_warnings=a,b')
 
927
        self.assertEqual(False, suppress_warning('c'))
 
928
        self.assertEqual(True, suppress_warning('a'))
 
929
        self.assertEqual(True, suppress_warning('b'))
 
930
 
 
931
 
 
932
class TestGetConfig(tests.TestCaseInTempDir):
 
933
 
 
934
    def test_constructs(self):
 
935
        config.GlobalConfig()
 
936
 
 
937
    def test_calls_read_filenames(self):
 
938
        # replace the class that is constructed, to check its parameters
 
939
        oldparserclass = config.ConfigObj
 
940
        config.ConfigObj = InstrumentedConfigObj
 
941
        my_config = config.GlobalConfig()
 
942
        try:
 
943
            parser = my_config._get_parser()
 
944
        finally:
 
945
            config.ConfigObj = oldparserclass
 
946
        self.assertIsInstance(parser, InstrumentedConfigObj)
 
947
        self.assertEqual(parser._calls, [('__init__', bedding.config_path(),
 
948
                                          'utf-8')])
 
949
 
 
950
 
 
951
class TestBranchConfig(tests.TestCaseWithTransport):
 
952
 
 
953
    def test_constructs_valid(self):
 
954
        branch = FakeBranch()
 
955
        my_config = config.BranchConfig(branch)
 
956
        self.assertIsNot(None, my_config)
 
957
 
 
958
    def test_constructs_error(self):
 
959
        self.assertRaises(TypeError, config.BranchConfig)
 
960
 
 
961
    def test_get_location_config(self):
 
962
        branch = FakeBranch()
 
963
        my_config = config.BranchConfig(branch)
 
964
        location_config = my_config._get_location_config()
 
965
        self.assertEqual(branch.base, location_config.location)
 
966
        self.assertIs(location_config, my_config._get_location_config())
 
967
 
 
968
    def test_get_config(self):
 
969
        """The Branch.get_config method works properly"""
 
970
        b = controldir.ControlDir.create_standalone_workingtree('.').branch
 
971
        my_config = b.get_config()
 
972
        self.assertIs(my_config.get_user_option('wacky'), None)
 
973
        my_config.set_user_option('wacky', 'unlikely')
 
974
        self.assertEqual(my_config.get_user_option('wacky'), 'unlikely')
 
975
 
 
976
        # Ensure we get the same thing if we start again
 
977
        b2 = branch.Branch.open('.')
 
978
        my_config2 = b2.get_config()
 
979
        self.assertEqual(my_config2.get_user_option('wacky'), 'unlikely')
 
980
 
 
981
    def test_has_explicit_nickname(self):
 
982
        b = self.make_branch('.')
 
983
        self.assertFalse(b.get_config().has_explicit_nickname())
 
984
        b.nick = 'foo'
 
985
        self.assertTrue(b.get_config().has_explicit_nickname())
 
986
 
 
987
    def test_config_url(self):
 
988
        """The Branch.get_config will use section that uses a local url"""
 
989
        branch = self.make_branch('branch')
 
990
        self.assertEqual('branch', branch.nick)
 
991
 
 
992
        local_url = urlutils.local_path_to_url('branch')
 
993
        conf = config.LocationConfig.from_string(
 
994
            '[%s]\nnickname = foobar' % (local_url,),
 
995
            local_url, save=True)
 
996
        self.assertIsNot(None, conf)
 
997
        self.assertEqual('foobar', branch.nick)
 
998
 
 
999
    def test_config_local_path(self):
 
1000
        """The Branch.get_config will use a local system path"""
 
1001
        branch = self.make_branch('branch')
 
1002
        self.assertEqual('branch', branch.nick)
 
1003
 
 
1004
        local_path = osutils.getcwd().encode('utf8')
 
1005
        config.LocationConfig.from_string(
 
1006
            b'[%s/branch]\nnickname = barry' % (local_path,),
 
1007
            'branch', save=True)
 
1008
        # Now the branch will find its nick via the location config
 
1009
        self.assertEqual('barry', branch.nick)
 
1010
 
 
1011
    def test_config_creates_local(self):
 
1012
        """Creating a new entry in config uses a local path."""
 
1013
        branch = self.make_branch('branch', format='knit')
 
1014
        branch.set_push_location('http://foobar')
 
1015
        local_path = osutils.getcwd().encode('utf8')
 
1016
        # Surprisingly ConfigObj doesn't create a trailing newline
 
1017
        self.check_file_contents(bedding.locations_config_path(),
 
1018
                                 b'[%s/branch]\n'
 
1019
                                 b'push_location = http://foobar\n'
 
1020
                                 b'push_location:policy = norecurse\n'
 
1021
                                 % (local_path,))
 
1022
 
 
1023
    def test_autonick_urlencoded(self):
 
1024
        b = self.make_branch('!repo')
 
1025
        self.assertEqual('!repo', b.get_config().get_nickname())
 
1026
 
 
1027
    def test_autonick_uses_branch_name(self):
 
1028
        b = self.make_branch('foo', name='bar')
 
1029
        self.assertEqual('bar', b.get_config().get_nickname())
 
1030
 
 
1031
    def test_warn_if_masked(self):
 
1032
        warnings = []
 
1033
 
 
1034
        def warning(*args):
 
1035
            warnings.append(args[0] % args[1:])
 
1036
        self.overrideAttr(trace, 'warning', warning)
 
1037
 
 
1038
        def set_option(store, warn_masked=True):
 
1039
            warnings[:] = []
 
1040
            conf.set_user_option('example_option', repr(store), store=store,
 
1041
                                 warn_masked=warn_masked)
 
1042
 
 
1043
        def assertWarning(warning):
 
1044
            if warning is None:
 
1045
                self.assertEqual(0, len(warnings))
 
1046
            else:
 
1047
                self.assertEqual(1, len(warnings))
 
1048
                self.assertEqual(warning, warnings[0])
 
1049
        branch = self.make_branch('.')
 
1050
        conf = branch.get_config()
 
1051
        set_option(config.STORE_GLOBAL)
 
1052
        assertWarning(None)
 
1053
        set_option(config.STORE_BRANCH)
 
1054
        assertWarning(None)
 
1055
        set_option(config.STORE_GLOBAL)
 
1056
        assertWarning('Value "4" is masked by "3" from branch.conf')
 
1057
        set_option(config.STORE_GLOBAL, warn_masked=False)
 
1058
        assertWarning(None)
 
1059
        set_option(config.STORE_LOCATION)
 
1060
        assertWarning(None)
 
1061
        set_option(config.STORE_BRANCH)
 
1062
        assertWarning('Value "3" is masked by "0" from locations.conf')
 
1063
        set_option(config.STORE_BRANCH, warn_masked=False)
 
1064
        assertWarning(None)
 
1065
 
 
1066
 
 
1067
class TestGlobalConfigItems(tests.TestCaseInTempDir):
 
1068
 
 
1069
    def _get_empty_config(self):
 
1070
        my_config = config.GlobalConfig()
 
1071
        return my_config
 
1072
 
 
1073
    def _get_sample_config(self):
 
1074
        my_config = config.GlobalConfig.from_string(sample_config_text)
 
1075
        return my_config
 
1076
 
 
1077
    def test_user_id(self):
 
1078
        my_config = config.GlobalConfig.from_string(sample_config_text)
 
1079
        self.assertEqual(u"Erik B\u00e5gfors <erik@bagfors.nu>",
 
1080
                         my_config._get_user_id())
 
1081
 
 
1082
    def test_absent_user_id(self):
 
1083
        my_config = config.GlobalConfig()
 
1084
        self.assertEqual(None, my_config._get_user_id())
 
1085
 
 
1086
    def test_get_user_option_default(self):
 
1087
        my_config = self._get_empty_config()
 
1088
        self.assertEqual(None, my_config.get_user_option('no_option'))
 
1089
 
 
1090
    def test_get_user_option_global(self):
 
1091
        my_config = self._get_sample_config()
 
1092
        self.assertEqual("something",
 
1093
                         my_config.get_user_option('user_global_option'))
 
1094
 
 
1095
    def test_configured_validate_signatures_in_log(self):
 
1096
        my_config = self._get_sample_config()
 
1097
        self.assertEqual(True, my_config.validate_signatures_in_log())
 
1098
 
 
1099
    def test_get_alias(self):
 
1100
        my_config = self._get_sample_config()
 
1101
        self.assertEqual('help', my_config.get_alias('h'))
 
1102
 
 
1103
    def test_get_aliases(self):
 
1104
        my_config = self._get_sample_config()
 
1105
        aliases = my_config.get_aliases()
 
1106
        self.assertEqual(2, len(aliases))
 
1107
        sorted_keys = sorted(aliases)
 
1108
        self.assertEqual('help', aliases[sorted_keys[0]])
 
1109
        self.assertEqual(sample_long_alias, aliases[sorted_keys[1]])
 
1110
 
 
1111
    def test_get_no_alias(self):
 
1112
        my_config = self._get_sample_config()
 
1113
        self.assertEqual(None, my_config.get_alias('foo'))
 
1114
 
 
1115
    def test_get_long_alias(self):
 
1116
        my_config = self._get_sample_config()
 
1117
        self.assertEqual(sample_long_alias, my_config.get_alias('ll'))
 
1118
 
 
1119
    def test_get_change_editor(self):
 
1120
        my_config = self._get_sample_config()
 
1121
        change_editor = my_config.get_change_editor('old', 'new')
 
1122
        self.assertIs(diff.DiffFromTool, change_editor.__class__)
 
1123
        self.assertEqual('vimdiff -of {new_path} {old_path}',
 
1124
                         ' '.join(change_editor.command_template))
 
1125
 
 
1126
    def test_get_no_change_editor(self):
 
1127
        my_config = self._get_empty_config()
 
1128
        change_editor = my_config.get_change_editor('old', 'new')
 
1129
        self.assertIs(None, change_editor)
 
1130
 
 
1131
    def test_get_merge_tools(self):
 
1132
        conf = self._get_sample_config()
 
1133
        tools = conf.get_merge_tools()
 
1134
        self.log(repr(tools))
 
1135
        self.assertEqual(
 
1136
            {u'funkytool': u'funkytool "arg with spaces" {this_temp}',
 
1137
             u'sometool': u'sometool {base} {this} {other} -o {result}',
 
1138
             u'newtool': u'"newtool with spaces" {this_temp}'},
 
1139
            tools)
 
1140
 
 
1141
    def test_get_merge_tools_empty(self):
 
1142
        conf = self._get_empty_config()
 
1143
        tools = conf.get_merge_tools()
 
1144
        self.assertEqual({}, tools)
 
1145
 
 
1146
    def test_find_merge_tool(self):
 
1147
        conf = self._get_sample_config()
 
1148
        cmdline = conf.find_merge_tool('sometool')
 
1149
        self.assertEqual('sometool {base} {this} {other} -o {result}', cmdline)
 
1150
 
 
1151
    def test_find_merge_tool_not_found(self):
 
1152
        conf = self._get_sample_config()
 
1153
        cmdline = conf.find_merge_tool('DOES NOT EXIST')
 
1154
        self.assertIs(cmdline, None)
 
1155
 
 
1156
    def test_find_merge_tool_known(self):
 
1157
        conf = self._get_empty_config()
 
1158
        cmdline = conf.find_merge_tool('kdiff3')
 
1159
        self.assertEqual('kdiff3 {base} {this} {other} -o {result}', cmdline)
 
1160
 
 
1161
    def test_find_merge_tool_override_known(self):
 
1162
        conf = self._get_empty_config()
 
1163
        conf.set_user_option('bzr.mergetool.kdiff3', 'kdiff3 blah')
 
1164
        cmdline = conf.find_merge_tool('kdiff3')
 
1165
        self.assertEqual('kdiff3 blah', cmdline)
 
1166
 
 
1167
 
 
1168
class TestGlobalConfigSavingOptions(tests.TestCaseInTempDir):
 
1169
 
 
1170
    def test_empty(self):
 
1171
        my_config = config.GlobalConfig()
 
1172
        self.assertEqual(0, len(my_config.get_aliases()))
 
1173
 
 
1174
    def test_set_alias(self):
 
1175
        my_config = config.GlobalConfig()
 
1176
        alias_value = 'commit --strict'
 
1177
        my_config.set_alias('commit', alias_value)
 
1178
        new_config = config.GlobalConfig()
 
1179
        self.assertEqual(alias_value, new_config.get_alias('commit'))
 
1180
 
 
1181
    def test_remove_alias(self):
 
1182
        my_config = config.GlobalConfig()
 
1183
        my_config.set_alias('commit', 'commit --strict')
 
1184
        # Now remove the alias again.
 
1185
        my_config.unset_alias('commit')
 
1186
        new_config = config.GlobalConfig()
 
1187
        self.assertIs(None, new_config.get_alias('commit'))
 
1188
 
 
1189
 
 
1190
class TestLocationConfig(tests.TestCaseInTempDir, TestOptionsMixin):
 
1191
 
 
1192
    def test_constructs_valid(self):
 
1193
        config.LocationConfig('http://example.com')
 
1194
 
 
1195
    def test_constructs_error(self):
 
1196
        self.assertRaises(TypeError, config.LocationConfig)
 
1197
 
 
1198
    def test_branch_calls_read_filenames(self):
 
1199
        # This is testing the correct file names are provided.
 
1200
        # TODO: consolidate with the test for GlobalConfigs filename checks.
 
1201
        #
 
1202
        # replace the class that is constructed, to check its parameters
 
1203
        oldparserclass = config.ConfigObj
 
1204
        config.ConfigObj = InstrumentedConfigObj
 
1205
        try:
 
1206
            my_config = config.LocationConfig('http://www.example.com')
 
1207
            parser = my_config._get_parser()
 
1208
        finally:
 
1209
            config.ConfigObj = oldparserclass
 
1210
        self.assertIsInstance(parser, InstrumentedConfigObj)
 
1211
        self.assertEqual(parser._calls,
 
1212
                         [('__init__', bedding.locations_config_path(),
 
1213
                           'utf-8')])
 
1214
 
 
1215
    def test_get_global_config(self):
 
1216
        my_config = config.BranchConfig(FakeBranch('http://example.com'))
 
1217
        global_config = my_config._get_global_config()
 
1218
        self.assertIsInstance(global_config, config.GlobalConfig)
 
1219
        self.assertIs(global_config, my_config._get_global_config())
 
1220
 
 
1221
    def assertLocationMatching(self, expected):
 
1222
        self.assertEqual(expected,
 
1223
                         list(self.my_location_config._get_matching_sections()))
 
1224
 
 
1225
    def test__get_matching_sections_no_match(self):
 
1226
        self.get_branch_config('/')
 
1227
        self.assertLocationMatching([])
 
1228
 
 
1229
    def test__get_matching_sections_exact(self):
 
1230
        self.get_branch_config('http://www.example.com')
 
1231
        self.assertLocationMatching([('http://www.example.com', '')])
 
1232
 
 
1233
    def test__get_matching_sections_suffix_does_not(self):
 
1234
        self.get_branch_config('http://www.example.com-com')
 
1235
        self.assertLocationMatching([])
 
1236
 
 
1237
    def test__get_matching_sections_subdir_recursive(self):
 
1238
        self.get_branch_config('http://www.example.com/com')
 
1239
        self.assertLocationMatching([('http://www.example.com', 'com')])
 
1240
 
 
1241
    def test__get_matching_sections_ignoreparent(self):
 
1242
        self.get_branch_config('http://www.example.com/ignoreparent')
 
1243
        self.assertLocationMatching([('http://www.example.com/ignoreparent',
 
1244
                                      '')])
 
1245
 
 
1246
    def test__get_matching_sections_ignoreparent_subdir(self):
 
1247
        self.get_branch_config(
 
1248
            'http://www.example.com/ignoreparent/childbranch')
 
1249
        self.assertLocationMatching([('http://www.example.com/ignoreparent',
 
1250
                                      'childbranch')])
 
1251
 
 
1252
    def test__get_matching_sections_subdir_trailing_slash(self):
 
1253
        self.get_branch_config('/b')
 
1254
        self.assertLocationMatching([('/b/', '')])
 
1255
 
 
1256
    def test__get_matching_sections_subdir_child(self):
 
1257
        self.get_branch_config('/a/foo')
 
1258
        self.assertLocationMatching([('/a/*', ''), ('/a/', 'foo')])
 
1259
 
 
1260
    def test__get_matching_sections_subdir_child_child(self):
 
1261
        self.get_branch_config('/a/foo/bar')
 
1262
        self.assertLocationMatching([('/a/*', 'bar'), ('/a/', 'foo/bar')])
 
1263
 
 
1264
    def test__get_matching_sections_trailing_slash_with_children(self):
 
1265
        self.get_branch_config('/a/')
 
1266
        self.assertLocationMatching([('/a/', '')])
 
1267
 
 
1268
    def test__get_matching_sections_explicit_over_glob(self):
 
1269
        # XXX: 2006-09-08 jamesh
 
1270
        # This test only passes because ord('c') > ord('*').  If there
 
1271
        # was a config section for '/a/?', it would get precedence
 
1272
        # over '/a/c'.
 
1273
        self.get_branch_config('/a/c')
 
1274
        self.assertLocationMatching([('/a/c', ''), ('/a/*', ''), ('/a/', 'c')])
 
1275
 
 
1276
    def test__get_option_policy_normal(self):
 
1277
        self.get_branch_config('http://www.example.com')
 
1278
        self.assertEqual(
 
1279
            self.my_location_config._get_config_policy(
 
1280
                'http://www.example.com', 'normal_option'),
 
1281
            config.POLICY_NONE)
 
1282
 
 
1283
    def test__get_option_policy_norecurse(self):
 
1284
        self.get_branch_config('http://www.example.com')
 
1285
        self.assertEqual(
 
1286
            self.my_location_config._get_option_policy(
 
1287
                'http://www.example.com', 'norecurse_option'),
 
1288
            config.POLICY_NORECURSE)
 
1289
        # Test old recurse=False setting:
 
1290
        self.assertEqual(
 
1291
            self.my_location_config._get_option_policy(
 
1292
                'http://www.example.com/norecurse', 'normal_option'),
 
1293
            config.POLICY_NORECURSE)
 
1294
 
 
1295
    def test__get_option_policy_normal(self):
 
1296
        self.get_branch_config('http://www.example.com')
 
1297
        self.assertEqual(
 
1298
            self.my_location_config._get_option_policy(
 
1299
                'http://www.example.com', 'appendpath_option'),
 
1300
            config.POLICY_APPENDPATH)
 
1301
 
 
1302
    def test__get_options_with_policy(self):
 
1303
        self.get_branch_config('/dir/subdir',
 
1304
                               location_config="""\
 
1305
[/dir]
 
1306
other_url = /other-dir
 
1307
other_url:policy = appendpath
 
1308
[/dir/subdir]
 
1309
other_url = /other-subdir
 
1310
""")
 
1311
        self.assertOptions(
 
1312
            [(u'other_url', u'/other-subdir', u'/dir/subdir', 'locations'),
 
1313
             (u'other_url', u'/other-dir', u'/dir', 'locations'),
 
1314
             (u'other_url:policy', u'appendpath', u'/dir', 'locations')],
 
1315
            self.my_location_config)
 
1316
 
 
1317
    def test_location_without_username(self):
 
1318
        self.get_branch_config('http://www.example.com/ignoreparent')
 
1319
        self.assertEqual(u'Erik B\u00e5gfors <erik@bagfors.nu>',
 
1320
                         self.my_config.username())
 
1321
 
 
1322
    def test_location_not_listed(self):
 
1323
        """Test that the global username is used when no location matches"""
 
1324
        self.get_branch_config('/home/robertc/sources')
 
1325
        self.assertEqual(u'Erik B\u00e5gfors <erik@bagfors.nu>',
 
1326
                         self.my_config.username())
 
1327
 
 
1328
    def test_overriding_location(self):
 
1329
        self.get_branch_config('http://www.example.com/foo')
 
1330
        self.assertEqual('Robert Collins <robertc@example.org>',
 
1331
                         self.my_config.username())
 
1332
 
 
1333
    def test_get_user_option_global(self):
 
1334
        self.get_branch_config('/a')
 
1335
        self.assertEqual('something',
 
1336
                         self.my_config.get_user_option('user_global_option'))
 
1337
 
 
1338
    def test_get_user_option_local(self):
 
1339
        self.get_branch_config('/a')
 
1340
        self.assertEqual('local',
 
1341
                         self.my_config.get_user_option('user_local_option'))
 
1342
 
 
1343
    def test_get_user_option_appendpath(self):
 
1344
        # returned as is for the base path:
 
1345
        self.get_branch_config('http://www.example.com')
 
1346
        self.assertEqual('append',
 
1347
                         self.my_config.get_user_option('appendpath_option'))
 
1348
        # Extra path components get appended:
 
1349
        self.get_branch_config('http://www.example.com/a/b/c')
 
1350
        self.assertEqual('append/a/b/c',
 
1351
                         self.my_config.get_user_option('appendpath_option'))
 
1352
        # Overriden for http://www.example.com/dir, where it is a
 
1353
        # normal option:
 
1354
        self.get_branch_config('http://www.example.com/dir/a/b/c')
 
1355
        self.assertEqual('normal',
 
1356
                         self.my_config.get_user_option('appendpath_option'))
 
1357
 
 
1358
    def test_get_user_option_norecurse(self):
 
1359
        self.get_branch_config('http://www.example.com')
 
1360
        self.assertEqual('norecurse',
 
1361
                         self.my_config.get_user_option('norecurse_option'))
 
1362
        self.get_branch_config('http://www.example.com/dir')
 
1363
        self.assertEqual(None,
 
1364
                         self.my_config.get_user_option('norecurse_option'))
 
1365
        # http://www.example.com/norecurse is a recurse=False section
 
1366
        # that redefines normal_option.  Subdirectories do not pick up
 
1367
        # this redefinition.
 
1368
        self.get_branch_config('http://www.example.com/norecurse')
 
1369
        self.assertEqual('norecurse',
 
1370
                         self.my_config.get_user_option('normal_option'))
 
1371
        self.get_branch_config('http://www.example.com/norecurse/subdir')
 
1372
        self.assertEqual('normal',
 
1373
                         self.my_config.get_user_option('normal_option'))
 
1374
 
 
1375
    def test_set_user_option_norecurse(self):
 
1376
        self.get_branch_config('http://www.example.com')
 
1377
        self.my_config.set_user_option('foo', 'bar',
 
1378
                                       store=config.STORE_LOCATION_NORECURSE)
 
1379
        self.assertEqual(
 
1380
            self.my_location_config._get_option_policy(
 
1381
                'http://www.example.com', 'foo'),
 
1382
            config.POLICY_NORECURSE)
 
1383
 
 
1384
    def test_set_user_option_appendpath(self):
 
1385
        self.get_branch_config('http://www.example.com')
 
1386
        self.my_config.set_user_option('foo', 'bar',
 
1387
                                       store=config.STORE_LOCATION_APPENDPATH)
 
1388
        self.assertEqual(
 
1389
            self.my_location_config._get_option_policy(
 
1390
                'http://www.example.com', 'foo'),
 
1391
            config.POLICY_APPENDPATH)
 
1392
 
 
1393
    def test_set_user_option_change_policy(self):
 
1394
        self.get_branch_config('http://www.example.com')
 
1395
        self.my_config.set_user_option('norecurse_option', 'normal',
 
1396
                                       store=config.STORE_LOCATION)
 
1397
        self.assertEqual(
 
1398
            self.my_location_config._get_option_policy(
 
1399
                'http://www.example.com', 'norecurse_option'),
 
1400
            config.POLICY_NONE)
 
1401
 
 
1402
    def get_branch_config(self, location, global_config=None,
 
1403
                          location_config=None):
 
1404
        my_branch = FakeBranch(location)
 
1405
        if global_config is None:
 
1406
            global_config = sample_config_text
 
1407
        if location_config is None:
 
1408
            location_config = sample_branches_text
 
1409
 
 
1410
        config.GlobalConfig.from_string(global_config, save=True)
 
1411
        config.LocationConfig.from_string(location_config, my_branch.base,
 
1412
                                          save=True)
 
1413
        my_config = config.BranchConfig(my_branch)
 
1414
        self.my_config = my_config
 
1415
        self.my_location_config = my_config._get_location_config()
 
1416
 
 
1417
    def test_set_user_setting_sets_and_saves2(self):
 
1418
        self.get_branch_config('/a/c')
 
1419
        self.assertIs(self.my_config.get_user_option('foo'), None)
 
1420
        self.my_config.set_user_option('foo', 'bar')
 
1421
        self.assertEqual(
 
1422
            self.my_config.branch.control_files.files['branch.conf'].strip(),
 
1423
            b'foo = bar')
 
1424
        self.assertEqual(self.my_config.get_user_option('foo'), 'bar')
 
1425
        self.my_config.set_user_option('foo', 'baz',
 
1426
                                       store=config.STORE_LOCATION)
 
1427
        self.assertEqual(self.my_config.get_user_option('foo'), 'baz')
 
1428
        self.my_config.set_user_option('foo', 'qux')
 
1429
        self.assertEqual(self.my_config.get_user_option('foo'), 'baz')
 
1430
 
 
1431
    def test_get_bzr_remote_path(self):
 
1432
        my_config = config.LocationConfig('/a/c')
 
1433
        self.assertEqual('bzr', my_config.get_bzr_remote_path())
 
1434
        my_config.set_user_option('bzr_remote_path', '/path-bzr')
 
1435
        self.assertEqual('/path-bzr', my_config.get_bzr_remote_path())
 
1436
        self.overrideEnv('BZR_REMOTE_PATH', '/environ-bzr')
 
1437
        self.assertEqual('/environ-bzr', my_config.get_bzr_remote_path())
 
1438
 
 
1439
 
 
1440
precedence_global = b'option = global'
 
1441
precedence_branch = b'option = branch'
 
1442
precedence_location = b"""
 
1443
[http://]
 
1444
recurse = true
 
1445
option = recurse
 
1446
[http://example.com/specific]
 
1447
option = exact
 
1448
"""
 
1449
 
 
1450
 
 
1451
class TestBranchConfigItems(tests.TestCaseInTempDir):
 
1452
 
 
1453
    def get_branch_config(self, global_config=None, location=None,
 
1454
                          location_config=None, branch_data_config=None):
 
1455
        my_branch = FakeBranch(location)
 
1456
        if global_config is not None:
 
1457
            config.GlobalConfig.from_string(global_config, save=True)
 
1458
        if location_config is not None:
 
1459
            config.LocationConfig.from_string(location_config, my_branch.base,
 
1460
                                              save=True)
 
1461
        my_config = config.BranchConfig(my_branch)
 
1462
        if branch_data_config is not None:
 
1463
            my_config.branch.control_files.files['branch.conf'] = \
 
1464
                branch_data_config
 
1465
        return my_config
 
1466
 
 
1467
    def test_user_id(self):
 
1468
        branch = FakeBranch()
 
1469
        my_config = config.BranchConfig(branch)
 
1470
        self.assertIsNot(None, my_config.username())
 
1471
        my_config.branch.control_files.files['email'] = "John"
 
1472
        my_config.set_user_option('email',
 
1473
                                  "Robert Collins <robertc@example.org>")
 
1474
        self.assertEqual("Robert Collins <robertc@example.org>",
 
1475
                         my_config.username())
 
1476
 
 
1477
    def test_BRZ_EMAIL_OVERRIDES(self):
 
1478
        self.overrideEnv('BRZ_EMAIL', "Robert Collins <robertc@example.org>")
 
1479
        branch = FakeBranch()
 
1480
        my_config = config.BranchConfig(branch)
 
1481
        self.assertEqual("Robert Collins <robertc@example.org>",
 
1482
                         my_config.username())
 
1483
 
 
1484
    def test_BRZ_EMAIL_OVERRIDES(self):
 
1485
        self.overrideEnv('BZR_EMAIL', "Robert Collins <robertc@example.org>")
 
1486
        branch = FakeBranch()
 
1487
        my_config = config.BranchConfig(branch)
 
1488
        self.assertEqual("Robert Collins <robertc@example.org>",
 
1489
                         my_config.username())
 
1490
 
 
1491
    def test_get_user_option_global(self):
 
1492
        my_config = self.get_branch_config(global_config=sample_config_text)
 
1493
        self.assertEqual('something',
 
1494
                         my_config.get_user_option('user_global_option'))
 
1495
 
 
1496
    def test_config_precedence(self):
 
1497
        # FIXME: eager test, luckily no persitent config file makes it fail
 
1498
        # -- vila 20100716
 
1499
        my_config = self.get_branch_config(global_config=precedence_global)
 
1500
        self.assertEqual(my_config.get_user_option('option'), 'global')
 
1501
        my_config = self.get_branch_config(global_config=precedence_global,
 
1502
                                           branch_data_config=precedence_branch)
 
1503
        self.assertEqual(my_config.get_user_option('option'), 'branch')
 
1504
        my_config = self.get_branch_config(
 
1505
            global_config=precedence_global,
 
1506
            branch_data_config=precedence_branch,
 
1507
            location_config=precedence_location)
 
1508
        self.assertEqual(my_config.get_user_option('option'), 'recurse')
 
1509
        my_config = self.get_branch_config(
 
1510
            global_config=precedence_global,
 
1511
            branch_data_config=precedence_branch,
 
1512
            location_config=precedence_location,
 
1513
            location='http://example.com/specific')
 
1514
        self.assertEqual(my_config.get_user_option('option'), 'exact')
 
1515
 
 
1516
 
 
1517
class TestMailAddressExtraction(tests.TestCase):
 
1518
 
 
1519
    def test_extract_email_address(self):
 
1520
        self.assertEqual('jane@test.com',
 
1521
                         config.extract_email_address('Jane <jane@test.com>'))
 
1522
        self.assertRaises(config.NoEmailInUsername,
 
1523
                          config.extract_email_address, 'Jane Tester')
 
1524
 
 
1525
    def test_parse_username(self):
 
1526
        self.assertEqual(('', 'jdoe@example.com'),
 
1527
                         config.parse_username('jdoe@example.com'))
 
1528
        self.assertEqual(('', 'jdoe@example.com'),
 
1529
                         config.parse_username('<jdoe@example.com>'))
 
1530
        self.assertEqual(('John Doe', 'jdoe@example.com'),
 
1531
                         config.parse_username('John Doe <jdoe@example.com>'))
 
1532
        self.assertEqual(('John Doe', ''),
 
1533
                         config.parse_username('John Doe'))
 
1534
        self.assertEqual(('John Doe', 'jdoe@example.com'),
 
1535
                         config.parse_username('John Doe jdoe@example.com'))
 
1536
 
 
1537
 
 
1538
class TestTreeConfig(tests.TestCaseWithTransport):
 
1539
 
 
1540
    def test_get_value(self):
 
1541
        """Test that retreiving a value from a section is possible"""
 
1542
        branch = self.make_branch('.')
 
1543
        tree_config = config.TreeConfig(branch)
 
1544
        tree_config.set_option('value', 'key', 'SECTION')
 
1545
        tree_config.set_option('value2', 'key2')
 
1546
        tree_config.set_option('value3-top', 'key3')
 
1547
        tree_config.set_option('value3-section', 'key3', 'SECTION')
 
1548
        value = tree_config.get_option('key', 'SECTION')
 
1549
        self.assertEqual(value, 'value')
 
1550
        value = tree_config.get_option('key2')
 
1551
        self.assertEqual(value, 'value2')
 
1552
        self.assertEqual(tree_config.get_option('non-existant'), None)
 
1553
        value = tree_config.get_option('non-existant', 'SECTION')
 
1554
        self.assertEqual(value, None)
 
1555
        value = tree_config.get_option('non-existant', default='default')
 
1556
        self.assertEqual(value, 'default')
 
1557
        self.assertEqual(tree_config.get_option('key2', 'NOSECTION'), None)
 
1558
        value = tree_config.get_option('key2', 'NOSECTION', default='default')
 
1559
        self.assertEqual(value, 'default')
 
1560
        value = tree_config.get_option('key3')
 
1561
        self.assertEqual(value, 'value3-top')
 
1562
        value = tree_config.get_option('key3', 'SECTION')
 
1563
        self.assertEqual(value, 'value3-section')
 
1564
 
 
1565
 
 
1566
class TestTransportConfig(tests.TestCaseWithTransport):
 
1567
 
 
1568
    def test_load_utf8(self):
 
1569
        """Ensure we can load an utf8-encoded file."""
 
1570
        t = self.get_transport()
 
1571
        unicode_user = u'b\N{Euro Sign}ar'
 
1572
        unicode_content = u'user=%s' % (unicode_user,)
 
1573
        utf8_content = unicode_content.encode('utf8')
 
1574
        # Store the raw content in the config file
 
1575
        t.put_bytes('foo.conf', utf8_content)
 
1576
        conf = config.TransportConfig(t, 'foo.conf')
 
1577
        self.assertEqual(unicode_user, conf.get_option('user'))
 
1578
 
 
1579
    def test_load_non_ascii(self):
 
1580
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
 
1581
        t = self.get_transport()
 
1582
        t.put_bytes('foo.conf', b'user=foo\n#\xff\n')
 
1583
        conf = config.TransportConfig(t, 'foo.conf')
 
1584
        self.assertRaises(config.ConfigContentError, conf._get_configobj)
 
1585
 
 
1586
    def test_load_erroneous_content(self):
 
1587
        """Ensure we display a proper error on content that can't be parsed."""
 
1588
        t = self.get_transport()
 
1589
        t.put_bytes('foo.conf', b'[open_section\n')
 
1590
        conf = config.TransportConfig(t, 'foo.conf')
 
1591
        self.assertRaises(config.ParseConfigError, conf._get_configobj)
 
1592
 
 
1593
    def test_load_permission_denied(self):
 
1594
        """Ensure we get an empty config file if the file is inaccessible."""
 
1595
        warnings = []
 
1596
 
 
1597
        def warning(*args):
 
1598
            warnings.append(args[0] % args[1:])
 
1599
        self.overrideAttr(trace, 'warning', warning)
 
1600
 
 
1601
        class DenyingTransport(object):
 
1602
 
 
1603
            def __init__(self, base):
 
1604
                self.base = base
 
1605
 
 
1606
            def get_bytes(self, relpath):
 
1607
                raise errors.PermissionDenied(relpath, "")
 
1608
 
 
1609
        cfg = config.TransportConfig(
 
1610
            DenyingTransport("nonexisting://"), 'control.conf')
 
1611
        self.assertIs(None, cfg.get_option('non-existant', 'SECTION'))
 
1612
        self.assertEqual(
 
1613
            warnings,
 
1614
            [u'Permission denied while trying to open configuration file '
 
1615
             u'nonexisting:///control.conf.'])
 
1616
 
 
1617
    def test_get_value(self):
 
1618
        """Test that retreiving a value from a section is possible"""
 
1619
        bzrdir_config = config.TransportConfig(self.get_transport('.'),
 
1620
                                               'control.conf')
 
1621
        bzrdir_config.set_option('value', 'key', 'SECTION')
 
1622
        bzrdir_config.set_option('value2', 'key2')
 
1623
        bzrdir_config.set_option('value3-top', 'key3')
 
1624
        bzrdir_config.set_option('value3-section', 'key3', 'SECTION')
 
1625
        value = bzrdir_config.get_option('key', 'SECTION')
 
1626
        self.assertEqual(value, 'value')
 
1627
        value = bzrdir_config.get_option('key2')
 
1628
        self.assertEqual(value, 'value2')
 
1629
        self.assertEqual(bzrdir_config.get_option('non-existant'), None)
 
1630
        value = bzrdir_config.get_option('non-existant', 'SECTION')
 
1631
        self.assertEqual(value, None)
 
1632
        value = bzrdir_config.get_option('non-existant', default='default')
 
1633
        self.assertEqual(value, 'default')
 
1634
        self.assertEqual(bzrdir_config.get_option('key2', 'NOSECTION'), None)
 
1635
        value = bzrdir_config.get_option('key2', 'NOSECTION',
 
1636
                                         default='default')
 
1637
        self.assertEqual(value, 'default')
 
1638
        value = bzrdir_config.get_option('key3')
 
1639
        self.assertEqual(value, 'value3-top')
 
1640
        value = bzrdir_config.get_option('key3', 'SECTION')
 
1641
        self.assertEqual(value, 'value3-section')
 
1642
 
 
1643
    def test_set_unset_default_stack_on(self):
 
1644
        my_dir = self.make_controldir('.')
 
1645
        bzrdir_config = config.BzrDirConfig(my_dir)
 
1646
        self.assertIs(None, bzrdir_config.get_default_stack_on())
 
1647
        bzrdir_config.set_default_stack_on('Foo')
 
1648
        self.assertEqual('Foo', bzrdir_config._config.get_option(
 
1649
                         'default_stack_on'))
 
1650
        self.assertEqual('Foo', bzrdir_config.get_default_stack_on())
 
1651
        bzrdir_config.set_default_stack_on(None)
 
1652
        self.assertIs(None, bzrdir_config.get_default_stack_on())
 
1653
 
 
1654
 
 
1655
class TestOldConfigHooks(tests.TestCaseWithTransport):
 
1656
 
 
1657
    def setUp(self):
 
1658
        super(TestOldConfigHooks, self).setUp()
 
1659
        create_configs_with_file_option(self)
 
1660
 
 
1661
    def assertGetHook(self, conf, name, value):
 
1662
        calls = []
 
1663
 
 
1664
        def hook(*args):
 
1665
            calls.append(args)
 
1666
        config.OldConfigHooks.install_named_hook('get', hook, None)
 
1667
        self.addCleanup(
 
1668
            config.OldConfigHooks.uninstall_named_hook, 'get', None)
 
1669
        self.assertLength(0, calls)
 
1670
        actual_value = conf.get_user_option(name)
 
1671
        self.assertEqual(value, actual_value)
 
1672
        self.assertLength(1, calls)
 
1673
        self.assertEqual((conf, name, value), calls[0])
 
1674
 
 
1675
    def test_get_hook_breezy(self):
 
1676
        self.assertGetHook(self.breezy_config, 'file', 'breezy')
 
1677
 
 
1678
    def test_get_hook_locations(self):
 
1679
        self.assertGetHook(self.locations_config, 'file', 'locations')
 
1680
 
 
1681
    def test_get_hook_branch(self):
 
1682
        # Since locations masks branch, we define a different option
 
1683
        self.branch_config.set_user_option('file2', 'branch')
 
1684
        self.assertGetHook(self.branch_config, 'file2', 'branch')
 
1685
 
 
1686
    def assertSetHook(self, conf, name, value):
 
1687
        calls = []
 
1688
 
 
1689
        def hook(*args):
 
1690
            calls.append(args)
 
1691
        config.OldConfigHooks.install_named_hook('set', hook, None)
 
1692
        self.addCleanup(
 
1693
            config.OldConfigHooks.uninstall_named_hook, 'set', None)
 
1694
        self.assertLength(0, calls)
 
1695
        conf.set_user_option(name, value)
 
1696
        self.assertLength(1, calls)
 
1697
        # We can't assert the conf object below as different configs use
 
1698
        # different means to implement set_user_option and we care only about
 
1699
        # coverage here.
 
1700
        self.assertEqual((name, value), calls[0][1:])
 
1701
 
 
1702
    def test_set_hook_breezy(self):
 
1703
        self.assertSetHook(self.breezy_config, 'foo', 'breezy')
 
1704
 
 
1705
    def test_set_hook_locations(self):
 
1706
        self.assertSetHook(self.locations_config, 'foo', 'locations')
 
1707
 
 
1708
    def test_set_hook_branch(self):
 
1709
        self.assertSetHook(self.branch_config, 'foo', 'branch')
 
1710
 
 
1711
    def assertRemoveHook(self, conf, name, section_name=None):
 
1712
        calls = []
 
1713
 
 
1714
        def hook(*args):
 
1715
            calls.append(args)
 
1716
        config.OldConfigHooks.install_named_hook('remove', hook, None)
 
1717
        self.addCleanup(
 
1718
            config.OldConfigHooks.uninstall_named_hook, 'remove', None)
 
1719
        self.assertLength(0, calls)
 
1720
        conf.remove_user_option(name, section_name)
 
1721
        self.assertLength(1, calls)
 
1722
        # We can't assert the conf object below as different configs use
 
1723
        # different means to implement remove_user_option and we care only about
 
1724
        # coverage here.
 
1725
        self.assertEqual((name,), calls[0][1:])
 
1726
 
 
1727
    def test_remove_hook_breezy(self):
 
1728
        self.assertRemoveHook(self.breezy_config, 'file')
 
1729
 
 
1730
    def test_remove_hook_locations(self):
 
1731
        self.assertRemoveHook(self.locations_config, 'file',
 
1732
                              self.locations_config.location)
 
1733
 
 
1734
    def test_remove_hook_branch(self):
 
1735
        self.assertRemoveHook(self.branch_config, 'file')
 
1736
 
 
1737
    def assertLoadHook(self, name, conf_class, *conf_args):
 
1738
        calls = []
 
1739
 
 
1740
        def hook(*args):
 
1741
            calls.append(args)
 
1742
        config.OldConfigHooks.install_named_hook('load', hook, None)
 
1743
        self.addCleanup(
 
1744
            config.OldConfigHooks.uninstall_named_hook, 'load', None)
 
1745
        self.assertLength(0, calls)
 
1746
        # Build a config
 
1747
        conf = conf_class(*conf_args)
 
1748
        # Access an option to trigger a load
 
1749
        conf.get_user_option(name)
 
1750
        self.assertLength(1, calls)
 
1751
        # Since we can't assert about conf, we just use the number of calls ;-/
 
1752
 
 
1753
    def test_load_hook_breezy(self):
 
1754
        self.assertLoadHook('file', config.GlobalConfig)
 
1755
 
 
1756
    def test_load_hook_locations(self):
 
1757
        self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)
 
1758
 
 
1759
    def test_load_hook_branch(self):
 
1760
        self.assertLoadHook('file', config.BranchConfig, self.tree.branch)
 
1761
 
 
1762
    def assertSaveHook(self, conf):
 
1763
        calls = []
 
1764
 
 
1765
        def hook(*args):
 
1766
            calls.append(args)
 
1767
        config.OldConfigHooks.install_named_hook('save', hook, None)
 
1768
        self.addCleanup(
 
1769
            config.OldConfigHooks.uninstall_named_hook, 'save', None)
 
1770
        self.assertLength(0, calls)
 
1771
        # Setting an option triggers a save
 
1772
        conf.set_user_option('foo', 'bar')
 
1773
        self.assertLength(1, calls)
 
1774
        # Since we can't assert about conf, we just use the number of calls ;-/
 
1775
 
 
1776
    def test_save_hook_breezy(self):
 
1777
        self.assertSaveHook(self.breezy_config)
 
1778
 
 
1779
    def test_save_hook_locations(self):
 
1780
        self.assertSaveHook(self.locations_config)
 
1781
 
 
1782
    def test_save_hook_branch(self):
 
1783
        self.assertSaveHook(self.branch_config)
 
1784
 
 
1785
 
 
1786
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
 
1787
    """Tests config hooks for remote configs.
 
1788
 
 
1789
    No tests for the remove hook as this is not implemented there.
 
1790
    """
 
1791
 
 
1792
    def setUp(self):
 
1793
        super(TestOldConfigHooksForRemote, self).setUp()
 
1794
        self.transport_server = test_server.SmartTCPServer_for_testing
 
1795
        create_configs_with_file_option(self)
 
1796
 
 
1797
    def assertGetHook(self, conf, name, value):
 
1798
        calls = []
 
1799
 
 
1800
        def hook(*args):
 
1801
            calls.append(args)
 
1802
        config.OldConfigHooks.install_named_hook('get', hook, None)
 
1803
        self.addCleanup(
 
1804
            config.OldConfigHooks.uninstall_named_hook, 'get', None)
 
1805
        self.assertLength(0, calls)
 
1806
        actual_value = conf.get_option(name)
 
1807
        self.assertEqual(value, actual_value)
 
1808
        self.assertLength(1, calls)
 
1809
        self.assertEqual((conf, name, value), calls[0])
 
1810
 
 
1811
    def test_get_hook_remote_branch(self):
 
1812
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
1813
        self.assertGetHook(remote_branch._get_config(), 'file', 'branch')
 
1814
 
 
1815
    def test_get_hook_remote_bzrdir(self):
 
1816
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
 
1817
        conf = remote_bzrdir._get_config()
 
1818
        conf.set_option('remotedir', 'file')
 
1819
        self.assertGetHook(conf, 'file', 'remotedir')
 
1820
 
 
1821
    def assertSetHook(self, conf, name, value):
 
1822
        calls = []
 
1823
 
 
1824
        def hook(*args):
 
1825
            calls.append(args)
 
1826
        config.OldConfigHooks.install_named_hook('set', hook, None)
 
1827
        self.addCleanup(
 
1828
            config.OldConfigHooks.uninstall_named_hook, 'set', None)
 
1829
        self.assertLength(0, calls)
 
1830
        conf.set_option(value, name)
 
1831
        self.assertLength(1, calls)
 
1832
        # We can't assert the conf object below as different configs use
 
1833
        # different means to implement set_user_option and we care only about
 
1834
        # coverage here.
 
1835
        self.assertEqual((name, value), calls[0][1:])
 
1836
 
 
1837
    def test_set_hook_remote_branch(self):
 
1838
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
1839
        self.addCleanup(remote_branch.lock_write().unlock)
 
1840
        self.assertSetHook(remote_branch._get_config(), 'file', 'remote')
 
1841
 
 
1842
    def test_set_hook_remote_bzrdir(self):
 
1843
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
1844
        self.addCleanup(remote_branch.lock_write().unlock)
 
1845
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
 
1846
        self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')
 
1847
 
 
1848
    def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
 
1849
        calls = []
 
1850
 
 
1851
        def hook(*args):
 
1852
            calls.append(args)
 
1853
        config.OldConfigHooks.install_named_hook('load', hook, None)
 
1854
        self.addCleanup(
 
1855
            config.OldConfigHooks.uninstall_named_hook, 'load', None)
 
1856
        self.assertLength(0, calls)
 
1857
        # Build a config
 
1858
        conf = conf_class(*conf_args)
 
1859
        # Access an option to trigger a load
 
1860
        conf.get_option(name)
 
1861
        self.assertLength(expected_nb_calls, calls)
 
1862
        # Since we can't assert about conf, we just use the number of calls ;-/
 
1863
 
 
1864
    def test_load_hook_remote_branch(self):
 
1865
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
1866
        self.assertLoadHook(
 
1867
            1, 'file', remote.RemoteBranchConfig, remote_branch)
 
1868
 
 
1869
    def test_load_hook_remote_bzrdir(self):
 
1870
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
 
1871
        # The config file doesn't exist, set an option to force its creation
 
1872
        conf = remote_bzrdir._get_config()
 
1873
        conf.set_option('remotedir', 'file')
 
1874
        # We get one call for the server and one call for the client, this is
 
1875
        # caused by the differences in implementations betwen
 
1876
        # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
 
1877
        # SmartServerBranchGetConfigFile (in smart/branch.py)
 
1878
        self.assertLoadHook(
 
1879
            2, 'file', remote.RemoteBzrDirConfig, remote_bzrdir)
 
1880
 
 
1881
    def assertSaveHook(self, conf):
 
1882
        calls = []
 
1883
 
 
1884
        def hook(*args):
 
1885
            calls.append(args)
 
1886
        config.OldConfigHooks.install_named_hook('save', hook, None)
 
1887
        self.addCleanup(
 
1888
            config.OldConfigHooks.uninstall_named_hook, 'save', None)
 
1889
        self.assertLength(0, calls)
 
1890
        # Setting an option triggers a save
 
1891
        conf.set_option('foo', 'bar')
 
1892
        self.assertLength(1, calls)
 
1893
        # Since we can't assert about conf, we just use the number of calls ;-/
 
1894
 
 
1895
    def test_save_hook_remote_branch(self):
 
1896
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
1897
        self.addCleanup(remote_branch.lock_write().unlock)
 
1898
        self.assertSaveHook(remote_branch._get_config())
 
1899
 
 
1900
    def test_save_hook_remote_bzrdir(self):
 
1901
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
1902
        self.addCleanup(remote_branch.lock_write().unlock)
 
1903
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
 
1904
        self.assertSaveHook(remote_bzrdir._get_config())
 
1905
 
 
1906
 
 
1907
class TestOptionNames(tests.TestCase):
 
1908
 
 
1909
    def is_valid(self, name):
 
1910
        return config._option_ref_re.match('{%s}' % name) is not None
 
1911
 
 
1912
    def test_valid_names(self):
 
1913
        self.assertTrue(self.is_valid('foo'))
 
1914
        self.assertTrue(self.is_valid('foo.bar'))
 
1915
        self.assertTrue(self.is_valid('f1'))
 
1916
        self.assertTrue(self.is_valid('_'))
 
1917
        self.assertTrue(self.is_valid('__bar__'))
 
1918
        self.assertTrue(self.is_valid('a_'))
 
1919
        self.assertTrue(self.is_valid('a1'))
 
1920
        # Don't break bzr-svn for no good reason
 
1921
        self.assertTrue(self.is_valid('guessed-layout'))
 
1922
 
 
1923
    def test_invalid_names(self):
 
1924
        self.assertFalse(self.is_valid(' foo'))
 
1925
        self.assertFalse(self.is_valid('foo '))
 
1926
        self.assertFalse(self.is_valid('1'))
 
1927
        self.assertFalse(self.is_valid('1,2'))
 
1928
        self.assertFalse(self.is_valid('foo$'))
 
1929
        self.assertFalse(self.is_valid('!foo'))
 
1930
        self.assertFalse(self.is_valid('foo.'))
 
1931
        self.assertFalse(self.is_valid('foo..bar'))
 
1932
        self.assertFalse(self.is_valid('{}'))
 
1933
        self.assertFalse(self.is_valid('{a}'))
 
1934
        self.assertFalse(self.is_valid('a\n'))
 
1935
        self.assertFalse(self.is_valid('-'))
 
1936
        self.assertFalse(self.is_valid('-a'))
 
1937
        self.assertFalse(self.is_valid('a-'))
 
1938
        self.assertFalse(self.is_valid('a--a'))
 
1939
 
 
1940
    def assertSingleGroup(self, reference):
 
1941
        # the regexp is used with split and as such should match the reference
 
1942
        # *only*, if more groups needs to be defined, (?:...) should be used.
 
1943
        m = config._option_ref_re.match('{a}')
 
1944
        self.assertLength(1, m.groups())
 
1945
 
 
1946
    def test_valid_references(self):
 
1947
        self.assertSingleGroup('{a}')
 
1948
        self.assertSingleGroup('{{a}}')
 
1949
 
 
1950
 
 
1951
class TestOption(tests.TestCase):
 
1952
 
 
1953
    def test_default_value(self):
 
1954
        opt = config.Option('foo', default='bar')
 
1955
        self.assertEqual('bar', opt.get_default())
 
1956
 
 
1957
    def test_callable_default_value(self):
 
1958
        def bar_as_unicode():
 
1959
            return u'bar'
 
1960
        opt = config.Option('foo', default=bar_as_unicode)
 
1961
        self.assertEqual('bar', opt.get_default())
 
1962
 
 
1963
    def test_default_value_from_env(self):
 
1964
        opt = config.Option('foo', default='bar', default_from_env=['FOO'])
 
1965
        self.overrideEnv('FOO', 'quux')
 
1966
        # Env variable provides a default taking over the option one
 
1967
        self.assertEqual('quux', opt.get_default())
 
1968
 
 
1969
    def test_first_default_value_from_env_wins(self):
 
1970
        opt = config.Option('foo', default='bar',
 
1971
                            default_from_env=['NO_VALUE', 'FOO', 'BAZ'])
 
1972
        self.overrideEnv('FOO', 'foo')
 
1973
        self.overrideEnv('BAZ', 'baz')
 
1974
        # The first env var set wins
 
1975
        self.assertEqual('foo', opt.get_default())
 
1976
 
 
1977
    def test_not_supported_list_default_value(self):
 
1978
        self.assertRaises(AssertionError, config.Option, 'foo', default=[1])
 
1979
 
 
1980
    def test_not_supported_object_default_value(self):
 
1981
        self.assertRaises(AssertionError, config.Option, 'foo',
 
1982
                          default=object())
 
1983
 
 
1984
    def test_not_supported_callable_default_value_not_unicode(self):
 
1985
        def bar_not_unicode():
 
1986
            return b'bar'
 
1987
        opt = config.Option('foo', default=bar_not_unicode)
 
1988
        self.assertRaises(AssertionError, opt.get_default)
 
1989
 
 
1990
    def test_get_help_topic(self):
 
1991
        opt = config.Option('foo')
 
1992
        self.assertEqual('foo', opt.get_help_topic())
 
1993
 
 
1994
 
 
1995
class TestOptionConverter(tests.TestCase):
 
1996
 
 
1997
    def assertConverted(self, expected, opt, value):
 
1998
        self.assertEqual(expected, opt.convert_from_unicode(None, value))
 
1999
 
 
2000
    def assertCallsWarning(self, opt, value):
 
2001
        warnings = []
 
2002
 
 
2003
        def warning(*args):
 
2004
            warnings.append(args[0] % args[1:])
 
2005
        self.overrideAttr(trace, 'warning', warning)
 
2006
        self.assertEqual(None, opt.convert_from_unicode(None, value))
 
2007
        self.assertLength(1, warnings)
 
2008
        self.assertEqual(
 
2009
            'Value "%s" is not valid for "%s"' % (value, opt.name),
 
2010
            warnings[0])
 
2011
 
 
2012
    def assertCallsError(self, opt, value):
 
2013
        self.assertRaises(config.ConfigOptionValueError,
 
2014
                          opt.convert_from_unicode, None, value)
 
2015
 
 
2016
    def assertConvertInvalid(self, opt, invalid_value):
 
2017
        opt.invalid = None
 
2018
        self.assertEqual(None, opt.convert_from_unicode(None, invalid_value))
 
2019
        opt.invalid = 'warning'
 
2020
        self.assertCallsWarning(opt, invalid_value)
 
2021
        opt.invalid = 'error'
 
2022
        self.assertCallsError(opt, invalid_value)
 
2023
 
 
2024
 
 
2025
class TestOptionWithBooleanConverter(TestOptionConverter):
 
2026
 
 
2027
    def get_option(self):
 
2028
        return config.Option('foo', help='A boolean.',
 
2029
                             from_unicode=config.bool_from_store)
 
2030
 
 
2031
    def test_convert_invalid(self):
 
2032
        opt = self.get_option()
 
2033
        # A string that is not recognized as a boolean
 
2034
        self.assertConvertInvalid(opt, u'invalid-boolean')
 
2035
        # A list of strings is never recognized as a boolean
 
2036
        self.assertConvertInvalid(opt, [u'not', u'a', u'boolean'])
 
2037
 
 
2038
    def test_convert_valid(self):
 
2039
        opt = self.get_option()
 
2040
        self.assertConverted(True, opt, u'True')
 
2041
        self.assertConverted(True, opt, u'1')
 
2042
        self.assertConverted(False, opt, u'False')
 
2043
 
 
2044
 
 
2045
class TestOptionWithIntegerConverter(TestOptionConverter):
 
2046
 
 
2047
    def get_option(self):
 
2048
        return config.Option('foo', help='An integer.',
 
2049
                             from_unicode=config.int_from_store)
 
2050
 
 
2051
    def test_convert_invalid(self):
 
2052
        opt = self.get_option()
 
2053
        # A string that is not recognized as an integer
 
2054
        self.assertConvertInvalid(opt, u'forty-two')
 
2055
        # A list of strings is never recognized as an integer
 
2056
        self.assertConvertInvalid(opt, [u'a', u'list'])
 
2057
 
 
2058
    def test_convert_valid(self):
 
2059
        opt = self.get_option()
 
2060
        self.assertConverted(16, opt, u'16')
 
2061
 
 
2062
 
 
2063
class TestOptionWithSIUnitConverter(TestOptionConverter):
 
2064
 
 
2065
    def get_option(self):
 
2066
        return config.Option('foo', help='An integer in SI units.',
 
2067
                             from_unicode=config.int_SI_from_store)
 
2068
 
 
2069
    def test_convert_invalid(self):
 
2070
        opt = self.get_option()
 
2071
        self.assertConvertInvalid(opt, u'not-a-unit')
 
2072
        self.assertConvertInvalid(opt, u'Gb')  # Forgot the value
 
2073
        self.assertConvertInvalid(opt, u'1b')  # Forgot the unit
 
2074
        self.assertConvertInvalid(opt, u'1GG')
 
2075
        self.assertConvertInvalid(opt, u'1Mbb')
 
2076
        self.assertConvertInvalid(opt, u'1MM')
 
2077
 
 
2078
    def test_convert_valid(self):
 
2079
        opt = self.get_option()
 
2080
        self.assertConverted(int(5e3), opt, u'5kb')
 
2081
        self.assertConverted(int(5e6), opt, u'5M')
 
2082
        self.assertConverted(int(5e6), opt, u'5MB')
 
2083
        self.assertConverted(int(5e9), opt, u'5g')
 
2084
        self.assertConverted(int(5e9), opt, u'5gB')
 
2085
        self.assertConverted(100, opt, u'100')
 
2086
 
 
2087
 
 
2088
class TestListOption(TestOptionConverter):
 
2089
 
 
2090
    def get_option(self):
 
2091
        return config.ListOption('foo', help='A list.')
 
2092
 
 
2093
    def test_convert_invalid(self):
 
2094
        opt = self.get_option()
 
2095
        # We don't even try to convert a list into a list, we only expect
 
2096
        # strings
 
2097
        self.assertConvertInvalid(opt, [1])
 
2098
        # No string is invalid as all forms can be converted to a list
 
2099
 
 
2100
    def test_convert_valid(self):
 
2101
        opt = self.get_option()
 
2102
        # An empty string is an empty list
 
2103
        self.assertConverted([], opt, '')  # Using a bare str() just in case
 
2104
        self.assertConverted([], opt, u'')
 
2105
        # A boolean
 
2106
        self.assertConverted([u'True'], opt, u'True')
 
2107
        # An integer
 
2108
        self.assertConverted([u'42'], opt, u'42')
 
2109
        # A single string
 
2110
        self.assertConverted([u'bar'], opt, u'bar')
 
2111
 
 
2112
 
 
2113
class TestRegistryOption(TestOptionConverter):
 
2114
 
 
2115
    def get_option(self, registry):
 
2116
        return config.RegistryOption('foo', registry,
 
2117
                                     help='A registry option.')
 
2118
 
 
2119
    def test_convert_invalid(self):
 
2120
        registry = _mod_registry.Registry()
 
2121
        opt = self.get_option(registry)
 
2122
        self.assertConvertInvalid(opt, [1])
 
2123
        self.assertConvertInvalid(opt, u"notregistered")
 
2124
 
 
2125
    def test_convert_valid(self):
 
2126
        registry = _mod_registry.Registry()
 
2127
        registry.register("someval", 1234)
 
2128
        opt = self.get_option(registry)
 
2129
        # Using a bare str() just in case
 
2130
        self.assertConverted(1234, opt, "someval")
 
2131
        self.assertConverted(1234, opt, u'someval')
 
2132
        self.assertConverted(None, opt, None)
 
2133
 
 
2134
    def test_help(self):
 
2135
        registry = _mod_registry.Registry()
 
2136
        registry.register("someval", 1234, help="some option")
 
2137
        registry.register("dunno", 1234, help="some other option")
 
2138
        opt = self.get_option(registry)
 
2139
        self.assertEqual(
 
2140
            'A registry option.\n'
 
2141
            '\n'
 
2142
            'The following values are supported:\n'
 
2143
            ' dunno - some other option\n'
 
2144
            ' someval - some option\n',
 
2145
            opt.help)
 
2146
 
 
2147
    def test_get_help_text(self):
 
2148
        registry = _mod_registry.Registry()
 
2149
        registry.register("someval", 1234, help="some option")
 
2150
        registry.register("dunno", 1234, help="some other option")
 
2151
        opt = self.get_option(registry)
 
2152
        self.assertEqual(
 
2153
            'A registry option.\n'
 
2154
            '\n'
 
2155
            'The following values are supported:\n'
 
2156
            ' dunno - some other option\n'
 
2157
            ' someval - some option\n',
 
2158
            opt.get_help_text())
 
2159
 
 
2160
 
 
2161
class TestOptionRegistry(tests.TestCase):
 
2162
 
 
2163
    def setUp(self):
 
2164
        super(TestOptionRegistry, self).setUp()
 
2165
        # Always start with an empty registry
 
2166
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
 
2167
        self.registry = config.option_registry
 
2168
 
 
2169
    def test_register(self):
 
2170
        opt = config.Option('foo')
 
2171
        self.registry.register(opt)
 
2172
        self.assertIs(opt, self.registry.get('foo'))
 
2173
 
 
2174
    def test_registered_help(self):
 
2175
        opt = config.Option('foo', help='A simple option')
 
2176
        self.registry.register(opt)
 
2177
        self.assertEqual('A simple option', self.registry.get_help('foo'))
 
2178
 
 
2179
    def test_dont_register_illegal_name(self):
 
2180
        self.assertRaises(config.IllegalOptionName,
 
2181
                          self.registry.register, config.Option(' foo'))
 
2182
        self.assertRaises(config.IllegalOptionName,
 
2183
                          self.registry.register, config.Option('bar,'))
 
2184
 
 
2185
    lazy_option = config.Option('lazy_foo', help='Lazy help')
 
2186
 
 
2187
    def test_register_lazy(self):
 
2188
        self.registry.register_lazy('lazy_foo', self.__module__,
 
2189
                                    'TestOptionRegistry.lazy_option')
 
2190
        self.assertIs(self.lazy_option, self.registry.get('lazy_foo'))
 
2191
 
 
2192
    def test_registered_lazy_help(self):
 
2193
        self.registry.register_lazy('lazy_foo', self.__module__,
 
2194
                                    'TestOptionRegistry.lazy_option')
 
2195
        self.assertEqual('Lazy help', self.registry.get_help('lazy_foo'))
 
2196
 
 
2197
    def test_dont_lazy_register_illegal_name(self):
 
2198
        # This is where the root cause of http://pad.lv/1235099 is better
 
2199
        # understood: 'register_lazy' doc string mentions that key should match
 
2200
        # the option name which indirectly requires that the option name is a
 
2201
        # valid python identifier. We violate that rule here (using a key that
 
2202
        # doesn't match the option name) to test the option name checking.
 
2203
        self.assertRaises(config.IllegalOptionName,
 
2204
                          self.registry.register_lazy, ' foo', self.__module__,
 
2205
                          'TestOptionRegistry.lazy_option')
 
2206
        self.assertRaises(config.IllegalOptionName,
 
2207
                          self.registry.register_lazy, '1,2', self.__module__,
 
2208
                          'TestOptionRegistry.lazy_option')
 
2209
 
 
2210
 
 
2211
class TestRegisteredOptions(tests.TestCase):
 
2212
    """All registered options should verify some constraints."""
 
2213
 
 
2214
    scenarios = [(key, {'option_name': key, 'option': option}) for key, option
 
2215
                 in config.option_registry.iteritems()]
 
2216
 
 
2217
    def setUp(self):
 
2218
        super(TestRegisteredOptions, self).setUp()
 
2219
        self.registry = config.option_registry
 
2220
 
 
2221
    def test_proper_name(self):
 
2222
        # An option should be registered under its own name, this can't be
 
2223
        # checked at registration time for the lazy ones.
 
2224
        self.assertEqual(self.option_name, self.option.name)
 
2225
 
 
2226
    def test_help_is_set(self):
 
2227
        option_help = self.registry.get_help(self.option_name)
 
2228
        # Come on, think about the user, he really wants to know what the
 
2229
        # option is about
 
2230
        self.assertIsNot(None, option_help)
 
2231
        self.assertNotEqual('', option_help)
 
2232
 
 
2233
 
 
2234
class TestSection(tests.TestCase):
 
2235
 
 
2236
    # FIXME: Parametrize so that all sections produced by Stores run these
 
2237
    # tests -- vila 2011-04-01
 
2238
 
 
2239
    def test_get_a_value(self):
 
2240
        a_dict = dict(foo='bar')
 
2241
        section = config.Section('myID', a_dict)
 
2242
        self.assertEqual('bar', section.get('foo'))
 
2243
 
 
2244
    def test_get_unknown_option(self):
 
2245
        a_dict = dict()
 
2246
        section = config.Section(None, a_dict)
 
2247
        self.assertEqual('out of thin air',
 
2248
                         section.get('foo', 'out of thin air'))
 
2249
 
 
2250
    def test_options_is_shared(self):
 
2251
        a_dict = dict()
 
2252
        section = config.Section(None, a_dict)
 
2253
        self.assertIs(a_dict, section.options)
 
2254
 
 
2255
 
 
2256
class TestMutableSection(tests.TestCase):
 
2257
 
 
2258
    scenarios = [('mutable',
 
2259
                  {'get_section':
 
2260
                   lambda opts: config.MutableSection('myID', opts)},),
 
2261
                 ]
 
2262
 
 
2263
    def test_set(self):
 
2264
        a_dict = dict(foo='bar')
 
2265
        section = self.get_section(a_dict)
 
2266
        section.set('foo', 'new_value')
 
2267
        self.assertEqual('new_value', section.get('foo'))
 
2268
        # The change appears in the shared section
 
2269
        self.assertEqual('new_value', a_dict.get('foo'))
 
2270
        # We keep track of the change
 
2271
        self.assertTrue('foo' in section.orig)
 
2272
        self.assertEqual('bar', section.orig.get('foo'))
 
2273
 
 
2274
    def test_set_preserve_original_once(self):
 
2275
        a_dict = dict(foo='bar')
 
2276
        section = self.get_section(a_dict)
 
2277
        section.set('foo', 'first_value')
 
2278
        section.set('foo', 'second_value')
 
2279
        # We keep track of the original value
 
2280
        self.assertTrue('foo' in section.orig)
 
2281
        self.assertEqual('bar', section.orig.get('foo'))
 
2282
 
 
2283
    def test_remove(self):
 
2284
        a_dict = dict(foo='bar')
 
2285
        section = self.get_section(a_dict)
 
2286
        section.remove('foo')
 
2287
        # We get None for unknown options via the default value
 
2288
        self.assertEqual(None, section.get('foo'))
 
2289
        # Or we just get the default value
 
2290
        self.assertEqual('unknown', section.get('foo', 'unknown'))
 
2291
        self.assertFalse('foo' in section.options)
 
2292
        # We keep track of the deletion
 
2293
        self.assertTrue('foo' in section.orig)
 
2294
        self.assertEqual('bar', section.orig.get('foo'))
 
2295
 
 
2296
    def test_remove_new_option(self):
 
2297
        a_dict = dict()
 
2298
        section = self.get_section(a_dict)
 
2299
        section.set('foo', 'bar')
 
2300
        section.remove('foo')
 
2301
        self.assertFalse('foo' in section.options)
 
2302
        # The option didn't exist initially so it we need to keep track of it
 
2303
        # with a special value
 
2304
        self.assertTrue('foo' in section.orig)
 
2305
        self.assertEqual(config._NewlyCreatedOption, section.orig['foo'])
 
2306
 
 
2307
 
 
2308
class TestCommandLineStore(tests.TestCase):
 
2309
 
 
2310
    def setUp(self):
 
2311
        super(TestCommandLineStore, self).setUp()
 
2312
        self.store = config.CommandLineStore()
 
2313
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
 
2314
 
 
2315
    def get_section(self):
 
2316
        """Get the unique section for the command line overrides."""
 
2317
        sections = list(self.store.get_sections())
 
2318
        self.assertLength(1, sections)
 
2319
        store, section = sections[0]
 
2320
        self.assertEqual(self.store, store)
 
2321
        return section
 
2322
 
 
2323
    def test_no_override(self):
 
2324
        self.store._from_cmdline([])
 
2325
        section = self.get_section()
 
2326
        self.assertLength(0, list(section.iter_option_names()))
 
2327
 
 
2328
    def test_simple_override(self):
 
2329
        self.store._from_cmdline(['a=b'])
 
2330
        section = self.get_section()
 
2331
        self.assertEqual('b', section.get('a'))
 
2332
 
 
2333
    def test_list_override(self):
 
2334
        opt = config.ListOption('l')
 
2335
        config.option_registry.register(opt)
 
2336
        self.store._from_cmdline(['l=1,2,3'])
 
2337
        val = self.get_section().get('l')
 
2338
        self.assertEqual('1,2,3', val)
 
2339
        # Reminder: lists should be registered as such explicitely, otherwise
 
2340
        # the conversion needs to be done afterwards.
 
2341
        self.assertEqual(['1', '2', '3'],
 
2342
                         opt.convert_from_unicode(self.store, val))
 
2343
 
 
2344
    def test_multiple_overrides(self):
 
2345
        self.store._from_cmdline(['a=b', 'x=y'])
 
2346
        section = self.get_section()
 
2347
        self.assertEqual('b', section.get('a'))
 
2348
        self.assertEqual('y', section.get('x'))
 
2349
 
 
2350
    def test_wrong_syntax(self):
 
2351
        self.assertRaises(errors.BzrCommandError,
 
2352
                          self.store._from_cmdline, ['a=b', 'c'])
 
2353
 
 
2354
 
 
2355
class TestStoreMinimalAPI(tests.TestCaseWithTransport):
 
2356
 
 
2357
    scenarios = [(key, {'get_store': builder}) for key, builder
 
2358
                 in config.test_store_builder_registry.iteritems()] + [
 
2359
        ('cmdline', {'get_store': lambda test: config.CommandLineStore()})]
 
2360
 
 
2361
    def test_id(self):
 
2362
        store = self.get_store(self)
 
2363
        if isinstance(store, config.TransportIniFileStore):
 
2364
            raise tests.TestNotApplicable(
 
2365
                "%s is not a concrete Store implementation"
 
2366
                " so it doesn't need an id" % (store.__class__.__name__,))
 
2367
        self.assertIsNot(None, store.id)
 
2368
 
 
2369
 
 
2370
class TestStore(tests.TestCaseWithTransport):
 
2371
 
 
2372
    def assertSectionContent(self, expected, store_and_section):
 
2373
        """Assert that some options have the proper values in a section."""
 
2374
        _, section = store_and_section
 
2375
        expected_name, expected_options = expected
 
2376
        self.assertEqual(expected_name, section.id)
 
2377
        self.assertEqual(
 
2378
            expected_options,
 
2379
            dict([(k, section.get(k)) for k in expected_options.keys()]))
 
2380
 
 
2381
 
 
2382
class TestReadonlyStore(TestStore):
 
2383
 
 
2384
    scenarios = [(key, {'get_store': builder}) for key, builder
 
2385
                 in config.test_store_builder_registry.iteritems()]
 
2386
 
 
2387
    def test_building_delays_load(self):
 
2388
        store = self.get_store(self)
 
2389
        self.assertEqual(False, store.is_loaded())
 
2390
        store._load_from_string(b'')
 
2391
        self.assertEqual(True, store.is_loaded())
 
2392
 
 
2393
    def test_get_no_sections_for_empty(self):
 
2394
        store = self.get_store(self)
 
2395
        store._load_from_string(b'')
 
2396
        self.assertEqual([], list(store.get_sections()))
 
2397
 
 
2398
    def test_get_default_section(self):
 
2399
        store = self.get_store(self)
 
2400
        store._load_from_string(b'foo=bar')
 
2401
        sections = list(store.get_sections())
 
2402
        self.assertLength(1, sections)
 
2403
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
 
2404
 
 
2405
    def test_get_named_section(self):
 
2406
        store = self.get_store(self)
 
2407
        store._load_from_string(b'[baz]\nfoo=bar')
 
2408
        sections = list(store.get_sections())
 
2409
        self.assertLength(1, sections)
 
2410
        self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
 
2411
 
 
2412
    def test_load_from_string_fails_for_non_empty_store(self):
 
2413
        store = self.get_store(self)
 
2414
        store._load_from_string(b'foo=bar')
 
2415
        self.assertRaises(AssertionError, store._load_from_string, b'bar=baz')
 
2416
 
 
2417
 
 
2418
class TestStoreQuoting(TestStore):
 
2419
 
 
2420
    scenarios = [(key, {'get_store': builder}) for key, builder
 
2421
                 in config.test_store_builder_registry.iteritems()]
 
2422
 
 
2423
    def setUp(self):
 
2424
        super(TestStoreQuoting, self).setUp()
 
2425
        self.store = self.get_store(self)
 
2426
        # We need a loaded store but any content will do
 
2427
        self.store._load_from_string(b'')
 
2428
 
 
2429
    def assertIdempotent(self, s):
 
2430
        """Assert that quoting an unquoted string is a no-op and vice-versa.
 
2431
 
 
2432
        What matters here is that option values, as they appear in a store, can
 
2433
        be safely round-tripped out of the store and back.
 
2434
 
 
2435
        :param s: A string, quoted if required.
 
2436
        """
 
2437
        self.assertEqual(s, self.store.quote(self.store.unquote(s)))
 
2438
        self.assertEqual(s, self.store.unquote(self.store.quote(s)))
 
2439
 
 
2440
    def test_empty_string(self):
 
2441
        if isinstance(self.store, config.IniFileStore):
 
2442
            # configobj._quote doesn't handle empty values
 
2443
            self.assertRaises(AssertionError,
 
2444
                              self.assertIdempotent, '')
 
2445
        else:
 
2446
            self.assertIdempotent('')
 
2447
        # But quoted empty strings are ok
 
2448
        self.assertIdempotent('""')
 
2449
 
 
2450
    def test_embedded_spaces(self):
 
2451
        self.assertIdempotent('" a b c "')
 
2452
 
 
2453
    def test_embedded_commas(self):
 
2454
        self.assertIdempotent('" a , b c "')
 
2455
 
 
2456
    def test_simple_comma(self):
 
2457
        if isinstance(self.store, config.IniFileStore):
 
2458
            # configobj requires that lists are special-cased
 
2459
            self.assertRaises(AssertionError,
 
2460
                              self.assertIdempotent, ',')
 
2461
        else:
 
2462
            self.assertIdempotent(',')
 
2463
        # When a single comma is required, quoting is also required
 
2464
        self.assertIdempotent('","')
 
2465
 
 
2466
    def test_list(self):
 
2467
        if isinstance(self.store, config.IniFileStore):
 
2468
            # configobj requires that lists are special-cased
 
2469
            self.assertRaises(AssertionError,
 
2470
                              self.assertIdempotent, 'a,b')
 
2471
        else:
 
2472
            self.assertIdempotent('a,b')
 
2473
 
 
2474
 
 
2475
class TestDictFromStore(tests.TestCase):
 
2476
 
 
2477
    def test_unquote_not_string(self):
 
2478
        conf = config.MemoryStack(b'x=2\n[a_section]\na=1\n')
 
2479
        value = conf.get('a_section')
 
2480
        # Urgh, despite 'conf' asking for the no-name section, we get the
 
2481
        # content of another section as a dict o_O
 
2482
        self.assertEqual({'a': '1'}, value)
 
2483
        unquoted = conf.store.unquote(value)
 
2484
        # Which cannot be unquoted but shouldn't crash either (the use cases
 
2485
        # are getting the value or displaying it. In the later case, '%s' will
 
2486
        # do).
 
2487
        self.assertEqual({'a': '1'}, unquoted)
 
2488
        self.assertIn('%s' % (unquoted,), ("{u'a': u'1'}", "{'a': '1'}"))
 
2489
 
 
2490
 
 
2491
class TestIniFileStoreContent(tests.TestCaseWithTransport):
 
2492
    """Simulate loading a config store with content of various encodings.
 
2493
 
 
2494
    All files produced by bzr are in utf8 content.
 
2495
 
 
2496
    Users may modify them manually and end up with a file that can't be
 
2497
    loaded. We need to issue proper error messages in this case.
 
2498
    """
 
2499
 
 
2500
    invalid_utf8_char = b'\xff'
 
2501
 
 
2502
    def test_load_utf8(self):
 
2503
        """Ensure we can load an utf8-encoded file."""
 
2504
        t = self.get_transport()
 
2505
        # From http://pad.lv/799212
 
2506
        unicode_user = u'b\N{Euro Sign}ar'
 
2507
        unicode_content = u'user=%s' % (unicode_user,)
 
2508
        utf8_content = unicode_content.encode('utf8')
 
2509
        # Store the raw content in the config file
 
2510
        t.put_bytes('foo.conf', utf8_content)
 
2511
        store = config.TransportIniFileStore(t, 'foo.conf')
 
2512
        store.load()
 
2513
        stack = config.Stack([store.get_sections], store)
 
2514
        self.assertEqual(unicode_user, stack.get('user'))
 
2515
 
 
2516
    def test_load_non_ascii(self):
 
2517
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
 
2518
        t = self.get_transport()
 
2519
        t.put_bytes('foo.conf', b'user=foo\n#%s\n' % (self.invalid_utf8_char,))
 
2520
        store = config.TransportIniFileStore(t, 'foo.conf')
 
2521
        self.assertRaises(config.ConfigContentError, store.load)
 
2522
 
 
2523
    def test_load_erroneous_content(self):
 
2524
        """Ensure we display a proper error on content that can't be parsed."""
 
2525
        t = self.get_transport()
 
2526
        t.put_bytes('foo.conf', b'[open_section\n')
 
2527
        store = config.TransportIniFileStore(t, 'foo.conf')
 
2528
        self.assertRaises(config.ParseConfigError, store.load)
 
2529
 
 
2530
    def test_load_permission_denied(self):
 
2531
        """Ensure we get warned when trying to load an inaccessible file."""
 
2532
        warnings = []
 
2533
 
 
2534
        def warning(*args):
 
2535
            warnings.append(args[0] % args[1:])
 
2536
        self.overrideAttr(trace, 'warning', warning)
 
2537
 
 
2538
        t = self.get_transport()
 
2539
 
 
2540
        def get_bytes(relpath):
 
2541
            raise errors.PermissionDenied(relpath, "")
 
2542
        t.get_bytes = get_bytes
 
2543
        store = config.TransportIniFileStore(t, 'foo.conf')
 
2544
        self.assertRaises(errors.PermissionDenied, store.load)
 
2545
        self.assertEqual(
 
2546
            warnings,
 
2547
            [u'Permission denied while trying to load configuration store %s.'
 
2548
             % store.external_url()])
 
2549
 
 
2550
 
 
2551
class TestIniConfigContent(tests.TestCaseWithTransport):
 
2552
    """Simulate loading a IniBasedConfig with content of various encodings.
 
2553
 
 
2554
    All files produced by bzr are in utf8 content.
 
2555
 
 
2556
    Users may modify them manually and end up with a file that can't be
 
2557
    loaded. We need to issue proper error messages in this case.
 
2558
    """
 
2559
 
 
2560
    invalid_utf8_char = b'\xff'
 
2561
 
 
2562
    def test_load_utf8(self):
 
2563
        """Ensure we can load an utf8-encoded file."""
 
2564
        # From http://pad.lv/799212
 
2565
        unicode_user = u'b\N{Euro Sign}ar'
 
2566
        unicode_content = u'user=%s' % (unicode_user,)
 
2567
        utf8_content = unicode_content.encode('utf8')
 
2568
        # Store the raw content in the config file
 
2569
        with open('foo.conf', 'wb') as f:
 
2570
            f.write(utf8_content)
 
2571
        conf = config.IniBasedConfig(file_name='foo.conf')
 
2572
        self.assertEqual(unicode_user, conf.get_user_option('user'))
 
2573
 
 
2574
    def test_load_badly_encoded_content(self):
 
2575
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
 
2576
        with open('foo.conf', 'wb') as f:
 
2577
            f.write(b'user=foo\n#%s\n' % (self.invalid_utf8_char,))
 
2578
        conf = config.IniBasedConfig(file_name='foo.conf')
 
2579
        self.assertRaises(config.ConfigContentError, conf._get_parser)
 
2580
 
 
2581
    def test_load_erroneous_content(self):
 
2582
        """Ensure we display a proper error on content that can't be parsed."""
 
2583
        with open('foo.conf', 'wb') as f:
 
2584
            f.write(b'[open_section\n')
 
2585
        conf = config.IniBasedConfig(file_name='foo.conf')
 
2586
        self.assertRaises(config.ParseConfigError, conf._get_parser)
 
2587
 
 
2588
 
 
2589
class TestMutableStore(TestStore):
 
2590
 
 
2591
    scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
 
2592
                 in config.test_store_builder_registry.iteritems()]
 
2593
 
 
2594
    def setUp(self):
 
2595
        super(TestMutableStore, self).setUp()
 
2596
        self.transport = self.get_transport()
 
2597
 
 
2598
    def has_store(self, store):
 
2599
        store_basename = urlutils.relative_url(self.transport.external_url(),
 
2600
                                               store.external_url())
 
2601
        return self.transport.has(store_basename)
 
2602
 
 
2603
    def test_save_empty_creates_no_file(self):
 
2604
        # FIXME: There should be a better way than relying on the test
 
2605
        # parametrization to identify branch.conf -- vila 2011-0526
 
2606
        if self.store_id in ('branch', 'remote_branch'):
 
2607
            raise tests.TestNotApplicable(
 
2608
                'branch.conf is *always* created when a branch is initialized')
 
2609
        store = self.get_store(self)
 
2610
        store.save()
 
2611
        self.assertEqual(False, self.has_store(store))
 
2612
 
 
2613
    def test_mutable_section_shared(self):
 
2614
        store = self.get_store(self)
 
2615
        store._load_from_string(b'foo=bar\n')
 
2616
        # FIXME: There should be a better way than relying on the test
 
2617
        # parametrization to identify branch.conf -- vila 2011-0526
 
2618
        if self.store_id in ('branch', 'remote_branch'):
 
2619
            # branch stores requires write locked branches
 
2620
            self.addCleanup(store.branch.lock_write().unlock)
 
2621
        section1 = store.get_mutable_section(None)
 
2622
        section2 = store.get_mutable_section(None)
 
2623
        # If we get different sections, different callers won't share the
 
2624
        # modification
 
2625
        self.assertIs(section1, section2)
 
2626
 
 
2627
    def test_save_emptied_succeeds(self):
 
2628
        store = self.get_store(self)
 
2629
        store._load_from_string(b'foo=bar\n')
 
2630
        # FIXME: There should be a better way than relying on the test
 
2631
        # parametrization to identify branch.conf -- vila 2011-0526
 
2632
        if self.store_id in ('branch', 'remote_branch'):
 
2633
            # branch stores requires write locked branches
 
2634
            self.addCleanup(store.branch.lock_write().unlock)
 
2635
        section = store.get_mutable_section(None)
 
2636
        section.remove('foo')
 
2637
        store.save()
 
2638
        self.assertEqual(True, self.has_store(store))
 
2639
        modified_store = self.get_store(self)
 
2640
        sections = list(modified_store.get_sections())
 
2641
        self.assertLength(0, sections)
 
2642
 
 
2643
    def test_save_with_content_succeeds(self):
 
2644
        # FIXME: There should be a better way than relying on the test
 
2645
        # parametrization to identify branch.conf -- vila 2011-0526
 
2646
        if self.store_id in ('branch', 'remote_branch'):
 
2647
            raise tests.TestNotApplicable(
 
2648
                'branch.conf is *always* created when a branch is initialized')
 
2649
        store = self.get_store(self)
 
2650
        store._load_from_string(b'foo=bar\n')
 
2651
        self.assertEqual(False, self.has_store(store))
 
2652
        store.save()
 
2653
        self.assertEqual(True, self.has_store(store))
 
2654
        modified_store = self.get_store(self)
 
2655
        sections = list(modified_store.get_sections())
 
2656
        self.assertLength(1, sections)
 
2657
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
 
2658
 
 
2659
    def test_set_option_in_empty_store(self):
 
2660
        store = self.get_store(self)
 
2661
        # FIXME: There should be a better way than relying on the test
 
2662
        # parametrization to identify branch.conf -- vila 2011-0526
 
2663
        if self.store_id in ('branch', 'remote_branch'):
 
2664
            # branch stores requires write locked branches
 
2665
            self.addCleanup(store.branch.lock_write().unlock)
 
2666
        section = store.get_mutable_section(None)
 
2667
        section.set('foo', 'bar')
 
2668
        store.save()
 
2669
        modified_store = self.get_store(self)
 
2670
        sections = list(modified_store.get_sections())
 
2671
        self.assertLength(1, sections)
 
2672
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
 
2673
 
 
2674
    def test_set_option_in_default_section(self):
 
2675
        store = self.get_store(self)
 
2676
        store._load_from_string(b'')
 
2677
        # FIXME: There should be a better way than relying on the test
 
2678
        # parametrization to identify branch.conf -- vila 2011-0526
 
2679
        if self.store_id in ('branch', 'remote_branch'):
 
2680
            # branch stores requires write locked branches
 
2681
            self.addCleanup(store.branch.lock_write().unlock)
 
2682
        section = store.get_mutable_section(None)
 
2683
        section.set('foo', 'bar')
 
2684
        store.save()
 
2685
        modified_store = self.get_store(self)
 
2686
        sections = list(modified_store.get_sections())
 
2687
        self.assertLength(1, sections)
 
2688
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
 
2689
 
 
2690
    def test_set_option_in_named_section(self):
 
2691
        store = self.get_store(self)
 
2692
        store._load_from_string(b'')
 
2693
        # FIXME: There should be a better way than relying on the test
 
2694
        # parametrization to identify branch.conf -- vila 2011-0526
 
2695
        if self.store_id in ('branch', 'remote_branch'):
 
2696
            # branch stores requires write locked branches
 
2697
            self.addCleanup(store.branch.lock_write().unlock)
 
2698
        section = store.get_mutable_section('baz')
 
2699
        section.set('foo', 'bar')
 
2700
        store.save()
 
2701
        modified_store = self.get_store(self)
 
2702
        sections = list(modified_store.get_sections())
 
2703
        self.assertLength(1, sections)
 
2704
        self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
 
2705
 
 
2706
    def test_load_hook(self):
 
2707
        # First, we need to ensure that the store exists
 
2708
        store = self.get_store(self)
 
2709
        # FIXME: There should be a better way than relying on the test
 
2710
        # parametrization to identify branch.conf -- vila 2011-0526
 
2711
        if self.store_id in ('branch', 'remote_branch'):
 
2712
            # branch stores requires write locked branches
 
2713
            self.addCleanup(store.branch.lock_write().unlock)
 
2714
        section = store.get_mutable_section('baz')
 
2715
        section.set('foo', 'bar')
 
2716
        store.save()
 
2717
        # Now we can try to load it
 
2718
        store = self.get_store(self)
 
2719
        calls = []
 
2720
 
 
2721
        def hook(*args):
 
2722
            calls.append(args)
 
2723
        config.ConfigHooks.install_named_hook('load', hook, None)
 
2724
        self.assertLength(0, calls)
 
2725
        store.load()
 
2726
        self.assertLength(1, calls)
 
2727
        self.assertEqual((store,), calls[0])
 
2728
 
 
2729
    def test_save_hook(self):
 
2730
        calls = []
 
2731
 
 
2732
        def hook(*args):
 
2733
            calls.append(args)
 
2734
        config.ConfigHooks.install_named_hook('save', hook, None)
 
2735
        self.assertLength(0, calls)
 
2736
        store = self.get_store(self)
 
2737
        # FIXME: There should be a better way than relying on the test
 
2738
        # parametrization to identify branch.conf -- vila 2011-0526
 
2739
        if self.store_id in ('branch', 'remote_branch'):
 
2740
            # branch stores requires write locked branches
 
2741
            self.addCleanup(store.branch.lock_write().unlock)
 
2742
        section = store.get_mutable_section('baz')
 
2743
        section.set('foo', 'bar')
 
2744
        store.save()
 
2745
        self.assertLength(1, calls)
 
2746
        self.assertEqual((store,), calls[0])
 
2747
 
 
2748
    def test_set_mark_dirty(self):
 
2749
        stack = config.MemoryStack(b'')
 
2750
        self.assertLength(0, stack.store.dirty_sections)
 
2751
        stack.set('foo', 'baz')
 
2752
        self.assertLength(1, stack.store.dirty_sections)
 
2753
        self.assertTrue(stack.store._need_saving())
 
2754
 
 
2755
    def test_remove_mark_dirty(self):
 
2756
        stack = config.MemoryStack(b'foo=bar')
 
2757
        self.assertLength(0, stack.store.dirty_sections)
 
2758
        stack.remove('foo')
 
2759
        self.assertLength(1, stack.store.dirty_sections)
 
2760
        self.assertTrue(stack.store._need_saving())
 
2761
 
 
2762
 
 
2763
class TestStoreSaveChanges(tests.TestCaseWithTransport):
 
2764
    """Tests that config changes are kept in memory and saved on-demand."""
 
2765
 
 
2766
    def setUp(self):
 
2767
        super(TestStoreSaveChanges, self).setUp()
 
2768
        self.transport = self.get_transport()
 
2769
        # Most of the tests involve two stores pointing to the same persistent
 
2770
        # storage to observe the effects of concurrent changes
 
2771
        self.st1 = config.TransportIniFileStore(self.transport, 'foo.conf')
 
2772
        self.st2 = config.TransportIniFileStore(self.transport, 'foo.conf')
 
2773
        self.warnings = []
 
2774
 
 
2775
        def warning(*args):
 
2776
            self.warnings.append(args[0] % args[1:])
 
2777
        self.overrideAttr(trace, 'warning', warning)
 
2778
 
 
2779
    def has_store(self, store):
 
2780
        store_basename = urlutils.relative_url(self.transport.external_url(),
 
2781
                                               store.external_url())
 
2782
        return self.transport.has(store_basename)
 
2783
 
 
2784
    def get_stack(self, store):
 
2785
        # Any stack will do as long as it uses the right store, just a single
 
2786
        # no-name section is enough
 
2787
        return config.Stack([store.get_sections], store)
 
2788
 
 
2789
    def test_no_changes_no_save(self):
 
2790
        s = self.get_stack(self.st1)
 
2791
        s.store.save_changes()
 
2792
        self.assertEqual(False, self.has_store(self.st1))
 
2793
 
 
2794
    def test_unrelated_concurrent_update(self):
 
2795
        s1 = self.get_stack(self.st1)
 
2796
        s2 = self.get_stack(self.st2)
 
2797
        s1.set('foo', 'bar')
 
2798
        s2.set('baz', 'quux')
 
2799
        s1.store.save()
 
2800
        # Changes don't propagate magically
 
2801
        self.assertEqual(None, s1.get('baz'))
 
2802
        s2.store.save_changes()
 
2803
        self.assertEqual('quux', s2.get('baz'))
 
2804
        # Changes are acquired when saving
 
2805
        self.assertEqual('bar', s2.get('foo'))
 
2806
        # Since there is no overlap, no warnings are emitted
 
2807
        self.assertLength(0, self.warnings)
 
2808
 
 
2809
    def test_concurrent_update_modified(self):
 
2810
        s1 = self.get_stack(self.st1)
 
2811
        s2 = self.get_stack(self.st2)
 
2812
        s1.set('foo', 'bar')
 
2813
        s2.set('foo', 'baz')
 
2814
        s1.store.save()
 
2815
        # Last speaker wins
 
2816
        s2.store.save_changes()
 
2817
        self.assertEqual('baz', s2.get('foo'))
 
2818
        # But the user get a warning
 
2819
        self.assertLength(1, self.warnings)
 
2820
        warning = self.warnings[0]
 
2821
        self.assertStartsWith(warning, 'Option foo in section None')
 
2822
        self.assertEndsWith(warning, 'was changed from <CREATED> to bar.'
 
2823
                            ' The baz value will be saved.')
 
2824
 
 
2825
    def test_concurrent_deletion(self):
 
2826
        self.st1._load_from_string(b'foo=bar')
 
2827
        self.st1.save()
 
2828
        s1 = self.get_stack(self.st1)
 
2829
        s2 = self.get_stack(self.st2)
 
2830
        s1.remove('foo')
 
2831
        s2.remove('foo')
 
2832
        s1.store.save_changes()
 
2833
        # No warning yet
 
2834
        self.assertLength(0, self.warnings)
 
2835
        s2.store.save_changes()
 
2836
        # Now we get one
 
2837
        self.assertLength(1, self.warnings)
 
2838
        warning = self.warnings[0]
 
2839
        self.assertStartsWith(warning, 'Option foo in section None')
 
2840
        self.assertEndsWith(warning, 'was changed from bar to <CREATED>.'
 
2841
                            ' The <DELETED> value will be saved.')
 
2842
 
 
2843
 
 
2844
class TestQuotingIniFileStore(tests.TestCaseWithTransport):
 
2845
 
 
2846
    def get_store(self):
 
2847
        return config.TransportIniFileStore(self.get_transport(), 'foo.conf')
 
2848
 
 
2849
    def test_get_quoted_string(self):
 
2850
        store = self.get_store()
 
2851
        store._load_from_string(b'foo= " abc "')
 
2852
        stack = config.Stack([store.get_sections])
 
2853
        self.assertEqual(' abc ', stack.get('foo'))
 
2854
 
 
2855
    def test_set_quoted_string(self):
 
2856
        store = self.get_store()
 
2857
        stack = config.Stack([store.get_sections], store)
 
2858
        stack.set('foo', ' a b c ')
 
2859
        store.save()
 
2860
        self.assertFileEqual(b'foo = " a b c "' +
 
2861
                             os.linesep.encode('ascii'), 'foo.conf')
 
2862
 
 
2863
 
 
2864
class TestTransportIniFileStore(TestStore):
 
2865
 
 
2866
    def test_loading_unknown_file_fails(self):
 
2867
        store = config.TransportIniFileStore(self.get_transport(),
 
2868
                                             'I-do-not-exist')
 
2869
        self.assertRaises(errors.NoSuchFile, store.load)
 
2870
 
 
2871
    def test_invalid_content(self):
 
2872
        store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
 
2873
        self.assertEqual(False, store.is_loaded())
 
2874
        exc = self.assertRaises(
 
2875
            config.ParseConfigError, store._load_from_string,
 
2876
            b'this is invalid !')
 
2877
        self.assertEndsWith(exc.filename, 'foo.conf')
 
2878
        # And the load failed
 
2879
        self.assertEqual(False, store.is_loaded())
 
2880
 
 
2881
    def test_get_embedded_sections(self):
 
2882
        # A more complicated example (which also shows that section names and
 
2883
        # option names share the same name space...)
 
2884
        # FIXME: This should be fixed by forbidding dicts as values ?
 
2885
        # -- vila 2011-04-05
 
2886
        store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
 
2887
        store._load_from_string(b'''
 
2888
foo=bar
 
2889
l=1,2
 
2890
[DEFAULT]
 
2891
foo_in_DEFAULT=foo_DEFAULT
 
2892
[bar]
 
2893
foo_in_bar=barbar
 
2894
[baz]
 
2895
foo_in_baz=barbaz
 
2896
[[qux]]
 
2897
foo_in_qux=quux
 
2898
''')
 
2899
        sections = list(store.get_sections())
 
2900
        self.assertLength(4, sections)
 
2901
        # The default section has no name.
 
2902
        # List values are provided as strings and need to be explicitly
 
2903
        # converted by specifying from_unicode=list_from_store at option
 
2904
        # registration
 
2905
        self.assertSectionContent((None, {'foo': 'bar', 'l': u'1,2'}),
 
2906
                                  sections[0])
 
2907
        self.assertSectionContent(
 
2908
            ('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
 
2909
        self.assertSectionContent(
 
2910
            ('bar', {'foo_in_bar': 'barbar'}), sections[2])
 
2911
        # sub sections are provided as embedded dicts.
 
2912
        self.assertSectionContent(
 
2913
            ('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
 
2914
            sections[3])
 
2915
 
 
2916
 
 
2917
class TestLockableIniFileStore(TestStore):
 
2918
 
 
2919
    def test_create_store_in_created_dir(self):
 
2920
        self.assertPathDoesNotExist('dir')
 
2921
        t = self.get_transport('dir/subdir')
 
2922
        store = config.LockableIniFileStore(t, 'foo.conf')
 
2923
        store.get_mutable_section(None).set('foo', 'bar')
 
2924
        store.save()
 
2925
        self.assertPathExists('dir/subdir')
 
2926
 
 
2927
 
 
2928
class TestConcurrentStoreUpdates(TestStore):
 
2929
    """Test that Stores properly handle conccurent updates.
 
2930
 
 
2931
    New Store implementation may fail some of these tests but until such
 
2932
    implementations exist it's hard to properly filter them from the scenarios
 
2933
    applied here. If you encounter such a case, contact the bzr devs.
 
2934
    """
 
2935
 
 
2936
    scenarios = [(key, {'get_stack': builder}) for key, builder
 
2937
                 in config.test_stack_builder_registry.iteritems()]
 
2938
 
 
2939
    def setUp(self):
 
2940
        super(TestConcurrentStoreUpdates, self).setUp()
 
2941
        self.stack = self.get_stack(self)
 
2942
        if not isinstance(self.stack, config._CompatibleStack):
 
2943
            raise tests.TestNotApplicable(
 
2944
                '%s is not meant to be compatible with the old config design'
 
2945
                % (self.stack,))
 
2946
        self.stack.set('one', '1')
 
2947
        self.stack.set('two', '2')
 
2948
        # Flush the store
 
2949
        self.stack.store.save()
 
2950
 
 
2951
    def test_simple_read_access(self):
 
2952
        self.assertEqual('1', self.stack.get('one'))
 
2953
 
 
2954
    def test_simple_write_access(self):
 
2955
        self.stack.set('one', 'one')
 
2956
        self.assertEqual('one', self.stack.get('one'))
 
2957
 
 
2958
    def test_listen_to_the_last_speaker(self):
 
2959
        c1 = self.stack
 
2960
        c2 = self.get_stack(self)
 
2961
        c1.set('one', 'ONE')
 
2962
        c2.set('two', 'TWO')
 
2963
        self.assertEqual('ONE', c1.get('one'))
 
2964
        self.assertEqual('TWO', c2.get('two'))
 
2965
        # The second update respect the first one
 
2966
        self.assertEqual('ONE', c2.get('one'))
 
2967
 
 
2968
    def test_last_speaker_wins(self):
 
2969
        # If the same config is not shared, the same variable modified twice
 
2970
        # can only see a single result.
 
2971
        c1 = self.stack
 
2972
        c2 = self.get_stack(self)
 
2973
        c1.set('one', 'c1')
 
2974
        c2.set('one', 'c2')
 
2975
        self.assertEqual('c2', c2.get('one'))
 
2976
        # The first modification is still available until another refresh
 
2977
        # occur
 
2978
        self.assertEqual('c1', c1.get('one'))
 
2979
        c1.set('two', 'done')
 
2980
        self.assertEqual('c2', c1.get('one'))
 
2981
 
 
2982
    def test_writes_are_serialized(self):
 
2983
        c1 = self.stack
 
2984
        c2 = self.get_stack(self)
 
2985
 
 
2986
        # We spawn a thread that will pause *during* the config saving.
 
2987
        before_writing = threading.Event()
 
2988
        after_writing = threading.Event()
 
2989
        writing_done = threading.Event()
 
2990
        c1_save_without_locking_orig = c1.store.save_without_locking
 
2991
 
 
2992
        def c1_save_without_locking():
 
2993
            before_writing.set()
 
2994
            c1_save_without_locking_orig()
 
2995
            # The lock is held. We wait for the main thread to decide when to
 
2996
            # continue
 
2997
            after_writing.wait()
 
2998
        c1.store.save_without_locking = c1_save_without_locking
 
2999
 
 
3000
        def c1_set():
 
3001
            c1.set('one', 'c1')
 
3002
            writing_done.set()
 
3003
        t1 = threading.Thread(target=c1_set)
 
3004
        # Collect the thread after the test
 
3005
        self.addCleanup(t1.join)
 
3006
        # Be ready to unblock the thread if the test goes wrong
 
3007
        self.addCleanup(after_writing.set)
 
3008
        t1.start()
 
3009
        before_writing.wait()
 
3010
        self.assertRaises(errors.LockContention,
 
3011
                          c2.set, 'one', 'c2')
 
3012
        self.assertEqual('c1', c1.get('one'))
 
3013
        # Let the lock be released
 
3014
        after_writing.set()
 
3015
        writing_done.wait()
 
3016
        c2.set('one', 'c2')
 
3017
        self.assertEqual('c2', c2.get('one'))
 
3018
 
 
3019
    def test_read_while_writing(self):
 
3020
        c1 = self.stack
 
3021
        # We spawn a thread that will pause *during* the write
 
3022
        ready_to_write = threading.Event()
 
3023
        do_writing = threading.Event()
 
3024
        writing_done = threading.Event()
 
3025
        # We override the _save implementation so we know the store is locked
 
3026
        c1_save_without_locking_orig = c1.store.save_without_locking
 
3027
 
 
3028
        def c1_save_without_locking():
 
3029
            ready_to_write.set()
 
3030
            # The lock is held. We wait for the main thread to decide when to
 
3031
            # continue
 
3032
            do_writing.wait()
 
3033
            c1_save_without_locking_orig()
 
3034
            writing_done.set()
 
3035
        c1.store.save_without_locking = c1_save_without_locking
 
3036
 
 
3037
        def c1_set():
 
3038
            c1.set('one', 'c1')
 
3039
        t1 = threading.Thread(target=c1_set)
 
3040
        # Collect the thread after the test
 
3041
        self.addCleanup(t1.join)
 
3042
        # Be ready to unblock the thread if the test goes wrong
 
3043
        self.addCleanup(do_writing.set)
 
3044
        t1.start()
 
3045
        # Ensure the thread is ready to write
 
3046
        ready_to_write.wait()
 
3047
        self.assertEqual('c1', c1.get('one'))
 
3048
        # If we read during the write, we get the old value
 
3049
        c2 = self.get_stack(self)
 
3050
        self.assertEqual('1', c2.get('one'))
 
3051
        # Let the writing occur and ensure it occurred
 
3052
        do_writing.set()
 
3053
        writing_done.wait()
 
3054
        # Now we get the updated value
 
3055
        c3 = self.get_stack(self)
 
3056
        self.assertEqual('c1', c3.get('one'))
 
3057
 
 
3058
    # FIXME: It may be worth looking into removing the lock dir when it's not
 
3059
    # needed anymore and look at possible fallouts for concurrent lockers. This
 
3060
    # will matter if/when we use config files outside of breezy directories
 
3061
    # (.config/breezy or .bzr) -- vila 20110-04-111
 
3062
 
 
3063
 
 
3064
class TestSectionMatcher(TestStore):
 
3065
 
 
3066
    scenarios = [('location', {'matcher': config.LocationMatcher}),
 
3067
                 ('id', {'matcher': config.NameMatcher}), ]
 
3068
 
 
3069
    def setUp(self):
 
3070
        super(TestSectionMatcher, self).setUp()
 
3071
        # Any simple store is good enough
 
3072
        self.get_store = config.test_store_builder_registry.get('configobj')
 
3073
 
 
3074
    def test_no_matches_for_empty_stores(self):
 
3075
        store = self.get_store(self)
 
3076
        store._load_from_string(b'')
 
3077
        matcher = self.matcher(store, '/bar')
 
3078
        self.assertEqual([], list(matcher.get_sections()))
 
3079
 
 
3080
    def test_build_doesnt_load_store(self):
 
3081
        store = self.get_store(self)
 
3082
        self.matcher(store, '/bar')
 
3083
        self.assertFalse(store.is_loaded())
 
3084
 
 
3085
 
 
3086
class TestLocationSection(tests.TestCase):
 
3087
 
 
3088
    def get_section(self, options, extra_path):
 
3089
        section = config.Section('foo', options)
 
3090
        return config.LocationSection(section, extra_path)
 
3091
 
 
3092
    def test_simple_option(self):
 
3093
        section = self.get_section({'foo': 'bar'}, '')
 
3094
        self.assertEqual('bar', section.get('foo'))
 
3095
 
 
3096
    def test_option_with_extra_path(self):
 
3097
        section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
 
3098
                                   'baz')
 
3099
        self.assertEqual('bar/baz', section.get('foo'))
 
3100
 
 
3101
    def test_invalid_policy(self):
 
3102
        section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
 
3103
                                   'baz')
 
3104
        # invalid policies are ignored
 
3105
        self.assertEqual('bar', section.get('foo'))
 
3106
 
 
3107
 
 
3108
class TestLocationMatcher(TestStore):
 
3109
 
 
3110
    def setUp(self):
 
3111
        super(TestLocationMatcher, self).setUp()
 
3112
        # Any simple store is good enough
 
3113
        self.get_store = config.test_store_builder_registry.get('configobj')
 
3114
 
 
3115
    def test_unrelated_section_excluded(self):
 
3116
        store = self.get_store(self)
 
3117
        store._load_from_string(b'''
 
3118
[/foo]
 
3119
section=/foo
 
3120
[/foo/baz]
 
3121
section=/foo/baz
 
3122
[/foo/bar]
 
3123
section=/foo/bar
 
3124
[/foo/bar/baz]
 
3125
section=/foo/bar/baz
 
3126
[/quux/quux]
 
3127
section=/quux/quux
 
3128
''')
 
3129
        self.assertEqual(['/foo', '/foo/baz', '/foo/bar', '/foo/bar/baz',
 
3130
                          '/quux/quux'],
 
3131
                         [section.id for _, section in store.get_sections()])
 
3132
        matcher = config.LocationMatcher(store, '/foo/bar/quux')
 
3133
        sections = [section for _, section in matcher.get_sections()]
 
3134
        self.assertEqual(['/foo/bar', '/foo'],
 
3135
                         [section.id for section in sections])
 
3136
        self.assertEqual(['quux', 'bar/quux'],
 
3137
                         [section.extra_path for section in sections])
 
3138
 
 
3139
    def test_more_specific_sections_first(self):
 
3140
        store = self.get_store(self)
 
3141
        store._load_from_string(b'''
 
3142
[/foo]
 
3143
section=/foo
 
3144
[/foo/bar]
 
3145
section=/foo/bar
 
3146
''')
 
3147
        self.assertEqual(['/foo', '/foo/bar'],
 
3148
                         [section.id for _, section in store.get_sections()])
 
3149
        matcher = config.LocationMatcher(store, '/foo/bar/baz')
 
3150
        sections = [section for _, section in matcher.get_sections()]
 
3151
        self.assertEqual(['/foo/bar', '/foo'],
 
3152
                         [section.id for section in sections])
 
3153
        self.assertEqual(['baz', 'bar/baz'],
 
3154
                         [section.extra_path for section in sections])
 
3155
 
 
3156
    def test_appendpath_in_no_name_section(self):
 
3157
        # It's a bit weird to allow appendpath in a no-name section, but
 
3158
        # someone may found a use for it
 
3159
        store = self.get_store(self)
 
3160
        store._load_from_string(b'''
 
3161
foo=bar
 
3162
foo:policy = appendpath
 
3163
''')
 
3164
        matcher = config.LocationMatcher(store, 'dir/subdir')
 
3165
        sections = list(matcher.get_sections())
 
3166
        self.assertLength(1, sections)
 
3167
        self.assertEqual('bar/dir/subdir', sections[0][1].get('foo'))
 
3168
 
 
3169
    def test_file_urls_are_normalized(self):
 
3170
        store = self.get_store(self)
 
3171
        if sys.platform == 'win32':
 
3172
            expected_url = 'file:///C:/dir/subdir'
 
3173
            expected_location = 'C:/dir/subdir'
 
3174
        else:
 
3175
            expected_url = 'file:///dir/subdir'
 
3176
            expected_location = '/dir/subdir'
 
3177
        matcher = config.LocationMatcher(store, expected_url)
 
3178
        self.assertEqual(expected_location, matcher.location)
 
3179
 
 
3180
    def test_branch_name_colo(self):
 
3181
        store = self.get_store(self)
 
3182
        store._load_from_string(dedent("""\
 
3183
            [/]
 
3184
            push_location=my{branchname}
 
3185
        """).encode('ascii'))
 
3186
        matcher = config.LocationMatcher(store, 'file:///,branch=example%3c')
 
3187
        self.assertEqual('example<', matcher.branch_name)
 
3188
        ((_, section),) = matcher.get_sections()
 
3189
        self.assertEqual('example<', section.locals['branchname'])
 
3190
 
 
3191
    def test_branch_name_basename(self):
 
3192
        store = self.get_store(self)
 
3193
        store._load_from_string(dedent("""\
 
3194
            [/]
 
3195
            push_location=my{branchname}
 
3196
        """).encode('ascii'))
 
3197
        matcher = config.LocationMatcher(store, 'file:///parent/example%3c')
 
3198
        self.assertEqual('example<', matcher.branch_name)
 
3199
        ((_, section),) = matcher.get_sections()
 
3200
        self.assertEqual('example<', section.locals['branchname'])
 
3201
 
 
3202
 
 
3203
class TestStartingPathMatcher(TestStore):
 
3204
 
 
3205
    def setUp(self):
 
3206
        super(TestStartingPathMatcher, self).setUp()
 
3207
        # Any simple store is good enough
 
3208
        self.store = config.IniFileStore()
 
3209
 
 
3210
    def assertSectionIDs(self, expected, location, content):
 
3211
        self.store._load_from_string(content)
 
3212
        matcher = config.StartingPathMatcher(self.store, location)
 
3213
        sections = list(matcher.get_sections())
 
3214
        self.assertLength(len(expected), sections)
 
3215
        self.assertEqual(expected, [section.id for _, section in sections])
 
3216
        return sections
 
3217
 
 
3218
    def test_empty(self):
 
3219
        self.assertSectionIDs([], self.get_url(), b'')
 
3220
 
 
3221
    def test_url_vs_local_paths(self):
 
3222
        # The matcher location is an url and the section names are local paths
 
3223
        self.assertSectionIDs(['/foo/bar', '/foo'],
 
3224
                              'file:///foo/bar/baz', b'''\
 
3225
[/foo]
 
3226
[/foo/bar]
 
3227
''')
 
3228
 
 
3229
    def test_local_path_vs_url(self):
 
3230
        # The matcher location is a local path and the section names are urls
 
3231
        self.assertSectionIDs(['file:///foo/bar', 'file:///foo'],
 
3232
                              '/foo/bar/baz', b'''\
 
3233
[file:///foo]
 
3234
[file:///foo/bar]
 
3235
''')
 
3236
 
 
3237
    def test_no_name_section_included_when_present(self):
 
3238
        # Note that other tests will cover the case where the no-name section
 
3239
        # is empty and as such, not included.
 
3240
        sections = self.assertSectionIDs(['/foo/bar', '/foo', None],
 
3241
                                         '/foo/bar/baz', b'''\
 
3242
option = defined so the no-name section exists
 
3243
[/foo]
 
3244
[/foo/bar]
 
3245
''')
 
3246
        self.assertEqual(['baz', 'bar/baz', '/foo/bar/baz'],
 
3247
                         [s.locals['relpath'] for _, s in sections])
 
3248
 
 
3249
    def test_order_reversed(self):
 
3250
        self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
 
3251
[/foo]
 
3252
[/foo/bar]
 
3253
''')
 
3254
 
 
3255
    def test_unrelated_section_excluded(self):
 
3256
        self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
 
3257
[/foo]
 
3258
[/foo/qux]
 
3259
[/foo/bar]
 
3260
''')
 
3261
 
 
3262
    def test_glob_included(self):
 
3263
        sections = self.assertSectionIDs(['/foo/*/baz', '/foo/b*', '/foo'],
 
3264
                                         '/foo/bar/baz', b'''\
 
3265
[/foo]
 
3266
[/foo/qux]
 
3267
[/foo/b*]
 
3268
[/foo/*/baz]
 
3269
''')
 
3270
        # Note that 'baz' as a relpath for /foo/b* is not fully correct, but
 
3271
        # nothing really is... as far using {relpath} to append it to something
 
3272
        # else, this seems good enough though.
 
3273
        self.assertEqual(['', 'baz', 'bar/baz'],
 
3274
                         [s.locals['relpath'] for _, s in sections])
 
3275
 
 
3276
    def test_respect_order(self):
 
3277
        self.assertSectionIDs(['/foo', '/foo/b*', '/foo/*/baz'],
 
3278
                              '/foo/bar/baz', b'''\
 
3279
[/foo/*/baz]
 
3280
[/foo/qux]
 
3281
[/foo/b*]
 
3282
[/foo]
 
3283
''')
 
3284
 
 
3285
 
 
3286
class TestNameMatcher(TestStore):
 
3287
 
 
3288
    def setUp(self):
 
3289
        super(TestNameMatcher, self).setUp()
 
3290
        self.matcher = config.NameMatcher
 
3291
        # Any simple store is good enough
 
3292
        self.get_store = config.test_store_builder_registry.get('configobj')
 
3293
 
 
3294
    def get_matching_sections(self, name):
 
3295
        store = self.get_store(self)
 
3296
        store._load_from_string(b'''
 
3297
[foo]
 
3298
option=foo
 
3299
[foo/baz]
 
3300
option=foo/baz
 
3301
[bar]
 
3302
option=bar
 
3303
''')
 
3304
        matcher = self.matcher(store, name)
 
3305
        return list(matcher.get_sections())
 
3306
 
 
3307
    def test_matching(self):
 
3308
        sections = self.get_matching_sections('foo')
 
3309
        self.assertLength(1, sections)
 
3310
        self.assertSectionContent(('foo', {'option': 'foo'}), sections[0])
 
3311
 
 
3312
    def test_not_matching(self):
 
3313
        sections = self.get_matching_sections('baz')
 
3314
        self.assertLength(0, sections)
 
3315
 
 
3316
 
 
3317
class TestBaseStackGet(tests.TestCase):
 
3318
 
 
3319
    def setUp(self):
 
3320
        super(TestBaseStackGet, self).setUp()
 
3321
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
 
3322
 
 
3323
    def test_get_first_definition(self):
 
3324
        store1 = config.IniFileStore()
 
3325
        store1._load_from_string(b'foo=bar')
 
3326
        store2 = config.IniFileStore()
 
3327
        store2._load_from_string(b'foo=baz')
 
3328
        conf = config.Stack([store1.get_sections, store2.get_sections])
 
3329
        self.assertEqual('bar', conf.get('foo'))
 
3330
 
 
3331
    def test_get_with_registered_default_value(self):
 
3332
        config.option_registry.register(config.Option('foo', default='bar'))
 
3333
        conf_stack = config.Stack([])
 
3334
        self.assertEqual('bar', conf_stack.get('foo'))
 
3335
 
 
3336
    def test_get_without_registered_default_value(self):
 
3337
        config.option_registry.register(config.Option('foo'))
 
3338
        conf_stack = config.Stack([])
 
3339
        self.assertEqual(None, conf_stack.get('foo'))
 
3340
 
 
3341
    def test_get_without_default_value_for_not_registered(self):
 
3342
        conf_stack = config.Stack([])
 
3343
        self.assertEqual(None, conf_stack.get('foo'))
 
3344
 
 
3345
    def test_get_for_empty_section_callable(self):
 
3346
        conf_stack = config.Stack([lambda: []])
 
3347
        self.assertEqual(None, conf_stack.get('foo'))
 
3348
 
 
3349
    def test_get_for_broken_callable(self):
 
3350
        # Trying to use and invalid callable raises an exception on first use
 
3351
        conf_stack = config.Stack([object])
 
3352
        self.assertRaises(TypeError, conf_stack.get, 'foo')
 
3353
 
 
3354
 
 
3355
class TestStackWithSimpleStore(tests.TestCase):
 
3356
 
 
3357
    def setUp(self):
 
3358
        super(TestStackWithSimpleStore, self).setUp()
 
3359
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
 
3360
        self.registry = config.option_registry
 
3361
 
 
3362
    def get_conf(self, content=None):
 
3363
        return config.MemoryStack(content)
 
3364
 
 
3365
    def test_override_value_from_env(self):
 
3366
        self.overrideEnv('FOO', None)
 
3367
        self.registry.register(
 
3368
            config.Option('foo', default='bar', override_from_env=['FOO']))
 
3369
        self.overrideEnv('FOO', 'quux')
 
3370
        # Env variable provides a default taking over the option one
 
3371
        conf = self.get_conf(b'foo=store')
 
3372
        self.assertEqual('quux', conf.get('foo'))
 
3373
 
 
3374
    def test_first_override_value_from_env_wins(self):
 
3375
        self.overrideEnv('NO_VALUE', None)
 
3376
        self.overrideEnv('FOO', None)
 
3377
        self.overrideEnv('BAZ', None)
 
3378
        self.registry.register(
 
3379
            config.Option('foo', default='bar',
 
3380
                          override_from_env=['NO_VALUE', 'FOO', 'BAZ']))
 
3381
        self.overrideEnv('FOO', 'foo')
 
3382
        self.overrideEnv('BAZ', 'baz')
 
3383
        # The first env var set wins
 
3384
        conf = self.get_conf(b'foo=store')
 
3385
        self.assertEqual('foo', conf.get('foo'))
 
3386
 
 
3387
 
 
3388
class TestMemoryStack(tests.TestCase):
 
3389
 
 
3390
    def test_get(self):
 
3391
        conf = config.MemoryStack(b'foo=bar')
 
3392
        self.assertEqual('bar', conf.get('foo'))
 
3393
 
 
3394
    def test_set(self):
 
3395
        conf = config.MemoryStack(b'foo=bar')
 
3396
        conf.set('foo', 'baz')
 
3397
        self.assertEqual('baz', conf.get('foo'))
 
3398
 
 
3399
    def test_no_content(self):
 
3400
        conf = config.MemoryStack()
 
3401
        # No content means no loading
 
3402
        self.assertFalse(conf.store.is_loaded())
 
3403
        self.assertRaises(NotImplementedError, conf.get, 'foo')
 
3404
        # But a content can still be provided
 
3405
        conf.store._load_from_string(b'foo=bar')
 
3406
        self.assertEqual('bar', conf.get('foo'))
 
3407
 
 
3408
 
 
3409
class TestStackIterSections(tests.TestCase):
 
3410
 
 
3411
    def test_empty_stack(self):
 
3412
        conf = config.Stack([])
 
3413
        sections = list(conf.iter_sections())
 
3414
        self.assertLength(0, sections)
 
3415
 
 
3416
    def test_empty_store(self):
 
3417
        store = config.IniFileStore()
 
3418
        store._load_from_string(b'')
 
3419
        conf = config.Stack([store.get_sections])
 
3420
        sections = list(conf.iter_sections())
 
3421
        self.assertLength(0, sections)
 
3422
 
 
3423
    def test_simple_store(self):
 
3424
        store = config.IniFileStore()
 
3425
        store._load_from_string(b'foo=bar')
 
3426
        conf = config.Stack([store.get_sections])
 
3427
        tuples = list(conf.iter_sections())
 
3428
        self.assertLength(1, tuples)
 
3429
        (found_store, found_section) = tuples[0]
 
3430
        self.assertIs(store, found_store)
 
3431
 
 
3432
    def test_two_stores(self):
 
3433
        store1 = config.IniFileStore()
 
3434
        store1._load_from_string(b'foo=bar')
 
3435
        store2 = config.IniFileStore()
 
3436
        store2._load_from_string(b'bar=qux')
 
3437
        conf = config.Stack([store1.get_sections, store2.get_sections])
 
3438
        tuples = list(conf.iter_sections())
 
3439
        self.assertLength(2, tuples)
 
3440
        self.assertIs(store1, tuples[0][0])
 
3441
        self.assertIs(store2, tuples[1][0])
 
3442
 
 
3443
 
 
3444
class TestStackWithTransport(tests.TestCaseWithTransport):
 
3445
 
 
3446
    scenarios = [(key, {'get_stack': builder}) for key, builder
 
3447
                 in config.test_stack_builder_registry.iteritems()]
 
3448
 
 
3449
 
 
3450
class TestConcreteStacks(TestStackWithTransport):
 
3451
 
 
3452
    def test_build_stack(self):
 
3453
        # Just a smoke test to help debug builders
 
3454
        self.get_stack(self)
 
3455
 
 
3456
 
 
3457
class TestStackGet(TestStackWithTransport):
 
3458
 
 
3459
    def setUp(self):
 
3460
        super(TestStackGet, self).setUp()
 
3461
        self.conf = self.get_stack(self)
 
3462
 
 
3463
    def test_get_for_empty_stack(self):
 
3464
        self.assertEqual(None, self.conf.get('foo'))
 
3465
 
 
3466
    def test_get_hook(self):
 
3467
        self.conf.set('foo', 'bar')
 
3468
        calls = []
 
3469
 
 
3470
        def hook(*args):
 
3471
            calls.append(args)
 
3472
        config.ConfigHooks.install_named_hook('get', hook, None)
 
3473
        self.assertLength(0, calls)
 
3474
        value = self.conf.get('foo')
 
3475
        self.assertEqual('bar', value)
 
3476
        self.assertLength(1, calls)
 
3477
        self.assertEqual((self.conf, 'foo', 'bar'), calls[0])
 
3478
 
 
3479
 
 
3480
class TestStackGetWithConverter(tests.TestCase):
 
3481
 
 
3482
    def setUp(self):
 
3483
        super(TestStackGetWithConverter, self).setUp()
 
3484
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
 
3485
        self.registry = config.option_registry
 
3486
 
 
3487
    def get_conf(self, content=None):
 
3488
        return config.MemoryStack(content)
 
3489
 
 
3490
    def register_bool_option(self, name, default=None, default_from_env=None):
 
3491
        b = config.Option(name, help='A boolean.',
 
3492
                          default=default, default_from_env=default_from_env,
 
3493
                          from_unicode=config.bool_from_store)
 
3494
        self.registry.register(b)
 
3495
 
 
3496
    def test_get_default_bool_None(self):
 
3497
        self.register_bool_option('foo')
 
3498
        conf = self.get_conf(b'')
 
3499
        self.assertEqual(None, conf.get('foo'))
 
3500
 
 
3501
    def test_get_default_bool_True(self):
 
3502
        self.register_bool_option('foo', u'True')
 
3503
        conf = self.get_conf(b'')
 
3504
        self.assertEqual(True, conf.get('foo'))
 
3505
 
 
3506
    def test_get_default_bool_False(self):
 
3507
        self.register_bool_option('foo', False)
 
3508
        conf = self.get_conf(b'')
 
3509
        self.assertEqual(False, conf.get('foo'))
 
3510
 
 
3511
    def test_get_default_bool_False_as_string(self):
 
3512
        self.register_bool_option('foo', u'False')
 
3513
        conf = self.get_conf(b'')
 
3514
        self.assertEqual(False, conf.get('foo'))
 
3515
 
 
3516
    def test_get_default_bool_from_env_converted(self):
 
3517
        self.register_bool_option('foo', u'True', default_from_env=['FOO'])
 
3518
        self.overrideEnv('FOO', 'False')
 
3519
        conf = self.get_conf(b'')
 
3520
        self.assertEqual(False, conf.get('foo'))
 
3521
 
 
3522
    def test_get_default_bool_when_conversion_fails(self):
 
3523
        self.register_bool_option('foo', default='True')
 
3524
        conf = self.get_conf(b'foo=invalid boolean')
 
3525
        self.assertEqual(True, conf.get('foo'))
 
3526
 
 
3527
    def register_integer_option(self, name,
 
3528
                                default=None, default_from_env=None):
 
3529
        i = config.Option(name, help='An integer.',
 
3530
                          default=default, default_from_env=default_from_env,
 
3531
                          from_unicode=config.int_from_store)
 
3532
        self.registry.register(i)
 
3533
 
 
3534
    def test_get_default_integer_None(self):
 
3535
        self.register_integer_option('foo')
 
3536
        conf = self.get_conf(b'')
 
3537
        self.assertEqual(None, conf.get('foo'))
 
3538
 
 
3539
    def test_get_default_integer(self):
 
3540
        self.register_integer_option('foo', 42)
 
3541
        conf = self.get_conf(b'')
 
3542
        self.assertEqual(42, conf.get('foo'))
 
3543
 
 
3544
    def test_get_default_integer_as_string(self):
 
3545
        self.register_integer_option('foo', u'42')
 
3546
        conf = self.get_conf(b'')
 
3547
        self.assertEqual(42, conf.get('foo'))
 
3548
 
 
3549
    def test_get_default_integer_from_env(self):
 
3550
        self.register_integer_option('foo', default_from_env=['FOO'])
 
3551
        self.overrideEnv('FOO', '18')
 
3552
        conf = self.get_conf(b'')
 
3553
        self.assertEqual(18, conf.get('foo'))
 
3554
 
 
3555
    def test_get_default_integer_when_conversion_fails(self):
 
3556
        self.register_integer_option('foo', default='12')
 
3557
        conf = self.get_conf(b'foo=invalid integer')
 
3558
        self.assertEqual(12, conf.get('foo'))
 
3559
 
 
3560
    def register_list_option(self, name, default=None, default_from_env=None):
 
3561
        l = config.ListOption(name, help='A list.', default=default,
 
3562
                              default_from_env=default_from_env)
 
3563
        self.registry.register(l)
 
3564
 
 
3565
    def test_get_default_list_None(self):
 
3566
        self.register_list_option('foo')
 
3567
        conf = self.get_conf(b'')
 
3568
        self.assertEqual(None, conf.get('foo'))
 
3569
 
 
3570
    def test_get_default_list_empty(self):
 
3571
        self.register_list_option('foo', '')
 
3572
        conf = self.get_conf(b'')
 
3573
        self.assertEqual([], conf.get('foo'))
 
3574
 
 
3575
    def test_get_default_list_from_env(self):
 
3576
        self.register_list_option('foo', default_from_env=['FOO'])
 
3577
        self.overrideEnv('FOO', '')
 
3578
        conf = self.get_conf(b'')
 
3579
        self.assertEqual([], conf.get('foo'))
 
3580
 
 
3581
    def test_get_with_list_converter_no_item(self):
 
3582
        self.register_list_option('foo', None)
 
3583
        conf = self.get_conf(b'foo=,')
 
3584
        self.assertEqual([], conf.get('foo'))
 
3585
 
 
3586
    def test_get_with_list_converter_many_items(self):
 
3587
        self.register_list_option('foo', None)
 
3588
        conf = self.get_conf(b'foo=m,o,r,e')
 
3589
        self.assertEqual(['m', 'o', 'r', 'e'], conf.get('foo'))
 
3590
 
 
3591
    def test_get_with_list_converter_embedded_spaces_many_items(self):
 
3592
        self.register_list_option('foo', None)
 
3593
        conf = self.get_conf(b'foo=" bar", "baz "')
 
3594
        self.assertEqual([' bar', 'baz '], conf.get('foo'))
 
3595
 
 
3596
    def test_get_with_list_converter_stripped_spaces_many_items(self):
 
3597
        self.register_list_option('foo', None)
 
3598
        conf = self.get_conf(b'foo= bar ,  baz ')
 
3599
        self.assertEqual(['bar', 'baz'], conf.get('foo'))
 
3600
 
 
3601
 
 
3602
class TestIterOptionRefs(tests.TestCase):
 
3603
    """iter_option_refs is a bit unusual, document some cases."""
 
3604
 
 
3605
    def assertRefs(self, expected, string):
 
3606
        self.assertEqual(expected, list(config.iter_option_refs(string)))
 
3607
 
 
3608
    def test_empty(self):
 
3609
        self.assertRefs([(False, '')], '')
 
3610
 
 
3611
    def test_no_refs(self):
 
3612
        self.assertRefs([(False, 'foo bar')], 'foo bar')
 
3613
 
 
3614
    def test_single_ref(self):
 
3615
        self.assertRefs([(False, ''), (True, '{foo}'), (False, '')], '{foo}')
 
3616
 
 
3617
    def test_broken_ref(self):
 
3618
        self.assertRefs([(False, '{foo')], '{foo')
 
3619
 
 
3620
    def test_embedded_ref(self):
 
3621
        self.assertRefs([(False, '{'), (True, '{foo}'), (False, '}')],
 
3622
                        '{{foo}}')
 
3623
 
 
3624
    def test_two_refs(self):
 
3625
        self.assertRefs([(False, ''), (True, '{foo}'),
 
3626
                         (False, ''), (True, '{bar}'),
 
3627
                         (False, ''), ],
 
3628
                        '{foo}{bar}')
 
3629
 
 
3630
    def test_newline_in_refs_are_not_matched(self):
 
3631
        self.assertRefs([(False, '{\nxx}{xx\n}{{\n}}')], '{\nxx}{xx\n}{{\n}}')
 
3632
 
 
3633
 
 
3634
class TestStackExpandOptions(tests.TestCaseWithTransport):
 
3635
 
 
3636
    def setUp(self):
 
3637
        super(TestStackExpandOptions, self).setUp()
 
3638
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
 
3639
        self.registry = config.option_registry
 
3640
        store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
 
3641
        self.conf = config.Stack([store.get_sections], store)
 
3642
 
 
3643
    def assertExpansion(self, expected, string, env=None):
 
3644
        self.assertEqual(expected, self.conf.expand_options(string, env))
 
3645
 
 
3646
    def test_no_expansion(self):
 
3647
        self.assertExpansion('foo', 'foo')
 
3648
 
 
3649
    def test_expand_default_value(self):
 
3650
        self.conf.store._load_from_string(b'bar=baz')
 
3651
        self.registry.register(config.Option('foo', default=u'{bar}'))
 
3652
        self.assertEqual('baz', self.conf.get('foo', expand=True))
 
3653
 
 
3654
    def test_expand_default_from_env(self):
 
3655
        self.conf.store._load_from_string(b'bar=baz')
 
3656
        self.registry.register(config.Option('foo', default_from_env=['FOO']))
 
3657
        self.overrideEnv('FOO', '{bar}')
 
3658
        self.assertEqual('baz', self.conf.get('foo', expand=True))
 
3659
 
 
3660
    def test_expand_default_on_failed_conversion(self):
 
3661
        self.conf.store._load_from_string(b'baz=bogus\nbar=42\nfoo={baz}')
 
3662
        self.registry.register(
 
3663
            config.Option('foo', default=u'{bar}',
 
3664
                          from_unicode=config.int_from_store))
 
3665
        self.assertEqual(42, self.conf.get('foo', expand=True))
 
3666
 
 
3667
    def test_env_adding_options(self):
 
3668
        self.assertExpansion('bar', '{foo}', {'foo': 'bar'})
 
3669
 
 
3670
    def test_env_overriding_options(self):
 
3671
        self.conf.store._load_from_string(b'foo=baz')
 
3672
        self.assertExpansion('bar', '{foo}', {'foo': 'bar'})
 
3673
 
 
3674
    def test_simple_ref(self):
 
3675
        self.conf.store._load_from_string(b'foo=xxx')
 
3676
        self.assertExpansion('xxx', '{foo}')
 
3677
 
 
3678
    def test_unknown_ref(self):
 
3679
        self.assertRaises(config.ExpandingUnknownOption,
 
3680
                          self.conf.expand_options, '{foo}')
 
3681
 
 
3682
    def test_illegal_def_is_ignored(self):
 
3683
        self.assertExpansion('{1,2}', '{1,2}')
 
3684
        self.assertExpansion('{ }', '{ }')
 
3685
        self.assertExpansion('${Foo,f}', '${Foo,f}')
 
3686
 
 
3687
    def test_indirect_ref(self):
 
3688
        self.conf.store._load_from_string(b'''
 
3689
foo=xxx
 
3690
bar={foo}
 
3691
''')
 
3692
        self.assertExpansion('xxx', '{bar}')
 
3693
 
 
3694
    def test_embedded_ref(self):
 
3695
        self.conf.store._load_from_string(b'''
 
3696
foo=xxx
 
3697
bar=foo
 
3698
''')
 
3699
        self.assertExpansion('xxx', '{{bar}}')
 
3700
 
 
3701
    def test_simple_loop(self):
 
3702
        self.conf.store._load_from_string(b'foo={foo}')
 
3703
        self.assertRaises(config.OptionExpansionLoop,
 
3704
                          self.conf.expand_options, '{foo}')
 
3705
 
 
3706
    def test_indirect_loop(self):
 
3707
        self.conf.store._load_from_string(b'''
 
3708
foo={bar}
 
3709
bar={baz}
 
3710
baz={foo}''')
 
3711
        e = self.assertRaises(config.OptionExpansionLoop,
 
3712
                              self.conf.expand_options, '{foo}')
 
3713
        self.assertEqual('foo->bar->baz', e.refs)
 
3714
        self.assertEqual('{foo}', e.string)
 
3715
 
 
3716
    def test_list(self):
 
3717
        self.conf.store._load_from_string(b'''
 
3718
foo=start
 
3719
bar=middle
 
3720
baz=end
 
3721
list={foo},{bar},{baz}
 
3722
''')
 
3723
        self.registry.register(
 
3724
            config.ListOption('list'))
 
3725
        self.assertEqual(['start', 'middle', 'end'],
 
3726
                         self.conf.get('list', expand=True))
 
3727
 
 
3728
    def test_cascading_list(self):
 
3729
        self.conf.store._load_from_string(b'''
 
3730
foo=start,{bar}
 
3731
bar=middle,{baz}
 
3732
baz=end
 
3733
list={foo}
 
3734
''')
 
3735
        self.registry.register(config.ListOption('list'))
 
3736
        # Register an intermediate option as a list to ensure no conversion
 
3737
        # happen while expanding. Conversion should only occur for the original
 
3738
        # option ('list' here).
 
3739
        self.registry.register(config.ListOption('baz'))
 
3740
        self.assertEqual(['start', 'middle', 'end'],
 
3741
                         self.conf.get('list', expand=True))
 
3742
 
 
3743
    def test_pathologically_hidden_list(self):
 
3744
        self.conf.store._load_from_string(b'''
 
3745
foo=bin
 
3746
bar=go
 
3747
start={foo
 
3748
middle=},{
 
3749
end=bar}
 
3750
hidden={start}{middle}{end}
 
3751
''')
 
3752
        # What matters is what the registration says, the conversion happens
 
3753
        # only after all expansions have been performed
 
3754
        self.registry.register(config.ListOption('hidden'))
 
3755
        self.assertEqual(['bin', 'go'],
 
3756
                         self.conf.get('hidden', expand=True))
 
3757
 
 
3758
 
 
3759
class TestStackCrossSectionsExpand(tests.TestCaseWithTransport):
 
3760
 
 
3761
    def setUp(self):
 
3762
        super(TestStackCrossSectionsExpand, self).setUp()
 
3763
 
 
3764
    def get_config(self, location, string):
 
3765
        if string is None:
 
3766
            string = b''
 
3767
        # Since we don't save the config we won't strictly require to inherit
 
3768
        # from TestCaseInTempDir, but an error occurs so quickly...
 
3769
        c = config.LocationStack(location)
 
3770
        c.store._load_from_string(string)
 
3771
        return c
 
3772
 
 
3773
    def test_dont_cross_unrelated_section(self):
 
3774
        c = self.get_config('/another/branch/path', b'''
 
3775
[/one/branch/path]
 
3776
foo = hello
 
3777
bar = {foo}/2
 
3778
 
 
3779
[/another/branch/path]
 
3780
bar = {foo}/2
 
3781
''')
 
3782
        self.assertRaises(config.ExpandingUnknownOption,
 
3783
                          c.get, 'bar', expand=True)
 
3784
 
 
3785
    def test_cross_related_sections(self):
 
3786
        c = self.get_config('/project/branch/path', b'''
 
3787
[/project]
 
3788
foo = qu
 
3789
 
 
3790
[/project/branch/path]
 
3791
bar = {foo}ux
 
3792
''')
 
3793
        self.assertEqual('quux', c.get('bar', expand=True))
 
3794
 
 
3795
 
 
3796
class TestStackCrossStoresExpand(tests.TestCaseWithTransport):
 
3797
 
 
3798
    def test_cross_global_locations(self):
 
3799
        l_store = config.LocationStore()
 
3800
        l_store._load_from_string(b'''
 
3801
[/branch]
 
3802
lfoo = loc-foo
 
3803
lbar = {gbar}
 
3804
''')
 
3805
        l_store.save()
 
3806
        g_store = config.GlobalStore()
 
3807
        g_store._load_from_string(b'''
 
3808
[DEFAULT]
 
3809
gfoo = {lfoo}
 
3810
gbar = glob-bar
 
3811
''')
 
3812
        g_store.save()
 
3813
        stack = config.LocationStack('/branch')
 
3814
        self.assertEqual('glob-bar', stack.get('lbar', expand=True))
 
3815
        self.assertEqual('loc-foo', stack.get('gfoo', expand=True))
 
3816
 
 
3817
 
 
3818
class TestStackExpandSectionLocals(tests.TestCaseWithTransport):
 
3819
 
 
3820
    def test_expand_locals_empty(self):
 
3821
        l_store = config.LocationStore()
 
3822
        l_store._load_from_string(b'''
 
3823
[/home/user/project]
 
3824
base = {basename}
 
3825
rel = {relpath}
 
3826
''')
 
3827
        l_store.save()
 
3828
        stack = config.LocationStack('/home/user/project/')
 
3829
        self.assertEqual('', stack.get('base', expand=True))
 
3830
        self.assertEqual('', stack.get('rel', expand=True))
 
3831
 
 
3832
    def test_expand_basename_locally(self):
 
3833
        l_store = config.LocationStore()
 
3834
        l_store._load_from_string(b'''
 
3835
[/home/user/project]
 
3836
bfoo = {basename}
 
3837
''')
 
3838
        l_store.save()
 
3839
        stack = config.LocationStack('/home/user/project/branch')
 
3840
        self.assertEqual('branch', stack.get('bfoo', expand=True))
 
3841
 
 
3842
    def test_expand_basename_locally_longer_path(self):
 
3843
        l_store = config.LocationStore()
 
3844
        l_store._load_from_string(b'''
 
3845
[/home/user]
 
3846
bfoo = {basename}
 
3847
''')
 
3848
        l_store.save()
 
3849
        stack = config.LocationStack('/home/user/project/dir/branch')
 
3850
        self.assertEqual('branch', stack.get('bfoo', expand=True))
 
3851
 
 
3852
    def test_expand_relpath_locally(self):
 
3853
        l_store = config.LocationStore()
 
3854
        l_store._load_from_string(b'''
 
3855
[/home/user/project]
 
3856
lfoo = loc-foo/{relpath}
 
3857
''')
 
3858
        l_store.save()
 
3859
        stack = config.LocationStack('/home/user/project/branch')
 
3860
        self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
 
3861
 
 
3862
    def test_expand_relpath_unknonw_in_global(self):
 
3863
        g_store = config.GlobalStore()
 
3864
        g_store._load_from_string(b'''
 
3865
[DEFAULT]
 
3866
gfoo = {relpath}
 
3867
''')
 
3868
        g_store.save()
 
3869
        stack = config.LocationStack('/home/user/project/branch')
 
3870
        self.assertRaises(config.ExpandingUnknownOption,
 
3871
                          stack.get, 'gfoo', expand=True)
 
3872
 
 
3873
    def test_expand_local_option_locally(self):
 
3874
        l_store = config.LocationStore()
 
3875
        l_store._load_from_string(b'''
 
3876
[/home/user/project]
 
3877
lfoo = loc-foo/{relpath}
 
3878
lbar = {gbar}
 
3879
''')
 
3880
        l_store.save()
 
3881
        g_store = config.GlobalStore()
 
3882
        g_store._load_from_string(b'''
 
3883
[DEFAULT]
 
3884
gfoo = {lfoo}
 
3885
gbar = glob-bar
 
3886
''')
 
3887
        g_store.save()
 
3888
        stack = config.LocationStack('/home/user/project/branch')
 
3889
        self.assertEqual('glob-bar', stack.get('lbar', expand=True))
 
3890
        self.assertEqual('loc-foo/branch', stack.get('gfoo', expand=True))
 
3891
 
 
3892
    def test_locals_dont_leak(self):
 
3893
        """Make sure we chose the right local in presence of several sections.
 
3894
        """
 
3895
        l_store = config.LocationStore()
 
3896
        l_store._load_from_string(b'''
 
3897
[/home/user]
 
3898
lfoo = loc-foo/{relpath}
 
3899
[/home/user/project]
 
3900
lfoo = loc-foo/{relpath}
 
3901
''')
 
3902
        l_store.save()
 
3903
        stack = config.LocationStack('/home/user/project/branch')
 
3904
        self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
 
3905
        stack = config.LocationStack('/home/user/bar/baz')
 
3906
        self.assertEqual('loc-foo/bar/baz', stack.get('lfoo', expand=True))
 
3907
 
 
3908
 
 
3909
class TestStackSet(TestStackWithTransport):
 
3910
 
 
3911
    def test_simple_set(self):
 
3912
        conf = self.get_stack(self)
 
3913
        self.assertEqual(None, conf.get('foo'))
 
3914
        conf.set('foo', 'baz')
 
3915
        # Did we get it back ?
 
3916
        self.assertEqual('baz', conf.get('foo'))
 
3917
 
 
3918
    def test_set_creates_a_new_section(self):
 
3919
        conf = self.get_stack(self)
 
3920
        conf.set('foo', 'baz')
 
3921
        self.assertEqual, 'baz', conf.get('foo')
 
3922
 
 
3923
    def test_set_hook(self):
 
3924
        calls = []
 
3925
 
 
3926
        def hook(*args):
 
3927
            calls.append(args)
 
3928
        config.ConfigHooks.install_named_hook('set', hook, None)
 
3929
        self.assertLength(0, calls)
 
3930
        conf = self.get_stack(self)
 
3931
        conf.set('foo', 'bar')
 
3932
        self.assertLength(1, calls)
 
3933
        self.assertEqual((conf, 'foo', 'bar'), calls[0])
 
3934
 
 
3935
 
 
3936
class TestStackRemove(TestStackWithTransport):
 
3937
 
 
3938
    def test_remove_existing(self):
 
3939
        conf = self.get_stack(self)
 
3940
        conf.set('foo', 'bar')
 
3941
        self.assertEqual('bar', conf.get('foo'))
 
3942
        conf.remove('foo')
 
3943
        # Did we get it back ?
 
3944
        self.assertEqual(None, conf.get('foo'))
 
3945
 
 
3946
    def test_remove_unknown(self):
 
3947
        conf = self.get_stack(self)
 
3948
        self.assertRaises(KeyError, conf.remove, 'I_do_not_exist')
 
3949
 
 
3950
    def test_remove_hook(self):
 
3951
        calls = []
 
3952
 
 
3953
        def hook(*args):
 
3954
            calls.append(args)
 
3955
        config.ConfigHooks.install_named_hook('remove', hook, None)
 
3956
        self.assertLength(0, calls)
 
3957
        conf = self.get_stack(self)
 
3958
        conf.set('foo', 'bar')
 
3959
        conf.remove('foo')
 
3960
        self.assertLength(1, calls)
 
3961
        self.assertEqual((conf, 'foo'), calls[0])
 
3962
 
 
3963
 
 
3964
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
 
3965
 
 
3966
    def setUp(self):
 
3967
        super(TestConfigGetOptions, self).setUp()
 
3968
        create_configs(self)
 
3969
 
 
3970
    def test_no_variable(self):
 
3971
        # Using branch should query branch, locations and breezy
 
3972
        self.assertOptions([], self.branch_config)
 
3973
 
 
3974
    def test_option_in_breezy(self):
 
3975
        self.breezy_config.set_user_option('file', 'breezy')
 
3976
        self.assertOptions([('file', 'breezy', 'DEFAULT', 'breezy')],
 
3977
                           self.breezy_config)
 
3978
 
 
3979
    def test_option_in_locations(self):
 
3980
        self.locations_config.set_user_option('file', 'locations')
 
3981
        self.assertOptions(
 
3982
            [('file', 'locations', self.tree.basedir, 'locations')],
 
3983
            self.locations_config)
 
3984
 
 
3985
    def test_option_in_branch(self):
 
3986
        self.branch_config.set_user_option('file', 'branch')
 
3987
        self.assertOptions([('file', 'branch', 'DEFAULT', 'branch')],
 
3988
                           self.branch_config)
 
3989
 
 
3990
    def test_option_in_breezy_and_branch(self):
 
3991
        self.breezy_config.set_user_option('file', 'breezy')
 
3992
        self.branch_config.set_user_option('file', 'branch')
 
3993
        self.assertOptions([('file', 'branch', 'DEFAULT', 'branch'),
 
3994
                            ('file', 'breezy', 'DEFAULT', 'breezy'), ],
 
3995
                           self.branch_config)
 
3996
 
 
3997
    def test_option_in_branch_and_locations(self):
 
3998
        # Hmm, locations override branch :-/
 
3999
        self.locations_config.set_user_option('file', 'locations')
 
4000
        self.branch_config.set_user_option('file', 'branch')
 
4001
        self.assertOptions(
 
4002
            [('file', 'locations', self.tree.basedir, 'locations'),
 
4003
             ('file', 'branch', 'DEFAULT', 'branch'), ],
 
4004
            self.branch_config)
 
4005
 
 
4006
    def test_option_in_breezy_locations_and_branch(self):
 
4007
        self.breezy_config.set_user_option('file', 'breezy')
 
4008
        self.locations_config.set_user_option('file', 'locations')
 
4009
        self.branch_config.set_user_option('file', 'branch')
 
4010
        self.assertOptions(
 
4011
            [('file', 'locations', self.tree.basedir, 'locations'),
 
4012
             ('file', 'branch', 'DEFAULT', 'branch'),
 
4013
             ('file', 'breezy', 'DEFAULT', 'breezy'), ],
 
4014
            self.branch_config)
 
4015
 
 
4016
 
 
4017
class TestConfigRemoveOption(tests.TestCaseWithTransport, TestOptionsMixin):
 
4018
 
 
4019
    def setUp(self):
 
4020
        super(TestConfigRemoveOption, self).setUp()
 
4021
        create_configs_with_file_option(self)
 
4022
 
 
4023
    def test_remove_in_locations(self):
 
4024
        self.locations_config.remove_user_option('file', self.tree.basedir)
 
4025
        self.assertOptions(
 
4026
            [('file', 'branch', 'DEFAULT', 'branch'),
 
4027
             ('file', 'breezy', 'DEFAULT', 'breezy'), ],
 
4028
            self.branch_config)
 
4029
 
 
4030
    def test_remove_in_branch(self):
 
4031
        self.branch_config.remove_user_option('file')
 
4032
        self.assertOptions(
 
4033
            [('file', 'locations', self.tree.basedir, 'locations'),
 
4034
             ('file', 'breezy', 'DEFAULT', 'breezy'), ],
 
4035
            self.branch_config)
 
4036
 
 
4037
    def test_remove_in_breezy(self):
 
4038
        self.breezy_config.remove_user_option('file')
 
4039
        self.assertOptions(
 
4040
            [('file', 'locations', self.tree.basedir, 'locations'),
 
4041
             ('file', 'branch', 'DEFAULT', 'branch'), ],
 
4042
            self.branch_config)
 
4043
 
 
4044
 
 
4045
class TestConfigGetSections(tests.TestCaseWithTransport):
 
4046
 
 
4047
    def setUp(self):
 
4048
        super(TestConfigGetSections, self).setUp()
 
4049
        create_configs(self)
 
4050
 
 
4051
    def assertSectionNames(self, expected, conf, name=None):
 
4052
        """Check which sections are returned for a given config.
 
4053
 
 
4054
        If fallback configurations exist their sections can be included.
 
4055
 
 
4056
        :param expected: A list of section names.
 
4057
 
 
4058
        :param conf: The configuration that will be queried.
 
4059
 
 
4060
        :param name: An optional section name that will be passed to
 
4061
            get_sections().
 
4062
        """
 
4063
        sections = list(conf._get_sections(name))
 
4064
        self.assertLength(len(expected), sections)
 
4065
        self.assertEqual(expected, [n for n, _, _ in sections])
 
4066
 
 
4067
    def test_breezy_default_section(self):
 
4068
        self.assertSectionNames(['DEFAULT'], self.breezy_config)
 
4069
 
 
4070
    def test_locations_default_section(self):
 
4071
        # No sections are defined in an empty file
 
4072
        self.assertSectionNames([], self.locations_config)
 
4073
 
 
4074
    def test_locations_named_section(self):
 
4075
        self.locations_config.set_user_option('file', 'locations')
 
4076
        self.assertSectionNames([self.tree.basedir], self.locations_config)
 
4077
 
 
4078
    def test_locations_matching_sections(self):
 
4079
        loc_config = self.locations_config
 
4080
        loc_config.set_user_option('file', 'locations')
 
4081
        # We need to cheat a bit here to create an option in sections above and
 
4082
        # below the 'location' one.
 
4083
        parser = loc_config._get_parser()
 
4084
        # locations.cong deals with '/' ignoring native os.sep
 
4085
        location_names = self.tree.basedir.split('/')
 
4086
        parent = '/'.join(location_names[:-1])
 
4087
        child = '/'.join(location_names + ['child'])
 
4088
        parser[parent] = {}
 
4089
        parser[parent]['file'] = 'parent'
 
4090
        parser[child] = {}
 
4091
        parser[child]['file'] = 'child'
 
4092
        self.assertSectionNames([self.tree.basedir, parent], loc_config)
 
4093
 
 
4094
    def test_branch_data_default_section(self):
 
4095
        self.assertSectionNames([None],
 
4096
                                self.branch_config._get_branch_data_config())
 
4097
 
 
4098
    def test_branch_default_sections(self):
 
4099
        # No sections are defined in an empty locations file
 
4100
        self.assertSectionNames([None, 'DEFAULT'],
 
4101
                                self.branch_config)
 
4102
        # Unless we define an option
 
4103
        self.branch_config._get_location_config().set_user_option(
 
4104
            'file', 'locations')
 
4105
        self.assertSectionNames([self.tree.basedir, None, 'DEFAULT'],
 
4106
                                self.branch_config)
 
4107
 
 
4108
    def test_breezy_named_section(self):
 
4109
        # We need to cheat as the API doesn't give direct access to sections
 
4110
        # other than DEFAULT.
 
4111
        self.breezy_config.set_alias('breezy', 'bzr')
 
4112
        self.assertSectionNames(['ALIASES'], self.breezy_config, 'ALIASES')
 
4113
 
 
4114
 
 
4115
class TestSharedStores(tests.TestCaseInTempDir):
 
4116
 
 
4117
    def test_breezy_conf_shared(self):
 
4118
        g1 = config.GlobalStack()
 
4119
        g2 = config.GlobalStack()
 
4120
        # The two stacks share the same store
 
4121
        self.assertIs(g1.store, g2.store)
 
4122
 
 
4123
 
 
4124
class TestAuthenticationConfigFilePermissions(tests.TestCaseInTempDir):
 
4125
    """Test warning for permissions of authentication.conf."""
 
4126
 
 
4127
    def setUp(self):
 
4128
        super(TestAuthenticationConfigFilePermissions, self).setUp()
 
4129
        self.path = osutils.pathjoin(self.test_dir, 'authentication.conf')
 
4130
        with open(self.path, 'wb') as f:
 
4131
            f.write(b"""[broken]
 
4132
scheme=ftp
 
4133
user=joe
 
4134
port=port # Error: Not an int
 
4135
""")
 
4136
        self.overrideAttr(bedding, 'authentication_config_path',
 
4137
                          lambda: self.path)
 
4138
        osutils.chmod_if_possible(self.path, 0o755)
 
4139
 
 
4140
    def test_check_warning(self):
 
4141
        conf = config.AuthenticationConfig()
 
4142
        self.assertEqual(conf._filename, self.path)
 
4143
        self.assertContainsRe(self.get_log(),
 
4144
                              'Saved passwords may be accessible by other users.')
 
4145
 
 
4146
    def test_check_suppressed_warning(self):
 
4147
        global_config = config.GlobalConfig()
 
4148
        global_config.set_user_option('suppress_warnings',
 
4149
                                      'insecure_permissions')
 
4150
        conf = config.AuthenticationConfig()
 
4151
        self.assertEqual(conf._filename, self.path)
 
4152
        self.assertNotContainsRe(self.get_log(),
 
4153
                                 'Saved passwords may be accessible by other users.')
 
4154
 
 
4155
 
 
4156
class TestAuthenticationConfigFile(tests.TestCase):
 
4157
    """Test the authentication.conf file matching"""
 
4158
 
 
4159
    def _got_user_passwd(self, expected_user, expected_password,
 
4160
                         config, *args, **kwargs):
 
4161
        credentials = config.get_credentials(*args, **kwargs)
 
4162
        if credentials is None:
 
4163
            user = None
 
4164
            password = None
 
4165
        else:
 
4166
            user = credentials['user']
 
4167
            password = credentials['password']
 
4168
        self.assertEqual(expected_user, user)
 
4169
        self.assertEqual(expected_password, password)
 
4170
 
 
4171
    def test_empty_config(self):
 
4172
        conf = config.AuthenticationConfig(_file=BytesIO())
 
4173
        self.assertEqual({}, conf._get_config())
 
4174
        self._got_user_passwd(None, None, conf, 'http', 'foo.net')
 
4175
 
 
4176
    def test_non_utf8_config(self):
 
4177
        conf = config.AuthenticationConfig(_file=BytesIO(b'foo = bar\xff'))
 
4178
        self.assertRaises(config.ConfigContentError, conf._get_config)
 
4179
 
 
4180
    def test_missing_auth_section_header(self):
 
4181
        conf = config.AuthenticationConfig(_file=BytesIO(b'foo = bar'))
 
4182
        self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
 
4183
 
 
4184
    def test_auth_section_header_not_closed(self):
 
4185
        conf = config.AuthenticationConfig(_file=BytesIO(b'[DEF'))
 
4186
        self.assertRaises(config.ParseConfigError, conf._get_config)
 
4187
 
 
4188
    def test_auth_value_not_boolean(self):
 
4189
        conf = config.AuthenticationConfig(_file=BytesIO(b"""\
 
4190
[broken]
 
4191
scheme=ftp
 
4192
user=joe
 
4193
verify_certificates=askme # Error: Not a boolean
 
4194
"""))
 
4195
        self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
 
4196
 
 
4197
    def test_auth_value_not_int(self):
 
4198
        conf = config.AuthenticationConfig(_file=BytesIO(b"""\
 
4199
[broken]
 
4200
scheme=ftp
 
4201
user=joe
 
4202
port=port # Error: Not an int
 
4203
"""))
 
4204
        self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
 
4205
 
 
4206
    def test_unknown_password_encoding(self):
 
4207
        conf = config.AuthenticationConfig(_file=BytesIO(b"""\
 
4208
[broken]
 
4209
scheme=ftp
 
4210
user=joe
 
4211
password_encoding=unknown
 
4212
"""))
 
4213
        self.assertRaises(ValueError, conf.get_password,
 
4214
                          'ftp', 'foo.net', 'joe')
 
4215
 
 
4216
    def test_credentials_for_scheme_host(self):
 
4217
        conf = config.AuthenticationConfig(_file=BytesIO(b"""\
 
4218
# Identity on foo.net
 
4219
[ftp definition]
 
4220
scheme=ftp
 
4221
host=foo.net
 
4222
user=joe
 
4223
password=secret-pass
 
4224
"""))
 
4225
        # Basic matching
 
4226
        self._got_user_passwd('joe', 'secret-pass', conf, 'ftp', 'foo.net')
 
4227
        # different scheme
 
4228
        self._got_user_passwd(None, None, conf, 'http', 'foo.net')
 
4229
        # different host
 
4230
        self._got_user_passwd(None, None, conf, 'ftp', 'bar.net')
 
4231
 
 
4232
    def test_credentials_for_host_port(self):
 
4233
        conf = config.AuthenticationConfig(_file=BytesIO(b"""\
 
4234
# Identity on foo.net
 
4235
[ftp definition]
 
4236
scheme=ftp
 
4237
port=10021
 
4238
host=foo.net
 
4239
user=joe
 
4240
password=secret-pass
 
4241
"""))
 
4242
        # No port
 
4243
        self._got_user_passwd('joe', 'secret-pass',
 
4244
                              conf, 'ftp', 'foo.net', port=10021)
 
4245
        # different port
 
4246
        self._got_user_passwd(None, None, conf, 'ftp', 'foo.net')
 
4247
 
 
4248
    def test_for_matching_host(self):
 
4249
        conf = config.AuthenticationConfig(_file=BytesIO(b"""\
 
4250
# Identity on foo.net
 
4251
[sourceforge]
 
4252
scheme=bzr
 
4253
host=bzr.sf.net
 
4254
user=joe
 
4255
password=joepass
 
4256
[sourceforge domain]
 
4257
scheme=bzr
 
4258
host=.bzr.sf.net
 
4259
user=georges
 
4260
password=bendover
 
4261
"""))
 
4262
        # matching domain
 
4263
        self._got_user_passwd('georges', 'bendover',
 
4264
                              conf, 'bzr', 'foo.bzr.sf.net')
 
4265
        # phishing attempt
 
4266
        self._got_user_passwd(None, None,
 
4267
                              conf, 'bzr', 'bbzr.sf.net')
 
4268
 
 
4269
    def test_for_matching_host_None(self):
 
4270
        conf = config.AuthenticationConfig(_file=BytesIO(b"""\
 
4271
# Identity on foo.net
 
4272
[catchup bzr]
 
4273
scheme=bzr
 
4274
user=joe
 
4275
password=joepass
 
4276
[DEFAULT]
 
4277
user=georges
 
4278
password=bendover
 
4279
"""))
 
4280
        # match no host
 
4281
        self._got_user_passwd('joe', 'joepass',
 
4282
                              conf, 'bzr', 'quux.net')
 
4283
        # no host but different scheme
 
4284
        self._got_user_passwd('georges', 'bendover',
 
4285
                              conf, 'ftp', 'quux.net')
 
4286
 
 
4287
    def test_credentials_for_path(self):
 
4288
        conf = config.AuthenticationConfig(_file=BytesIO(b"""
 
4289
[http dir1]
 
4290
scheme=http
 
4291
host=bar.org
 
4292
path=/dir1
 
4293
user=jim
 
4294
password=jimpass
 
4295
[http dir2]
 
4296
scheme=http
 
4297
host=bar.org
 
4298
path=/dir2
 
4299
user=georges
 
4300
password=bendover
 
4301
"""))
 
4302
        # no path no dice
 
4303
        self._got_user_passwd(None, None,
 
4304
                              conf, 'http', host='bar.org', path='/dir3')
 
4305
        # matching path
 
4306
        self._got_user_passwd('georges', 'bendover',
 
4307
                              conf, 'http', host='bar.org', path='/dir2')
 
4308
        # matching subdir
 
4309
        self._got_user_passwd('jim', 'jimpass',
 
4310
                              conf, 'http', host='bar.org', path='/dir1/subdir')
 
4311
 
 
4312
    def test_credentials_for_user(self):
 
4313
        conf = config.AuthenticationConfig(_file=BytesIO(b"""
 
4314
[with user]
 
4315
scheme=http
 
4316
host=bar.org
 
4317
user=jim
 
4318
password=jimpass
 
4319
"""))
 
4320
        # Get user
 
4321
        self._got_user_passwd('jim', 'jimpass',
 
4322
                              conf, 'http', 'bar.org')
 
4323
        # Get same user
 
4324
        self._got_user_passwd('jim', 'jimpass',
 
4325
                              conf, 'http', 'bar.org', user='jim')
 
4326
        # Don't get a different user if one is specified
 
4327
        self._got_user_passwd(None, None,
 
4328
                              conf, 'http', 'bar.org', user='georges')
 
4329
 
 
4330
    def test_credentials_for_user_without_password(self):
 
4331
        conf = config.AuthenticationConfig(_file=BytesIO(b"""
 
4332
[without password]
 
4333
scheme=http
 
4334
host=bar.org
 
4335
user=jim
 
4336
"""))
 
4337
        # Get user but no password
 
4338
        self._got_user_passwd('jim', None,
 
4339
                              conf, 'http', 'bar.org')
 
4340
 
 
4341
    def test_verify_certificates(self):
 
4342
        conf = config.AuthenticationConfig(_file=BytesIO(b"""
 
4343
[self-signed]
 
4344
scheme=https
 
4345
host=bar.org
 
4346
user=jim
 
4347
password=jimpass
 
4348
verify_certificates=False
 
4349
[normal]
 
4350
scheme=https
 
4351
host=foo.net
 
4352
user=georges
 
4353
password=bendover
 
4354
"""))
 
4355
        credentials = conf.get_credentials('https', 'bar.org')
 
4356
        self.assertEqual(False, credentials.get('verify_certificates'))
 
4357
        credentials = conf.get_credentials('https', 'foo.net')
 
4358
        self.assertEqual(True, credentials.get('verify_certificates'))
 
4359
 
 
4360
 
 
4361
class TestAuthenticationStorage(tests.TestCaseInTempDir):
 
4362
 
 
4363
    def test_set_credentials(self):
 
4364
        conf = config.AuthenticationConfig()
 
4365
        conf.set_credentials('name', 'host', 'user', 'scheme', 'password',
 
4366
                             99, path='/foo', verify_certificates=False, realm='realm')
 
4367
        credentials = conf.get_credentials(host='host', scheme='scheme',
 
4368
                                           port=99, path='/foo',
 
4369
                                           realm='realm')
 
4370
        CREDENTIALS = {'name': 'name', 'user': 'user', 'password': 'password',
 
4371
                       'verify_certificates': False, 'scheme': 'scheme',
 
4372
                       'host': 'host', 'port': 99, 'path': '/foo',
 
4373
                       'realm': 'realm'}
 
4374
        self.assertEqual(CREDENTIALS, credentials)
 
4375
        credentials_from_disk = config.AuthenticationConfig().get_credentials(
 
4376
            host='host', scheme='scheme', port=99, path='/foo', realm='realm')
 
4377
        self.assertEqual(CREDENTIALS, credentials_from_disk)
 
4378
 
 
4379
    def test_reset_credentials_different_name(self):
 
4380
        conf = config.AuthenticationConfig()
 
4381
        conf.set_credentials('name', 'host', 'user', 'scheme', 'password'),
 
4382
        conf.set_credentials('name2', 'host', 'user2', 'scheme', 'password'),
 
4383
        self.assertIs(None, conf._get_config().get('name'))
 
4384
        credentials = conf.get_credentials(host='host', scheme='scheme')
 
4385
        CREDENTIALS = {'name': 'name2', 'user': 'user2', 'password':
 
4386
                       'password', 'verify_certificates': True,
 
4387
                       'scheme': 'scheme', 'host': 'host', 'port': None,
 
4388
                       'path': None, 'realm': None}
 
4389
        self.assertEqual(CREDENTIALS, credentials)
 
4390
 
 
4391
 
 
4392
class TestAuthenticationConfig(tests.TestCaseInTempDir):
 
4393
    """Test AuthenticationConfig behaviour"""
 
4394
 
 
4395
    def _check_default_password_prompt(self, expected_prompt_format, scheme,
 
4396
                                       host=None, port=None, realm=None,
 
4397
                                       path=None):
 
4398
        if host is None:
 
4399
            host = 'bar.org'
 
4400
        user, password = 'jim', 'precious'
 
4401
        expected_prompt = expected_prompt_format % {
 
4402
            'scheme': scheme, 'host': host, 'port': port,
 
4403
            'user': user, 'realm': realm}
 
4404
 
 
4405
        ui.ui_factory = tests.TestUIFactory(stdin=password + '\n')
 
4406
        # We use an empty conf so that the user is always prompted
 
4407
        conf = config.AuthenticationConfig()
 
4408
        self.assertEqual(password,
 
4409
                         conf.get_password(scheme, host, user, port=port,
 
4410
                                           realm=realm, path=path))
 
4411
        self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
 
4412
        self.assertEqual('', ui.ui_factory.stdout.getvalue())
 
4413
 
 
4414
    def _check_default_username_prompt(self, expected_prompt_format, scheme,
 
4415
                                       host=None, port=None, realm=None,
 
4416
                                       path=None):
 
4417
        if host is None:
 
4418
            host = 'bar.org'
 
4419
        username = 'jim'
 
4420
        expected_prompt = expected_prompt_format % {
 
4421
            'scheme': scheme, 'host': host, 'port': port,
 
4422
            'realm': realm}
 
4423
        ui.ui_factory = tests.TestUIFactory(stdin=username + '\n')
 
4424
        # We use an empty conf so that the user is always prompted
 
4425
        conf = config.AuthenticationConfig()
 
4426
        self.assertEqual(username, conf.get_user(scheme, host, port=port,
 
4427
                                                 realm=realm, path=path, ask=True))
 
4428
        self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
 
4429
        self.assertEqual('', ui.ui_factory.stdout.getvalue())
 
4430
 
 
4431
    def test_username_defaults_prompts(self):
 
4432
        # HTTP prompts can't be tested here, see test_http.py
 
4433
        self._check_default_username_prompt(u'FTP %(host)s username: ', 'ftp')
 
4434
        self._check_default_username_prompt(
 
4435
            u'FTP %(host)s:%(port)d username: ', 'ftp', port=10020)
 
4436
        self._check_default_username_prompt(
 
4437
            u'SSH %(host)s:%(port)d username: ', 'ssh', port=12345)
 
4438
 
 
4439
    def test_username_default_no_prompt(self):
 
4440
        conf = config.AuthenticationConfig()
 
4441
        self.assertEqual(None,
 
4442
                         conf.get_user('ftp', 'example.com'))
 
4443
        self.assertEqual("explicitdefault",
 
4444
                         conf.get_user('ftp', 'example.com', default="explicitdefault"))
 
4445
 
 
4446
    def test_password_default_prompts(self):
 
4447
        # HTTP prompts can't be tested here, see test_http.py
 
4448
        self._check_default_password_prompt(
 
4449
            u'FTP %(user)s@%(host)s password: ', 'ftp')
 
4450
        self._check_default_password_prompt(
 
4451
            u'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp', port=10020)
 
4452
        self._check_default_password_prompt(
 
4453
            u'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh', port=12345)
 
4454
        # SMTP port handling is a bit special (it's handled if embedded in the
 
4455
        # host too)
 
4456
        # FIXME: should we: forbid that, extend it to other schemes, leave
 
4457
        # things as they are that's fine thank you ?
 
4458
        self._check_default_password_prompt(
 
4459
            u'SMTP %(user)s@%(host)s password: ', 'smtp')
 
4460
        self._check_default_password_prompt(
 
4461
            u'SMTP %(user)s@%(host)s password: ', 'smtp', host='bar.org:10025')
 
4462
        self._check_default_password_prompt(
 
4463
            u'SMTP %(user)s@%(host)s:%(port)d password: ', 'smtp', port=10025)
 
4464
 
 
4465
    def test_ssh_password_emits_warning(self):
 
4466
        conf = config.AuthenticationConfig(_file=BytesIO(b"""
 
4467
[ssh with password]
 
4468
scheme=ssh
 
4469
host=bar.org
 
4470
user=jim
 
4471
password=jimpass
 
4472
"""))
 
4473
        entered_password = 'typed-by-hand'
 
4474
        ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n')
 
4475
 
 
4476
        # Since the password defined in the authentication config is ignored,
 
4477
        # the user is prompted
 
4478
        self.assertEqual(entered_password,
 
4479
                         conf.get_password('ssh', 'bar.org', user='jim'))
 
4480
        self.assertContainsRe(
 
4481
            self.get_log(),
 
4482
            'password ignored in section \\[ssh with password\\]')
 
4483
 
 
4484
    def test_ssh_without_password_doesnt_emit_warning(self):
 
4485
        conf = config.AuthenticationConfig(_file=BytesIO(b"""
 
4486
[ssh with password]
 
4487
scheme=ssh
 
4488
host=bar.org
 
4489
user=jim
 
4490
"""))
 
4491
        entered_password = 'typed-by-hand'
 
4492
        ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n')
 
4493
 
 
4494
        # Since the password defined in the authentication config is ignored,
 
4495
        # the user is prompted
 
4496
        self.assertEqual(entered_password,
 
4497
                         conf.get_password('ssh', 'bar.org', user='jim'))
 
4498
        # No warning shoud be emitted since there is no password. We are only
 
4499
        # providing "user".
 
4500
        self.assertNotContainsRe(
 
4501
            self.get_log(),
 
4502
            'password ignored in section \\[ssh with password\\]')
 
4503
 
 
4504
    def test_uses_fallback_stores(self):
 
4505
        self.overrideAttr(config, 'credential_store_registry',
 
4506
                          config.CredentialStoreRegistry())
 
4507
        store = StubCredentialStore()
 
4508
        store.add_credentials("http", "example.com", "joe", "secret")
 
4509
        config.credential_store_registry.register("stub", store, fallback=True)
 
4510
        conf = config.AuthenticationConfig(_file=BytesIO())
 
4511
        creds = conf.get_credentials("http", "example.com")
 
4512
        self.assertEqual("joe", creds["user"])
 
4513
        self.assertEqual("secret", creds["password"])
 
4514
 
 
4515
 
 
4516
class StubCredentialStore(config.CredentialStore):
 
4517
 
 
4518
    def __init__(self):
 
4519
        self._username = {}
 
4520
        self._password = {}
 
4521
 
 
4522
    def add_credentials(self, scheme, host, user, password=None):
 
4523
        self._username[(scheme, host)] = user
 
4524
        self._password[(scheme, host)] = password
 
4525
 
 
4526
    def get_credentials(self, scheme, host, port=None, user=None,
 
4527
                        path=None, realm=None):
 
4528
        key = (scheme, host)
 
4529
        if key not in self._username:
 
4530
            return None
 
4531
        return {"scheme": scheme, "host": host, "port": port,
 
4532
                "user": self._username[key], "password": self._password[key]}
 
4533
 
 
4534
 
 
4535
class CountingCredentialStore(config.CredentialStore):
 
4536
 
 
4537
    def __init__(self):
 
4538
        self._calls = 0
 
4539
 
 
4540
    def get_credentials(self, scheme, host, port=None, user=None,
 
4541
                        path=None, realm=None):
 
4542
        self._calls += 1
 
4543
        return None
 
4544
 
 
4545
 
 
4546
class TestCredentialStoreRegistry(tests.TestCase):
 
4547
 
 
4548
    def _get_cs_registry(self):
 
4549
        return config.credential_store_registry
 
4550
 
 
4551
    def test_default_credential_store(self):
 
4552
        r = self._get_cs_registry()
 
4553
        default = r.get_credential_store(None)
 
4554
        self.assertIsInstance(default, config.PlainTextCredentialStore)
 
4555
 
 
4556
    def test_unknown_credential_store(self):
 
4557
        r = self._get_cs_registry()
 
4558
        # It's hard to imagine someone creating a credential store named
 
4559
        # 'unknown' so we use that as an never registered key.
 
4560
        self.assertRaises(KeyError, r.get_credential_store, 'unknown')
 
4561
 
 
4562
    def test_fallback_none_registered(self):
 
4563
        r = config.CredentialStoreRegistry()
 
4564
        self.assertEqual(None,
 
4565
                         r.get_fallback_credentials("http", "example.com"))
 
4566
 
 
4567
    def test_register(self):
 
4568
        r = config.CredentialStoreRegistry()
 
4569
        r.register("stub", StubCredentialStore(), fallback=False)
 
4570
        r.register("another", StubCredentialStore(), fallback=True)
 
4571
        self.assertEqual(["another", "stub"], r.keys())
 
4572
 
 
4573
    def test_register_lazy(self):
 
4574
        r = config.CredentialStoreRegistry()
 
4575
        r.register_lazy("stub", "breezy.tests.test_config",
 
4576
                        "StubCredentialStore", fallback=False)
 
4577
        self.assertEqual(["stub"], r.keys())
 
4578
        self.assertIsInstance(r.get_credential_store("stub"),
 
4579
                              StubCredentialStore)
 
4580
 
 
4581
    def test_is_fallback(self):
 
4582
        r = config.CredentialStoreRegistry()
 
4583
        r.register("stub1", None, fallback=False)
 
4584
        r.register("stub2", None, fallback=True)
 
4585
        self.assertEqual(False, r.is_fallback("stub1"))
 
4586
        self.assertEqual(True, r.is_fallback("stub2"))
 
4587
 
 
4588
    def test_no_fallback(self):
 
4589
        r = config.CredentialStoreRegistry()
 
4590
        store = CountingCredentialStore()
 
4591
        r.register("count", store, fallback=False)
 
4592
        self.assertEqual(None,
 
4593
                         r.get_fallback_credentials("http", "example.com"))
 
4594
        self.assertEqual(0, store._calls)
 
4595
 
 
4596
    def test_fallback_credentials(self):
 
4597
        r = config.CredentialStoreRegistry()
 
4598
        store = StubCredentialStore()
 
4599
        store.add_credentials("http", "example.com",
 
4600
                              "somebody", "geheim")
 
4601
        r.register("stub", store, fallback=True)
 
4602
        creds = r.get_fallback_credentials("http", "example.com")
 
4603
        self.assertEqual("somebody", creds["user"])
 
4604
        self.assertEqual("geheim", creds["password"])
 
4605
 
 
4606
    def test_fallback_first_wins(self):
 
4607
        r = config.CredentialStoreRegistry()
 
4608
        stub1 = StubCredentialStore()
 
4609
        stub1.add_credentials("http", "example.com",
 
4610
                              "somebody", "stub1")
 
4611
        r.register("stub1", stub1, fallback=True)
 
4612
        stub2 = StubCredentialStore()
 
4613
        stub2.add_credentials("http", "example.com",
 
4614
                              "somebody", "stub2")
 
4615
        r.register("stub2", stub1, fallback=True)
 
4616
        creds = r.get_fallback_credentials("http", "example.com")
 
4617
        self.assertEqual("somebody", creds["user"])
 
4618
        self.assertEqual("stub1", creds["password"])
 
4619
 
 
4620
 
 
4621
class TestPlainTextCredentialStore(tests.TestCase):
 
4622
 
 
4623
    def test_decode_password(self):
 
4624
        r = config.credential_store_registry
 
4625
        plain_text = r.get_credential_store()
 
4626
        decoded = plain_text.decode_password(dict(password='secret'))
 
4627
        self.assertEqual('secret', decoded)
 
4628
 
 
4629
 
 
4630
class TestBase64CredentialStore(tests.TestCase):
 
4631
 
 
4632
    def test_decode_password(self):
 
4633
        r = config.credential_store_registry
 
4634
        plain_text = r.get_credential_store('base64')
 
4635
        decoded = plain_text.decode_password(dict(password='c2VjcmV0'))
 
4636
        self.assertEqual(b'secret', decoded)
 
4637
 
 
4638
 
 
4639
# FIXME: Once we have a way to declare authentication to all test servers, we
 
4640
# can implement generic tests.
 
4641
# test_user_password_in_url
 
4642
# test_user_in_url_password_from_config
 
4643
# test_user_in_url_password_prompted
 
4644
# test_user_in_config
 
4645
# test_user_getpass.getuser
 
4646
# test_user_prompted ?
 
4647
class TestAuthenticationRing(tests.TestCaseWithTransport):
 
4648
    pass
 
4649
 
 
4650
 
 
4651
class EmailOptionTests(tests.TestCase):
 
4652
 
 
4653
    def test_default_email_uses_BRZ_EMAIL(self):
 
4654
        conf = config.MemoryStack(b'email=jelmer@debian.org')
 
4655
        # BRZ_EMAIL takes precedence over BZR_EMAIL and EMAIL
 
4656
        self.overrideEnv('BRZ_EMAIL', 'jelmer@samba.org')
 
4657
        self.overrideEnv('BZR_EMAIL', 'jelmer@jelmer.uk')
 
4658
        self.overrideEnv('EMAIL', 'jelmer@apache.org')
 
4659
        self.assertEqual('jelmer@samba.org', conf.get('email'))
 
4660
 
 
4661
    def test_default_email_uses_BZR_EMAIL(self):
 
4662
        conf = config.MemoryStack(b'email=jelmer@debian.org')
 
4663
        # BZR_EMAIL takes precedence over EMAIL
 
4664
        self.overrideEnv('BZR_EMAIL', 'jelmer@samba.org')
 
4665
        self.overrideEnv('EMAIL', 'jelmer@apache.org')
 
4666
        self.assertEqual('jelmer@samba.org', conf.get('email'))
 
4667
 
 
4668
    def test_default_email_uses_EMAIL(self):
 
4669
        conf = config.MemoryStack(b'')
 
4670
        self.overrideEnv('BRZ_EMAIL', None)
 
4671
        self.overrideEnv('EMAIL', 'jelmer@apache.org')
 
4672
        self.assertEqual('jelmer@apache.org', conf.get('email'))
 
4673
 
 
4674
    def test_BRZ_EMAIL_overrides(self):
 
4675
        conf = config.MemoryStack(b'email=jelmer@debian.org')
 
4676
        self.overrideEnv('BRZ_EMAIL', 'jelmer@apache.org')
 
4677
        self.assertEqual('jelmer@apache.org', conf.get('email'))
 
4678
        self.overrideEnv('BRZ_EMAIL', None)
 
4679
        self.overrideEnv('EMAIL', 'jelmer@samba.org')
 
4680
        self.assertEqual('jelmer@debian.org', conf.get('email'))
 
4681
 
 
4682
 
 
4683
class MailClientOptionTests(tests.TestCase):
 
4684
 
 
4685
    def test_default(self):
 
4686
        conf = config.MemoryStack(b'')
 
4687
        client = conf.get('mail_client')
 
4688
        self.assertIs(client, mail_client.DefaultMail)
 
4689
 
 
4690
    def test_evolution(self):
 
4691
        conf = config.MemoryStack(b'mail_client=evolution')
 
4692
        client = conf.get('mail_client')
 
4693
        self.assertIs(client, mail_client.Evolution)
 
4694
 
 
4695
    def test_kmail(self):
 
4696
        conf = config.MemoryStack(b'mail_client=kmail')
 
4697
        client = conf.get('mail_client')
 
4698
        self.assertIs(client, mail_client.KMail)
 
4699
 
 
4700
    def test_mutt(self):
 
4701
        conf = config.MemoryStack(b'mail_client=mutt')
 
4702
        client = conf.get('mail_client')
 
4703
        self.assertIs(client, mail_client.Mutt)
 
4704
 
 
4705
    def test_thunderbird(self):
 
4706
        conf = config.MemoryStack(b'mail_client=thunderbird')
 
4707
        client = conf.get('mail_client')
 
4708
        self.assertIs(client, mail_client.Thunderbird)
 
4709
 
 
4710
    def test_explicit_default(self):
 
4711
        conf = config.MemoryStack(b'mail_client=default')
 
4712
        client = conf.get('mail_client')
 
4713
        self.assertIs(client, mail_client.DefaultMail)
 
4714
 
 
4715
    def test_editor(self):
 
4716
        conf = config.MemoryStack(b'mail_client=editor')
 
4717
        client = conf.get('mail_client')
 
4718
        self.assertIs(client, mail_client.Editor)
 
4719
 
 
4720
    def test_mapi(self):
 
4721
        conf = config.MemoryStack(b'mail_client=mapi')
 
4722
        client = conf.get('mail_client')
 
4723
        self.assertIs(client, mail_client.MAPIClient)
 
4724
 
 
4725
    def test_xdg_email(self):
 
4726
        conf = config.MemoryStack(b'mail_client=xdg-email')
 
4727
        client = conf.get('mail_client')
 
4728
        self.assertIs(client, mail_client.XDGEmail)
 
4729
 
 
4730
    def test_unknown(self):
 
4731
        conf = config.MemoryStack(b'mail_client=firebird')
 
4732
        self.assertRaises(config.ConfigOptionValueError, conf.get,
 
4733
                          'mail_client')