/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_bzrdir.py

Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006-2013, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the BzrDir facility and any format specific tests.
 
18
 
 
19
For interface contract tests, see tests/per_bzr_dir.
 
20
"""
 
21
 
 
22
import os
 
23
import subprocess
 
24
import sys
 
25
 
 
26
from .. import (
 
27
    branch,
 
28
    bzr,
 
29
    config,
 
30
    controldir,
 
31
    errors,
 
32
    help_topics,
 
33
    lock,
 
34
    repository,
 
35
    revision as _mod_revision,
 
36
    osutils,
 
37
    transport as _mod_transport,
 
38
    urlutils,
 
39
    win32utils,
 
40
    )
 
41
from ..bzr import (
 
42
    branch as bzrbranch,
 
43
    bzrdir,
 
44
    remote,
 
45
    workingtree_3,
 
46
    workingtree_4,
 
47
    )
 
48
import breezy.branch
 
49
import breezy.bzr.branch
 
50
from ..bzr.fullhistory import BzrBranchFormat5
 
51
from ..errors import (
 
52
    NotBranchError,
 
53
    NoColocatedBranchSupport,
 
54
    UnknownFormatError,
 
55
    UnsupportedFormatError,
 
56
    )
 
57
from . import (
 
58
    TestCase,
 
59
    TestCaseWithMemoryTransport,
 
60
    TestCaseWithTransport,
 
61
    TestSkipped,
 
62
    )
 
63
from . import(
 
64
    http_server,
 
65
    http_utils,
 
66
    )
 
67
from ..transport import (
 
68
    memory,
 
69
    pathfilter,
 
70
    )
 
71
from ..transport.http import HttpTransport
 
72
from ..transport.nosmart import NoSmartTransportDecorator
 
73
from ..transport.readonly import ReadonlyTransportDecorator
 
74
from ..bzr import knitrepo, knitpack_repo
 
75
 
 
76
 
 
77
class TestDefaultFormat(TestCase):
 
78
 
 
79
    def test_get_set_default_format(self):
 
80
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
81
        # default is BzrDirMetaFormat1
 
82
        self.assertIsInstance(old_format, bzrdir.BzrDirMetaFormat1)
 
83
        controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
 
84
        # creating a bzr dir should now create an instrumented dir.
 
85
        try:
 
86
            result = bzrdir.BzrDir.create('memory:///')
 
87
            self.assertIsInstance(result, SampleBzrDir)
 
88
        finally:
 
89
            controldir.ControlDirFormat._set_default_format(old_format)
 
90
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
 
91
 
 
92
 
 
93
class DeprecatedBzrDirFormat(bzrdir.BzrDirFormat):
 
94
    """A deprecated bzr dir format."""
 
95
 
 
96
 
 
97
class TestFormatRegistry(TestCase):
 
98
 
 
99
    def make_format_registry(self):
 
100
        my_format_registry = controldir.ControlDirFormatRegistry()
 
101
        my_format_registry.register('deprecated', DeprecatedBzrDirFormat,
 
102
                                    'Some format.  Slower and unawesome and deprecated.',
 
103
                                    deprecated=True)
 
104
        my_format_registry.register_lazy('lazy', 'breezy.tests.test_bzrdir',
 
105
                                         'DeprecatedBzrDirFormat', 'Format registered lazily',
 
106
                                         deprecated=True)
 
107
        bzr.register_metadir(my_format_registry, 'knit',
 
108
                             'breezy.bzr.knitrepo.RepositoryFormatKnit1',
 
109
                             'Format using knits',
 
110
                             )
 
111
        my_format_registry.set_default('knit')
 
112
        bzr.register_metadir(my_format_registry,
 
113
                             'branch6',
 
114
                             'breezy.bzr.knitrepo.RepositoryFormatKnit3',
 
115
                             'Experimental successor to knit.  Use at your own risk.',
 
116
                             branch_format='breezy.bzr.branch.BzrBranchFormat6',
 
117
                             experimental=True)
 
118
        bzr.register_metadir(my_format_registry,
 
119
                             'hidden format',
 
120
                             'breezy.bzr.knitrepo.RepositoryFormatKnit3',
 
121
                             'Experimental successor to knit.  Use at your own risk.',
 
122
                             branch_format='breezy.bzr.branch.BzrBranchFormat6', hidden=True)
 
123
        my_format_registry.register('hiddendeprecated', DeprecatedBzrDirFormat,
 
124
                                    'Old format.  Slower and does not support things. ', hidden=True)
 
125
        my_format_registry.register_lazy('hiddenlazy', 'breezy.tests.test_bzrdir',
 
126
                                         'DeprecatedBzrDirFormat', 'Format registered lazily',
 
127
                                         deprecated=True, hidden=True)
 
128
        return my_format_registry
 
129
 
 
130
    def test_format_registry(self):
 
131
        my_format_registry = self.make_format_registry()
 
132
        my_bzrdir = my_format_registry.make_controldir('lazy')
 
133
        self.assertIsInstance(my_bzrdir, DeprecatedBzrDirFormat)
 
134
        my_bzrdir = my_format_registry.make_controldir('deprecated')
 
135
        self.assertIsInstance(my_bzrdir, DeprecatedBzrDirFormat)
 
136
        my_bzrdir = my_format_registry.make_controldir('default')
 
137
        self.assertIsInstance(my_bzrdir.repository_format,
 
138
                              knitrepo.RepositoryFormatKnit1)
 
139
        my_bzrdir = my_format_registry.make_controldir('knit')
 
140
        self.assertIsInstance(my_bzrdir.repository_format,
 
141
                              knitrepo.RepositoryFormatKnit1)
 
142
        my_bzrdir = my_format_registry.make_controldir('branch6')
 
143
        self.assertIsInstance(my_bzrdir.get_branch_format(),
 
144
                              breezy.bzr.branch.BzrBranchFormat6)
 
145
 
 
146
    def test_get_help(self):
 
147
        my_format_registry = self.make_format_registry()
 
148
        self.assertEqual('Format registered lazily',
 
149
                         my_format_registry.get_help('lazy'))
 
150
        self.assertEqual('Format using knits',
 
151
                         my_format_registry.get_help('knit'))
 
152
        self.assertEqual('Format using knits',
 
153
                         my_format_registry.get_help('default'))
 
154
        self.assertEqual('Some format.  Slower and unawesome and deprecated.',
 
155
                         my_format_registry.get_help('deprecated'))
 
156
 
 
157
    def test_help_topic(self):
 
158
        topics = help_topics.HelpTopicRegistry()
 
159
        registry = self.make_format_registry()
 
160
        topics.register('current-formats', registry.help_topic,
 
161
                        'Current formats')
 
162
        topics.register('other-formats', registry.help_topic,
 
163
                        'Other formats')
 
164
        new = topics.get_detail('current-formats')
 
165
        rest = topics.get_detail('other-formats')
 
166
        experimental, deprecated = rest.split('Deprecated formats')
 
167
        self.assertContainsRe(new, 'formats-help')
 
168
        self.assertContainsRe(new,
 
169
                              ':knit:\n    \\(native\\) \\(default\\) Format using knits\n')
 
170
        self.assertContainsRe(experimental,
 
171
                              ':branch6:\n    \\(native\\) Experimental successor to knit')
 
172
        self.assertContainsRe(deprecated,
 
173
                              ':lazy:\n    \\(native\\) Format registered lazily\n')
 
174
        self.assertNotContainsRe(new, 'hidden')
 
175
 
 
176
    def test_set_default_repository(self):
 
177
        default_factory = controldir.format_registry.get('default')
 
178
        old_default = [k for k, v in controldir.format_registry.iteritems()
 
179
                       if v == default_factory and k != 'default'][0]
 
180
        controldir.format_registry.set_default_repository(
 
181
            'dirstate-with-subtree')
 
182
        try:
 
183
            self.assertIs(controldir.format_registry.get('dirstate-with-subtree'),
 
184
                          controldir.format_registry.get('default'))
 
185
            self.assertIs(
 
186
                repository.format_registry.get_default().__class__,
 
187
                knitrepo.RepositoryFormatKnit3)
 
188
        finally:
 
189
            controldir.format_registry.set_default_repository(old_default)
 
190
 
 
191
    def test_aliases(self):
 
192
        a_registry = controldir.ControlDirFormatRegistry()
 
193
        a_registry.register('deprecated', DeprecatedBzrDirFormat,
 
194
                            'Old format.  Slower and does not support stuff',
 
195
                            deprecated=True)
 
196
        a_registry.register_alias('deprecatedalias', 'deprecated')
 
197
        self.assertEqual({'deprecatedalias': 'deprecated'},
 
198
                         a_registry.aliases())
 
199
 
 
200
 
 
201
class SampleBranch(breezy.branch.Branch):
 
202
    """A dummy branch for guess what, dummy use."""
 
203
 
 
204
    def __init__(self, dir):
 
205
        self.controldir = dir
 
206
 
 
207
 
 
208
class SampleRepository(breezy.repository.Repository):
 
209
    """A dummy repo."""
 
210
 
 
211
    def __init__(self, dir):
 
212
        self.controldir = dir
 
213
 
 
214
 
 
215
class SampleBzrDir(bzrdir.BzrDir):
 
216
    """A sample BzrDir implementation to allow testing static methods."""
 
217
 
 
218
    def create_repository(self, shared=False):
 
219
        """See ControlDir.create_repository."""
 
220
        return "A repository"
 
221
 
 
222
    def open_repository(self):
 
223
        """See ControlDir.open_repository."""
 
224
        return SampleRepository(self)
 
225
 
 
226
    def create_branch(self, name=None):
 
227
        """See ControlDir.create_branch."""
 
228
        if name is not None:
 
229
            raise NoColocatedBranchSupport(self)
 
230
        return SampleBranch(self)
 
231
 
 
232
    def create_workingtree(self):
 
233
        """See ControlDir.create_workingtree."""
 
234
        return "A tree"
 
235
 
 
236
 
 
237
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
 
238
    """A sample format
 
239
 
 
240
    this format is initializable, unsupported to aid in testing the
 
241
    open and open_downlevel routines.
 
242
    """
 
243
 
 
244
    def get_format_string(self):
 
245
        """See BzrDirFormat.get_format_string()."""
 
246
        return b"Sample .bzr dir format."
 
247
 
 
248
    def initialize_on_transport(self, t):
 
249
        """Create a bzr dir."""
 
250
        t.mkdir('.bzr')
 
251
        t.put_bytes('.bzr/branch-format', self.get_format_string())
 
252
        return SampleBzrDir(t, self)
 
253
 
 
254
    def is_supported(self):
 
255
        return False
 
256
 
 
257
    def open(self, transport, _found=None):
 
258
        return "opened branch."
 
259
 
 
260
    @classmethod
 
261
    def from_string(cls, format_string):
 
262
        return cls()
 
263
 
 
264
 
 
265
class BzrDirFormatTest1(bzrdir.BzrDirMetaFormat1):
 
266
 
 
267
    @staticmethod
 
268
    def get_format_string():
 
269
        return b"Test format 1"
 
270
 
 
271
 
 
272
class BzrDirFormatTest2(bzrdir.BzrDirMetaFormat1):
 
273
 
 
274
    @staticmethod
 
275
    def get_format_string():
 
276
        return b"Test format 2"
 
277
 
 
278
 
 
279
class TestBzrDirFormat(TestCaseWithTransport):
 
280
    """Tests for the BzrDirFormat facility."""
 
281
 
 
282
    def test_find_format(self):
 
283
        # is the right format object found for a branch?
 
284
        # create a branch with a few known format objects.
 
285
        bzr.BzrProber.formats.register(BzrDirFormatTest1.get_format_string(),
 
286
                                       BzrDirFormatTest1())
 
287
        self.addCleanup(bzr.BzrProber.formats.remove,
 
288
                        BzrDirFormatTest1.get_format_string())
 
289
        bzr.BzrProber.formats.register(BzrDirFormatTest2.get_format_string(),
 
290
                                       BzrDirFormatTest2())
 
291
        self.addCleanup(bzr.BzrProber.formats.remove,
 
292
                        BzrDirFormatTest2.get_format_string())
 
293
        t = self.get_transport()
 
294
        self.build_tree(["foo/", "bar/"], transport=t)
 
295
 
 
296
        def check_format(format, url):
 
297
            format.initialize(url)
 
298
            t = _mod_transport.get_transport_from_path(url)
 
299
            found_format = bzrdir.BzrDirFormat.find_format(t)
 
300
            self.assertIsInstance(found_format, format.__class__)
 
301
        check_format(BzrDirFormatTest1(), "foo")
 
302
        check_format(BzrDirFormatTest2(), "bar")
 
303
 
 
304
    def test_find_format_nothing_there(self):
 
305
        self.assertRaises(NotBranchError,
 
306
                          bzrdir.BzrDirFormat.find_format,
 
307
                          _mod_transport.get_transport_from_path('.'))
 
308
 
 
309
    def test_find_format_unknown_format(self):
 
310
        t = self.get_transport()
 
311
        t.mkdir('.bzr')
 
312
        t.put_bytes('.bzr/branch-format', b'')
 
313
        self.assertRaises(UnknownFormatError,
 
314
                          bzrdir.BzrDirFormat.find_format,
 
315
                          _mod_transport.get_transport_from_path('.'))
 
316
 
 
317
    def test_find_format_line_endings(self):
 
318
        t = self.get_transport()
 
319
        t.mkdir('.bzr')
 
320
        t.put_bytes('.bzr/branch-format', b'Corrupt line endings\r\n')
 
321
        self.assertRaises(errors.LineEndingError,
 
322
                          bzrdir.BzrDirFormat.find_format,
 
323
                          _mod_transport.get_transport_from_path('.'))
 
324
 
 
325
    def test_register_unregister_format(self):
 
326
        format = SampleBzrDirFormat()
 
327
        url = self.get_url()
 
328
        # make a bzrdir
 
329
        format.initialize(url)
 
330
        # register a format for it.
 
331
        bzr.BzrProber.formats.register(format.get_format_string(), format)
 
332
        # which bzrdir.Open will refuse (not supported)
 
333
        self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
 
334
        # which bzrdir.open_containing will refuse (not supported)
 
335
        self.assertRaises(UnsupportedFormatError,
 
336
                          bzrdir.BzrDir.open_containing, url)
 
337
        # but open_downlevel will work
 
338
        t = _mod_transport.get_transport_from_url(url)
 
339
        self.assertEqual(format.open(t), bzrdir.BzrDir.open_unsupported(url))
 
340
        # unregister the format
 
341
        bzr.BzrProber.formats.remove(format.get_format_string())
 
342
        # now open_downlevel should fail too.
 
343
        self.assertRaises(UnknownFormatError,
 
344
                          bzrdir.BzrDir.open_unsupported, url)
 
345
 
 
346
    def test_create_branch_and_repo_uses_default(self):
 
347
        format = SampleBzrDirFormat()
 
348
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
 
349
                                                      format=format)
 
350
        self.assertTrue(isinstance(branch, SampleBranch))
 
351
 
 
352
    def test_create_branch_and_repo_under_shared(self):
 
353
        # creating a branch and repo in a shared repo uses the
 
354
        # shared repository
 
355
        format = controldir.format_registry.make_controldir('knit')
 
356
        self.make_repository('.', shared=True, format=format)
 
357
        branch = bzrdir.BzrDir.create_branch_and_repo(
 
358
            self.get_url('child'), format=format)
 
359
        self.assertRaises(errors.NoRepositoryPresent,
 
360
                          branch.controldir.open_repository)
 
361
 
 
362
    def test_create_branch_and_repo_under_shared_force_new(self):
 
363
        # creating a branch and repo in a shared repo can be forced to
 
364
        # make a new repo
 
365
        format = controldir.format_registry.make_controldir('knit')
 
366
        self.make_repository('.', shared=True, format=format)
 
367
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
 
368
                                                      force_new_repo=True,
 
369
                                                      format=format)
 
370
        branch.controldir.open_repository()
 
371
 
 
372
    def test_create_standalone_working_tree(self):
 
373
        format = SampleBzrDirFormat()
 
374
        # note this is deliberately readonly, as this failure should
 
375
        # occur before any writes.
 
376
        self.assertRaises(errors.NotLocalUrl,
 
377
                          bzrdir.BzrDir.create_standalone_workingtree,
 
378
                          self.get_readonly_url(), format=format)
 
379
        tree = bzrdir.BzrDir.create_standalone_workingtree('.',
 
380
                                                           format=format)
 
381
        self.assertEqual('A tree', tree)
 
382
 
 
383
    def test_create_standalone_working_tree_under_shared_repo(self):
 
384
        # create standalone working tree always makes a repo.
 
385
        format = controldir.format_registry.make_controldir('knit')
 
386
        self.make_repository('.', shared=True, format=format)
 
387
        # note this is deliberately readonly, as this failure should
 
388
        # occur before any writes.
 
389
        self.assertRaises(errors.NotLocalUrl,
 
390
                          bzrdir.BzrDir.create_standalone_workingtree,
 
391
                          self.get_readonly_url('child'), format=format)
 
392
        tree = bzrdir.BzrDir.create_standalone_workingtree('child',
 
393
                                                           format=format)
 
394
        tree.controldir.open_repository()
 
395
 
 
396
    def test_create_branch_convenience(self):
 
397
        # outside a repo the default convenience output is a repo+branch_tree
 
398
        format = controldir.format_registry.make_controldir('knit')
 
399
        branch = bzrdir.BzrDir.create_branch_convenience('.', format=format)
 
400
        branch.controldir.open_workingtree()
 
401
        branch.controldir.open_repository()
 
402
 
 
403
    def test_create_branch_convenience_possible_transports(self):
 
404
        """Check that the optional 'possible_transports' is recognized"""
 
405
        format = controldir.format_registry.make_controldir('knit')
 
406
        t = self.get_transport()
 
407
        branch = bzrdir.BzrDir.create_branch_convenience(
 
408
            '.', format=format, possible_transports=[t])
 
409
        branch.controldir.open_workingtree()
 
410
        branch.controldir.open_repository()
 
411
 
 
412
    def test_create_branch_convenience_root(self):
 
413
        """Creating a branch at the root of a fs should work."""
 
414
        self.vfs_transport_factory = memory.MemoryServer
 
415
        # outside a repo the default convenience output is a repo+branch_tree
 
416
        format = controldir.format_registry.make_controldir('knit')
 
417
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
 
418
                                                         format=format)
 
419
        self.assertRaises(errors.NoWorkingTree,
 
420
                          branch.controldir.open_workingtree)
 
421
        branch.controldir.open_repository()
 
422
 
 
423
    def test_create_branch_convenience_under_shared_repo(self):
 
424
        # inside a repo the default convenience output is a branch+ follow the
 
425
        # repo tree policy
 
426
        format = controldir.format_registry.make_controldir('knit')
 
427
        self.make_repository('.', shared=True, format=format)
 
428
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
429
                                                         format=format)
 
430
        branch.controldir.open_workingtree()
 
431
        self.assertRaises(errors.NoRepositoryPresent,
 
432
                          branch.controldir.open_repository)
 
433
 
 
434
    def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
 
435
        # inside a repo the default convenience output is a branch+ follow the
 
436
        # repo tree policy but we can override that
 
437
        format = controldir.format_registry.make_controldir('knit')
 
438
        self.make_repository('.', shared=True, format=format)
 
439
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
440
                                                         force_new_tree=False, format=format)
 
441
        self.assertRaises(errors.NoWorkingTree,
 
442
                          branch.controldir.open_workingtree)
 
443
        self.assertRaises(errors.NoRepositoryPresent,
 
444
                          branch.controldir.open_repository)
 
445
 
 
446
    def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
 
447
        # inside a repo the default convenience output is a branch+ follow the
 
448
        # repo tree policy
 
449
        format = controldir.format_registry.make_controldir('knit')
 
450
        repo = self.make_repository('.', shared=True, format=format)
 
451
        repo.set_make_working_trees(False)
 
452
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
453
                                                         format=format)
 
454
        self.assertRaises(errors.NoWorkingTree,
 
455
                          branch.controldir.open_workingtree)
 
456
        self.assertRaises(errors.NoRepositoryPresent,
 
457
                          branch.controldir.open_repository)
 
458
 
 
459
    def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
 
460
        # inside a repo the default convenience output is a branch+ follow the
 
461
        # repo tree policy but we can override that
 
462
        format = controldir.format_registry.make_controldir('knit')
 
463
        repo = self.make_repository('.', shared=True, format=format)
 
464
        repo.set_make_working_trees(False)
 
465
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
466
                                                         force_new_tree=True, format=format)
 
467
        branch.controldir.open_workingtree()
 
468
        self.assertRaises(errors.NoRepositoryPresent,
 
469
                          branch.controldir.open_repository)
 
470
 
 
471
    def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
 
472
        # inside a repo the default convenience output is overridable to give
 
473
        # repo+branch+tree
 
474
        format = controldir.format_registry.make_controldir('knit')
 
475
        self.make_repository('.', shared=True, format=format)
 
476
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
477
                                                         force_new_repo=True, format=format)
 
478
        branch.controldir.open_repository()
 
479
        branch.controldir.open_workingtree()
 
480
 
 
481
 
 
482
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
 
483
 
 
484
    def test_acquire_repository_standalone(self):
 
485
        """The default acquisition policy should create a standalone branch."""
 
486
        my_bzrdir = self.make_controldir('.')
 
487
        repo_policy = my_bzrdir.determine_repository_policy()
 
488
        repo, is_new = repo_policy.acquire_repository()
 
489
        self.assertEqual(repo.controldir.root_transport.base,
 
490
                         my_bzrdir.root_transport.base)
 
491
        self.assertFalse(repo.is_shared())
 
492
 
 
493
    def test_determine_stacking_policy(self):
 
494
        parent_bzrdir = self.make_controldir('.')
 
495
        child_bzrdir = self.make_controldir('child')
 
496
        parent_bzrdir.get_config().set_default_stack_on('http://example.org')
 
497
        repo_policy = child_bzrdir.determine_repository_policy()
 
498
        self.assertEqual('http://example.org', repo_policy._stack_on)
 
499
 
 
500
    def test_determine_stacking_policy_relative(self):
 
501
        parent_bzrdir = self.make_controldir('.')
 
502
        child_bzrdir = self.make_controldir('child')
 
503
        parent_bzrdir.get_config().set_default_stack_on('child2')
 
504
        repo_policy = child_bzrdir.determine_repository_policy()
 
505
        self.assertEqual('child2', repo_policy._stack_on)
 
506
        self.assertEqual(parent_bzrdir.root_transport.base,
 
507
                         repo_policy._stack_on_pwd)
 
508
 
 
509
    def prepare_default_stacking(self, child_format='1.6'):
 
510
        parent_bzrdir = self.make_controldir('.')
 
511
        child_branch = self.make_branch('child', format=child_format)
 
512
        parent_bzrdir.get_config().set_default_stack_on(child_branch.base)
 
513
        new_child_transport = parent_bzrdir.transport.clone('child2')
 
514
        return child_branch, new_child_transport
 
515
 
 
516
    def test_clone_on_transport_obeys_stacking_policy(self):
 
517
        child_branch, new_child_transport = self.prepare_default_stacking()
 
518
        new_child = child_branch.controldir.clone_on_transport(
 
519
            new_child_transport)
 
520
        self.assertEqual(child_branch.base,
 
521
                         new_child.open_branch().get_stacked_on_url())
 
522
 
 
523
    def test_default_stacking_with_stackable_branch_unstackable_repo(self):
 
524
        # Make stackable source branch with an unstackable repo format.
 
525
        source_bzrdir = self.make_controldir('source')
 
526
        knitpack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
 
527
        source_branch = breezy.bzr.branch.BzrBranchFormat7().initialize(
 
528
            source_bzrdir)
 
529
        # Make a directory with a default stacking policy
 
530
        parent_bzrdir = self.make_controldir('parent')
 
531
        stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
 
532
        parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
 
533
        # Clone source into directory
 
534
        target = source_bzrdir.clone(self.get_url('parent/target'))
 
535
 
 
536
    def test_format_initialize_on_transport_ex_stacked_on(self):
 
537
        # trunk is a stackable format.  Note that its in the same server area
 
538
        # which is what launchpad does, but not sufficient to exercise the
 
539
        # general case.
 
540
        trunk = self.make_branch('trunk', format='1.9')
 
541
        t = self.get_transport('stacked')
 
542
        old_fmt = controldir.format_registry.make_controldir('pack-0.92')
 
543
        repo_name = old_fmt.repository_format.network_name()
 
544
        # Should end up with a 1.9 format (stackable)
 
545
        repo, control, require_stacking, repo_policy = \
 
546
            old_fmt.initialize_on_transport_ex(t,
 
547
                                               repo_format_name=repo_name, stacked_on='../trunk',
 
548
                                               stack_on_pwd=t.base)
 
549
        if repo is not None:
 
550
            # Repositories are open write-locked
 
551
            self.assertTrue(repo.is_write_locked())
 
552
            self.addCleanup(repo.unlock)
 
553
        else:
 
554
            repo = control.open_repository()
 
555
        self.assertIsInstance(control, bzrdir.BzrDir)
 
556
        opened = bzrdir.BzrDir.open(t.base)
 
557
        if not isinstance(old_fmt, remote.RemoteBzrDirFormat):
 
558
            self.assertEqual(control._format.network_name(),
 
559
                             old_fmt.network_name())
 
560
            self.assertEqual(control._format.network_name(),
 
561
                             opened._format.network_name())
 
562
        self.assertEqual(control.__class__, opened.__class__)
 
563
        self.assertLength(1, repo._fallback_repositories)
 
564
 
 
565
    def test_sprout_obeys_stacking_policy(self):
 
566
        child_branch, new_child_transport = self.prepare_default_stacking()
 
567
        new_child = child_branch.controldir.sprout(new_child_transport.base)
 
568
        self.assertEqual(child_branch.base,
 
569
                         new_child.open_branch().get_stacked_on_url())
 
570
 
 
571
    def test_clone_ignores_policy_for_unsupported_formats(self):
 
572
        child_branch, new_child_transport = self.prepare_default_stacking(
 
573
            child_format='pack-0.92')
 
574
        new_child = child_branch.controldir.clone_on_transport(
 
575
            new_child_transport)
 
576
        self.assertRaises(branch.UnstackableBranchFormat,
 
577
                          new_child.open_branch().get_stacked_on_url)
 
578
 
 
579
    def test_sprout_ignores_policy_for_unsupported_formats(self):
 
580
        child_branch, new_child_transport = self.prepare_default_stacking(
 
581
            child_format='pack-0.92')
 
582
        new_child = child_branch.controldir.sprout(new_child_transport.base)
 
583
        self.assertRaises(branch.UnstackableBranchFormat,
 
584
                          new_child.open_branch().get_stacked_on_url)
 
585
 
 
586
    def test_sprout_upgrades_format_if_stacked_specified(self):
 
587
        child_branch, new_child_transport = self.prepare_default_stacking(
 
588
            child_format='pack-0.92')
 
589
        new_child = child_branch.controldir.sprout(new_child_transport.base,
 
590
                                                   stacked=True)
 
591
        self.assertEqual(child_branch.controldir.root_transport.base,
 
592
                         new_child.open_branch().get_stacked_on_url())
 
593
        repo = new_child.open_repository()
 
594
        self.assertTrue(repo._format.supports_external_lookups)
 
595
        self.assertFalse(repo.supports_rich_root())
 
596
 
 
597
    def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
 
598
        child_branch, new_child_transport = self.prepare_default_stacking(
 
599
            child_format='pack-0.92')
 
600
        new_child = child_branch.controldir.clone_on_transport(new_child_transport,
 
601
                                                               stacked_on=child_branch.controldir.root_transport.base)
 
602
        self.assertEqual(child_branch.controldir.root_transport.base,
 
603
                         new_child.open_branch().get_stacked_on_url())
 
604
        repo = new_child.open_repository()
 
605
        self.assertTrue(repo._format.supports_external_lookups)
 
606
        self.assertFalse(repo.supports_rich_root())
 
607
 
 
608
    def test_sprout_upgrades_to_rich_root_format_if_needed(self):
 
609
        child_branch, new_child_transport = self.prepare_default_stacking(
 
610
            child_format='rich-root-pack')
 
611
        new_child = child_branch.controldir.sprout(new_child_transport.base,
 
612
                                                   stacked=True)
 
613
        repo = new_child.open_repository()
 
614
        self.assertTrue(repo._format.supports_external_lookups)
 
615
        self.assertTrue(repo.supports_rich_root())
 
616
 
 
617
    def test_add_fallback_repo_handles_absolute_urls(self):
 
618
        stack_on = self.make_branch('stack_on', format='1.6')
 
619
        repo = self.make_repository('repo', format='1.6')
 
620
        policy = bzrdir.UseExistingRepository(repo, stack_on.base)
 
621
        policy._add_fallback(repo)
 
622
 
 
623
    def test_add_fallback_repo_handles_relative_urls(self):
 
624
        stack_on = self.make_branch('stack_on', format='1.6')
 
625
        repo = self.make_repository('repo', format='1.6')
 
626
        policy = bzrdir.UseExistingRepository(repo, '.', stack_on.base)
 
627
        policy._add_fallback(repo)
 
628
 
 
629
    def test_configure_relative_branch_stacking_url(self):
 
630
        stack_on = self.make_branch('stack_on', format='1.6')
 
631
        stacked = self.make_branch('stack_on/stacked', format='1.6')
 
632
        policy = bzrdir.UseExistingRepository(stacked.repository,
 
633
                                              '.', stack_on.base)
 
634
        policy.configure_branch(stacked)
 
635
        self.assertEqual('..', stacked.get_stacked_on_url())
 
636
 
 
637
    def test_relative_branch_stacking_to_absolute(self):
 
638
        stack_on = self.make_branch('stack_on', format='1.6')
 
639
        stacked = self.make_branch('stack_on/stacked', format='1.6')
 
640
        policy = bzrdir.UseExistingRepository(stacked.repository,
 
641
                                              '.', self.get_readonly_url('stack_on'))
 
642
        policy.configure_branch(stacked)
 
643
        self.assertEqual(self.get_readonly_url('stack_on'),
 
644
                         stacked.get_stacked_on_url())
 
645
 
 
646
 
 
647
class ChrootedTests(TestCaseWithTransport):
 
648
    """A support class that provides readonly urls outside the local namespace.
 
649
 
 
650
    This is done by checking if self.transport_server is a MemoryServer. if it
 
651
    is then we are chrooted already, if it is not then an HttpServer is used
 
652
    for readonly urls.
 
653
    """
 
654
 
 
655
    def setUp(self):
 
656
        super(ChrootedTests, self).setUp()
 
657
        if not self.vfs_transport_factory == memory.MemoryServer:
 
658
            self.transport_readonly_server = http_server.HttpServer
 
659
 
 
660
    def local_branch_path(self, branch):
 
661
        return os.path.realpath(urlutils.local_path_from_url(branch.base))
 
662
 
 
663
    def test_open_containing(self):
 
664
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
 
665
                          self.get_readonly_url(''))
 
666
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
 
667
                          self.get_readonly_url('g/p/q'))
 
668
        control = bzrdir.BzrDir.create(self.get_url())
 
669
        branch, relpath = bzrdir.BzrDir.open_containing(
 
670
            self.get_readonly_url(''))
 
671
        self.assertEqual('', relpath)
 
672
        branch, relpath = bzrdir.BzrDir.open_containing(
 
673
            self.get_readonly_url('g/p/q'))
 
674
        self.assertEqual('g/p/q', relpath)
 
675
 
 
676
    def test_open_containing_tree_branch_or_repository_empty(self):
 
677
        self.assertRaises(errors.NotBranchError,
 
678
                          bzrdir.BzrDir.open_containing_tree_branch_or_repository,
 
679
                          self.get_readonly_url(''))
 
680
 
 
681
    def test_open_containing_tree_branch_or_repository_all(self):
 
682
        self.make_branch_and_tree('topdir')
 
683
        tree, branch, repo, relpath = \
 
684
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
685
                'topdir/foo')
 
686
        self.assertEqual(os.path.realpath('topdir'),
 
687
                         os.path.realpath(tree.basedir))
 
688
        self.assertEqual(os.path.realpath('topdir'),
 
689
                         self.local_branch_path(branch))
 
690
        self.assertEqual(
 
691
            osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
 
692
            repo.controldir.transport.local_abspath('repository'))
 
693
        self.assertEqual(relpath, 'foo')
 
694
 
 
695
    def test_open_containing_tree_branch_or_repository_no_tree(self):
 
696
        self.make_branch('branch')
 
697
        tree, branch, repo, relpath = \
 
698
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
699
                'branch/foo')
 
700
        self.assertEqual(tree, None)
 
701
        self.assertEqual(os.path.realpath('branch'),
 
702
                         self.local_branch_path(branch))
 
703
        self.assertEqual(
 
704
            osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
 
705
            repo.controldir.transport.local_abspath('repository'))
 
706
        self.assertEqual(relpath, 'foo')
 
707
 
 
708
    def test_open_containing_tree_branch_or_repository_repo(self):
 
709
        self.make_repository('repo')
 
710
        tree, branch, repo, relpath = \
 
711
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
712
                'repo')
 
713
        self.assertEqual(tree, None)
 
714
        self.assertEqual(branch, None)
 
715
        self.assertEqual(
 
716
            osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
 
717
            repo.controldir.transport.local_abspath('repository'))
 
718
        self.assertEqual(relpath, '')
 
719
 
 
720
    def test_open_containing_tree_branch_or_repository_shared_repo(self):
 
721
        self.make_repository('shared', shared=True)
 
722
        bzrdir.BzrDir.create_branch_convenience('shared/branch',
 
723
                                                force_new_tree=False)
 
724
        tree, branch, repo, relpath = \
 
725
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
726
                'shared/branch')
 
727
        self.assertEqual(tree, None)
 
728
        self.assertEqual(os.path.realpath('shared/branch'),
 
729
                         self.local_branch_path(branch))
 
730
        self.assertEqual(
 
731
            osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
 
732
            repo.controldir.transport.local_abspath('repository'))
 
733
        self.assertEqual(relpath, '')
 
734
 
 
735
    def test_open_containing_tree_branch_or_repository_branch_subdir(self):
 
736
        self.make_branch_and_tree('foo')
 
737
        self.build_tree(['foo/bar/'])
 
738
        tree, branch, repo, relpath = \
 
739
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
740
                'foo/bar')
 
741
        self.assertEqual(os.path.realpath('foo'),
 
742
                         os.path.realpath(tree.basedir))
 
743
        self.assertEqual(os.path.realpath('foo'),
 
744
                         self.local_branch_path(branch))
 
745
        self.assertEqual(
 
746
            osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
 
747
            repo.controldir.transport.local_abspath('repository'))
 
748
        self.assertEqual(relpath, 'bar')
 
749
 
 
750
    def test_open_containing_tree_branch_or_repository_repo_subdir(self):
 
751
        self.make_repository('bar')
 
752
        self.build_tree(['bar/baz/'])
 
753
        tree, branch, repo, relpath = \
 
754
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
755
                'bar/baz')
 
756
        self.assertEqual(tree, None)
 
757
        self.assertEqual(branch, None)
 
758
        self.assertEqual(
 
759
            osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
 
760
            repo.controldir.transport.local_abspath('repository'))
 
761
        self.assertEqual(relpath, 'baz')
 
762
 
 
763
    def test_open_containing_from_transport(self):
 
764
        self.assertRaises(NotBranchError,
 
765
                          bzrdir.BzrDir.open_containing_from_transport,
 
766
                          _mod_transport.get_transport_from_url(self.get_readonly_url('')))
 
767
        self.assertRaises(NotBranchError,
 
768
                          bzrdir.BzrDir.open_containing_from_transport,
 
769
                          _mod_transport.get_transport_from_url(
 
770
                              self.get_readonly_url('g/p/q')))
 
771
        control = bzrdir.BzrDir.create(self.get_url())
 
772
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
 
773
            _mod_transport.get_transport_from_url(
 
774
                self.get_readonly_url('')))
 
775
        self.assertEqual('', relpath)
 
776
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
 
777
            _mod_transport.get_transport_from_url(
 
778
                self.get_readonly_url('g/p/q')))
 
779
        self.assertEqual('g/p/q', relpath)
 
780
 
 
781
    def test_open_containing_tree_or_branch(self):
 
782
        self.make_branch_and_tree('topdir')
 
783
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
784
            'topdir/foo')
 
785
        self.assertEqual(os.path.realpath('topdir'),
 
786
                         os.path.realpath(tree.basedir))
 
787
        self.assertEqual(os.path.realpath('topdir'),
 
788
                         self.local_branch_path(branch))
 
789
        self.assertIs(tree.controldir, branch.controldir)
 
790
        self.assertEqual('foo', relpath)
 
791
        # opening from non-local should not return the tree
 
792
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
793
            self.get_readonly_url('topdir/foo'))
 
794
        self.assertEqual(None, tree)
 
795
        self.assertEqual('foo', relpath)
 
796
        # without a tree:
 
797
        self.make_branch('topdir/foo')
 
798
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
799
            'topdir/foo')
 
800
        self.assertIs(tree, None)
 
801
        self.assertEqual(os.path.realpath('topdir/foo'),
 
802
                         self.local_branch_path(branch))
 
803
        self.assertEqual('', relpath)
 
804
 
 
805
    def test_open_tree_or_branch(self):
 
806
        self.make_branch_and_tree('topdir')
 
807
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
 
808
        self.assertEqual(os.path.realpath('topdir'),
 
809
                         os.path.realpath(tree.basedir))
 
810
        self.assertEqual(os.path.realpath('topdir'),
 
811
                         self.local_branch_path(branch))
 
812
        self.assertIs(tree.controldir, branch.controldir)
 
813
        # opening from non-local should not return the tree
 
814
        tree, branch = bzrdir.BzrDir.open_tree_or_branch(
 
815
            self.get_readonly_url('topdir'))
 
816
        self.assertEqual(None, tree)
 
817
        # without a tree:
 
818
        self.make_branch('topdir/foo')
 
819
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
 
820
        self.assertIs(tree, None)
 
821
        self.assertEqual(os.path.realpath('topdir/foo'),
 
822
                         self.local_branch_path(branch))
 
823
 
 
824
    def test_open_from_transport(self):
 
825
        # transport pointing at bzrdir should give a bzrdir with root transport
 
826
        # set to the given transport
 
827
        control = bzrdir.BzrDir.create(self.get_url())
 
828
        t = self.get_transport()
 
829
        opened_bzrdir = bzrdir.BzrDir.open_from_transport(t)
 
830
        self.assertEqual(t.base, opened_bzrdir.root_transport.base)
 
831
        self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
 
832
 
 
833
    def test_open_from_transport_no_bzrdir(self):
 
834
        t = self.get_transport()
 
835
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
 
836
 
 
837
    def test_open_from_transport_bzrdir_in_parent(self):
 
838
        control = bzrdir.BzrDir.create(self.get_url())
 
839
        t = self.get_transport()
 
840
        t.mkdir('subdir')
 
841
        t = t.clone('subdir')
 
842
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
 
843
 
 
844
    def test_sprout_recursive(self):
 
845
        tree = self.make_branch_and_tree('tree1',
 
846
                                         format='development-subtree')
 
847
        sub_tree = self.make_branch_and_tree('tree1/subtree',
 
848
                                             format='development-subtree')
 
849
        sub_tree.set_root_id(b'subtree-root')
 
850
        tree.add_reference(sub_tree)
 
851
        self.build_tree(['tree1/subtree/file'])
 
852
        sub_tree.add('file')
 
853
        tree.commit('Initial commit')
 
854
        tree2 = tree.controldir.sprout('tree2').open_workingtree()
 
855
        tree2.lock_read()
 
856
        self.addCleanup(tree2.unlock)
 
857
        self.assertPathExists('tree2/subtree/file')
 
858
        self.assertEqual(
 
859
            'tree-reference',
 
860
            tree2.kind('subtree', 'subtree-root'))
 
861
 
 
862
    def test_cloning_metadir(self):
 
863
        """Ensure that cloning metadir is suitable"""
 
864
        bzrdir = self.make_controldir('bzrdir')
 
865
        bzrdir.cloning_metadir()
 
866
        branch = self.make_branch('branch', format='knit')
 
867
        format = branch.controldir.cloning_metadir()
 
868
        self.assertIsInstance(format.workingtree_format,
 
869
                              workingtree_4.WorkingTreeFormat6)
 
870
 
 
871
    def test_sprout_recursive_treeless(self):
 
872
        tree = self.make_branch_and_tree('tree1',
 
873
                                         format='development-subtree')
 
874
        sub_tree = self.make_branch_and_tree('tree1/subtree',
 
875
                                             format='development-subtree')
 
876
        tree.add_reference(sub_tree)
 
877
        self.build_tree(['tree1/subtree/file'])
 
878
        sub_tree.add('file')
 
879
        tree.commit('Initial commit')
 
880
        # The following line force the orhaning to reveal bug #634470
 
881
        tree.branch.get_config_stack().set(
 
882
            'transform.orphan_policy', 'move')
 
883
        tree.controldir.destroy_workingtree()
 
884
        # FIXME: subtree/.bzr is left here which allows the test to pass (or
 
885
        # fail :-( ) -- vila 20100909
 
886
        repo = self.make_repository('repo', shared=True,
 
887
                                    format='development-subtree')
 
888
        repo.set_make_working_trees(False)
 
889
        # FIXME: we just deleted the workingtree and now we want to use it ????
 
890
        # At a minimum, we should use tree.branch below (but this fails too
 
891
        # currently) or stop calling this test 'treeless'. Specifically, I've
 
892
        # turn the line below into an assertRaises when 'subtree/.bzr' is
 
893
        # orphaned and sprout tries to access the branch there (which is left
 
894
        # by bzrdir.BzrDirMeta1.destroy_workingtree when it ignores the
 
895
        # [DeletingParent('Not deleting', u'subtree', None)] conflict). See bug
 
896
        # #634470.  -- vila 20100909
 
897
        self.assertRaises(errors.NotBranchError,
 
898
                          tree.controldir.sprout, 'repo/tree2')
 
899
#        self.assertPathExists('repo/tree2/subtree')
 
900
#        self.assertPathDoesNotExist('repo/tree2/subtree/file')
 
901
 
 
902
    def make_foo_bar_baz(self):
 
903
        foo = bzrdir.BzrDir.create_branch_convenience('foo').controldir
 
904
        bar = self.make_branch('foo/bar').controldir
 
905
        baz = self.make_branch('baz').controldir
 
906
        return foo, bar, baz
 
907
 
 
908
    def test_find_controldirs(self):
 
909
        foo, bar, baz = self.make_foo_bar_baz()
 
910
        t = self.get_transport()
 
911
        self.assertEqualBzrdirs(
 
912
            [baz, foo, bar], bzrdir.BzrDir.find_controldirs(t))
 
913
 
 
914
    def make_fake_permission_denied_transport(self, transport, paths):
 
915
        """Create a transport that raises PermissionDenied for some paths."""
 
916
        def filter(path):
 
917
            if path in paths:
 
918
                raise errors.PermissionDenied(path)
 
919
            return path
 
920
        path_filter_server = pathfilter.PathFilteringServer(transport, filter)
 
921
        path_filter_server.start_server()
 
922
        self.addCleanup(path_filter_server.stop_server)
 
923
        path_filter_transport = pathfilter.PathFilteringTransport(
 
924
            path_filter_server, '.')
 
925
        return (path_filter_server, path_filter_transport)
 
926
 
 
927
    def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
 
928
        """Check that each branch url ends with the given suffix."""
 
929
        for actual_bzrdir in actual_bzrdirs:
 
930
            self.assertEndsWith(actual_bzrdir.user_url, expect_url_suffix)
 
931
 
 
932
    def test_find_controldirs_permission_denied(self):
 
933
        foo, bar, baz = self.make_foo_bar_baz()
 
934
        t = self.get_transport()
 
935
        path_filter_server, path_filter_transport = \
 
936
            self.make_fake_permission_denied_transport(t, ['foo'])
 
937
        # local transport
 
938
        self.assertBranchUrlsEndWith('/baz/',
 
939
                                     bzrdir.BzrDir.find_controldirs(path_filter_transport))
 
940
        # smart server
 
941
        smart_transport = self.make_smart_server('.',
 
942
                                                 backing_server=path_filter_server)
 
943
        self.assertBranchUrlsEndWith('/baz/',
 
944
                                     bzrdir.BzrDir.find_controldirs(smart_transport))
 
945
 
 
946
    def test_find_controldirs_list_current(self):
 
947
        def list_current(transport):
 
948
            return [s for s in transport.list_dir('') if s != 'baz']
 
949
 
 
950
        foo, bar, baz = self.make_foo_bar_baz()
 
951
        t = self.get_transport()
 
952
        self.assertEqualBzrdirs(
 
953
            [foo, bar],
 
954
            bzrdir.BzrDir.find_controldirs(t, list_current=list_current))
 
955
 
 
956
    def test_find_controldirs_evaluate(self):
 
957
        def evaluate(bzrdir):
 
958
            try:
 
959
                repo = bzrdir.open_repository()
 
960
            except errors.NoRepositoryPresent:
 
961
                return True, bzrdir.root_transport.base
 
962
            else:
 
963
                return False, bzrdir.root_transport.base
 
964
 
 
965
        foo, bar, baz = self.make_foo_bar_baz()
 
966
        t = self.get_transport()
 
967
        self.assertEqual([baz.root_transport.base, foo.root_transport.base],
 
968
                         list(bzrdir.BzrDir.find_controldirs(t, evaluate=evaluate)))
 
969
 
 
970
    def assertEqualBzrdirs(self, first, second):
 
971
        first = list(first)
 
972
        second = list(second)
 
973
        self.assertEqual(len(first), len(second))
 
974
        for x, y in zip(first, second):
 
975
            self.assertEqual(x.root_transport.base, y.root_transport.base)
 
976
 
 
977
    def test_find_branches(self):
 
978
        root = self.make_repository('', shared=True)
 
979
        foo, bar, baz = self.make_foo_bar_baz()
 
980
        qux = self.make_controldir('foo/qux')
 
981
        t = self.get_transport()
 
982
        branches = bzrdir.BzrDir.find_branches(t)
 
983
        self.assertEqual(baz.root_transport.base, branches[0].base)
 
984
        self.assertEqual(foo.root_transport.base, branches[1].base)
 
985
        self.assertEqual(bar.root_transport.base, branches[2].base)
 
986
 
 
987
        # ensure this works without a top-level repo
 
988
        branches = bzrdir.BzrDir.find_branches(t.clone('foo'))
 
989
        self.assertEqual(foo.root_transport.base, branches[0].base)
 
990
        self.assertEqual(bar.root_transport.base, branches[1].base)
 
991
 
 
992
 
 
993
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
 
994
 
 
995
    def test_find_controldirs_missing_repo(self):
 
996
        t = self.get_transport()
 
997
        arepo = self.make_repository('arepo', shared=True)
 
998
        abranch_url = arepo.user_url + '/abranch'
 
999
        abranch = bzrdir.BzrDir.create(abranch_url).create_branch()
 
1000
        t.delete_tree('arepo/.bzr')
 
1001
        self.assertRaises(errors.NoRepositoryPresent,
 
1002
                          branch.Branch.open, abranch_url)
 
1003
        self.make_branch('baz')
 
1004
        for actual_bzrdir in bzrdir.BzrDir.find_branches(t):
 
1005
            self.assertEndsWith(actual_bzrdir.user_url, '/baz/')
 
1006
 
 
1007
 
 
1008
class TestMeta1DirFormat(TestCaseWithTransport):
 
1009
    """Tests specific to the meta1 dir format."""
 
1010
 
 
1011
    def test_right_base_dirs(self):
 
1012
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
1013
        t = dir.transport
 
1014
        branch_base = t.clone('branch').base
 
1015
        self.assertEqual(branch_base, dir.get_branch_transport(None).base)
 
1016
        self.assertEqual(branch_base,
 
1017
                         dir.get_branch_transport(BzrBranchFormat5()).base)
 
1018
        repository_base = t.clone('repository').base
 
1019
        self.assertEqual(
 
1020
            repository_base, dir.get_repository_transport(None).base)
 
1021
        repository_format = repository.format_registry.get_default()
 
1022
        self.assertEqual(repository_base,
 
1023
                         dir.get_repository_transport(repository_format).base)
 
1024
        checkout_base = t.clone('checkout').base
 
1025
        self.assertEqual(
 
1026
            checkout_base, dir.get_workingtree_transport(None).base)
 
1027
        self.assertEqual(checkout_base,
 
1028
                         dir.get_workingtree_transport(workingtree_3.WorkingTreeFormat3()).base)
 
1029
 
 
1030
    def test_meta1dir_uses_lockdir(self):
 
1031
        """Meta1 format uses a LockDir to guard the whole directory, not a file."""
 
1032
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
1033
        t = dir.transport
 
1034
        self.assertIsDirectory('branch-lock', t)
 
1035
 
 
1036
    def test_comparison(self):
 
1037
        """Equality and inequality behave properly.
 
1038
 
 
1039
        Metadirs should compare equal iff they have the same repo, branch and
 
1040
        tree formats.
 
1041
        """
 
1042
        mydir = controldir.format_registry.make_controldir('knit')
 
1043
        self.assertEqual(mydir, mydir)
 
1044
        self.assertFalse(mydir != mydir)
 
1045
        otherdir = controldir.format_registry.make_controldir('knit')
 
1046
        self.assertEqual(otherdir, mydir)
 
1047
        self.assertFalse(otherdir != mydir)
 
1048
        otherdir2 = controldir.format_registry.make_controldir(
 
1049
            'development-subtree')
 
1050
        self.assertNotEqual(otherdir2, mydir)
 
1051
        self.assertFalse(otherdir2 == mydir)
 
1052
 
 
1053
    def test_with_features(self):
 
1054
        tree = self.make_branch_and_tree('tree', format='2a')
 
1055
        tree.controldir.update_feature_flags({b"bar": b"required"})
 
1056
        self.assertRaises(bzrdir.MissingFeature, bzrdir.BzrDir.open, 'tree')
 
1057
        bzrdir.BzrDirMetaFormat1.register_feature(b'bar')
 
1058
        self.addCleanup(bzrdir.BzrDirMetaFormat1.unregister_feature, b'bar')
 
1059
        dir = bzrdir.BzrDir.open('tree')
 
1060
        self.assertEqual(b"required", dir._format.features.get(b"bar"))
 
1061
        tree.controldir.update_feature_flags({
 
1062
            b"bar": None,
 
1063
            b"nonexistant": None})
 
1064
        dir = bzrdir.BzrDir.open('tree')
 
1065
        self.assertEqual({}, dir._format.features)
 
1066
 
 
1067
    def test_needs_conversion_different_working_tree(self):
 
1068
        # meta1dirs need an conversion if any element is not the default.
 
1069
        new_format = controldir.format_registry.make_controldir('dirstate')
 
1070
        tree = self.make_branch_and_tree('tree', format='knit')
 
1071
        self.assertTrue(tree.controldir.needs_format_conversion(
 
1072
            new_format))
 
1073
 
 
1074
    def test_initialize_on_format_uses_smart_transport(self):
 
1075
        self.setup_smart_server_with_call_log()
 
1076
        new_format = controldir.format_registry.make_controldir('dirstate')
 
1077
        transport = self.get_transport('target')
 
1078
        transport.ensure_base()
 
1079
        self.reset_smart_call_log()
 
1080
        instance = new_format.initialize_on_transport(transport)
 
1081
        self.assertIsInstance(instance, remote.RemoteBzrDir)
 
1082
        rpc_count = len(self.hpss_calls)
 
1083
        # This figure represent the amount of work to perform this use case. It
 
1084
        # is entirely ok to reduce this number if a test fails due to rpc_count
 
1085
        # being too low. If rpc_count increases, more network roundtrips have
 
1086
        # become necessary for this use case. Please do not adjust this number
 
1087
        # upwards without agreement from bzr's network support maintainers.
 
1088
        self.assertEqual(2, rpc_count)
 
1089
 
 
1090
 
 
1091
class NonLocalTests(TestCaseWithTransport):
 
1092
    """Tests for bzrdir static behaviour on non local paths."""
 
1093
 
 
1094
    def setUp(self):
 
1095
        super(NonLocalTests, self).setUp()
 
1096
        self.vfs_transport_factory = memory.MemoryServer
 
1097
 
 
1098
    def test_create_branch_convenience(self):
 
1099
        # outside a repo the default convenience output is a repo+branch_tree
 
1100
        format = controldir.format_registry.make_controldir('knit')
 
1101
        branch = bzrdir.BzrDir.create_branch_convenience(
 
1102
            self.get_url('foo'), format=format)
 
1103
        self.assertRaises(errors.NoWorkingTree,
 
1104
                          branch.controldir.open_workingtree)
 
1105
        branch.controldir.open_repository()
 
1106
 
 
1107
    def test_create_branch_convenience_force_tree_not_local_fails(self):
 
1108
        # outside a repo the default convenience output is a repo+branch_tree
 
1109
        format = controldir.format_registry.make_controldir('knit')
 
1110
        self.assertRaises(errors.NotLocalUrl,
 
1111
                          bzrdir.BzrDir.create_branch_convenience,
 
1112
                          self.get_url('foo'),
 
1113
                          force_new_tree=True,
 
1114
                          format=format)
 
1115
        t = self.get_transport()
 
1116
        self.assertFalse(t.has('foo'))
 
1117
 
 
1118
    def test_clone(self):
 
1119
        # clone into a nonlocal path works
 
1120
        format = controldir.format_registry.make_controldir('knit')
 
1121
        branch = bzrdir.BzrDir.create_branch_convenience('local',
 
1122
                                                         format=format)
 
1123
        branch.controldir.open_workingtree()
 
1124
        result = branch.controldir.clone(self.get_url('remote'))
 
1125
        self.assertRaises(errors.NoWorkingTree,
 
1126
                          result.open_workingtree)
 
1127
        result.open_branch()
 
1128
        result.open_repository()
 
1129
 
 
1130
    def test_checkout_metadir(self):
 
1131
        # checkout_metadir has reasonable working tree format even when no
 
1132
        # working tree is present
 
1133
        self.make_branch('branch-knit2', format='dirstate-with-subtree')
 
1134
        my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
 
1135
        checkout_format = my_bzrdir.checkout_metadir()
 
1136
        self.assertIsInstance(checkout_format.workingtree_format,
 
1137
                              workingtree_4.WorkingTreeFormat4)
 
1138
 
 
1139
 
 
1140
class TestHTTPRedirectionsBase(object):
 
1141
    """Test redirection between two http servers.
 
1142
 
 
1143
    This MUST be used by daughter classes that also inherit from
 
1144
    TestCaseWithTwoWebservers.
 
1145
 
 
1146
    We can't inherit directly from TestCaseWithTwoWebservers or the
 
1147
    test framework will try to create an instance which cannot
 
1148
    run, its implementation being incomplete.
 
1149
    """
 
1150
 
 
1151
    def create_transport_readonly_server(self):
 
1152
        # We don't set the http protocol version, relying on the default
 
1153
        return http_utils.HTTPServerRedirecting()
 
1154
 
 
1155
    def create_transport_secondary_server(self):
 
1156
        # We don't set the http protocol version, relying on the default
 
1157
        return http_utils.HTTPServerRedirecting()
 
1158
 
 
1159
    def setUp(self):
 
1160
        super(TestHTTPRedirectionsBase, self).setUp()
 
1161
        # The redirections will point to the new server
 
1162
        self.new_server = self.get_readonly_server()
 
1163
        # The requests to the old server will be redirected
 
1164
        self.old_server = self.get_secondary_server()
 
1165
        # Configure the redirections
 
1166
        self.old_server.redirect_to(self.new_server.host, self.new_server.port)
 
1167
 
 
1168
    def test_loop(self):
 
1169
        # Both servers redirect to each other creating a loop
 
1170
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
 
1171
        # Starting from either server should loop
 
1172
        old_url = self._qualified_url(self.old_server.host,
 
1173
                                      self.old_server.port)
 
1174
        oldt = self._transport(old_url)
 
1175
        self.assertRaises(errors.NotBranchError,
 
1176
                          bzrdir.BzrDir.open_from_transport, oldt)
 
1177
        new_url = self._qualified_url(self.new_server.host,
 
1178
                                      self.new_server.port)
 
1179
        newt = self._transport(new_url)
 
1180
        self.assertRaises(errors.NotBranchError,
 
1181
                          bzrdir.BzrDir.open_from_transport, newt)
 
1182
 
 
1183
    def test_qualifier_preserved(self):
 
1184
        wt = self.make_branch_and_tree('branch')
 
1185
        old_url = self._qualified_url(self.old_server.host,
 
1186
                                      self.old_server.port)
 
1187
        start = self._transport(old_url).clone('branch')
 
1188
        bdir = bzrdir.BzrDir.open_from_transport(start)
 
1189
        # Redirection should preserve the qualifier, hence the transport class
 
1190
        # itself.
 
1191
        self.assertIsInstance(bdir.root_transport, type(start))
 
1192
 
 
1193
 
 
1194
class TestHTTPRedirections(TestHTTPRedirectionsBase,
 
1195
                           http_utils.TestCaseWithTwoWebservers):
 
1196
    """Tests redirections for urllib implementation"""
 
1197
 
 
1198
    _transport = HttpTransport
 
1199
 
 
1200
    def _qualified_url(self, host, port):
 
1201
        result = 'http://%s:%s' % (host, port)
 
1202
        self.permit_url(result)
 
1203
        return result
 
1204
 
 
1205
 
 
1206
class TestHTTPRedirections_nosmart(TestHTTPRedirectionsBase,
 
1207
                                   http_utils.TestCaseWithTwoWebservers):
 
1208
    """Tests redirections for the nosmart decorator"""
 
1209
 
 
1210
    _transport = NoSmartTransportDecorator
 
1211
 
 
1212
    def _qualified_url(self, host, port):
 
1213
        result = 'nosmart+http://%s:%s' % (host, port)
 
1214
        self.permit_url(result)
 
1215
        return result
 
1216
 
 
1217
 
 
1218
class TestHTTPRedirections_readonly(TestHTTPRedirectionsBase,
 
1219
                                    http_utils.TestCaseWithTwoWebservers):
 
1220
    """Tests redirections for readonly decoratror"""
 
1221
 
 
1222
    _transport = ReadonlyTransportDecorator
 
1223
 
 
1224
    def _qualified_url(self, host, port):
 
1225
        result = 'readonly+http://%s:%s' % (host, port)
 
1226
        self.permit_url(result)
 
1227
        return result
 
1228
 
 
1229
 
 
1230
class TestDotBzrHidden(TestCaseWithTransport):
 
1231
 
 
1232
    ls = ['ls']
 
1233
    if sys.platform == 'win32':
 
1234
        ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']
 
1235
 
 
1236
    def get_ls(self):
 
1237
        f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
 
1238
                             stderr=subprocess.PIPE)
 
1239
        out, err = f.communicate()
 
1240
        self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
 
1241
                         % (self.ls, err))
 
1242
        return out.splitlines()
 
1243
 
 
1244
    def test_dot_bzr_hidden(self):
 
1245
        if sys.platform == 'win32' and not win32utils.has_win32file:
 
1246
            raise TestSkipped(
 
1247
                'unable to make file hidden without pywin32 library')
 
1248
        b = bzrdir.BzrDir.create('.')
 
1249
        self.build_tree(['a'])
 
1250
        self.assertEqual([b'a'], self.get_ls())
 
1251
 
 
1252
    def test_dot_bzr_hidden_with_url(self):
 
1253
        if sys.platform == 'win32' and not win32utils.has_win32file:
 
1254
            raise TestSkipped(
 
1255
                'unable to make file hidden without pywin32 library')
 
1256
        b = bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
 
1257
        self.build_tree(['a'])
 
1258
        self.assertEqual([b'a'], self.get_ls())
 
1259
 
 
1260
 
 
1261
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
 
1262
    """Test BzrDirFormat implementation for TestBzrDirSprout."""
 
1263
 
 
1264
    def _open(self, transport):
 
1265
        return _TestBzrDir(transport, self)
 
1266
 
 
1267
 
 
1268
class _TestBzrDir(bzrdir.BzrDirMeta1):
 
1269
    """Test BzrDir implementation for TestBzrDirSprout.
 
1270
 
 
1271
    When created a _TestBzrDir already has repository and a branch.  The branch
 
1272
    is a test double as well.
 
1273
    """
 
1274
 
 
1275
    def __init__(self, *args, **kwargs):
 
1276
        super(_TestBzrDir, self).__init__(*args, **kwargs)
 
1277
        self.test_branch = _TestBranch(self.transport)
 
1278
        self.test_branch.repository = self.create_repository()
 
1279
 
 
1280
    def open_branch(self, unsupported=False, possible_transports=None):
 
1281
        return self.test_branch
 
1282
 
 
1283
    def cloning_metadir(self, require_stacking=False):
 
1284
        return _TestBzrDirFormat()
 
1285
 
 
1286
 
 
1287
class _TestBranchFormat(breezy.branch.BranchFormat):
 
1288
    """Test Branch format for TestBzrDirSprout."""
 
1289
 
 
1290
 
 
1291
class _TestBranch(breezy.branch.Branch):
 
1292
    """Test Branch implementation for TestBzrDirSprout."""
 
1293
 
 
1294
    def __init__(self, transport, *args, **kwargs):
 
1295
        self._format = _TestBranchFormat()
 
1296
        self._transport = transport
 
1297
        self.base = transport.base
 
1298
        super(_TestBranch, self).__init__(*args, **kwargs)
 
1299
        self.calls = []
 
1300
        self._parent = None
 
1301
 
 
1302
    def sprout(self, *args, **kwargs):
 
1303
        self.calls.append('sprout')
 
1304
        return _TestBranch(self._transport)
 
1305
 
 
1306
    def copy_content_into(self, destination, revision_id=None):
 
1307
        self.calls.append('copy_content_into')
 
1308
 
 
1309
    def last_revision(self):
 
1310
        return _mod_revision.NULL_REVISION
 
1311
 
 
1312
    def get_parent(self):
 
1313
        return self._parent
 
1314
 
 
1315
    def _get_config(self):
 
1316
        return config.TransportConfig(self._transport, 'branch.conf')
 
1317
 
 
1318
    def _get_config_store(self):
 
1319
        return config.BranchStore(self)
 
1320
 
 
1321
    def set_parent(self, parent):
 
1322
        self._parent = parent
 
1323
 
 
1324
    def lock_read(self):
 
1325
        return lock.LogicalLockResult(self.unlock)
 
1326
 
 
1327
    def unlock(self):
 
1328
        return
 
1329
 
 
1330
 
 
1331
class TestBzrDirSprout(TestCaseWithMemoryTransport):
 
1332
 
 
1333
    def test_sprout_uses_branch_sprout(self):
 
1334
        """BzrDir.sprout calls Branch.sprout.
 
1335
 
 
1336
        Usually, BzrDir.sprout should delegate to the branch's sprout method
 
1337
        for part of the work.  This allows the source branch to control the
 
1338
        choice of format for the new branch.
 
1339
 
 
1340
        There are exceptions, but this tests avoids them:
 
1341
          - if there's no branch in the source bzrdir,
 
1342
          - or if the stacking has been requested and the format needs to be
 
1343
            overridden to satisfy that.
 
1344
        """
 
1345
        # Make an instrumented bzrdir.
 
1346
        t = self.get_transport('source')
 
1347
        t.ensure_base()
 
1348
        source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
 
1349
        # The instrumented bzrdir has a test_branch attribute that logs calls
 
1350
        # made to the branch contained in that bzrdir.  Initially the test
 
1351
        # branch exists but no calls have been made to it.
 
1352
        self.assertEqual([], source_bzrdir.test_branch.calls)
 
1353
 
 
1354
        # Sprout the bzrdir
 
1355
        target_url = self.get_url('target')
 
1356
        result = source_bzrdir.sprout(target_url, recurse='no')
 
1357
 
 
1358
        # The bzrdir called the branch's sprout method.
 
1359
        self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)
 
1360
 
 
1361
    def test_sprout_parent(self):
 
1362
        grandparent_tree = self.make_branch('grandparent')
 
1363
        parent = grandparent_tree.controldir.sprout('parent').open_branch()
 
1364
        branch_tree = parent.controldir.sprout('branch').open_branch()
 
1365
        self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
 
1366
 
 
1367
 
 
1368
class TestBzrDirHooks(TestCaseWithMemoryTransport):
 
1369
 
 
1370
    def test_pre_open_called(self):
 
1371
        calls = []
 
1372
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
 
1373
        transport = self.get_transport('foo')
 
1374
        url = transport.base
 
1375
        self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
 
1376
        self.assertEqual([transport.base], [t.base for t in calls])
 
1377
 
 
1378
    def test_pre_open_actual_exceptions_raised(self):
 
1379
        count = [0]
 
1380
 
 
1381
        def fail_once(transport):
 
1382
            count[0] += 1
 
1383
            if count[0] == 1:
 
1384
                raise errors.BzrError("fail")
 
1385
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
 
1386
        transport = self.get_transport('foo')
 
1387
        url = transport.base
 
1388
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
 
1389
        self.assertEqual('fail', err._preformatted_string)
 
1390
 
 
1391
    def test_post_repo_init(self):
 
1392
        from ..controldir import RepoInitHookParams
 
1393
        calls = []
 
1394
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
 
1395
                                               calls.append, None)
 
1396
        self.make_repository('foo')
 
1397
        self.assertLength(1, calls)
 
1398
        params = calls[0]
 
1399
        self.assertIsInstance(params, RepoInitHookParams)
 
1400
        self.assertTrue(hasattr(params, 'controldir'))
 
1401
        self.assertTrue(hasattr(params, 'repository'))
 
1402
 
 
1403
    def test_post_repo_init_hook_repr(self):
 
1404
        param_reprs = []
 
1405
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
 
1406
                                               lambda params: param_reprs.append(repr(params)), None)
 
1407
        self.make_repository('foo')
 
1408
        self.assertLength(1, param_reprs)
 
1409
        param_repr = param_reprs[0]
 
1410
        self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
 
1411
 
 
1412
 
 
1413
class TestGenerateBackupName(TestCaseWithMemoryTransport):
 
1414
    # FIXME: This may need to be unified with test_osutils.TestBackupNames or
 
1415
    # moved to per_bzrdir or per_transport for better coverage ?
 
1416
    # -- vila 20100909
 
1417
 
 
1418
    def setUp(self):
 
1419
        super(TestGenerateBackupName, self).setUp()
 
1420
        self._transport = self.get_transport()
 
1421
        bzrdir.BzrDir.create(self.get_url(),
 
1422
                             possible_transports=[self._transport])
 
1423
        self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)
 
1424
 
 
1425
    def test_new(self):
 
1426
        self.assertEqual("a.~1~", self._bzrdir._available_backup_name("a"))
 
1427
 
 
1428
    def test_exiting(self):
 
1429
        self._transport.put_bytes("a.~1~", b"some content")
 
1430
        self.assertEqual("a.~2~", self._bzrdir._available_backup_name("a"))
 
1431
 
 
1432
 
 
1433
class TestMeta1DirColoFormat(TestCaseWithTransport):
 
1434
    """Tests specific to the meta1 dir with colocated branches format."""
 
1435
 
 
1436
    def test_supports_colo(self):
 
1437
        format = bzrdir.BzrDirMetaFormat1Colo()
 
1438
        self.assertTrue(format.colocated_branches)
 
1439
 
 
1440
    def test_upgrade_from_2a(self):
 
1441
        tree = self.make_branch_and_tree('.', format='2a')
 
1442
        format = bzrdir.BzrDirMetaFormat1Colo()
 
1443
        self.assertTrue(tree.controldir.needs_format_conversion(format))
 
1444
        converter = tree.controldir._format.get_converter(format)
 
1445
        result = converter.convert(tree.controldir, None)
 
1446
        self.assertIsInstance(result._format, bzrdir.BzrDirMetaFormat1Colo)
 
1447
        self.assertFalse(result.needs_format_conversion(format))
 
1448
 
 
1449
    def test_downgrade_to_2a(self):
 
1450
        tree = self.make_branch_and_tree('.', format='development-colo')
 
1451
        format = bzrdir.BzrDirMetaFormat1()
 
1452
        self.assertTrue(tree.controldir.needs_format_conversion(format))
 
1453
        converter = tree.controldir._format.get_converter(format)
 
1454
        result = converter.convert(tree.controldir, None)
 
1455
        self.assertIsInstance(result._format, bzrdir.BzrDirMetaFormat1)
 
1456
        self.assertFalse(result.needs_format_conversion(format))
 
1457
 
 
1458
    def test_downgrade_to_2a_too_many_branches(self):
 
1459
        tree = self.make_branch_and_tree('.', format='development-colo')
 
1460
        tree.controldir.create_branch(name="another-colocated-branch")
 
1461
        converter = tree.controldir._format.get_converter(
 
1462
            bzrdir.BzrDirMetaFormat1())
 
1463
        result = converter.convert(tree.controldir, bzrdir.BzrDirMetaFormat1())
 
1464
        self.assertIsInstance(result._format, bzrdir.BzrDirMetaFormat1)
 
1465
 
 
1466
    def test_nested(self):
 
1467
        tree = self.make_branch_and_tree('.', format='development-colo')
 
1468
        tree.controldir.create_branch(name='foo')
 
1469
        tree.controldir.create_branch(name='fool/bla')
 
1470
        self.assertRaises(
 
1471
            errors.ParentBranchExists, tree.controldir.create_branch,
 
1472
            name='foo/bar')
 
1473
 
 
1474
    def test_parent(self):
 
1475
        tree = self.make_branch_and_tree('.', format='development-colo')
 
1476
        tree.controldir.create_branch(name='fool/bla')
 
1477
        tree.controldir.create_branch(name='foo/bar')
 
1478
        self.assertRaises(
 
1479
            errors.AlreadyBranchError, tree.controldir.create_branch,
 
1480
            name='foo')
 
1481
 
 
1482
 
 
1483
class SampleBzrFormat(bzrdir.BzrFormat):
 
1484
 
 
1485
    @classmethod
 
1486
    def get_format_string(cls):
 
1487
        return b"First line\n"
 
1488
 
 
1489
 
 
1490
class TestBzrFormat(TestCase):
 
1491
    """Tests for BzrFormat."""
 
1492
 
 
1493
    def test_as_string(self):
 
1494
        format = SampleBzrFormat()
 
1495
        format.features = {b"foo": b"required"}
 
1496
        self.assertEqual(format.as_string(),
 
1497
                         b"First line\n"
 
1498
                         b"required foo\n")
 
1499
        format.features[b"another"] = b"optional"
 
1500
        self.assertEqual(format.as_string(),
 
1501
                         b"First line\n"
 
1502
                         b"optional another\n"
 
1503
                         b"required foo\n")
 
1504
 
 
1505
    def test_network_name(self):
 
1506
        # The network string should include the feature info
 
1507
        format = SampleBzrFormat()
 
1508
        format.features = {b"foo": b"required"}
 
1509
        self.assertEqual(
 
1510
            b"First line\nrequired foo\n",
 
1511
            format.network_name())
 
1512
 
 
1513
    def test_from_string_no_features(self):
 
1514
        # No features
 
1515
        format = SampleBzrFormat.from_string(
 
1516
            b"First line\n")
 
1517
        self.assertEqual({}, format.features)
 
1518
 
 
1519
    def test_from_string_with_feature(self):
 
1520
        # Proper feature
 
1521
        format = SampleBzrFormat.from_string(
 
1522
            b"First line\nrequired foo\n")
 
1523
        self.assertEqual(b"required", format.features.get(b"foo"))
 
1524
 
 
1525
    def test_from_string_format_string_mismatch(self):
 
1526
        # The first line has to match the format string
 
1527
        self.assertRaises(AssertionError, SampleBzrFormat.from_string,
 
1528
                          b"Second line\nrequired foo\n")
 
1529
 
 
1530
    def test_from_string_missing_space(self):
 
1531
        # At least one space is required in the feature lines
 
1532
        self.assertRaises(errors.ParseFormatError, SampleBzrFormat.from_string,
 
1533
                          b"First line\nfoo\n")
 
1534
 
 
1535
    def test_from_string_with_spaces(self):
 
1536
        # Feature with spaces (in case we add stuff like this in the future)
 
1537
        format = SampleBzrFormat.from_string(
 
1538
            b"First line\nrequired foo with spaces\n")
 
1539
        self.assertEqual(b"required", format.features.get(b"foo with spaces"))
 
1540
 
 
1541
    def test_eq(self):
 
1542
        format1 = SampleBzrFormat()
 
1543
        format1.features = {b"nested-trees": b"optional"}
 
1544
        format2 = SampleBzrFormat()
 
1545
        format2.features = {b"nested-trees": b"optional"}
 
1546
        self.assertEqual(format1, format1)
 
1547
        self.assertEqual(format1, format2)
 
1548
        format3 = SampleBzrFormat()
 
1549
        self.assertNotEqual(format1, format3)
 
1550
 
 
1551
    def test_check_support_status_optional(self):
 
1552
        # Optional, so silently ignore
 
1553
        format = SampleBzrFormat()
 
1554
        format.features = {b"nested-trees": b"optional"}
 
1555
        format.check_support_status(True)
 
1556
        self.addCleanup(SampleBzrFormat.unregister_feature, b"nested-trees")
 
1557
        SampleBzrFormat.register_feature(b"nested-trees")
 
1558
        format.check_support_status(True)
 
1559
 
 
1560
    def test_check_support_status_required(self):
 
1561
        # Optional, so trigger an exception
 
1562
        format = SampleBzrFormat()
 
1563
        format.features = {b"nested-trees": b"required"}
 
1564
        self.assertRaises(bzrdir.MissingFeature, format.check_support_status,
 
1565
                          True)
 
1566
        self.addCleanup(SampleBzrFormat.unregister_feature, b"nested-trees")
 
1567
        SampleBzrFormat.register_feature(b"nested-trees")
 
1568
        format.check_support_status(True)
 
1569
 
 
1570
    def test_check_support_status_unknown(self):
 
1571
        # treat unknown necessity as required
 
1572
        format = SampleBzrFormat()
 
1573
        format.features = {b"nested-trees": b"unknown"}
 
1574
        self.assertRaises(bzrdir.MissingFeature, format.check_support_status,
 
1575
                          True)
 
1576
        self.addCleanup(SampleBzrFormat.unregister_feature, b"nested-trees")
 
1577
        SampleBzrFormat.register_feature(b"nested-trees")
 
1578
        format.check_support_status(True)
 
1579
 
 
1580
    def test_feature_already_registered(self):
 
1581
        # a feature can only be registered once
 
1582
        self.addCleanup(SampleBzrFormat.unregister_feature, b"nested-trees")
 
1583
        SampleBzrFormat.register_feature(b"nested-trees")
 
1584
        self.assertRaises(bzrdir.FeatureAlreadyRegistered,
 
1585
                          SampleBzrFormat.register_feature, b"nested-trees")
 
1586
 
 
1587
    def test_feature_with_space(self):
 
1588
        # spaces are not allowed in feature names
 
1589
        self.assertRaises(ValueError, SampleBzrFormat.register_feature,
 
1590
                          b"nested trees")