67
73
old_format = bzrdir.BzrDirFormat.get_default_format()
68
74
# default is BzrDirMetaFormat1
69
75
self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
70
bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
76
controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
71
77
# creating a bzr dir should now create an instrumented dir.
73
79
result = bzrdir.BzrDir.create('memory:///')
74
80
self.failUnless(isinstance(result, SampleBzrDir))
76
bzrdir.BzrDirFormat._set_default_format(old_format)
82
controldir.ControlDirFormat._set_default_format(old_format)
77
83
self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
80
86
class TestFormatRegistry(TestCase):
82
88
def make_format_registry(self):
83
my_format_registry = bzrdir.BzrDirFormatRegistry()
89
my_format_registry = controldir.ControlDirFormatRegistry()
84
90
my_format_registry.register('weave', bzrdir.BzrDirFormat6,
85
91
'Pre-0.8 format. Slower and does not support checkouts or shared'
86
92
' repositories', deprecated=True)
87
93
my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
88
94
'BzrDirFormat6', 'Format registered lazily', deprecated=True)
89
my_format_registry.register_metadir('knit',
95
bzrdir.register_metadir(my_format_registry, 'knit',
90
96
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
91
97
'Format using knits',
93
99
my_format_registry.set_default('knit')
94
my_format_registry.register_metadir(
100
bzrdir.register_metadir(my_format_registry,
96
102
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
97
103
'Experimental successor to knit. Use at your own risk.',
98
104
branch_format='bzrlib.branch.BzrBranchFormat6',
99
105
experimental=True)
100
my_format_registry.register_metadir(
106
bzrdir.register_metadir(my_format_registry,
102
108
'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
103
109
'Experimental successor to knit. Use at your own risk.',
172
178
bzrdir.format_registry.set_default_repository(old_default)
174
180
def test_aliases(self):
175
a_registry = bzrdir.BzrDirFormatRegistry()
181
a_registry = controldir.ControlDirFormatRegistry()
176
182
a_registry.register('weave', bzrdir.BzrDirFormat6,
177
183
'Pre-0.8 format. Slower and does not support checkouts or shared'
178
184
' repositories', deprecated=True)
249
255
# is the right format object found for a branch?
250
256
# create a branch with a few known format objects.
251
257
# this is not quite the same as
252
t = get_transport(self.get_url())
258
t = self.get_transport()
253
259
self.build_tree(["foo/", "bar/"], transport=t)
254
260
def check_format(format, url):
255
261
format.initialize(url)
256
t = get_transport(url)
262
t = _mod_transport.get_transport(url)
257
263
found_format = bzrdir.BzrDirFormat.find_format(t)
258
264
self.failUnless(isinstance(found_format, format.__class__))
259
265
check_format(bzrdir.BzrDirFormat5(), "foo")
262
268
def test_find_format_nothing_there(self):
    """find_format must raise NotBranchError for a transport on '.'.

    Presumably the test's working directory contains no control
    directory at this point -- verify against the enclosing TestCase.
    """
    self.assertRaises(NotBranchError,
                      bzrdir.BzrDirFormat.find_format,
                      _mod_transport.get_transport('.'))
267
273
def test_find_format_unknown_format(self):
268
t = get_transport(self.get_url())
274
t = self.get_transport()
270
276
t.put_bytes('.bzr/branch-format', '')
271
277
self.assertRaises(UnknownFormatError,
272
278
bzrdir.BzrDirFormat.find_format,
279
_mod_transport.get_transport('.'))
275
281
def test_register_unregister_format(self):
276
282
format = SampleBzrDirFormat()
284
290
# which bzrdir.open_containing will refuse (not supported)
285
291
self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open_containing, url)
286
292
# but open_downlevel will work
287
t = get_transport(url)
293
t = _mod_transport.get_transport(url)
288
294
self.assertEqual(format.open(t), bzrdir.BzrDir.open_unsupported(url))
289
295
# unregister the format
290
296
bzrdir.BzrDirFormat.unregister_format(format)
676
682
self.assertEqual(relpath, 'baz')
678
684
def test_open_containing_from_transport(self):
    """Exercise BzrDir.open_containing_from_transport on readonly urls.

    Before anything is created, both the root url and the nested
    'g/p/q' url must raise NotBranchError.  After a BzrDir is created at
    self.get_url(), the same two lookups must succeed and report '' and
    'g/p/q' respectively as the relative path.
    """
    # Nothing exists yet: both lookups must fail.
    self.assertRaises(NotBranchError,
                      bzrdir.BzrDir.open_containing_from_transport,
                      _mod_transport.get_transport(self.get_readonly_url('')))
    self.assertRaises(NotBranchError,
                      bzrdir.BzrDir.open_containing_from_transport,
                      _mod_transport.get_transport(
                          self.get_readonly_url('g/p/q')))
    control = bzrdir.BzrDir.create(self.get_url())
    # Opening at the root itself yields an empty relative path.
    branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
        _mod_transport.get_transport(self.get_readonly_url('')))
    self.assertEqual('', relpath)
    # Opening below the root reports the remaining path components.
    branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
        _mod_transport.get_transport(self.get_readonly_url('g/p/q')))
    self.assertEqual('g/p/q', relpath)
691
699
def test_open_containing_tree_or_branch(self):
735
743
# transport pointing at bzrdir should give a bzrdir with root transport
736
744
# set to the given transport
737
745
control = bzrdir.BzrDir.create(self.get_url())
738
transport = get_transport(self.get_url())
739
opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
740
self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
746
t = self.get_transport()
747
opened_bzrdir = bzrdir.BzrDir.open_from_transport(t)
748
self.assertEqual(t.base, opened_bzrdir.root_transport.base)
741
749
self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
743
751
def test_open_from_transport_no_bzrdir(self):
    """open_from_transport raises NotBranchError on the test transport.

    No BzrDir is created by this test before the call.
    """
    t = self.get_transport()
    self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
748
755
def test_open_from_transport_bzrdir_in_parent(self):
749
756
control = bzrdir.BzrDir.create(self.get_url())
750
transport = get_transport(self.get_url())
751
transport.mkdir('subdir')
752
transport = transport.clone('subdir')
753
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
757
t = self.get_transport()
759
t = t.clone('subdir')
760
self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
756
762
def test_sprout_recursive(self):
757
763
tree = self.make_branch_and_tree('tree1',
787
793
self.build_tree(['tree1/subtree/file'])
788
794
sub_tree.add('file')
789
795
tree.commit('Initial commit')
796
# The following line forces the orphaning to reveal bug #634470
797
tree.branch.get_config().set_user_option(
798
'bzr.transform.orphan_policy', 'move')
790
799
tree.bzrdir.destroy_workingtree()
800
# FIXME: subtree/.bzr is left here which allows the test to pass (or
801
# fail :-( ) -- vila 20100909
791
802
repo = self.make_repository('repo', shared=True,
792
803
format='dirstate-with-subtree')
793
804
repo.set_make_working_trees(False)
794
tree.bzrdir.sprout('repo/tree2')
795
self.failUnlessExists('repo/tree2/subtree')
796
self.failIfExists('repo/tree2/subtree/file')
805
# FIXME: we just deleted the workingtree and now we want to use it ????
806
# At a minimum, we should use tree.branch below (but this fails too
807
# currently) or stop calling this test 'treeless'. Specifically, I've
808
# turned the line below into an assertRaises when 'subtree/.bzr' is
809
# orphaned and sprout tries to access the branch there (which is left
810
# by bzrdir.BzrDirMeta1.destroy_workingtree when it ignores the
811
# [DeletingParent('Not deleting', u'subtree', None)] conflict). See bug
812
# #634470. -- vila 20100909
813
self.assertRaises(errors.NotBranchError,
814
tree.bzrdir.sprout, 'repo/tree2')
815
# self.failUnlessExists('repo/tree2/subtree')
816
# self.failIfExists('repo/tree2/subtree/file')
798
818
def make_foo_bar_baz(self):
799
819
foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
804
824
def test_find_bzrdirs(self):
    """find_bzrdirs on the test transport yields baz, foo, bar in that order.

    The three control dirs come from self.make_foo_bar_baz(); the
    comparison is delegated to self.assertEqualBzrdirs.
    """
    foo, bar, baz = self.make_foo_bar_baz()
    t = self.get_transport()
    self.assertEqualBzrdirs([baz, foo, bar], bzrdir.BzrDir.find_bzrdirs(t))
829
def make_fake_permission_denied_transport(self, transport, paths):
830
"""Create a transport that raises PermissionDenied for some paths."""
833
raise errors.PermissionDenied(path)
835
path_filter_server = pathfilter.PathFilteringServer(transport, filter)
836
path_filter_server.start_server()
837
self.addCleanup(path_filter_server.stop_server)
838
path_filter_transport = pathfilter.PathFilteringTransport(
839
path_filter_server, '.')
840
return (path_filter_server, path_filter_transport)
842
def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
    """Check that each branch url ends with the given suffix.

    :param expect_url_suffix: suffix every ``user_url`` must end with.
    :param actual_bzrdirs: iterable of objects exposing ``user_url``; it
        is consumed once, in order.
    """
    for actual_bzrdir in actual_bzrdirs:
        self.assertEndsWith(actual_bzrdir.user_url, expect_url_suffix)
847
def test_find_bzrdirs_permission_denied(self):
    """Every bzrdir found must end with '/baz/' when 'foo' is filtered.

    The transport pair comes from
    self.make_fake_permission_denied_transport(t, ['foo']).  The same
    check is run twice: directly on the path-filtering transport, and
    through a smart server backed by the path-filtering server.
    """
    foo, bar, baz = self.make_foo_bar_baz()
    t = self.get_transport()
    path_filter_server, path_filter_transport = (
        self.make_fake_permission_denied_transport(t, ['foo']))
    # Directly over the path-filtering transport.
    self.assertBranchUrlsEndWith(
        '/baz/', bzrdir.BzrDir.find_bzrdirs(path_filter_transport))
    # Through a smart server that uses the filtering server as backing.
    smart_transport = self.make_smart_server(
        '.', backing_server=path_filter_server)
    self.assertBranchUrlsEndWith(
        '/baz/', bzrdir.BzrDir.find_bzrdirs(smart_transport))
810
861
def test_find_bzrdirs_list_current(self):
811
862
def list_current(transport):
812
863
return [s for s in transport.list_dir('') if s != 'baz']
814
865
foo, bar, baz = self.make_foo_bar_baz()
815
transport = get_transport(self.get_url())
816
self.assertEqualBzrdirs([foo, bar],
817
bzrdir.BzrDir.find_bzrdirs(transport,
818
list_current=list_current))
866
t = self.get_transport()
867
self.assertEqualBzrdirs(
869
bzrdir.BzrDir.find_bzrdirs(t, list_current=list_current))
821
871
def test_find_bzrdirs_evaluate(self):
822
872
def evaluate(bzrdir):
828
878
return False, bzrdir.root_transport.base
830
880
foo, bar, baz = self.make_foo_bar_baz()
831
transport = get_transport(self.get_url())
881
t = self.get_transport()
832
882
self.assertEqual([baz.root_transport.base, foo.root_transport.base],
833
list(bzrdir.BzrDir.find_bzrdirs(transport,
883
list(bzrdir.BzrDir.find_bzrdirs(t, evaluate=evaluate)))
836
885
def assertEqualBzrdirs(self, first, second):
837
886
first = list(first)
844
893
root = self.make_repository('', shared=True)
845
894
foo, bar, baz = self.make_foo_bar_baz()
846
895
qux = self.make_bzrdir('foo/qux')
847
transport = get_transport(self.get_url())
848
branches = bzrdir.BzrDir.find_branches(transport)
896
t = self.get_transport()
897
branches = bzrdir.BzrDir.find_branches(t)
849
898
self.assertEqual(baz.root_transport.base, branches[0].base)
850
899
self.assertEqual(foo.root_transport.base, branches[1].base)
851
900
self.assertEqual(bar.root_transport.base, branches[2].base)
853
902
# ensure this works without a top-level repo
854
branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
903
branches = bzrdir.BzrDir.find_branches(t.clone('foo'))
855
904
self.assertEqual(foo.root_transport.base, branches[0].base)
856
905
self.assertEqual(bar.root_transport.base, branches[1].base)
908
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
    """find_branches behaviour when a branch's shared repository is gone."""

    def test_find_bzrdirs_missing_repo(self):
        """Branches reported by find_branches must all end with '/baz/'.

        A branch is created under a shared repository whose '.bzr' is
        then deleted, so opening that branch raises NoRepositoryPresent.
        A second branch 'baz' is created normally.
        """
        t = self.get_transport()
        arepo = self.make_repository('arepo', shared=True)
        abranch_url = arepo.user_url + '/abranch'
        bzrdir.BzrDir.create(abranch_url).create_branch()
        # Remove the shared repository out from under the branch.
        t.delete_tree('arepo/.bzr')
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.Branch.open, abranch_url)
        self.make_branch('baz')
        # NOTE(review): this loop passes vacuously if find_branches
        # returns nothing -- consider also asserting the expected count.
        for actual_bzrdir in bzrdir.BzrDir.find_branches(t):
            self.assertEndsWith(actual_bzrdir.user_url, '/baz/')
859
923
class TestMeta1DirFormat(TestCaseWithTransport):
860
924
"""Tests specific to the meta1 dir format."""
1030
1096
dir = format.initialize(self.get_url())
1031
1097
self.assertIsInstance(dir, NotBzrDir)
1032
1098
# now probe for it.
1033
bzrlib.bzrdir.BzrDirFormat.register_control_format(format)
1099
controldir.ControlDirFormat.register_prober(NotBzrDirProber)
1035
found = bzrlib.bzrdir.BzrDirFormat.find_format(
1036
get_transport(self.get_url()))
1101
found = bzrlib.bzrdir.BzrDirFormat.find_format(self.get_transport())
1037
1102
self.assertIsInstance(found, NotBzrDirFormat)
1039
bzrlib.bzrdir.BzrDirFormat.unregister_control_format(format)
1104
controldir.ControlDirFormat.unregister_prober(NotBzrDirProber)
1041
1106
def test_included_in_known_formats(self):
1042
bzrlib.bzrdir.BzrDirFormat.register_control_format(NotBzrDirFormat)
1107
not_format = NotBzrDirFormat()
1108
bzrlib.controldir.ControlDirFormat.register_format(not_format)
1044
1110
formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
1045
1111
for format in formats:
1113
1179
def create_transport_readonly_server(self):
    """Return a new http_utils.HTTPServerRedirecting instance."""
    # We don't set the http protocol version, relying on the default
    return http_utils.HTTPServerRedirecting()
1116
1183
def create_transport_secondary_server(self):
    """Return a new http_utils.HTTPServerRedirecting instance."""
    # We don't set the http protocol version, relying on the default
    return http_utils.HTTPServerRedirecting()
1119
1187
def setUp(self):
1274
1342
def copy_content_into(self, destination, revision_id=None):
    """Record the call in self.calls; both arguments are ignored."""
    self.calls.append('copy_content_into')
1345
def last_revision(self):
    """Always report _mod_revision.NULL_REVISION."""
    return _mod_revision.NULL_REVISION
1277
1348
def get_parent(self):
    """Return the value currently stored in self._parent."""
    return self._parent
1280
1351
def set_parent(self, parent):
    """Store parent in self._parent, where get_parent reads it back."""
    self._parent = parent
1354
def lock_read(self):
    """Return a lock.LogicalLockResult built from self.unlock.

    No locking work is done here beyond constructing that result.
    """
    return lock.LogicalLockResult(self.unlock)
1284
1361
class TestBzrDirSprout(TestCaseWithMemoryTransport):
1351
1428
self.assertIsInstance(params, RepoInitHookParams)
1352
1429
self.assertTrue(hasattr(params, 'bzrdir'))
1353
1430
self.assertTrue(hasattr(params, 'repository'))
1432
def test_post_repo_init_hook_repr(self):
1434
bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1435
lambda params: param_reprs.append(repr(params)), None)
1436
self.make_repository('foo')
1437
self.assertLength(1, param_reprs)
1438
param_repr = param_reprs[0]
1439
self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
1442
class TestGenerateBackupName(TestCaseWithMemoryTransport):
1443
# FIXME: This may need to be unified with test_osutils.TestBackupNames or
1444
# moved to per_bzrdir or per_transport for better coverage ?
1448
super(TestGenerateBackupName, self).setUp()
1449
self._transport = self.get_transport()
1450
bzrdir.BzrDir.create(self.get_url(),
1451
possible_transports=[self._transport])
1452
self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)
1454
def test_deprecated_generate_backup_name(self):
1455
res = self.applyDeprecated(
1456
symbol_versioning.deprecated_in((2, 3, 0)),
1457
self._bzrdir.generate_backup_name, 'whatever')
1460
self.assertEqual("a.~1~", self._bzrdir._available_backup_name("a"))
1462
def test_exiting(self):
1463
self._transport.put_bytes("a.~1~", "some content")
1464
self.assertEqual("a.~2~", self._bzrdir._available_backup_name("a"))