/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_config.py

  • Committer: Jelmer Vernooij
  • Date: 2020-02-07 02:14:30 UTC
  • mto: This revision was merged to the branch mainline in revision 7492.
  • Revision ID: jelmer@jelmer.uk-20200207021430-m49iq3x4x8xlib6x
Drop python2 support.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2014, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for finding and reading the bzr config file[s]."""
 
18
 
 
19
from textwrap import dedent
 
20
from io import BytesIO
 
21
import os
 
22
import sys
 
23
import threading
 
24
 
 
25
import configobj
 
26
from testtools import matchers
 
27
 
 
28
from .. import (
 
29
    branch,
 
30
    config,
 
31
    bedding,
 
32
    controldir,
 
33
    diff,
 
34
    errors,
 
35
    lock,
 
36
    osutils,
 
37
    mail_client,
 
38
    ui,
 
39
    urlutils,
 
40
    registry as _mod_registry,
 
41
    tests,
 
42
    trace,
 
43
    )
 
44
from ..bzr import (
 
45
    remote,
 
46
    )
 
47
from ..transport import remote as transport_remote
 
48
from . import (
 
49
    features,
 
50
    scenarios,
 
51
    test_server,
 
52
    )
 
53
 
 
54
 
 
55
def lockable_config_scenarios():
 
56
    return [
 
57
        ('global',
 
58
         {'config_class': config.GlobalConfig,
 
59
          'config_args': [],
 
60
          'config_section': 'DEFAULT'}),
 
61
        ('locations',
 
62
         {'config_class': config.LocationConfig,
 
63
          'config_args': ['.'],
 
64
          'config_section': '.'}), ]
 
65
 
 
66
 
 
67
load_tests = scenarios.load_tests_apply_scenarios
 
68
 
 
69
# Register helpers to build stores
 
70
config.test_store_builder_registry.register(
 
71
    'configobj', lambda test: config.TransportIniFileStore(
 
72
        test.get_transport(), 'configobj.conf'))
 
73
config.test_store_builder_registry.register(
 
74
    'breezy', lambda test: config.GlobalStore())
 
75
config.test_store_builder_registry.register(
 
76
    'location', lambda test: config.LocationStore())
 
77
 
 
78
 
 
79
def build_backing_branch(test, relpath,
 
80
                         transport_class=None, server_class=None):
 
81
    """Test helper to create a backing branch only once.
 
82
 
 
83
    Some tests needs multiple stores/stacks to check concurrent update
 
84
    behaviours. As such, they need to build different branch *objects* even if
 
85
    they share the branch on disk.
 
86
 
 
87
    :param relpath: The relative path to the branch. (Note that the helper
 
88
        should always specify the same relpath).
 
89
 
 
90
    :param transport_class: The Transport class the test needs to use.
 
91
 
 
92
    :param server_class: The server associated with the ``transport_class``
 
93
        above.
 
94
 
 
95
    Either both or neither of ``transport_class`` and ``server_class`` should
 
96
    be specified.
 
97
    """
 
98
    if transport_class is not None and server_class is not None:
 
99
        test.transport_class = transport_class
 
100
        test.transport_server = server_class
 
101
    elif not (transport_class is None and server_class is None):
 
102
        raise AssertionError('Specify both ``transport_class`` and '
 
103
                             '``server_class`` or neither of them')
 
104
    if getattr(test, 'backing_branch', None) is None:
 
105
        # First call, let's build the branch on disk
 
106
        test.backing_branch = test.make_branch(relpath)
 
107
 
 
108
 
 
109
def build_branch_store(test):
 
110
    build_backing_branch(test, 'branch')
 
111
    b = branch.Branch.open('branch')
 
112
    return config.BranchStore(b)
 
113
 
 
114
 
 
115
config.test_store_builder_registry.register('branch', build_branch_store)
 
116
 
 
117
 
 
118
def build_control_store(test):
 
119
    build_backing_branch(test, 'branch')
 
120
    b = controldir.ControlDir.open('branch')
 
121
    return config.ControlStore(b)
 
122
 
 
123
 
 
124
config.test_store_builder_registry.register('control', build_control_store)
 
125
 
 
126
 
 
127
def build_remote_branch_store(test):
 
128
    # There is only one permutation (but we won't be able to handle more with
 
129
    # this design anyway)
 
130
    (transport_class,
 
131
     server_class) = transport_remote.get_test_permutations()[0]
 
132
    build_backing_branch(test, 'branch', transport_class, server_class)
 
133
    b = branch.Branch.open(test.get_url('branch'))
 
134
    return config.BranchStore(b)
 
135
 
 
136
 
 
137
config.test_store_builder_registry.register('remote_branch',
 
138
                                            build_remote_branch_store)
 
139
 
 
140
 
 
141
config.test_stack_builder_registry.register(
 
142
    'breezy', lambda test: config.GlobalStack())
 
143
config.test_stack_builder_registry.register(
 
144
    'location', lambda test: config.LocationStack('.'))
 
145
 
 
146
 
 
147
def build_branch_stack(test):
 
148
    build_backing_branch(test, 'branch')
 
149
    b = branch.Branch.open('branch')
 
150
    return config.BranchStack(b)
 
151
 
 
152
 
 
153
config.test_stack_builder_registry.register('branch', build_branch_stack)
 
154
 
 
155
 
 
156
def build_branch_only_stack(test):
 
157
    # There is only one permutation (but we won't be able to handle more with
 
158
    # this design anyway)
 
159
    (transport_class,
 
160
     server_class) = transport_remote.get_test_permutations()[0]
 
161
    build_backing_branch(test, 'branch', transport_class, server_class)
 
162
    b = branch.Branch.open(test.get_url('branch'))
 
163
    return config.BranchOnlyStack(b)
 
164
 
 
165
 
 
166
config.test_stack_builder_registry.register('branch_only',
 
167
                                            build_branch_only_stack)
 
168
 
 
169
 
 
170
def build_remote_control_stack(test):
 
171
    # There is only one permutation (but we won't be able to handle more with
 
172
    # this design anyway)
 
173
    (transport_class,
 
174
     server_class) = transport_remote.get_test_permutations()[0]
 
175
    # We need only a bzrdir for this, not a full branch, but it's not worth
 
176
    # creating a dedicated helper to create only the bzrdir
 
177
    build_backing_branch(test, 'branch', transport_class, server_class)
 
178
    b = branch.Branch.open(test.get_url('branch'))
 
179
    return config.RemoteControlStack(b.controldir)
 
180
 
 
181
 
 
182
config.test_stack_builder_registry.register('remote_control',
 
183
                                            build_remote_control_stack)
 
184
 
 
185
 
 
186
sample_long_alias = "log -r-15..-1 --line"
 
187
sample_config_text = u"""
 
188
[DEFAULT]
 
189
email=Erik B\u00e5gfors <erik@bagfors.nu>
 
190
editor=vim
 
191
change_editor=vimdiff -of {new_path} {old_path}
 
192
gpg_signing_key=DD4D5088
 
193
log_format=short
 
194
validate_signatures_in_log=true
 
195
acceptable_keys=amy
 
196
user_global_option=something
 
197
bzr.mergetool.sometool=sometool {base} {this} {other} -o {result}
 
198
bzr.mergetool.funkytool=funkytool "arg with spaces" {this_temp}
 
199
bzr.mergetool.newtool='"newtool with spaces" {this_temp}'
 
200
bzr.default_mergetool=sometool
 
201
[ALIASES]
 
202
h=help
 
203
ll=""".encode('utf-8') + sample_long_alias.encode('utf-8') + b"\n"
 
204
 
 
205
 
 
206
sample_always_signatures = b"""
 
207
[DEFAULT]
 
208
check_signatures=ignore
 
209
create_signatures=always
 
210
"""
 
211
 
 
212
sample_ignore_signatures = b"""
 
213
[DEFAULT]
 
214
check_signatures=require
 
215
create_signatures=never
 
216
"""
 
217
 
 
218
sample_maybe_signatures = b"""
 
219
[DEFAULT]
 
220
check_signatures=ignore
 
221
create_signatures=when-required
 
222
"""
 
223
 
 
224
sample_branches_text = b"""
 
225
[http://www.example.com]
 
226
# Top level policy
 
227
email=Robert Collins <robertc@example.org>
 
228
normal_option = normal
 
229
appendpath_option = append
 
230
appendpath_option:policy = appendpath
 
231
norecurse_option = norecurse
 
232
norecurse_option:policy = norecurse
 
233
[http://www.example.com/ignoreparent]
 
234
# different project: ignore parent dir config
 
235
ignore_parents=true
 
236
[http://www.example.com/norecurse]
 
237
# configuration items that only apply to this dir
 
238
recurse=false
 
239
normal_option = norecurse
 
240
[http://www.example.com/dir]
 
241
appendpath_option = normal
 
242
[/b/]
 
243
check_signatures=require
 
244
# test trailing / matching with no children
 
245
[/a/]
 
246
check_signatures=check-available
 
247
gpg_signing_key=default
 
248
user_local_option=local
 
249
# test trailing / matching
 
250
[/a/*]
 
251
#subdirs will match but not the parent
 
252
[/a/c]
 
253
check_signatures=ignore
 
254
post_commit=breezy.tests.test_config.post_commit
 
255
#testing explicit beats globs
 
256
"""
 
257
 
 
258
 
 
259
def create_configs(test):
 
260
    """Create configuration files for a given test.
 
261
 
 
262
    This requires creating a tree (and populate the ``test.tree`` attribute)
 
263
    and its associated branch and will populate the following attributes:
 
264
 
 
265
    - branch_config: A BranchConfig for the associated branch.
 
266
 
 
267
    - locations_config : A LocationConfig for the associated branch
 
268
 
 
269
    - breezy_config: A GlobalConfig.
 
270
 
 
271
    The tree and branch are created in a 'tree' subdirectory so the tests can
 
272
    still use the test directory to stay outside of the branch.
 
273
    """
 
274
    tree = test.make_branch_and_tree('tree')
 
275
    test.tree = tree
 
276
    test.branch_config = config.BranchConfig(tree.branch)
 
277
    test.locations_config = config.LocationConfig(tree.basedir)
 
278
    test.breezy_config = config.GlobalConfig()
 
279
 
 
280
 
 
281
def create_configs_with_file_option(test):
 
282
    """Create configuration files with a ``file`` option set in each.
 
283
 
 
284
    This builds on ``create_configs`` and add one ``file`` option in each
 
285
    configuration with a value which allows identifying the configuration file.
 
286
    """
 
287
    create_configs(test)
 
288
    test.breezy_config.set_user_option('file', 'breezy')
 
289
    test.locations_config.set_user_option('file', 'locations')
 
290
    test.branch_config.set_user_option('file', 'branch')
 
291
 
 
292
 
 
293
class TestOptionsMixin:
 
294
 
 
295
    def assertOptions(self, expected, conf):
 
296
        # We don't care about the parser (as it will make tests hard to write
 
297
        # and error-prone anyway)
 
298
        self.assertThat([opt[:4] for opt in conf._get_options()],
 
299
                        matchers.Equals(expected))
 
300
 
 
301
 
 
302
class InstrumentedConfigObj(object):
 
303
    """A config obj look-enough-alike to record calls made to it."""
 
304
 
 
305
    def __contains__(self, thing):
 
306
        self._calls.append(('__contains__', thing))
 
307
        return False
 
308
 
 
309
    def __getitem__(self, key):
 
310
        self._calls.append(('__getitem__', key))
 
311
        return self
 
312
 
 
313
    def __init__(self, input, encoding=None):
 
314
        self._calls = [('__init__', input, encoding)]
 
315
 
 
316
    def __setitem__(self, key, value):
 
317
        self._calls.append(('__setitem__', key, value))
 
318
 
 
319
    def __delitem__(self, key):
 
320
        self._calls.append(('__delitem__', key))
 
321
 
 
322
    def keys(self):
 
323
        self._calls.append(('keys',))
 
324
        return []
 
325
 
 
326
    def reload(self):
 
327
        self._calls.append(('reload',))
 
328
 
 
329
    def write(self, arg):
 
330
        self._calls.append(('write',))
 
331
 
 
332
    def as_bool(self, value):
 
333
        self._calls.append(('as_bool', value))
 
334
        return False
 
335
 
 
336
    def get_value(self, section, name):
 
337
        self._calls.append(('get_value', section, name))
 
338
        return None
 
339
 
 
340
 
 
341
class FakeBranch(object):
 
342
 
 
343
    def __init__(self, base=None):
 
344
        if base is None:
 
345
            self.base = "http://example.com/branches/demo"
 
346
        else:
 
347
            self.base = base
 
348
        self._transport = self.control_files = \
 
349
            FakeControlFilesAndTransport()
 
350
 
 
351
    def _get_config(self):
 
352
        return config.TransportConfig(self._transport, 'branch.conf')
 
353
 
 
354
    def lock_write(self):
 
355
        return lock.LogicalLockResult(self.unlock)
 
356
 
 
357
    def unlock(self):
 
358
        pass
 
359
 
 
360
 
 
361
class FakeControlFilesAndTransport(object):
 
362
 
 
363
    def __init__(self):
 
364
        self.files = {}
 
365
        self._transport = self
 
366
 
 
367
    def get(self, filename):
 
368
        # from Transport
 
369
        try:
 
370
            return BytesIO(self.files[filename])
 
371
        except KeyError:
 
372
            raise errors.NoSuchFile(filename)
 
373
 
 
374
    def get_bytes(self, filename):
 
375
        # from Transport
 
376
        try:
 
377
            return self.files[filename]
 
378
        except KeyError:
 
379
            raise errors.NoSuchFile(filename)
 
380
 
 
381
    def put(self, filename, fileobj):
 
382
        self.files[filename] = fileobj.read()
 
383
 
 
384
    def put_file(self, filename, fileobj):
 
385
        return self.put(filename, fileobj)
 
386
 
 
387
 
 
388
class InstrumentedConfig(config.Config):
 
389
    """An instrumented config that supplies stubs for template methods."""
 
390
 
 
391
    def __init__(self):
 
392
        super(InstrumentedConfig, self).__init__()
 
393
        self._calls = []
 
394
        self._signatures = config.CHECK_NEVER
 
395
        self._change_editor = 'vimdiff -fo {new_path} {old_path}'
 
396
 
 
397
    def _get_user_id(self):
 
398
        self._calls.append('_get_user_id')
 
399
        return "Robert Collins <robert.collins@example.org>"
 
400
 
 
401
    def _get_signature_checking(self):
 
402
        self._calls.append('_get_signature_checking')
 
403
        return self._signatures
 
404
 
 
405
    def _get_change_editor(self):
 
406
        self._calls.append('_get_change_editor')
 
407
        return self._change_editor
 
408
 
 
409
 
 
410
bool_config = b"""[DEFAULT]
 
411
active = true
 
412
inactive = false
 
413
[UPPERCASE]
 
414
active = True
 
415
nonactive = False
 
416
"""
 
417
 
 
418
 
 
419
class TestConfigObj(tests.TestCase):
 
420
 
 
421
    def test_get_bool(self):
 
422
        co = config.ConfigObj(BytesIO(bool_config))
 
423
        self.assertIs(co.get_bool('DEFAULT', 'active'), True)
 
424
        self.assertIs(co.get_bool('DEFAULT', 'inactive'), False)
 
425
        self.assertIs(co.get_bool('UPPERCASE', 'active'), True)
 
426
        self.assertIs(co.get_bool('UPPERCASE', 'nonactive'), False)
 
427
 
 
428
    def test_hash_sign_in_value(self):
 
429
        """
 
430
        Before 4.5.0, ConfigObj did not quote # signs in values, so they'd be
 
431
        treated as comments when read in again. (#86838)
 
432
        """
 
433
        co = config.ConfigObj()
 
434
        co['test'] = 'foo#bar'
 
435
        outfile = BytesIO()
 
436
        co.write(outfile=outfile)
 
437
        lines = outfile.getvalue().splitlines()
 
438
        self.assertEqual(lines, [b'test = "foo#bar"'])
 
439
        co2 = config.ConfigObj(lines)
 
440
        self.assertEqual(co2['test'], 'foo#bar')
 
441
 
 
442
    def test_triple_quotes(self):
 
443
        # Bug #710410: if the value string has triple quotes
 
444
        # then ConfigObj versions up to 4.7.2 will quote them wrong
 
445
        # and won't able to read them back
 
446
        triple_quotes_value = '''spam
 
447
""" that's my spam """
 
448
eggs'''
 
449
        co = config.ConfigObj()
 
450
        co['test'] = triple_quotes_value
 
451
        # While writing this test another bug in ConfigObj has been found:
 
452
        # method co.write() without arguments produces list of lines
 
453
        # one option per line, and multiline values are not split
 
454
        # across multiple lines,
 
455
        # and that breaks the parsing these lines back by ConfigObj.
 
456
        # This issue only affects test, but it's better to avoid
 
457
        # `co.write()` construct at all.
 
458
        # [bialix 20110222] bug report sent to ConfigObj's author
 
459
        outfile = BytesIO()
 
460
        co.write(outfile=outfile)
 
461
        output = outfile.getvalue()
 
462
        # now we're trying to read it back
 
463
        co2 = config.ConfigObj(BytesIO(output))
 
464
        self.assertEqual(triple_quotes_value, co2['test'])
 
465
 
 
466
 
 
467
erroneous_config = b"""[section] # line 1
 
468
good=good # line 2
 
469
[section] # line 3
 
470
whocares=notme # line 4
 
471
"""
 
472
 
 
473
 
 
474
class TestConfigObjErrors(tests.TestCase):
 
475
 
 
476
    def test_duplicate_section_name_error_line(self):
 
477
        try:
 
478
            co = configobj.ConfigObj(BytesIO(erroneous_config),
 
479
                                     raise_errors=True)
 
480
        except config.configobj.DuplicateError as e:
 
481
            self.assertEqual(3, e.line_number)
 
482
        else:
 
483
            self.fail('Error in config file not detected')
 
484
 
 
485
 
 
486
class TestConfig(tests.TestCase):
 
487
 
 
488
    def test_constructs(self):
 
489
        config.Config()
 
490
 
 
491
    def test_user_email(self):
 
492
        my_config = InstrumentedConfig()
 
493
        self.assertEqual('robert.collins@example.org', my_config.user_email())
 
494
        self.assertEqual(['_get_user_id'], my_config._calls)
 
495
 
 
496
    def test_username(self):
 
497
        my_config = InstrumentedConfig()
 
498
        self.assertEqual('Robert Collins <robert.collins@example.org>',
 
499
                         my_config.username())
 
500
        self.assertEqual(['_get_user_id'], my_config._calls)
 
501
 
 
502
    def test_get_user_option_default(self):
 
503
        my_config = config.Config()
 
504
        self.assertEqual(None, my_config.get_user_option('no_option'))
 
505
 
 
506
    def test_validate_signatures_in_log_default(self):
 
507
        my_config = config.Config()
 
508
        self.assertEqual(False, my_config.validate_signatures_in_log())
 
509
 
 
510
    def test_get_change_editor(self):
 
511
        my_config = InstrumentedConfig()
 
512
        change_editor = my_config.get_change_editor('old_tree', 'new_tree')
 
513
        self.assertEqual(['_get_change_editor'], my_config._calls)
 
514
        self.assertIs(diff.DiffFromTool, change_editor.__class__)
 
515
        self.assertEqual(['vimdiff', '-fo', '{new_path}', '{old_path}'],
 
516
                         change_editor.command_template)
 
517
 
 
518
    def test_get_change_editor_implicit_args(self):
 
519
        # If there are no substitution variables, then assume the
 
520
        # old and new path are the last arguments.
 
521
        my_config = InstrumentedConfig()
 
522
        my_config._change_editor = 'vimdiff -o'
 
523
        change_editor = my_config.get_change_editor('old_tree', 'new_tree')
 
524
        self.assertEqual(['_get_change_editor'], my_config._calls)
 
525
        self.assertIs(diff.DiffFromTool, change_editor.__class__)
 
526
        self.assertEqual(['vimdiff', '-o', '{old_path}', '{new_path}'],
 
527
                         change_editor.command_template)
 
528
 
 
529
    def test_get_change_editor_old_style(self):
 
530
        # Test the old style format for the change_editor setting.
 
531
        my_config = InstrumentedConfig()
 
532
        my_config._change_editor = 'vimdiff -o @old_path @new_path'
 
533
        change_editor = my_config.get_change_editor('old_tree', 'new_tree')
 
534
        self.assertEqual(['_get_change_editor'], my_config._calls)
 
535
        self.assertIs(diff.DiffFromTool, change_editor.__class__)
 
536
        self.assertEqual(['vimdiff', '-o', '{old_path}', '{new_path}'],
 
537
                         change_editor.command_template)
 
538
 
 
539
 
 
540
class TestIniConfig(tests.TestCaseInTempDir):
 
541
 
 
542
    def make_config_parser(self, s):
 
543
        conf = config.IniBasedConfig.from_string(s)
 
544
        return conf, conf._get_parser()
 
545
 
 
546
 
 
547
class TestIniConfigBuilding(TestIniConfig):
 
548
 
 
549
    def test_contructs(self):
 
550
        config.IniBasedConfig()
 
551
 
 
552
    def test_from_fp(self):
 
553
        my_config = config.IniBasedConfig.from_string(sample_config_text)
 
554
        self.assertIsInstance(my_config._get_parser(), configobj.ConfigObj)
 
555
 
 
556
    def test_cached(self):
 
557
        my_config = config.IniBasedConfig.from_string(sample_config_text)
 
558
        parser = my_config._get_parser()
 
559
        self.assertTrue(my_config._get_parser() is parser)
 
560
 
 
561
    def _dummy_chown(self, path, uid, gid):
 
562
        self.path, self.uid, self.gid = path, uid, gid
 
563
 
 
564
    def test_ini_config_ownership(self):
 
565
        """Ensure that chown is happening during _write_config_file"""
 
566
        self.requireFeature(features.chown_feature)
 
567
        self.overrideAttr(os, 'chown', self._dummy_chown)
 
568
        self.path = self.uid = self.gid = None
 
569
        conf = config.IniBasedConfig(file_name='./foo.conf')
 
570
        conf._write_config_file()
 
571
        self.assertEqual(self.path, './foo.conf')
 
572
        self.assertTrue(isinstance(self.uid, int))
 
573
        self.assertTrue(isinstance(self.gid, int))
 
574
 
 
575
 
 
576
class TestIniConfigSaving(tests.TestCaseInTempDir):
 
577
 
 
578
    def test_cant_save_without_a_file_name(self):
 
579
        conf = config.IniBasedConfig()
 
580
        self.assertRaises(AssertionError, conf._write_config_file)
 
581
 
 
582
    def test_saved_with_content(self):
 
583
        content = b'foo = bar\n'
 
584
        config.IniBasedConfig.from_string(content, file_name='./test.conf',
 
585
                                          save=True)
 
586
        self.assertFileEqual(content, 'test.conf')
 
587
 
 
588
 
 
589
class TestIniConfigOptionExpansion(tests.TestCase):
 
590
    """Test option expansion from the IniConfig level.
 
591
 
 
592
    What we really want here is to test the Config level, but the class being
 
593
    abstract as far as storing values is concerned, this can't be done
 
594
    properly (yet).
 
595
    """
 
596
    # FIXME: This should be rewritten when all configs share a storage
 
597
    # implementation -- vila 2011-02-18
 
598
 
 
599
    def get_config(self, string=None):
 
600
        if string is None:
 
601
            string = b''
 
602
        c = config.IniBasedConfig.from_string(string)
 
603
        return c
 
604
 
 
605
    def assertExpansion(self, expected, conf, string, env=None):
 
606
        self.assertEqual(expected, conf.expand_options(string, env))
 
607
 
 
608
    def test_no_expansion(self):
 
609
        c = self.get_config('')
 
610
        self.assertExpansion('foo', c, 'foo')
 
611
 
 
612
    def test_env_adding_options(self):
 
613
        c = self.get_config('')
 
614
        self.assertExpansion('bar', c, '{foo}', {'foo': 'bar'})
 
615
 
 
616
    def test_env_overriding_options(self):
 
617
        c = self.get_config('foo=baz')
 
618
        self.assertExpansion('bar', c, '{foo}', {'foo': 'bar'})
 
619
 
 
620
    def test_simple_ref(self):
 
621
        c = self.get_config('foo=xxx')
 
622
        self.assertExpansion('xxx', c, '{foo}')
 
623
 
 
624
    def test_unknown_ref(self):
 
625
        c = self.get_config('')
 
626
        self.assertRaises(config.ExpandingUnknownOption,
 
627
                          c.expand_options, '{foo}')
 
628
 
 
629
    def test_indirect_ref(self):
 
630
        c = self.get_config('''
 
631
foo=xxx
 
632
bar={foo}
 
633
''')
 
634
        self.assertExpansion('xxx', c, '{bar}')
 
635
 
 
636
    def test_embedded_ref(self):
 
637
        c = self.get_config('''
 
638
foo=xxx
 
639
bar=foo
 
640
''')
 
641
        self.assertExpansion('xxx', c, '{{bar}}')
 
642
 
 
643
    def test_simple_loop(self):
 
644
        c = self.get_config('foo={foo}')
 
645
        self.assertRaises(config.OptionExpansionLoop, c.expand_options,
 
646
                          '{foo}')
 
647
 
 
648
    def test_indirect_loop(self):
 
649
        c = self.get_config('''
 
650
foo={bar}
 
651
bar={baz}
 
652
baz={foo}''')
 
653
        e = self.assertRaises(config.OptionExpansionLoop,
 
654
                              c.expand_options, '{foo}')
 
655
        self.assertEqual('foo->bar->baz', e.refs)
 
656
        self.assertEqual('{foo}', e.string)
 
657
 
 
658
    def test_list(self):
 
659
        conf = self.get_config('''
 
660
foo=start
 
661
bar=middle
 
662
baz=end
 
663
list={foo},{bar},{baz}
 
664
''')
 
665
        self.assertEqual(['start', 'middle', 'end'],
 
666
                         conf.get_user_option('list', expand=True))
 
667
 
 
668
    def test_cascading_list(self):
 
669
        conf = self.get_config('''
 
670
foo=start,{bar}
 
671
bar=middle,{baz}
 
672
baz=end
 
673
list={foo}
 
674
''')
 
675
        self.assertEqual(['start', 'middle', 'end'],
 
676
                         conf.get_user_option('list', expand=True))
 
677
 
 
678
    def test_pathological_hidden_list(self):
 
679
        conf = self.get_config('''
 
680
foo=bin
 
681
bar=go
 
682
start={foo
 
683
middle=},{
 
684
end=bar}
 
685
hidden={start}{middle}{end}
 
686
''')
 
687
        # Nope, it's either a string or a list, and the list wins as soon as a
 
688
        # ',' appears, so the string concatenation never occur.
 
689
        self.assertEqual(['{foo', '}', '{', 'bar}'],
 
690
                         conf.get_user_option('hidden', expand=True))
 
691
 
 
692
 
 
693
class TestLocationConfigOptionExpansion(tests.TestCaseInTempDir):
 
694
 
 
695
    def get_config(self, location, string=None):
 
696
        if string is None:
 
697
            string = ''
 
698
        # Since we don't save the config we won't strictly require to inherit
 
699
        # from TestCaseInTempDir, but an error occurs so quickly...
 
700
        c = config.LocationConfig.from_string(string, location)
 
701
        return c
 
702
 
 
703
    def test_dont_cross_unrelated_section(self):
 
704
        c = self.get_config('/another/branch/path', '''
 
705
[/one/branch/path]
 
706
foo = hello
 
707
bar = {foo}/2
 
708
 
 
709
[/another/branch/path]
 
710
bar = {foo}/2
 
711
''')
 
712
        self.assertRaises(config.ExpandingUnknownOption,
 
713
                          c.get_user_option, 'bar', expand=True)
 
714
 
 
715
    def test_cross_related_sections(self):
 
716
        c = self.get_config('/project/branch/path', '''
 
717
[/project]
 
718
foo = qu
 
719
 
 
720
[/project/branch/path]
 
721
bar = {foo}ux
 
722
''')
 
723
        self.assertEqual('quux', c.get_user_option('bar', expand=True))
 
724
 
 
725
 
 
726
class TestIniBaseConfigOnDisk(tests.TestCaseInTempDir):
 
727
 
 
728
    def test_cannot_reload_without_name(self):
 
729
        conf = config.IniBasedConfig.from_string(sample_config_text)
 
730
        self.assertRaises(AssertionError, conf.reload)
 
731
 
 
732
    def test_reload_see_new_value(self):
 
733
        c1 = config.IniBasedConfig.from_string('editor=vim\n',
 
734
                                               file_name='./test/conf')
 
735
        c1._write_config_file()
 
736
        c2 = config.IniBasedConfig.from_string('editor=emacs\n',
 
737
                                               file_name='./test/conf')
 
738
        c2._write_config_file()
 
739
        self.assertEqual('vim', c1.get_user_option('editor'))
 
740
        self.assertEqual('emacs', c2.get_user_option('editor'))
 
741
        # Make sure we get the Right value
 
742
        c1.reload()
 
743
        self.assertEqual('emacs', c1.get_user_option('editor'))
 
744
 
 
745
 
 
746
class TestLockableConfig(tests.TestCaseInTempDir):
 
747
 
 
748
    scenarios = lockable_config_scenarios()
 
749
 
 
750
    # Set by load_tests
 
751
    config_class = None
 
752
    config_args = None
 
753
    config_section = None
 
754
 
 
755
    def setUp(self):
 
756
        super(TestLockableConfig, self).setUp()
 
757
        self._content = '[%s]\none=1\ntwo=2\n' % (self.config_section,)
 
758
        self.config = self.create_config(self._content)
 
759
 
 
760
    def get_existing_config(self):
 
761
        return self.config_class(*self.config_args)
 
762
 
 
763
    def create_config(self, content):
 
764
        kwargs = dict(save=True)
 
765
        c = self.config_class.from_string(content, *self.config_args, **kwargs)
 
766
        return c
 
767
 
 
768
    def test_simple_read_access(self):
 
769
        self.assertEqual('1', self.config.get_user_option('one'))
 
770
 
 
771
    def test_simple_write_access(self):
 
772
        self.config.set_user_option('one', 'one')
 
773
        self.assertEqual('one', self.config.get_user_option('one'))
 
774
 
 
775
    def test_listen_to_the_last_speaker(self):
 
776
        c1 = self.config
 
777
        c2 = self.get_existing_config()
 
778
        c1.set_user_option('one', 'ONE')
 
779
        c2.set_user_option('two', 'TWO')
 
780
        self.assertEqual('ONE', c1.get_user_option('one'))
 
781
        self.assertEqual('TWO', c2.get_user_option('two'))
 
782
        # The second update respect the first one
 
783
        self.assertEqual('ONE', c2.get_user_option('one'))
 
784
 
 
785
    def test_last_speaker_wins(self):
 
786
        # If the same config is not shared, the same variable modified twice
 
787
        # can only see a single result.
 
788
        c1 = self.config
 
789
        c2 = self.get_existing_config()
 
790
        c1.set_user_option('one', 'c1')
 
791
        c2.set_user_option('one', 'c2')
 
792
        self.assertEqual('c2', c2._get_user_option('one'))
 
793
        # The first modification is still available until another refresh
 
794
        # occur
 
795
        self.assertEqual('c1', c1._get_user_option('one'))
 
796
        c1.set_user_option('two', 'done')
 
797
        self.assertEqual('c2', c1._get_user_option('one'))
 
798
 
 
799
    def test_writes_are_serialized(self):
 
800
        c1 = self.config
 
801
        c2 = self.get_existing_config()
 
802
 
 
803
        # We spawn a thread that will pause *during* the write
 
804
        before_writing = threading.Event()
 
805
        after_writing = threading.Event()
 
806
        writing_done = threading.Event()
 
807
        c1_orig = c1._write_config_file
 
808
 
 
809
        def c1_write_config_file():
 
810
            before_writing.set()
 
811
            c1_orig()
 
812
            # The lock is held. We wait for the main thread to decide when to
 
813
            # continue
 
814
            after_writing.wait()
 
815
        c1._write_config_file = c1_write_config_file
 
816
 
 
817
        def c1_set_option():
 
818
            c1.set_user_option('one', 'c1')
 
819
            writing_done.set()
 
820
        t1 = threading.Thread(target=c1_set_option)
 
821
        # Collect the thread after the test
 
822
        self.addCleanup(t1.join)
 
823
        # Be ready to unblock the thread if the test goes wrong
 
824
        self.addCleanup(after_writing.set)
 
825
        t1.start()
 
826
        before_writing.wait()
 
827
        self.assertTrue(c1._lock.is_held)
 
828
        self.assertRaises(errors.LockContention,
 
829
                          c2.set_user_option, 'one', 'c2')
 
830
        self.assertEqual('c1', c1.get_user_option('one'))
 
831
        # Let the lock be released
 
832
        after_writing.set()
 
833
        writing_done.wait()
 
834
        c2.set_user_option('one', 'c2')
 
835
        self.assertEqual('c2', c2.get_user_option('one'))
 
836
 
 
837
    def test_read_while_writing(self):
 
838
        c1 = self.config
 
839
        # We spawn a thread that will pause *during* the write
 
840
        ready_to_write = threading.Event()
 
841
        do_writing = threading.Event()
 
842
        writing_done = threading.Event()
 
843
        c1_orig = c1._write_config_file
 
844
 
 
845
        def c1_write_config_file():
 
846
            ready_to_write.set()
 
847
            # The lock is held. We wait for the main thread to decide when to
 
848
            # continue
 
849
            do_writing.wait()
 
850
            c1_orig()
 
851
            writing_done.set()
 
852
        c1._write_config_file = c1_write_config_file
 
853
 
 
854
        def c1_set_option():
 
855
            c1.set_user_option('one', 'c1')
 
856
        t1 = threading.Thread(target=c1_set_option)
 
857
        # Collect the thread after the test
 
858
        self.addCleanup(t1.join)
 
859
        # Be ready to unblock the thread if the test goes wrong
 
860
        self.addCleanup(do_writing.set)
 
861
        t1.start()
 
862
        # Ensure the thread is ready to write
 
863
        ready_to_write.wait()
 
864
        self.assertTrue(c1._lock.is_held)
 
865
        self.assertEqual('c1', c1.get_user_option('one'))
 
866
        # If we read during the write, we get the old value
 
867
        c2 = self.get_existing_config()
 
868
        self.assertEqual('1', c2.get_user_option('one'))
 
869
        # Let the writing occur and ensure it occurred
 
870
        do_writing.set()
 
871
        writing_done.wait()
 
872
        # Now we get the updated value
 
873
        c3 = self.get_existing_config()
 
874
        self.assertEqual('c1', c3.get_user_option('one'))
 
875
 
 
876
 
 
877
class TestGetUserOptionAs(TestIniConfig):
 
878
 
 
879
    def test_get_user_option_as_bool(self):
 
880
        conf, parser = self.make_config_parser("""
 
881
a_true_bool = true
 
882
a_false_bool = 0
 
883
an_invalid_bool = maybe
 
884
a_list = hmm, who knows ? # This is interpreted as a list !
 
885
""")
 
886
        get_bool = conf.get_user_option_as_bool
 
887
        self.assertEqual(True, get_bool('a_true_bool'))
 
888
        self.assertEqual(False, get_bool('a_false_bool'))
 
889
        warnings = []
 
890
 
 
891
        def warning(*args):
 
892
            warnings.append(args[0] % args[1:])
 
893
        self.overrideAttr(trace, 'warning', warning)
 
894
        msg = 'Value "%s" is not a boolean for "%s"'
 
895
        self.assertIs(None, get_bool('an_invalid_bool'))
 
896
        self.assertEqual(msg % ('maybe', 'an_invalid_bool'), warnings[0])
 
897
        warnings = []
 
898
        self.assertIs(None, get_bool('not_defined_in_this_config'))
 
899
        self.assertEqual([], warnings)
 
900
 
 
901
    def test_get_user_option_as_list(self):
 
902
        conf, parser = self.make_config_parser("""
 
903
a_list = a,b,c
 
904
length_1 = 1,
 
905
one_item = x
 
906
""")
 
907
        get_list = conf.get_user_option_as_list
 
908
        self.assertEqual(['a', 'b', 'c'], get_list('a_list'))
 
909
        self.assertEqual(['1'], get_list('length_1'))
 
910
        self.assertEqual('x', conf.get_user_option('one_item'))
 
911
        # automatically cast to list
 
912
        self.assertEqual(['x'], get_list('one_item'))
 
913
 
 
914
 
 
915
class TestSupressWarning(TestIniConfig):
 
916
 
 
917
    def make_warnings_config(self, s):
 
918
        conf, parser = self.make_config_parser(s)
 
919
        return conf.suppress_warning
 
920
 
 
921
    def test_suppress_warning_unknown(self):
 
922
        suppress_warning = self.make_warnings_config('')
 
923
        self.assertEqual(False, suppress_warning('unknown_warning'))
 
924
 
 
925
    def test_suppress_warning_known(self):
 
926
        suppress_warning = self.make_warnings_config('suppress_warnings=a,b')
 
927
        self.assertEqual(False, suppress_warning('c'))
 
928
        self.assertEqual(True, suppress_warning('a'))
 
929
        self.assertEqual(True, suppress_warning('b'))
 
930
 
 
931
 
 
932
class TestGetConfig(tests.TestCaseInTempDir):
 
933
 
 
934
    def test_constructs(self):
 
935
        config.GlobalConfig()
 
936
 
 
937
    def test_calls_read_filenames(self):
 
938
        # replace the class that is constructed, to check its parameters
 
939
        oldparserclass = config.ConfigObj
 
940
        config.ConfigObj = InstrumentedConfigObj
 
941
        my_config = config.GlobalConfig()
 
942
        try:
 
943
            parser = my_config._get_parser()
 
944
        finally:
 
945
            config.ConfigObj = oldparserclass
 
946
        self.assertIsInstance(parser, InstrumentedConfigObj)
 
947
        self.assertEqual(parser._calls, [('__init__', bedding.config_path(),
 
948
                                          'utf-8')])
 
949
 
 
950
 
 
951
class TestBranchConfig(tests.TestCaseWithTransport):
 
952
 
 
953
    def test_constructs_valid(self):
 
954
        branch = FakeBranch()
 
955
        my_config = config.BranchConfig(branch)
 
956
        self.assertIsNot(None, my_config)
 
957
 
 
958
    def test_constructs_error(self):
 
959
        self.assertRaises(TypeError, config.BranchConfig)
 
960
 
 
961
    def test_get_location_config(self):
 
962
        branch = FakeBranch()
 
963
        my_config = config.BranchConfig(branch)
 
964
        location_config = my_config._get_location_config()
 
965
        self.assertEqual(branch.base, location_config.location)
 
966
        self.assertIs(location_config, my_config._get_location_config())
 
967
 
 
968
    def test_get_config(self):
 
969
        """The Branch.get_config method works properly"""
 
970
        b = controldir.ControlDir.create_standalone_workingtree('.').branch
 
971
        my_config = b.get_config()
 
972
        self.assertIs(my_config.get_user_option('wacky'), None)
 
973
        my_config.set_user_option('wacky', 'unlikely')
 
974
        self.assertEqual(my_config.get_user_option('wacky'), 'unlikely')
 
975
 
 
976
        # Ensure we get the same thing if we start again
 
977
        b2 = branch.Branch.open('.')
 
978
        my_config2 = b2.get_config()
 
979
        self.assertEqual(my_config2.get_user_option('wacky'), 'unlikely')
 
980
 
 
981
    def test_has_explicit_nickname(self):
 
982
        b = self.make_branch('.')
 
983
        self.assertFalse(b.get_config().has_explicit_nickname())
 
984
        b.nick = 'foo'
 
985
        self.assertTrue(b.get_config().has_explicit_nickname())
 
986
 
 
987
    def test_config_url(self):
 
988
        """The Branch.get_config will use section that uses a local url"""
 
989
        branch = self.make_branch('branch')
 
990
        self.assertEqual('branch', branch.nick)
 
991
 
 
992
        local_url = urlutils.local_path_to_url('branch')
 
993
        conf = config.LocationConfig.from_string(
 
994
            '[%s]\nnickname = foobar' % (local_url,),
 
995
            local_url, save=True)
 
996
        self.assertIsNot(None, conf)
 
997
        self.assertEqual('foobar', branch.nick)
 
998
 
 
999
    def test_config_local_path(self):
 
1000
        """The Branch.get_config will use a local system path"""
 
1001
        branch = self.make_branch('branch')
 
1002
        self.assertEqual('branch', branch.nick)
 
1003
 
 
1004
        local_path = osutils.getcwd().encode('utf8')
 
1005
        config.LocationConfig.from_string(
 
1006
            b'[%s/branch]\nnickname = barry' % (local_path,),
 
1007
            'branch', save=True)
 
1008
        # Now the branch will find its nick via the location config
 
1009
        self.assertEqual('barry', branch.nick)
 
1010
 
 
1011
    def test_config_creates_local(self):
 
1012
        """Creating a new entry in config uses a local path."""
 
1013
        branch = self.make_branch('branch', format='knit')
 
1014
        branch.set_push_location('http://foobar')
 
1015
        local_path = osutils.getcwd().encode('utf8')
 
1016
        # Surprisingly ConfigObj doesn't create a trailing newline
 
1017
        self.check_file_contents(bedding.locations_config_path(),
 
1018
                                 b'[%s/branch]\n'
 
1019
                                 b'push_location = http://foobar\n'
 
1020
                                 b'push_location:policy = norecurse\n'
 
1021
                                 % (local_path,))
 
1022
 
 
1023
    def test_autonick_urlencoded(self):
 
1024
        b = self.make_branch('!repo')
 
1025
        self.assertEqual('!repo', b.get_config().get_nickname())
 
1026
 
 
1027
    def test_autonick_uses_branch_name(self):
 
1028
        b = self.make_branch('foo', name='bar')
 
1029
        self.assertEqual('bar', b.get_config().get_nickname())
 
1030
 
 
1031
    def test_warn_if_masked(self):
 
1032
        warnings = []
 
1033
 
 
1034
        def warning(*args):
 
1035
            warnings.append(args[0] % args[1:])
 
1036
        self.overrideAttr(trace, 'warning', warning)
 
1037
 
 
1038
        def set_option(store, warn_masked=True):
 
1039
            warnings[:] = []
 
1040
            conf.set_user_option('example_option', repr(store), store=store,
 
1041
                                 warn_masked=warn_masked)
 
1042
 
 
1043
        def assertWarning(warning):
 
1044
            if warning is None:
 
1045
                self.assertEqual(0, len(warnings))
 
1046
            else:
 
1047
                self.assertEqual(1, len(warnings))
 
1048
                self.assertEqual(warning, warnings[0])
 
1049
        branch = self.make_branch('.')
 
1050
        conf = branch.get_config()
 
1051
        set_option(config.STORE_GLOBAL)
 
1052
        assertWarning(None)
 
1053
        set_option(config.STORE_BRANCH)
 
1054
        assertWarning(None)
 
1055
        set_option(config.STORE_GLOBAL)
 
1056
        assertWarning('Value "4" is masked by "3" from branch.conf')
 
1057
        set_option(config.STORE_GLOBAL, warn_masked=False)
 
1058
        assertWarning(None)
 
1059
        set_option(config.STORE_LOCATION)
 
1060
        assertWarning(None)
 
1061
        set_option(config.STORE_BRANCH)
 
1062
        assertWarning('Value "3" is masked by "0" from locations.conf')
 
1063
        set_option(config.STORE_BRANCH, warn_masked=False)
 
1064
        assertWarning(None)
 
1065
 
 
1066
 
 
1067
class TestGlobalConfigItems(tests.TestCaseInTempDir):
 
1068
 
 
1069
    def _get_empty_config(self):
 
1070
        my_config = config.GlobalConfig()
 
1071
        return my_config
 
1072
 
 
1073
    def _get_sample_config(self):
 
1074
        my_config = config.GlobalConfig.from_string(sample_config_text)
 
1075
        return my_config
 
1076
 
 
1077
    def test_user_id(self):
 
1078
        my_config = config.GlobalConfig.from_string(sample_config_text)
 
1079
        self.assertEqual(u"Erik B\u00e5gfors <erik@bagfors.nu>",
 
1080
                         my_config._get_user_id())
 
1081
 
 
1082
    def test_absent_user_id(self):
 
1083
        my_config = config.GlobalConfig()
 
1084
        self.assertEqual(None, my_config._get_user_id())
 
1085
 
 
1086
    def test_get_user_option_default(self):
 
1087
        my_config = self._get_empty_config()
 
1088
        self.assertEqual(None, my_config.get_user_option('no_option'))
 
1089
 
 
1090
    def test_get_user_option_global(self):
 
1091
        my_config = self._get_sample_config()
 
1092
        self.assertEqual("something",
 
1093
                         my_config.get_user_option('user_global_option'))
 
1094
 
 
1095
    def test_configured_validate_signatures_in_log(self):
 
1096
        my_config = self._get_sample_config()
 
1097
        self.assertEqual(True, my_config.validate_signatures_in_log())
 
1098
 
 
1099
    def test_get_alias(self):
 
1100
        my_config = self._get_sample_config()
 
1101
        self.assertEqual('help', my_config.get_alias('h'))
 
1102
 
 
1103
    def test_get_aliases(self):
 
1104
        my_config = self._get_sample_config()
 
1105
        aliases = my_config.get_aliases()
 
1106
        self.assertEqual(2, len(aliases))
 
1107
        sorted_keys = sorted(aliases)
 
1108
        self.assertEqual('help', aliases[sorted_keys[0]])
 
1109
        self.assertEqual(sample_long_alias, aliases[sorted_keys[1]])
 
1110
 
 
1111
    def test_get_no_alias(self):
 
1112
        my_config = self._get_sample_config()
 
1113
        self.assertEqual(None, my_config.get_alias('foo'))
 
1114
 
 
1115
    def test_get_long_alias(self):
 
1116
        my_config = self._get_sample_config()
 
1117
        self.assertEqual(sample_long_alias, my_config.get_alias('ll'))
 
1118
 
 
1119
    def test_get_change_editor(self):
 
1120
        my_config = self._get_sample_config()
 
1121
        change_editor = my_config.get_change_editor('old', 'new')
 
1122
        self.assertIs(diff.DiffFromTool, change_editor.__class__)
 
1123
        self.assertEqual('vimdiff -of {new_path} {old_path}',
 
1124
                         ' '.join(change_editor.command_template))
 
1125
 
 
1126
    def test_get_no_change_editor(self):
 
1127
        my_config = self._get_empty_config()
 
1128
        change_editor = my_config.get_change_editor('old', 'new')
 
1129
        self.assertIs(None, change_editor)
 
1130
 
 
1131
    def test_get_merge_tools(self):
 
1132
        conf = self._get_sample_config()
 
1133
        tools = conf.get_merge_tools()
 
1134
        self.log(repr(tools))
 
1135
        self.assertEqual(
 
1136
            {u'funkytool': u'funkytool "arg with spaces" {this_temp}',
 
1137
             u'sometool': u'sometool {base} {this} {other} -o {result}',
 
1138
             u'newtool': u'"newtool with spaces" {this_temp}'},
 
1139
            tools)
 
1140
 
 
1141
    def test_get_merge_tools_empty(self):
 
1142
        conf = self._get_empty_config()
 
1143
        tools = conf.get_merge_tools()
 
1144
        self.assertEqual({}, tools)
 
1145
 
 
1146
    def test_find_merge_tool(self):
 
1147
        conf = self._get_sample_config()
 
1148
        cmdline = conf.find_merge_tool('sometool')
 
1149
        self.assertEqual('sometool {base} {this} {other} -o {result}', cmdline)
 
1150
 
 
1151
    def test_find_merge_tool_not_found(self):
 
1152
        conf = self._get_sample_config()
 
1153
        cmdline = conf.find_merge_tool('DOES NOT EXIST')
 
1154
        self.assertIs(cmdline, None)
 
1155
 
 
1156
    def test_find_merge_tool_known(self):
 
1157
        conf = self._get_empty_config()
 
1158
        cmdline = conf.find_merge_tool('kdiff3')
 
1159
        self.assertEqual('kdiff3 {base} {this} {other} -o {result}', cmdline)
 
1160
 
 
1161
    def test_find_merge_tool_override_known(self):
 
1162
        conf = self._get_empty_config()
 
1163
        conf.set_user_option('bzr.mergetool.kdiff3', 'kdiff3 blah')
 
1164
        cmdline = conf.find_merge_tool('kdiff3')
 
1165
        self.assertEqual('kdiff3 blah', cmdline)
 
1166
 
 
1167
 
 
1168
class TestGlobalConfigSavingOptions(tests.TestCaseInTempDir):
 
1169
 
 
1170
    def test_empty(self):
 
1171
        my_config = config.GlobalConfig()
 
1172
        self.assertEqual(0, len(my_config.get_aliases()))
 
1173
 
 
1174
    def test_set_alias(self):
 
1175
        my_config = config.GlobalConfig()
 
1176
        alias_value = 'commit --strict'
 
1177
        my_config.set_alias('commit', alias_value)
 
1178
        new_config = config.GlobalConfig()
 
1179
        self.assertEqual(alias_value, new_config.get_alias('commit'))
 
1180
 
 
1181
    def test_remove_alias(self):
 
1182
        my_config = config.GlobalConfig()
 
1183
        my_config.set_alias('commit', 'commit --strict')
 
1184
        # Now remove the alias again.
 
1185
        my_config.unset_alias('commit')
 
1186
        new_config = config.GlobalConfig()
 
1187
        self.assertIs(None, new_config.get_alias('commit'))
 
1188
 
 
1189
 
 
1190
class TestLocationConfig(tests.TestCaseInTempDir, TestOptionsMixin):
 
1191
 
 
1192
    def test_constructs_valid(self):
 
1193
        config.LocationConfig('http://example.com')
 
1194
 
 
1195
    def test_constructs_error(self):
 
1196
        self.assertRaises(TypeError, config.LocationConfig)
 
1197
 
 
1198
    def test_branch_calls_read_filenames(self):
 
1199
        # This is testing the correct file names are provided.
 
1200
        # TODO: consolidate with the test for GlobalConfigs filename checks.
 
1201
        #
 
1202
        # replace the class that is constructed, to check its parameters
 
1203
        oldparserclass = config.ConfigObj
 
1204
        config.ConfigObj = InstrumentedConfigObj
 
1205
        try:
 
1206
            my_config = config.LocationConfig('http://www.example.com')
 
1207
            parser = my_config._get_parser()
 
1208
        finally:
 
1209
            config.ConfigObj = oldparserclass
 
1210
        self.assertIsInstance(parser, InstrumentedConfigObj)
 
1211
        self.assertEqual(parser._calls,
 
1212
                         [('__init__', bedding.locations_config_path(),
 
1213
                           'utf-8')])
 
1214
 
 
1215
    def test_get_global_config(self):
 
1216
        my_config = config.BranchConfig(FakeBranch('http://example.com'))
 
1217
        global_config = my_config._get_global_config()
 
1218
        self.assertIsInstance(global_config, config.GlobalConfig)
 
1219
        self.assertIs(global_config, my_config._get_global_config())
 
1220
 
 
1221
    def assertLocationMatching(self, expected):
 
1222
        self.assertEqual(expected,
 
1223
                         list(self.my_location_config._get_matching_sections()))
 
1224
 
 
1225
    def test__get_matching_sections_no_match(self):
 
1226
        self.get_branch_config('/')
 
1227
        self.assertLocationMatching([])
 
1228
 
 
1229
    def test__get_matching_sections_exact(self):
 
1230
        self.get_branch_config('http://www.example.com')
 
1231
        self.assertLocationMatching([('http://www.example.com', '')])
 
1232
 
 
1233
    def test__get_matching_sections_suffix_does_not(self):
 
1234
        self.get_branch_config('http://www.example.com-com')
 
1235
        self.assertLocationMatching([])
 
1236
 
 
1237
    def test__get_matching_sections_subdir_recursive(self):
 
1238
        self.get_branch_config('http://www.example.com/com')
 
1239
        self.assertLocationMatching([('http://www.example.com', 'com')])
 
1240
 
 
1241
    def test__get_matching_sections_ignoreparent(self):
 
1242
        self.get_branch_config('http://www.example.com/ignoreparent')
 
1243
        self.assertLocationMatching([('http://www.example.com/ignoreparent',
 
1244
                                      '')])
 
1245
 
 
1246
    def test__get_matching_sections_ignoreparent_subdir(self):
 
1247
        self.get_branch_config(
 
1248
            'http://www.example.com/ignoreparent/childbranch')
 
1249
        self.assertLocationMatching([('http://www.example.com/ignoreparent',
 
1250
                                      'childbranch')])
 
1251
 
 
1252
    def test__get_matching_sections_subdir_trailing_slash(self):
 
1253
        self.get_branch_config('/b')
 
1254
        self.assertLocationMatching([('/b/', '')])
 
1255
 
 
1256
    def test__get_matching_sections_subdir_child(self):
 
1257
        self.get_branch_config('/a/foo')
 
1258
        self.assertLocationMatching([('/a/*', ''), ('/a/', 'foo')])
 
1259
 
 
1260
    def test__get_matching_sections_subdir_child_child(self):
 
1261
        self.get_branch_config('/a/foo/bar')
 
1262
        self.assertLocationMatching([('/a/*', 'bar'), ('/a/', 'foo/bar')])
 
1263
 
 
1264
    def test__get_matching_sections_trailing_slash_with_children(self):
 
1265
        self.get_branch_config('/a/')
 
1266
        self.assertLocationMatching([('/a/', '')])
 
1267
 
 
1268
    def test__get_matching_sections_explicit_over_glob(self):
 
1269
        # XXX: 2006-09-08 jamesh
 
1270
        # This test only passes because ord('c') > ord('*').  If there
 
1271
        # was a config section for '/a/?', it would get precedence
 
1272
        # over '/a/c'.
 
1273
        self.get_branch_config('/a/c')
 
1274
        self.assertLocationMatching([('/a/c', ''), ('/a/*', ''), ('/a/', 'c')])
 
1275
 
 
1276
    def test__get_option_policy_normal(self):
 
1277
        self.get_branch_config('http://www.example.com')
 
1278
        self.assertEqual(
 
1279
            self.my_location_config._get_config_policy(
 
1280
                'http://www.example.com', 'normal_option'),
 
1281
            config.POLICY_NONE)
 
1282
 
 
1283
    def test__get_option_policy_norecurse(self):
 
1284
        self.get_branch_config('http://www.example.com')
 
1285
        self.assertEqual(
 
1286
            self.my_location_config._get_option_policy(
 
1287
                'http://www.example.com', 'norecurse_option'),
 
1288
            config.POLICY_NORECURSE)
 
1289
        # Test old recurse=False setting:
 
1290
        self.assertEqual(
 
1291
            self.my_location_config._get_option_policy(
 
1292
                'http://www.example.com/norecurse', 'normal_option'),
 
1293
            config.POLICY_NORECURSE)
 
1294
 
 
1295
    def test__get_option_policy_normal(self):
 
1296
        self.get_branch_config('http://www.example.com')
 
1297
        self.assertEqual(
 
1298
            self.my_location_config._get_option_policy(
 
1299
                'http://www.example.com', 'appendpath_option'),
 
1300
            config.POLICY_APPENDPATH)
 
1301
 
 
1302
    def test__get_options_with_policy(self):
 
1303
        self.get_branch_config('/dir/subdir',
 
1304
                               location_config="""\
 
1305
[/dir]
 
1306
other_url = /other-dir
 
1307
other_url:policy = appendpath
 
1308
[/dir/subdir]
 
1309
other_url = /other-subdir
 
1310
""")
 
1311
        self.assertOptions(
 
1312
            [(u'other_url', u'/other-subdir', u'/dir/subdir', 'locations'),
 
1313
             (u'other_url', u'/other-dir', u'/dir', 'locations'),
 
1314
             (u'other_url:policy', u'appendpath', u'/dir', 'locations')],
 
1315
            self.my_location_config)
 
1316
 
 
1317
    def test_location_without_username(self):
 
1318
        self.get_branch_config('http://www.example.com/ignoreparent')
 
1319
        self.assertEqual(u'Erik B\u00e5gfors <erik@bagfors.nu>',
 
1320
                         self.my_config.username())
 
1321
 
 
1322
    def test_location_not_listed(self):
 
1323
        """Test that the global username is used when no location matches"""
 
1324
        self.get_branch_config('/home/robertc/sources')
 
1325
        self.assertEqual(u'Erik B\u00e5gfors <erik@bagfors.nu>',
 
1326
                         self.my_config.username())
 
1327
 
 
1328
    def test_overriding_location(self):
 
1329
        self.get_branch_config('http://www.example.com/foo')
 
1330
        self.assertEqual('Robert Collins <robertc@example.org>',
 
1331
                         self.my_config.username())
 
1332
 
 
1333
    def test_get_user_option_global(self):
 
1334
        self.get_branch_config('/a')
 
1335
        self.assertEqual('something',
 
1336
                         self.my_config.get_user_option('user_global_option'))
 
1337
 
 
1338
    def test_get_user_option_local(self):
 
1339
        self.get_branch_config('/a')
 
1340
        self.assertEqual('local',
 
1341
                         self.my_config.get_user_option('user_local_option'))
 
1342
 
 
1343
    def test_get_user_option_appendpath(self):
 
1344
        # returned as is for the base path:
 
1345
        self.get_branch_config('http://www.example.com')
 
1346
        self.assertEqual('append',
 
1347
                         self.my_config.get_user_option('appendpath_option'))
 
1348
        # Extra path components get appended:
 
1349
        self.get_branch_config('http://www.example.com/a/b/c')
 
1350
        self.assertEqual('append/a/b/c',
 
1351
                         self.my_config.get_user_option('appendpath_option'))
 
1352
        # Overriden for http://www.example.com/dir, where it is a
 
1353
        # normal option:
 
1354
        self.get_branch_config('http://www.example.com/dir/a/b/c')
 
1355
        self.assertEqual('normal',
 
1356
                         self.my_config.get_user_option('appendpath_option'))
 
1357
 
 
1358
    def test_get_user_option_norecurse(self):
 
1359
        self.get_branch_config('http://www.example.com')
 
1360
        self.assertEqual('norecurse',
 
1361
                         self.my_config.get_user_option('norecurse_option'))
 
1362
        self.get_branch_config('http://www.example.com/dir')
 
1363
        self.assertEqual(None,
 
1364
                         self.my_config.get_user_option('norecurse_option'))
 
1365
        # http://www.example.com/norecurse is a recurse=False section
 
1366
        # that redefines normal_option.  Subdirectories do not pick up
 
1367
        # this redefinition.
 
1368
        self.get_branch_config('http://www.example.com/norecurse')
 
1369
        self.assertEqual('norecurse',
 
1370
                         self.my_config.get_user_option('normal_option'))
 
1371
        self.get_branch_config('http://www.example.com/norecurse/subdir')
 
1372
        self.assertEqual('normal',
 
1373
                         self.my_config.get_user_option('normal_option'))
 
1374
 
 
1375
    def test_set_user_option_norecurse(self):
 
1376
        self.get_branch_config('http://www.example.com')
 
1377
        self.my_config.set_user_option('foo', 'bar',
 
1378
                                       store=config.STORE_LOCATION_NORECURSE)
 
1379
        self.assertEqual(
 
1380
            self.my_location_config._get_option_policy(
 
1381
                'http://www.example.com', 'foo'),
 
1382
            config.POLICY_NORECURSE)
 
1383
 
 
1384
    def test_set_user_option_appendpath(self):
 
1385
        self.get_branch_config('http://www.example.com')
 
1386
        self.my_config.set_user_option('foo', 'bar',
 
1387
                                       store=config.STORE_LOCATION_APPENDPATH)
 
1388
        self.assertEqual(
 
1389
            self.my_location_config._get_option_policy(
 
1390
                'http://www.example.com', 'foo'),
 
1391
            config.POLICY_APPENDPATH)
 
1392
 
 
1393
    def test_set_user_option_change_policy(self):
 
1394
        self.get_branch_config('http://www.example.com')
 
1395
        self.my_config.set_user_option('norecurse_option', 'normal',
 
1396
                                       store=config.STORE_LOCATION)
 
1397
        self.assertEqual(
 
1398
            self.my_location_config._get_option_policy(
 
1399
                'http://www.example.com', 'norecurse_option'),
 
1400
            config.POLICY_NONE)
 
1401
 
 
1402
    def get_branch_config(self, location, global_config=None,
 
1403
                          location_config=None):
 
1404
        my_branch = FakeBranch(location)
 
1405
        if global_config is None:
 
1406
            global_config = sample_config_text
 
1407
        if location_config is None:
 
1408
            location_config = sample_branches_text
 
1409
 
 
1410
        config.GlobalConfig.from_string(global_config, save=True)
 
1411
        config.LocationConfig.from_string(location_config, my_branch.base,
 
1412
                                          save=True)
 
1413
        my_config = config.BranchConfig(my_branch)
 
1414
        self.my_config = my_config
 
1415
        self.my_location_config = my_config._get_location_config()
 
1416
 
 
1417
    def test_set_user_setting_sets_and_saves2(self):
 
1418
        self.get_branch_config('/a/c')
 
1419
        self.assertIs(self.my_config.get_user_option('foo'), None)
 
1420
        self.my_config.set_user_option('foo', 'bar')
 
1421
        self.assertEqual(
 
1422
            self.my_config.branch.control_files.files['branch.conf'].strip(),
 
1423
            b'foo = bar')
 
1424
        self.assertEqual(self.my_config.get_user_option('foo'), 'bar')
 
1425
        self.my_config.set_user_option('foo', 'baz',
 
1426
                                       store=config.STORE_LOCATION)
 
1427
        self.assertEqual(self.my_config.get_user_option('foo'), 'baz')
 
1428
        self.my_config.set_user_option('foo', 'qux')
 
1429
        self.assertEqual(self.my_config.get_user_option('foo'), 'baz')
 
1430
 
 
1431
    def test_get_bzr_remote_path(self):
 
1432
        my_config = config.LocationConfig('/a/c')
 
1433
        self.assertEqual('bzr', my_config.get_bzr_remote_path())
 
1434
        my_config.set_user_option('bzr_remote_path', '/path-bzr')
 
1435
        self.assertEqual('/path-bzr', my_config.get_bzr_remote_path())
 
1436
        self.overrideEnv('BZR_REMOTE_PATH', '/environ-bzr')
 
1437
        self.assertEqual('/environ-bzr', my_config.get_bzr_remote_path())
 
1438
 
 
1439
 
 
1440
precedence_global = b'option = global'
 
1441
precedence_branch = b'option = branch'
 
1442
precedence_location = b"""
 
1443
[http://]
 
1444
recurse = true
 
1445
option = recurse
 
1446
[http://example.com/specific]
 
1447
option = exact
 
1448
"""
 
1449
 
 
1450
 
 
1451
class TestBranchConfigItems(tests.TestCaseInTempDir):
 
1452
 
 
1453
    def get_branch_config(self, global_config=None, location=None,
 
1454
                          location_config=None, branch_data_config=None):
 
1455
        my_branch = FakeBranch(location)
 
1456
        if global_config is not None:
 
1457
            config.GlobalConfig.from_string(global_config, save=True)
 
1458
        if location_config is not None:
 
1459
            config.LocationConfig.from_string(location_config, my_branch.base,
 
1460
                                              save=True)
 
1461
        my_config = config.BranchConfig(my_branch)
 
1462
        if branch_data_config is not None:
 
1463
            my_config.branch.control_files.files['branch.conf'] = \
 
1464
                branch_data_config
 
1465
        return my_config
 
1466
 
 
1467
    def test_user_id(self):
 
1468
        branch = FakeBranch()
 
1469
        my_config = config.BranchConfig(branch)
 
1470
        self.assertIsNot(None, my_config.username())
 
1471
        my_config.branch.control_files.files['email'] = "John"
 
1472
        my_config.set_user_option('email',
 
1473
                                  "Robert Collins <robertc@example.org>")
 
1474
        self.assertEqual("Robert Collins <robertc@example.org>",
 
1475
                         my_config.username())
 
1476
 
 
1477
    def test_BRZ_EMAIL_OVERRIDES(self):
 
1478
        self.overrideEnv('BRZ_EMAIL', "Robert Collins <robertc@example.org>")
 
1479
        branch = FakeBranch()
 
1480
        my_config = config.BranchConfig(branch)
 
1481
        self.assertEqual("Robert Collins <robertc@example.org>",
 
1482
                         my_config.username())
 
1483
 
 
1484
    def test_get_user_option_global(self):
 
1485
        my_config = self.get_branch_config(global_config=sample_config_text)
 
1486
        self.assertEqual('something',
 
1487
                         my_config.get_user_option('user_global_option'))
 
1488
 
 
1489
    def test_config_precedence(self):
 
1490
        # FIXME: eager test, luckily no persitent config file makes it fail
 
1491
        # -- vila 20100716
 
1492
        my_config = self.get_branch_config(global_config=precedence_global)
 
1493
        self.assertEqual(my_config.get_user_option('option'), 'global')
 
1494
        my_config = self.get_branch_config(global_config=precedence_global,
 
1495
                                           branch_data_config=precedence_branch)
 
1496
        self.assertEqual(my_config.get_user_option('option'), 'branch')
 
1497
        my_config = self.get_branch_config(
 
1498
            global_config=precedence_global,
 
1499
            branch_data_config=precedence_branch,
 
1500
            location_config=precedence_location)
 
1501
        self.assertEqual(my_config.get_user_option('option'), 'recurse')
 
1502
        my_config = self.get_branch_config(
 
1503
            global_config=precedence_global,
 
1504
            branch_data_config=precedence_branch,
 
1505
            location_config=precedence_location,
 
1506
            location='http://example.com/specific')
 
1507
        self.assertEqual(my_config.get_user_option('option'), 'exact')
 
1508
 
 
1509
 
 
1510
class TestMailAddressExtraction(tests.TestCase):
 
1511
 
 
1512
    def test_extract_email_address(self):
 
1513
        self.assertEqual('jane@test.com',
 
1514
                         config.extract_email_address('Jane <jane@test.com>'))
 
1515
        self.assertRaises(config.NoEmailInUsername,
 
1516
                          config.extract_email_address, 'Jane Tester')
 
1517
 
 
1518
    def test_parse_username(self):
 
1519
        self.assertEqual(('', 'jdoe@example.com'),
 
1520
                         config.parse_username('jdoe@example.com'))
 
1521
        self.assertEqual(('', 'jdoe@example.com'),
 
1522
                         config.parse_username('<jdoe@example.com>'))
 
1523
        self.assertEqual(('John Doe', 'jdoe@example.com'),
 
1524
                         config.parse_username('John Doe <jdoe@example.com>'))
 
1525
        self.assertEqual(('John Doe', ''),
 
1526
                         config.parse_username('John Doe'))
 
1527
        self.assertEqual(('John Doe', 'jdoe@example.com'),
 
1528
                         config.parse_username('John Doe jdoe@example.com'))
 
1529
 
 
1530
 
 
1531
class TestTreeConfig(tests.TestCaseWithTransport):
 
1532
 
 
1533
    def test_get_value(self):
 
1534
        """Test that retreiving a value from a section is possible"""
 
1535
        branch = self.make_branch('.')
 
1536
        tree_config = config.TreeConfig(branch)
 
1537
        tree_config.set_option('value', 'key', 'SECTION')
 
1538
        tree_config.set_option('value2', 'key2')
 
1539
        tree_config.set_option('value3-top', 'key3')
 
1540
        tree_config.set_option('value3-section', 'key3', 'SECTION')
 
1541
        value = tree_config.get_option('key', 'SECTION')
 
1542
        self.assertEqual(value, 'value')
 
1543
        value = tree_config.get_option('key2')
 
1544
        self.assertEqual(value, 'value2')
 
1545
        self.assertEqual(tree_config.get_option('non-existant'), None)
 
1546
        value = tree_config.get_option('non-existant', 'SECTION')
 
1547
        self.assertEqual(value, None)
 
1548
        value = tree_config.get_option('non-existant', default='default')
 
1549
        self.assertEqual(value, 'default')
 
1550
        self.assertEqual(tree_config.get_option('key2', 'NOSECTION'), None)
 
1551
        value = tree_config.get_option('key2', 'NOSECTION', default='default')
 
1552
        self.assertEqual(value, 'default')
 
1553
        value = tree_config.get_option('key3')
 
1554
        self.assertEqual(value, 'value3-top')
 
1555
        value = tree_config.get_option('key3', 'SECTION')
 
1556
        self.assertEqual(value, 'value3-section')
 
1557
 
 
1558
 
 
1559
class TestTransportConfig(tests.TestCaseWithTransport):
 
1560
 
 
1561
    def test_load_utf8(self):
 
1562
        """Ensure we can load an utf8-encoded file."""
 
1563
        t = self.get_transport()
 
1564
        unicode_user = u'b\N{Euro Sign}ar'
 
1565
        unicode_content = u'user=%s' % (unicode_user,)
 
1566
        utf8_content = unicode_content.encode('utf8')
 
1567
        # Store the raw content in the config file
 
1568
        t.put_bytes('foo.conf', utf8_content)
 
1569
        conf = config.TransportConfig(t, 'foo.conf')
 
1570
        self.assertEqual(unicode_user, conf.get_option('user'))
 
1571
 
 
1572
    def test_load_non_ascii(self):
 
1573
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
 
1574
        t = self.get_transport()
 
1575
        t.put_bytes('foo.conf', b'user=foo\n#\xff\n')
 
1576
        conf = config.TransportConfig(t, 'foo.conf')
 
1577
        self.assertRaises(config.ConfigContentError, conf._get_configobj)
 
1578
 
 
1579
    def test_load_erroneous_content(self):
 
1580
        """Ensure we display a proper error on content that can't be parsed."""
 
1581
        t = self.get_transport()
 
1582
        t.put_bytes('foo.conf', b'[open_section\n')
 
1583
        conf = config.TransportConfig(t, 'foo.conf')
 
1584
        self.assertRaises(config.ParseConfigError, conf._get_configobj)
 
1585
 
 
1586
    def test_load_permission_denied(self):
 
1587
        """Ensure we get an empty config file if the file is inaccessible."""
 
1588
        warnings = []
 
1589
 
 
1590
        def warning(*args):
 
1591
            warnings.append(args[0] % args[1:])
 
1592
        self.overrideAttr(trace, 'warning', warning)
 
1593
 
 
1594
        class DenyingTransport(object):
 
1595
 
 
1596
            def __init__(self, base):
 
1597
                self.base = base
 
1598
 
 
1599
            def get_bytes(self, relpath):
 
1600
                raise errors.PermissionDenied(relpath, "")
 
1601
 
 
1602
        cfg = config.TransportConfig(
 
1603
            DenyingTransport("nonexisting://"), 'control.conf')
 
1604
        self.assertIs(None, cfg.get_option('non-existant', 'SECTION'))
 
1605
        self.assertEqual(
 
1606
            warnings,
 
1607
            [u'Permission denied while trying to open configuration file '
 
1608
             u'nonexisting:///control.conf.'])
 
1609
 
 
1610
    def test_get_value(self):
 
1611
        """Test that retreiving a value from a section is possible"""
 
1612
        bzrdir_config = config.TransportConfig(self.get_transport('.'),
 
1613
                                               'control.conf')
 
1614
        bzrdir_config.set_option('value', 'key', 'SECTION')
 
1615
        bzrdir_config.set_option('value2', 'key2')
 
1616
        bzrdir_config.set_option('value3-top', 'key3')
 
1617
        bzrdir_config.set_option('value3-section', 'key3', 'SECTION')
 
1618
        value = bzrdir_config.get_option('key', 'SECTION')
 
1619
        self.assertEqual(value, 'value')
 
1620
        value = bzrdir_config.get_option('key2')
 
1621
        self.assertEqual(value, 'value2')
 
1622
        self.assertEqual(bzrdir_config.get_option('non-existant'), None)
 
1623
        value = bzrdir_config.get_option('non-existant', 'SECTION')
 
1624
        self.assertEqual(value, None)
 
1625
        value = bzrdir_config.get_option('non-existant', default='default')
 
1626
        self.assertEqual(value, 'default')
 
1627
        self.assertEqual(bzrdir_config.get_option('key2', 'NOSECTION'), None)
 
1628
        value = bzrdir_config.get_option('key2', 'NOSECTION',
 
1629
                                         default='default')
 
1630
        self.assertEqual(value, 'default')
 
1631
        value = bzrdir_config.get_option('key3')
 
1632
        self.assertEqual(value, 'value3-top')
 
1633
        value = bzrdir_config.get_option('key3', 'SECTION')
 
1634
        self.assertEqual(value, 'value3-section')
 
1635
 
 
1636
    def test_set_unset_default_stack_on(self):
 
1637
        my_dir = self.make_controldir('.')
 
1638
        bzrdir_config = config.BzrDirConfig(my_dir)
 
1639
        self.assertIs(None, bzrdir_config.get_default_stack_on())
 
1640
        bzrdir_config.set_default_stack_on('Foo')
 
1641
        self.assertEqual('Foo', bzrdir_config._config.get_option(
 
1642
                         'default_stack_on'))
 
1643
        self.assertEqual('Foo', bzrdir_config.get_default_stack_on())
 
1644
        bzrdir_config.set_default_stack_on(None)
 
1645
        self.assertIs(None, bzrdir_config.get_default_stack_on())
 
1646
 
 
1647
 
 
1648
class TestOldConfigHooks(tests.TestCaseWithTransport):
 
1649
 
 
1650
    def setUp(self):
 
1651
        super(TestOldConfigHooks, self).setUp()
 
1652
        create_configs_with_file_option(self)
 
1653
 
 
1654
    def assertGetHook(self, conf, name, value):
 
1655
        calls = []
 
1656
 
 
1657
        def hook(*args):
 
1658
            calls.append(args)
 
1659
        config.OldConfigHooks.install_named_hook('get', hook, None)
 
1660
        self.addCleanup(
 
1661
            config.OldConfigHooks.uninstall_named_hook, 'get', None)
 
1662
        self.assertLength(0, calls)
 
1663
        actual_value = conf.get_user_option(name)
 
1664
        self.assertEqual(value, actual_value)
 
1665
        self.assertLength(1, calls)
 
1666
        self.assertEqual((conf, name, value), calls[0])
 
1667
 
 
1668
    def test_get_hook_breezy(self):
 
1669
        self.assertGetHook(self.breezy_config, 'file', 'breezy')
 
1670
 
 
1671
    def test_get_hook_locations(self):
 
1672
        self.assertGetHook(self.locations_config, 'file', 'locations')
 
1673
 
 
1674
    def test_get_hook_branch(self):
 
1675
        # Since locations masks branch, we define a different option
 
1676
        self.branch_config.set_user_option('file2', 'branch')
 
1677
        self.assertGetHook(self.branch_config, 'file2', 'branch')
 
1678
 
 
1679
    def assertSetHook(self, conf, name, value):
 
1680
        calls = []
 
1681
 
 
1682
        def hook(*args):
 
1683
            calls.append(args)
 
1684
        config.OldConfigHooks.install_named_hook('set', hook, None)
 
1685
        self.addCleanup(
 
1686
            config.OldConfigHooks.uninstall_named_hook, 'set', None)
 
1687
        self.assertLength(0, calls)
 
1688
        conf.set_user_option(name, value)
 
1689
        self.assertLength(1, calls)
 
1690
        # We can't assert the conf object below as different configs use
 
1691
        # different means to implement set_user_option and we care only about
 
1692
        # coverage here.
 
1693
        self.assertEqual((name, value), calls[0][1:])
 
1694
 
 
1695
    def test_set_hook_breezy(self):
 
1696
        self.assertSetHook(self.breezy_config, 'foo', 'breezy')
 
1697
 
 
1698
    def test_set_hook_locations(self):
 
1699
        self.assertSetHook(self.locations_config, 'foo', 'locations')
 
1700
 
 
1701
    def test_set_hook_branch(self):
 
1702
        self.assertSetHook(self.branch_config, 'foo', 'branch')
 
1703
 
 
1704
    def assertRemoveHook(self, conf, name, section_name=None):
 
1705
        calls = []
 
1706
 
 
1707
        def hook(*args):
 
1708
            calls.append(args)
 
1709
        config.OldConfigHooks.install_named_hook('remove', hook, None)
 
1710
        self.addCleanup(
 
1711
            config.OldConfigHooks.uninstall_named_hook, 'remove', None)
 
1712
        self.assertLength(0, calls)
 
1713
        conf.remove_user_option(name, section_name)
 
1714
        self.assertLength(1, calls)
 
1715
        # We can't assert the conf object below as different configs use
 
1716
        # different means to implement remove_user_option and we care only about
 
1717
        # coverage here.
 
1718
        self.assertEqual((name,), calls[0][1:])
 
1719
 
 
1720
    def test_remove_hook_breezy(self):
 
1721
        self.assertRemoveHook(self.breezy_config, 'file')
 
1722
 
 
1723
    def test_remove_hook_locations(self):
 
1724
        self.assertRemoveHook(self.locations_config, 'file',
 
1725
                              self.locations_config.location)
 
1726
 
 
1727
    def test_remove_hook_branch(self):
 
1728
        self.assertRemoveHook(self.branch_config, 'file')
 
1729
 
 
1730
    def assertLoadHook(self, name, conf_class, *conf_args):
 
1731
        calls = []
 
1732
 
 
1733
        def hook(*args):
 
1734
            calls.append(args)
 
1735
        config.OldConfigHooks.install_named_hook('load', hook, None)
 
1736
        self.addCleanup(
 
1737
            config.OldConfigHooks.uninstall_named_hook, 'load', None)
 
1738
        self.assertLength(0, calls)
 
1739
        # Build a config
 
1740
        conf = conf_class(*conf_args)
 
1741
        # Access an option to trigger a load
 
1742
        conf.get_user_option(name)
 
1743
        self.assertLength(1, calls)
 
1744
        # Since we can't assert about conf, we just use the number of calls ;-/
 
1745
 
 
1746
    def test_load_hook_breezy(self):
 
1747
        self.assertLoadHook('file', config.GlobalConfig)
 
1748
 
 
1749
    def test_load_hook_locations(self):
 
1750
        self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)
 
1751
 
 
1752
    def test_load_hook_branch(self):
 
1753
        self.assertLoadHook('file', config.BranchConfig, self.tree.branch)
 
1754
 
 
1755
    def assertSaveHook(self, conf):
 
1756
        calls = []
 
1757
 
 
1758
        def hook(*args):
 
1759
            calls.append(args)
 
1760
        config.OldConfigHooks.install_named_hook('save', hook, None)
 
1761
        self.addCleanup(
 
1762
            config.OldConfigHooks.uninstall_named_hook, 'save', None)
 
1763
        self.assertLength(0, calls)
 
1764
        # Setting an option triggers a save
 
1765
        conf.set_user_option('foo', 'bar')
 
1766
        self.assertLength(1, calls)
 
1767
        # Since we can't assert about conf, we just use the number of calls ;-/
 
1768
 
 
1769
    def test_save_hook_breezy(self):
 
1770
        self.assertSaveHook(self.breezy_config)
 
1771
 
 
1772
    def test_save_hook_locations(self):
 
1773
        self.assertSaveHook(self.locations_config)
 
1774
 
 
1775
    def test_save_hook_branch(self):
 
1776
        self.assertSaveHook(self.branch_config)
 
1777
 
 
1778
 
 
1779
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
 
1780
    """Tests config hooks for remote configs.
 
1781
 
 
1782
    No tests for the remove hook as this is not implemented there.
 
1783
    """
 
1784
 
 
1785
    def setUp(self):
 
1786
        super(TestOldConfigHooksForRemote, self).setUp()
 
1787
        self.transport_server = test_server.SmartTCPServer_for_testing
 
1788
        create_configs_with_file_option(self)
 
1789
 
 
1790
    def assertGetHook(self, conf, name, value):
 
1791
        calls = []
 
1792
 
 
1793
        def hook(*args):
 
1794
            calls.append(args)
 
1795
        config.OldConfigHooks.install_named_hook('get', hook, None)
 
1796
        self.addCleanup(
 
1797
            config.OldConfigHooks.uninstall_named_hook, 'get', None)
 
1798
        self.assertLength(0, calls)
 
1799
        actual_value = conf.get_option(name)
 
1800
        self.assertEqual(value, actual_value)
 
1801
        self.assertLength(1, calls)
 
1802
        self.assertEqual((conf, name, value), calls[0])
 
1803
 
 
1804
    def test_get_hook_remote_branch(self):
 
1805
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
1806
        self.assertGetHook(remote_branch._get_config(), 'file', 'branch')
 
1807
 
 
1808
    def test_get_hook_remote_bzrdir(self):
 
1809
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
 
1810
        conf = remote_bzrdir._get_config()
 
1811
        conf.set_option('remotedir', 'file')
 
1812
        self.assertGetHook(conf, 'file', 'remotedir')
 
1813
 
 
1814
    def assertSetHook(self, conf, name, value):
 
1815
        calls = []
 
1816
 
 
1817
        def hook(*args):
 
1818
            calls.append(args)
 
1819
        config.OldConfigHooks.install_named_hook('set', hook, None)
 
1820
        self.addCleanup(
 
1821
            config.OldConfigHooks.uninstall_named_hook, 'set', None)
 
1822
        self.assertLength(0, calls)
 
1823
        conf.set_option(value, name)
 
1824
        self.assertLength(1, calls)
 
1825
        # We can't assert the conf object below as different configs use
 
1826
        # different means to implement set_user_option and we care only about
 
1827
        # coverage here.
 
1828
        self.assertEqual((name, value), calls[0][1:])
 
1829
 
 
1830
    def test_set_hook_remote_branch(self):
 
1831
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
1832
        self.addCleanup(remote_branch.lock_write().unlock)
 
1833
        self.assertSetHook(remote_branch._get_config(), 'file', 'remote')
 
1834
 
 
1835
    def test_set_hook_remote_bzrdir(self):
 
1836
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
1837
        self.addCleanup(remote_branch.lock_write().unlock)
 
1838
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
 
1839
        self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')
 
1840
 
 
1841
    def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
 
1842
        calls = []
 
1843
 
 
1844
        def hook(*args):
 
1845
            calls.append(args)
 
1846
        config.OldConfigHooks.install_named_hook('load', hook, None)
 
1847
        self.addCleanup(
 
1848
            config.OldConfigHooks.uninstall_named_hook, 'load', None)
 
1849
        self.assertLength(0, calls)
 
1850
        # Build a config
 
1851
        conf = conf_class(*conf_args)
 
1852
        # Access an option to trigger a load
 
1853
        conf.get_option(name)
 
1854
        self.assertLength(expected_nb_calls, calls)
 
1855
        # Since we can't assert about conf, we just use the number of calls ;-/
 
1856
 
 
1857
    def test_load_hook_remote_branch(self):
 
1858
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
1859
        self.assertLoadHook(
 
1860
            1, 'file', remote.RemoteBranchConfig, remote_branch)
 
1861
 
 
1862
    def test_load_hook_remote_bzrdir(self):
 
1863
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
 
1864
        # The config file doesn't exist, set an option to force its creation
 
1865
        conf = remote_bzrdir._get_config()
 
1866
        conf.set_option('remotedir', 'file')
 
1867
        # We get one call for the server and one call for the client, this is
 
1868
        # caused by the differences in implementations betwen
 
1869
        # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
 
1870
        # SmartServerBranchGetConfigFile (in smart/branch.py)
 
1871
        self.assertLoadHook(
 
1872
            2, 'file', remote.RemoteBzrDirConfig, remote_bzrdir)
 
1873
 
 
1874
    def assertSaveHook(self, conf):
 
1875
        calls = []
 
1876
 
 
1877
        def hook(*args):
 
1878
            calls.append(args)
 
1879
        config.OldConfigHooks.install_named_hook('save', hook, None)
 
1880
        self.addCleanup(
 
1881
            config.OldConfigHooks.uninstall_named_hook, 'save', None)
 
1882
        self.assertLength(0, calls)
 
1883
        # Setting an option triggers a save
 
1884
        conf.set_option('foo', 'bar')
 
1885
        self.assertLength(1, calls)
 
1886
        # Since we can't assert about conf, we just use the number of calls ;-/
 
1887
 
 
1888
    def test_save_hook_remote_branch(self):
 
1889
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
1890
        self.addCleanup(remote_branch.lock_write().unlock)
 
1891
        self.assertSaveHook(remote_branch._get_config())
 
1892
 
 
1893
    def test_save_hook_remote_bzrdir(self):
 
1894
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
1895
        self.addCleanup(remote_branch.lock_write().unlock)
 
1896
        remote_bzrdir = controldir.ControlDir.open(self.get_url('tree'))
 
1897
        self.assertSaveHook(remote_bzrdir._get_config())
 
1898
 
 
1899
 
 
1900
class TestOptionNames(tests.TestCase):
 
1901
 
 
1902
    def is_valid(self, name):
 
1903
        return config._option_ref_re.match('{%s}' % name) is not None
 
1904
 
 
1905
    def test_valid_names(self):
 
1906
        self.assertTrue(self.is_valid('foo'))
 
1907
        self.assertTrue(self.is_valid('foo.bar'))
 
1908
        self.assertTrue(self.is_valid('f1'))
 
1909
        self.assertTrue(self.is_valid('_'))
 
1910
        self.assertTrue(self.is_valid('__bar__'))
 
1911
        self.assertTrue(self.is_valid('a_'))
 
1912
        self.assertTrue(self.is_valid('a1'))
 
1913
        # Don't break bzr-svn for no good reason
 
1914
        self.assertTrue(self.is_valid('guessed-layout'))
 
1915
 
 
1916
    def test_invalid_names(self):
 
1917
        self.assertFalse(self.is_valid(' foo'))
 
1918
        self.assertFalse(self.is_valid('foo '))
 
1919
        self.assertFalse(self.is_valid('1'))
 
1920
        self.assertFalse(self.is_valid('1,2'))
 
1921
        self.assertFalse(self.is_valid('foo$'))
 
1922
        self.assertFalse(self.is_valid('!foo'))
 
1923
        self.assertFalse(self.is_valid('foo.'))
 
1924
        self.assertFalse(self.is_valid('foo..bar'))
 
1925
        self.assertFalse(self.is_valid('{}'))
 
1926
        self.assertFalse(self.is_valid('{a}'))
 
1927
        self.assertFalse(self.is_valid('a\n'))
 
1928
        self.assertFalse(self.is_valid('-'))
 
1929
        self.assertFalse(self.is_valid('-a'))
 
1930
        self.assertFalse(self.is_valid('a-'))
 
1931
        self.assertFalse(self.is_valid('a--a'))
 
1932
 
 
1933
    def assertSingleGroup(self, reference):
 
1934
        # the regexp is used with split and as such should match the reference
 
1935
        # *only*, if more groups needs to be defined, (?:...) should be used.
 
1936
        m = config._option_ref_re.match('{a}')
 
1937
        self.assertLength(1, m.groups())
 
1938
 
 
1939
    def test_valid_references(self):
 
1940
        self.assertSingleGroup('{a}')
 
1941
        self.assertSingleGroup('{{a}}')
 
1942
 
 
1943
 
 
1944
class TestOption(tests.TestCase):
 
1945
 
 
1946
    def test_default_value(self):
 
1947
        opt = config.Option('foo', default='bar')
 
1948
        self.assertEqual('bar', opt.get_default())
 
1949
 
 
1950
    def test_callable_default_value(self):
 
1951
        def bar_as_unicode():
 
1952
            return u'bar'
 
1953
        opt = config.Option('foo', default=bar_as_unicode)
 
1954
        self.assertEqual('bar', opt.get_default())
 
1955
 
 
1956
    def test_default_value_from_env(self):
 
1957
        opt = config.Option('foo', default='bar', default_from_env=['FOO'])
 
1958
        self.overrideEnv('FOO', 'quux')
 
1959
        # Env variable provides a default taking over the option one
 
1960
        self.assertEqual('quux', opt.get_default())
 
1961
 
 
1962
    def test_first_default_value_from_env_wins(self):
 
1963
        opt = config.Option('foo', default='bar',
 
1964
                            default_from_env=['NO_VALUE', 'FOO', 'BAZ'])
 
1965
        self.overrideEnv('FOO', 'foo')
 
1966
        self.overrideEnv('BAZ', 'baz')
 
1967
        # The first env var set wins
 
1968
        self.assertEqual('foo', opt.get_default())
 
1969
 
 
1970
    def test_not_supported_list_default_value(self):
 
1971
        self.assertRaises(AssertionError, config.Option, 'foo', default=[1])
 
1972
 
 
1973
    def test_not_supported_object_default_value(self):
 
1974
        self.assertRaises(AssertionError, config.Option, 'foo',
 
1975
                          default=object())
 
1976
 
 
1977
    def test_not_supported_callable_default_value_not_unicode(self):
 
1978
        def bar_not_unicode():
 
1979
            return b'bar'
 
1980
        opt = config.Option('foo', default=bar_not_unicode)
 
1981
        self.assertRaises(AssertionError, opt.get_default)
 
1982
 
 
1983
    def test_get_help_topic(self):
 
1984
        opt = config.Option('foo')
 
1985
        self.assertEqual('foo', opt.get_help_topic())
 
1986
 
 
1987
 
 
1988
class TestOptionConverter(tests.TestCase):
 
1989
 
 
1990
    def assertConverted(self, expected, opt, value):
 
1991
        self.assertEqual(expected, opt.convert_from_unicode(None, value))
 
1992
 
 
1993
    def assertCallsWarning(self, opt, value):
 
1994
        warnings = []
 
1995
 
 
1996
        def warning(*args):
 
1997
            warnings.append(args[0] % args[1:])
 
1998
        self.overrideAttr(trace, 'warning', warning)
 
1999
        self.assertEqual(None, opt.convert_from_unicode(None, value))
 
2000
        self.assertLength(1, warnings)
 
2001
        self.assertEqual(
 
2002
            'Value "%s" is not valid for "%s"' % (value, opt.name),
 
2003
            warnings[0])
 
2004
 
 
2005
    def assertCallsError(self, opt, value):
 
2006
        self.assertRaises(config.ConfigOptionValueError,
 
2007
                          opt.convert_from_unicode, None, value)
 
2008
 
 
2009
    def assertConvertInvalid(self, opt, invalid_value):
 
2010
        opt.invalid = None
 
2011
        self.assertEqual(None, opt.convert_from_unicode(None, invalid_value))
 
2012
        opt.invalid = 'warning'
 
2013
        self.assertCallsWarning(opt, invalid_value)
 
2014
        opt.invalid = 'error'
 
2015
        self.assertCallsError(opt, invalid_value)
 
2016
 
 
2017
 
 
2018
class TestOptionWithBooleanConverter(TestOptionConverter):
 
2019
 
 
2020
    def get_option(self):
 
2021
        return config.Option('foo', help='A boolean.',
 
2022
                             from_unicode=config.bool_from_store)
 
2023
 
 
2024
    def test_convert_invalid(self):
 
2025
        opt = self.get_option()
 
2026
        # A string that is not recognized as a boolean
 
2027
        self.assertConvertInvalid(opt, u'invalid-boolean')
 
2028
        # A list of strings is never recognized as a boolean
 
2029
        self.assertConvertInvalid(opt, [u'not', u'a', u'boolean'])
 
2030
 
 
2031
    def test_convert_valid(self):
 
2032
        opt = self.get_option()
 
2033
        self.assertConverted(True, opt, u'True')
 
2034
        self.assertConverted(True, opt, u'1')
 
2035
        self.assertConverted(False, opt, u'False')
 
2036
 
 
2037
 
 
2038
class TestOptionWithIntegerConverter(TestOptionConverter):
 
2039
 
 
2040
    def get_option(self):
 
2041
        return config.Option('foo', help='An integer.',
 
2042
                             from_unicode=config.int_from_store)
 
2043
 
 
2044
    def test_convert_invalid(self):
 
2045
        opt = self.get_option()
 
2046
        # A string that is not recognized as an integer
 
2047
        self.assertConvertInvalid(opt, u'forty-two')
 
2048
        # A list of strings is never recognized as an integer
 
2049
        self.assertConvertInvalid(opt, [u'a', u'list'])
 
2050
 
 
2051
    def test_convert_valid(self):
 
2052
        opt = self.get_option()
 
2053
        self.assertConverted(16, opt, u'16')
 
2054
 
 
2055
 
 
2056
class TestOptionWithSIUnitConverter(TestOptionConverter):
 
2057
 
 
2058
    def get_option(self):
 
2059
        return config.Option('foo', help='An integer in SI units.',
 
2060
                             from_unicode=config.int_SI_from_store)
 
2061
 
 
2062
    def test_convert_invalid(self):
 
2063
        opt = self.get_option()
 
2064
        self.assertConvertInvalid(opt, u'not-a-unit')
 
2065
        self.assertConvertInvalid(opt, u'Gb')  # Forgot the value
 
2066
        self.assertConvertInvalid(opt, u'1b')  # Forgot the unit
 
2067
        self.assertConvertInvalid(opt, u'1GG')
 
2068
        self.assertConvertInvalid(opt, u'1Mbb')
 
2069
        self.assertConvertInvalid(opt, u'1MM')
 
2070
 
 
2071
    def test_convert_valid(self):
 
2072
        opt = self.get_option()
 
2073
        self.assertConverted(int(5e3), opt, u'5kb')
 
2074
        self.assertConverted(int(5e6), opt, u'5M')
 
2075
        self.assertConverted(int(5e6), opt, u'5MB')
 
2076
        self.assertConverted(int(5e9), opt, u'5g')
 
2077
        self.assertConverted(int(5e9), opt, u'5gB')
 
2078
        self.assertConverted(100, opt, u'100')
 
2079
 
 
2080
 
 
2081
class TestListOption(TestOptionConverter):
 
2082
 
 
2083
    def get_option(self):
 
2084
        return config.ListOption('foo', help='A list.')
 
2085
 
 
2086
    def test_convert_invalid(self):
 
2087
        opt = self.get_option()
 
2088
        # We don't even try to convert a list into a list, we only expect
 
2089
        # strings
 
2090
        self.assertConvertInvalid(opt, [1])
 
2091
        # No string is invalid as all forms can be converted to a list
 
2092
 
 
2093
    def test_convert_valid(self):
 
2094
        opt = self.get_option()
 
2095
        # An empty string is an empty list
 
2096
        self.assertConverted([], opt, '')  # Using a bare str() just in case
 
2097
        self.assertConverted([], opt, u'')
 
2098
        # A boolean
 
2099
        self.assertConverted([u'True'], opt, u'True')
 
2100
        # An integer
 
2101
        self.assertConverted([u'42'], opt, u'42')
 
2102
        # A single string
 
2103
        self.assertConverted([u'bar'], opt, u'bar')
 
2104
 
 
2105
 
 
2106
class TestRegistryOption(TestOptionConverter):
 
2107
 
 
2108
    def get_option(self, registry):
 
2109
        return config.RegistryOption('foo', registry,
 
2110
                                     help='A registry option.')
 
2111
 
 
2112
    def test_convert_invalid(self):
 
2113
        registry = _mod_registry.Registry()
 
2114
        opt = self.get_option(registry)
 
2115
        self.assertConvertInvalid(opt, [1])
 
2116
        self.assertConvertInvalid(opt, u"notregistered")
 
2117
 
 
2118
    def test_convert_valid(self):
 
2119
        registry = _mod_registry.Registry()
 
2120
        registry.register("someval", 1234)
 
2121
        opt = self.get_option(registry)
 
2122
        # Using a bare str() just in case
 
2123
        self.assertConverted(1234, opt, "someval")
 
2124
        self.assertConverted(1234, opt, u'someval')
 
2125
        self.assertConverted(None, opt, None)
 
2126
 
 
2127
    def test_help(self):
 
2128
        registry = _mod_registry.Registry()
 
2129
        registry.register("someval", 1234, help="some option")
 
2130
        registry.register("dunno", 1234, help="some other option")
 
2131
        opt = self.get_option(registry)
 
2132
        self.assertEqual(
 
2133
            'A registry option.\n'
 
2134
            '\n'
 
2135
            'The following values are supported:\n'
 
2136
            ' dunno - some other option\n'
 
2137
            ' someval - some option\n',
 
2138
            opt.help)
 
2139
 
 
2140
    def test_get_help_text(self):
 
2141
        registry = _mod_registry.Registry()
 
2142
        registry.register("someval", 1234, help="some option")
 
2143
        registry.register("dunno", 1234, help="some other option")
 
2144
        opt = self.get_option(registry)
 
2145
        self.assertEqual(
 
2146
            'A registry option.\n'
 
2147
            '\n'
 
2148
            'The following values are supported:\n'
 
2149
            ' dunno - some other option\n'
 
2150
            ' someval - some option\n',
 
2151
            opt.get_help_text())
 
2152
 
 
2153
 
 
2154
class TestOptionRegistry(tests.TestCase):
 
2155
 
 
2156
    def setUp(self):
 
2157
        super(TestOptionRegistry, self).setUp()
 
2158
        # Always start with an empty registry
 
2159
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
 
2160
        self.registry = config.option_registry
 
2161
 
 
2162
    def test_register(self):
 
2163
        opt = config.Option('foo')
 
2164
        self.registry.register(opt)
 
2165
        self.assertIs(opt, self.registry.get('foo'))
 
2166
 
 
2167
    def test_registered_help(self):
 
2168
        opt = config.Option('foo', help='A simple option')
 
2169
        self.registry.register(opt)
 
2170
        self.assertEqual('A simple option', self.registry.get_help('foo'))
 
2171
 
 
2172
    def test_dont_register_illegal_name(self):
 
2173
        self.assertRaises(config.IllegalOptionName,
 
2174
                          self.registry.register, config.Option(' foo'))
 
2175
        self.assertRaises(config.IllegalOptionName,
 
2176
                          self.registry.register, config.Option('bar,'))
 
2177
 
 
2178
    lazy_option = config.Option('lazy_foo', help='Lazy help')
 
2179
 
 
2180
    def test_register_lazy(self):
 
2181
        self.registry.register_lazy('lazy_foo', self.__module__,
 
2182
                                    'TestOptionRegistry.lazy_option')
 
2183
        self.assertIs(self.lazy_option, self.registry.get('lazy_foo'))
 
2184
 
 
2185
    def test_registered_lazy_help(self):
 
2186
        self.registry.register_lazy('lazy_foo', self.__module__,
 
2187
                                    'TestOptionRegistry.lazy_option')
 
2188
        self.assertEqual('Lazy help', self.registry.get_help('lazy_foo'))
 
2189
 
 
2190
    def test_dont_lazy_register_illegal_name(self):
 
2191
        # This is where the root cause of http://pad.lv/1235099 is better
 
2192
        # understood: 'register_lazy' doc string mentions that key should match
 
2193
        # the option name which indirectly requires that the option name is a
 
2194
        # valid python identifier. We violate that rule here (using a key that
 
2195
        # doesn't match the option name) to test the option name checking.
 
2196
        self.assertRaises(config.IllegalOptionName,
 
2197
                          self.registry.register_lazy, ' foo', self.__module__,
 
2198
                          'TestOptionRegistry.lazy_option')
 
2199
        self.assertRaises(config.IllegalOptionName,
 
2200
                          self.registry.register_lazy, '1,2', self.__module__,
 
2201
                          'TestOptionRegistry.lazy_option')
 
2202
 
 
2203
 
 
2204
class TestRegisteredOptions(tests.TestCase):
 
2205
    """All registered options should verify some constraints."""
 
2206
 
 
2207
    scenarios = [(key, {'option_name': key, 'option': option}) for key, option
 
2208
                 in config.option_registry.iteritems()]
 
2209
 
 
2210
    def setUp(self):
 
2211
        super(TestRegisteredOptions, self).setUp()
 
2212
        self.registry = config.option_registry
 
2213
 
 
2214
    def test_proper_name(self):
 
2215
        # An option should be registered under its own name, this can't be
 
2216
        # checked at registration time for the lazy ones.
 
2217
        self.assertEqual(self.option_name, self.option.name)
 
2218
 
 
2219
    def test_help_is_set(self):
 
2220
        option_help = self.registry.get_help(self.option_name)
 
2221
        # Come on, think about the user, he really wants to know what the
 
2222
        # option is about
 
2223
        self.assertIsNot(None, option_help)
 
2224
        self.assertNotEqual('', option_help)
 
2225
 
 
2226
 
 
2227
class TestSection(tests.TestCase):
 
2228
 
 
2229
    # FIXME: Parametrize so that all sections produced by Stores run these
 
2230
    # tests -- vila 2011-04-01
 
2231
 
 
2232
    def test_get_a_value(self):
 
2233
        a_dict = dict(foo='bar')
 
2234
        section = config.Section('myID', a_dict)
 
2235
        self.assertEqual('bar', section.get('foo'))
 
2236
 
 
2237
    def test_get_unknown_option(self):
 
2238
        a_dict = dict()
 
2239
        section = config.Section(None, a_dict)
 
2240
        self.assertEqual('out of thin air',
 
2241
                         section.get('foo', 'out of thin air'))
 
2242
 
 
2243
    def test_options_is_shared(self):
 
2244
        a_dict = dict()
 
2245
        section = config.Section(None, a_dict)
 
2246
        self.assertIs(a_dict, section.options)
 
2247
 
 
2248
 
 
2249
class TestMutableSection(tests.TestCase):
 
2250
 
 
2251
    scenarios = [('mutable',
 
2252
                  {'get_section':
 
2253
                   lambda opts: config.MutableSection('myID', opts)},),
 
2254
                 ]
 
2255
 
 
2256
    def test_set(self):
 
2257
        a_dict = dict(foo='bar')
 
2258
        section = self.get_section(a_dict)
 
2259
        section.set('foo', 'new_value')
 
2260
        self.assertEqual('new_value', section.get('foo'))
 
2261
        # The change appears in the shared section
 
2262
        self.assertEqual('new_value', a_dict.get('foo'))
 
2263
        # We keep track of the change
 
2264
        self.assertTrue('foo' in section.orig)
 
2265
        self.assertEqual('bar', section.orig.get('foo'))
 
2266
 
 
2267
    def test_set_preserve_original_once(self):
 
2268
        a_dict = dict(foo='bar')
 
2269
        section = self.get_section(a_dict)
 
2270
        section.set('foo', 'first_value')
 
2271
        section.set('foo', 'second_value')
 
2272
        # We keep track of the original value
 
2273
        self.assertTrue('foo' in section.orig)
 
2274
        self.assertEqual('bar', section.orig.get('foo'))
 
2275
 
 
2276
    def test_remove(self):
 
2277
        a_dict = dict(foo='bar')
 
2278
        section = self.get_section(a_dict)
 
2279
        section.remove('foo')
 
2280
        # We get None for unknown options via the default value
 
2281
        self.assertEqual(None, section.get('foo'))
 
2282
        # Or we just get the default value
 
2283
        self.assertEqual('unknown', section.get('foo', 'unknown'))
 
2284
        self.assertFalse('foo' in section.options)
 
2285
        # We keep track of the deletion
 
2286
        self.assertTrue('foo' in section.orig)
 
2287
        self.assertEqual('bar', section.orig.get('foo'))
 
2288
 
 
2289
    def test_remove_new_option(self):
 
2290
        a_dict = dict()
 
2291
        section = self.get_section(a_dict)
 
2292
        section.set('foo', 'bar')
 
2293
        section.remove('foo')
 
2294
        self.assertFalse('foo' in section.options)
 
2295
        # The option didn't exist initially so it we need to keep track of it
 
2296
        # with a special value
 
2297
        self.assertTrue('foo' in section.orig)
 
2298
        self.assertEqual(config._NewlyCreatedOption, section.orig['foo'])
 
2299
 
 
2300
 
 
2301
class TestCommandLineStore(tests.TestCase):
 
2302
 
 
2303
    def setUp(self):
 
2304
        super(TestCommandLineStore, self).setUp()
 
2305
        self.store = config.CommandLineStore()
 
2306
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
 
2307
 
 
2308
    def get_section(self):
 
2309
        """Get the unique section for the command line overrides."""
 
2310
        sections = list(self.store.get_sections())
 
2311
        self.assertLength(1, sections)
 
2312
        store, section = sections[0]
 
2313
        self.assertEqual(self.store, store)
 
2314
        return section
 
2315
 
 
2316
    def test_no_override(self):
 
2317
        self.store._from_cmdline([])
 
2318
        section = self.get_section()
 
2319
        self.assertLength(0, list(section.iter_option_names()))
 
2320
 
 
2321
    def test_simple_override(self):
 
2322
        self.store._from_cmdline(['a=b'])
 
2323
        section = self.get_section()
 
2324
        self.assertEqual('b', section.get('a'))
 
2325
 
 
2326
    def test_list_override(self):
 
2327
        opt = config.ListOption('l')
 
2328
        config.option_registry.register(opt)
 
2329
        self.store._from_cmdline(['l=1,2,3'])
 
2330
        val = self.get_section().get('l')
 
2331
        self.assertEqual('1,2,3', val)
 
2332
        # Reminder: lists should be registered as such explicitely, otherwise
 
2333
        # the conversion needs to be done afterwards.
 
2334
        self.assertEqual(['1', '2', '3'],
 
2335
                         opt.convert_from_unicode(self.store, val))
 
2336
 
 
2337
    def test_multiple_overrides(self):
 
2338
        self.store._from_cmdline(['a=b', 'x=y'])
 
2339
        section = self.get_section()
 
2340
        self.assertEqual('b', section.get('a'))
 
2341
        self.assertEqual('y', section.get('x'))
 
2342
 
 
2343
    def test_wrong_syntax(self):
 
2344
        self.assertRaises(errors.BzrCommandError,
 
2345
                          self.store._from_cmdline, ['a=b', 'c'])
 
2346
 
 
2347
 
 
2348
class TestStoreMinimalAPI(tests.TestCaseWithTransport):
 
2349
 
 
2350
    scenarios = [(key, {'get_store': builder}) for key, builder
 
2351
                 in config.test_store_builder_registry.iteritems()] + [
 
2352
        ('cmdline', {'get_store': lambda test: config.CommandLineStore()})]
 
2353
 
 
2354
    def test_id(self):
 
2355
        store = self.get_store(self)
 
2356
        if isinstance(store, config.TransportIniFileStore):
 
2357
            raise tests.TestNotApplicable(
 
2358
                "%s is not a concrete Store implementation"
 
2359
                " so it doesn't need an id" % (store.__class__.__name__,))
 
2360
        self.assertIsNot(None, store.id)
 
2361
 
 
2362
 
 
2363
class TestStore(tests.TestCaseWithTransport):
 
2364
 
 
2365
    def assertSectionContent(self, expected, store_and_section):
 
2366
        """Assert that some options have the proper values in a section."""
 
2367
        _, section = store_and_section
 
2368
        expected_name, expected_options = expected
 
2369
        self.assertEqual(expected_name, section.id)
 
2370
        self.assertEqual(
 
2371
            expected_options,
 
2372
            dict([(k, section.get(k)) for k in expected_options.keys()]))
 
2373
 
 
2374
 
 
2375
class TestReadonlyStore(TestStore):
 
2376
 
 
2377
    scenarios = [(key, {'get_store': builder}) for key, builder
 
2378
                 in config.test_store_builder_registry.iteritems()]
 
2379
 
 
2380
    def test_building_delays_load(self):
 
2381
        store = self.get_store(self)
 
2382
        self.assertEqual(False, store.is_loaded())
 
2383
        store._load_from_string(b'')
 
2384
        self.assertEqual(True, store.is_loaded())
 
2385
 
 
2386
    def test_get_no_sections_for_empty(self):
 
2387
        store = self.get_store(self)
 
2388
        store._load_from_string(b'')
 
2389
        self.assertEqual([], list(store.get_sections()))
 
2390
 
 
2391
    def test_get_default_section(self):
 
2392
        store = self.get_store(self)
 
2393
        store._load_from_string(b'foo=bar')
 
2394
        sections = list(store.get_sections())
 
2395
        self.assertLength(1, sections)
 
2396
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
 
2397
 
 
2398
    def test_get_named_section(self):
 
2399
        store = self.get_store(self)
 
2400
        store._load_from_string(b'[baz]\nfoo=bar')
 
2401
        sections = list(store.get_sections())
 
2402
        self.assertLength(1, sections)
 
2403
        self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
 
2404
 
 
2405
    def test_load_from_string_fails_for_non_empty_store(self):
 
2406
        store = self.get_store(self)
 
2407
        store._load_from_string(b'foo=bar')
 
2408
        self.assertRaises(AssertionError, store._load_from_string, b'bar=baz')
 
2409
 
 
2410
 
 
2411
class TestStoreQuoting(TestStore):
 
2412
 
 
2413
    scenarios = [(key, {'get_store': builder}) for key, builder
 
2414
                 in config.test_store_builder_registry.iteritems()]
 
2415
 
 
2416
    def setUp(self):
 
2417
        super(TestStoreQuoting, self).setUp()
 
2418
        self.store = self.get_store(self)
 
2419
        # We need a loaded store but any content will do
 
2420
        self.store._load_from_string(b'')
 
2421
 
 
2422
    def assertIdempotent(self, s):
 
2423
        """Assert that quoting an unquoted string is a no-op and vice-versa.
 
2424
 
 
2425
        What matters here is that option values, as they appear in a store, can
 
2426
        be safely round-tripped out of the store and back.
 
2427
 
 
2428
        :param s: A string, quoted if required.
 
2429
        """
 
2430
        self.assertEqual(s, self.store.quote(self.store.unquote(s)))
 
2431
        self.assertEqual(s, self.store.unquote(self.store.quote(s)))
 
2432
 
 
2433
    def test_empty_string(self):
 
2434
        if isinstance(self.store, config.IniFileStore):
 
2435
            # configobj._quote doesn't handle empty values
 
2436
            self.assertRaises(AssertionError,
 
2437
                              self.assertIdempotent, '')
 
2438
        else:
 
2439
            self.assertIdempotent('')
 
2440
        # But quoted empty strings are ok
 
2441
        self.assertIdempotent('""')
 
2442
 
 
2443
    def test_embedded_spaces(self):
 
2444
        self.assertIdempotent('" a b c "')
 
2445
 
 
2446
    def test_embedded_commas(self):
 
2447
        self.assertIdempotent('" a , b c "')
 
2448
 
 
2449
    def test_simple_comma(self):
 
2450
        if isinstance(self.store, config.IniFileStore):
 
2451
            # configobj requires that lists are special-cased
 
2452
            self.assertRaises(AssertionError,
 
2453
                              self.assertIdempotent, ',')
 
2454
        else:
 
2455
            self.assertIdempotent(',')
 
2456
        # When a single comma is required, quoting is also required
 
2457
        self.assertIdempotent('","')
 
2458
 
 
2459
    def test_list(self):
 
2460
        if isinstance(self.store, config.IniFileStore):
 
2461
            # configobj requires that lists are special-cased
 
2462
            self.assertRaises(AssertionError,
 
2463
                              self.assertIdempotent, 'a,b')
 
2464
        else:
 
2465
            self.assertIdempotent('a,b')
 
2466
 
 
2467
 
 
2468
class TestDictFromStore(tests.TestCase):
 
2469
 
 
2470
    def test_unquote_not_string(self):
 
2471
        conf = config.MemoryStack(b'x=2\n[a_section]\na=1\n')
 
2472
        value = conf.get('a_section')
 
2473
        # Urgh, despite 'conf' asking for the no-name section, we get the
 
2474
        # content of another section as a dict o_O
 
2475
        self.assertEqual({'a': '1'}, value)
 
2476
        unquoted = conf.store.unquote(value)
 
2477
        # Which cannot be unquoted but shouldn't crash either (the use cases
 
2478
        # are getting the value or displaying it. In the later case, '%s' will
 
2479
        # do).
 
2480
        self.assertEqual({'a': '1'}, unquoted)
 
2481
        self.assertIn('%s' % (unquoted,), ("{u'a': u'1'}", "{'a': '1'}"))
 
2482
 
 
2483
 
 
2484
class TestIniFileStoreContent(tests.TestCaseWithTransport):
 
2485
    """Simulate loading a config store with content of various encodings.
 
2486
 
 
2487
    All files produced by bzr are in utf8 content.
 
2488
 
 
2489
    Users may modify them manually and end up with a file that can't be
 
2490
    loaded. We need to issue proper error messages in this case.
 
2491
    """
 
2492
 
 
2493
    invalid_utf8_char = b'\xff'
 
2494
 
 
2495
    def test_load_utf8(self):
 
2496
        """Ensure we can load an utf8-encoded file."""
 
2497
        t = self.get_transport()
 
2498
        # From http://pad.lv/799212
 
2499
        unicode_user = u'b\N{Euro Sign}ar'
 
2500
        unicode_content = u'user=%s' % (unicode_user,)
 
2501
        utf8_content = unicode_content.encode('utf8')
 
2502
        # Store the raw content in the config file
 
2503
        t.put_bytes('foo.conf', utf8_content)
 
2504
        store = config.TransportIniFileStore(t, 'foo.conf')
 
2505
        store.load()
 
2506
        stack = config.Stack([store.get_sections], store)
 
2507
        self.assertEqual(unicode_user, stack.get('user'))
 
2508
 
 
2509
    def test_load_non_ascii(self):
 
2510
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
 
2511
        t = self.get_transport()
 
2512
        t.put_bytes('foo.conf', b'user=foo\n#%s\n' % (self.invalid_utf8_char,))
 
2513
        store = config.TransportIniFileStore(t, 'foo.conf')
 
2514
        self.assertRaises(config.ConfigContentError, store.load)
 
2515
 
 
2516
    def test_load_erroneous_content(self):
 
2517
        """Ensure we display a proper error on content that can't be parsed."""
 
2518
        t = self.get_transport()
 
2519
        t.put_bytes('foo.conf', b'[open_section\n')
 
2520
        store = config.TransportIniFileStore(t, 'foo.conf')
 
2521
        self.assertRaises(config.ParseConfigError, store.load)
 
2522
 
 
2523
    def test_load_permission_denied(self):
 
2524
        """Ensure we get warned when trying to load an inaccessible file."""
 
2525
        warnings = []
 
2526
 
 
2527
        def warning(*args):
 
2528
            warnings.append(args[0] % args[1:])
 
2529
        self.overrideAttr(trace, 'warning', warning)
 
2530
 
 
2531
        t = self.get_transport()
 
2532
 
 
2533
        def get_bytes(relpath):
 
2534
            raise errors.PermissionDenied(relpath, "")
 
2535
        t.get_bytes = get_bytes
 
2536
        store = config.TransportIniFileStore(t, 'foo.conf')
 
2537
        self.assertRaises(errors.PermissionDenied, store.load)
 
2538
        self.assertEqual(
 
2539
            warnings,
 
2540
            [u'Permission denied while trying to load configuration store %s.'
 
2541
             % store.external_url()])
 
2542
 
 
2543
 
 
2544
class TestIniConfigContent(tests.TestCaseWithTransport):
 
2545
    """Simulate loading a IniBasedConfig with content of various encodings.
 
2546
 
 
2547
    All files produced by bzr are in utf8 content.
 
2548
 
 
2549
    Users may modify them manually and end up with a file that can't be
 
2550
    loaded. We need to issue proper error messages in this case.
 
2551
    """
 
2552
 
 
2553
    invalid_utf8_char = b'\xff'
 
2554
 
 
2555
    def test_load_utf8(self):
 
2556
        """Ensure we can load an utf8-encoded file."""
 
2557
        # From http://pad.lv/799212
 
2558
        unicode_user = u'b\N{Euro Sign}ar'
 
2559
        unicode_content = u'user=%s' % (unicode_user,)
 
2560
        utf8_content = unicode_content.encode('utf8')
 
2561
        # Store the raw content in the config file
 
2562
        with open('foo.conf', 'wb') as f:
 
2563
            f.write(utf8_content)
 
2564
        conf = config.IniBasedConfig(file_name='foo.conf')
 
2565
        self.assertEqual(unicode_user, conf.get_user_option('user'))
 
2566
 
 
2567
    def test_load_badly_encoded_content(self):
 
2568
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
 
2569
        with open('foo.conf', 'wb') as f:
 
2570
            f.write(b'user=foo\n#%s\n' % (self.invalid_utf8_char,))
 
2571
        conf = config.IniBasedConfig(file_name='foo.conf')
 
2572
        self.assertRaises(config.ConfigContentError, conf._get_parser)
 
2573
 
 
2574
    def test_load_erroneous_content(self):
 
2575
        """Ensure we display a proper error on content that can't be parsed."""
 
2576
        with open('foo.conf', 'wb') as f:
 
2577
            f.write(b'[open_section\n')
 
2578
        conf = config.IniBasedConfig(file_name='foo.conf')
 
2579
        self.assertRaises(config.ParseConfigError, conf._get_parser)
 
2580
 
 
2581
 
 
2582
class TestMutableStore(TestStore):
 
2583
 
 
2584
    scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
 
2585
                 in config.test_store_builder_registry.iteritems()]
 
2586
 
 
2587
    def setUp(self):
 
2588
        super(TestMutableStore, self).setUp()
 
2589
        self.transport = self.get_transport()
 
2590
 
 
2591
    def has_store(self, store):
 
2592
        store_basename = urlutils.relative_url(self.transport.external_url(),
 
2593
                                               store.external_url())
 
2594
        return self.transport.has(store_basename)
 
2595
 
 
2596
    def test_save_empty_creates_no_file(self):
 
2597
        # FIXME: There should be a better way than relying on the test
 
2598
        # parametrization to identify branch.conf -- vila 2011-0526
 
2599
        if self.store_id in ('branch', 'remote_branch'):
 
2600
            raise tests.TestNotApplicable(
 
2601
                'branch.conf is *always* created when a branch is initialized')
 
2602
        store = self.get_store(self)
 
2603
        store.save()
 
2604
        self.assertEqual(False, self.has_store(store))
 
2605
 
 
2606
    def test_mutable_section_shared(self):
 
2607
        store = self.get_store(self)
 
2608
        store._load_from_string(b'foo=bar\n')
 
2609
        # FIXME: There should be a better way than relying on the test
 
2610
        # parametrization to identify branch.conf -- vila 2011-0526
 
2611
        if self.store_id in ('branch', 'remote_branch'):
 
2612
            # branch stores requires write locked branches
 
2613
            self.addCleanup(store.branch.lock_write().unlock)
 
2614
        section1 = store.get_mutable_section(None)
 
2615
        section2 = store.get_mutable_section(None)
 
2616
        # If we get different sections, different callers won't share the
 
2617
        # modification
 
2618
        self.assertIs(section1, section2)
 
2619
 
 
2620
    def test_save_emptied_succeeds(self):
 
2621
        store = self.get_store(self)
 
2622
        store._load_from_string(b'foo=bar\n')
 
2623
        # FIXME: There should be a better way than relying on the test
 
2624
        # parametrization to identify branch.conf -- vila 2011-0526
 
2625
        if self.store_id in ('branch', 'remote_branch'):
 
2626
            # branch stores requires write locked branches
 
2627
            self.addCleanup(store.branch.lock_write().unlock)
 
2628
        section = store.get_mutable_section(None)
 
2629
        section.remove('foo')
 
2630
        store.save()
 
2631
        self.assertEqual(True, self.has_store(store))
 
2632
        modified_store = self.get_store(self)
 
2633
        sections = list(modified_store.get_sections())
 
2634
        self.assertLength(0, sections)
 
2635
 
 
2636
    def test_save_with_content_succeeds(self):
 
2637
        # FIXME: There should be a better way than relying on the test
 
2638
        # parametrization to identify branch.conf -- vila 2011-0526
 
2639
        if self.store_id in ('branch', 'remote_branch'):
 
2640
            raise tests.TestNotApplicable(
 
2641
                'branch.conf is *always* created when a branch is initialized')
 
2642
        store = self.get_store(self)
 
2643
        store._load_from_string(b'foo=bar\n')
 
2644
        self.assertEqual(False, self.has_store(store))
 
2645
        store.save()
 
2646
        self.assertEqual(True, self.has_store(store))
 
2647
        modified_store = self.get_store(self)
 
2648
        sections = list(modified_store.get_sections())
 
2649
        self.assertLength(1, sections)
 
2650
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
 
2651
 
 
2652
    def test_set_option_in_empty_store(self):
 
2653
        store = self.get_store(self)
 
2654
        # FIXME: There should be a better way than relying on the test
 
2655
        # parametrization to identify branch.conf -- vila 2011-0526
 
2656
        if self.store_id in ('branch', 'remote_branch'):
 
2657
            # branch stores requires write locked branches
 
2658
            self.addCleanup(store.branch.lock_write().unlock)
 
2659
        section = store.get_mutable_section(None)
 
2660
        section.set('foo', 'bar')
 
2661
        store.save()
 
2662
        modified_store = self.get_store(self)
 
2663
        sections = list(modified_store.get_sections())
 
2664
        self.assertLength(1, sections)
 
2665
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
 
2666
 
 
2667
    def test_set_option_in_default_section(self):
 
2668
        store = self.get_store(self)
 
2669
        store._load_from_string(b'')
 
2670
        # FIXME: There should be a better way than relying on the test
 
2671
        # parametrization to identify branch.conf -- vila 2011-0526
 
2672
        if self.store_id in ('branch', 'remote_branch'):
 
2673
            # branch stores requires write locked branches
 
2674
            self.addCleanup(store.branch.lock_write().unlock)
 
2675
        section = store.get_mutable_section(None)
 
2676
        section.set('foo', 'bar')
 
2677
        store.save()
 
2678
        modified_store = self.get_store(self)
 
2679
        sections = list(modified_store.get_sections())
 
2680
        self.assertLength(1, sections)
 
2681
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
 
2682
 
 
2683
    def test_set_option_in_named_section(self):
 
2684
        store = self.get_store(self)
 
2685
        store._load_from_string(b'')
 
2686
        # FIXME: There should be a better way than relying on the test
 
2687
        # parametrization to identify branch.conf -- vila 2011-0526
 
2688
        if self.store_id in ('branch', 'remote_branch'):
 
2689
            # branch stores requires write locked branches
 
2690
            self.addCleanup(store.branch.lock_write().unlock)
 
2691
        section = store.get_mutable_section('baz')
 
2692
        section.set('foo', 'bar')
 
2693
        store.save()
 
2694
        modified_store = self.get_store(self)
 
2695
        sections = list(modified_store.get_sections())
 
2696
        self.assertLength(1, sections)
 
2697
        self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
 
2698
 
 
2699
    def test_load_hook(self):
 
2700
        # First, we need to ensure that the store exists
 
2701
        store = self.get_store(self)
 
2702
        # FIXME: There should be a better way than relying on the test
 
2703
        # parametrization to identify branch.conf -- vila 2011-0526
 
2704
        if self.store_id in ('branch', 'remote_branch'):
 
2705
            # branch stores requires write locked branches
 
2706
            self.addCleanup(store.branch.lock_write().unlock)
 
2707
        section = store.get_mutable_section('baz')
 
2708
        section.set('foo', 'bar')
 
2709
        store.save()
 
2710
        # Now we can try to load it
 
2711
        store = self.get_store(self)
 
2712
        calls = []
 
2713
 
 
2714
        def hook(*args):
 
2715
            calls.append(args)
 
2716
        config.ConfigHooks.install_named_hook('load', hook, None)
 
2717
        self.assertLength(0, calls)
 
2718
        store.load()
 
2719
        self.assertLength(1, calls)
 
2720
        self.assertEqual((store,), calls[0])
 
2721
 
 
2722
    def test_save_hook(self):
 
2723
        calls = []
 
2724
 
 
2725
        def hook(*args):
 
2726
            calls.append(args)
 
2727
        config.ConfigHooks.install_named_hook('save', hook, None)
 
2728
        self.assertLength(0, calls)
 
2729
        store = self.get_store(self)
 
2730
        # FIXME: There should be a better way than relying on the test
 
2731
        # parametrization to identify branch.conf -- vila 2011-0526
 
2732
        if self.store_id in ('branch', 'remote_branch'):
 
2733
            # branch stores requires write locked branches
 
2734
            self.addCleanup(store.branch.lock_write().unlock)
 
2735
        section = store.get_mutable_section('baz')
 
2736
        section.set('foo', 'bar')
 
2737
        store.save()
 
2738
        self.assertLength(1, calls)
 
2739
        self.assertEqual((store,), calls[0])
 
2740
 
 
2741
    def test_set_mark_dirty(self):
 
2742
        stack = config.MemoryStack(b'')
 
2743
        self.assertLength(0, stack.store.dirty_sections)
 
2744
        stack.set('foo', 'baz')
 
2745
        self.assertLength(1, stack.store.dirty_sections)
 
2746
        self.assertTrue(stack.store._need_saving())
 
2747
 
 
2748
    def test_remove_mark_dirty(self):
 
2749
        stack = config.MemoryStack(b'foo=bar')
 
2750
        self.assertLength(0, stack.store.dirty_sections)
 
2751
        stack.remove('foo')
 
2752
        self.assertLength(1, stack.store.dirty_sections)
 
2753
        self.assertTrue(stack.store._need_saving())
 
2754
 
 
2755
 
 
2756
class TestStoreSaveChanges(tests.TestCaseWithTransport):
 
2757
    """Tests that config changes are kept in memory and saved on-demand."""
 
2758
 
 
2759
    def setUp(self):
 
2760
        super(TestStoreSaveChanges, self).setUp()
 
2761
        self.transport = self.get_transport()
 
2762
        # Most of the tests involve two stores pointing to the same persistent
 
2763
        # storage to observe the effects of concurrent changes
 
2764
        self.st1 = config.TransportIniFileStore(self.transport, 'foo.conf')
 
2765
        self.st2 = config.TransportIniFileStore(self.transport, 'foo.conf')
 
2766
        self.warnings = []
 
2767
 
 
2768
        def warning(*args):
 
2769
            self.warnings.append(args[0] % args[1:])
 
2770
        self.overrideAttr(trace, 'warning', warning)
 
2771
 
 
2772
    def has_store(self, store):
 
2773
        store_basename = urlutils.relative_url(self.transport.external_url(),
 
2774
                                               store.external_url())
 
2775
        return self.transport.has(store_basename)
 
2776
 
 
2777
    def get_stack(self, store):
 
2778
        # Any stack will do as long as it uses the right store, just a single
 
2779
        # no-name section is enough
 
2780
        return config.Stack([store.get_sections], store)
 
2781
 
 
2782
    def test_no_changes_no_save(self):
 
2783
        s = self.get_stack(self.st1)
 
2784
        s.store.save_changes()
 
2785
        self.assertEqual(False, self.has_store(self.st1))
 
2786
 
 
2787
    def test_unrelated_concurrent_update(self):
 
2788
        s1 = self.get_stack(self.st1)
 
2789
        s2 = self.get_stack(self.st2)
 
2790
        s1.set('foo', 'bar')
 
2791
        s2.set('baz', 'quux')
 
2792
        s1.store.save()
 
2793
        # Changes don't propagate magically
 
2794
        self.assertEqual(None, s1.get('baz'))
 
2795
        s2.store.save_changes()
 
2796
        self.assertEqual('quux', s2.get('baz'))
 
2797
        # Changes are acquired when saving
 
2798
        self.assertEqual('bar', s2.get('foo'))
 
2799
        # Since there is no overlap, no warnings are emitted
 
2800
        self.assertLength(0, self.warnings)
 
2801
 
 
2802
    def test_concurrent_update_modified(self):
 
2803
        s1 = self.get_stack(self.st1)
 
2804
        s2 = self.get_stack(self.st2)
 
2805
        s1.set('foo', 'bar')
 
2806
        s2.set('foo', 'baz')
 
2807
        s1.store.save()
 
2808
        # Last speaker wins
 
2809
        s2.store.save_changes()
 
2810
        self.assertEqual('baz', s2.get('foo'))
 
2811
        # But the user get a warning
 
2812
        self.assertLength(1, self.warnings)
 
2813
        warning = self.warnings[0]
 
2814
        self.assertStartsWith(warning, 'Option foo in section None')
 
2815
        self.assertEndsWith(warning, 'was changed from <CREATED> to bar.'
 
2816
                            ' The baz value will be saved.')
 
2817
 
 
2818
    def test_concurrent_deletion(self):
 
2819
        self.st1._load_from_string(b'foo=bar')
 
2820
        self.st1.save()
 
2821
        s1 = self.get_stack(self.st1)
 
2822
        s2 = self.get_stack(self.st2)
 
2823
        s1.remove('foo')
 
2824
        s2.remove('foo')
 
2825
        s1.store.save_changes()
 
2826
        # No warning yet
 
2827
        self.assertLength(0, self.warnings)
 
2828
        s2.store.save_changes()
 
2829
        # Now we get one
 
2830
        self.assertLength(1, self.warnings)
 
2831
        warning = self.warnings[0]
 
2832
        self.assertStartsWith(warning, 'Option foo in section None')
 
2833
        self.assertEndsWith(warning, 'was changed from bar to <CREATED>.'
 
2834
                            ' The <DELETED> value will be saved.')
 
2835
 
 
2836
 
 
2837
class TestQuotingIniFileStore(tests.TestCaseWithTransport):
 
2838
 
 
2839
    def get_store(self):
 
2840
        return config.TransportIniFileStore(self.get_transport(), 'foo.conf')
 
2841
 
 
2842
    def test_get_quoted_string(self):
 
2843
        store = self.get_store()
 
2844
        store._load_from_string(b'foo= " abc "')
 
2845
        stack = config.Stack([store.get_sections])
 
2846
        self.assertEqual(' abc ', stack.get('foo'))
 
2847
 
 
2848
    def test_set_quoted_string(self):
 
2849
        store = self.get_store()
 
2850
        stack = config.Stack([store.get_sections], store)
 
2851
        stack.set('foo', ' a b c ')
 
2852
        store.save()
 
2853
        self.assertFileEqual(b'foo = " a b c "' +
 
2854
                             os.linesep.encode('ascii'), 'foo.conf')
 
2855
 
 
2856
 
 
2857
class TestTransportIniFileStore(TestStore):
 
2858
 
 
2859
    def test_loading_unknown_file_fails(self):
 
2860
        store = config.TransportIniFileStore(self.get_transport(),
 
2861
                                             'I-do-not-exist')
 
2862
        self.assertRaises(errors.NoSuchFile, store.load)
 
2863
 
 
2864
    def test_invalid_content(self):
 
2865
        store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
 
2866
        self.assertEqual(False, store.is_loaded())
 
2867
        exc = self.assertRaises(
 
2868
            config.ParseConfigError, store._load_from_string,
 
2869
            b'this is invalid !')
 
2870
        self.assertEndsWith(exc.filename, 'foo.conf')
 
2871
        # And the load failed
 
2872
        self.assertEqual(False, store.is_loaded())
 
2873
 
 
2874
    def test_get_embedded_sections(self):
 
2875
        # A more complicated example (which also shows that section names and
 
2876
        # option names share the same name space...)
 
2877
        # FIXME: This should be fixed by forbidding dicts as values ?
 
2878
        # -- vila 2011-04-05
 
2879
        store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
 
2880
        store._load_from_string(b'''
 
2881
foo=bar
 
2882
l=1,2
 
2883
[DEFAULT]
 
2884
foo_in_DEFAULT=foo_DEFAULT
 
2885
[bar]
 
2886
foo_in_bar=barbar
 
2887
[baz]
 
2888
foo_in_baz=barbaz
 
2889
[[qux]]
 
2890
foo_in_qux=quux
 
2891
''')
 
2892
        sections = list(store.get_sections())
 
2893
        self.assertLength(4, sections)
 
2894
        # The default section has no name.
 
2895
        # List values are provided as strings and need to be explicitly
 
2896
        # converted by specifying from_unicode=list_from_store at option
 
2897
        # registration
 
2898
        self.assertSectionContent((None, {'foo': 'bar', 'l': u'1,2'}),
 
2899
                                  sections[0])
 
2900
        self.assertSectionContent(
 
2901
            ('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
 
2902
        self.assertSectionContent(
 
2903
            ('bar', {'foo_in_bar': 'barbar'}), sections[2])
 
2904
        # sub sections are provided as embedded dicts.
 
2905
        self.assertSectionContent(
 
2906
            ('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
 
2907
            sections[3])
 
2908
 
 
2909
 
 
2910
class TestLockableIniFileStore(TestStore):
 
2911
 
 
2912
    def test_create_store_in_created_dir(self):
 
2913
        self.assertPathDoesNotExist('dir')
 
2914
        t = self.get_transport('dir/subdir')
 
2915
        store = config.LockableIniFileStore(t, 'foo.conf')
 
2916
        store.get_mutable_section(None).set('foo', 'bar')
 
2917
        store.save()
 
2918
        self.assertPathExists('dir/subdir')
 
2919
 
 
2920
 
 
2921
class TestConcurrentStoreUpdates(TestStore):
 
2922
    """Test that Stores properly handle conccurent updates.
 
2923
 
 
2924
    New Store implementation may fail some of these tests but until such
 
2925
    implementations exist it's hard to properly filter them from the scenarios
 
2926
    applied here. If you encounter such a case, contact the bzr devs.
 
2927
    """
 
2928
 
 
2929
    scenarios = [(key, {'get_stack': builder}) for key, builder
 
2930
                 in config.test_stack_builder_registry.iteritems()]
 
2931
 
 
2932
    def setUp(self):
 
2933
        super(TestConcurrentStoreUpdates, self).setUp()
 
2934
        self.stack = self.get_stack(self)
 
2935
        if not isinstance(self.stack, config._CompatibleStack):
 
2936
            raise tests.TestNotApplicable(
 
2937
                '%s is not meant to be compatible with the old config design'
 
2938
                % (self.stack,))
 
2939
        self.stack.set('one', '1')
 
2940
        self.stack.set('two', '2')
 
2941
        # Flush the store
 
2942
        self.stack.store.save()
 
2943
 
 
2944
    def test_simple_read_access(self):
 
2945
        self.assertEqual('1', self.stack.get('one'))
 
2946
 
 
2947
    def test_simple_write_access(self):
 
2948
        self.stack.set('one', 'one')
 
2949
        self.assertEqual('one', self.stack.get('one'))
 
2950
 
 
2951
    def test_listen_to_the_last_speaker(self):
 
2952
        c1 = self.stack
 
2953
        c2 = self.get_stack(self)
 
2954
        c1.set('one', 'ONE')
 
2955
        c2.set('two', 'TWO')
 
2956
        self.assertEqual('ONE', c1.get('one'))
 
2957
        self.assertEqual('TWO', c2.get('two'))
 
2958
        # The second update respect the first one
 
2959
        self.assertEqual('ONE', c2.get('one'))
 
2960
 
 
2961
    def test_last_speaker_wins(self):
 
2962
        # If the same config is not shared, the same variable modified twice
 
2963
        # can only see a single result.
 
2964
        c1 = self.stack
 
2965
        c2 = self.get_stack(self)
 
2966
        c1.set('one', 'c1')
 
2967
        c2.set('one', 'c2')
 
2968
        self.assertEqual('c2', c2.get('one'))
 
2969
        # The first modification is still available until another refresh
 
2970
        # occur
 
2971
        self.assertEqual('c1', c1.get('one'))
 
2972
        c1.set('two', 'done')
 
2973
        self.assertEqual('c2', c1.get('one'))
 
2974
 
 
2975
    def test_writes_are_serialized(self):
 
2976
        c1 = self.stack
 
2977
        c2 = self.get_stack(self)
 
2978
 
 
2979
        # We spawn a thread that will pause *during* the config saving.
 
2980
        before_writing = threading.Event()
 
2981
        after_writing = threading.Event()
 
2982
        writing_done = threading.Event()
 
2983
        c1_save_without_locking_orig = c1.store.save_without_locking
 
2984
 
 
2985
        def c1_save_without_locking():
 
2986
            before_writing.set()
 
2987
            c1_save_without_locking_orig()
 
2988
            # The lock is held. We wait for the main thread to decide when to
 
2989
            # continue
 
2990
            after_writing.wait()
 
2991
        c1.store.save_without_locking = c1_save_without_locking
 
2992
 
 
2993
        def c1_set():
 
2994
            c1.set('one', 'c1')
 
2995
            writing_done.set()
 
2996
        t1 = threading.Thread(target=c1_set)
 
2997
        # Collect the thread after the test
 
2998
        self.addCleanup(t1.join)
 
2999
        # Be ready to unblock the thread if the test goes wrong
 
3000
        self.addCleanup(after_writing.set)
 
3001
        t1.start()
 
3002
        before_writing.wait()
 
3003
        self.assertRaises(errors.LockContention,
 
3004
                          c2.set, 'one', 'c2')
 
3005
        self.assertEqual('c1', c1.get('one'))
 
3006
        # Let the lock be released
 
3007
        after_writing.set()
 
3008
        writing_done.wait()
 
3009
        c2.set('one', 'c2')
 
3010
        self.assertEqual('c2', c2.get('one'))
 
3011
 
 
3012
    def test_read_while_writing(self):
 
3013
        c1 = self.stack
 
3014
        # We spawn a thread that will pause *during* the write
 
3015
        ready_to_write = threading.Event()
 
3016
        do_writing = threading.Event()
 
3017
        writing_done = threading.Event()
 
3018
        # We override the _save implementation so we know the store is locked
 
3019
        c1_save_without_locking_orig = c1.store.save_without_locking
 
3020
 
 
3021
        def c1_save_without_locking():
 
3022
            ready_to_write.set()
 
3023
            # The lock is held. We wait for the main thread to decide when to
 
3024
            # continue
 
3025
            do_writing.wait()
 
3026
            c1_save_without_locking_orig()
 
3027
            writing_done.set()
 
3028
        c1.store.save_without_locking = c1_save_without_locking
 
3029
 
 
3030
        def c1_set():
 
3031
            c1.set('one', 'c1')
 
3032
        t1 = threading.Thread(target=c1_set)
 
3033
        # Collect the thread after the test
 
3034
        self.addCleanup(t1.join)
 
3035
        # Be ready to unblock the thread if the test goes wrong
 
3036
        self.addCleanup(do_writing.set)
 
3037
        t1.start()
 
3038
        # Ensure the thread is ready to write
 
3039
        ready_to_write.wait()
 
3040
        self.assertEqual('c1', c1.get('one'))
 
3041
        # If we read during the write, we get the old value
 
3042
        c2 = self.get_stack(self)
 
3043
        self.assertEqual('1', c2.get('one'))
 
3044
        # Let the writing occur and ensure it occurred
 
3045
        do_writing.set()
 
3046
        writing_done.wait()
 
3047
        # Now we get the updated value
 
3048
        c3 = self.get_stack(self)
 
3049
        self.assertEqual('c1', c3.get('one'))
 
3050
 
 
3051
    # FIXME: It may be worth looking into removing the lock dir when it's not
 
3052
    # needed anymore and look at possible fallouts for concurrent lockers. This
 
3053
    # will matter if/when we use config files outside of breezy directories
 
3054
    # (.config/breezy or .bzr) -- vila 20110-04-111
 
3055
 
 
3056
 
 
3057
class TestSectionMatcher(TestStore):
 
3058
 
 
3059
    scenarios = [('location', {'matcher': config.LocationMatcher}),
 
3060
                 ('id', {'matcher': config.NameMatcher}), ]
 
3061
 
 
3062
    def setUp(self):
 
3063
        super(TestSectionMatcher, self).setUp()
 
3064
        # Any simple store is good enough
 
3065
        self.get_store = config.test_store_builder_registry.get('configobj')
 
3066
 
 
3067
    def test_no_matches_for_empty_stores(self):
 
3068
        store = self.get_store(self)
 
3069
        store._load_from_string(b'')
 
3070
        matcher = self.matcher(store, '/bar')
 
3071
        self.assertEqual([], list(matcher.get_sections()))
 
3072
 
 
3073
    def test_build_doesnt_load_store(self):
 
3074
        store = self.get_store(self)
 
3075
        self.matcher(store, '/bar')
 
3076
        self.assertFalse(store.is_loaded())
 
3077
 
 
3078
 
 
3079
class TestLocationSection(tests.TestCase):
 
3080
 
 
3081
    def get_section(self, options, extra_path):
 
3082
        section = config.Section('foo', options)
 
3083
        return config.LocationSection(section, extra_path)
 
3084
 
 
3085
    def test_simple_option(self):
 
3086
        section = self.get_section({'foo': 'bar'}, '')
 
3087
        self.assertEqual('bar', section.get('foo'))
 
3088
 
 
3089
    def test_option_with_extra_path(self):
 
3090
        section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
 
3091
                                   'baz')
 
3092
        self.assertEqual('bar/baz', section.get('foo'))
 
3093
 
 
3094
    def test_invalid_policy(self):
 
3095
        section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
 
3096
                                   'baz')
 
3097
        # invalid policies are ignored
 
3098
        self.assertEqual('bar', section.get('foo'))
 
3099
 
 
3100
 
 
3101
class TestLocationMatcher(TestStore):
 
3102
 
 
3103
    def setUp(self):
 
3104
        super(TestLocationMatcher, self).setUp()
 
3105
        # Any simple store is good enough
 
3106
        self.get_store = config.test_store_builder_registry.get('configobj')
 
3107
 
 
3108
    def test_unrelated_section_excluded(self):
 
3109
        store = self.get_store(self)
 
3110
        store._load_from_string(b'''
 
3111
[/foo]
 
3112
section=/foo
 
3113
[/foo/baz]
 
3114
section=/foo/baz
 
3115
[/foo/bar]
 
3116
section=/foo/bar
 
3117
[/foo/bar/baz]
 
3118
section=/foo/bar/baz
 
3119
[/quux/quux]
 
3120
section=/quux/quux
 
3121
''')
 
3122
        self.assertEqual(['/foo', '/foo/baz', '/foo/bar', '/foo/bar/baz',
 
3123
                          '/quux/quux'],
 
3124
                         [section.id for _, section in store.get_sections()])
 
3125
        matcher = config.LocationMatcher(store, '/foo/bar/quux')
 
3126
        sections = [section for _, section in matcher.get_sections()]
 
3127
        self.assertEqual(['/foo/bar', '/foo'],
 
3128
                         [section.id for section in sections])
 
3129
        self.assertEqual(['quux', 'bar/quux'],
 
3130
                         [section.extra_path for section in sections])
 
3131
 
 
3132
    def test_more_specific_sections_first(self):
 
3133
        store = self.get_store(self)
 
3134
        store._load_from_string(b'''
 
3135
[/foo]
 
3136
section=/foo
 
3137
[/foo/bar]
 
3138
section=/foo/bar
 
3139
''')
 
3140
        self.assertEqual(['/foo', '/foo/bar'],
 
3141
                         [section.id for _, section in store.get_sections()])
 
3142
        matcher = config.LocationMatcher(store, '/foo/bar/baz')
 
3143
        sections = [section for _, section in matcher.get_sections()]
 
3144
        self.assertEqual(['/foo/bar', '/foo'],
 
3145
                         [section.id for section in sections])
 
3146
        self.assertEqual(['baz', 'bar/baz'],
 
3147
                         [section.extra_path for section in sections])
 
3148
 
 
3149
    def test_appendpath_in_no_name_section(self):
 
3150
        # It's a bit weird to allow appendpath in a no-name section, but
 
3151
        # someone may found a use for it
 
3152
        store = self.get_store(self)
 
3153
        store._load_from_string(b'''
 
3154
foo=bar
 
3155
foo:policy = appendpath
 
3156
''')
 
3157
        matcher = config.LocationMatcher(store, 'dir/subdir')
 
3158
        sections = list(matcher.get_sections())
 
3159
        self.assertLength(1, sections)
 
3160
        self.assertEqual('bar/dir/subdir', sections[0][1].get('foo'))
 
3161
 
 
3162
    def test_file_urls_are_normalized(self):
 
3163
        store = self.get_store(self)
 
3164
        if sys.platform == 'win32':
 
3165
            expected_url = 'file:///C:/dir/subdir'
 
3166
            expected_location = 'C:/dir/subdir'
 
3167
        else:
 
3168
            expected_url = 'file:///dir/subdir'
 
3169
            expected_location = '/dir/subdir'
 
3170
        matcher = config.LocationMatcher(store, expected_url)
 
3171
        self.assertEqual(expected_location, matcher.location)
 
3172
 
 
3173
    def test_branch_name_colo(self):
 
3174
        store = self.get_store(self)
 
3175
        store._load_from_string(dedent("""\
 
3176
            [/]
 
3177
            push_location=my{branchname}
 
3178
        """).encode('ascii'))
 
3179
        matcher = config.LocationMatcher(store, 'file:///,branch=example%3c')
 
3180
        self.assertEqual('example<', matcher.branch_name)
 
3181
        ((_, section),) = matcher.get_sections()
 
3182
        self.assertEqual('example<', section.locals['branchname'])
 
3183
 
 
3184
    def test_branch_name_basename(self):
 
3185
        store = self.get_store(self)
 
3186
        store._load_from_string(dedent("""\
 
3187
            [/]
 
3188
            push_location=my{branchname}
 
3189
        """).encode('ascii'))
 
3190
        matcher = config.LocationMatcher(store, 'file:///parent/example%3c')
 
3191
        self.assertEqual('example<', matcher.branch_name)
 
3192
        ((_, section),) = matcher.get_sections()
 
3193
        self.assertEqual('example<', section.locals['branchname'])
 
3194
 
 
3195
 
 
3196
class TestStartingPathMatcher(TestStore):
 
3197
 
 
3198
    def setUp(self):
 
3199
        super(TestStartingPathMatcher, self).setUp()
 
3200
        # Any simple store is good enough
 
3201
        self.store = config.IniFileStore()
 
3202
 
 
3203
    def assertSectionIDs(self, expected, location, content):
 
3204
        self.store._load_from_string(content)
 
3205
        matcher = config.StartingPathMatcher(self.store, location)
 
3206
        sections = list(matcher.get_sections())
 
3207
        self.assertLength(len(expected), sections)
 
3208
        self.assertEqual(expected, [section.id for _, section in sections])
 
3209
        return sections
 
3210
 
 
3211
    def test_empty(self):
 
3212
        self.assertSectionIDs([], self.get_url(), b'')
 
3213
 
 
3214
    def test_url_vs_local_paths(self):
 
3215
        # The matcher location is an url and the section names are local paths
 
3216
        self.assertSectionIDs(['/foo/bar', '/foo'],
 
3217
                              'file:///foo/bar/baz', b'''\
 
3218
[/foo]
 
3219
[/foo/bar]
 
3220
''')
 
3221
 
 
3222
    def test_local_path_vs_url(self):
 
3223
        # The matcher location is a local path and the section names are urls
 
3224
        self.assertSectionIDs(['file:///foo/bar', 'file:///foo'],
 
3225
                              '/foo/bar/baz', b'''\
 
3226
[file:///foo]
 
3227
[file:///foo/bar]
 
3228
''')
 
3229
 
 
3230
    def test_no_name_section_included_when_present(self):
 
3231
        # Note that other tests will cover the case where the no-name section
 
3232
        # is empty and as such, not included.
 
3233
        sections = self.assertSectionIDs(['/foo/bar', '/foo', None],
 
3234
                                         '/foo/bar/baz', b'''\
 
3235
option = defined so the no-name section exists
 
3236
[/foo]
 
3237
[/foo/bar]
 
3238
''')
 
3239
        self.assertEqual(['baz', 'bar/baz', '/foo/bar/baz'],
 
3240
                         [s.locals['relpath'] for _, s in sections])
 
3241
 
 
3242
    def test_order_reversed(self):
 
3243
        self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
 
3244
[/foo]
 
3245
[/foo/bar]
 
3246
''')
 
3247
 
 
3248
    def test_unrelated_section_excluded(self):
 
3249
        self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', b'''\
 
3250
[/foo]
 
3251
[/foo/qux]
 
3252
[/foo/bar]
 
3253
''')
 
3254
 
 
3255
    def test_glob_included(self):
 
3256
        sections = self.assertSectionIDs(['/foo/*/baz', '/foo/b*', '/foo'],
 
3257
                                         '/foo/bar/baz', b'''\
 
3258
[/foo]
 
3259
[/foo/qux]
 
3260
[/foo/b*]
 
3261
[/foo/*/baz]
 
3262
''')
 
3263
        # Note that 'baz' as a relpath for /foo/b* is not fully correct, but
 
3264
        # nothing really is... as far using {relpath} to append it to something
 
3265
        # else, this seems good enough though.
 
3266
        self.assertEqual(['', 'baz', 'bar/baz'],
 
3267
                         [s.locals['relpath'] for _, s in sections])
 
3268
 
 
3269
    def test_respect_order(self):
 
3270
        self.assertSectionIDs(['/foo', '/foo/b*', '/foo/*/baz'],
 
3271
                              '/foo/bar/baz', b'''\
 
3272
[/foo/*/baz]
 
3273
[/foo/qux]
 
3274
[/foo/b*]
 
3275
[/foo]
 
3276
''')
 
3277
 
 
3278
 
 
3279
class TestNameMatcher(TestStore):
 
3280
 
 
3281
    def setUp(self):
 
3282
        super(TestNameMatcher, self).setUp()
 
3283
        self.matcher = config.NameMatcher
 
3284
        # Any simple store is good enough
 
3285
        self.get_store = config.test_store_builder_registry.get('configobj')
 
3286
 
 
3287
    def get_matching_sections(self, name):
 
3288
        store = self.get_store(self)
 
3289
        store._load_from_string(b'''
 
3290
[foo]
 
3291
option=foo
 
3292
[foo/baz]
 
3293
option=foo/baz
 
3294
[bar]
 
3295
option=bar
 
3296
''')
 
3297
        matcher = self.matcher(store, name)
 
3298
        return list(matcher.get_sections())
 
3299
 
 
3300
    def test_matching(self):
 
3301
        sections = self.get_matching_sections('foo')
 
3302
        self.assertLength(1, sections)
 
3303
        self.assertSectionContent(('foo', {'option': 'foo'}), sections[0])
 
3304
 
 
3305
    def test_not_matching(self):
 
3306
        sections = self.get_matching_sections('baz')
 
3307
        self.assertLength(0, sections)
 
3308
 
 
3309
 
 
3310
class TestBaseStackGet(tests.TestCase):
 
3311
 
 
3312
    def setUp(self):
 
3313
        super(TestBaseStackGet, self).setUp()
 
3314
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
 
3315
 
 
3316
    def test_get_first_definition(self):
 
3317
        store1 = config.IniFileStore()
 
3318
        store1._load_from_string(b'foo=bar')
 
3319
        store2 = config.IniFileStore()
 
3320
        store2._load_from_string(b'foo=baz')
 
3321
        conf = config.Stack([store1.get_sections, store2.get_sections])
 
3322
        self.assertEqual('bar', conf.get('foo'))
 
3323
 
 
3324
    def test_get_with_registered_default_value(self):
 
3325
        config.option_registry.register(config.Option('foo', default='bar'))
 
3326
        conf_stack = config.Stack([])
 
3327
        self.assertEqual('bar', conf_stack.get('foo'))
 
3328
 
 
3329
    def test_get_without_registered_default_value(self):
 
3330
        config.option_registry.register(config.Option('foo'))
 
3331
        conf_stack = config.Stack([])
 
3332
        self.assertEqual(None, conf_stack.get('foo'))
 
3333
 
 
3334
    def test_get_without_default_value_for_not_registered(self):
 
3335
        conf_stack = config.Stack([])
 
3336
        self.assertEqual(None, conf_stack.get('foo'))
 
3337
 
 
3338
    def test_get_for_empty_section_callable(self):
 
3339
        conf_stack = config.Stack([lambda: []])
 
3340
        self.assertEqual(None, conf_stack.get('foo'))
 
3341
 
 
3342
    def test_get_for_broken_callable(self):
 
3343
        # Trying to use and invalid callable raises an exception on first use
 
3344
        conf_stack = config.Stack([object])
 
3345
        self.assertRaises(TypeError, conf_stack.get, 'foo')
 
3346
 
 
3347
 
 
3348
class TestStackWithSimpleStore(tests.TestCase):
 
3349
 
 
3350
    def setUp(self):
 
3351
        super(TestStackWithSimpleStore, self).setUp()
 
3352
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
 
3353
        self.registry = config.option_registry
 
3354
 
 
3355
    def get_conf(self, content=None):
 
3356
        return config.MemoryStack(content)
 
3357
 
 
3358
    def test_override_value_from_env(self):
 
3359
        self.overrideEnv('FOO', None)
 
3360
        self.registry.register(
 
3361
            config.Option('foo', default='bar', override_from_env=['FOO']))
 
3362
        self.overrideEnv('FOO', 'quux')
 
3363
        # Env variable provides a default taking over the option one
 
3364
        conf = self.get_conf(b'foo=store')
 
3365
        self.assertEqual('quux', conf.get('foo'))
 
3366
 
 
3367
    def test_first_override_value_from_env_wins(self):
 
3368
        self.overrideEnv('NO_VALUE', None)
 
3369
        self.overrideEnv('FOO', None)
 
3370
        self.overrideEnv('BAZ', None)
 
3371
        self.registry.register(
 
3372
            config.Option('foo', default='bar',
 
3373
                          override_from_env=['NO_VALUE', 'FOO', 'BAZ']))
 
3374
        self.overrideEnv('FOO', 'foo')
 
3375
        self.overrideEnv('BAZ', 'baz')
 
3376
        # The first env var set wins
 
3377
        conf = self.get_conf(b'foo=store')
 
3378
        self.assertEqual('foo', conf.get('foo'))
 
3379
 
 
3380
 
 
3381
class TestMemoryStack(tests.TestCase):
 
3382
 
 
3383
    def test_get(self):
 
3384
        conf = config.MemoryStack(b'foo=bar')
 
3385
        self.assertEqual('bar', conf.get('foo'))
 
3386
 
 
3387
    def test_set(self):
 
3388
        conf = config.MemoryStack(b'foo=bar')
 
3389
        conf.set('foo', 'baz')
 
3390
        self.assertEqual('baz', conf.get('foo'))
 
3391
 
 
3392
    def test_no_content(self):
 
3393
        conf = config.MemoryStack()
 
3394
        # No content means no loading
 
3395
        self.assertFalse(conf.store.is_loaded())
 
3396
        self.assertRaises(NotImplementedError, conf.get, 'foo')
 
3397
        # But a content can still be provided
 
3398
        conf.store._load_from_string(b'foo=bar')
 
3399
        self.assertEqual('bar', conf.get('foo'))
 
3400
 
 
3401
 
 
3402
class TestStackIterSections(tests.TestCase):
 
3403
 
 
3404
    def test_empty_stack(self):
 
3405
        conf = config.Stack([])
 
3406
        sections = list(conf.iter_sections())
 
3407
        self.assertLength(0, sections)
 
3408
 
 
3409
    def test_empty_store(self):
 
3410
        store = config.IniFileStore()
 
3411
        store._load_from_string(b'')
 
3412
        conf = config.Stack([store.get_sections])
 
3413
        sections = list(conf.iter_sections())
 
3414
        self.assertLength(0, sections)
 
3415
 
 
3416
    def test_simple_store(self):
 
3417
        store = config.IniFileStore()
 
3418
        store._load_from_string(b'foo=bar')
 
3419
        conf = config.Stack([store.get_sections])
 
3420
        tuples = list(conf.iter_sections())
 
3421
        self.assertLength(1, tuples)
 
3422
        (found_store, found_section) = tuples[0]
 
3423
        self.assertIs(store, found_store)
 
3424
 
 
3425
    def test_two_stores(self):
 
3426
        store1 = config.IniFileStore()
 
3427
        store1._load_from_string(b'foo=bar')
 
3428
        store2 = config.IniFileStore()
 
3429
        store2._load_from_string(b'bar=qux')
 
3430
        conf = config.Stack([store1.get_sections, store2.get_sections])
 
3431
        tuples = list(conf.iter_sections())
 
3432
        self.assertLength(2, tuples)
 
3433
        self.assertIs(store1, tuples[0][0])
 
3434
        self.assertIs(store2, tuples[1][0])
 
3435
 
 
3436
 
 
3437
class TestStackWithTransport(tests.TestCaseWithTransport):
 
3438
 
 
3439
    scenarios = [(key, {'get_stack': builder}) for key, builder
 
3440
                 in config.test_stack_builder_registry.iteritems()]
 
3441
 
 
3442
 
 
3443
class TestConcreteStacks(TestStackWithTransport):
 
3444
 
 
3445
    def test_build_stack(self):
 
3446
        # Just a smoke test to help debug builders
 
3447
        self.get_stack(self)
 
3448
 
 
3449
 
 
3450
class TestStackGet(TestStackWithTransport):
 
3451
 
 
3452
    def setUp(self):
 
3453
        super(TestStackGet, self).setUp()
 
3454
        self.conf = self.get_stack(self)
 
3455
 
 
3456
    def test_get_for_empty_stack(self):
 
3457
        self.assertEqual(None, self.conf.get('foo'))
 
3458
 
 
3459
    def test_get_hook(self):
 
3460
        self.conf.set('foo', 'bar')
 
3461
        calls = []
 
3462
 
 
3463
        def hook(*args):
 
3464
            calls.append(args)
 
3465
        config.ConfigHooks.install_named_hook('get', hook, None)
 
3466
        self.assertLength(0, calls)
 
3467
        value = self.conf.get('foo')
 
3468
        self.assertEqual('bar', value)
 
3469
        self.assertLength(1, calls)
 
3470
        self.assertEqual((self.conf, 'foo', 'bar'), calls[0])
 
3471
 
 
3472
 
 
3473
class TestStackGetWithConverter(tests.TestCase):
 
3474
 
 
3475
    def setUp(self):
 
3476
        super(TestStackGetWithConverter, self).setUp()
 
3477
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
 
3478
        self.registry = config.option_registry
 
3479
 
 
3480
    def get_conf(self, content=None):
 
3481
        return config.MemoryStack(content)
 
3482
 
 
3483
    def register_bool_option(self, name, default=None, default_from_env=None):
 
3484
        b = config.Option(name, help='A boolean.',
 
3485
                          default=default, default_from_env=default_from_env,
 
3486
                          from_unicode=config.bool_from_store)
 
3487
        self.registry.register(b)
 
3488
 
 
3489
    def test_get_default_bool_None(self):
 
3490
        self.register_bool_option('foo')
 
3491
        conf = self.get_conf(b'')
 
3492
        self.assertEqual(None, conf.get('foo'))
 
3493
 
 
3494
    def test_get_default_bool_True(self):
 
3495
        self.register_bool_option('foo', u'True')
 
3496
        conf = self.get_conf(b'')
 
3497
        self.assertEqual(True, conf.get('foo'))
 
3498
 
 
3499
    def test_get_default_bool_False(self):
 
3500
        self.register_bool_option('foo', False)
 
3501
        conf = self.get_conf(b'')
 
3502
        self.assertEqual(False, conf.get('foo'))
 
3503
 
 
3504
    def test_get_default_bool_False_as_string(self):
 
3505
        self.register_bool_option('foo', u'False')
 
3506
        conf = self.get_conf(b'')
 
3507
        self.assertEqual(False, conf.get('foo'))
 
3508
 
 
3509
    def test_get_default_bool_from_env_converted(self):
 
3510
        self.register_bool_option('foo', u'True', default_from_env=['FOO'])
 
3511
        self.overrideEnv('FOO', 'False')
 
3512
        conf = self.get_conf(b'')
 
3513
        self.assertEqual(False, conf.get('foo'))
 
3514
 
 
3515
    def test_get_default_bool_when_conversion_fails(self):
 
3516
        self.register_bool_option('foo', default='True')
 
3517
        conf = self.get_conf(b'foo=invalid boolean')
 
3518
        self.assertEqual(True, conf.get('foo'))
 
3519
 
 
3520
    def register_integer_option(self, name,
 
3521
                                default=None, default_from_env=None):
 
3522
        i = config.Option(name, help='An integer.',
 
3523
                          default=default, default_from_env=default_from_env,
 
3524
                          from_unicode=config.int_from_store)
 
3525
        self.registry.register(i)
 
3526
 
 
3527
    def test_get_default_integer_None(self):
 
3528
        self.register_integer_option('foo')
 
3529
        conf = self.get_conf(b'')
 
3530
        self.assertEqual(None, conf.get('foo'))
 
3531
 
 
3532
    def test_get_default_integer(self):
 
3533
        self.register_integer_option('foo', 42)
 
3534
        conf = self.get_conf(b'')
 
3535
        self.assertEqual(42, conf.get('foo'))
 
3536
 
 
3537
    def test_get_default_integer_as_string(self):
 
3538
        self.register_integer_option('foo', u'42')
 
3539
        conf = self.get_conf(b'')
 
3540
        self.assertEqual(42, conf.get('foo'))
 
3541
 
 
3542
    def test_get_default_integer_from_env(self):
 
3543
        self.register_integer_option('foo', default_from_env=['FOO'])
 
3544
        self.overrideEnv('FOO', '18')
 
3545
        conf = self.get_conf(b'')
 
3546
        self.assertEqual(18, conf.get('foo'))
 
3547
 
 
3548
    def test_get_default_integer_when_conversion_fails(self):
 
3549
        self.register_integer_option('foo', default='12')
 
3550
        conf = self.get_conf(b'foo=invalid integer')
 
3551
        self.assertEqual(12, conf.get('foo'))
 
3552
 
 
3553
    def register_list_option(self, name, default=None, default_from_env=None):
 
3554
        l = config.ListOption(name, help='A list.', default=default,
 
3555
                              default_from_env=default_from_env)
 
3556
        self.registry.register(l)
 
3557
 
 
3558
    def test_get_default_list_None(self):
 
3559
        self.register_list_option('foo')
 
3560
        conf = self.get_conf(b'')
 
3561
        self.assertEqual(None, conf.get('foo'))
 
3562
 
 
3563
    def test_get_default_list_empty(self):
 
3564
        self.register_list_option('foo', '')
 
3565
        conf = self.get_conf(b'')
 
3566
        self.assertEqual([], conf.get('foo'))
 
3567
 
 
3568
    def test_get_default_list_from_env(self):
 
3569
        self.register_list_option('foo', default_from_env=['FOO'])
 
3570
        self.overrideEnv('FOO', '')
 
3571
        conf = self.get_conf(b'')
 
3572
        self.assertEqual([], conf.get('foo'))
 
3573
 
 
3574
    def test_get_with_list_converter_no_item(self):
 
3575
        self.register_list_option('foo', None)
 
3576
        conf = self.get_conf(b'foo=,')
 
3577
        self.assertEqual([], conf.get('foo'))
 
3578
 
 
3579
    def test_get_with_list_converter_many_items(self):
 
3580
        self.register_list_option('foo', None)
 
3581
        conf = self.get_conf(b'foo=m,o,r,e')
 
3582
        self.assertEqual(['m', 'o', 'r', 'e'], conf.get('foo'))
 
3583
 
 
3584
    def test_get_with_list_converter_embedded_spaces_many_items(self):
 
3585
        self.register_list_option('foo', None)
 
3586
        conf = self.get_conf(b'foo=" bar", "baz "')
 
3587
        self.assertEqual([' bar', 'baz '], conf.get('foo'))
 
3588
 
 
3589
    def test_get_with_list_converter_stripped_spaces_many_items(self):
 
3590
        self.register_list_option('foo', None)
 
3591
        conf = self.get_conf(b'foo= bar ,  baz ')
 
3592
        self.assertEqual(['bar', 'baz'], conf.get('foo'))
 
3593
 
 
3594
 
 
3595
class TestIterOptionRefs(tests.TestCase):
 
3596
    """iter_option_refs is a bit unusual, document some cases."""
 
3597
 
 
3598
    def assertRefs(self, expected, string):
 
3599
        self.assertEqual(expected, list(config.iter_option_refs(string)))
 
3600
 
 
3601
    def test_empty(self):
 
3602
        self.assertRefs([(False, '')], '')
 
3603
 
 
3604
    def test_no_refs(self):
 
3605
        self.assertRefs([(False, 'foo bar')], 'foo bar')
 
3606
 
 
3607
    def test_single_ref(self):
 
3608
        self.assertRefs([(False, ''), (True, '{foo}'), (False, '')], '{foo}')
 
3609
 
 
3610
    def test_broken_ref(self):
 
3611
        self.assertRefs([(False, '{foo')], '{foo')
 
3612
 
 
3613
    def test_embedded_ref(self):
 
3614
        self.assertRefs([(False, '{'), (True, '{foo}'), (False, '}')],
 
3615
                        '{{foo}}')
 
3616
 
 
3617
    def test_two_refs(self):
 
3618
        self.assertRefs([(False, ''), (True, '{foo}'),
 
3619
                         (False, ''), (True, '{bar}'),
 
3620
                         (False, ''), ],
 
3621
                        '{foo}{bar}')
 
3622
 
 
3623
    def test_newline_in_refs_are_not_matched(self):
 
3624
        self.assertRefs([(False, '{\nxx}{xx\n}{{\n}}')], '{\nxx}{xx\n}{{\n}}')
 
3625
 
 
3626
 
 
3627
class TestStackExpandOptions(tests.TestCaseWithTransport):
 
3628
 
 
3629
    def setUp(self):
 
3630
        super(TestStackExpandOptions, self).setUp()
 
3631
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())
 
3632
        self.registry = config.option_registry
 
3633
        store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
 
3634
        self.conf = config.Stack([store.get_sections], store)
 
3635
 
 
3636
    def assertExpansion(self, expected, string, env=None):
 
3637
        self.assertEqual(expected, self.conf.expand_options(string, env))
 
3638
 
 
3639
    def test_no_expansion(self):
 
3640
        self.assertExpansion('foo', 'foo')
 
3641
 
 
3642
    def test_expand_default_value(self):
 
3643
        self.conf.store._load_from_string(b'bar=baz')
 
3644
        self.registry.register(config.Option('foo', default=u'{bar}'))
 
3645
        self.assertEqual('baz', self.conf.get('foo', expand=True))
 
3646
 
 
3647
    def test_expand_default_from_env(self):
 
3648
        self.conf.store._load_from_string(b'bar=baz')
 
3649
        self.registry.register(config.Option('foo', default_from_env=['FOO']))
 
3650
        self.overrideEnv('FOO', '{bar}')
 
3651
        self.assertEqual('baz', self.conf.get('foo', expand=True))
 
3652
 
 
3653
    def test_expand_default_on_failed_conversion(self):
 
3654
        self.conf.store._load_from_string(b'baz=bogus\nbar=42\nfoo={baz}')
 
3655
        self.registry.register(
 
3656
            config.Option('foo', default=u'{bar}',
 
3657
                          from_unicode=config.int_from_store))
 
3658
        self.assertEqual(42, self.conf.get('foo', expand=True))
 
3659
 
 
3660
    def test_env_adding_options(self):
 
3661
        self.assertExpansion('bar', '{foo}', {'foo': 'bar'})
 
3662
 
 
3663
    def test_env_overriding_options(self):
 
3664
        self.conf.store._load_from_string(b'foo=baz')
 
3665
        self.assertExpansion('bar', '{foo}', {'foo': 'bar'})
 
3666
 
 
3667
    def test_simple_ref(self):
 
3668
        self.conf.store._load_from_string(b'foo=xxx')
 
3669
        self.assertExpansion('xxx', '{foo}')
 
3670
 
 
3671
    def test_unknown_ref(self):
 
3672
        self.assertRaises(config.ExpandingUnknownOption,
 
3673
                          self.conf.expand_options, '{foo}')
 
3674
 
 
3675
    def test_illegal_def_is_ignored(self):
 
3676
        self.assertExpansion('{1,2}', '{1,2}')
 
3677
        self.assertExpansion('{ }', '{ }')
 
3678
        self.assertExpansion('${Foo,f}', '${Foo,f}')
 
3679
 
 
3680
    def test_indirect_ref(self):
 
3681
        self.conf.store._load_from_string(b'''
 
3682
foo=xxx
 
3683
bar={foo}
 
3684
''')
 
3685
        self.assertExpansion('xxx', '{bar}')
 
3686
 
 
3687
    def test_embedded_ref(self):
 
3688
        self.conf.store._load_from_string(b'''
 
3689
foo=xxx
 
3690
bar=foo
 
3691
''')
 
3692
        self.assertExpansion('xxx', '{{bar}}')
 
3693
 
 
3694
    def test_simple_loop(self):
 
3695
        self.conf.store._load_from_string(b'foo={foo}')
 
3696
        self.assertRaises(config.OptionExpansionLoop,
 
3697
                          self.conf.expand_options, '{foo}')
 
3698
 
 
3699
    def test_indirect_loop(self):
 
3700
        self.conf.store._load_from_string(b'''
 
3701
foo={bar}
 
3702
bar={baz}
 
3703
baz={foo}''')
 
3704
        e = self.assertRaises(config.OptionExpansionLoop,
 
3705
                              self.conf.expand_options, '{foo}')
 
3706
        self.assertEqual('foo->bar->baz', e.refs)
 
3707
        self.assertEqual('{foo}', e.string)
 
3708
 
 
3709
    def test_list(self):
 
3710
        self.conf.store._load_from_string(b'''
 
3711
foo=start
 
3712
bar=middle
 
3713
baz=end
 
3714
list={foo},{bar},{baz}
 
3715
''')
 
3716
        self.registry.register(
 
3717
            config.ListOption('list'))
 
3718
        self.assertEqual(['start', 'middle', 'end'],
 
3719
                         self.conf.get('list', expand=True))
 
3720
 
 
3721
    def test_cascading_list(self):
 
3722
        self.conf.store._load_from_string(b'''
 
3723
foo=start,{bar}
 
3724
bar=middle,{baz}
 
3725
baz=end
 
3726
list={foo}
 
3727
''')
 
3728
        self.registry.register(config.ListOption('list'))
 
3729
        # Register an intermediate option as a list to ensure no conversion
 
3730
        # happen while expanding. Conversion should only occur for the original
 
3731
        # option ('list' here).
 
3732
        self.registry.register(config.ListOption('baz'))
 
3733
        self.assertEqual(['start', 'middle', 'end'],
 
3734
                         self.conf.get('list', expand=True))
 
3735
 
 
3736
    def test_pathologically_hidden_list(self):
 
3737
        self.conf.store._load_from_string(b'''
 
3738
foo=bin
 
3739
bar=go
 
3740
start={foo
 
3741
middle=},{
 
3742
end=bar}
 
3743
hidden={start}{middle}{end}
 
3744
''')
 
3745
        # What matters is what the registration says, the conversion happens
 
3746
        # only after all expansions have been performed
 
3747
        self.registry.register(config.ListOption('hidden'))
 
3748
        self.assertEqual(['bin', 'go'],
 
3749
                         self.conf.get('hidden', expand=True))
 
3750
 
 
3751
 
 
3752
class TestStackCrossSectionsExpand(tests.TestCaseWithTransport):
 
3753
 
 
3754
    def setUp(self):
 
3755
        super(TestStackCrossSectionsExpand, self).setUp()
 
3756
 
 
3757
    def get_config(self, location, string):
 
3758
        if string is None:
 
3759
            string = b''
 
3760
        # Since we don't save the config we won't strictly require to inherit
 
3761
        # from TestCaseInTempDir, but an error occurs so quickly...
 
3762
        c = config.LocationStack(location)
 
3763
        c.store._load_from_string(string)
 
3764
        return c
 
3765
 
 
3766
    def test_dont_cross_unrelated_section(self):
 
3767
        c = self.get_config('/another/branch/path', b'''
 
3768
[/one/branch/path]
 
3769
foo = hello
 
3770
bar = {foo}/2
 
3771
 
 
3772
[/another/branch/path]
 
3773
bar = {foo}/2
 
3774
''')
 
3775
        self.assertRaises(config.ExpandingUnknownOption,
 
3776
                          c.get, 'bar', expand=True)
 
3777
 
 
3778
    def test_cross_related_sections(self):
 
3779
        c = self.get_config('/project/branch/path', b'''
 
3780
[/project]
 
3781
foo = qu
 
3782
 
 
3783
[/project/branch/path]
 
3784
bar = {foo}ux
 
3785
''')
 
3786
        self.assertEqual('quux', c.get('bar', expand=True))
 
3787
 
 
3788
 
 
3789
class TestStackCrossStoresExpand(tests.TestCaseWithTransport):
 
3790
 
 
3791
    def test_cross_global_locations(self):
 
3792
        l_store = config.LocationStore()
 
3793
        l_store._load_from_string(b'''
 
3794
[/branch]
 
3795
lfoo = loc-foo
 
3796
lbar = {gbar}
 
3797
''')
 
3798
        l_store.save()
 
3799
        g_store = config.GlobalStore()
 
3800
        g_store._load_from_string(b'''
 
3801
[DEFAULT]
 
3802
gfoo = {lfoo}
 
3803
gbar = glob-bar
 
3804
''')
 
3805
        g_store.save()
 
3806
        stack = config.LocationStack('/branch')
 
3807
        self.assertEqual('glob-bar', stack.get('lbar', expand=True))
 
3808
        self.assertEqual('loc-foo', stack.get('gfoo', expand=True))
 
3809
 
 
3810
 
 
3811
class TestStackExpandSectionLocals(tests.TestCaseWithTransport):
 
3812
 
 
3813
    def test_expand_locals_empty(self):
 
3814
        l_store = config.LocationStore()
 
3815
        l_store._load_from_string(b'''
 
3816
[/home/user/project]
 
3817
base = {basename}
 
3818
rel = {relpath}
 
3819
''')
 
3820
        l_store.save()
 
3821
        stack = config.LocationStack('/home/user/project/')
 
3822
        self.assertEqual('', stack.get('base', expand=True))
 
3823
        self.assertEqual('', stack.get('rel', expand=True))
 
3824
 
 
3825
    def test_expand_basename_locally(self):
 
3826
        l_store = config.LocationStore()
 
3827
        l_store._load_from_string(b'''
 
3828
[/home/user/project]
 
3829
bfoo = {basename}
 
3830
''')
 
3831
        l_store.save()
 
3832
        stack = config.LocationStack('/home/user/project/branch')
 
3833
        self.assertEqual('branch', stack.get('bfoo', expand=True))
 
3834
 
 
3835
    def test_expand_basename_locally_longer_path(self):
 
3836
        l_store = config.LocationStore()
 
3837
        l_store._load_from_string(b'''
 
3838
[/home/user]
 
3839
bfoo = {basename}
 
3840
''')
 
3841
        l_store.save()
 
3842
        stack = config.LocationStack('/home/user/project/dir/branch')
 
3843
        self.assertEqual('branch', stack.get('bfoo', expand=True))
 
3844
 
 
3845
    def test_expand_relpath_locally(self):
 
3846
        l_store = config.LocationStore()
 
3847
        l_store._load_from_string(b'''
 
3848
[/home/user/project]
 
3849
lfoo = loc-foo/{relpath}
 
3850
''')
 
3851
        l_store.save()
 
3852
        stack = config.LocationStack('/home/user/project/branch')
 
3853
        self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
 
3854
 
 
3855
    def test_expand_relpath_unknonw_in_global(self):
 
3856
        g_store = config.GlobalStore()
 
3857
        g_store._load_from_string(b'''
 
3858
[DEFAULT]
 
3859
gfoo = {relpath}
 
3860
''')
 
3861
        g_store.save()
 
3862
        stack = config.LocationStack('/home/user/project/branch')
 
3863
        self.assertRaises(config.ExpandingUnknownOption,
 
3864
                          stack.get, 'gfoo', expand=True)
 
3865
 
 
3866
    def test_expand_local_option_locally(self):
 
3867
        l_store = config.LocationStore()
 
3868
        l_store._load_from_string(b'''
 
3869
[/home/user/project]
 
3870
lfoo = loc-foo/{relpath}
 
3871
lbar = {gbar}
 
3872
''')
 
3873
        l_store.save()
 
3874
        g_store = config.GlobalStore()
 
3875
        g_store._load_from_string(b'''
 
3876
[DEFAULT]
 
3877
gfoo = {lfoo}
 
3878
gbar = glob-bar
 
3879
''')
 
3880
        g_store.save()
 
3881
        stack = config.LocationStack('/home/user/project/branch')
 
3882
        self.assertEqual('glob-bar', stack.get('lbar', expand=True))
 
3883
        self.assertEqual('loc-foo/branch', stack.get('gfoo', expand=True))
 
3884
 
 
3885
    def test_locals_dont_leak(self):
 
3886
        """Make sure we chose the right local in presence of several sections.
 
3887
        """
 
3888
        l_store = config.LocationStore()
 
3889
        l_store._load_from_string(b'''
 
3890
[/home/user]
 
3891
lfoo = loc-foo/{relpath}
 
3892
[/home/user/project]
 
3893
lfoo = loc-foo/{relpath}
 
3894
''')
 
3895
        l_store.save()
 
3896
        stack = config.LocationStack('/home/user/project/branch')
 
3897
        self.assertEqual('loc-foo/branch', stack.get('lfoo', expand=True))
 
3898
        stack = config.LocationStack('/home/user/bar/baz')
 
3899
        self.assertEqual('loc-foo/bar/baz', stack.get('lfoo', expand=True))
 
3900
 
 
3901
 
 
3902
class TestStackSet(TestStackWithTransport):
 
3903
 
 
3904
    def test_simple_set(self):
 
3905
        conf = self.get_stack(self)
 
3906
        self.assertEqual(None, conf.get('foo'))
 
3907
        conf.set('foo', 'baz')
 
3908
        # Did we get it back ?
 
3909
        self.assertEqual('baz', conf.get('foo'))
 
3910
 
 
3911
    def test_set_creates_a_new_section(self):
 
3912
        conf = self.get_stack(self)
 
3913
        conf.set('foo', 'baz')
 
3914
        self.assertEqual, 'baz', conf.get('foo')
 
3915
 
 
3916
    def test_set_hook(self):
 
3917
        calls = []
 
3918
 
 
3919
        def hook(*args):
 
3920
            calls.append(args)
 
3921
        config.ConfigHooks.install_named_hook('set', hook, None)
 
3922
        self.assertLength(0, calls)
 
3923
        conf = self.get_stack(self)
 
3924
        conf.set('foo', 'bar')
 
3925
        self.assertLength(1, calls)
 
3926
        self.assertEqual((conf, 'foo', 'bar'), calls[0])
 
3927
 
 
3928
 
 
3929
class TestStackRemove(TestStackWithTransport):
 
3930
 
 
3931
    def test_remove_existing(self):
 
3932
        conf = self.get_stack(self)
 
3933
        conf.set('foo', 'bar')
 
3934
        self.assertEqual('bar', conf.get('foo'))
 
3935
        conf.remove('foo')
 
3936
        # Did we get it back ?
 
3937
        self.assertEqual(None, conf.get('foo'))
 
3938
 
 
3939
    def test_remove_unknown(self):
 
3940
        conf = self.get_stack(self)
 
3941
        self.assertRaises(KeyError, conf.remove, 'I_do_not_exist')
 
3942
 
 
3943
    def test_remove_hook(self):
 
3944
        calls = []
 
3945
 
 
3946
        def hook(*args):
 
3947
            calls.append(args)
 
3948
        config.ConfigHooks.install_named_hook('remove', hook, None)
 
3949
        self.assertLength(0, calls)
 
3950
        conf = self.get_stack(self)
 
3951
        conf.set('foo', 'bar')
 
3952
        conf.remove('foo')
 
3953
        self.assertLength(1, calls)
 
3954
        self.assertEqual((conf, 'foo'), calls[0])
 
3955
 
 
3956
 
 
3957
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
 
3958
 
 
3959
    def setUp(self):
 
3960
        super(TestConfigGetOptions, self).setUp()
 
3961
        create_configs(self)
 
3962
 
 
3963
    def test_no_variable(self):
 
3964
        # Using branch should query branch, locations and breezy
 
3965
        self.assertOptions([], self.branch_config)
 
3966
 
 
3967
    def test_option_in_breezy(self):
 
3968
        self.breezy_config.set_user_option('file', 'breezy')
 
3969
        self.assertOptions([('file', 'breezy', 'DEFAULT', 'breezy')],
 
3970
                           self.breezy_config)
 
3971
 
 
3972
    def test_option_in_locations(self):
 
3973
        self.locations_config.set_user_option('file', 'locations')
 
3974
        self.assertOptions(
 
3975
            [('file', 'locations', self.tree.basedir, 'locations')],
 
3976
            self.locations_config)
 
3977
 
 
3978
    def test_option_in_branch(self):
 
3979
        self.branch_config.set_user_option('file', 'branch')
 
3980
        self.assertOptions([('file', 'branch', 'DEFAULT', 'branch')],
 
3981
                           self.branch_config)
 
3982
 
 
3983
    def test_option_in_breezy_and_branch(self):
 
3984
        self.breezy_config.set_user_option('file', 'breezy')
 
3985
        self.branch_config.set_user_option('file', 'branch')
 
3986
        self.assertOptions([('file', 'branch', 'DEFAULT', 'branch'),
 
3987
                            ('file', 'breezy', 'DEFAULT', 'breezy'), ],
 
3988
                           self.branch_config)
 
3989
 
 
3990
    def test_option_in_branch_and_locations(self):
 
3991
        # Hmm, locations override branch :-/
 
3992
        self.locations_config.set_user_option('file', 'locations')
 
3993
        self.branch_config.set_user_option('file', 'branch')
 
3994
        self.assertOptions(
 
3995
            [('file', 'locations', self.tree.basedir, 'locations'),
 
3996
             ('file', 'branch', 'DEFAULT', 'branch'), ],
 
3997
            self.branch_config)
 
3998
 
 
3999
    def test_option_in_breezy_locations_and_branch(self):
 
4000
        self.breezy_config.set_user_option('file', 'breezy')
 
4001
        self.locations_config.set_user_option('file', 'locations')
 
4002
        self.branch_config.set_user_option('file', 'branch')
 
4003
        self.assertOptions(
 
4004
            [('file', 'locations', self.tree.basedir, 'locations'),
 
4005
             ('file', 'branch', 'DEFAULT', 'branch'),
 
4006
             ('file', 'breezy', 'DEFAULT', 'breezy'), ],
 
4007
            self.branch_config)
 
4008
 
 
4009
 
 
4010
class TestConfigRemoveOption(tests.TestCaseWithTransport, TestOptionsMixin):
 
4011
 
 
4012
    def setUp(self):
 
4013
        super(TestConfigRemoveOption, self).setUp()
 
4014
        create_configs_with_file_option(self)
 
4015
 
 
4016
    def test_remove_in_locations(self):
 
4017
        self.locations_config.remove_user_option('file', self.tree.basedir)
 
4018
        self.assertOptions(
 
4019
            [('file', 'branch', 'DEFAULT', 'branch'),
 
4020
             ('file', 'breezy', 'DEFAULT', 'breezy'), ],
 
4021
            self.branch_config)
 
4022
 
 
4023
    def test_remove_in_branch(self):
 
4024
        self.branch_config.remove_user_option('file')
 
4025
        self.assertOptions(
 
4026
            [('file', 'locations', self.tree.basedir, 'locations'),
 
4027
             ('file', 'breezy', 'DEFAULT', 'breezy'), ],
 
4028
            self.branch_config)
 
4029
 
 
4030
    def test_remove_in_breezy(self):
 
4031
        self.breezy_config.remove_user_option('file')
 
4032
        self.assertOptions(
 
4033
            [('file', 'locations', self.tree.basedir, 'locations'),
 
4034
             ('file', 'branch', 'DEFAULT', 'branch'), ],
 
4035
            self.branch_config)
 
4036
 
 
4037
 
 
4038
class TestConfigGetSections(tests.TestCaseWithTransport):
 
4039
 
 
4040
    def setUp(self):
 
4041
        super(TestConfigGetSections, self).setUp()
 
4042
        create_configs(self)
 
4043
 
 
4044
    def assertSectionNames(self, expected, conf, name=None):
 
4045
        """Check which sections are returned for a given config.
 
4046
 
 
4047
        If fallback configurations exist their sections can be included.
 
4048
 
 
4049
        :param expected: A list of section names.
 
4050
 
 
4051
        :param conf: The configuration that will be queried.
 
4052
 
 
4053
        :param name: An optional section name that will be passed to
 
4054
            get_sections().
 
4055
        """
 
4056
        sections = list(conf._get_sections(name))
 
4057
        self.assertLength(len(expected), sections)
 
4058
        self.assertEqual(expected, [n for n, _, _ in sections])
 
4059
 
 
4060
    def test_breezy_default_section(self):
 
4061
        self.assertSectionNames(['DEFAULT'], self.breezy_config)
 
4062
 
 
4063
    def test_locations_default_section(self):
 
4064
        # No sections are defined in an empty file
 
4065
        self.assertSectionNames([], self.locations_config)
 
4066
 
 
4067
    def test_locations_named_section(self):
 
4068
        self.locations_config.set_user_option('file', 'locations')
 
4069
        self.assertSectionNames([self.tree.basedir], self.locations_config)
 
4070
 
 
4071
    def test_locations_matching_sections(self):
 
4072
        loc_config = self.locations_config
 
4073
        loc_config.set_user_option('file', 'locations')
 
4074
        # We need to cheat a bit here to create an option in sections above and
 
4075
        # below the 'location' one.
 
4076
        parser = loc_config._get_parser()
 
4077
        # locations.cong deals with '/' ignoring native os.sep
 
4078
        location_names = self.tree.basedir.split('/')
 
4079
        parent = '/'.join(location_names[:-1])
 
4080
        child = '/'.join(location_names + ['child'])
 
4081
        parser[parent] = {}
 
4082
        parser[parent]['file'] = 'parent'
 
4083
        parser[child] = {}
 
4084
        parser[child]['file'] = 'child'
 
4085
        self.assertSectionNames([self.tree.basedir, parent], loc_config)
 
4086
 
 
4087
    def test_branch_data_default_section(self):
 
4088
        self.assertSectionNames([None],
 
4089
                                self.branch_config._get_branch_data_config())
 
4090
 
 
4091
    def test_branch_default_sections(self):
 
4092
        # No sections are defined in an empty locations file
 
4093
        self.assertSectionNames([None, 'DEFAULT'],
 
4094
                                self.branch_config)
 
4095
        # Unless we define an option
 
4096
        self.branch_config._get_location_config().set_user_option(
 
4097
            'file', 'locations')
 
4098
        self.assertSectionNames([self.tree.basedir, None, 'DEFAULT'],
 
4099
                                self.branch_config)
 
4100
 
 
4101
    def test_breezy_named_section(self):
 
4102
        # We need to cheat as the API doesn't give direct access to sections
 
4103
        # other than DEFAULT.
 
4104
        self.breezy_config.set_alias('breezy', 'bzr')
 
4105
        self.assertSectionNames(['ALIASES'], self.breezy_config, 'ALIASES')
 
4106
 
 
4107
 
 
4108
class TestSharedStores(tests.TestCaseInTempDir):
 
4109
 
 
4110
    def test_breezy_conf_shared(self):
 
4111
        g1 = config.GlobalStack()
 
4112
        g2 = config.GlobalStack()
 
4113
        # The two stacks share the same store
 
4114
        self.assertIs(g1.store, g2.store)
 
4115
 
 
4116
 
 
4117
class TestAuthenticationConfigFilePermissions(tests.TestCaseInTempDir):
 
4118
    """Test warning for permissions of authentication.conf."""
 
4119
 
 
4120
    def setUp(self):
 
4121
        super(TestAuthenticationConfigFilePermissions, self).setUp()
 
4122
        self.path = osutils.pathjoin(self.test_dir, 'authentication.conf')
 
4123
        with open(self.path, 'wb') as f:
 
4124
            f.write(b"""[broken]
 
4125
scheme=ftp
 
4126
user=joe
 
4127
port=port # Error: Not an int
 
4128
""")
 
4129
        self.overrideAttr(bedding, 'authentication_config_path',
 
4130
                          lambda: self.path)
 
4131
        osutils.chmod_if_possible(self.path, 0o755)
 
4132
 
 
4133
    def test_check_warning(self):
 
4134
        conf = config.AuthenticationConfig()
 
4135
        self.assertEqual(conf._filename, self.path)
 
4136
        self.assertContainsRe(self.get_log(),
 
4137
                              'Saved passwords may be accessible by other users.')
 
4138
 
 
4139
    def test_check_suppressed_warning(self):
 
4140
        global_config = config.GlobalConfig()
 
4141
        global_config.set_user_option('suppress_warnings',
 
4142
                                      'insecure_permissions')
 
4143
        conf = config.AuthenticationConfig()
 
4144
        self.assertEqual(conf._filename, self.path)
 
4145
        self.assertNotContainsRe(self.get_log(),
 
4146
                                 'Saved passwords may be accessible by other users.')
 
4147
 
 
4148
 
 
4149
class TestAuthenticationConfigFile(tests.TestCase):
 
4150
    """Test the authentication.conf file matching"""
 
4151
 
 
4152
    def _got_user_passwd(self, expected_user, expected_password,
 
4153
                         config, *args, **kwargs):
 
4154
        credentials = config.get_credentials(*args, **kwargs)
 
4155
        if credentials is None:
 
4156
            user = None
 
4157
            password = None
 
4158
        else:
 
4159
            user = credentials['user']
 
4160
            password = credentials['password']
 
4161
        self.assertEqual(expected_user, user)
 
4162
        self.assertEqual(expected_password, password)
 
4163
 
 
4164
    def test_empty_config(self):
 
4165
        conf = config.AuthenticationConfig(_file=BytesIO())
 
4166
        self.assertEqual({}, conf._get_config())
 
4167
        self._got_user_passwd(None, None, conf, 'http', 'foo.net')
 
4168
 
 
4169
    def test_non_utf8_config(self):
 
4170
        conf = config.AuthenticationConfig(_file=BytesIO(b'foo = bar\xff'))
 
4171
        self.assertRaises(config.ConfigContentError, conf._get_config)
 
4172
 
 
4173
    def test_missing_auth_section_header(self):
 
4174
        conf = config.AuthenticationConfig(_file=BytesIO(b'foo = bar'))
 
4175
        self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
 
4176
 
 
4177
    def test_auth_section_header_not_closed(self):
 
4178
        conf = config.AuthenticationConfig(_file=BytesIO(b'[DEF'))
 
4179
        self.assertRaises(config.ParseConfigError, conf._get_config)
 
4180
 
 
4181
    def test_auth_value_not_boolean(self):
 
4182
        conf = config.AuthenticationConfig(_file=BytesIO(b"""\
 
4183
[broken]
 
4184
scheme=ftp
 
4185
user=joe
 
4186
verify_certificates=askme # Error: Not a boolean
 
4187
"""))
 
4188
        self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
 
4189
 
 
4190
    def test_auth_value_not_int(self):
 
4191
        conf = config.AuthenticationConfig(_file=BytesIO(b"""\
 
4192
[broken]
 
4193
scheme=ftp
 
4194
user=joe
 
4195
port=port # Error: Not an int
 
4196
"""))
 
4197
        self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
 
4198
 
 
4199
    def test_unknown_password_encoding(self):
 
4200
        conf = config.AuthenticationConfig(_file=BytesIO(b"""\
 
4201
[broken]
 
4202
scheme=ftp
 
4203
user=joe
 
4204
password_encoding=unknown
 
4205
"""))
 
4206
        self.assertRaises(ValueError, conf.get_password,
 
4207
                          'ftp', 'foo.net', 'joe')
 
4208
 
 
4209
    def test_credentials_for_scheme_host(self):
 
4210
        conf = config.AuthenticationConfig(_file=BytesIO(b"""\
 
4211
# Identity on foo.net
 
4212
[ftp definition]
 
4213
scheme=ftp
 
4214
host=foo.net
 
4215
user=joe
 
4216
password=secret-pass
 
4217
"""))
 
4218
        # Basic matching
 
4219
        self._got_user_passwd('joe', 'secret-pass', conf, 'ftp', 'foo.net')
 
4220
        # different scheme
 
4221
        self._got_user_passwd(None, None, conf, 'http', 'foo.net')
 
4222
        # different host
 
4223
        self._got_user_passwd(None, None, conf, 'ftp', 'bar.net')
 
4224
 
 
4225
    def test_credentials_for_host_port(self):
 
4226
        conf = config.AuthenticationConfig(_file=BytesIO(b"""\
 
4227
# Identity on foo.net
 
4228
[ftp definition]
 
4229
scheme=ftp
 
4230
port=10021
 
4231
host=foo.net
 
4232
user=joe
 
4233
password=secret-pass
 
4234
"""))
 
4235
        # No port
 
4236
        self._got_user_passwd('joe', 'secret-pass',
 
4237
                              conf, 'ftp', 'foo.net', port=10021)
 
4238
        # different port
 
4239
        self._got_user_passwd(None, None, conf, 'ftp', 'foo.net')
 
4240
 
 
4241
    def test_for_matching_host(self):
 
4242
        conf = config.AuthenticationConfig(_file=BytesIO(b"""\
 
4243
# Identity on foo.net
 
4244
[sourceforge]
 
4245
scheme=bzr
 
4246
host=bzr.sf.net
 
4247
user=joe
 
4248
password=joepass
 
4249
[sourceforge domain]
 
4250
scheme=bzr
 
4251
host=.bzr.sf.net
 
4252
user=georges
 
4253
password=bendover
 
4254
"""))
 
4255
        # matching domain
 
4256
        self._got_user_passwd('georges', 'bendover',
 
4257
                              conf, 'bzr', 'foo.bzr.sf.net')
 
4258
        # phishing attempt
 
4259
        self._got_user_passwd(None, None,
 
4260
                              conf, 'bzr', 'bbzr.sf.net')
 
4261
 
 
4262
    def test_for_matching_host_None(self):
 
4263
        conf = config.AuthenticationConfig(_file=BytesIO(b"""\
 
4264
# Identity on foo.net
 
4265
[catchup bzr]
 
4266
scheme=bzr
 
4267
user=joe
 
4268
password=joepass
 
4269
[DEFAULT]
 
4270
user=georges
 
4271
password=bendover
 
4272
"""))
 
4273
        # match no host
 
4274
        self._got_user_passwd('joe', 'joepass',
 
4275
                              conf, 'bzr', 'quux.net')
 
4276
        # no host but different scheme
 
4277
        self._got_user_passwd('georges', 'bendover',
 
4278
                              conf, 'ftp', 'quux.net')
 
4279
 
 
4280
    def test_credentials_for_path(self):
 
4281
        conf = config.AuthenticationConfig(_file=BytesIO(b"""
 
4282
[http dir1]
 
4283
scheme=http
 
4284
host=bar.org
 
4285
path=/dir1
 
4286
user=jim
 
4287
password=jimpass
 
4288
[http dir2]
 
4289
scheme=http
 
4290
host=bar.org
 
4291
path=/dir2
 
4292
user=georges
 
4293
password=bendover
 
4294
"""))
 
4295
        # no path no dice
 
4296
        self._got_user_passwd(None, None,
 
4297
                              conf, 'http', host='bar.org', path='/dir3')
 
4298
        # matching path
 
4299
        self._got_user_passwd('georges', 'bendover',
 
4300
                              conf, 'http', host='bar.org', path='/dir2')
 
4301
        # matching subdir
 
4302
        self._got_user_passwd('jim', 'jimpass',
 
4303
                              conf, 'http', host='bar.org', path='/dir1/subdir')
 
4304
 
 
4305
    def test_credentials_for_user(self):
 
4306
        conf = config.AuthenticationConfig(_file=BytesIO(b"""
 
4307
[with user]
 
4308
scheme=http
 
4309
host=bar.org
 
4310
user=jim
 
4311
password=jimpass
 
4312
"""))
 
4313
        # Get user
 
4314
        self._got_user_passwd('jim', 'jimpass',
 
4315
                              conf, 'http', 'bar.org')
 
4316
        # Get same user
 
4317
        self._got_user_passwd('jim', 'jimpass',
 
4318
                              conf, 'http', 'bar.org', user='jim')
 
4319
        # Don't get a different user if one is specified
 
4320
        self._got_user_passwd(None, None,
 
4321
                              conf, 'http', 'bar.org', user='georges')
 
4322
 
 
4323
    def test_credentials_for_user_without_password(self):
 
4324
        conf = config.AuthenticationConfig(_file=BytesIO(b"""
 
4325
[without password]
 
4326
scheme=http
 
4327
host=bar.org
 
4328
user=jim
 
4329
"""))
 
4330
        # Get user but no password
 
4331
        self._got_user_passwd('jim', None,
 
4332
                              conf, 'http', 'bar.org')
 
4333
 
 
4334
    def test_verify_certificates(self):
 
4335
        conf = config.AuthenticationConfig(_file=BytesIO(b"""
 
4336
[self-signed]
 
4337
scheme=https
 
4338
host=bar.org
 
4339
user=jim
 
4340
password=jimpass
 
4341
verify_certificates=False
 
4342
[normal]
 
4343
scheme=https
 
4344
host=foo.net
 
4345
user=georges
 
4346
password=bendover
 
4347
"""))
 
4348
        credentials = conf.get_credentials('https', 'bar.org')
 
4349
        self.assertEqual(False, credentials.get('verify_certificates'))
 
4350
        credentials = conf.get_credentials('https', 'foo.net')
 
4351
        self.assertEqual(True, credentials.get('verify_certificates'))
 
4352
 
 
4353
 
 
4354
class TestAuthenticationStorage(tests.TestCaseInTempDir):
 
4355
 
 
4356
    def test_set_credentials(self):
 
4357
        conf = config.AuthenticationConfig()
 
4358
        conf.set_credentials('name', 'host', 'user', 'scheme', 'password',
 
4359
                             99, path='/foo', verify_certificates=False, realm='realm')
 
4360
        credentials = conf.get_credentials(host='host', scheme='scheme',
 
4361
                                           port=99, path='/foo',
 
4362
                                           realm='realm')
 
4363
        CREDENTIALS = {'name': 'name', 'user': 'user', 'password': 'password',
 
4364
                       'verify_certificates': False, 'scheme': 'scheme',
 
4365
                       'host': 'host', 'port': 99, 'path': '/foo',
 
4366
                       'realm': 'realm'}
 
4367
        self.assertEqual(CREDENTIALS, credentials)
 
4368
        credentials_from_disk = config.AuthenticationConfig().get_credentials(
 
4369
            host='host', scheme='scheme', port=99, path='/foo', realm='realm')
 
4370
        self.assertEqual(CREDENTIALS, credentials_from_disk)
 
4371
 
 
4372
    def test_reset_credentials_different_name(self):
 
4373
        conf = config.AuthenticationConfig()
 
4374
        conf.set_credentials('name', 'host', 'user', 'scheme', 'password'),
 
4375
        conf.set_credentials('name2', 'host', 'user2', 'scheme', 'password'),
 
4376
        self.assertIs(None, conf._get_config().get('name'))
 
4377
        credentials = conf.get_credentials(host='host', scheme='scheme')
 
4378
        CREDENTIALS = {'name': 'name2', 'user': 'user2', 'password':
 
4379
                       'password', 'verify_certificates': True,
 
4380
                       'scheme': 'scheme', 'host': 'host', 'port': None,
 
4381
                       'path': None, 'realm': None}
 
4382
        self.assertEqual(CREDENTIALS, credentials)
 
4383
 
 
4384
 
 
4385
class TestAuthenticationConfig(tests.TestCaseInTempDir):
 
4386
    """Test AuthenticationConfig behaviour"""
 
4387
 
 
4388
    def _check_default_password_prompt(self, expected_prompt_format, scheme,
 
4389
                                       host=None, port=None, realm=None,
 
4390
                                       path=None):
 
4391
        if host is None:
 
4392
            host = 'bar.org'
 
4393
        user, password = 'jim', 'precious'
 
4394
        expected_prompt = expected_prompt_format % {
 
4395
            'scheme': scheme, 'host': host, 'port': port,
 
4396
            'user': user, 'realm': realm}
 
4397
 
 
4398
        ui.ui_factory = tests.TestUIFactory(stdin=password + '\n')
 
4399
        # We use an empty conf so that the user is always prompted
 
4400
        conf = config.AuthenticationConfig()
 
4401
        self.assertEqual(password,
 
4402
                         conf.get_password(scheme, host, user, port=port,
 
4403
                                           realm=realm, path=path))
 
4404
        self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
 
4405
        self.assertEqual('', ui.ui_factory.stdout.getvalue())
 
4406
 
 
4407
    def _check_default_username_prompt(self, expected_prompt_format, scheme,
 
4408
                                       host=None, port=None, realm=None,
 
4409
                                       path=None):
 
4410
        if host is None:
 
4411
            host = 'bar.org'
 
4412
        username = 'jim'
 
4413
        expected_prompt = expected_prompt_format % {
 
4414
            'scheme': scheme, 'host': host, 'port': port,
 
4415
            'realm': realm}
 
4416
        ui.ui_factory = tests.TestUIFactory(stdin=username + '\n')
 
4417
        # We use an empty conf so that the user is always prompted
 
4418
        conf = config.AuthenticationConfig()
 
4419
        self.assertEqual(username, conf.get_user(scheme, host, port=port,
 
4420
                                                 realm=realm, path=path, ask=True))
 
4421
        self.assertEqual(expected_prompt, ui.ui_factory.stderr.getvalue())
 
4422
        self.assertEqual('', ui.ui_factory.stdout.getvalue())
 
4423
 
 
4424
    def test_username_defaults_prompts(self):
 
4425
        # HTTP prompts can't be tested here, see test_http.py
 
4426
        self._check_default_username_prompt(u'FTP %(host)s username: ', 'ftp')
 
4427
        self._check_default_username_prompt(
 
4428
            u'FTP %(host)s:%(port)d username: ', 'ftp', port=10020)
 
4429
        self._check_default_username_prompt(
 
4430
            u'SSH %(host)s:%(port)d username: ', 'ssh', port=12345)
 
4431
 
 
4432
    def test_username_default_no_prompt(self):
 
4433
        conf = config.AuthenticationConfig()
 
4434
        self.assertEqual(None,
 
4435
                         conf.get_user('ftp', 'example.com'))
 
4436
        self.assertEqual("explicitdefault",
 
4437
                         conf.get_user('ftp', 'example.com', default="explicitdefault"))
 
4438
 
 
4439
    def test_password_default_prompts(self):
 
4440
        # HTTP prompts can't be tested here, see test_http.py
 
4441
        self._check_default_password_prompt(
 
4442
            u'FTP %(user)s@%(host)s password: ', 'ftp')
 
4443
        self._check_default_password_prompt(
 
4444
            u'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp', port=10020)
 
4445
        self._check_default_password_prompt(
 
4446
            u'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh', port=12345)
 
4447
        # SMTP port handling is a bit special (it's handled if embedded in the
 
4448
        # host too)
 
4449
        # FIXME: should we: forbid that, extend it to other schemes, leave
 
4450
        # things as they are that's fine thank you ?
 
4451
        self._check_default_password_prompt(
 
4452
            u'SMTP %(user)s@%(host)s password: ', 'smtp')
 
4453
        self._check_default_password_prompt(
 
4454
            u'SMTP %(user)s@%(host)s password: ', 'smtp', host='bar.org:10025')
 
4455
        self._check_default_password_prompt(
 
4456
            u'SMTP %(user)s@%(host)s:%(port)d password: ', 'smtp', port=10025)
 
4457
 
 
4458
    def test_ssh_password_emits_warning(self):
 
4459
        conf = config.AuthenticationConfig(_file=BytesIO(b"""
 
4460
[ssh with password]
 
4461
scheme=ssh
 
4462
host=bar.org
 
4463
user=jim
 
4464
password=jimpass
 
4465
"""))
 
4466
        entered_password = 'typed-by-hand'
 
4467
        ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n')
 
4468
 
 
4469
        # Since the password defined in the authentication config is ignored,
 
4470
        # the user is prompted
 
4471
        self.assertEqual(entered_password,
 
4472
                         conf.get_password('ssh', 'bar.org', user='jim'))
 
4473
        self.assertContainsRe(
 
4474
            self.get_log(),
 
4475
            'password ignored in section \\[ssh with password\\]')
 
4476
 
 
4477
    def test_ssh_without_password_doesnt_emit_warning(self):
 
4478
        conf = config.AuthenticationConfig(_file=BytesIO(b"""
 
4479
[ssh with password]
 
4480
scheme=ssh
 
4481
host=bar.org
 
4482
user=jim
 
4483
"""))
 
4484
        entered_password = 'typed-by-hand'
 
4485
        ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n')
 
4486
 
 
4487
        # Since the password defined in the authentication config is ignored,
 
4488
        # the user is prompted
 
4489
        self.assertEqual(entered_password,
 
4490
                         conf.get_password('ssh', 'bar.org', user='jim'))
 
4491
        # No warning shoud be emitted since there is no password. We are only
 
4492
        # providing "user".
 
4493
        self.assertNotContainsRe(
 
4494
            self.get_log(),
 
4495
            'password ignored in section \\[ssh with password\\]')
 
4496
 
 
4497
    def test_uses_fallback_stores(self):
 
4498
        self.overrideAttr(config, 'credential_store_registry',
 
4499
                          config.CredentialStoreRegistry())
 
4500
        store = StubCredentialStore()
 
4501
        store.add_credentials("http", "example.com", "joe", "secret")
 
4502
        config.credential_store_registry.register("stub", store, fallback=True)
 
4503
        conf = config.AuthenticationConfig(_file=BytesIO())
 
4504
        creds = conf.get_credentials("http", "example.com")
 
4505
        self.assertEqual("joe", creds["user"])
 
4506
        self.assertEqual("secret", creds["password"])
 
4507
 
 
4508
 
 
4509
class StubCredentialStore(config.CredentialStore):
 
4510
 
 
4511
    def __init__(self):
 
4512
        self._username = {}
 
4513
        self._password = {}
 
4514
 
 
4515
    def add_credentials(self, scheme, host, user, password=None):
 
4516
        self._username[(scheme, host)] = user
 
4517
        self._password[(scheme, host)] = password
 
4518
 
 
4519
    def get_credentials(self, scheme, host, port=None, user=None,
 
4520
                        path=None, realm=None):
 
4521
        key = (scheme, host)
 
4522
        if key not in self._username:
 
4523
            return None
 
4524
        return {"scheme": scheme, "host": host, "port": port,
 
4525
                "user": self._username[key], "password": self._password[key]}
 
4526
 
 
4527
 
 
4528
class CountingCredentialStore(config.CredentialStore):
 
4529
 
 
4530
    def __init__(self):
 
4531
        self._calls = 0
 
4532
 
 
4533
    def get_credentials(self, scheme, host, port=None, user=None,
 
4534
                        path=None, realm=None):
 
4535
        self._calls += 1
 
4536
        return None
 
4537
 
 
4538
 
 
4539
class TestCredentialStoreRegistry(tests.TestCase):
 
4540
 
 
4541
    def _get_cs_registry(self):
 
4542
        return config.credential_store_registry
 
4543
 
 
4544
    def test_default_credential_store(self):
 
4545
        r = self._get_cs_registry()
 
4546
        default = r.get_credential_store(None)
 
4547
        self.assertIsInstance(default, config.PlainTextCredentialStore)
 
4548
 
 
4549
    def test_unknown_credential_store(self):
 
4550
        r = self._get_cs_registry()
 
4551
        # It's hard to imagine someone creating a credential store named
 
4552
        # 'unknown' so we use that as an never registered key.
 
4553
        self.assertRaises(KeyError, r.get_credential_store, 'unknown')
 
4554
 
 
4555
    def test_fallback_none_registered(self):
 
4556
        r = config.CredentialStoreRegistry()
 
4557
        self.assertEqual(None,
 
4558
                         r.get_fallback_credentials("http", "example.com"))
 
4559
 
 
4560
    def test_register(self):
 
4561
        r = config.CredentialStoreRegistry()
 
4562
        r.register("stub", StubCredentialStore(), fallback=False)
 
4563
        r.register("another", StubCredentialStore(), fallback=True)
 
4564
        self.assertEqual(["another", "stub"], r.keys())
 
4565
 
 
4566
    def test_register_lazy(self):
 
4567
        r = config.CredentialStoreRegistry()
 
4568
        r.register_lazy("stub", "breezy.tests.test_config",
 
4569
                        "StubCredentialStore", fallback=False)
 
4570
        self.assertEqual(["stub"], r.keys())
 
4571
        self.assertIsInstance(r.get_credential_store("stub"),
 
4572
                              StubCredentialStore)
 
4573
 
 
4574
    def test_is_fallback(self):
 
4575
        r = config.CredentialStoreRegistry()
 
4576
        r.register("stub1", None, fallback=False)
 
4577
        r.register("stub2", None, fallback=True)
 
4578
        self.assertEqual(False, r.is_fallback("stub1"))
 
4579
        self.assertEqual(True, r.is_fallback("stub2"))
 
4580
 
 
4581
    def test_no_fallback(self):
 
4582
        r = config.CredentialStoreRegistry()
 
4583
        store = CountingCredentialStore()
 
4584
        r.register("count", store, fallback=False)
 
4585
        self.assertEqual(None,
 
4586
                         r.get_fallback_credentials("http", "example.com"))
 
4587
        self.assertEqual(0, store._calls)
 
4588
 
 
4589
    def test_fallback_credentials(self):
 
4590
        r = config.CredentialStoreRegistry()
 
4591
        store = StubCredentialStore()
 
4592
        store.add_credentials("http", "example.com",
 
4593
                              "somebody", "geheim")
 
4594
        r.register("stub", store, fallback=True)
 
4595
        creds = r.get_fallback_credentials("http", "example.com")
 
4596
        self.assertEqual("somebody", creds["user"])
 
4597
        self.assertEqual("geheim", creds["password"])
 
4598
 
 
4599
    def test_fallback_first_wins(self):
 
4600
        r = config.CredentialStoreRegistry()
 
4601
        stub1 = StubCredentialStore()
 
4602
        stub1.add_credentials("http", "example.com",
 
4603
                              "somebody", "stub1")
 
4604
        r.register("stub1", stub1, fallback=True)
 
4605
        stub2 = StubCredentialStore()
 
4606
        stub2.add_credentials("http", "example.com",
 
4607
                              "somebody", "stub2")
 
4608
        r.register("stub2", stub1, fallback=True)
 
4609
        creds = r.get_fallback_credentials("http", "example.com")
 
4610
        self.assertEqual("somebody", creds["user"])
 
4611
        self.assertEqual("stub1", creds["password"])
 
4612
 
 
4613
 
 
4614
class TestPlainTextCredentialStore(tests.TestCase):
 
4615
 
 
4616
    def test_decode_password(self):
 
4617
        r = config.credential_store_registry
 
4618
        plain_text = r.get_credential_store()
 
4619
        decoded = plain_text.decode_password(dict(password='secret'))
 
4620
        self.assertEqual('secret', decoded)
 
4621
 
 
4622
 
 
4623
class TestBase64CredentialStore(tests.TestCase):
 
4624
 
 
4625
    def test_decode_password(self):
 
4626
        r = config.credential_store_registry
 
4627
        plain_text = r.get_credential_store('base64')
 
4628
        decoded = plain_text.decode_password(dict(password='c2VjcmV0'))
 
4629
        self.assertEqual(b'secret', decoded)
 
4630
 
 
4631
 
 
4632
# FIXME: Once we have a way to declare authentication to all test servers, we
 
4633
# can implement generic tests.
 
4634
# test_user_password_in_url
 
4635
# test_user_in_url_password_from_config
 
4636
# test_user_in_url_password_prompted
 
4637
# test_user_in_config
 
4638
# test_user_getpass.getuser
 
4639
# test_user_prompted ?
 
4640
class TestAuthenticationRing(tests.TestCaseWithTransport):
 
4641
    pass
 
4642
 
 
4643
 
 
4644
class EmailOptionTests(tests.TestCase):
 
4645
 
 
4646
    def test_default_email_uses_BRZ_EMAIL(self):
 
4647
        conf = config.MemoryStack(b'email=jelmer@debian.org')
 
4648
        # BRZ_EMAIL takes precedence over EMAIL
 
4649
        self.overrideEnv('BRZ_EMAIL', 'jelmer@samba.org')
 
4650
        self.overrideEnv('EMAIL', 'jelmer@apache.org')
 
4651
        self.assertEqual('jelmer@samba.org', conf.get('email'))
 
4652
 
 
4653
    def test_default_email_uses_EMAIL(self):
 
4654
        conf = config.MemoryStack(b'')
 
4655
        self.overrideEnv('BRZ_EMAIL', None)
 
4656
        self.overrideEnv('EMAIL', 'jelmer@apache.org')
 
4657
        self.assertEqual('jelmer@apache.org', conf.get('email'))
 
4658
 
 
4659
    def test_BRZ_EMAIL_overrides(self):
 
4660
        conf = config.MemoryStack(b'email=jelmer@debian.org')
 
4661
        self.overrideEnv('BRZ_EMAIL', 'jelmer@apache.org')
 
4662
        self.assertEqual('jelmer@apache.org', conf.get('email'))
 
4663
        self.overrideEnv('BRZ_EMAIL', None)
 
4664
        self.overrideEnv('EMAIL', 'jelmer@samba.org')
 
4665
        self.assertEqual('jelmer@debian.org', conf.get('email'))
 
4666
 
 
4667
 
 
4668
class MailClientOptionTests(tests.TestCase):
 
4669
 
 
4670
    def test_default(self):
 
4671
        conf = config.MemoryStack(b'')
 
4672
        client = conf.get('mail_client')
 
4673
        self.assertIs(client, mail_client.DefaultMail)
 
4674
 
 
4675
    def test_evolution(self):
 
4676
        conf = config.MemoryStack(b'mail_client=evolution')
 
4677
        client = conf.get('mail_client')
 
4678
        self.assertIs(client, mail_client.Evolution)
 
4679
 
 
4680
    def test_kmail(self):
 
4681
        conf = config.MemoryStack(b'mail_client=kmail')
 
4682
        client = conf.get('mail_client')
 
4683
        self.assertIs(client, mail_client.KMail)
 
4684
 
 
4685
    def test_mutt(self):
 
4686
        conf = config.MemoryStack(b'mail_client=mutt')
 
4687
        client = conf.get('mail_client')
 
4688
        self.assertIs(client, mail_client.Mutt)
 
4689
 
 
4690
    def test_thunderbird(self):
 
4691
        conf = config.MemoryStack(b'mail_client=thunderbird')
 
4692
        client = conf.get('mail_client')
 
4693
        self.assertIs(client, mail_client.Thunderbird)
 
4694
 
 
4695
    def test_explicit_default(self):
 
4696
        conf = config.MemoryStack(b'mail_client=default')
 
4697
        client = conf.get('mail_client')
 
4698
        self.assertIs(client, mail_client.DefaultMail)
 
4699
 
 
4700
    def test_editor(self):
 
4701
        conf = config.MemoryStack(b'mail_client=editor')
 
4702
        client = conf.get('mail_client')
 
4703
        self.assertIs(client, mail_client.Editor)
 
4704
 
 
4705
    def test_mapi(self):
 
4706
        conf = config.MemoryStack(b'mail_client=mapi')
 
4707
        client = conf.get('mail_client')
 
4708
        self.assertIs(client, mail_client.MAPIClient)
 
4709
 
 
4710
    def test_xdg_email(self):
 
4711
        conf = config.MemoryStack(b'mail_client=xdg-email')
 
4712
        client = conf.get('mail_client')
 
4713
        self.assertIs(client, mail_client.XDGEmail)
 
4714
 
 
4715
    def test_unknown(self):
 
4716
        conf = config.MemoryStack(b'mail_client=firebird')
 
4717
        self.assertRaises(config.ConfigOptionValueError, conf.get,
 
4718
                          'mail_client')