1
# Copyright (C) 2006-2013, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the BzrDir facility and any format specific tests.
19
For interface contract tests, see tests/per_bzr_dir.
35
revision as _mod_revision,
37
transport as _mod_transport,
49
import breezy.bzr.branch
50
from ..bzr.fullhistory import BzrBranchFormat5
51
from ..errors import (
53
NoColocatedBranchSupport,
55
UnsupportedFormatError,
59
TestCaseWithMemoryTransport,
60
TestCaseWithTransport,
67
from ..transport import (
71
from ..transport.http import HttpTransport
72
from ..transport.nosmart import NoSmartTransportDecorator
73
from ..transport.readonly import ReadonlyTransportDecorator
74
from ..bzr import knitrepo, knitpack_repo
77
class TestDefaultFormat(TestCase):
    """Check that the default control directory format can be replaced."""

    def test_get_set_default_format(self):
        """A newly set default format is the one BzrDir.create uses."""
        old_format = bzrdir.BzrDirFormat.get_default_format()
        # default is BzrDirMetaFormat1
        self.assertIsInstance(old_format, bzrdir.BzrDirMetaFormat1)
        controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
        # creating a bzr dir should now create an instrumented dir.
        try:
            result = bzrdir.BzrDir.create('memory:///')
            self.assertIsInstance(result, SampleBzrDir)
        finally:
            # Put the original default back even when the assertion above
            # fails, so the sample format cannot leak into later tests.
            controldir.ControlDirFormat._set_default_format(old_format)
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
93
class DeprecatedBzrDirFormat(bzrdir.BzrDirFormat):
    """BzrDirFormat subclass the registry tests register as deprecated."""
97
class TestFormatRegistry(TestCase):
99
def make_format_registry(self):
100
my_format_registry = controldir.ControlDirFormatRegistry()
101
my_format_registry.register('deprecated', DeprecatedBzrDirFormat,
102
'Some format. Slower and unawesome and deprecated.',
104
my_format_registry.register_lazy('lazy', 'breezy.tests.test_bzrdir',
105
'DeprecatedBzrDirFormat', 'Format registered lazily',
107
bzr.register_metadir(my_format_registry, 'knit',
108
'breezy.bzr.knitrepo.RepositoryFormatKnit1',
109
'Format using knits',
111
my_format_registry.set_default('knit')
112
bzr.register_metadir(my_format_registry,
114
'breezy.bzr.knitrepo.RepositoryFormatKnit3',
115
'Experimental successor to knit. Use at your own risk.',
116
branch_format='breezy.bzr.branch.BzrBranchFormat6',
118
bzr.register_metadir(my_format_registry,
120
'breezy.bzr.knitrepo.RepositoryFormatKnit3',
121
'Experimental successor to knit. Use at your own risk.',
122
branch_format='breezy.bzr.branch.BzrBranchFormat6', hidden=True)
123
my_format_registry.register('hiddendeprecated', DeprecatedBzrDirFormat,
124
'Old format. Slower and does not support things. ', hidden=True)
125
my_format_registry.register_lazy('hiddenlazy', 'breezy.tests.test_bzrdir',
126
'DeprecatedBzrDirFormat', 'Format registered lazily',
127
deprecated=True, hidden=True)
128
return my_format_registry
130
def test_format_registry(self):
    """Each registered name builds a control dir of the expected kind."""
    registry = self.make_format_registry()
    # Entries registered lazily and directly give the same format class.
    for name in ('lazy', 'deprecated'):
        self.assertIsInstance(
            registry.make_controldir(name), DeprecatedBzrDirFormat)
    # 'default' and 'knit' must both carry the knit1 repository format.
    for name in ('default', 'knit'):
        made = registry.make_controldir(name)
        self.assertIsInstance(
            made.repository_format, knitrepo.RepositoryFormatKnit1)
    branch6_dir = registry.make_controldir('branch6')
    self.assertIsInstance(
        branch6_dir.get_branch_format(),
        breezy.bzr.branch.BzrBranchFormat6)
146
def test_get_help(self):
    """get_help returns the expected help text for each format name."""
    registry = self.make_format_registry()
    expected_help = (
        ('lazy', 'Format registered lazily'),
        ('knit', 'Format using knits'),
        ('default', 'Format using knits'),
        ('deprecated', 'Some format. Slower and unawesome and deprecated.'),
    )
    for name, help_text in expected_help:
        self.assertEqual(help_text, registry.get_help(name))
157
def test_help_topic(self):
158
topics = help_topics.HelpTopicRegistry()
159
registry = self.make_format_registry()
160
topics.register('current-formats', registry.help_topic,
162
topics.register('other-formats', registry.help_topic,
164
new = topics.get_detail('current-formats')
165
rest = topics.get_detail('other-formats')
166
experimental, deprecated = rest.split('Deprecated formats')
167
self.assertContainsRe(new, 'formats-help')
168
self.assertContainsRe(new,
169
':knit:\n \\(native\\) \\(default\\) Format using knits\n')
170
self.assertContainsRe(experimental,
171
':branch6:\n \\(native\\) Experimental successor to knit')
172
self.assertContainsRe(deprecated,
173
':lazy:\n \\(native\\) Format registered lazily\n')
174
self.assertNotContainsRe(new, 'hidden')
176
def test_set_default_repository(self):
177
default_factory = controldir.format_registry.get('default')
178
old_default = [k for k, v in controldir.format_registry.iteritems()
179
if v == default_factory and k != 'default'][0]
180
controldir.format_registry.set_default_repository('dirstate-with-subtree')
182
self.assertIs(controldir.format_registry.get('dirstate-with-subtree'),
183
controldir.format_registry.get('default'))
185
repository.format_registry.get_default().__class__,
186
knitrepo.RepositoryFormatKnit3)
188
controldir.format_registry.set_default_repository(old_default)
190
def test_aliases(self):
191
a_registry = controldir.ControlDirFormatRegistry()
192
a_registry.register('deprecated', DeprecatedBzrDirFormat,
193
'Old format. Slower and does not support stuff',
195
a_registry.register_alias('deprecatedalias', 'deprecated')
196
self.assertEqual({'deprecatedalias': 'deprecated'},
197
a_registry.aliases())
200
class SampleBranch(breezy.branch.Branch):
    """Placeholder branch that only remembers its control directory."""

    def __init__(self, dir):
        """Store *dir* as ``controldir``; nothing else is initialised."""
        self.controldir = dir
207
class SampleRepository(breezy.repository.Repository):
    """Placeholder repository that only remembers its control directory."""

    def __init__(self, dir):
        """Store *dir* as ``controldir``; nothing else is initialised."""
        self.controldir = dir
214
class SampleBzrDir(bzrdir.BzrDir):
    """A sample BzrDir implementation to allow testing static methods."""

    def create_repository(self, shared=False):
        """See ControlDir.create_repository."""
        return "A repository"

    def open_repository(self):
        """See ControlDir.open_repository."""
        return SampleRepository(self)

    def create_branch(self, name=None):
        """See ControlDir.create_branch.

        :param name: colocated branch name; only None is accepted.
        :raises NoColocatedBranchSupport: if a name is given.
        :return: a SampleBranch bound to this control directory.
        """
        # Only a named (colocated) branch is refused; the unnamed branch
        # must still be created, otherwise the return is unreachable.
        if name is not None:
            raise NoColocatedBranchSupport(self)
        return SampleBranch(self)

    def create_workingtree(self):
        """See ControlDir.create_workingtree."""
        # Marker value that test_create_standalone_working_tree compares
        # the created tree against.
        return "A tree"
236
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
239
this format is initializable, unsupported to aid in testing the
240
open and open_downlevel routines.
243
def get_format_string(self):
244
"""See BzrDirFormat.get_format_string()."""
245
return b"Sample .bzr dir format."
247
def initialize_on_transport(self, t):
248
"""Create a bzr dir."""
250
t.put_bytes('.bzr/branch-format', self.get_format_string())
251
return SampleBzrDir(t, self)
253
def is_supported(self):
256
def open(self, transport, _found=None):
257
return "opened branch."
260
def from_string(cls, format_string):
264
class BzrDirFormatTest1(bzrdir.BzrDirMetaFormat1):
    """Meta-dir format with its own format string, used by find_format tests."""

    @staticmethod
    def get_format_string():
        """Return the format string identifying this test format."""
        # A staticmethod so the call works on the class (as the tests do
        # when registering with the prober) and on an instance alike.
        return b"Test format 1"
271
class BzrDirFormatTest2(bzrdir.BzrDirMetaFormat1):
    """Second meta-dir format with a distinct format string."""

    @staticmethod
    def get_format_string():
        """Return the format string identifying this test format."""
        # A staticmethod so the call works on the class (as the tests do
        # when registering with the prober) and on an instance alike.
        return b"Test format 2"
278
class TestBzrDirFormat(TestCaseWithTransport):
279
"""Tests for the BzrDirFormat facility."""
281
def test_find_format(self):
282
# is the right format object found for a branch?
283
# create a branch with a few known format objects.
284
bzr.BzrProber.formats.register(BzrDirFormatTest1.get_format_string(),
286
self.addCleanup(bzr.BzrProber.formats.remove,
287
BzrDirFormatTest1.get_format_string())
288
bzr.BzrProber.formats.register(BzrDirFormatTest2.get_format_string(),
290
self.addCleanup(bzr.BzrProber.formats.remove,
291
BzrDirFormatTest2.get_format_string())
292
t = self.get_transport()
293
self.build_tree(["foo/", "bar/"], transport=t)
294
def check_format(format, url):
295
format.initialize(url)
296
t = _mod_transport.get_transport_from_path(url)
297
found_format = bzrdir.BzrDirFormat.find_format(t)
298
self.assertIsInstance(found_format, format.__class__)
299
check_format(BzrDirFormatTest1(), "foo")
300
check_format(BzrDirFormatTest2(), "bar")
302
def test_find_format_nothing_there(self):
    """find_format raises NotBranchError for the current directory."""
    probe_transport = _mod_transport.get_transport_from_path('.')
    self.assertRaises(
        NotBranchError, bzrdir.BzrDirFormat.find_format, probe_transport)
307
def test_find_format_unknown_format(self):
    """An empty branch-format file makes find_format raise UnknownFormatError."""
    t = self.get_transport()
    # The control directory has to exist before a file can be put in it.
    t.mkdir('.bzr')
    t.put_bytes('.bzr/branch-format', b'')
    self.assertRaises(UnknownFormatError,
                      bzrdir.BzrDirFormat.find_format,
                      _mod_transport.get_transport_from_path('.'))
315
def test_register_unregister_format(self):
    """An unsupported format opens only through open_unsupported."""
    format = SampleBzrDirFormat()
    # URL at which the sample control dir is created and later opened.
    url = self.get_url()
    # make a control dir in the sample format
    format.initialize(url)
    # register a format for it.
    bzr.BzrProber.formats.register(format.get_format_string(), format)
    # which bzrdir.Open will refuse (not supported)
    self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
    # which bzrdir.open_containing will refuse (not supported)
    self.assertRaises(UnsupportedFormatError,
                      bzrdir.BzrDir.open_containing, url)
    # but open_downlevel will work
    t = _mod_transport.get_transport_from_url(url)
    self.assertEqual(format.open(t), bzrdir.BzrDir.open_unsupported(url))
    # unregister the format
    bzr.BzrProber.formats.remove(format.get_format_string())
    # now open_downlevel should fail too.
    self.assertRaises(UnknownFormatError,
                      bzrdir.BzrDir.open_unsupported, url)
334
def test_create_branch_and_repo_uses_default(self):
335
format = SampleBzrDirFormat()
336
branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
338
self.assertTrue(isinstance(branch, SampleBranch))
340
def test_create_branch_and_repo_under_shared(self):
    """A branch made under a shared repo has no repository in its own dir."""
    knit_format = controldir.format_registry.make_controldir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    child_url = self.get_url('child')
    new_branch = bzrdir.BzrDir.create_branch_and_repo(
        child_url, format=knit_format)
    # Opening a repository from the branch's control dir must fail.
    self.assertRaises(
        errors.NoRepositoryPresent, new_branch.controldir.open_repository)
350
def test_create_branch_and_repo_under_shared_force_new(self):
351
# creating a branch and repo in a shared repo can be forced to
353
format = controldir.format_registry.make_controldir('knit')
354
self.make_repository('.', shared=True, format=format)
355
branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
358
branch.controldir.open_repository()
360
def test_create_standalone_working_tree(self):
361
format = SampleBzrDirFormat()
362
# note this is deliberately readonly, as this failure should
363
# occur before any writes.
364
self.assertRaises(errors.NotLocalUrl,
365
bzrdir.BzrDir.create_standalone_workingtree,
366
self.get_readonly_url(), format=format)
367
tree = bzrdir.BzrDir.create_standalone_workingtree('.',
369
self.assertEqual('A tree', tree)
371
def test_create_standalone_working_tree_under_shared_repo(self):
372
# create standalone working tree always makes a repo.
373
format = controldir.format_registry.make_controldir('knit')
374
self.make_repository('.', shared=True, format=format)
375
# note this is deliberately readonly, as this failure should
376
# occur before any writes.
377
self.assertRaises(errors.NotLocalUrl,
378
bzrdir.BzrDir.create_standalone_workingtree,
379
self.get_readonly_url('child'), format=format)
380
tree = bzrdir.BzrDir.create_standalone_workingtree('child',
382
tree.controldir.open_repository()
384
def test_create_branch_convenience(self):
    """Outside a repo the convenience call yields repo, branch and tree."""
    knit_format = controldir.format_registry.make_controldir('knit')
    new_branch = bzrdir.BzrDir.create_branch_convenience(
        '.', format=knit_format)
    # Both opens must succeed for the test to pass.
    new_branch.controldir.open_workingtree()
    new_branch.controldir.open_repository()
391
def test_create_branch_convenience_possible_transports(self):
    """The optional 'possible_transports' argument is accepted."""
    knit_format = controldir.format_registry.make_controldir('knit')
    reusable = [self.get_transport()]
    new_branch = bzrdir.BzrDir.create_branch_convenience(
        '.', format=knit_format, possible_transports=reusable)
    # Both opens must succeed for the test to pass.
    new_branch.controldir.open_workingtree()
    new_branch.controldir.open_repository()
400
def test_create_branch_convenience_root(self):
401
"""Creating a branch at the root of a fs should work."""
402
self.vfs_transport_factory = memory.MemoryServer
403
# outside a repo the default convenience output is a repo+branch_tree
404
format = controldir.format_registry.make_controldir('knit')
405
branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
407
self.assertRaises(errors.NoWorkingTree,
408
branch.controldir.open_workingtree)
409
branch.controldir.open_repository()
411
def test_create_branch_convenience_under_shared_repo(self):
412
# inside a repo the default convenience output is a branch+ follow the
414
format = controldir.format_registry.make_controldir('knit')
415
self.make_repository('.', shared=True, format=format)
416
branch = bzrdir.BzrDir.create_branch_convenience('child',
418
branch.controldir.open_workingtree()
419
self.assertRaises(errors.NoRepositoryPresent,
420
branch.controldir.open_repository)
422
def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
    """force_new_tree=False gives a branch with no tree and no own repo."""
    # Inside a repo the convenience call normally follows the repo tree
    # policy; this test overrides that with force_new_tree=False.
    knit_format = controldir.format_registry.make_controldir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    new_branch = bzrdir.BzrDir.create_branch_convenience(
        'child', force_new_tree=False, format=knit_format)
    self.assertRaises(
        errors.NoWorkingTree, new_branch.controldir.open_workingtree)
    self.assertRaises(
        errors.NoRepositoryPresent, new_branch.controldir.open_repository)
434
def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
435
# inside a repo the default convenience output is a branch+ follow the
437
format = controldir.format_registry.make_controldir('knit')
438
repo = self.make_repository('.', shared=True, format=format)
439
repo.set_make_working_trees(False)
440
branch = bzrdir.BzrDir.create_branch_convenience('child',
442
self.assertRaises(errors.NoWorkingTree,
443
branch.controldir.open_workingtree)
444
self.assertRaises(errors.NoRepositoryPresent,
445
branch.controldir.open_repository)
447
def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
    """force_new_tree=True gives a tree although the repo disables trees."""
    # Inside a repo the convenience call normally follows the repo tree
    # policy; this test overrides that with force_new_tree=True.
    knit_format = controldir.format_registry.make_controldir('knit')
    shared_repo = self.make_repository('.', shared=True, format=knit_format)
    shared_repo.set_make_working_trees(False)
    new_branch = bzrdir.BzrDir.create_branch_convenience(
        'child', force_new_tree=True, format=knit_format)
    # The tree must open; a repository in the branch's dir must not.
    new_branch.controldir.open_workingtree()
    self.assertRaises(
        errors.NoRepositoryPresent, new_branch.controldir.open_repository)
459
def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
    """force_new_repo=True gives the branch a repository and a tree."""
    knit_format = controldir.format_registry.make_controldir('knit')
    self.make_repository('.', shared=True, format=knit_format)
    new_branch = bzrdir.BzrDir.create_branch_convenience(
        'child', force_new_repo=True, format=knit_format)
    # Both opens must succeed for the test to pass.
    new_branch.controldir.open_repository()
    new_branch.controldir.open_workingtree()
470
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
472
def test_acquire_repository_standalone(self):
    """The default policy acquires an unshared repo at the control dir."""
    control = self.make_controldir('.')
    policy = control.determine_repository_policy()
    repo, _is_new = policy.acquire_repository()
    repo_base = repo.controldir.root_transport.base
    self.assertEqual(repo_base, control.root_transport.base)
    self.assertFalse(repo.is_shared())
481
def test_determine_stacking_policy(self):
    """A child's policy picks up the stack-on URL set on its parent."""
    parent = self.make_controldir('.')
    child = self.make_controldir('child')
    parent.get_config().set_default_stack_on('http://example.org')
    policy = child.determine_repository_policy()
    self.assertEqual('http://example.org', policy._stack_on)
488
def test_determine_stacking_policy_relative(self):
    """A relative stack-on value is kept, with the parent's base as pwd."""
    parent = self.make_controldir('.')
    child = self.make_controldir('child')
    parent.get_config().set_default_stack_on('child2')
    policy = child.determine_repository_policy()
    self.assertEqual('child2', policy._stack_on)
    parent_base = parent.root_transport.base
    self.assertEqual(parent_base, policy._stack_on_pwd)
497
def prepare_default_stacking(self, child_format='1.6'):
    """Make a parent dir whose default stack-on target is branch 'child'.

    :param child_format: format name used when creating the 'child' branch.
    :return: (the 'child' branch, the parent's transport cloned to
        'child2').
    """
    parent = self.make_controldir('.')
    stack_target = self.make_branch('child', format=child_format)
    parent.get_config().set_default_stack_on(stack_target.base)
    return stack_target, parent.transport.clone('child2')
504
def test_clone_on_transport_obeys_stacking_policy(self):
    """A clone made via clone_on_transport is stacked on the source branch."""
    source_branch, target_transport = self.prepare_default_stacking()
    cloned = source_branch.controldir.clone_on_transport(target_transport)
    stacked_url = cloned.open_branch().get_stacked_on_url()
    self.assertEqual(source_branch.base, stacked_url)
510
def test_default_stacking_with_stackable_branch_unstackable_repo(self):
511
# Make stackable source branch with an unstackable repo format.
512
source_bzrdir = self.make_controldir('source')
513
knitpack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
514
source_branch = breezy.bzr.branch.BzrBranchFormat7().initialize(
516
# Make a directory with a default stacking policy
517
parent_bzrdir = self.make_controldir('parent')
518
stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
519
parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
520
# Clone source into directory
521
target = source_bzrdir.clone(self.get_url('parent/target'))
523
def test_format_initialize_on_transport_ex_stacked_on(self):
524
# trunk is a stackable format. Note that its in the same server area
525
# which is what launchpad does, but not sufficient to exercise the
527
trunk = self.make_branch('trunk', format='1.9')
528
t = self.get_transport('stacked')
529
old_fmt = controldir.format_registry.make_controldir('pack-0.92')
530
repo_name = old_fmt.repository_format.network_name()
531
# Should end up with a 1.9 format (stackable)
532
repo, control, require_stacking, repo_policy = \
533
old_fmt.initialize_on_transport_ex(t,
534
repo_format_name=repo_name, stacked_on='../trunk',
537
# Repositories are open write-locked
538
self.assertTrue(repo.is_write_locked())
539
self.addCleanup(repo.unlock)
541
repo = control.open_repository()
542
self.assertIsInstance(control, bzrdir.BzrDir)
543
opened = bzrdir.BzrDir.open(t.base)
544
if not isinstance(old_fmt, remote.RemoteBzrDirFormat):
545
self.assertEqual(control._format.network_name(),
546
old_fmt.network_name())
547
self.assertEqual(control._format.network_name(),
548
opened._format.network_name())
549
self.assertEqual(control.__class__, opened.__class__)
550
self.assertLength(1, repo._fallback_repositories)
552
def test_sprout_obeys_stacking_policy(self):
    """A sprouted control dir's branch is stacked on the source branch."""
    source_branch, target_transport = self.prepare_default_stacking()
    sprouted = source_branch.controldir.sprout(target_transport.base)
    stacked_url = sprouted.open_branch().get_stacked_on_url()
    self.assertEqual(source_branch.base, stacked_url)
558
def test_clone_ignores_policy_for_unsupported_formats(self):
    """Cloning a pack-0.92 branch gives a branch that cannot report stacking."""
    source_branch, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    cloned = source_branch.controldir.clone_on_transport(target_transport)
    # get_stacked_on_url must raise on the cloned branch.
    self.assertRaises(
        branch.UnstackableBranchFormat,
        cloned.open_branch().get_stacked_on_url)
565
def test_sprout_ignores_policy_for_unsupported_formats(self):
    """Sprouting a pack-0.92 branch gives a branch that cannot report stacking."""
    source_branch, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    sprouted = source_branch.controldir.sprout(target_transport.base)
    # get_stacked_on_url must raise on the sprouted branch.
    self.assertRaises(
        branch.UnstackableBranchFormat,
        sprouted.open_branch().get_stacked_on_url)
572
def test_sprout_upgrades_format_if_stacked_specified(self):
573
child_branch, new_child_transport = self.prepare_default_stacking(
574
child_format='pack-0.92')
575
new_child = child_branch.controldir.sprout(new_child_transport.base,
577
self.assertEqual(child_branch.controldir.root_transport.base,
578
new_child.open_branch().get_stacked_on_url())
579
repo = new_child.open_repository()
580
self.assertTrue(repo._format.supports_external_lookups)
581
self.assertFalse(repo.supports_rich_root())
583
def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
    """An explicit stacked_on gives a stacked clone without rich roots."""
    source_branch, target_transport = self.prepare_default_stacking(
        child_format='pack-0.92')
    stack_url = source_branch.controldir.root_transport.base
    cloned = source_branch.controldir.clone_on_transport(
        target_transport, stacked_on=stack_url)
    self.assertEqual(source_branch.controldir.root_transport.base,
                     cloned.open_branch().get_stacked_on_url())
    cloned_repo = cloned.open_repository()
    self.assertTrue(cloned_repo._format.supports_external_lookups)
    self.assertFalse(cloned_repo.supports_rich_root())
594
def test_sprout_upgrades_to_rich_root_format_if_needed(self):
595
child_branch, new_child_transport = self.prepare_default_stacking(
596
child_format='rich-root-pack')
597
new_child = child_branch.controldir.sprout(new_child_transport.base,
599
repo = new_child.open_repository()
600
self.assertTrue(repo._format.supports_external_lookups)
601
self.assertTrue(repo.supports_rich_root())
603
def test_add_fallback_repo_handles_absolute_urls(self):
    """_add_fallback works when the policy stacks on a branch's base URL."""
    stack_target = self.make_branch('stack_on', format='1.6')
    target_repo = self.make_repository('repo', format='1.6')
    policy = bzrdir.UseExistingRepository(target_repo, stack_target.base)
    # The test passes as long as this call does not raise.
    policy._add_fallback(target_repo)
609
def test_add_fallback_repo_handles_relative_urls(self):
    """_add_fallback works when the policy stacks on '.' relative to a base."""
    stack_target = self.make_branch('stack_on', format='1.6')
    target_repo = self.make_repository('repo', format='1.6')
    policy = bzrdir.UseExistingRepository(
        target_repo, '.', stack_target.base)
    # The test passes as long as this call does not raise.
    policy._add_fallback(target_repo)
615
def test_configure_relative_branch_stacking_url(self):
616
stack_on = self.make_branch('stack_on', format='1.6')
617
stacked = self.make_branch('stack_on/stacked', format='1.6')
618
policy = bzrdir.UseExistingRepository(stacked.repository,
620
policy.configure_branch(stacked)
621
self.assertEqual('..', stacked.get_stacked_on_url())
623
def test_relative_branch_stacking_to_absolute(self):
    """Stacking on '.' under a readonly base records the readonly URL."""
    # The stacked-on branch has to exist; its return value is not needed.
    self.make_branch('stack_on', format='1.6')
    stacked = self.make_branch('stack_on/stacked', format='1.6')
    policy = bzrdir.UseExistingRepository(
        stacked.repository, '.', self.get_readonly_url('stack_on'))
    policy.configure_branch(stacked)
    expected_url = self.get_readonly_url('stack_on')
    self.assertEqual(expected_url, stacked.get_stacked_on_url())
633
class ChrootedTests(TestCaseWithTransport):
634
"""A support class that provides readonly urls outside the local namespace.
636
This is done by checking if self.transport_server is a MemoryServer. if it
637
is then we are chrooted already, if it is not then an HttpServer is used
642
super(ChrootedTests, self).setUp()
643
if not self.vfs_transport_factory == memory.MemoryServer:
644
self.transport_readonly_server = http_server.HttpServer
646
def local_branch_path(self, branch):
    """Return the real local filesystem path for *branch*'s base URL."""
    local_path = urlutils.local_path_from_url(branch.base)
    return os.path.realpath(local_path)
649
def test_open_containing(self):
    """open_containing raises before a control dir exists, then finds it."""
    # Before anything is created neither URL resolves.
    for missing in ('', 'g/p/q'):
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
                          self.get_readonly_url(missing))
    bzrdir.BzrDir.create(self.get_url())
    # Once created at the root, the requested path comes back as relpath.
    for path in ('', 'g/p/q'):
        _found, relpath = bzrdir.BzrDir.open_containing(
            self.get_readonly_url(path))
        self.assertEqual(path, relpath)
660
def test_open_containing_tree_branch_or_repository_empty(self):
    """With nothing created, the lookup raises NotBranchError."""
    opener = bzrdir.BzrDir.open_containing_tree_branch_or_repository
    self.assertRaises(
        errors.NotBranchError, opener, self.get_readonly_url(''))
665
def test_open_containing_tree_branch_or_repository_all(self):
666
self.make_branch_and_tree('topdir')
667
tree, branch, repo, relpath = \
668
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
670
self.assertEqual(os.path.realpath('topdir'),
671
os.path.realpath(tree.basedir))
672
self.assertEqual(os.path.realpath('topdir'),
673
self.local_branch_path(branch))
675
osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
676
repo.controldir.transport.local_abspath('repository'))
677
self.assertEqual(relpath, 'foo')
679
def test_open_containing_tree_branch_or_repository_no_tree(self):
680
self.make_branch('branch')
681
tree, branch, repo, relpath = \
682
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
684
self.assertEqual(tree, None)
685
self.assertEqual(os.path.realpath('branch'),
686
self.local_branch_path(branch))
688
osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
689
repo.controldir.transport.local_abspath('repository'))
690
self.assertEqual(relpath, 'foo')
692
def test_open_containing_tree_branch_or_repository_repo(self):
693
self.make_repository('repo')
694
tree, branch, repo, relpath = \
695
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
697
self.assertEqual(tree, None)
698
self.assertEqual(branch, None)
700
osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
701
repo.controldir.transport.local_abspath('repository'))
702
self.assertEqual(relpath, '')
704
def test_open_containing_tree_branch_or_repository_shared_repo(self):
705
self.make_repository('shared', shared=True)
706
bzrdir.BzrDir.create_branch_convenience('shared/branch',
707
force_new_tree=False)
708
tree, branch, repo, relpath = \
709
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
711
self.assertEqual(tree, None)
712
self.assertEqual(os.path.realpath('shared/branch'),
713
self.local_branch_path(branch))
715
osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
716
repo.controldir.transport.local_abspath('repository'))
717
self.assertEqual(relpath, '')
719
def test_open_containing_tree_branch_or_repository_branch_subdir(self):
720
self.make_branch_and_tree('foo')
721
self.build_tree(['foo/bar/'])
722
tree, branch, repo, relpath = \
723
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
725
self.assertEqual(os.path.realpath('foo'),
726
os.path.realpath(tree.basedir))
727
self.assertEqual(os.path.realpath('foo'),
728
self.local_branch_path(branch))
730
osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
731
repo.controldir.transport.local_abspath('repository'))
732
self.assertEqual(relpath, 'bar')
734
def test_open_containing_tree_branch_or_repository_repo_subdir(self):
735
self.make_repository('bar')
736
self.build_tree(['bar/baz/'])
737
tree, branch, repo, relpath = \
738
bzrdir.BzrDir.open_containing_tree_branch_or_repository(
740
self.assertEqual(tree, None)
741
self.assertEqual(branch, None)
743
osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
744
repo.controldir.transport.local_abspath('repository'))
745
self.assertEqual(relpath, 'baz')
747
def test_open_containing_from_transport(self):
    """open_containing_from_transport raises first, then finds the dir."""
    def readonly_transport(relpath):
        # Transport for the readonly URL of *relpath*.
        return _mod_transport.get_transport_from_url(
            self.get_readonly_url(relpath))

    # Before anything is created neither transport resolves.
    for missing in ('', 'g/p/q'):
        self.assertRaises(NotBranchError,
                          bzrdir.BzrDir.open_containing_from_transport,
                          readonly_transport(missing))
    bzrdir.BzrDir.create(self.get_url())
    # Once created at the root, the requested path comes back as relpath.
    for path in ('', 'g/p/q'):
        _found, relpath = bzrdir.BzrDir.open_containing_from_transport(
            readonly_transport(path))
        self.assertEqual(path, relpath)
765
def test_open_containing_tree_or_branch(self):
766
self.make_branch_and_tree('topdir')
767
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
769
self.assertEqual(os.path.realpath('topdir'),
770
os.path.realpath(tree.basedir))
771
self.assertEqual(os.path.realpath('topdir'),
772
self.local_branch_path(branch))
773
self.assertIs(tree.controldir, branch.controldir)
774
self.assertEqual('foo', relpath)
775
# opening from non-local should not return the tree
776
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
777
self.get_readonly_url('topdir/foo'))
778
self.assertEqual(None, tree)
779
self.assertEqual('foo', relpath)
781
self.make_branch('topdir/foo')
782
tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
784
self.assertIs(tree, None)
785
self.assertEqual(os.path.realpath('topdir/foo'),
786
self.local_branch_path(branch))
787
self.assertEqual('', relpath)
789
def test_open_tree_or_branch(self):
    """open_tree_or_branch gives a tree only for a local path that has one."""
    self.make_branch_and_tree('topdir')
    tree, found_branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
    self.assertEqual(os.path.realpath('topdir'),
                     os.path.realpath(tree.basedir))
    self.assertEqual(os.path.realpath('topdir'),
                     self.local_branch_path(found_branch))
    self.assertIs(tree.controldir, found_branch.controldir)
    # opening from non-local should not return the tree
    readonly_url = self.get_readonly_url('topdir')
    tree, found_branch = bzrdir.BzrDir.open_tree_or_branch(readonly_url)
    self.assertEqual(None, tree)
    # A branch made without a tree gives no tree even for a local path.
    self.make_branch('topdir/foo')
    tree, found_branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
    self.assertIs(tree, None)
    self.assertEqual(os.path.realpath('topdir/foo'),
                     self.local_branch_path(found_branch))
808
def test_open_from_transport(self):
    """The opened dir's root transport has the given transport's base."""
    bzrdir.BzrDir.create(self.get_url())
    transport = self.get_transport()
    opened = bzrdir.BzrDir.open_from_transport(transport)
    self.assertEqual(transport.base, opened.root_transport.base)
    self.assertIsInstance(opened, bzrdir.BzrDir)
817
def test_open_from_transport_no_bzrdir(self):
    """With nothing created, open_from_transport raises NotBranchError."""
    empty_transport = self.get_transport()
    self.assertRaises(
        NotBranchError, bzrdir.BzrDir.open_from_transport, empty_transport)
821
def test_open_from_transport_bzrdir_in_parent(self):
    """open_from_transport on 'subdir' raises although the root has a dir."""
    bzrdir.BzrDir.create(self.get_url())
    subdir_transport = self.get_transport().clone('subdir')
    self.assertRaises(
        NotBranchError, bzrdir.BzrDir.open_from_transport, subdir_transport)
828
def test_sprout_recursive(self):
829
tree = self.make_branch_and_tree('tree1',
830
format='development-subtree')
831
sub_tree = self.make_branch_and_tree('tree1/subtree',
832
format='development-subtree')
833
sub_tree.set_root_id(b'subtree-root')
834
tree.add_reference(sub_tree)
835
self.build_tree(['tree1/subtree/file'])
837
tree.commit('Initial commit')
838
tree2 = tree.controldir.sprout('tree2').open_workingtree()
840
self.addCleanup(tree2.unlock)
841
self.assertPathExists('tree2/subtree/file')
844
tree2.kind('subtree', 'subtree-root'))
846
def test_cloning_metadir(self):
    """Ensure that cloning metadir is suitable"""
    # Bound to ``control`` rather than ``bzrdir`` so the module-level
    # ``bzrdir`` import is not shadowed inside this test.
    control = self.make_controldir('bzrdir')
    # Only checks that the call succeeds on a bare control directory.
    control.cloning_metadir()
    branch = self.make_branch('branch', format='knit')
    # ``cloning_format`` avoids shadowing the ``format`` builtin.
    cloning_format = branch.controldir.cloning_metadir()
    self.assertIsInstance(cloning_format.workingtree_format,
                          workingtree_4.WorkingTreeFormat6)
855
def test_sprout_recursive_treeless(self):
856
tree = self.make_branch_and_tree('tree1',
857
format='development-subtree')
858
sub_tree = self.make_branch_and_tree('tree1/subtree',
859
format='development-subtree')
860
tree.add_reference(sub_tree)
861
self.build_tree(['tree1/subtree/file'])
863
tree.commit('Initial commit')
864
# The following line forces the orphaning to reveal bug #634470
865
tree.branch.get_config_stack().set(
866
'transform.orphan_policy', 'move')
867
tree.controldir.destroy_workingtree()
868
# FIXME: subtree/.bzr is left here which allows the test to pass (or
869
# fail :-( ) -- vila 20100909
870
repo = self.make_repository('repo', shared=True,
871
format='development-subtree')
872
repo.set_make_working_trees(False)
873
# FIXME: we just deleted the workingtree and now we want to use it ????
874
# At a minimum, we should use tree.branch below (but this fails too
875
# currently) or stop calling this test 'treeless'. Specifically, I've
876
# turned the line below into an assertRaises when 'subtree/.bzr' is
877
# orphaned and sprout tries to access the branch there (which is left
878
# by bzrdir.BzrDirMeta1.destroy_workingtree when it ignores the
879
# [DeletingParent('Not deleting', u'subtree', None)] conflict). See bug
880
# #634470. -- vila 20100909
881
self.assertRaises(errors.NotBranchError,
882
tree.controldir.sprout, 'repo/tree2')
883
# self.assertPathExists('repo/tree2/subtree')
884
# self.assertPathDoesNotExist('repo/tree2/subtree/file')
886
def make_foo_bar_baz(self):
887
foo = bzrdir.BzrDir.create_branch_convenience('foo').controldir
888
bar = self.make_branch('foo/bar').controldir
889
baz = self.make_branch('baz').controldir
892
def test_find_controldirs(self):
    """find_controldirs reports baz, foo and bar, in that order."""
    foo, bar, baz = self.make_foo_bar_baz()
    root = self.get_transport()
    found = bzrdir.BzrDir.find_controldirs(root)
    self.assertEqualBzrdirs([baz, foo, bar], found)
897
def make_fake_permission_denied_transport(self, transport, paths):
898
"""Create a transport that raises PermissionDenied for some paths."""
901
raise errors.PermissionDenied(path)
903
path_filter_server = pathfilter.PathFilteringServer(transport, filter)
904
path_filter_server.start_server()
905
self.addCleanup(path_filter_server.stop_server)
906
path_filter_transport = pathfilter.PathFilteringTransport(
907
path_filter_server, '.')
908
return (path_filter_server, path_filter_transport)
910
def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
    """Assert that each item's ``user_url`` ends with the given suffix."""
    # Lazily pull the URL off each item so items are still consumed and
    # checked one at a time, in iteration order.
    urls = (candidate.user_url for candidate in actual_bzrdirs)
    for url in urls:
        self.assertEndsWith(url, expect_url_suffix)
915
def test_find_controldirs_permission_denied(self):
916
foo, bar, baz = self.make_foo_bar_baz()
917
t = self.get_transport()
918
path_filter_server, path_filter_transport = \
919
self.make_fake_permission_denied_transport(t, ['foo'])
921
self.assertBranchUrlsEndWith('/baz/',
922
bzrdir.BzrDir.find_controldirs(path_filter_transport))
924
smart_transport = self.make_smart_server('.',
925
backing_server=path_filter_server)
926
self.assertBranchUrlsEndWith('/baz/',
927
bzrdir.BzrDir.find_controldirs(smart_transport))
929
def test_find_controldirs_list_current(self):
930
def list_current(transport):
931
return [s for s in transport.list_dir('') if s != 'baz']
933
foo, bar, baz = self.make_foo_bar_baz()
934
t = self.get_transport()
935
self.assertEqualBzrdirs(
937
bzrdir.BzrDir.find_controldirs(t, list_current=list_current))
939
def test_find_controldirs_evaluate(self):
940
def evaluate(bzrdir):
942
repo = bzrdir.open_repository()
943
except errors.NoRepositoryPresent:
944
return True, bzrdir.root_transport.base
946
return False, bzrdir.root_transport.base
948
foo, bar, baz = self.make_foo_bar_baz()
949
t = self.get_transport()
950
self.assertEqual([baz.root_transport.base, foo.root_transport.base],
951
list(bzrdir.BzrDir.find_controldirs(t, evaluate=evaluate)))
953
def assertEqualBzrdirs(self, first, second):
955
second = list(second)
956
self.assertEqual(len(first), len(second))
957
for x, y in zip(first, second):
958
self.assertEqual(x.root_transport.base, y.root_transport.base)
960
def test_find_branches(self):
961
root = self.make_repository('', shared=True)
962
foo, bar, baz = self.make_foo_bar_baz()
963
qux = self.make_controldir('foo/qux')
964
t = self.get_transport()
965
branches = bzrdir.BzrDir.find_branches(t)
966
self.assertEqual(baz.root_transport.base, branches[0].base)
967
self.assertEqual(foo.root_transport.base, branches[1].base)
968
self.assertEqual(bar.root_transport.base, branches[2].base)
970
# ensure this works without a top-level repo
971
branches = bzrdir.BzrDir.find_branches(t.clone('foo'))
972
self.assertEqual(foo.root_transport.base, branches[0].base)
973
self.assertEqual(bar.root_transport.base, branches[1].base)
976
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
    """Check find_branches output when a branch has lost its repository."""

    def test_find_controldirs_missing_repo(self):
        """Every branch found must be 'baz', not the repo-less 'abranch'."""
        t = self.get_transport()
        arepo = self.make_repository('arepo', shared=True)
        abranch_url = arepo.user_url + '/abranch'
        # The branch is created for its side effect only; the returned
        # object is not needed afterwards.
        bzrdir.BzrDir.create(abranch_url).create_branch()
        # Remove the shared repository's control directory from under the
        # branch, then confirm the branch can no longer be opened.
        t.delete_tree('arepo/.bzr')
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.Branch.open, abranch_url)
        self.make_branch('baz')
        # find_branches yields branches, so name the loop variable as one.
        for found_branch in bzrdir.BzrDir.find_branches(t):
            self.assertEndsWith(found_branch.user_url, '/baz/')
991
class TestMeta1DirFormat(TestCaseWithTransport):
992
"""Tests specific to the meta1 dir format."""
994
def test_right_base_dirs(self):
995
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
997
branch_base = t.clone('branch').base
998
self.assertEqual(branch_base, dir.get_branch_transport(None).base)
999
self.assertEqual(branch_base,
1000
dir.get_branch_transport(BzrBranchFormat5()).base)
1001
repository_base = t.clone('repository').base
1002
self.assertEqual(repository_base, dir.get_repository_transport(None).base)
1003
repository_format = repository.format_registry.get_default()
1004
self.assertEqual(repository_base,
1005
dir.get_repository_transport(repository_format).base)
1006
checkout_base = t.clone('checkout').base
1007
self.assertEqual(checkout_base, dir.get_workingtree_transport(None).base)
1008
self.assertEqual(checkout_base,
1009
dir.get_workingtree_transport(workingtree_3.WorkingTreeFormat3()).base)
1011
def test_meta1dir_uses_lockdir(self):
1012
"""Meta1 format uses a LockDir to guard the whole directory, not a file."""
1013
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
1015
self.assertIsDirectory('branch-lock', t)
1017
def test_comparison(self):
1018
"""Equality and inequality behave properly.
1020
Metadirs should compare equal iff they have the same repo, branch and
1023
mydir = controldir.format_registry.make_controldir('knit')
1024
self.assertEqual(mydir, mydir)
1025
self.assertFalse(mydir != mydir)
1026
otherdir = controldir.format_registry.make_controldir('knit')
1027
self.assertEqual(otherdir, mydir)
1028
self.assertFalse(otherdir != mydir)
1029
otherdir2 = controldir.format_registry.make_controldir('development-subtree')
1030
self.assertNotEqual(otherdir2, mydir)
1031
self.assertFalse(otherdir2 == mydir)
1033
def test_with_features(self):
1034
tree = self.make_branch_and_tree('tree', format='2a')
1035
tree.controldir.update_feature_flags({b"bar": b"required"})
1036
self.assertRaises(bzrdir.MissingFeature, bzrdir.BzrDir.open, 'tree')
1037
bzrdir.BzrDirMetaFormat1.register_feature(b'bar')
1038
self.addCleanup(bzrdir.BzrDirMetaFormat1.unregister_feature, b'bar')
1039
dir = bzrdir.BzrDir.open('tree')
1040
self.assertEqual(b"required", dir._format.features.get(b"bar"))
1041
tree.controldir.update_feature_flags({
1043
b"nonexistant": None})
1044
dir = bzrdir.BzrDir.open('tree')
1045
self.assertEqual({}, dir._format.features)
1047
def test_needs_conversion_different_working_tree(self):
1048
# meta1dirs need a conversion if any element is not the default.
1049
new_format = controldir.format_registry.make_controldir('dirstate')
1050
tree = self.make_branch_and_tree('tree', format='knit')
1051
self.assertTrue(tree.controldir.needs_format_conversion(
1054
def test_initialize_on_format_uses_smart_transport(self):
    """Initializing over a smart transport gives a RemoteBzrDir in 2 RPCs."""
    self.setup_smart_server_with_call_log()
    dirstate_format = controldir.format_registry.make_controldir('dirstate')
    target = self.get_transport('target')
    target.ensure_base()
    # Reset the call log so that only the initialization below is counted.
    self.reset_smart_call_log()
    created = dirstate_format.initialize_on_transport(target)
    self.assertIsInstance(created, remote.RemoteBzrDir)
    # This figure represents the amount of work needed for this use case.
    # It is entirely ok to reduce this number if the test fails because
    # the observed count is lower. If the count increases, more network
    # roundtrips have become necessary for this use case. Please do not
    # adjust this number upwards without agreement from bzr's network
    # support maintainers.
    self.assertEqual(2, len(self.hpss_calls))
1071
class NonLocalTests(TestCaseWithTransport):
1072
"""Tests for bzrdir static behaviour on non local paths."""
1075
super(NonLocalTests, self).setUp()
1076
self.vfs_transport_factory = memory.MemoryServer
1078
def test_create_branch_convenience(self):
    """On a non-local URL the convenience API makes no working tree."""
    # outside a repo the default convenience output is a repo+branch_tree
    knit_format = controldir.format_registry.make_controldir('knit')
    target_url = self.get_url('foo')
    new_branch = bzrdir.BzrDir.create_branch_convenience(
        target_url, format=knit_format)
    created_dir = new_branch.controldir
    # No working tree may exist here, but a repository must open.
    self.assertRaises(errors.NoWorkingTree, created_dir.open_workingtree)
    created_dir.open_repository()
1087
def test_create_branch_convenience_force_tree_not_local_fails(self):
1088
# outside a repo the default convenience output is a repo+branch_tree
1089
format = controldir.format_registry.make_controldir('knit')
1090
self.assertRaises(errors.NotLocalUrl,
1091
bzrdir.BzrDir.create_branch_convenience,
1092
self.get_url('foo'),
1093
force_new_tree=True,
1095
t = self.get_transport()
1096
self.assertFalse(t.has('foo'))
1098
def test_clone(self):
1099
# clone into a nonlocal path works
1100
format = controldir.format_registry.make_controldir('knit')
1101
branch = bzrdir.BzrDir.create_branch_convenience('local',
1103
branch.controldir.open_workingtree()
1104
result = branch.controldir.clone(self.get_url('remote'))
1105
self.assertRaises(errors.NoWorkingTree,
1106
result.open_workingtree)
1107
result.open_branch()
1108
result.open_repository()
1110
def test_checkout_metadir(self):
    """checkout_metadir picks WorkingTreeFormat4 for a treeless branch."""
    # checkout_metadir has reasonable working tree format even when no
    # working tree is present
    self.make_branch('branch-knit2', format='dirstate-with-subtree')
    branch_url = self.get_url('branch-knit2')
    opened = bzrdir.BzrDir.open(branch_url)
    tree_format = opened.checkout_metadir().workingtree_format
    self.assertIsInstance(tree_format, workingtree_4.WorkingTreeFormat4)
1120
class TestHTTPRedirectionsBase(object):
1121
"""Test redirection between two http servers.
1123
This MUST be used by daughter classes that also inherit from
1124
TestCaseWithTwoWebservers.
1126
We can't inherit directly from TestCaseWithTwoWebservers or the
1127
test framework will try to create an instance which cannot
1128
run, its implementation being incomplete.
1131
def create_transport_readonly_server(self):
    """Return a new ``HTTPServerRedirecting`` as the read-only server."""
    # No http protocol version is passed; the server's default applies.
    redirecting_server = http_utils.HTTPServerRedirecting()
    return redirecting_server
1135
def create_transport_secondary_server(self):
    """Return a new ``HTTPServerRedirecting`` as the secondary server."""
    # No http protocol version is passed; the server's default applies.
    redirecting_server = http_utils.HTTPServerRedirecting()
    return redirecting_server
1140
super(TestHTTPRedirectionsBase, self).setUp()
1141
# The redirections will point to the new server
1142
self.new_server = self.get_readonly_server()
1143
# The requests to the old server will be redirected
1144
self.old_server = self.get_secondary_server()
1145
# Configure the redirections
1146
self.old_server.redirect_to(self.new_server.host, self.new_server.port)
1148
def test_loop(self):
    """Opening from either server of a redirect loop is a NotBranchError."""
    old, new = self.old_server, self.new_server
    # Both servers redirect to each other creating a loop
    new.redirect_to(old.host, old.port)
    # Starting from either server should loop; the old server is tried
    # first, then the new one.
    for server in (old, new):
        url = self._qualified_url(server.host, server.port)
        start = self._transport(url)
        self.assertRaises(errors.NotBranchError,
                          bzrdir.BzrDir.open_from_transport, start)
1163
def test_qualifier_preserved(self):
1164
wt = self.make_branch_and_tree('branch')
1165
old_url = self._qualified_url(self.old_server.host,
1166
self.old_server.port)
1167
start = self._transport(old_url).clone('branch')
1168
bdir = bzrdir.BzrDir.open_from_transport(start)
1169
# Redirection should preserve the qualifier, hence the transport class
1171
self.assertIsInstance(bdir.root_transport, type(start))
1174
class TestHTTPRedirections(TestHTTPRedirectionsBase,
1175
http_utils.TestCaseWithTwoWebservers):
1176
"""Tests redirections for urllib implementation"""
1178
_transport = HttpTransport
1180
def _qualified_url(self, host, port):
1181
result = 'http://%s:%s' % (host, port)
1182
self.permit_url(result)
1187
class TestHTTPRedirections_nosmart(TestHTTPRedirectionsBase,
1188
http_utils.TestCaseWithTwoWebservers):
1189
"""Tests redirections for the nosmart decorator"""
1191
_transport = NoSmartTransportDecorator
1193
def _qualified_url(self, host, port):
1194
result = 'nosmart+http://%s:%s' % (host, port)
1195
self.permit_url(result)
1199
class TestHTTPRedirections_readonly(TestHTTPRedirectionsBase,
1200
http_utils.TestCaseWithTwoWebservers):
1201
"""Tests redirections for readonly decoratror"""
1203
_transport = ReadonlyTransportDecorator
1205
def _qualified_url(self, host, port):
1206
result = 'readonly+http://%s:%s' % (host, port)
1207
self.permit_url(result)
1211
class TestDotBzrHidden(TestCaseWithTransport):
1214
if sys.platform == 'win32':
1215
ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']
1218
f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
1219
stderr=subprocess.PIPE)
1220
out, err = f.communicate()
1221
self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
1223
return out.splitlines()
1225
def test_dot_bzr_hidden(self):
    """A control directory created at '.' is absent from the listing."""
    if sys.platform == 'win32' and not win32utils.has_win32file:
        raise TestSkipped('unable to make file hidden without pywin32 library')
    # Created for its side effect only; the object itself is not used.
    bzrdir.BzrDir.create('.')
    self.build_tree(['a'])
    # get_ls() splits the raw pipe output of a subprocess, which is bytes
    # on Python 3, so compare against a bytes name (equal to 'a' on
    # Python 2 as well).
    self.assertEqual([b'a'], self.get_ls())
1232
def test_dot_bzr_hidden_with_url(self):
    """As test_dot_bzr_hidden, but creating through a local-path URL."""
    if sys.platform == 'win32' and not win32utils.has_win32file:
        raise TestSkipped('unable to make file hidden without pywin32 library')
    # Created for its side effect only; the object itself is not used.
    bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
    self.build_tree(['a'])
    # get_ls() splits the raw pipe output of a subprocess, which is bytes
    # on Python 3, so compare against a bytes name (equal to 'a' on
    # Python 2 as well).
    self.assertEqual([b'a'], self.get_ls())
1240
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
    """BzrDirFormat test double used by TestBzrDirSprout."""

    def _open(self, transport):
        """Return a ``_TestBzrDir`` on *transport* that uses this format."""
        opened = _TestBzrDir(transport, self)
        return opened
1247
class _TestBzrDir(bzrdir.BzrDirMeta1):
1248
"""Test BzrDir implementation for TestBzrDirSprout.
1250
When created a _TestBzrDir already has repository and a branch. The branch
1251
is a test double as well.
1254
def __init__(self, *args, **kwargs):
    """Initialise the bzrdir, then attach a test branch with a repository."""
    super(_TestBzrDir, self).__init__(*args, **kwargs)
    # The branch double is stored on the instance before the repository
    # is created, as in the original ordering.
    self.test_branch = double = _TestBranch(self.transport)
    double.repository = self.create_repository()
1259
def open_branch(self, unsupported=False, possible_transports=None):
    """Return the stored test branch; both arguments are ignored."""
    branch_double = self.test_branch
    return branch_double
1262
def cloning_metadir(self, require_stacking=False):
    """Return a fresh ``_TestBzrDirFormat``; *require_stacking* is ignored."""
    clone_format = _TestBzrDirFormat()
    return clone_format
1266
class _TestBranchFormat(breezy.branch.BranchFormat):
    """Branch format test double used by TestBzrDirSprout."""
1270
class _TestBranch(breezy.branch.Branch):
1271
"""Test Branch implementation for TestBzrDirSprout."""
1273
def __init__(self, transport, *args, **kwargs):
1274
self._format = _TestBranchFormat()
1275
self._transport = transport
1276
self.base = transport.base
1277
super(_TestBranch, self).__init__(*args, **kwargs)
1281
def sprout(self, *args, **kwargs):
    """Log the call and return a new ``_TestBranch`` on the same transport."""
    self.calls.extend(('sprout',))
    sprouted = _TestBranch(self._transport)
    return sprouted
1285
def copy_content_into(self, destination, revision_id=None):
    """Log the call; *destination* and *revision_id* are not used."""
    self.calls.extend(('copy_content_into',))
1288
def last_revision(self):
    """Return ``NULL_REVISION`` unconditionally."""
    null_revision = _mod_revision.NULL_REVISION
    return null_revision
1291
def get_parent(self):
1294
def _get_config(self):
    """Return a ``TransportConfig`` for 'branch.conf' on this transport."""
    branch_config = config.TransportConfig(self._transport, 'branch.conf')
    return branch_config
1297
def _get_config_store(self):
    """Return a ``BranchStore`` built from this branch."""
    store = config.BranchStore(self)
    return store
1300
def set_parent(self, parent):
    """Store *parent* on the instance as ``_parent``."""
    setattr(self, '_parent', parent)
1303
def lock_read(self):
    """Return a ``LogicalLockResult`` built from this branch's ``unlock``."""
    release = self.unlock
    return lock.LogicalLockResult(release)
1310
class TestBzrDirSprout(TestCaseWithMemoryTransport):
1312
def test_sprout_uses_branch_sprout(self):
1313
"""BzrDir.sprout calls Branch.sprout.
1315
Usually, BzrDir.sprout should delegate to the branch's sprout method
1316
for part of the work. This allows the source branch to control the
1317
choice of format for the new branch.
1319
There are exceptions, but this test avoids them:
1320
- if there's no branch in the source bzrdir,
1321
- or if the stacking has been requested and the format needs to be
1322
overridden to satisfy that.
1324
# Make an instrumented bzrdir.
1325
t = self.get_transport('source')
1327
source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
1328
# The instrumented bzrdir has a test_branch attribute that logs calls
1329
# made to the branch contained in that bzrdir. Initially the test
1330
# branch exists but no calls have been made to it.
1331
self.assertEqual([], source_bzrdir.test_branch.calls)
1334
target_url = self.get_url('target')
1335
result = source_bzrdir.sprout(target_url, recurse='no')
1337
# The bzrdir called the branch's sprout method.
1338
self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)
1340
def test_sprout_parent(self):
    """A branch sprouted from 'parent' records 'parent' as its parent."""
    grandparent = self.make_branch('grandparent')
    parent = grandparent.controldir.sprout('parent').open_branch()
    child = parent.controldir.sprout('branch').open_branch()
    # The parent URL must point at the intermediate branch.
    self.assertContainsRe(child.get_parent(), '/parent/$')
1347
class TestBzrDirHooks(TestCaseWithMemoryTransport):
1349
def test_pre_open_called(self):
1351
bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
1352
transport = self.get_transport('foo')
1353
url = transport.base
1354
self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
1355
self.assertEqual([transport.base], [t.base for t in calls])
1357
def test_pre_open_actual_exceptions_raised(self):
1359
def fail_once(transport):
1362
raise errors.BzrError("fail")
1363
bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
1364
transport = self.get_transport('foo')
1365
url = transport.base
1366
err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
1367
self.assertEqual('fail', err._preformatted_string)
1369
def test_post_repo_init(self):
1370
from ..controldir import RepoInitHookParams
1372
bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1374
self.make_repository('foo')
1375
self.assertLength(1, calls)
1377
self.assertIsInstance(params, RepoInitHookParams)
1378
self.assertTrue(hasattr(params, 'controldir'))
1379
self.assertTrue(hasattr(params, 'repository'))
1381
def test_post_repo_init_hook_repr(self):
1383
bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1384
lambda params: param_reprs.append(repr(params)), None)
1385
self.make_repository('foo')
1386
self.assertLength(1, param_reprs)
1387
param_repr = param_reprs[0]
1388
self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
1391
class TestGenerateBackupName(TestCaseWithMemoryTransport):
1392
# FIXME: This may need to be unified with test_osutils.TestBackupNames or
1393
# moved to per_bzrdir or per_transport for better coverage ?
1397
super(TestGenerateBackupName, self).setUp()
1398
self._transport = self.get_transport()
1399
bzrdir.BzrDir.create(self.get_url(),
1400
possible_transports=[self._transport])
1401
self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)
1404
self.assertEqual("a.~1~", self._bzrdir._available_backup_name("a"))
1406
def test_exiting(self):
    """With 'a.~1~' already present, the next backup name is 'a.~2~'."""
    # put_bytes takes raw bytes; a bytes literal is valid on both
    # Python 2 and Python 3, whereas a str payload is not bytes on 3.
    self._transport.put_bytes("a.~1~", b"some content")
    self.assertEqual("a.~2~", self._bzrdir._available_backup_name("a"))
1411
class TestMeta1DirColoFormat(TestCaseWithTransport):
1412
"""Tests specific to the meta1 dir with colocated branches format."""
1414
def test_supports_colo(self):
    """BzrDirMetaFormat1Colo has a true ``colocated_branches`` attribute."""
    colo_format = bzrdir.BzrDirMetaFormat1Colo()
    self.assertTrue(colo_format.colocated_branches)
1418
def test_upgrade_from_2a(self):
    """Converting a '2a' tree's control dir yields BzrDirMetaFormat1Colo."""
    tree = self.make_branch_and_tree('.', format='2a')
    target_format = bzrdir.BzrDirMetaFormat1Colo()
    # Before converting, the control directory must report the need.
    self.assertTrue(tree.controldir.needs_format_conversion(target_format))
    converter = tree.controldir._format.get_converter(target_format)
    converted = converter.convert(tree.controldir, None)
    self.assertIsInstance(converted._format, bzrdir.BzrDirMetaFormat1Colo)
    # After converting, no further conversion is needed.
    self.assertFalse(converted.needs_format_conversion(target_format))
1427
def test_downgrade_to_2a(self):
    """Converting a 'development-colo' dir yields BzrDirMetaFormat1."""
    tree = self.make_branch_and_tree('.', format='development-colo')
    target_format = bzrdir.BzrDirMetaFormat1()
    # Before converting, the control directory must report the need.
    self.assertTrue(tree.controldir.needs_format_conversion(target_format))
    converter = tree.controldir._format.get_converter(target_format)
    converted = converter.convert(tree.controldir, None)
    self.assertIsInstance(converted._format, bzrdir.BzrDirMetaFormat1)
    # After converting, no further conversion is needed.
    self.assertFalse(converted.needs_format_conversion(target_format))
1436
def test_downgrade_to_2a_too_many_branches(self):
    """Convert a colo dir that holds an extra named branch."""
    tree = self.make_branch_and_tree('.', format='development-colo')
    tree.controldir.create_branch(name="another-colocated-branch")
    source_format = tree.controldir._format
    converter = source_format.get_converter(bzrdir.BzrDirMetaFormat1())
    # NOTE(review): the second argument is a new format instance here,
    # while test_upgrade_from_2a and test_downgrade_to_2a pass None in
    # that position -- confirm this is intended.
    converted = converter.convert(tree.controldir,
                                  bzrdir.BzrDirMetaFormat1())
    self.assertIsInstance(converted._format, bzrdir.BzrDirMetaFormat1)
1444
def test_nested(self):
1445
tree = self.make_branch_and_tree('.', format='development-colo')
1446
tree.controldir.create_branch(name='foo')
1447
tree.controldir.create_branch(name='fool/bla')
1449
errors.ParentBranchExists, tree.controldir.create_branch,
1452
def test_parent(self):
1453
tree = self.make_branch_and_tree('.', format='development-colo')
1454
tree.controldir.create_branch(name='fool/bla')
1455
tree.controldir.create_branch(name='foo/bar')
1457
errors.AlreadyBranchError, tree.controldir.create_branch,
1461
class SampleBzrFormat(bzrdir.BzrFormat):
1464
def get_format_string(cls):
1465
return b"First line\n"
1468
class TestBzrFormat(TestCase):
1469
"""Tests for BzrFormat."""
1471
def test_as_string(self):
1472
format = SampleBzrFormat()
1473
format.features = {b"foo": b"required"}
1474
self.assertEqual(format.as_string(),
1477
format.features["another"] = "optional"
1478
self.assertEqual(format.as_string(),
1481
b"optional another\n")
1483
def test_network_name(self):
1484
# The network string should include the feature info
1485
format = SampleBzrFormat()
1486
format.features = {b"foo": b"required"}
1488
b"First line\nrequired foo\n",
1489
format.network_name())
1491
def test_from_string_no_features(self):
1493
format = SampleBzrFormat.from_string(
1495
self.assertEqual({}, format.features)
1497
def test_from_string_with_feature(self):
1499
format = SampleBzrFormat.from_string(
1500
b"First line\nrequired foo\n")
1501
self.assertEqual(b"required", format.features.get(b"foo"))
1503
def test_from_string_format_string_mismatch(self):
    """from_string raises AssertionError on a foreign first line."""
    # The first line has to match the format string
    mismatched = b"Second line\nrequired foo\n"
    self.assertRaises(AssertionError, SampleBzrFormat.from_string,
                      mismatched)
1508
def test_from_string_missing_space(self):
    """from_string raises ParseFormatError on a feature line with no space."""
    # At least one space is required in the feature lines
    spaceless = b"First line\nfoo\n"
    self.assertRaises(errors.ParseFormatError, SampleBzrFormat.from_string,
                      spaceless)
1513
def test_from_string_with_spaces(self):
    """A feature name containing spaces is kept whole by from_string."""
    # Feature with spaces (in case we add stuff like this in the future)
    text = b"First line\nrequired foo with spaces\n"
    parsed = SampleBzrFormat.from_string(text)
    necessity = parsed.features.get(b"foo with spaces")
    self.assertEqual(b"required", necessity)
1520
format1 = SampleBzrFormat()
1521
format1.features = {b"nested-trees": b"optional"}
1522
format2 = SampleBzrFormat()
1523
format2.features = {b"nested-trees": b"optional"}
1524
self.assertEqual(format1, format1)
1525
self.assertEqual(format1, format2)
1526
format3 = SampleBzrFormat()
1527
self.assertNotEqual(format1, format3)
1529
def test_check_support_status_optional(self):
    """An optional feature passes the check, registered or not."""
    feature = b"nested-trees"
    sample = SampleBzrFormat()
    sample.features = {feature: b"optional"}
    # Optional, so silently ignore
    sample.check_support_status(True)
    # Register the feature (undone at cleanup) and check once more.
    self.addCleanup(SampleBzrFormat.unregister_feature, feature)
    SampleBzrFormat.register_feature(feature)
    sample.check_support_status(True)
1538
def test_check_support_status_required(self):
1539
# Required, so trigger an exception
1540
format = SampleBzrFormat()
1541
format.features = {b"nested-trees": b"required"}
1542
self.assertRaises(bzrdir.MissingFeature, format.check_support_status,
1544
self.addCleanup(SampleBzrFormat.unregister_feature, b"nested-trees")
1545
SampleBzrFormat.register_feature(b"nested-trees")
1546
format.check_support_status(True)
1548
def test_check_support_status_unknown(self):
1549
# treat unknown necessity as required
1550
format = SampleBzrFormat()
1551
format.features = {b"nested-trees": b"unknown"}
1552
self.assertRaises(bzrdir.MissingFeature, format.check_support_status,
1554
self.addCleanup(SampleBzrFormat.unregister_feature, b"nested-trees")
1555
SampleBzrFormat.register_feature(b"nested-trees")
1556
format.check_support_status(True)
1558
def test_feature_already_registered(self):
    """Registering the same feature twice raises FeatureAlreadyRegistered."""
    feature = b"nested-trees"
    # a feature can only be registered once
    self.addCleanup(SampleBzrFormat.unregister_feature, feature)
    register = SampleBzrFormat.register_feature
    register(feature)
    self.assertRaises(bzrdir.FeatureAlreadyRegistered, register, feature)
1565
def test_feature_with_space(self):
1566
# spaces are not allowed in feature names
1567
self.assertRaises(ValueError, SampleBzrFormat.register_feature,