1
# Copyright (C) 2006-2013, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the BzrDir facility and any format specific tests.
19
For interface contract tests, see tests/per_bzr_dir.
35
revision as _mod_revision,
37
transport as _mod_transport,
49
import breezy.bzr.branch
50
from ..bzr.fullhistory import BzrBranchFormat5
51
from ..errors import (
53
NoColocatedBranchSupport,
55
UnsupportedFormatError,
59
TestCaseWithMemoryTransport,
60
TestCaseWithTransport,
67
from ..transport import (
71
from ..transport.http._urllib import HttpTransport_urllib
72
from ..transport.nosmart import NoSmartTransportDecorator
73
from ..transport.readonly import ReadonlyTransportDecorator
74
from ..bzr import knitrepo, knitpack_repo
77
class TestDefaultFormat(TestCase):
79
def test_get_set_default_format(self):
80
old_format = bzrdir.BzrDirFormat.get_default_format()
81
# default is BzrDirMetaFormat1
82
self.assertIsInstance(old_format, bzrdir.BzrDirMetaFormat1)
83
controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
84
# creating a bzr dir should now create an instrumented dir.
86
result = bzrdir.BzrDir.create('memory:///')
87
self.assertIsInstance(result, SampleBzrDir)
89
controldir.ControlDirFormat._set_default_format(old_format)
90
self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
93
class DeprecatedBzrDirFormat(bzrdir.BzrDirFormat):
94
"""A deprecated bzr dir format."""
97
class TestFormatRegistry(TestCase):
99
def make_format_registry(self):
100
my_format_registry = controldir.ControlDirFormatRegistry()
101
my_format_registry.register('deprecated', DeprecatedBzrDirFormat,
102
'Some format. Slower and unawesome and deprecated.',
104
my_format_registry.register_lazy('lazy', 'breezy.tests.test_bzrdir',
105
'DeprecatedBzrDirFormat', 'Format registered lazily',
107
bzr.register_metadir(my_format_registry, 'knit',
108
'breezy.bzr.knitrepo.RepositoryFormatKnit1',
109
'Format using knits',
111
my_format_registry.set_default('knit')
112
bzr.register_metadir(my_format_registry,
114
'breezy.bzr.knitrepo.RepositoryFormatKnit3',
115
'Experimental successor to knit. Use at your own risk.',
116
branch_format='breezy.bzr.branch.BzrBranchFormat6',
118
bzr.register_metadir(my_format_registry,
120
'breezy.bzr.knitrepo.RepositoryFormatKnit3',
121
'Experimental successor to knit. Use at your own risk.',
122
branch_format='breezy.bzr.branch.BzrBranchFormat6', hidden=True)
123
my_format_registry.register('hiddendeprecated', DeprecatedBzrDirFormat,
124
'Old format. Slower and does not support things. ', hidden=True)
125
my_format_registry.register_lazy('hiddenlazy', 'breezy.tests.test_bzrdir',
126
'DeprecatedBzrDirFormat', 'Format registered lazily',
127
deprecated=True, hidden=True)
128
return my_format_registry
130
def test_format_registry(self):
131
my_format_registry = self.make_format_registry()
132
my_bzrdir = my_format_registry.make_controldir('lazy')
133
self.assertIsInstance(my_bzrdir, DeprecatedBzrDirFormat)
134
my_bzrdir = my_format_registry.make_controldir('deprecated')
135
self.assertIsInstance(my_bzrdir, DeprecatedBzrDirFormat)
136
my_bzrdir = my_format_registry.make_controldir('default')
137
self.assertIsInstance(my_bzrdir.repository_format,
138
knitrepo.RepositoryFormatKnit1)
139
my_bzrdir = my_format_registry.make_controldir('knit')
140
self.assertIsInstance(my_bzrdir.repository_format,
141
knitrepo.RepositoryFormatKnit1)
142
my_bzrdir = my_format_registry.make_controldir('branch6')
143
self.assertIsInstance(my_bzrdir.get_branch_format(),
144
breezy.bzr.branch.BzrBranchFormat6)
146
def test_get_help(self):
147
my_format_registry = self.make_format_registry()
148
self.assertEqual('Format registered lazily',
149
my_format_registry.get_help('lazy'))
150
self.assertEqual('Format using knits',
151
my_format_registry.get_help('knit'))
152
self.assertEqual('Format using knits',
153
my_format_registry.get_help('default'))
154
self.assertEqual('Some format. Slower and unawesome and deprecated.',
155
my_format_registry.get_help('deprecated'))
157
def test_help_topic(self):
158
topics = help_topics.HelpTopicRegistry()
159
registry = self.make_format_registry()
160
topics.register('current-formats', registry.help_topic,
162
topics.register('other-formats', registry.help_topic,
164
new = topics.get_detail('current-formats')
165
rest = topics.get_detail('other-formats')
166
experimental, deprecated = rest.split('Deprecated formats')
167
self.assertContainsRe(new, 'formats-help')
168
self.assertContainsRe(new,
169
':knit:\n \\(native\\) \\(default\\) Format using knits\n')
170
self.assertContainsRe(experimental,
171
':branch6:\n \\(native\\) Experimental successor to knit')
172
self.assertContainsRe(deprecated,
173
':lazy:\n \\(native\\) Format registered lazily\n')
174
self.assertNotContainsRe(new, 'hidden')
176
def test_set_default_repository(self):
177
default_factory = controldir.format_registry.get('default')
178
old_default = [k for k, v in controldir.format_registry.iteritems()
179
if v == default_factory and k != 'default'][0]
180
controldir.format_registry.set_default_repository('dirstate-with-subtree')
182
self.assertIs(controldir.format_registry.get('dirstate-with-subtree'),
183
controldir.format_registry.get('default'))
185
repository.format_registry.get_default().__class__,
186
knitrepo.RepositoryFormatKnit3)
188
controldir.format_registry.set_default_repository(old_default)
190
def test_aliases(self):
191
a_registry = controldir.ControlDirFormatRegistry()
192
a_registry.register('deprecated', DeprecatedBzrDirFormat,
193
'Old format. Slower and does not support stuff',
195
a_registry.register('deprecatedalias', DeprecatedBzrDirFormat,
196
'Old format. Slower and does not support stuff',
197
deprecated=True, alias=True)
198
self.assertEqual(frozenset(['deprecatedalias']), a_registry.aliases())
201
class SampleBranch(breezy.branch.Branch):
202
"""A dummy branch for guess what, dummy use."""
204
def __init__(self, dir):
205
self.controldir = dir
208
class SampleRepository(breezy.repository.Repository):
211
def __init__(self, dir):
212
self.controldir = dir
215
class SampleBzrDir(bzrdir.BzrDir):
216
"""A sample BzrDir implementation to allow testing static methods."""
218
def create_repository(self, shared=False):
219
"""See ControlDir.create_repository."""
220
return "A repository"
222
def open_repository(self):
223
"""See ControlDir.open_repository."""
224
return SampleRepository(self)
226
def create_branch(self, name=None):
227
"""See ControlDir.create_branch."""
229
raise NoColocatedBranchSupport(self)
230
return SampleBranch(self)
232
def create_workingtree(self):
233
"""See ControlDir.create_workingtree."""
237
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
240
this format is initializable, unsupported to aid in testing the
241
open and open_downlevel routines.
244
def get_format_string(self):
245
"""See BzrDirFormat.get_format_string()."""
246
return "Sample .bzr dir format."
248
def initialize_on_transport(self, t):
249
"""Create a bzr dir."""
251
t.put_bytes('.bzr/branch-format', self.get_format_string())
252
return SampleBzrDir(t, self)
254
def is_supported(self):
257
def open(self, transport, _found=None):
258
return "opened branch."
261
def from_string(cls, format_string):
265
class BzrDirFormatTest1(bzrdir.BzrDirMetaFormat1):
268
def get_format_string():
269
return "Test format 1"
272
class BzrDirFormatTest2(bzrdir.BzrDirMetaFormat1):
275
def get_format_string():
276
return "Test format 2"
279
class TestBzrDirFormat(TestCaseWithTransport):
280
"""Tests for the BzrDirFormat facility."""
282
def test_find_format(self):
283
# is the right format object found for a branch?
284
# create a branch with a few known format objects.
285
bzr.BzrProber.formats.register(BzrDirFormatTest1.get_format_string(),
287
self.addCleanup(bzr.BzrProber.formats.remove,
288
BzrDirFormatTest1.get_format_string())
289
bzr.BzrProber.formats.register(BzrDirFormatTest2.get_format_string(),
291
self.addCleanup(bzr.BzrProber.formats.remove,
292
BzrDirFormatTest2.get_format_string())
293
t = self.get_transport()
294
self.build_tree(["foo/", "bar/"], transport=t)
295
def check_format(format, url):
296
format.initialize(url)
297
t = _mod_transport.get_transport_from_path(url)
298
found_format = bzrdir.BzrDirFormat.find_format(t)
299
self.assertIsInstance(found_format, format.__class__)
300
check_format(BzrDirFormatTest1(), "foo")
301
check_format(BzrDirFormatTest2(), "bar")
303
def test_find_format_nothing_there(self):
304
self.assertRaises(NotBranchError,
305
bzrdir.BzrDirFormat.find_format,
306
_mod_transport.get_transport_from_path('.'))
308
def test_find_format_unknown_format(self):
309
t = self.get_transport()
311
t.put_bytes('.bzr/branch-format', '')
312
self.assertRaises(UnknownFormatError,
313
bzrdir.BzrDirFormat.find_format,
314
_mod_transport.get_transport_from_path('.'))
316
def test_register_unregister_format(self):
317
format = SampleBzrDirFormat()
320
format.initialize(url)
321
# register a format for it.
322
bzr.BzrProber.formats.register(format.get_format_string(), format)
323
# which bzrdir.Open will refuse (not supported)
324
self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
325
# which bzrdir.open_containing will refuse (not supported)
326
self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open_containing, url)
327
# but open_downlevel will work
328
t = _mod_transport.get_transport_from_url(url)
329
self.assertEqual(format.open(t), bzrdir.BzrDir.open_unsupported(url))
330
# unregister the format
331
bzr.BzrProber.formats.remove(format.get_format_string())
332
# now open_downlevel should fail too.
333
self.assertRaises(UnknownFormatError, bzrdir.BzrDir.open_unsupported, url)
335
def test_create_branch_and_repo_uses_default(self):
336
format = SampleBzrDirFormat()
337
branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
339
self.assertTrue(isinstance(branch, SampleBranch))
341
def test_create_branch_and_repo_under_shared(self):
342
# creating a branch and repo in a shared repo uses the
344
format = controldir.format_registry.make_controldir('knit')
345
self.make_repository('.', shared=True, format=format)
346
branch = bzrdir.BzrDir.create_branch_and_repo(
347
self.get_url('child'), format=format)
348
self.assertRaises(errors.NoRepositoryPresent,
349
branch.controldir.open_repository)
351
def test_create_branch_and_repo_under_shared_force_new(self):
352
# creating a branch and repo in a shared repo can be forced to
354
format = controldir.format_registry.make_controldir('knit')
355
self.make_repository('.', shared=True, format=format)
356
branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
359
branch.controldir.open_repository()
361
def test_create_standalone_working_tree(self):
362
format = SampleBzrDirFormat()
363
# note this is deliberately readonly, as this failure should
364
# occur before any writes.
365
self.assertRaises(errors.NotLocalUrl,
366
bzrdir.BzrDir.create_standalone_workingtree,
367
self.get_readonly_url(), format=format)
368
tree = bzrdir.BzrDir.create_standalone_workingtree('.',
370
self.assertEqual('A tree', tree)
372
def test_create_standalone_working_tree_under_shared_repo(self):
373
# create standalone working tree always makes a repo.
374
format = controldir.format_registry.make_controldir('knit')
375
self.make_repository('.', shared=True, format=format)
376
# note this is deliberately readonly, as this failure should
377
# occur before any writes.
378
self.assertRaises(errors.NotLocalUrl,
379
bzrdir.BzrDir.create_standalone_workingtree,
380
self.get_readonly_url('child'), format=format)
381
tree = bzrdir.BzrDir.create_standalone_workingtree('child',
383
tree.controldir.open_repository()
385
def test_create_branch_convenience(self):
386
# outside a repo the default convenience output is a repo+branch_tree
387
format = controldir.format_registry.make_controldir('knit')
388
branch = bzrdir.BzrDir.create_branch_convenience('.', format=format)
389
branch.controldir.open_workingtree()
390
branch.controldir.open_repository()
392
def test_create_branch_convenience_possible_transports(self):
393
"""Check that the optional 'possible_transports' is recognized"""
394
format = controldir.format_registry.make_controldir('knit')
395
t = self.get_transport()
396
branch = bzrdir.BzrDir.create_branch_convenience(
397
'.', format=format, possible_transports=[t])
398
branch.controldir.open_workingtree()
399
branch.controldir.open_repository()
401
def test_create_branch_convenience_root(self):
402
"""Creating a branch at the root of a fs should work."""
403
self.vfs_transport_factory = memory.MemoryServer
404
# outside a repo the default convenience output is a repo+branch_tree
405
format = controldir.format_registry.make_controldir('knit')
406
branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
408
self.assertRaises(errors.NoWorkingTree,
409
branch.controldir.open_workingtree)
410
branch.controldir.open_repository()
412
def test_create_branch_convenience_under_shared_repo(self):
413
# inside a repo the default convenience output is a branch+ follow the
415
format = controldir.format_registry.make_controldir('knit')
416
self.make_repository('.', shared=True, format=format)
417
branch = bzrdir.BzrDir.create_branch_convenience('child',
419
branch.controldir.open_workingtree()
420
self.assertRaises(errors.NoRepositoryPresent,
421
branch.controldir.open_repository)
423
def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
424
# inside a repo the default convenience output is a branch+ follow the
425
# repo tree policy but we can override that
426
format = controldir.format_registry.make_controldir('knit')
427
self.make_repository('.', shared=True, format=format)
428
branch = bzrdir.BzrDir.create_branch_convenience('child',
429
force_new_tree=False, format=format)
430
self.assertRaises(errors.NoWorkingTree,
431
branch.controldir.open_workingtree)
432
self.assertRaises(errors.NoRepositoryPresent,
433
branch.controldir.open_repository)
435
def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
436
# inside a repo the default convenience output is a branch+ follow the
438
format = controldir.format_registry.make_controldir('knit')
439
repo = self.make_repository('.', shared=True, format=format)
440
repo.set_make_working_trees(False)
441
branch = bzrdir.BzrDir.create_branch_convenience('child',
443
self.assertRaises(errors.NoWorkingTree,
444
branch.controldir.open_workingtree)
445
self.assertRaises(errors.NoRepositoryPresent,
446
branch.controldir.open_repository)
448
def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
449
# inside a repo the default convenience output is a branch+ follow the
450
# repo tree policy but we can override that
451
format = controldir.format_registry.make_controldir('knit')
452
repo = self.make_repository('.', shared=True, format=format)
453
repo.set_make_working_trees(False)
454
branch = bzrdir.BzrDir.create_branch_convenience('child',
455
force_new_tree=True, format=format)
456
branch.controldir.open_workingtree()
457
self.assertRaises(errors.NoRepositoryPresent,
458
branch.controldir.open_repository)
460
def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
461
# inside a repo the default convenience output is overridable to give
463
format = controldir.format_registry.make_controldir('knit')
464
self.make_repository('.', shared=True, format=format)
465
branch = bzrdir.BzrDir.create_branch_convenience('child',
466
force_new_repo=True, format=format)
467
branch.controldir.open_repository()
468
branch.controldir.open_workingtree()
471
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
473
def test_acquire_repository_standalone(self):
474
"""The default acquisition policy should create a standalone branch."""
475
my_bzrdir = self.make_controldir('.')
476
repo_policy = my_bzrdir.determine_repository_policy()
477
repo, is_new = repo_policy.acquire_repository()
478
self.assertEqual(repo.controldir.root_transport.base,
479
my_bzrdir.root_transport.base)
480
self.assertFalse(repo.is_shared())
482
def test_determine_stacking_policy(self):
483
parent_bzrdir = self.make_controldir('.')
484
child_bzrdir = self.make_controldir('child')
485
parent_bzrdir.get_config().set_default_stack_on('http://example.org')
486
repo_policy = child_bzrdir.determine_repository_policy()
487
self.assertEqual('http://example.org', repo_policy._stack_on)
489
def test_determine_stacking_policy_relative(self):
490
parent_bzrdir = self.make_controldir('.')
491
child_bzrdir = self.make_controldir('child')
492
parent_bzrdir.get_config().set_default_stack_on('child2')
493
repo_policy = child_bzrdir.determine_repository_policy()
494
self.assertEqual('child2', repo_policy._stack_on)
495
self.assertEqual(parent_bzrdir.root_transport.base,
496
repo_policy._stack_on_pwd)
498
def prepare_default_stacking(self, child_format='1.6'):
499
parent_bzrdir = self.make_controldir('.')
500
child_branch = self.make_branch('child', format=child_format)
501
parent_bzrdir.get_config().set_default_stack_on(child_branch.base)
502
new_child_transport = parent_bzrdir.transport.clone('child2')
503
return child_branch, new_child_transport
505
def test_clone_on_transport_obeys_stacking_policy(self):
506
child_branch, new_child_transport = self.prepare_default_stacking()
507
new_child = child_branch.controldir.clone_on_transport(new_child_transport)
508
self.assertEqual(child_branch.base,
509
new_child.open_branch().get_stacked_on_url())
511
def test_default_stacking_with_stackable_branch_unstackable_repo(self):
512
# Make stackable source branch with an unstackable repo format.
513
source_bzrdir = self.make_controldir('source')
514
knitpack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
515
source_branch = breezy.bzr.branch.BzrBranchFormat7().initialize(
517
# Make a directory with a default stacking policy
518
parent_bzrdir = self.make_controldir('parent')
519
stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
520
parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
521
# Clone source into directory
522
target = source_bzrdir.clone(self.get_url('parent/target'))
524
def test_format_initialize_on_transport_ex_stacked_on(self):
525
# trunk is a stackable format. Note that its in the same server area
526
# which is what launchpad does, but not sufficient to exercise the
528
trunk = self.make_branch('trunk', format='1.9')
529
t = self.get_transport('stacked')
530
old_fmt = controldir.format_registry.make_controldir('pack-0.92')
531
repo_name = old_fmt.repository_format.network_name()
532
# Should end up with a 1.9 format (stackable)
533
repo, control, require_stacking, repo_policy = \
534
old_fmt.initialize_on_transport_ex(t,
535
repo_format_name=repo_name, stacked_on='../trunk',
538
# Repositories are open write-locked
539
self.assertTrue(repo.is_write_locked())
540
self.addCleanup(repo.unlock)
542
repo = control.open_repository()
543
self.assertIsInstance(control, bzrdir.BzrDir)
544
opened = bzrdir.BzrDir.open(t.base)
545
if not isinstance(old_fmt, remote.RemoteBzrDirFormat):
546
self.assertEqual(control._format.network_name(),
547
old_fmt.network_name())
548
self.assertEqual(control._format.network_name(),
549
opened._format.network_name())
550
self.assertEqual(control.__class__, opened.__class__)
551
self.assertLength(1, repo._fallback_repositories)
553
def test_sprout_obeys_stacking_policy(self):
554
child_branch, new_child_transport = self.prepare_default_stacking()
555
new_child = child_branch.controldir.sprout(new_child_transport.base)
556
self.assertEqual(child_branch.base,
557
new_child.open_branch().get_stacked_on_url())
559
def test_clone_ignores_policy_for_unsupported_formats(self):
560
child_branch, new_child_transport = self.prepare_default_stacking(
561
child_format='pack-0.92')
562
new_child = child_branch.controldir.clone_on_transport(new_child_transport)
563
self.assertRaises(branch.UnstackableBranchFormat,
564
new_child.open_branch().get_stacked_on_url)
566
def test_sprout_ignores_policy_for_unsupported_formats(self):
567
child_branch, new_child_transport = self.prepare_default_stacking(
568
child_format='pack-0.92')
569
new_child = child_branch.controldir.sprout(new_child_transport.base)
570
self.assertRaises(branch.UnstackableBranchFormat,
571
new_child.open_branch().get_stacked_on_url)
573
def test_sprout_upgrades_format_if_stacked_specified(self):
574
child_branch, new_child_transport = self.prepare_default_stacking(
575
child_format='pack-0.92')
576
new_child = child_branch.controldir.sprout(new_child_transport.base,
578
self.assertEqual(child_branch.controldir.root_transport.base,
579
new_child.open_branch().get_stacked_on_url())
580
repo = new_child.open_repository()
581
self.assertTrue(repo._format.supports_external_lookups)
582
self.assertFalse(repo.supports_rich_root())
584
def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
585
child_branch, new_child_transport = self.prepare_default_stacking(
586
child_format='pack-0.92')
587
new_child = child_branch.controldir.clone_on_transport(new_child_transport,
588
stacked_on=child_branch.controldir.root_transport.base)
589
self.assertEqual(child_branch.controldir.root_transport.base,
590
new_child.open_branch().get_stacked_on_url())
591
repo = new_child.open_repository()
592
self.assertTrue(repo._format.supports_external_lookups)
593
self.assertFalse(repo.supports_rich_root())
595
def test_sprout_upgrades_to_rich_root_format_if_needed(self):
596
child_branch, new_child_transport = self.prepare_default_stacking(
597
child_format='rich-root-pack')
598
new_child = child_branch.controldir.sprout(new_child_transport.base,
600
repo = new_child.open_repository()
601
self.assertTrue(repo._format.supports_external_lookups)
602
self.assertTrue(repo.supports_rich_root())
604
def test_add_fallback_repo_handles_absolute_urls(self):
605
stack_on = self.make_branch('stack_on', format='1.6')
606
repo = self.make_repository('repo', format='1.6')
607
policy = bzrdir.UseExistingRepository(repo, stack_on.base)
608
policy._add_fallback(repo)
610
def test_add_fallback_repo_handles_relative_urls(self):
611
stack_on = self.make_branch('stack_on', format='1.6')
612
repo = self.make_repository('repo', format='1.6')
613
policy = bzrdir.UseExistingRepository(repo, '.', stack_on.base)
614
policy._add_fallback(repo)
616
def test_configure_relative_branch_stacking_url(self):
617
stack_on = self.make_branch('stack_on', format='1.6')
618
stacked = self.make_branch('stack_on/stacked', format='1.6')
619
policy = bzrdir.UseExistingRepository(stacked.repository,
621
policy.configure_branch(stacked)
622
self.assertEqual('..', stacked.get_stacked_on_url())
624
def test_relative_branch_stacking_to_absolute(self):
625
stack_on = self.make_branch('stack_on', format='1.6')
626
stacked = self.make_branch('stack_on/stacked', format='1.6')
627
policy = bzrdir.UseExistingRepository(stacked.repository,
628
'.', self.get_readonly_url('stack_on'))
629
policy.configure_branch(stacked)
630
self.assertEqual(self.get_readonly_url('stack_on'),
631
stacked.get_stacked_on_url())
634
class ChrootedTests(TestCaseWithTransport):
635
"""A support class that provides readonly urls outside the local namespace.
637
This is done by checking if self.transport_server is a MemoryServer. if it
638
is then we are chrooted already, if it is not then an HttpServer is used
643
super(ChrootedTests, self).setUp()
644
if not self.vfs_transport_factory == memory.MemoryServer:
645
self.transport_readonly_server = http_server.HttpServer
647
def local_branch_path(self, branch):
648
return os.path.realpath(urlutils.local_path_from_url(branch.base))
650
def test_open_containing(self):
651
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
652
self.get_readonly_url(''))
653
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
654
self.get_readonly_url('g/p/q'))
655
control = bzrdir.BzrDir.create(self.get_url())
656
branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url(''))
657
self.assertEqual('', relpath)
658
branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url('g/p/q'))
659
self.assertEqual('g/p/q', relpath)
661
def test_open_containing_tree_branch_or_repository_empty(self):
662
self.assertRaises(errors.NotBranchError,
663
bzrdir.BzrDir.open_containing_tree_branch_or_repository,
664
self.get_readonly_url(''))
666
def test_open_containing_tree_branch_or_repository_all(self):
667
self.make_branch_and_tree('topdir')
668
tree, branch, repo, relpath = \
669
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
671
self.assertEqual(os.path.realpath('topdir'),
672
os.path.realpath(tree.basedir))
673
self.assertEqual(os.path.realpath('topdir'),
674
self.local_branch_path(branch))
676
osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
677
repo.controldir.transport.local_abspath('repository'))
678
self.assertEqual(relpath, 'foo')
680
def test_open_containing_tree_branch_or_repository_no_tree(self):
681
self.make_branch('branch')
682
tree, branch, repo, relpath = \
683
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
685
self.assertEqual(tree, None)
686
self.assertEqual(os.path.realpath('branch'),
687
self.local_branch_path(branch))
689
osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
690
repo.controldir.transport.local_abspath('repository'))
691
self.assertEqual(relpath, 'foo')
693
def test_open_containing_tree_branch_or_repository_repo(self):
694
self.make_repository('repo')
695
tree, branch, repo, relpath = \
696
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
698
self.assertEqual(tree, None)
699
self.assertEqual(branch, None)
701
osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
702
repo.controldir.transport.local_abspath('repository'))
703
self.assertEqual(relpath, '')
705
def test_open_containing_tree_branch_or_repository_shared_repo(self):
706
self.make_repository('shared', shared=True)
707
bzrdir.BzrDir.create_branch_convenience('shared/branch',
708
force_new_tree=False)
709
tree, branch, repo, relpath = \
710
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
712
self.assertEqual(tree, None)
713
self.assertEqual(os.path.realpath('shared/branch'),
714
self.local_branch_path(branch))
716
osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
717
repo.controldir.transport.local_abspath('repository'))
718
self.assertEqual(relpath, '')
720
def test_open_containing_tree_branch_or_repository_branch_subdir(self):
721
self.make_branch_and_tree('foo')
722
self.build_tree(['foo/bar/'])
723
tree, branch, repo, relpath = \
724
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
726
self.assertEqual(os.path.realpath('foo'),
727
os.path.realpath(tree.basedir))
728
self.assertEqual(os.path.realpath('foo'),
729
self.local_branch_path(branch))
731
osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
732
repo.controldir.transport.local_abspath('repository'))
733
self.assertEqual(relpath, 'bar')
735
def test_open_containing_tree_branch_or_repository_repo_subdir(self):
736
self.make_repository('bar')
737
self.build_tree(['bar/baz/'])
738
tree, branch, repo, relpath = \
739
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
741
self.assertEqual(tree, None)
742
self.assertEqual(branch, None)
744
osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
745
repo.controldir.transport.local_abspath('repository'))
746
self.assertEqual(relpath, 'baz')
748
def test_open_containing_from_transport(self):
749
self.assertRaises(NotBranchError,
750
bzrdir.BzrDir.open_containing_from_transport,
751
_mod_transport.get_transport_from_url(self.get_readonly_url('')))
752
self.assertRaises(NotBranchError,
753
bzrdir.BzrDir.open_containing_from_transport,
754
_mod_transport.get_transport_from_url(
755
self.get_readonly_url('g/p/q')))
756
control = bzrdir.BzrDir.create(self.get_url())
757
branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
758
_mod_transport.get_transport_from_url(
759
self.get_readonly_url('')))
760
self.assertEqual('', relpath)
761
branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
762
_mod_transport.get_transport_from_url(
763
self.get_readonly_url('g/p/q')))
764
self.assertEqual('g/p/q', relpath)
766
def test_open_containing_tree_or_branch(self):
767
self.make_branch_and_tree('topdir')
768
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
770
self.assertEqual(os.path.realpath('topdir'),
771
os.path.realpath(tree.basedir))
772
self.assertEqual(os.path.realpath('topdir'),
773
self.local_branch_path(branch))
774
self.assertIs(tree.controldir, branch.controldir)
775
self.assertEqual('foo', relpath)
776
# opening from non-local should not return the tree
777
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
778
self.get_readonly_url('topdir/foo'))
779
self.assertEqual(None, tree)
780
self.assertEqual('foo', relpath)
782
self.make_branch('topdir/foo')
783
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
785
self.assertIs(tree, None)
786
self.assertEqual(os.path.realpath('topdir/foo'),
787
self.local_branch_path(branch))
788
self.assertEqual('', relpath)
790
def test_open_tree_or_branch(self):
791
self.make_branch_and_tree('topdir')
792
tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
793
self.assertEqual(os.path.realpath('topdir'),
794
os.path.realpath(tree.basedir))
795
self.assertEqual(os.path.realpath('topdir'),
796
self.local_branch_path(branch))
797
self.assertIs(tree.controldir, branch.controldir)
798
# opening from non-local should not return the tree
799
tree, branch = bzrdir.BzrDir.open_tree_or_branch(
800
self.get_readonly_url('topdir'))
801
self.assertEqual(None, tree)
803
self.make_branch('topdir/foo')
804
tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
805
self.assertIs(tree, None)
806
self.assertEqual(os.path.realpath('topdir/foo'),
807
self.local_branch_path(branch))
809
def test_open_from_transport(self):
810
# transport pointing at bzrdir should give a bzrdir with root transport
811
# set to the given transport
812
control = bzrdir.BzrDir.create(self.get_url())
813
t = self.get_transport()
814
opened_bzrdir = bzrdir.BzrDir.open_from_transport(t)
815
self.assertEqual(t.base, opened_bzrdir.root_transport.base)
816
self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
818
def test_open_from_transport_no_bzrdir(self):
819
t = self.get_transport()
820
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
822
def test_open_from_transport_bzrdir_in_parent(self):
823
control = bzrdir.BzrDir.create(self.get_url())
824
t = self.get_transport()
826
t = t.clone('subdir')
827
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
829
def test_sprout_recursive(self):
830
tree = self.make_branch_and_tree('tree1',
831
format='development-subtree')
832
sub_tree = self.make_branch_and_tree('tree1/subtree',
833
format='development-subtree')
834
sub_tree.set_root_id('subtree-root')
835
tree.add_reference(sub_tree)
836
self.build_tree(['tree1/subtree/file'])
838
tree.commit('Initial commit')
839
tree2 = tree.controldir.sprout('tree2').open_workingtree()
841
self.addCleanup(tree2.unlock)
842
self.assertPathExists('tree2/subtree/file')
845
tree2.kind('subtree', 'subtree-root'))
847
def test_cloning_metadir(self):
848
"""Ensure that cloning metadir is suitable"""
849
bzrdir = self.make_controldir('bzrdir')
850
bzrdir.cloning_metadir()
851
branch = self.make_branch('branch', format='knit')
852
format = branch.controldir.cloning_metadir()
853
self.assertIsInstance(format.workingtree_format,
854
workingtree_4.WorkingTreeFormat6)
856
def test_sprout_recursive_treeless(self):
857
tree = self.make_branch_and_tree('tree1',
858
format='development-subtree')
859
sub_tree = self.make_branch_and_tree('tree1/subtree',
860
format='development-subtree')
861
tree.add_reference(sub_tree)
862
self.build_tree(['tree1/subtree/file'])
864
tree.commit('Initial commit')
865
# The following line force the orhaning to reveal bug #634470
866
tree.branch.get_config_stack().set(
867
'bzr.transform.orphan_policy', 'move')
868
tree.controldir.destroy_workingtree()
869
# FIXME: subtree/.bzr is left here which allows the test to pass (or
870
# fail :-( ) -- vila 20100909
871
repo = self.make_repository('repo', shared=True,
872
format='development-subtree')
873
repo.set_make_working_trees(False)
874
# FIXME: we just deleted the workingtree and now we want to use it ????
875
# At a minimum, we should use tree.branch below (but this fails too
876
# currently) or stop calling this test 'treeless'. Specifically, I've
877
# turn the line below into an assertRaises when 'subtree/.bzr' is
878
# orphaned and sprout tries to access the branch there (which is left
879
# by bzrdir.BzrDirMeta1.destroy_workingtree when it ignores the
880
# [DeletingParent('Not deleting', u'subtree', None)] conflict). See bug
881
# #634470. -- vila 20100909
882
self.assertRaises(errors.NotBranchError,
883
tree.controldir.sprout, 'repo/tree2')
884
# self.assertPathExists('repo/tree2/subtree')
885
# self.assertPathDoesNotExist('repo/tree2/subtree/file')
887
def make_foo_bar_baz(self):
888
foo = bzrdir.BzrDir.create_branch_convenience('foo').controldir
889
bar = self.make_branch('foo/bar').controldir
890
baz = self.make_branch('baz').controldir
893
def test_find_controldirs(self):
894
foo, bar, baz = self.make_foo_bar_baz()
895
t = self.get_transport()
896
self.assertEqualBzrdirs([baz, foo, bar], bzrdir.BzrDir.find_controldirs(t))
898
def make_fake_permission_denied_transport(self, transport, paths):
899
"""Create a transport that raises PermissionDenied for some paths."""
902
raise errors.PermissionDenied(path)
904
path_filter_server = pathfilter.PathFilteringServer(transport, filter)
905
path_filter_server.start_server()
906
self.addCleanup(path_filter_server.stop_server)
907
path_filter_transport = pathfilter.PathFilteringTransport(
908
path_filter_server, '.')
909
return (path_filter_server, path_filter_transport)
911
def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
912
"""Check that each branch url ends with the given suffix."""
913
for actual_bzrdir in actual_bzrdirs:
914
self.assertEndsWith(actual_bzrdir.user_url, expect_url_suffix)
916
def test_find_controldirs_permission_denied(self):
917
foo, bar, baz = self.make_foo_bar_baz()
918
t = self.get_transport()
919
path_filter_server, path_filter_transport = \
920
self.make_fake_permission_denied_transport(t, ['foo'])
922
self.assertBranchUrlsEndWith('/baz/',
923
bzrdir.BzrDir.find_controldirs(path_filter_transport))
925
smart_transport = self.make_smart_server('.',
926
backing_server=path_filter_server)
927
self.assertBranchUrlsEndWith('/baz/',
928
bzrdir.BzrDir.find_controldirs(smart_transport))
930
def test_find_controldirs_list_current(self):
931
def list_current(transport):
932
return [s for s in transport.list_dir('') if s != 'baz']
934
foo, bar, baz = self.make_foo_bar_baz()
935
t = self.get_transport()
936
self.assertEqualBzrdirs(
938
bzrdir.BzrDir.find_controldirs(t, list_current=list_current))
940
def test_find_controldirs_evaluate(self):
941
def evaluate(bzrdir):
943
repo = bzrdir.open_repository()
944
except errors.NoRepositoryPresent:
945
return True, bzrdir.root_transport.base
947
return False, bzrdir.root_transport.base
949
foo, bar, baz = self.make_foo_bar_baz()
950
t = self.get_transport()
951
self.assertEqual([baz.root_transport.base, foo.root_transport.base],
952
list(bzrdir.BzrDir.find_controldirs(t, evaluate=evaluate)))
954
def assertEqualBzrdirs(self, first, second):
956
second = list(second)
957
self.assertEqual(len(first), len(second))
958
for x, y in zip(first, second):
959
self.assertEqual(x.root_transport.base, y.root_transport.base)
961
def test_find_branches(self):
962
root = self.make_repository('', shared=True)
963
foo, bar, baz = self.make_foo_bar_baz()
964
qux = self.make_controldir('foo/qux')
965
t = self.get_transport()
966
branches = bzrdir.BzrDir.find_branches(t)
967
self.assertEqual(baz.root_transport.base, branches[0].base)
968
self.assertEqual(foo.root_transport.base, branches[1].base)
969
self.assertEqual(bar.root_transport.base, branches[2].base)
971
# ensure this works without a top-level repo
972
branches = bzrdir.BzrDir.find_branches(t.clone('foo'))
973
self.assertEqual(foo.root_transport.base, branches[0].base)
974
self.assertEqual(bar.root_transport.base, branches[1].base)
977
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
979
def test_find_controldirs_missing_repo(self):
980
t = self.get_transport()
981
arepo = self.make_repository('arepo', shared=True)
982
abranch_url = arepo.user_url + '/abranch'
983
abranch = bzrdir.BzrDir.create(abranch_url).create_branch()
984
t.delete_tree('arepo/.bzr')
985
self.assertRaises(errors.NoRepositoryPresent,
986
branch.Branch.open, abranch_url)
987
self.make_branch('baz')
988
for actual_bzrdir in bzrdir.BzrDir.find_branches(t):
989
self.assertEndsWith(actual_bzrdir.user_url, '/baz/')
992
class TestMeta1DirFormat(TestCaseWithTransport):
993
"""Tests specific to the meta1 dir format."""
995
def test_right_base_dirs(self):
996
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
998
branch_base = t.clone('branch').base
999
self.assertEqual(branch_base, dir.get_branch_transport(None).base)
1000
self.assertEqual(branch_base,
1001
dir.get_branch_transport(BzrBranchFormat5()).base)
1002
repository_base = t.clone('repository').base
1003
self.assertEqual(repository_base, dir.get_repository_transport(None).base)
1004
repository_format = repository.format_registry.get_default()
1005
self.assertEqual(repository_base,
1006
dir.get_repository_transport(repository_format).base)
1007
checkout_base = t.clone('checkout').base
1008
self.assertEqual(checkout_base, dir.get_workingtree_transport(None).base)
1009
self.assertEqual(checkout_base,
1010
dir.get_workingtree_transport(workingtree_3.WorkingTreeFormat3()).base)
1012
def test_meta1dir_uses_lockdir(self):
1013
"""Meta1 format uses a LockDir to guard the whole directory, not a file."""
1014
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
1016
self.assertIsDirectory('branch-lock', t)
1018
def test_comparison(self):
1019
"""Equality and inequality behave properly.
1021
Metadirs should compare equal iff they have the same repo, branch and
1024
mydir = controldir.format_registry.make_controldir('knit')
1025
self.assertEqual(mydir, mydir)
1026
self.assertFalse(mydir != mydir)
1027
otherdir = controldir.format_registry.make_controldir('knit')
1028
self.assertEqual(otherdir, mydir)
1029
self.assertFalse(otherdir != mydir)
1030
otherdir2 = controldir.format_registry.make_controldir('development-subtree')
1031
self.assertNotEqual(otherdir2, mydir)
1032
self.assertFalse(otherdir2 == mydir)
1034
def test_with_features(self):
1035
tree = self.make_branch_and_tree('tree', format='2a')
1036
tree.controldir.update_feature_flags({"bar": "required"})
1037
self.assertRaises(bzrdir.MissingFeature, bzrdir.BzrDir.open, 'tree')
1038
bzrdir.BzrDirMetaFormat1.register_feature('bar')
1039
self.addCleanup(bzrdir.BzrDirMetaFormat1.unregister_feature, 'bar')
1040
dir = bzrdir.BzrDir.open('tree')
1041
self.assertEqual("required", dir._format.features.get("bar"))
1042
tree.controldir.update_feature_flags({"bar": None, "nonexistant": None})
1043
dir = bzrdir.BzrDir.open('tree')
1044
self.assertEqual({}, dir._format.features)
1046
def test_needs_conversion_different_working_tree(self):
1047
# meta1dirs need an conversion if any element is not the default.
1048
new_format = controldir.format_registry.make_controldir('dirstate')
1049
tree = self.make_branch_and_tree('tree', format='knit')
1050
self.assertTrue(tree.controldir.needs_format_conversion(
1053
def test_initialize_on_format_uses_smart_transport(self):
1054
self.setup_smart_server_with_call_log()
1055
new_format = controldir.format_registry.make_controldir('dirstate')
1056
transport = self.get_transport('target')
1057
transport.ensure_base()
1058
self.reset_smart_call_log()
1059
instance = new_format.initialize_on_transport(transport)
1060
self.assertIsInstance(instance, remote.RemoteBzrDir)
1061
rpc_count = len(self.hpss_calls)
1062
# This figure represent the amount of work to perform this use case. It
1063
# is entirely ok to reduce this number if a test fails due to rpc_count
1064
# being too low. If rpc_count increases, more network roundtrips have
1065
# become necessary for this use case. Please do not adjust this number
1066
# upwards without agreement from bzr's network support maintainers.
1067
self.assertEqual(2, rpc_count)
1070
class NonLocalTests(TestCaseWithTransport):
1071
"""Tests for bzrdir static behaviour on non local paths."""
1074
super(NonLocalTests, self).setUp()
1075
self.vfs_transport_factory = memory.MemoryServer
1077
def test_create_branch_convenience(self):
1078
# outside a repo the default convenience output is a repo+branch_tree
1079
format = controldir.format_registry.make_controldir('knit')
1080
branch = bzrdir.BzrDir.create_branch_convenience(
1081
self.get_url('foo'), format=format)
1082
self.assertRaises(errors.NoWorkingTree,
1083
branch.controldir.open_workingtree)
1084
branch.controldir.open_repository()
1086
def test_create_branch_convenience_force_tree_not_local_fails(self):
1087
# outside a repo the default convenience output is a repo+branch_tree
1088
format = controldir.format_registry.make_controldir('knit')
1089
self.assertRaises(errors.NotLocalUrl,
1090
bzrdir.BzrDir.create_branch_convenience,
1091
self.get_url('foo'),
1092
force_new_tree=True,
1094
t = self.get_transport()
1095
self.assertFalse(t.has('foo'))
1097
def test_clone(self):
1098
# clone into a nonlocal path works
1099
format = controldir.format_registry.make_controldir('knit')
1100
branch = bzrdir.BzrDir.create_branch_convenience('local',
1102
branch.controldir.open_workingtree()
1103
result = branch.controldir.clone(self.get_url('remote'))
1104
self.assertRaises(errors.NoWorkingTree,
1105
result.open_workingtree)
1106
result.open_branch()
1107
result.open_repository()
1109
def test_checkout_metadir(self):
1110
# checkout_metadir has reasonable working tree format even when no
1111
# working tree is present
1112
self.make_branch('branch-knit2', format='dirstate-with-subtree')
1113
my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
1114
checkout_format = my_bzrdir.checkout_metadir()
1115
self.assertIsInstance(checkout_format.workingtree_format,
1116
workingtree_4.WorkingTreeFormat4)
1119
class TestHTTPRedirections(object):
1120
"""Test redirection between two http servers.
1122
This MUST be used by daughter classes that also inherit from
1123
TestCaseWithTwoWebservers.
1125
We can't inherit directly from TestCaseWithTwoWebservers or the
1126
test framework will try to create an instance which cannot
1127
run, its implementation being incomplete.
1130
def create_transport_readonly_server(self):
1131
# We don't set the http protocol version, relying on the default
1132
return http_utils.HTTPServerRedirecting()
1134
def create_transport_secondary_server(self):
1135
# We don't set the http protocol version, relying on the default
1136
return http_utils.HTTPServerRedirecting()
1139
super(TestHTTPRedirections, self).setUp()
1140
# The redirections will point to the new server
1141
self.new_server = self.get_readonly_server()
1142
# The requests to the old server will be redirected
1143
self.old_server = self.get_secondary_server()
1144
# Configure the redirections
1145
self.old_server.redirect_to(self.new_server.host, self.new_server.port)
1147
def test_loop(self):
1148
# Both servers redirect to each other creating a loop
1149
self.new_server.redirect_to(self.old_server.host, self.old_server.port)
1150
# Starting from either server should loop
1151
old_url = self._qualified_url(self.old_server.host,
1152
self.old_server.port)
1153
oldt = self._transport(old_url)
1154
self.assertRaises(errors.NotBranchError,
1155
bzrdir.BzrDir.open_from_transport, oldt)
1156
new_url = self._qualified_url(self.new_server.host,
1157
self.new_server.port)
1158
newt = self._transport(new_url)
1159
self.assertRaises(errors.NotBranchError,
1160
bzrdir.BzrDir.open_from_transport, newt)
1162
def test_qualifier_preserved(self):
1163
wt = self.make_branch_and_tree('branch')
1164
old_url = self._qualified_url(self.old_server.host,
1165
self.old_server.port)
1166
start = self._transport(old_url).clone('branch')
1167
bdir = bzrdir.BzrDir.open_from_transport(start)
1168
# Redirection should preserve the qualifier, hence the transport class
1170
self.assertIsInstance(bdir.root_transport, type(start))
1173
class TestHTTPRedirections_urllib(TestHTTPRedirections,
1174
http_utils.TestCaseWithTwoWebservers):
1175
"""Tests redirections for urllib implementation"""
1177
_transport = HttpTransport_urllib
1179
def _qualified_url(self, host, port):
1180
result = 'http+urllib://%s:%s' % (host, port)
1181
self.permit_url(result)
1186
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
1187
http_utils.TestCaseWithTwoWebservers):
1188
"""Tests redirections for the nosmart decorator"""
1190
_transport = NoSmartTransportDecorator
1192
def _qualified_url(self, host, port):
1193
result = 'nosmart+http://%s:%s' % (host, port)
1194
self.permit_url(result)
1198
class TestHTTPRedirections_readonly(TestHTTPRedirections,
1199
http_utils.TestCaseWithTwoWebservers):
1200
"""Tests redirections for readonly decoratror"""
1202
_transport = ReadonlyTransportDecorator
1204
def _qualified_url(self, host, port):
1205
result = 'readonly+http://%s:%s' % (host, port)
1206
self.permit_url(result)
1210
class TestDotBzrHidden(TestCaseWithTransport):
1213
if sys.platform == 'win32':
1214
ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']
1217
f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
1218
stderr=subprocess.PIPE)
1219
out, err = f.communicate()
1220
self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
1222
return out.splitlines()
1224
def test_dot_bzr_hidden(self):
1225
if sys.platform == 'win32' and not win32utils.has_win32file:
1226
raise TestSkipped('unable to make file hidden without pywin32 library')
1227
b = bzrdir.BzrDir.create('.')
1228
self.build_tree(['a'])
1229
self.assertEqual(['a'], self.get_ls())
1231
def test_dot_bzr_hidden_with_url(self):
1232
if sys.platform == 'win32' and not win32utils.has_win32file:
1233
raise TestSkipped('unable to make file hidden without pywin32 library')
1234
b = bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
1235
self.build_tree(['a'])
1236
self.assertEqual(['a'], self.get_ls())
1239
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
1240
"""Test BzrDirFormat implementation for TestBzrDirSprout."""
1242
def _open(self, transport):
1243
return _TestBzrDir(transport, self)
1246
class _TestBzrDir(bzrdir.BzrDirMeta1):
1247
"""Test BzrDir implementation for TestBzrDirSprout.
1249
When created a _TestBzrDir already has repository and a branch. The branch
1250
is a test double as well.
1253
def __init__(self, *args, **kwargs):
1254
super(_TestBzrDir, self).__init__(*args, **kwargs)
1255
self.test_branch = _TestBranch(self.transport)
1256
self.test_branch.repository = self.create_repository()
1258
def open_branch(self, unsupported=False, possible_transports=None):
1259
return self.test_branch
1261
def cloning_metadir(self, require_stacking=False):
1262
return _TestBzrDirFormat()
1265
class _TestBranchFormat(breezy.branch.BranchFormat):
1266
"""Test Branch format for TestBzrDirSprout."""
1269
class _TestBranch(breezy.branch.Branch):
1270
"""Test Branch implementation for TestBzrDirSprout."""
1272
def __init__(self, transport, *args, **kwargs):
1273
self._format = _TestBranchFormat()
1274
self._transport = transport
1275
self.base = transport.base
1276
super(_TestBranch, self).__init__(*args, **kwargs)
1280
def sprout(self, *args, **kwargs):
1281
self.calls.append('sprout')
1282
return _TestBranch(self._transport)
1284
def copy_content_into(self, destination, revision_id=None):
1285
self.calls.append('copy_content_into')
1287
def last_revision(self):
1288
return _mod_revision.NULL_REVISION
1290
def get_parent(self):
1293
def _get_config(self):
1294
return config.TransportConfig(self._transport, 'branch.conf')
1296
def _get_config_store(self):
1297
return config.BranchStore(self)
1299
def set_parent(self, parent):
1300
self._parent = parent
1302
def lock_read(self):
1303
return lock.LogicalLockResult(self.unlock)
1309
class TestBzrDirSprout(TestCaseWithMemoryTransport):
1311
def test_sprout_uses_branch_sprout(self):
1312
"""BzrDir.sprout calls Branch.sprout.
1314
Usually, BzrDir.sprout should delegate to the branch's sprout method
1315
for part of the work. This allows the source branch to control the
1316
choice of format for the new branch.
1318
There are exceptions, but this tests avoids them:
1319
- if there's no branch in the source bzrdir,
1320
- or if the stacking has been requested and the format needs to be
1321
overridden to satisfy that.
1323
# Make an instrumented bzrdir.
1324
t = self.get_transport('source')
1326
source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
1327
# The instrumented bzrdir has a test_branch attribute that logs calls
1328
# made to the branch contained in that bzrdir. Initially the test
1329
# branch exists but no calls have been made to it.
1330
self.assertEqual([], source_bzrdir.test_branch.calls)
1333
target_url = self.get_url('target')
1334
result = source_bzrdir.sprout(target_url, recurse='no')
1336
# The bzrdir called the branch's sprout method.
1337
self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)
1339
def test_sprout_parent(self):
1340
grandparent_tree = self.make_branch('grandparent')
1341
parent = grandparent_tree.controldir.sprout('parent').open_branch()
1342
branch_tree = parent.controldir.sprout('branch').open_branch()
1343
self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
1346
class TestBzrDirHooks(TestCaseWithMemoryTransport):
1348
def test_pre_open_called(self):
1350
bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
1351
transport = self.get_transport('foo')
1352
url = transport.base
1353
self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
1354
self.assertEqual([transport.base], [t.base for t in calls])
1356
def test_pre_open_actual_exceptions_raised(self):
1358
def fail_once(transport):
1361
raise errors.BzrError("fail")
1362
bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
1363
transport = self.get_transport('foo')
1364
url = transport.base
1365
err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
1366
self.assertEqual('fail', err._preformatted_string)
1368
def test_post_repo_init(self):
1369
from ..controldir import RepoInitHookParams
1371
bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1373
self.make_repository('foo')
1374
self.assertLength(1, calls)
1376
self.assertIsInstance(params, RepoInitHookParams)
1377
self.assertTrue(hasattr(params, 'controldir'))
1378
self.assertTrue(hasattr(params, 'repository'))
1380
def test_post_repo_init_hook_repr(self):
1382
bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1383
lambda params: param_reprs.append(repr(params)), None)
1384
self.make_repository('foo')
1385
self.assertLength(1, param_reprs)
1386
param_repr = param_reprs[0]
1387
self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
1390
class TestGenerateBackupName(TestCaseWithMemoryTransport):
1391
# FIXME: This may need to be unified with test_osutils.TestBackupNames or
1392
# moved to per_bzrdir or per_transport for better coverage ?
1396
super(TestGenerateBackupName, self).setUp()
1397
self._transport = self.get_transport()
1398
bzrdir.BzrDir.create(self.get_url(),
1399
possible_transports=[self._transport])
1400
self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)
1403
self.assertEqual("a.~1~", self._bzrdir._available_backup_name("a"))
1405
def test_exiting(self):
1406
self._transport.put_bytes("a.~1~", "some content")
1407
self.assertEqual("a.~2~", self._bzrdir._available_backup_name("a"))
1410
class TestMeta1DirColoFormat(TestCaseWithTransport):
1411
"""Tests specific to the meta1 dir with colocated branches format."""
1413
def test_supports_colo(self):
1414
format = bzrdir.BzrDirMetaFormat1Colo()
1415
self.assertTrue(format.colocated_branches)
1417
def test_upgrade_from_2a(self):
1418
tree = self.make_branch_and_tree('.', format='2a')
1419
format = bzrdir.BzrDirMetaFormat1Colo()
1420
self.assertTrue(tree.controldir.needs_format_conversion(format))
1421
converter = tree.controldir._format.get_converter(format)
1422
result = converter.convert(tree.controldir, None)
1423
self.assertIsInstance(result._format, bzrdir.BzrDirMetaFormat1Colo)
1424
self.assertFalse(result.needs_format_conversion(format))
1426
def test_downgrade_to_2a(self):
1427
tree = self.make_branch_and_tree('.', format='development-colo')
1428
format = bzrdir.BzrDirMetaFormat1()
1429
self.assertTrue(tree.controldir.needs_format_conversion(format))
1430
converter = tree.controldir._format.get_converter(format)
1431
result = converter.convert(tree.controldir, None)
1432
self.assertIsInstance(result._format, bzrdir.BzrDirMetaFormat1)
1433
self.assertFalse(result.needs_format_conversion(format))
1435
def test_downgrade_to_2a_too_many_branches(self):
1436
tree = self.make_branch_and_tree('.', format='development-colo')
1437
tree.controldir.create_branch(name="another-colocated-branch")
1438
converter = tree.controldir._format.get_converter(
1439
bzrdir.BzrDirMetaFormat1())
1440
result = converter.convert(tree.controldir, bzrdir.BzrDirMetaFormat1())
1441
self.assertIsInstance(result._format, bzrdir.BzrDirMetaFormat1)
1443
def test_nested(self):
1444
tree = self.make_branch_and_tree('.', format='development-colo')
1445
tree.controldir.create_branch(name='foo')
1446
tree.controldir.create_branch(name='fool/bla')
1448
errors.ParentBranchExists, tree.controldir.create_branch,
1451
def test_parent(self):
1452
tree = self.make_branch_and_tree('.', format='development-colo')
1453
tree.controldir.create_branch(name='fool/bla')
1454
tree.controldir.create_branch(name='foo/bar')
1456
errors.AlreadyBranchError, tree.controldir.create_branch,
1460
class SampleBzrFormat(bzrdir.BzrFormat):
1463
def get_format_string(cls):
1464
return "First line\n"
1467
class TestBzrFormat(TestCase):
1468
"""Tests for BzrFormat."""
1470
def test_as_string(self):
1471
format = SampleBzrFormat()
1472
format.features = {"foo": "required"}
1473
self.assertEqual(format.as_string(),
1476
format.features["another"] = "optional"
1477
self.assertEqual(format.as_string(),
1480
"optional another\n")
1482
def test_network_name(self):
1483
# The network string should include the feature info
1484
format = SampleBzrFormat()
1485
format.features = {"foo": "required"}
1487
"First line\nrequired foo\n",
1488
format.network_name())
1490
def test_from_string_no_features(self):
1492
format = SampleBzrFormat.from_string(
1494
self.assertEqual({}, format.features)
1496
def test_from_string_with_feature(self):
1498
format = SampleBzrFormat.from_string(
1499
"First line\nrequired foo\n")
1500
self.assertEqual("required", format.features.get("foo"))
1502
def test_from_string_format_string_mismatch(self):
1503
# The first line has to match the format string
1504
self.assertRaises(AssertionError, SampleBzrFormat.from_string,
1505
"Second line\nrequired foo\n")
1507
def test_from_string_missing_space(self):
1508
# At least one space is required in the feature lines
1509
self.assertRaises(errors.ParseFormatError, SampleBzrFormat.from_string,
1510
"First line\nfoo\n")
1512
def test_from_string_with_spaces(self):
1513
# Feature with spaces (in case we add stuff like this in the future)
1514
format = SampleBzrFormat.from_string(
1515
"First line\nrequired foo with spaces\n")
1516
self.assertEqual("required", format.features.get("foo with spaces"))
1519
format1 = SampleBzrFormat()
1520
format1.features = {"nested-trees": "optional"}
1521
format2 = SampleBzrFormat()
1522
format2.features = {"nested-trees": "optional"}
1523
self.assertEqual(format1, format1)
1524
self.assertEqual(format1, format2)
1525
format3 = SampleBzrFormat()
1526
self.assertNotEqual(format1, format3)
1528
def test_check_support_status_optional(self):
1529
# Optional, so silently ignore
1530
format = SampleBzrFormat()
1531
format.features = {"nested-trees": "optional"}
1532
format.check_support_status(True)
1533
self.addCleanup(SampleBzrFormat.unregister_feature, "nested-trees")
1534
SampleBzrFormat.register_feature("nested-trees")
1535
format.check_support_status(True)
1537
def test_check_support_status_required(self):
1538
# Optional, so trigger an exception
1539
format = SampleBzrFormat()
1540
format.features = {"nested-trees": "required"}
1541
self.assertRaises(bzrdir.MissingFeature, format.check_support_status,
1543
self.addCleanup(SampleBzrFormat.unregister_feature, "nested-trees")
1544
SampleBzrFormat.register_feature("nested-trees")
1545
format.check_support_status(True)
1547
def test_check_support_status_unknown(self):
1548
# treat unknown necessity as required
1549
format = SampleBzrFormat()
1550
format.features = {"nested-trees": "unknown"}
1551
self.assertRaises(bzrdir.MissingFeature, format.check_support_status,
1553
self.addCleanup(SampleBzrFormat.unregister_feature, "nested-trees")
1554
SampleBzrFormat.register_feature("nested-trees")
1555
format.check_support_status(True)
1557
def test_feature_already_registered(self):
1558
# a feature can only be registered once
1559
self.addCleanup(SampleBzrFormat.unregister_feature, "nested-trees")
1560
SampleBzrFormat.register_feature("nested-trees")
1561
self.assertRaises(bzrdir.FeatureAlreadyRegistered,
1562
SampleBzrFormat.register_feature, "nested-trees")
1564
def test_feature_with_space(self):
1565
# spaces are not allowed in feature names
1566
self.assertRaises(ValueError, SampleBzrFormat.register_feature,