/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bzrdir.py

  • Committer: John Arbash Meinel
  • Date: 2008-09-02 17:52:00 UTC
  • mto: This revision was merged to the branch mainline in revision 3679.
  • Revision ID: john@arbash-meinel.com-20080902175200-nge9qgk0gklkd5ew
Move the point at which we 'buffer_all' if we've read >50% of the index.

We were doing it as soon as you entered 'iter_entries', but often you may already have enough
info to return results. And for small mostly local ops, we don't need to buffer all.
(This happens mostly with moderate size indexes, where the first read of the header
is enough to give you the data you need, but happens to be >50% of the whole file.)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for the BzrDir facility and any format specific tests.
 
18
 
 
19
For interface contract tests, see tests/bzr_dir_implementations.
 
20
"""
 
21
 
 
22
import os
 
23
import os.path
 
24
from StringIO import StringIO
 
25
import subprocess
 
26
import sys
 
27
 
 
28
from bzrlib import (
 
29
    bzrdir,
 
30
    errors,
 
31
    help_topics,
 
32
    repository,
 
33
    osutils,
 
34
    symbol_versioning,
 
35
    urlutils,
 
36
    win32utils,
 
37
    workingtree,
 
38
    )
 
39
import bzrlib.branch
 
40
from bzrlib.errors import (NotBranchError,
 
41
                           UnknownFormatError,
 
42
                           UnsupportedFormatError,
 
43
                           )
 
44
from bzrlib.tests import (
 
45
    TestCase,
 
46
    TestCaseWithMemoryTransport,
 
47
    TestCaseWithTransport,
 
48
    TestSkipped,
 
49
    test_sftp_transport
 
50
    )
 
51
from bzrlib.tests.http_server import HttpServer
 
52
from bzrlib.tests.http_utils import (
 
53
    TestCaseWithTwoWebservers,
 
54
    HTTPServerRedirecting,
 
55
    )
 
56
from bzrlib.tests.test_http import TestWithTransport_pycurl
 
57
from bzrlib.transport import get_transport
 
58
from bzrlib.transport.http._urllib import HttpTransport_urllib
 
59
from bzrlib.transport.memory import MemoryServer
 
60
from bzrlib.repofmt import knitrepo, weaverepo
 
61
 
 
62
 
 
63
class TestDefaultFormat(TestCase):
 
64
 
 
65
    def test_get_set_default_format(self):
 
66
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
67
        # default is BzrDirFormat6
 
68
        self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
 
69
        bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
 
70
        # creating a bzr dir should now create an instrumented dir.
 
71
        try:
 
72
            result = bzrdir.BzrDir.create('memory:///')
 
73
            self.failUnless(isinstance(result, SampleBzrDir))
 
74
        finally:
 
75
            bzrdir.BzrDirFormat._set_default_format(old_format)
 
76
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
 
77
 
 
78
 
 
79
class TestFormatRegistry(TestCase):
 
80
 
 
81
    def make_format_registry(self):
 
82
        my_format_registry = bzrdir.BzrDirFormatRegistry()
 
83
        my_format_registry.register('weave', bzrdir.BzrDirFormat6,
 
84
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
85
            ' repositories', deprecated=True)
 
86
        my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir', 
 
87
            'BzrDirFormat6', 'Format registered lazily', deprecated=True)
 
88
        my_format_registry.register_metadir('knit',
 
89
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
 
90
            'Format using knits',
 
91
            )
 
92
        my_format_registry.set_default('knit')
 
93
        my_format_registry.register_metadir(
 
94
            'branch6',
 
95
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
 
96
            'Experimental successor to knit.  Use at your own risk.',
 
97
            branch_format='bzrlib.branch.BzrBranchFormat6',
 
98
            experimental=True)
 
99
        my_format_registry.register_metadir(
 
100
            'hidden format',
 
101
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
 
102
            'Experimental successor to knit.  Use at your own risk.',
 
103
            branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
 
104
        my_format_registry.register('hiddenweave', bzrdir.BzrDirFormat6,
 
105
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
106
            ' repositories', hidden=True)
 
107
        my_format_registry.register_lazy('hiddenlazy', 'bzrlib.bzrdir',
 
108
            'BzrDirFormat6', 'Format registered lazily', deprecated=True,
 
109
            hidden=True)
 
110
        return my_format_registry
 
111
 
 
112
    def test_format_registry(self):
 
113
        my_format_registry = self.make_format_registry()
 
114
        my_bzrdir = my_format_registry.make_bzrdir('lazy')
 
115
        self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
 
116
        my_bzrdir = my_format_registry.make_bzrdir('weave')
 
117
        self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
 
118
        my_bzrdir = my_format_registry.make_bzrdir('default')
 
119
        self.assertIsInstance(my_bzrdir.repository_format, 
 
120
            knitrepo.RepositoryFormatKnit1)
 
121
        my_bzrdir = my_format_registry.make_bzrdir('knit')
 
122
        self.assertIsInstance(my_bzrdir.repository_format, 
 
123
            knitrepo.RepositoryFormatKnit1)
 
124
        my_bzrdir = my_format_registry.make_bzrdir('branch6')
 
125
        self.assertIsInstance(my_bzrdir.get_branch_format(),
 
126
                              bzrlib.branch.BzrBranchFormat6)
 
127
 
 
128
    def test_get_help(self):
 
129
        my_format_registry = self.make_format_registry()
 
130
        self.assertEqual('Format registered lazily',
 
131
                         my_format_registry.get_help('lazy'))
 
132
        self.assertEqual('Format using knits', 
 
133
                         my_format_registry.get_help('knit'))
 
134
        self.assertEqual('Format using knits', 
 
135
                         my_format_registry.get_help('default'))
 
136
        self.assertEqual('Pre-0.8 format.  Slower and does not support'
 
137
                         ' checkouts or shared repositories', 
 
138
                         my_format_registry.get_help('weave'))
 
139
        
 
140
    def test_help_topic(self):
 
141
        topics = help_topics.HelpTopicRegistry()
 
142
        topics.register('formats', self.make_format_registry().help_topic, 
 
143
                        'Directory formats')
 
144
        topic = topics.get_detail('formats')
 
145
        new, rest = topic.split('Experimental formats')
 
146
        experimental, deprecated = rest.split('Deprecated formats')
 
147
        self.assertContainsRe(new, 'These formats can be used')
 
148
        self.assertContainsRe(new, 
 
149
                ':knit:\n    \(native\) \(default\) Format using knits\n')
 
150
        self.assertContainsRe(experimental, 
 
151
                ':branch6:\n    \(native\) Experimental successor to knit')
 
152
        self.assertContainsRe(deprecated, 
 
153
                ':lazy:\n    \(native\) Format registered lazily\n')
 
154
        self.assertNotContainsRe(new, 'hidden')
 
155
 
 
156
    def test_set_default_repository(self):
 
157
        default_factory = bzrdir.format_registry.get('default')
 
158
        old_default = [k for k, v in bzrdir.format_registry.iteritems()
 
159
                       if v == default_factory and k != 'default'][0]
 
160
        bzrdir.format_registry.set_default_repository('dirstate-with-subtree')
 
161
        try:
 
162
            self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
 
163
                          bzrdir.format_registry.get('default'))
 
164
            self.assertIs(
 
165
                repository.RepositoryFormat.get_default_format().__class__,
 
166
                knitrepo.RepositoryFormatKnit3)
 
167
        finally:
 
168
            bzrdir.format_registry.set_default_repository(old_default)
 
169
 
 
170
    def test_aliases(self):
 
171
        a_registry = bzrdir.BzrDirFormatRegistry()
 
172
        a_registry.register('weave', bzrdir.BzrDirFormat6,
 
173
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
174
            ' repositories', deprecated=True)
 
175
        a_registry.register('weavealias', bzrdir.BzrDirFormat6,
 
176
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
177
            ' repositories', deprecated=True, alias=True)
 
178
        self.assertEqual(frozenset(['weavealias']), a_registry.aliases())
 
179
    
 
180
 
 
181
class SampleBranch(bzrlib.branch.Branch):
 
182
    """A dummy branch for guess what, dummy use."""
 
183
 
 
184
    def __init__(self, dir):
 
185
        self.bzrdir = dir
 
186
 
 
187
 
 
188
class SampleBzrDir(bzrdir.BzrDir):
 
189
    """A sample BzrDir implementation to allow testing static methods."""
 
190
 
 
191
    def create_repository(self, shared=False):
 
192
        """See BzrDir.create_repository."""
 
193
        return "A repository"
 
194
 
 
195
    def open_repository(self):
 
196
        """See BzrDir.open_repository."""
 
197
        return "A repository"
 
198
 
 
199
    def create_branch(self):
 
200
        """See BzrDir.create_branch."""
 
201
        return SampleBranch(self)
 
202
 
 
203
    def create_workingtree(self):
 
204
        """See BzrDir.create_workingtree."""
 
205
        return "A tree"
 
206
 
 
207
 
 
208
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
 
209
    """A sample format
 
210
 
 
211
    this format is initializable, unsupported to aid in testing the 
 
212
    open and open_downlevel routines.
 
213
    """
 
214
 
 
215
    def get_format_string(self):
 
216
        """See BzrDirFormat.get_format_string()."""
 
217
        return "Sample .bzr dir format."
 
218
 
 
219
    def initialize_on_transport(self, t):
 
220
        """Create a bzr dir."""
 
221
        t.mkdir('.bzr')
 
222
        t.put_bytes('.bzr/branch-format', self.get_format_string())
 
223
        return SampleBzrDir(t, self)
 
224
 
 
225
    def is_supported(self):
 
226
        return False
 
227
 
 
228
    def open(self, transport, _found=None):
 
229
        return "opened branch."
 
230
 
 
231
 
 
232
class TestBzrDirFormat(TestCaseWithTransport):
 
233
    """Tests for the BzrDirFormat facility."""
 
234
 
 
235
    def test_find_format(self):
 
236
        # is the right format object found for a branch?
 
237
        # create a branch with a few known format objects.
 
238
        # this is not quite the same as 
 
239
        t = get_transport(self.get_url())
 
240
        self.build_tree(["foo/", "bar/"], transport=t)
 
241
        def check_format(format, url):
 
242
            format.initialize(url)
 
243
            t = get_transport(url)
 
244
            found_format = bzrdir.BzrDirFormat.find_format(t)
 
245
            self.failUnless(isinstance(found_format, format.__class__))
 
246
        check_format(bzrdir.BzrDirFormat5(), "foo")
 
247
        check_format(bzrdir.BzrDirFormat6(), "bar")
 
248
        
 
249
    def test_find_format_nothing_there(self):
 
250
        self.assertRaises(NotBranchError,
 
251
                          bzrdir.BzrDirFormat.find_format,
 
252
                          get_transport('.'))
 
253
 
 
254
    def test_find_format_unknown_format(self):
 
255
        t = get_transport(self.get_url())
 
256
        t.mkdir('.bzr')
 
257
        t.put_bytes('.bzr/branch-format', '')
 
258
        self.assertRaises(UnknownFormatError,
 
259
                          bzrdir.BzrDirFormat.find_format,
 
260
                          get_transport('.'))
 
261
 
 
262
    def test_register_unregister_format(self):
 
263
        format = SampleBzrDirFormat()
 
264
        url = self.get_url()
 
265
        # make a bzrdir
 
266
        format.initialize(url)
 
267
        # register a format for it.
 
268
        bzrdir.BzrDirFormat.register_format(format)
 
269
        # which bzrdir.Open will refuse (not supported)
 
270
        self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
 
271
        # which bzrdir.open_containing will refuse (not supported)
 
272
        self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open_containing, url)
 
273
        # but open_downlevel will work
 
274
        t = get_transport(url)
 
275
        self.assertEqual(format.open(t), bzrdir.BzrDir.open_unsupported(url))
 
276
        # unregister the format
 
277
        bzrdir.BzrDirFormat.unregister_format(format)
 
278
        # now open_downlevel should fail too.
 
279
        self.assertRaises(UnknownFormatError, bzrdir.BzrDir.open_unsupported, url)
 
280
 
 
281
    def test_create_branch_and_repo_uses_default(self):
 
282
        format = SampleBzrDirFormat()
 
283
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
 
284
                                                      format=format)
 
285
        self.assertTrue(isinstance(branch, SampleBranch))
 
286
 
 
287
    def test_create_branch_and_repo_under_shared(self):
 
288
        # creating a branch and repo in a shared repo uses the
 
289
        # shared repository
 
290
        format = bzrdir.format_registry.make_bzrdir('knit')
 
291
        self.make_repository('.', shared=True, format=format)
 
292
        branch = bzrdir.BzrDir.create_branch_and_repo(
 
293
            self.get_url('child'), format=format)
 
294
        self.assertRaises(errors.NoRepositoryPresent,
 
295
                          branch.bzrdir.open_repository)
 
296
 
 
297
    def test_create_branch_and_repo_under_shared_force_new(self):
 
298
        # creating a branch and repo in a shared repo can be forced to 
 
299
        # make a new repo
 
300
        format = bzrdir.format_registry.make_bzrdir('knit')
 
301
        self.make_repository('.', shared=True, format=format)
 
302
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
 
303
                                                      force_new_repo=True,
 
304
                                                      format=format)
 
305
        branch.bzrdir.open_repository()
 
306
 
 
307
    def test_create_standalone_working_tree(self):
 
308
        format = SampleBzrDirFormat()
 
309
        # note this is deliberately readonly, as this failure should 
 
310
        # occur before any writes.
 
311
        self.assertRaises(errors.NotLocalUrl,
 
312
                          bzrdir.BzrDir.create_standalone_workingtree,
 
313
                          self.get_readonly_url(), format=format)
 
314
        tree = bzrdir.BzrDir.create_standalone_workingtree('.', 
 
315
                                                           format=format)
 
316
        self.assertEqual('A tree', tree)
 
317
 
 
318
    def test_create_standalone_working_tree_under_shared_repo(self):
 
319
        # create standalone working tree always makes a repo.
 
320
        format = bzrdir.format_registry.make_bzrdir('knit')
 
321
        self.make_repository('.', shared=True, format=format)
 
322
        # note this is deliberately readonly, as this failure should 
 
323
        # occur before any writes.
 
324
        self.assertRaises(errors.NotLocalUrl,
 
325
                          bzrdir.BzrDir.create_standalone_workingtree,
 
326
                          self.get_readonly_url('child'), format=format)
 
327
        tree = bzrdir.BzrDir.create_standalone_workingtree('child', 
 
328
            format=format)
 
329
        tree.bzrdir.open_repository()
 
330
 
 
331
    def test_create_branch_convenience(self):
 
332
        # outside a repo the default convenience output is a repo+branch_tree
 
333
        format = bzrdir.format_registry.make_bzrdir('knit')
 
334
        branch = bzrdir.BzrDir.create_branch_convenience('.', format=format)
 
335
        branch.bzrdir.open_workingtree()
 
336
        branch.bzrdir.open_repository()
 
337
 
 
338
    def test_create_branch_convenience_possible_transports(self):
 
339
        """Check that the optional 'possible_transports' is recognized"""
 
340
        format = bzrdir.format_registry.make_bzrdir('knit')
 
341
        t = self.get_transport()
 
342
        branch = bzrdir.BzrDir.create_branch_convenience(
 
343
            '.', format=format, possible_transports=[t])
 
344
        branch.bzrdir.open_workingtree()
 
345
        branch.bzrdir.open_repository()
 
346
 
 
347
    def test_create_branch_convenience_root(self):
 
348
        """Creating a branch at the root of a fs should work."""
 
349
        self.vfs_transport_factory = MemoryServer
 
350
        # outside a repo the default convenience output is a repo+branch_tree
 
351
        format = bzrdir.format_registry.make_bzrdir('knit')
 
352
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(), 
 
353
                                                         format=format)
 
354
        self.assertRaises(errors.NoWorkingTree,
 
355
                          branch.bzrdir.open_workingtree)
 
356
        branch.bzrdir.open_repository()
 
357
 
 
358
    def test_create_branch_convenience_under_shared_repo(self):
 
359
        # inside a repo the default convenience output is a branch+ follow the
 
360
        # repo tree policy
 
361
        format = bzrdir.format_registry.make_bzrdir('knit')
 
362
        self.make_repository('.', shared=True, format=format)
 
363
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
364
            format=format)
 
365
        branch.bzrdir.open_workingtree()
 
366
        self.assertRaises(errors.NoRepositoryPresent,
 
367
                          branch.bzrdir.open_repository)
 
368
            
 
369
    def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
 
370
        # inside a repo the default convenience output is a branch+ follow the
 
371
        # repo tree policy but we can override that
 
372
        format = bzrdir.format_registry.make_bzrdir('knit')
 
373
        self.make_repository('.', shared=True, format=format)
 
374
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
375
            force_new_tree=False, format=format)
 
376
        self.assertRaises(errors.NoWorkingTree,
 
377
                          branch.bzrdir.open_workingtree)
 
378
        self.assertRaises(errors.NoRepositoryPresent,
 
379
                          branch.bzrdir.open_repository)
 
380
            
 
381
    def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
 
382
        # inside a repo the default convenience output is a branch+ follow the
 
383
        # repo tree policy
 
384
        format = bzrdir.format_registry.make_bzrdir('knit')
 
385
        repo = self.make_repository('.', shared=True, format=format)
 
386
        repo.set_make_working_trees(False)
 
387
        branch = bzrdir.BzrDir.create_branch_convenience('child', 
 
388
                                                         format=format)
 
389
        self.assertRaises(errors.NoWorkingTree,
 
390
                          branch.bzrdir.open_workingtree)
 
391
        self.assertRaises(errors.NoRepositoryPresent,
 
392
                          branch.bzrdir.open_repository)
 
393
 
 
394
    def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
 
395
        # inside a repo the default convenience output is a branch+ follow the
 
396
        # repo tree policy but we can override that
 
397
        format = bzrdir.format_registry.make_bzrdir('knit')
 
398
        repo = self.make_repository('.', shared=True, format=format)
 
399
        repo.set_make_working_trees(False)
 
400
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
401
            force_new_tree=True, format=format)
 
402
        branch.bzrdir.open_workingtree()
 
403
        self.assertRaises(errors.NoRepositoryPresent,
 
404
                          branch.bzrdir.open_repository)
 
405
 
 
406
    def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
 
407
        # inside a repo the default convenience output is overridable to give
 
408
        # repo+branch+tree
 
409
        format = bzrdir.format_registry.make_bzrdir('knit')
 
410
        self.make_repository('.', shared=True, format=format)
 
411
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
412
            force_new_repo=True, format=format)
 
413
        branch.bzrdir.open_repository()
 
414
        branch.bzrdir.open_workingtree()
 
415
 
 
416
 
 
417
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
 
418
 
 
419
    def test_acquire_repository_standalone(self):
 
420
        """The default acquisition policy should create a standalone branch."""
 
421
        my_bzrdir = self.make_bzrdir('.')
 
422
        repo_policy = my_bzrdir.determine_repository_policy()
 
423
        repo = repo_policy.acquire_repository()
 
424
        self.assertEqual(repo.bzrdir.root_transport.base,
 
425
                         my_bzrdir.root_transport.base)
 
426
        self.assertFalse(repo.is_shared())
 
427
 
 
428
 
 
429
    def test_determine_stacking_policy(self):
 
430
        parent_bzrdir = self.make_bzrdir('.')
 
431
        child_bzrdir = self.make_bzrdir('child')
 
432
        parent_bzrdir.get_config().set_default_stack_on('http://example.org')
 
433
        repo_policy = child_bzrdir.determine_repository_policy()
 
434
        self.assertEqual('http://example.org', repo_policy._stack_on)
 
435
 
 
436
    def test_determine_stacking_policy_relative(self):
 
437
        parent_bzrdir = self.make_bzrdir('.')
 
438
        child_bzrdir = self.make_bzrdir('child')
 
439
        parent_bzrdir.get_config().set_default_stack_on('child2')
 
440
        repo_policy = child_bzrdir.determine_repository_policy()
 
441
        self.assertEqual('child2', repo_policy._stack_on)
 
442
        self.assertEqual(parent_bzrdir.root_transport.base,
 
443
                         repo_policy._stack_on_pwd)
 
444
 
 
445
    def prepare_default_stacking(self, child_format='development1'):
 
446
        parent_bzrdir = self.make_bzrdir('.')
 
447
        child_branch = self.make_branch('child', format=child_format)
 
448
        parent_bzrdir.get_config().set_default_stack_on(child_branch.base)
 
449
        new_child_transport = parent_bzrdir.transport.clone('child2')
 
450
        return child_branch, new_child_transport
 
451
 
 
452
    def test_clone_on_transport_obeys_stacking_policy(self):
 
453
        child_branch, new_child_transport = self.prepare_default_stacking()
 
454
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
 
455
        self.assertEqual(child_branch.base,
 
456
                         new_child.open_branch().get_stacked_on_url())
 
457
 
 
458
    def test_sprout_obeys_stacking_policy(self):
 
459
        child_branch, new_child_transport = self.prepare_default_stacking()
 
460
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
 
461
        self.assertEqual(child_branch.base,
 
462
                         new_child.open_branch().get_stacked_on_url())
 
463
 
 
464
    def test_clone_ignores_policy_for_unsupported_formats(self):
 
465
        child_branch, new_child_transport = self.prepare_default_stacking(
 
466
            child_format='pack-0.92')
 
467
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
 
468
        self.assertRaises(errors.UnstackableBranchFormat,
 
469
                          new_child.open_branch().get_stacked_on_url)
 
470
 
 
471
    def test_sprout_ignores_policy_for_unsupported_formats(self):
 
472
        child_branch, new_child_transport = self.prepare_default_stacking(
 
473
            child_format='pack-0.92')
 
474
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
 
475
        self.assertRaises(errors.UnstackableBranchFormat,
 
476
                          new_child.open_branch().get_stacked_on_url)
 
477
 
 
478
    def test_sprout_upgrades_format_if_stacked_specified(self):
 
479
        child_branch, new_child_transport = self.prepare_default_stacking(
 
480
            child_format='pack-0.92')
 
481
        new_child = child_branch.bzrdir.sprout(new_child_transport.base,
 
482
                                               stacked=True)
 
483
        self.assertEqual(child_branch.bzrdir.root_transport.base,
 
484
                         new_child.open_branch().get_stacked_on_url())
 
485
        repo = new_child.open_repository()
 
486
        self.assertTrue(repo._format.supports_external_lookups)
 
487
        self.assertFalse(repo.supports_rich_root())
 
488
 
 
489
    def test_sprout_upgrades_to_rich_root_format_if_needed(self):
 
490
        child_branch, new_child_transport = self.prepare_default_stacking(
 
491
            child_format='rich-root-pack')
 
492
        def do_sprout():
 
493
            try:
 
494
                return child_branch.bzrdir.sprout(new_child_transport.base,
 
495
                                                  stacked=True)
 
496
            except errors.IncompatibleRepositories:
 
497
                raise AssertionError(
 
498
                    'Rich root format should be sprout-compatible')
 
499
        self.expectFailure('Rich root format should be sprout-compatible',
 
500
                           do_sprout)
 
501
        repo = new_child.open_repository()
 
502
        self.assertTrue(repo._format.supports_external_lookups)
 
503
        self.assertTrue(repo.supports_rich_root())
 
504
 
 
505
    def test_add_fallback_repo_handles_absolute_urls(self):
 
506
        stack_on = self.make_branch('stack_on', format='development1')
 
507
        repo = self.make_repository('repo', format='development1')
 
508
        policy = bzrdir.UseExistingRepository(repo, stack_on.base)
 
509
        policy._add_fallback(repo)
 
510
 
 
511
    def test_add_fallback_repo_handles_relative_urls(self):
 
512
        stack_on = self.make_branch('stack_on', format='development1')
 
513
        repo = self.make_repository('repo', format='development1')
 
514
        policy = bzrdir.UseExistingRepository(repo, '.', stack_on.base)
 
515
        policy._add_fallback(repo)
 
516
 
 
517
    def test_configure_relative_branch_stacking_url(self):
 
518
        stack_on = self.make_branch('stack_on', format='development1')
 
519
        stacked = self.make_branch('stack_on/stacked', format='development1')
 
520
        policy = bzrdir.UseExistingRepository(stacked.repository,
 
521
            '.', stack_on.base)
 
522
        policy.configure_branch(stacked)
 
523
        self.assertEqual('..', stacked.get_stacked_on_url())
 
524
 
 
525
    def test_relative_branch_stacking_to_absolute(self):
 
526
        stack_on = self.make_branch('stack_on', format='development1')
 
527
        stacked = self.make_branch('stack_on/stacked', format='development1')
 
528
        policy = bzrdir.UseExistingRepository(stacked.repository,
 
529
            '.', self.get_readonly_url('stack_on'))
 
530
        policy.configure_branch(stacked)
 
531
        self.assertEqual(self.get_readonly_url('stack_on'),
 
532
                         stacked.get_stacked_on_url())
 
533
 
 
534
 
 
535
class ChrootedTests(TestCaseWithTransport):
 
536
    """A support class that provides readonly urls outside the local namespace.
 
537
 
 
538
    This is done by checking if self.transport_server is a MemoryServer. if it
 
539
    is then we are chrooted already, if it is not then an HttpServer is used
 
540
    for readonly urls.
 
541
    """
 
542
 
 
543
    def setUp(self):
 
544
        super(ChrootedTests, self).setUp()
 
545
        if not self.vfs_transport_factory == MemoryServer:
 
546
            self.transport_readonly_server = HttpServer
 
547
 
 
548
    def local_branch_path(self, branch):
 
549
         return os.path.realpath(urlutils.local_path_from_url(branch.base))
 
550
 
 
551
    def test_open_containing(self):
 
552
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
 
553
                          self.get_readonly_url(''))
 
554
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
 
555
                          self.get_readonly_url('g/p/q'))
 
556
        control = bzrdir.BzrDir.create(self.get_url())
 
557
        branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url(''))
 
558
        self.assertEqual('', relpath)
 
559
        branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url('g/p/q'))
 
560
        self.assertEqual('g/p/q', relpath)
 
561
 
 
562
    def test_open_containing_tree_branch_or_repository_empty(self):
 
563
        self.assertRaises(errors.NotBranchError,
 
564
            bzrdir.BzrDir.open_containing_tree_branch_or_repository,
 
565
            self.get_readonly_url(''))
 
566
 
 
567
    def test_open_containing_tree_branch_or_repository_all(self):
 
568
        self.make_branch_and_tree('topdir')
 
569
        tree, branch, repo, relpath = \
 
570
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
571
                'topdir/foo')
 
572
        self.assertEqual(os.path.realpath('topdir'),
 
573
                         os.path.realpath(tree.basedir))
 
574
        self.assertEqual(os.path.realpath('topdir'),
 
575
                         self.local_branch_path(branch))
 
576
        self.assertEqual(
 
577
            osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
 
578
            repo.bzrdir.transport.local_abspath('repository'))
 
579
        self.assertEqual(relpath, 'foo')
 
580
 
 
581
    def test_open_containing_tree_branch_or_repository_no_tree(self):
 
582
        self.make_branch('branch')
 
583
        tree, branch, repo, relpath = \
 
584
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
585
                'branch/foo')
 
586
        self.assertEqual(tree, None)
 
587
        self.assertEqual(os.path.realpath('branch'),
 
588
                         self.local_branch_path(branch))
 
589
        self.assertEqual(
 
590
            osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
 
591
            repo.bzrdir.transport.local_abspath('repository'))
 
592
        self.assertEqual(relpath, 'foo')
 
593
 
 
594
    def test_open_containing_tree_branch_or_repository_repo(self):
 
595
        self.make_repository('repo')
 
596
        tree, branch, repo, relpath = \
 
597
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
598
                'repo')
 
599
        self.assertEqual(tree, None)
 
600
        self.assertEqual(branch, None)
 
601
        self.assertEqual(
 
602
            osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
 
603
            repo.bzrdir.transport.local_abspath('repository'))
 
604
        self.assertEqual(relpath, '')
 
605
 
 
606
    def test_open_containing_tree_branch_or_repository_shared_repo(self):
 
607
        self.make_repository('shared', shared=True)
 
608
        bzrdir.BzrDir.create_branch_convenience('shared/branch',
 
609
                                                force_new_tree=False)
 
610
        tree, branch, repo, relpath = \
 
611
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
612
                'shared/branch')
 
613
        self.assertEqual(tree, None)
 
614
        self.assertEqual(os.path.realpath('shared/branch'),
 
615
                         self.local_branch_path(branch))
 
616
        self.assertEqual(
 
617
            osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
 
618
            repo.bzrdir.transport.local_abspath('repository'))
 
619
        self.assertEqual(relpath, '')
 
620
 
 
621
    def test_open_containing_tree_branch_or_repository_branch_subdir(self):
 
622
        self.make_branch_and_tree('foo')
 
623
        self.build_tree(['foo/bar/'])
 
624
        tree, branch, repo, relpath = \
 
625
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
626
                'foo/bar')
 
627
        self.assertEqual(os.path.realpath('foo'),
 
628
                         os.path.realpath(tree.basedir))
 
629
        self.assertEqual(os.path.realpath('foo'),
 
630
                         self.local_branch_path(branch))
 
631
        self.assertEqual(
 
632
            osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
 
633
            repo.bzrdir.transport.local_abspath('repository'))
 
634
        self.assertEqual(relpath, 'bar')
 
635
 
 
636
    def test_open_containing_tree_branch_or_repository_repo_subdir(self):
 
637
        self.make_repository('bar')
 
638
        self.build_tree(['bar/baz/'])
 
639
        tree, branch, repo, relpath = \
 
640
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
641
                'bar/baz')
 
642
        self.assertEqual(tree, None)
 
643
        self.assertEqual(branch, None)
 
644
        self.assertEqual(
 
645
            osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
 
646
            repo.bzrdir.transport.local_abspath('repository'))
 
647
        self.assertEqual(relpath, 'baz')
 
648
 
 
649
    def test_open_containing_from_transport(self):
 
650
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
 
651
                          get_transport(self.get_readonly_url('')))
 
652
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
 
653
                          get_transport(self.get_readonly_url('g/p/q')))
 
654
        control = bzrdir.BzrDir.create(self.get_url())
 
655
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
 
656
            get_transport(self.get_readonly_url('')))
 
657
        self.assertEqual('', relpath)
 
658
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
 
659
            get_transport(self.get_readonly_url('g/p/q')))
 
660
        self.assertEqual('g/p/q', relpath)
 
661
 
 
662
    def test_open_containing_tree_or_branch(self):
 
663
        self.make_branch_and_tree('topdir')
 
664
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
665
            'topdir/foo')
 
666
        self.assertEqual(os.path.realpath('topdir'),
 
667
                         os.path.realpath(tree.basedir))
 
668
        self.assertEqual(os.path.realpath('topdir'),
 
669
                         self.local_branch_path(branch))
 
670
        self.assertIs(tree.bzrdir, branch.bzrdir)
 
671
        self.assertEqual('foo', relpath)
 
672
        # opening from non-local should not return the tree
 
673
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
674
            self.get_readonly_url('topdir/foo'))
 
675
        self.assertEqual(None, tree)
 
676
        self.assertEqual('foo', relpath)
 
677
        # without a tree:
 
678
        self.make_branch('topdir/foo')
 
679
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
680
            'topdir/foo')
 
681
        self.assertIs(tree, None)
 
682
        self.assertEqual(os.path.realpath('topdir/foo'),
 
683
                         self.local_branch_path(branch))
 
684
        self.assertEqual('', relpath)
 
685
 
 
686
    def test_open_tree_or_branch(self):
 
687
        self.make_branch_and_tree('topdir')
 
688
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
 
689
        self.assertEqual(os.path.realpath('topdir'),
 
690
                         os.path.realpath(tree.basedir))
 
691
        self.assertEqual(os.path.realpath('topdir'),
 
692
                         self.local_branch_path(branch))
 
693
        self.assertIs(tree.bzrdir, branch.bzrdir)
 
694
        # opening from non-local should not return the tree
 
695
        tree, branch = bzrdir.BzrDir.open_tree_or_branch(
 
696
            self.get_readonly_url('topdir'))
 
697
        self.assertEqual(None, tree)
 
698
        # without a tree:
 
699
        self.make_branch('topdir/foo')
 
700
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
 
701
        self.assertIs(tree, None)
 
702
        self.assertEqual(os.path.realpath('topdir/foo'),
 
703
                         self.local_branch_path(branch))
 
704
 
 
705
    def test_open_from_transport(self):
 
706
        # transport pointing at bzrdir should give a bzrdir with root transport
 
707
        # set to the given transport
 
708
        control = bzrdir.BzrDir.create(self.get_url())
 
709
        transport = get_transport(self.get_url())
 
710
        opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
 
711
        self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
 
712
        self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
 
713
        
 
714
    def test_open_from_transport_no_bzrdir(self):
 
715
        transport = get_transport(self.get_url())
 
716
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
 
717
                          transport)
 
718
 
 
719
    def test_open_from_transport_bzrdir_in_parent(self):
 
720
        control = bzrdir.BzrDir.create(self.get_url())
 
721
        transport = get_transport(self.get_url())
 
722
        transport.mkdir('subdir')
 
723
        transport = transport.clone('subdir')
 
724
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
 
725
                          transport)
 
726
 
 
727
    def test_sprout_recursive(self):
 
728
        tree = self.make_branch_and_tree('tree1', format='dirstate-with-subtree')
 
729
        sub_tree = self.make_branch_and_tree('tree1/subtree',
 
730
            format='dirstate-with-subtree')
 
731
        tree.add_reference(sub_tree)
 
732
        self.build_tree(['tree1/subtree/file'])
 
733
        sub_tree.add('file')
 
734
        tree.commit('Initial commit')
 
735
        tree.bzrdir.sprout('tree2')
 
736
        self.failUnlessExists('tree2/subtree/file')
 
737
 
 
738
    def test_cloning_metadir(self):
 
739
        """Ensure that cloning metadir is suitable"""
 
740
        bzrdir = self.make_bzrdir('bzrdir')
 
741
        bzrdir.cloning_metadir()
 
742
        branch = self.make_branch('branch', format='knit')
 
743
        format = branch.bzrdir.cloning_metadir()
 
744
        self.assertIsInstance(format.workingtree_format,
 
745
            workingtree.WorkingTreeFormat3)
 
746
 
 
747
    def test_sprout_recursive_treeless(self):
 
748
        tree = self.make_branch_and_tree('tree1',
 
749
            format='dirstate-with-subtree')
 
750
        sub_tree = self.make_branch_and_tree('tree1/subtree',
 
751
            format='dirstate-with-subtree')
 
752
        tree.add_reference(sub_tree)
 
753
        self.build_tree(['tree1/subtree/file'])
 
754
        sub_tree.add('file')
 
755
        tree.commit('Initial commit')
 
756
        tree.bzrdir.destroy_workingtree()
 
757
        repo = self.make_repository('repo', shared=True,
 
758
            format='dirstate-with-subtree')
 
759
        repo.set_make_working_trees(False)
 
760
        tree.bzrdir.sprout('repo/tree2')
 
761
        self.failUnlessExists('repo/tree2/subtree')
 
762
        self.failIfExists('repo/tree2/subtree/file')
 
763
 
 
764
    def make_foo_bar_baz(self):
 
765
        foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
 
766
        bar = self.make_branch('foo/bar').bzrdir
 
767
        baz = self.make_branch('baz').bzrdir
 
768
        return foo, bar, baz
 
769
 
 
770
    def test_find_bzrdirs(self):
 
771
        foo, bar, baz = self.make_foo_bar_baz()
 
772
        transport = get_transport(self.get_url())
 
773
        self.assertEqualBzrdirs([baz, foo, bar],
 
774
                                bzrdir.BzrDir.find_bzrdirs(transport))
 
775
 
 
776
    def test_find_bzrdirs_list_current(self):
 
777
        def list_current(transport):
 
778
            return [s for s in transport.list_dir('') if s != 'baz']
 
779
 
 
780
        foo, bar, baz = self.make_foo_bar_baz()
 
781
        transport = get_transport(self.get_url())
 
782
        self.assertEqualBzrdirs([foo, bar],
 
783
                                bzrdir.BzrDir.find_bzrdirs(transport,
 
784
                                    list_current=list_current))
 
785
 
 
786
 
 
787
    def test_find_bzrdirs_evaluate(self):
 
788
        def evaluate(bzrdir):
 
789
            try:
 
790
                repo = bzrdir.open_repository()
 
791
            except NoRepositoryPresent:
 
792
                return True, bzrdir.root_transport.base
 
793
            else:
 
794
                return False, bzrdir.root_transport.base
 
795
 
 
796
        foo, bar, baz = self.make_foo_bar_baz()
 
797
        transport = get_transport(self.get_url())
 
798
        self.assertEqual([baz.root_transport.base, foo.root_transport.base],
 
799
                         list(bzrdir.BzrDir.find_bzrdirs(transport,
 
800
                                                         evaluate=evaluate)))
 
801
 
 
802
    def assertEqualBzrdirs(self, first, second):
 
803
        first = list(first)
 
804
        second = list(second)
 
805
        self.assertEqual(len(first), len(second))
 
806
        for x, y in zip(first, second):
 
807
            self.assertEqual(x.root_transport.base, y.root_transport.base)
 
808
 
 
809
    def test_find_branches(self):
 
810
        root = self.make_repository('', shared=True)
 
811
        foo, bar, baz = self.make_foo_bar_baz()
 
812
        qux = self.make_bzrdir('foo/qux')
 
813
        transport = get_transport(self.get_url())
 
814
        branches = bzrdir.BzrDir.find_branches(transport)
 
815
        self.assertEqual(baz.root_transport.base, branches[0].base)
 
816
        self.assertEqual(foo.root_transport.base, branches[1].base)
 
817
        self.assertEqual(bar.root_transport.base, branches[2].base)
 
818
 
 
819
        # ensure this works without a top-level repo
 
820
        branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
 
821
        self.assertEqual(foo.root_transport.base, branches[0].base)
 
822
        self.assertEqual(bar.root_transport.base, branches[1].base)
 
823
 
 
824
 
 
825
class TestMeta1DirFormat(TestCaseWithTransport):
 
826
    """Tests specific to the meta1 dir format."""
 
827
 
 
828
    def test_right_base_dirs(self):
 
829
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
830
        t = dir.transport
 
831
        branch_base = t.clone('branch').base
 
832
        self.assertEqual(branch_base, dir.get_branch_transport(None).base)
 
833
        self.assertEqual(branch_base,
 
834
                         dir.get_branch_transport(bzrlib.branch.BzrBranchFormat5()).base)
 
835
        repository_base = t.clone('repository').base
 
836
        self.assertEqual(repository_base, dir.get_repository_transport(None).base)
 
837
        self.assertEqual(repository_base,
 
838
                         dir.get_repository_transport(weaverepo.RepositoryFormat7()).base)
 
839
        checkout_base = t.clone('checkout').base
 
840
        self.assertEqual(checkout_base, dir.get_workingtree_transport(None).base)
 
841
        self.assertEqual(checkout_base,
 
842
                         dir.get_workingtree_transport(workingtree.WorkingTreeFormat3()).base)
 
843
 
 
844
    def test_meta1dir_uses_lockdir(self):
 
845
        """Meta1 format uses a LockDir to guard the whole directory, not a file."""
 
846
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
847
        t = dir.transport
 
848
        self.assertIsDirectory('branch-lock', t)
 
849
 
 
850
    def test_comparison(self):
 
851
        """Equality and inequality behave properly.
 
852
 
 
853
        Metadirs should compare equal iff they have the same repo, branch and
 
854
        tree formats.
 
855
        """
 
856
        mydir = bzrdir.format_registry.make_bzrdir('knit')
 
857
        self.assertEqual(mydir, mydir)
 
858
        self.assertFalse(mydir != mydir)
 
859
        otherdir = bzrdir.format_registry.make_bzrdir('knit')
 
860
        self.assertEqual(otherdir, mydir)
 
861
        self.assertFalse(otherdir != mydir)
 
862
        otherdir2 = bzrdir.format_registry.make_bzrdir('dirstate-with-subtree')
 
863
        self.assertNotEqual(otherdir2, mydir)
 
864
        self.assertFalse(otherdir2 == mydir)
 
865
 
 
866
    def test_needs_conversion_different_working_tree(self):
 
867
        # meta1dirs need an conversion if any element is not the default.
 
868
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
869
        # test with 
 
870
        new_default = bzrdir.format_registry.make_bzrdir('dirstate')
 
871
        bzrdir.BzrDirFormat._set_default_format(new_default)
 
872
        try:
 
873
            tree = self.make_branch_and_tree('tree', format='knit')
 
874
            self.assertTrue(tree.bzrdir.needs_format_conversion())
 
875
        finally:
 
876
            bzrdir.BzrDirFormat._set_default_format(old_format)
 
877
 
 
878
 
 
879
class TestFormat5(TestCaseWithTransport):
 
880
    """Tests specific to the version 5 bzrdir format."""
 
881
 
 
882
    def test_same_lockfiles_between_tree_repo_branch(self):
 
883
        # this checks that only a single lockfiles instance is created 
 
884
        # for format 5 objects
 
885
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
886
        def check_dir_components_use_same_lock(dir):
 
887
            ctrl_1 = dir.open_repository().control_files
 
888
            ctrl_2 = dir.open_branch().control_files
 
889
            ctrl_3 = dir.open_workingtree()._control_files
 
890
            self.assertTrue(ctrl_1 is ctrl_2)
 
891
            self.assertTrue(ctrl_2 is ctrl_3)
 
892
        check_dir_components_use_same_lock(dir)
 
893
        # and if we open it normally.
 
894
        dir = bzrdir.BzrDir.open(self.get_url())
 
895
        check_dir_components_use_same_lock(dir)
 
896
    
 
897
    def test_can_convert(self):
 
898
        # format 5 dirs are convertable
 
899
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
900
        self.assertTrue(dir.can_convert_format())
 
901
    
 
902
    def test_needs_conversion(self):
 
903
        # format 5 dirs need a conversion if they are not the default.
 
904
        # and they start of not the default.
 
905
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
906
        bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirFormat5())
 
907
        try:
 
908
            dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
909
            self.assertFalse(dir.needs_format_conversion())
 
910
        finally:
 
911
            bzrdir.BzrDirFormat._set_default_format(old_format)
 
912
        self.assertTrue(dir.needs_format_conversion())
 
913
 
 
914
 
 
915
class TestFormat6(TestCaseWithTransport):
 
916
    """Tests specific to the version 6 bzrdir format."""
 
917
 
 
918
    def test_same_lockfiles_between_tree_repo_branch(self):
 
919
        # this checks that only a single lockfiles instance is created 
 
920
        # for format 6 objects
 
921
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
922
        def check_dir_components_use_same_lock(dir):
 
923
            ctrl_1 = dir.open_repository().control_files
 
924
            ctrl_2 = dir.open_branch().control_files
 
925
            ctrl_3 = dir.open_workingtree()._control_files
 
926
            self.assertTrue(ctrl_1 is ctrl_2)
 
927
            self.assertTrue(ctrl_2 is ctrl_3)
 
928
        check_dir_components_use_same_lock(dir)
 
929
        # and if we open it normally.
 
930
        dir = bzrdir.BzrDir.open(self.get_url())
 
931
        check_dir_components_use_same_lock(dir)
 
932
    
 
933
    def test_can_convert(self):
 
934
        # format 6 dirs are convertable
 
935
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
936
        self.assertTrue(dir.can_convert_format())
 
937
    
 
938
    def test_needs_conversion(self):
 
939
        # format 6 dirs need an conversion if they are not the default.
 
940
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
941
        bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirMetaFormat1())
 
942
        try:
 
943
            dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
944
            self.assertTrue(dir.needs_format_conversion())
 
945
        finally:
 
946
            bzrdir.BzrDirFormat._set_default_format(old_format)
 
947
 
 
948
 
 
949
class NotBzrDir(bzrlib.bzrdir.BzrDir):
 
950
    """A non .bzr based control directory."""
 
951
 
 
952
    def __init__(self, transport, format):
 
953
        self._format = format
 
954
        self.root_transport = transport
 
955
        self.transport = transport.clone('.not')
 
956
 
 
957
 
 
958
class NotBzrDirFormat(bzrlib.bzrdir.BzrDirFormat):
 
959
    """A test class representing any non-.bzr based disk format."""
 
960
 
 
961
    def initialize_on_transport(self, transport):
 
962
        """Initialize a new .not dir in the base directory of a Transport."""
 
963
        transport.mkdir('.not')
 
964
        return self.open(transport)
 
965
 
 
966
    def open(self, transport):
 
967
        """Open this directory."""
 
968
        return NotBzrDir(transport, self)
 
969
 
 
970
    @classmethod
 
971
    def _known_formats(self):
 
972
        return set([NotBzrDirFormat()])
 
973
 
 
974
    @classmethod
 
975
    def probe_transport(self, transport):
 
976
        """Our format is present if the transport ends in '.not/'."""
 
977
        if transport.has('.not'):
 
978
            return NotBzrDirFormat()
 
979
 
 
980
 
 
981
class TestNotBzrDir(TestCaseWithTransport):
 
982
    """Tests for using the bzrdir api with a non .bzr based disk format.
 
983
    
 
984
    If/when one of these is in the core, we can let the implementation tests
 
985
    verify this works.
 
986
    """
 
987
 
 
988
    def test_create_and_find_format(self):
 
989
        # create a .notbzr dir 
 
990
        format = NotBzrDirFormat()
 
991
        dir = format.initialize(self.get_url())
 
992
        self.assertIsInstance(dir, NotBzrDir)
 
993
        # now probe for it.
 
994
        bzrlib.bzrdir.BzrDirFormat.register_control_format(format)
 
995
        try:
 
996
            found = bzrlib.bzrdir.BzrDirFormat.find_format(
 
997
                get_transport(self.get_url()))
 
998
            self.assertIsInstance(found, NotBzrDirFormat)
 
999
        finally:
 
1000
            bzrlib.bzrdir.BzrDirFormat.unregister_control_format(format)
 
1001
 
 
1002
    def test_included_in_known_formats(self):
 
1003
        bzrlib.bzrdir.BzrDirFormat.register_control_format(NotBzrDirFormat)
 
1004
        try:
 
1005
            formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
 
1006
            for format in formats:
 
1007
                if isinstance(format, NotBzrDirFormat):
 
1008
                    return
 
1009
            self.fail("No NotBzrDirFormat in %s" % formats)
 
1010
        finally:
 
1011
            bzrlib.bzrdir.BzrDirFormat.unregister_control_format(NotBzrDirFormat)
 
1012
 
 
1013
 
 
1014
class NonLocalTests(TestCaseWithTransport):
 
1015
    """Tests for bzrdir static behaviour on non local paths."""
 
1016
 
 
1017
    def setUp(self):
 
1018
        super(NonLocalTests, self).setUp()
 
1019
        self.vfs_transport_factory = MemoryServer
 
1020
    
 
1021
    def test_create_branch_convenience(self):
 
1022
        # outside a repo the default convenience output is a repo+branch_tree
 
1023
        format = bzrdir.format_registry.make_bzrdir('knit')
 
1024
        branch = bzrdir.BzrDir.create_branch_convenience(
 
1025
            self.get_url('foo'), format=format)
 
1026
        self.assertRaises(errors.NoWorkingTree,
 
1027
                          branch.bzrdir.open_workingtree)
 
1028
        branch.bzrdir.open_repository()
 
1029
 
 
1030
    def test_create_branch_convenience_force_tree_not_local_fails(self):
 
1031
        # outside a repo the default convenience output is a repo+branch_tree
 
1032
        format = bzrdir.format_registry.make_bzrdir('knit')
 
1033
        self.assertRaises(errors.NotLocalUrl,
 
1034
            bzrdir.BzrDir.create_branch_convenience,
 
1035
            self.get_url('foo'),
 
1036
            force_new_tree=True,
 
1037
            format=format)
 
1038
        t = get_transport(self.get_url('.'))
 
1039
        self.assertFalse(t.has('foo'))
 
1040
 
 
1041
    def test_clone(self):
 
1042
        # clone into a nonlocal path works
 
1043
        format = bzrdir.format_registry.make_bzrdir('knit')
 
1044
        branch = bzrdir.BzrDir.create_branch_convenience('local',
 
1045
                                                         format=format)
 
1046
        branch.bzrdir.open_workingtree()
 
1047
        result = branch.bzrdir.clone(self.get_url('remote'))
 
1048
        self.assertRaises(errors.NoWorkingTree,
 
1049
                          result.open_workingtree)
 
1050
        result.open_branch()
 
1051
        result.open_repository()
 
1052
 
 
1053
    def test_checkout_metadir(self):
 
1054
        # checkout_metadir has reasonable working tree format even when no
 
1055
        # working tree is present
 
1056
        self.make_branch('branch-knit2', format='dirstate-with-subtree')
 
1057
        my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
 
1058
        checkout_format = my_bzrdir.checkout_metadir()
 
1059
        self.assertIsInstance(checkout_format.workingtree_format,
 
1060
                              workingtree.WorkingTreeFormat3)
 
1061
 
 
1062
 
 
1063
class TestHTTPRedirectionLoop(object):
 
1064
    """Test redirection loop between two http servers.
 
1065
 
 
1066
    This MUST be used by daughter classes that also inherit from
 
1067
    TestCaseWithTwoWebservers.
 
1068
 
 
1069
    We can't inherit directly from TestCaseWithTwoWebservers or the
 
1070
    test framework will try to create an instance which cannot
 
1071
    run, its implementation being incomplete. 
 
1072
    """
 
1073
 
 
1074
    # Should be defined by daughter classes to ensure redirection
 
1075
    # still use the same transport implementation (not currently
 
1076
    # enforced as it's a bit tricky to get right (see the FIXME
 
1077
    # in BzrDir.open_from_transport for the unique use case so
 
1078
    # far)
 
1079
    _qualifier = None
 
1080
 
 
1081
    def create_transport_readonly_server(self):
 
1082
        return HTTPServerRedirecting()
 
1083
 
 
1084
    def create_transport_secondary_server(self):
 
1085
        return HTTPServerRedirecting()
 
1086
 
 
1087
    def setUp(self):
 
1088
        # Both servers redirect to each server creating a loop
 
1089
        super(TestHTTPRedirectionLoop, self).setUp()
 
1090
        # The redirections will point to the new server
 
1091
        self.new_server = self.get_readonly_server()
 
1092
        # The requests to the old server will be redirected
 
1093
        self.old_server = self.get_secondary_server()
 
1094
        # Configure the redirections
 
1095
        self.old_server.redirect_to(self.new_server.host, self.new_server.port)
 
1096
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
 
1097
 
 
1098
    def _qualified_url(self, host, port):
 
1099
        return 'http+%s://%s:%s' % (self._qualifier, host, port)
 
1100
 
 
1101
    def test_loop(self):
 
1102
        # Starting from either server should loop
 
1103
        old_url = self._qualified_url(self.old_server.host, 
 
1104
                                      self.old_server.port)
 
1105
        oldt = self._transport(old_url)
 
1106
        self.assertRaises(errors.NotBranchError,
 
1107
                          bzrdir.BzrDir.open_from_transport, oldt)
 
1108
        new_url = self._qualified_url(self.new_server.host, 
 
1109
                                      self.new_server.port)
 
1110
        newt = self._transport(new_url)
 
1111
        self.assertRaises(errors.NotBranchError,
 
1112
                          bzrdir.BzrDir.open_from_transport, newt)
 
1113
 
 
1114
 
 
1115
class TestHTTPRedirections_urllib(TestHTTPRedirectionLoop,
 
1116
                                  TestCaseWithTwoWebservers):
 
1117
    """Tests redirections for urllib implementation"""
 
1118
 
 
1119
    _qualifier = 'urllib'
 
1120
    _transport = HttpTransport_urllib
 
1121
 
 
1122
 
 
1123
 
 
1124
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
 
1125
                                  TestHTTPRedirectionLoop,
 
1126
                                  TestCaseWithTwoWebservers):
 
1127
    """Tests redirections for pycurl implementation"""
 
1128
 
 
1129
    _qualifier = 'pycurl'
 
1130
 
 
1131
 
 
1132
class TestDotBzrHidden(TestCaseWithTransport):
 
1133
 
 
1134
    ls = ['ls']
 
1135
    if sys.platform == 'win32':
 
1136
        ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']
 
1137
 
 
1138
    def get_ls(self):
 
1139
        f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
 
1140
            stderr=subprocess.PIPE)
 
1141
        out, err = f.communicate()
 
1142
        self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
 
1143
                         % (self.ls, err))
 
1144
        return out.splitlines()
 
1145
 
 
1146
    def test_dot_bzr_hidden(self):
 
1147
        if sys.platform == 'win32' and not win32utils.has_win32file:
 
1148
            raise TestSkipped('unable to make file hidden without pywin32 library')
 
1149
        b = bzrdir.BzrDir.create('.')
 
1150
        self.build_tree(['a'])
 
1151
        self.assertEquals(['a'], self.get_ls())
 
1152
 
 
1153
    def test_dot_bzr_hidden_with_url(self):
 
1154
        if sys.platform == 'win32' and not win32utils.has_win32file:
 
1155
            raise TestSkipped('unable to make file hidden without pywin32 library')
 
1156
        b = bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
 
1157
        self.build_tree(['a'])
 
1158
        self.assertEquals(['a'], self.get_ls())
 
1159
 
 
1160
 
 
1161
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
 
1162
    """Test BzrDirFormat implementation for TestBzrDirSprout."""
 
1163
 
 
1164
    def _open(self, transport):
 
1165
        return _TestBzrDir(transport, self)
 
1166
 
 
1167
 
 
1168
class _TestBzrDir(bzrdir.BzrDirMeta1):
 
1169
    """Test BzrDir implementation for TestBzrDirSprout.
 
1170
    
 
1171
    When created a _TestBzrDir already has repository and a branch.  The branch
 
1172
    is a test double as well.
 
1173
    """
 
1174
 
 
1175
    def __init__(self, *args, **kwargs):
 
1176
        super(_TestBzrDir, self).__init__(*args, **kwargs)
 
1177
        self.test_branch = _TestBranch()
 
1178
        self.test_branch.repository = self.create_repository()
 
1179
 
 
1180
    def open_branch(self, unsupported=False):
 
1181
        return self.test_branch
 
1182
 
 
1183
    def cloning_metadir(self, require_stacking=False):
 
1184
        return _TestBzrDirFormat()
 
1185
 
 
1186
 
 
1187
class _TestBranch(bzrlib.branch.Branch):
 
1188
    """Test Branch implementation for TestBzrDirSprout."""
 
1189
 
 
1190
    def __init__(self, *args, **kwargs):
 
1191
        super(_TestBranch, self).__init__(*args, **kwargs)
 
1192
        self.calls = []
 
1193
        self._parent = None
 
1194
 
 
1195
    def sprout(self, *args, **kwargs):
 
1196
        self.calls.append('sprout')
 
1197
        return _TestBranch()
 
1198
 
 
1199
    def copy_content_into(self, destination, revision_id=None):
 
1200
        self.calls.append('copy_content_into')
 
1201
 
 
1202
    def get_parent(self):
 
1203
        return self._parent
 
1204
 
 
1205
    def set_parent(self, parent):
 
1206
        self._parent = parent
 
1207
 
 
1208
 
 
1209
class TestBzrDirSprout(TestCaseWithMemoryTransport):
 
1210
 
 
1211
    def test_sprout_uses_branch_sprout(self):
 
1212
        """BzrDir.sprout calls Branch.sprout.
 
1213
 
 
1214
        Usually, BzrDir.sprout should delegate to the branch's sprout method
 
1215
        for part of the work.  This allows the source branch to control the
 
1216
        choice of format for the new branch.
 
1217
        
 
1218
        There are exceptions, but this tests avoids them:
 
1219
          - if there's no branch in the source bzrdir,
 
1220
          - or if the stacking has been requested and the format needs to be
 
1221
            overridden to satisfy that.
 
1222
        """
 
1223
        # Make an instrumented bzrdir.
 
1224
        t = self.get_transport('source')
 
1225
        t.ensure_base()
 
1226
        source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
 
1227
        # The instrumented bzrdir has a test_branch attribute that logs calls
 
1228
        # made to the branch contained in that bzrdir.  Initially the test
 
1229
        # branch exists but no calls have been made to it.
 
1230
        self.assertEqual([], source_bzrdir.test_branch.calls)
 
1231
 
 
1232
        # Sprout the bzrdir
 
1233
        target_url = self.get_url('target')
 
1234
        result = source_bzrdir.sprout(target_url, recurse='no')
 
1235
 
 
1236
        # The bzrdir called the branch's sprout method.
 
1237
        self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)
 
1238
 
 
1239
    def test_sprout_parent(self):
 
1240
        grandparent_tree = self.make_branch('grandparent')
 
1241
        parent = grandparent_tree.bzrdir.sprout('parent').open_branch()
 
1242
        branch_tree = parent.bzrdir.sprout('branch').open_branch()
 
1243
        self.assertContainsRe(branch_tree.get_parent(), '/parent/$')