1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the Repository facility that are not interface tests.
19
For interface tests see tests/repository_implementations/*.py.
21
For concrete class tests see this file, and for storage formats tests
25
from stat import S_ISDIR
26
from StringIO import StringIO
28
from bzrlib import symbol_versioning
30
import bzrlib.bzrdir as bzrdir
31
import bzrlib.errors as errors
32
from bzrlib.errors import (NotBranchError,
35
UnsupportedFormatError,
37
from bzrlib.repository import RepositoryFormat
38
from bzrlib.tests import TestCase, TestCaseWithTransport
39
from bzrlib.transport import get_transport
40
from bzrlib.transport.memory import MemoryServer
46
from bzrlib.repofmt import knitrepo, weaverepo
49
class TestDefaultFormat(TestCase):
51
def test_get_set_default_format(self):
52
old_format = RepositoryFormat.get_default_format()
53
test_format = SampleRepositoryFormat()
54
RepositoryFormat.register_format(test_format)
56
self.applyDeprecated(symbol_versioning.zero_fourteen,
57
RepositoryFormat.set_default_format,
59
# creating a repository should now create an instrumented dir.
61
# the default branch format is used by the meta dir format
62
# which is not the default bzrdir format at this point
63
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
64
result = dir.create_repository()
65
self.assertEqual(result, 'A bzr repository dir')
67
self.applyDeprecated(symbol_versioning.zero_fourteen,
68
RepositoryFormat.set_default_format, old_format)
70
RepositoryFormat.unregister_format(test_format)
71
self.assertEqual(old_format, RepositoryFormat.get_default_format())
74
class SampleRepositoryFormat(repository.RepositoryFormat):
77
this format is initializable, unsupported to aid in testing the
78
open and open(unsupported=True) routines.
81
def get_format_string(self):
82
"""See RepositoryFormat.get_format_string()."""
83
return "Sample .bzr repository format."
85
def initialize(self, a_bzrdir, shared=False):
86
"""Initialize a repository in a BzrDir"""
87
t = a_bzrdir.get_repository_transport(self)
88
t.put_bytes('format', self.get_format_string())
89
return 'A bzr repository dir'
91
def is_supported(self):
94
def open(self, a_bzrdir, _found=False):
95
return "opened repository."
98
class TestRepositoryFormat(TestCaseWithTransport):
99
"""Tests for the Repository format detection used by the bzr meta dir facility.BzrBranchFormat facility."""
101
def test_find_format(self):
102
# is the right format object found for a repository?
103
# create a branch with a few known format objects.
104
# this is not quite the same as
105
self.build_tree(["foo/", "bar/"])
106
def check_format(format, url):
107
dir = format._matchingbzrdir.initialize(url)
108
format.initialize(dir)
109
t = get_transport(url)
110
found_format = repository.RepositoryFormat.find_format(dir)
111
self.failUnless(isinstance(found_format, format.__class__))
112
check_format(weaverepo.RepositoryFormat7(), "bar")
114
def test_find_format_no_repository(self):
115
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
116
self.assertRaises(errors.NoRepositoryPresent,
117
repository.RepositoryFormat.find_format,
120
def test_find_format_unknown_format(self):
121
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
122
SampleRepositoryFormat().initialize(dir)
123
self.assertRaises(UnknownFormatError,
124
repository.RepositoryFormat.find_format,
127
def test_register_unregister_format(self):
128
format = SampleRepositoryFormat()
130
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
132
format.initialize(dir)
133
# register a format for it.
134
repository.RepositoryFormat.register_format(format)
135
# which repository.Open will refuse (not supported)
136
self.assertRaises(UnsupportedFormatError, repository.Repository.open, self.get_url())
137
# but open(unsupported) will work
138
self.assertEqual(format.open(dir), "opened repository.")
139
# unregister the format
140
repository.RepositoryFormat.unregister_format(format)
143
class TestFormat6(TestCaseWithTransport):
145
def test_no_ancestry_weave(self):
146
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
147
repo = weaverepo.RepositoryFormat6().initialize(control)
148
# We no longer need to create the ancestry.weave file
149
# since it is *never* used.
150
self.assertRaises(NoSuchFile,
151
control.transport.get,
155
class TestFormat7(TestCaseWithTransport):
157
def test_disk_layout(self):
158
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
159
repo = weaverepo.RepositoryFormat7().initialize(control)
160
# in case of side effects of locking.
164
# format 'Bazaar-NG Repository format 7'
166
# inventory.weave == empty_weave
167
# empty revision-store directory
168
# empty weaves directory
169
t = control.get_repository_transport(None)
170
self.assertEqualDiff('Bazaar-NG Repository format 7',
171
t.get('format').read())
172
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
173
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
174
self.assertEqualDiff('# bzr weave file v5\n'
177
t.get('inventory.weave').read())
179
def test_shared_disk_layout(self):
180
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
181
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
183
# format 'Bazaar-NG Repository format 7'
184
# inventory.weave == empty_weave
185
# empty revision-store directory
186
# empty weaves directory
187
# a 'shared-storage' marker file.
188
# lock is not present when unlocked
189
t = control.get_repository_transport(None)
190
self.assertEqualDiff('Bazaar-NG Repository format 7',
191
t.get('format').read())
192
self.assertEqualDiff('', t.get('shared-storage').read())
193
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
194
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
195
self.assertEqualDiff('# bzr weave file v5\n'
198
t.get('inventory.weave').read())
199
self.assertFalse(t.has('branch-lock'))
201
def test_creates_lockdir(self):
202
"""Make sure it appears to be controlled by a LockDir existence"""
203
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
204
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
205
t = control.get_repository_transport(None)
206
# TODO: Should check there is a 'lock' toplevel directory,
207
# regardless of contents
208
self.assertFalse(t.has('lock/held/info'))
211
self.assertTrue(t.has('lock/held/info'))
213
# unlock so we don't get a warning about failing to do so
216
def test_uses_lockdir(self):
217
"""repo format 7 actually locks on lockdir"""
218
base_url = self.get_url()
219
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
220
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
221
t = control.get_repository_transport(None)
225
# make sure the same lock is created by opening it
226
repo = repository.Repository.open(base_url)
228
self.assertTrue(t.has('lock/held/info'))
230
self.assertFalse(t.has('lock/held/info'))
232
def test_shared_no_tree_disk_layout(self):
233
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
234
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
235
repo.set_make_working_trees(False)
237
# format 'Bazaar-NG Repository format 7'
239
# inventory.weave == empty_weave
240
# empty revision-store directory
241
# empty weaves directory
242
# a 'shared-storage' marker file.
243
t = control.get_repository_transport(None)
244
self.assertEqualDiff('Bazaar-NG Repository format 7',
245
t.get('format').read())
246
## self.assertEqualDiff('', t.get('lock').read())
247
self.assertEqualDiff('', t.get('shared-storage').read())
248
self.assertEqualDiff('', t.get('no-working-trees').read())
249
repo.set_make_working_trees(True)
250
self.assertFalse(t.has('no-working-trees'))
251
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
252
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
253
self.assertEqualDiff('# bzr weave file v5\n'
256
t.get('inventory.weave').read())
259
class TestFormatKnit1(TestCaseWithTransport):
261
def test_disk_layout(self):
262
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
263
repo = repository.RepositoryFormatKnit1().initialize(control)
264
# in case of side effects of locking.
268
# format 'Bazaar-NG Knit Repository Format 1'
269
# lock: is a directory
270
# inventory.weave == empty_weave
271
# empty revision-store directory
272
# empty weaves directory
273
t = control.get_repository_transport(None)
274
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
275
t.get('format').read())
276
# XXX: no locks left when unlocked at the moment
277
# self.assertEqualDiff('', t.get('lock').read())
278
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
281
def assertHasKnit(self, t, knit_name):
282
"""Assert that knit_name exists on t."""
283
self.assertEqualDiff('# bzr knit index 8\n',
284
t.get(knit_name + '.kndx').read())
286
self.assertTrue(t.has(knit_name + '.knit'))
288
def check_knits(self, t):
289
"""check knit content for a repository."""
290
self.assertHasKnit(t, 'inventory')
291
self.assertHasKnit(t, 'revisions')
292
self.assertHasKnit(t, 'signatures')
294
def test_shared_disk_layout(self):
295
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
296
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
298
# format 'Bazaar-NG Knit Repository Format 1'
299
# lock: is a directory
300
# inventory.weave == empty_weave
301
# empty revision-store directory
302
# empty weaves directory
303
# a 'shared-storage' marker file.
304
t = control.get_repository_transport(None)
305
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
306
t.get('format').read())
307
# XXX: no locks left when unlocked at the moment
308
# self.assertEqualDiff('', t.get('lock').read())
309
self.assertEqualDiff('', t.get('shared-storage').read())
310
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
313
def test_shared_no_tree_disk_layout(self):
314
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
315
repo = repository.RepositoryFormatKnit1().initialize(control, shared=True)
316
repo.set_make_working_trees(False)
318
# format 'Bazaar-NG Knit Repository Format 1'
320
# inventory.weave == empty_weave
321
# empty revision-store directory
322
# empty weaves directory
323
# a 'shared-storage' marker file.
324
t = control.get_repository_transport(None)
325
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
326
t.get('format').read())
327
# XXX: no locks left when unlocked at the moment
328
# self.assertEqualDiff('', t.get('lock').read())
329
self.assertEqualDiff('', t.get('shared-storage').read())
330
self.assertEqualDiff('', t.get('no-working-trees').read())
331
repo.set_make_working_trees(True)
332
self.assertFalse(t.has('no-working-trees'))
333
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
337
class InterString(repository.InterRepository):
338
"""An inter-repository optimised code path for strings.
340
This is for use during testing where we use strings as repositories
341
so that none of the default regsitered inter-repository classes will
346
def is_compatible(repo_source, repo_target):
347
"""InterString is compatible with strings-as-repos."""
348
return isinstance(repo_source, str) and isinstance(repo_target, str)
351
class TestInterRepository(TestCaseWithTransport):
353
def test_get_default_inter_repository(self):
354
# test that the InterRepository.get(repo_a, repo_b) probes
355
# for a inter_repo class where is_compatible(repo_a, repo_b) returns
356
# true and returns a default inter_repo otherwise.
357
# This also tests that the default registered optimised interrepository
358
# classes do not barf inappropriately when a surprising repository type
360
dummy_a = "Repository 1."
361
dummy_b = "Repository 2."
362
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
364
def assertGetsDefaultInterRepository(self, repo_a, repo_b):
365
"""Asserts that InterRepository.get(repo_a, repo_b) -> the default."""
366
inter_repo = repository.InterRepository.get(repo_a, repo_b)
367
self.assertEqual(repository.InterRepository,
368
inter_repo.__class__)
369
self.assertEqual(repo_a, inter_repo.source)
370
self.assertEqual(repo_b, inter_repo.target)
372
def test_register_inter_repository_class(self):
373
# test that a optimised code path provider - a
374
# InterRepository subclass can be registered and unregistered
375
# and that it is correctly selected when given a repository
376
# pair that it returns true on for the is_compatible static method
378
dummy_a = "Repository 1."
379
dummy_b = "Repository 2."
380
repository.InterRepository.register_optimiser(InterString)
382
# we should get the default for something InterString returns False
384
self.assertFalse(InterString.is_compatible(dummy_a, None))
385
self.assertGetsDefaultInterRepository(dummy_a, None)
386
# and we should get an InterString for a pair it 'likes'
387
self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
388
inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
389
self.assertEqual(InterString, inter_repo.__class__)
390
self.assertEqual(dummy_a, inter_repo.source)
391
self.assertEqual(dummy_b, inter_repo.target)
393
repository.InterRepository.unregister_optimiser(InterString)
394
# now we should get the default InterRepository object again.
395
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
398
class TestRepositoryConverter(TestCaseWithTransport):
400
def test_convert_empty(self):
401
t = get_transport(self.get_url('.'))
402
t.mkdir('repository')
403
repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
404
repo = weaverepo.RepositoryFormat7().initialize(repo_dir)
405
target_format = repository.RepositoryFormatKnit1()
406
converter = repository.CopyConverter(target_format)
407
pb = bzrlib.ui.ui_factory.nested_progress_bar()
409
converter.convert(repo, pb)
412
repo = repo_dir.open_repository()
413
self.assertTrue(isinstance(target_format, repo._format.__class__))
416
class TestMisc(TestCase):
418
def test_unescape_xml(self):
419
"""We get some kind of error when malformed entities are passed"""
420
self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;')
423
class TestRepositoryFormatKnit2(TestCaseWithTransport):
425
def test_convert(self):
426
"""Ensure the upgrade adds weaves for roots"""
427
format = bzrdir.BzrDirMetaFormat1()
428
format.repository_format = repository.RepositoryFormatKnit1()
429
tree = self.make_branch_and_tree('.', format)
430
tree.commit("Dull commit", rev_id="dull")
431
revision_tree = tree.branch.repository.revision_tree('dull')
432
self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
433
revision_tree.inventory.root.file_id)
434
format = bzrdir.BzrDirMetaFormat1()
435
format.repository_format = knitrepo.RepositoryFormatKnit2()
436
upgrade.Convert('.', format)
437
tree = workingtree.WorkingTree.open('.')
438
revision_tree = tree.branch.repository.revision_tree('dull')
439
revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
440
tree.commit("Another dull commit", rev_id='dull2')
441
revision_tree = tree.branch.repository.revision_tree('dull2')
442
self.assertEqual('dull', revision_tree.inventory.root.revision)