1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""Tests for the Repository facility that are not interface tests.

For interface tests see tests/repository_implementations/*.py.

For concrete class tests see this file, and for storage formats tests
also see this file.
"""
25
from stat import S_ISDIR
from StringIO import StringIO

from bzrlib import symbol_versioning
import bzrlib.bzrdir as bzrdir
import bzrlib.errors as errors
import bzrlib.ui
from bzrlib import (
    repository,
    upgrade,
    workingtree,
    )
from bzrlib.errors import (NotBranchError,
                           NoSuchFile,
                           UnknownFormatError,
                           UnsupportedFormatError,
                           )
from bzrlib.repository import RepositoryFormat
from bzrlib.tests import TestCase, TestCaseWithTransport
from bzrlib.transport import get_transport
from bzrlib.transport.memory import MemoryServer
48
class TestDefaultFormat(TestCase):
    """Check that the default repository format follows the bzrdir registry."""

    def test_get_set_default_format(self):
        """Replacing the registry default changes what create_repository builds.

        The original default is put back afterwards, even if an assertion
        fails, so later tests see the normal default.
        """
        old_default = bzrdir.format_registry.get('default')
        private_default = old_default().repository_format.__class__
        old_format = repository.RepositoryFormat.get_default_format()
        self.assertIsInstance(old_format, private_default)

        def make_sample_bzrdir():
            """Return a meta-dir format that uses SampleRepositoryFormat."""
            my_bzrdir = bzrdir.BzrDirMetaFormat1()
            my_bzrdir.repository_format = SampleRepositoryFormat()
            # NOTE(review): this return was missing from the extracted text;
            # a registry factory that returns None would be useless.
            return my_bzrdir

        bzrdir.format_registry.remove('default')
        bzrdir.format_registry.register('sample', make_sample_bzrdir, '')
        bzrdir.format_registry.set_default('sample')
        # creating a repository should now create an instrumented dir.
        try:
            # the default branch format is used by the meta dir format
            # which is not the default bzrdir format at this point
            control = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
            result = control.create_repository()
            self.assertEqual(result, 'A bzr repository dir')
        finally:
            # Restore the real default so a failure cannot leak the sample.
            bzrdir.format_registry.remove('default')
            bzrdir.format_registry.register('default', old_default, '')
        # NOTE(review): second argument restored from context - confirm.
        self.assertIsInstance(repository.RepositoryFormat.get_default_format(),
                              old_format.__class__)
76
class SampleRepositoryFormat(repository.RepositoryFormat):
    """A sample format.

    this format is initializable, unsupported to aid in testing the
    open and open(unsupported=True) routines.
    """

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return "Sample .bzr repository format."

    def initialize(self, a_bzrdir, shared=False):
        """Initialize a repository in a BzrDir.

        Writes this format's marker string to the 'format' file on the
        repository transport and returns a sentinel string instead of a
        repository object, so tests can recognise that this code ran.
        """
        t = a_bzrdir.get_repository_transport(self)
        t.put_bytes('format', self.get_format_string())
        return 'A bzr repository dir'

    def is_supported(self):
        """Report this format as unsupported, as the class docstring states."""
        # NOTE(review): the body was missing from the extracted text; False
        # follows from "unsupported" above - confirm against upstream.
        return False

    def open(self, a_bzrdir, _found=False):
        """Return a sentinel string instead of a real repository object."""
        return "opened repository."
100
class TestRepositoryFormat(TestCaseWithTransport):
    """Tests for the Repository format detection used by the bzr meta dir facility."""

    def test_find_format(self):
        """find_format returns the format the repository was initialized with."""
        # is the right format object found for a repository?
        # create a branch with a few known format objects.
        self.build_tree(["foo/", "bar/"])

        def check_format(repo_format, url):
            """Initialize repo_format at url and check it is detected again."""
            control = repo_format._matchingbzrdir.initialize(url)
            repo_format.initialize(control)
            found_format = repository.RepositoryFormat.find_format(control)
            self.assertIsInstance(found_format, repo_format.__class__)

        check_format(repository.RepositoryFormat7(), "bar")

    def test_find_format_no_repository(self):
        """A control dir without a repository raises NoRepositoryPresent."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        # NOTE(review): the final argument was missing from the extracted
        # text; find_format is called with the control dir elsewhere here.
        self.assertRaises(errors.NoRepositoryPresent,
                          repository.RepositoryFormat.find_format,
                          control)

    def test_find_format_unknown_format(self):
        """An unregistered format string raises UnknownFormatError."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        SampleRepositoryFormat().initialize(control)
        # NOTE(review): final argument restored as above - confirm.
        self.assertRaises(UnknownFormatError,
                          repository.RepositoryFormat.find_format,
                          control)

    def test_register_unregister_format(self):
        """A registered but unsupported format is refused by Repository.open."""
        repo_format = SampleRepositoryFormat()
        # make a control dir, then a repository in the sample format
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo_format.initialize(control)
        # register a format for it.
        repository.RepositoryFormat.register_format(repo_format)
        try:
            # which repository.Open will refuse (not supported)
            self.assertRaises(UnsupportedFormatError,
                              repository.Repository.open, self.get_url())
            # but open(unsupported) will work
            self.assertEqual(repo_format.open(control), "opened repository.")
        finally:
            # unregister even when an assertion fails, so the sample format
            # cannot leak into other tests
            repository.RepositoryFormat.unregister_format(repo_format)
145
class TestFormat6(TestCaseWithTransport):
    """Checks specific to RepositoryFormat6."""

    def test_no_ancestry_weave(self):
        """Initializing a format 6 repository creates no ancestry.weave."""
        control = bzrdir.BzrDirFormat6().initialize(self.get_url())
        repository.RepositoryFormat6().initialize(control)
        # We no longer need to create the ancestry.weave file
        # since it is *never* used.
        # NOTE(review): the file name argument was missing from the extracted
        # text; it is taken from the comment above - confirm.
        self.assertRaises(NoSuchFile,
                          control.transport.get,
                          'ancestry.weave')
157
class TestFormat7(TestCaseWithTransport):
    """Disk-layout and locking checks for RepositoryFormat7."""

    # Expected content of a freshly created inventory.weave.
    # NOTE(review): only the first line of this literal survived in the
    # extracted text; the 'w\n' 'W\n' tail is restored from the weave v5
    # serialisation and must be confirmed against upstream.
    _EMPTY_WEAVE = ('# bzr weave file v5\n'
                    'w\n'
                    'W\n')

    def _check_weave_layout(self, t):
        """Assert the files every format 7 repository has on transport t.

        Checks the 'format' marker, that 'revision-store' and 'weaves' are
        directories, and that 'inventory.weave' holds the empty weave.
        """
        self.assertEqualDiff('Bazaar-NG Repository format 7',
                             t.get('format').read())
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
        self.assertEqualDiff(self._EMPTY_WEAVE,
                             t.get('inventory.weave').read())

    def test_disk_layout(self):
        """A plain format 7 repository has the expected files on disk."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = repository.RepositoryFormat7().initialize(control)
        # in case of side effects of locking.
        # NOTE(review): the lock/unlock pair was missing from the extracted
        # text and is restored from the comment above - confirm.
        repo.lock_write()
        repo.unlock()
        t = control.get_repository_transport(None)
        self._check_weave_layout(t)

    def test_shared_disk_layout(self):
        """A shared repository adds an empty 'shared-storage' marker file."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repository.RepositoryFormat7().initialize(control, shared=True)
        t = control.get_repository_transport(None)
        self.assertEqualDiff('', t.get('shared-storage').read())
        self._check_weave_layout(t)
        # lock is not present when unlocked
        self.assertFalse(t.has('branch-lock'))

    def test_creates_lockdir(self):
        """Make sure it appears to be controlled by a LockDir existence"""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
        t = control.get_repository_transport(None)
        # TODO: Should check there is a 'lock' toplevel directory,
        # regardless of contents
        self.assertFalse(t.has('lock/held/info'))
        # NOTE(review): lock_write/try/finally/unlock were missing from the
        # extracted text; the two opposite assertions and the unlock comment
        # below imply them - confirm.
        repo.lock_write()
        try:
            self.assertTrue(t.has('lock/held/info'))
        finally:
            # unlock so we don't get a warning about failing to do so
            repo.unlock()

    def test_uses_lockdir(self):
        """repo format 7 actually locks on lockdir"""
        base_url = self.get_url()
        control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
        t = control.get_repository_transport(None)
        # NOTE(review): the next three statements and the lock/unlock around
        # the assertion were missing from the extracted text - confirm.
        repo.lock_write()
        repo.unlock()
        del repo
        # make sure the same lock is created by opening it
        repo = repository.Repository.open(base_url)
        repo.lock_write()
        try:
            self.assertTrue(t.has('lock/held/info'))
        finally:
            # release even if the assertion fails so no lock is left held
            repo.unlock()
        self.assertFalse(t.has('lock/held/info'))

    def test_shared_no_tree_disk_layout(self):
        """set_make_working_trees toggles the 'no-working-trees' marker."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = repository.RepositoryFormat7().initialize(control, shared=True)
        repo.set_make_working_trees(False)
        t = control.get_repository_transport(None)
        ## self.assertEqualDiff('', t.get('lock').read())
        self.assertEqualDiff('', t.get('shared-storage').read())
        self.assertEqualDiff('', t.get('no-working-trees').read())
        repo.set_make_working_trees(True)
        self.assertFalse(t.has('no-working-trees'))
        self._check_weave_layout(t)
261
class TestFormatKnit1(TestCaseWithTransport):
    """Disk-layout checks for RepositoryFormatKnit1."""

    def test_disk_layout(self):
        """A plain knit repository has the format marker, knits dir and knits."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = repository.RepositoryFormatKnit1().initialize(control)
        # in case of side effects of locking.
        # NOTE(review): the lock/unlock pair was missing from the extracted
        # text and is restored from the comment above - confirm.
        repo.lock_write()
        repo.unlock()
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
                             t.get('format').read())
        # XXX: no locks left when unlocked at the moment
        # self.assertEqualDiff('', t.get('lock').read())
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
        # NOTE(review): this call was missing from the extracted text;
        # check_knits would otherwise never be called - confirm.
        self.check_knits(t)

    def assertHasKnit(self, t, knit_name):
        """Assert that knit_name exists on t."""
        self.assertEqualDiff('# bzr knit index 8\n',
                             t.get(knit_name + '.kndx').read())
        # the data file only has to exist; its content is not checked
        self.assertTrue(t.has(knit_name + '.knit'))

    def check_knits(self, t):
        """check knit content for a repository."""
        for knit_name in ('inventory', 'revisions', 'signatures'):
            self.assertHasKnit(t, knit_name)

    def test_shared_disk_layout(self):
        """A shared knit repository adds an empty 'shared-storage' marker."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repository.RepositoryFormatKnit1().initialize(control, shared=True)
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
                             t.get('format').read())
        # XXX: no locks left when unlocked at the moment
        # self.assertEqualDiff('', t.get('lock').read())
        self.assertEqualDiff('', t.get('shared-storage').read())
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
        # NOTE(review): call restored, see test_disk_layout - confirm.
        self.check_knits(t)

    def test_shared_no_tree_disk_layout(self):
        """set_make_working_trees toggles the 'no-working-trees' marker."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = repository.RepositoryFormatKnit1().initialize(control,
                                                             shared=True)
        repo.set_make_working_trees(False)
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
                             t.get('format').read())
        # XXX: no locks left when unlocked at the moment
        # self.assertEqualDiff('', t.get('lock').read())
        self.assertEqualDiff('', t.get('shared-storage').read())
        self.assertEqualDiff('', t.get('no-working-trees').read())
        repo.set_make_working_trees(True)
        self.assertFalse(t.has('no-working-trees'))
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
        # NOTE(review): call restored, see test_disk_layout - confirm.
        self.check_knits(t)
339
class InterString(repository.InterRepository):
    """An inter-repository optimised code path for strings.

    This is for use during testing where we use strings as repositories
    so that none of the default registered inter-repository classes will
    match.
    """

    # NOTE(review): the decorator was missing from the extracted text. The
    # signature has no self and the tests in this file call
    # InterString.is_compatible(a, b) on the class - confirm.
    @staticmethod
    def is_compatible(repo_source, repo_target):
        """InterString is compatible with strings-as-repos.

        Returns True only when both repo_source and repo_target are str.
        """
        return isinstance(repo_source, str) and isinstance(repo_target, str)
353
class TestInterRepository(TestCaseWithTransport):
    """Tests for InterRepository lookup and optimiser registration."""

    def test_get_default_inter_repository(self):
        """With no matching optimiser, get() returns plain InterRepository."""
        # test that the InterRepository.get(repo_a, repo_b) probes
        # for a inter_repo class where is_compatible(repo_a, repo_b) returns
        # true and returns a default inter_repo otherwise.
        # This also tests that the default registered optimised interrepository
        # classes do not barf inappropriately when a surprising repository type
        # (here: a plain string) is handed to them.
        dummy_a = "Repository 1."
        dummy_b = "Repository 2."
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)

    def assertGetsDefaultInterRepository(self, repo_a, repo_b):
        """Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
        inter_repo = repository.InterRepository.get(repo_a, repo_b)
        self.assertEqual(repository.InterRepository,
                         inter_repo.__class__)
        self.assertEqual(repo_a, inter_repo.source)
        self.assertEqual(repo_b, inter_repo.target)

    def test_register_inter_repository_class(self):
        """A registered optimiser is used only for pairs it accepts."""
        # test that a optimised code path provider - a
        # InterRepository subclass can be registered and unregistered
        # and that it is correctly selected when given a repository
        # pair that it returns true on for the is_compatible static method
        dummy_a = "Repository 1."
        dummy_b = "Repository 2."
        repository.InterRepository.register_optimiser(InterString)
        # NOTE(review): try/finally was missing from the extracted text; it
        # is restored so the optimiser is unregistered even when an
        # assertion fails - confirm.
        try:
            # we should get the default for something InterString returns
            # False to
            self.assertFalse(InterString.is_compatible(dummy_a, None))
            self.assertGetsDefaultInterRepository(dummy_a, None)
            # and we should get an InterString for a pair it 'likes'
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
            inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
            self.assertEqual(InterString, inter_repo.__class__)
            self.assertEqual(dummy_a, inter_repo.source)
            self.assertEqual(dummy_b, inter_repo.target)
        finally:
            repository.InterRepository.unregister_optimiser(InterString)
        # now we should get the default InterRepository object again.
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
400
class TestInterWeaveRepo(TestCaseWithTransport):
    """Tests for the InterWeaveRepo optimiser."""

    def test_is_compatible_and_registered(self):
        """InterWeaveRepo is chosen only when both sides are format 5/6/7."""
        weave_formats = [repository.RepositoryFormat5(),
                         repository.RepositoryFormat6(),
                         repository.RepositoryFormat7()]
        incompatible_formats = [repository.RepositoryFormat4(),
                                repository.RepositoryFormatKnit1(),
                                ]
        repo_a = self.make_repository('a')
        repo_b = self.make_repository('b')
        is_compatible = repository.InterWeaveRepo.is_compatible
        for source in incompatible_formats:
            # force incompatible left then right
            repo_a._format = source
            repo_b._format = weave_formats[0]
            self.assertFalse(is_compatible(repo_a, repo_b))
            self.assertFalse(is_compatible(repo_b, repo_a))
        for source in weave_formats:
            repo_a._format = source
            for target in weave_formats:
                repo_b._format = target
                self.assertTrue(is_compatible(repo_a, repo_b))
        # NOTE(review): the end of this call was missing from the extracted
        # text, so both its arguments and its nesting level are restored
        # from context - confirm.
        self.assertEqual(repository.InterWeaveRepo,
                         repository.InterRepository.get(repo_a,
                                                        repo_b).__class__)
430
class TestRepositoryConverter(TestCaseWithTransport):
    """Tests for repository.CopyConverter."""

    def test_convert_empty(self):
        """Converting an empty format 7 repository yields a Knit1 repository."""
        t = get_transport(self.get_url('.'))
        t.mkdir('repository')
        repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
        repo = repository.RepositoryFormat7().initialize(repo_dir)
        target_format = repository.RepositoryFormatKnit1()
        converter = repository.CopyConverter(target_format)
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
        # NOTE(review): try/finally and pb.finished() were missing from the
        # extracted text; they are restored on the assumption that a nested
        # progress bar must be released with finished() - confirm.
        try:
            converter.convert(repo, pb)
        finally:
            pb.finished()
        # re-open to see the on-disk result rather than the stale object
        repo = repo_dir.open_repository()
        self.assertTrue(isinstance(target_format, repo._format.__class__))
448
class TestMisc(TestCase):
    """Small checks that fit none of the other test classes."""

    def test_unescape_xml(self):
        """We get some kind of error when malformed entities are passed"""
        malformed = 'foo&bar;'
        self.assertRaises(KeyError, repository._unescape_xml, malformed)
455
class TestRepositoryFormatKnit2(TestCaseWithTransport):
    """Tests for upgrading a repository to RepositoryFormatKnit2."""

    def test_convert(self):
        """Ensure the upgrade adds weaves for roots"""
        knit1_dir_format = bzrdir.BzrDirMetaFormat1()
        knit1_dir_format.repository_format = repository.RepositoryFormatKnit1()
        tree = self.make_branch_and_tree('.', knit1_dir_format)
        tree.commit("Dull commit", rev_id="dull")

        # Before the upgrade the root directory has no versioned text.
        before = tree.branch.repository.revision_tree('dull')
        self.assertRaises(errors.NoSuchFile, before.get_file_lines,
                          before.inventory.root.file_id)

        knit2_dir_format = bzrdir.BzrDirMetaFormat1()
        knit2_dir_format.repository_format = repository.RepositoryFormatKnit2()
        upgrade.Convert('.', knit2_dir_format)

        # After the upgrade the same lookup must succeed without raising.
        tree = workingtree.WorkingTree.open('.')
        after = tree.branch.repository.revision_tree('dull')
        root_id = after.inventory.root.file_id
        after.get_file_lines(root_id)

        # A new commit leaves the root attributed to the first revision.
        tree.commit("Another dull commit", rev_id='dull2')
        latest = tree.branch.repository.revision_tree('dull2')
        self.assertEqual('dull', latest.inventory.root.revision)