1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the Repository facility that are not interface tests.
19
For interface tests see tests/repository_implementations/*.py.
21
For concrete class tests see this file, and for storage formats tests
also see this file.
"""
25
from stat import S_ISDIR
26
from StringIO import StringIO
29
from bzrlib.errors import (NotBranchError,
32
UnsupportedFormatError,
34
from bzrlib.repository import RepositoryFormat
35
from bzrlib.smart import server
36
from bzrlib.tests import (
38
TestCaseWithTransport,
41
from bzrlib.transport import get_transport
42
from bzrlib.transport.memory import MemoryServer
43
from bzrlib.util import bencode
49
revision as _mod_revision,
54
from bzrlib.repofmt import knitrepo, weaverepo
57
class TestDefaultFormat(TestCase):
    """Check how the default repository format tracks the bzrdir registry."""

    def test_get_set_default_format(self):
        """Re-pointing the registry's 'default' changes what gets created.

        The registry's 'default' entry is temporarily replaced by a factory
        that produces a meta-dir using SampleRepositoryFormat, a repository
        is created through it, and the previous default is then restored.
        """
        old_default = bzrdir.format_registry.get('default')
        private_default = old_default().repository_format.__class__
        old_format = repository.RepositoryFormat.get_default_format()
        self.assertTrue(isinstance(old_format, private_default))

        def make_sample_bzrdir():
            my_bzrdir = bzrdir.BzrDirMetaFormat1()
            my_bzrdir.repository_format = SampleRepositoryFormat()
            # NOTE(review): this return was absent from the listing; a
            # registry factory has to hand back the object it built.
            return my_bzrdir

        bzrdir.format_registry.remove('default')
        bzrdir.format_registry.register('sample', make_sample_bzrdir, '')
        bzrdir.format_registry.set_default('sample')
        # creating a repository should now create an instrumented dir.
        # NOTE(review): the try/finally keywords were absent from the
        # listing; restored so the registry is put back even on failure.
        try:
            # the default branch format is used by the meta dir format
            # which is not the default bzrdir format at this point
            control_dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
            result = control_dir.create_repository()
            self.assertEqual(result, 'A bzr repository dir')
        finally:
            bzrdir.format_registry.remove('default')
            bzrdir.format_registry.remove('sample')
            bzrdir.format_registry.register('default', old_default, '')
        # NOTE(review): the second argument was cut off in the listing;
        # old_format.__class__ is the only earlier value it can compare to.
        self.assertIsInstance(repository.RepositoryFormat.get_default_format(),
                              old_format.__class__)
86
class SampleRepositoryFormat(repository.RepositoryFormat):
    """A sample format

    this format is initializable, unsupported to aid in testing the
    open and open(unsupported=True) routines.
    """

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return "Sample .bzr repository format."

    def initialize(self, a_bzrdir, shared=False):
        """Initialize a repository in a BzrDir.

        Only the 'format' marker file is written; the return value is a
        plain string that the tests compare against, not a repository.
        """
        t = a_bzrdir.get_repository_transport(self)
        t.put_bytes('format', self.get_format_string())
        return 'A bzr repository dir'

    def is_supported(self):
        """Report this format as unsupported."""
        # NOTE(review): the body was absent from the listing. False matches
        # the class docstring ("unsupported") and the UnsupportedFormatError
        # expected by test_register_unregister_format -- confirm.
        return False

    def open(self, a_bzrdir, _found=False):
        """Return a marker string instead of a real repository object."""
        return "opened repository."
110
class TestRepositoryFormat(TestCaseWithTransport):
    """Tests for the Repository format detection used by the bzr meta dir
    facility."""

    def test_find_format(self):
        """find_format returns an instance of the format that was written."""
        # is the right format object found for a repository?
        self.build_tree(["foo/", "bar/"])

        def check_format(repo_format, url):
            control_dir = repo_format._matchingbzrdir.initialize(url)
            repo_format.initialize(control_dir)
            found_format = repository.RepositoryFormat.find_format(
                control_dir)
            self.assertIsInstance(found_format, repo_format.__class__)

        check_format(weaverepo.RepositoryFormat7(), "bar")

    def test_find_format_no_repository(self):
        """A control dir without a repository raises NoRepositoryPresent."""
        control_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        # NOTE(review): the final argument was cut off in the listing; the
        # control dir is the only value find_format can be probing here.
        self.assertRaises(errors.NoRepositoryPresent,
                          repository.RepositoryFormat.find_format,
                          control_dir)

    def test_find_format_unknown_format(self):
        """An unregistered format string raises UnknownFormatError."""
        control_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        SampleRepositoryFormat().initialize(control_dir)
        # NOTE(review): final argument restored as above.
        self.assertRaises(UnknownFormatError,
                          repository.RepositoryFormat.find_format,
                          control_dir)

    def test_register_unregister_format(self):
        """A registered but unsupported format is refused by Repository.open."""
        sample_format = SampleRepositoryFormat()
        # make a control dir and write the sample repository into it
        control_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        sample_format.initialize(control_dir)
        # register a format for it.
        repository.RepositoryFormat.register_format(sample_format)
        try:
            # which repository.Open will refuse (not supported)
            self.assertRaises(UnsupportedFormatError,
                              repository.Repository.open, self.get_url())
            # but open(unsupported) will work
            self.assertEqual(sample_format.open(control_dir),
                             "opened repository.")
        finally:
            # unregister the format even when an assertion above fails, so
            # the sample format cannot leak into other tests
            repository.RepositoryFormat.unregister_format(sample_format)
155
class TestFormat6(TestCaseWithTransport):
    """Concrete tests for weaverepo.RepositoryFormat6."""

    def test_no_ancestry_weave(self):
        """Initialising a format 6 repository creates no ancestry.weave."""
        control = bzrdir.BzrDirFormat6().initialize(self.get_url())
        weaverepo.RepositoryFormat6().initialize(control)
        # We no longer need to create the ancestry.weave file
        # since it is *never* used.
        # NOTE(review): the file-name argument was cut off in the listing;
        # 'ancestry.weave' is taken from the comment above -- confirm.
        self.assertRaises(NoSuchFile,
                          control.transport.get,
                          'ancestry.weave')

    def test_exposed_versioned_files_are_marked_dirty(self):
        """An inventory weave kept past its lock rejects further writes."""
        control = bzrdir.BzrDirFormat6().initialize(self.get_url())
        repo = weaverepo.RepositoryFormat6().initialize(control)
        # NOTE(review): the lines on either side of get_inventory_weave()
        # were absent from the listing; a lock_write()/unlock() pair is
        # presumed, as OutSideTransaction is expected afterwards -- confirm.
        repo.lock_write()
        inv = repo.get_inventory_weave()
        repo.unlock()
        self.assertRaises(errors.OutSideTransaction,
                          inv.add_lines, 'foo', [], [])
176
class TestFormat7(TestCaseWithTransport):
    """Concrete tests for weaverepo.RepositoryFormat7."""

    def assertEmptyInventoryWeave(self, t):
        """Assert that inventory.weave on transport t is an empty weave."""
        # NOTE(review): only the header line and the read() call survived in
        # the listing; the 'w\n' / 'W\n' lines are presumed from the weave
        # v5 empty-file layout -- confirm against upstream.
        self.assertEqualDiff('# bzr weave file v5\n'
                             'w\n'
                             'W\n',
                             t.get('inventory.weave').read())

    def test_disk_layout(self):
        """A fresh repository has the format marker, stores and empty weave."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = weaverepo.RepositoryFormat7().initialize(control)
        # in case of side effects of locking.
        # NOTE(review): the lock/unlock pair was absent from the listing and
        # is presumed from the comment above -- confirm.
        repo.lock_write()
        repo.unlock()
        # we want:
        # format 'Bazaar-NG Repository format 7'
        # inventory.weave == empty_weave
        # empty revision-store directory
        # empty weaves directory
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Repository format 7',
                             t.get('format').read())
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
        self.assertEmptyInventoryWeave(t)

    def test_shared_disk_layout(self):
        """A shared repository additionally has a 'shared-storage' marker."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        weaverepo.RepositoryFormat7().initialize(control, shared=True)
        # we want:
        # format 'Bazaar-NG Repository format 7'
        # inventory.weave == empty_weave
        # empty revision-store directory
        # empty weaves directory
        # a 'shared-storage' marker file.
        # lock is not present when unlocked
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Repository format 7',
                             t.get('format').read())
        self.assertEqualDiff('', t.get('shared-storage').read())
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
        self.assertEmptyInventoryWeave(t)
        self.assertFalse(t.has('branch-lock'))

    def test_creates_lockdir(self):
        """Make sure it appears to be controlled by a LockDir existence"""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
        t = control.get_repository_transport(None)
        # TODO: Should check there is a 'lock' toplevel directory,
        # regardless of contents
        self.assertFalse(t.has('lock/held/info'))
        # NOTE(review): lock_write()/try/finally/unlock() were absent from
        # the listing; presumed from the surrounding assertions and the
        # "unlock so we don't get a warning" comment -- confirm.
        repo.lock_write()
        try:
            self.assertTrue(t.has('lock/held/info'))
        finally:
            # unlock so we don't get a warning about failing to do so
            repo.unlock()

    def test_uses_lockdir(self):
        """repo format 7 actually locks on lockdir"""
        base_url = self.get_url()
        control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
        repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
        t = control.get_repository_transport(None)
        # NOTE(review): three lines were absent from the listing here; a
        # lock/unlock cycle followed by dropping the object is presumed.
        repo.lock_write()
        repo.unlock()
        del repo
        # make sure the same lock is created by opening it
        repo = repository.Repository.open(base_url)
        # NOTE(review): the lock_write()/unlock() calls around the two
        # assertions below were absent from the listing -- confirm.
        repo.lock_write()
        self.assertTrue(t.has('lock/held/info'))
        repo.unlock()
        self.assertFalse(t.has('lock/held/info'))

    def test_shared_no_tree_disk_layout(self):
        """set_make_working_trees toggles the 'no-working-trees' marker."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
        repo.set_make_working_trees(False)
        # we want:
        # format 'Bazaar-NG Repository format 7'
        # inventory.weave == empty_weave
        # empty revision-store directory
        # empty weaves directory
        # a 'shared-storage' marker file.
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Repository format 7',
                             t.get('format').read())
        ## self.assertEqualDiff('', t.get('lock').read())
        self.assertEqualDiff('', t.get('shared-storage').read())
        self.assertEqualDiff('', t.get('no-working-trees').read())
        repo.set_make_working_trees(True)
        self.assertFalse(t.has('no-working-trees'))
        self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
        self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
        self.assertEmptyInventoryWeave(t)

    def test_exposed_versioned_files_are_marked_dirty(self):
        """An inventory weave kept past its lock rejects further writes."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = weaverepo.RepositoryFormat7().initialize(control)
        # NOTE(review): lock_write()/unlock() around get_inventory_weave()
        # were absent from the listing and are presumed -- confirm.
        repo.lock_write()
        inv = repo.get_inventory_weave()
        repo.unlock()
        self.assertRaises(errors.OutSideTransaction,
                          inv.add_lines, 'foo', [], [])
289
class TestFormatKnit1(TestCaseWithTransport):
    """Concrete tests for knitrepo.RepositoryFormatKnit1."""

    def test_disk_layout(self):
        """A fresh repository has the format marker and a knits directory."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = knitrepo.RepositoryFormatKnit1().initialize(control)
        # in case of side effects of locking.
        # NOTE(review): the lock/unlock pair was absent from the listing and
        # is presumed from the comment above -- confirm.
        repo.lock_write()
        repo.unlock()
        # we want:
        # format 'Bazaar-NG Knit Repository Format 1'
        # lock: is a directory
        # inventory.weave == empty_weave
        # empty revision-store directory
        # empty weaves directory
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
                             t.get('format').read())
        # XXX: no locks left when unlocked at the moment
        # self.assertEqualDiff('', t.get('lock').read())
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
        # NOTE(review): a line was absent from the listing here; a call to
        # the check_knits helper below is presumed -- confirm.
        self.check_knits(t)

    def assertHasKnit(self, t, knit_name):
        """Assert that knit_name exists on t."""
        self.assertEqualDiff('# bzr knit index 8\n',
                             t.get(knit_name + '.kndx').read())
        self.assertTrue(t.has(knit_name + '.knit'))

    def check_knits(self, t):
        """check knit content for a repository."""
        for knit_name in ('inventory', 'revisions', 'signatures'):
            self.assertHasKnit(t, knit_name)

    def test_shared_disk_layout(self):
        """A shared repository additionally has a 'shared-storage' marker."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
        # we want:
        # format 'Bazaar-NG Knit Repository Format 1'
        # lock: is a directory
        # inventory.weave == empty_weave
        # empty revision-store directory
        # empty weaves directory
        # a 'shared-storage' marker file.
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
                             t.get('format').read())
        # XXX: no locks left when unlocked at the moment
        # self.assertEqualDiff('', t.get('lock').read())
        self.assertEqualDiff('', t.get('shared-storage').read())
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
        # NOTE(review): presumed check_knits call, absent from the listing.
        self.check_knits(t)

    def test_shared_no_tree_disk_layout(self):
        """set_make_working_trees toggles the 'no-working-trees' marker."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        repo = knitrepo.RepositoryFormatKnit1().initialize(control,
                                                           shared=True)
        repo.set_make_working_trees(False)
        # we want:
        # format 'Bazaar-NG Knit Repository Format 1'
        # inventory.weave == empty_weave
        # empty revision-store directory
        # empty weaves directory
        # a 'shared-storage' marker file.
        t = control.get_repository_transport(None)
        self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
                             t.get('format').read())
        # XXX: no locks left when unlocked at the moment
        # self.assertEqualDiff('', t.get('lock').read())
        self.assertEqualDiff('', t.get('shared-storage').read())
        self.assertEqualDiff('', t.get('no-working-trees').read())
        repo.set_make_working_trees(True)
        self.assertFalse(t.has('no-working-trees'))
        self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
        # NOTE(review): presumed check_knits call, absent from the listing.
        self.check_knits(t)

    def test_exposed_versioned_files_are_marked_dirty(self):
        """An inventory knit kept past its lock rejects further writes."""
        format = bzrdir.BzrDirMetaFormat1()
        format.repository_format = knitrepo.RepositoryFormatKnit1()
        repo = self.make_repository('.', format=format)
        # NOTE(review): lock_write()/unlock() around get_inventory_weave()
        # were absent from the listing and are presumed -- confirm.
        repo.lock_write()
        inv = repo.get_inventory_weave()
        repo.unlock()
        self.assertRaises(errors.OutSideTransaction,
                          inv.add_lines, 'foo', [], [])
377
class KnitRepositoryStreamTests(test_knit.KnitTests):
    """Tests for knitrepo._get_stream_as_bytes."""

    def test_get_stream_as_bytes(self):
        """A one-record knit serialises to [format, record]."""
        # Make a simple knit
        k1 = self.make_test_knit()
        k1.add_lines('text-a', [], test_knit.split_lines(test_knit.TEXT_1))

        # Serialise it, check the output.
        stream_bytes = knitrepo._get_stream_as_bytes(k1, ['text-a'])
        data = bencode.bdecode(stream_bytes)
        stream_format, record = data
        self.assertEqual('knit-plain', stream_format)
        self.assertEqual(['text-a', ['fulltext'], []], record[:3])
        self.assertRecordContentEqual(k1, 'text-a', record[3])

    def test_get_stream_as_bytes_all(self):
        """Get a serialised data stream for all the records in a knit.

        Much like test_get_stream_all, except for get_stream_as_bytes.
        """
        k1 = self.make_test_knit()
        # Insert the same data as BasicKnitTests.test_knit_join, as they seem
        # to cover a range of cases (no parents, one parent, multiple parents).
        test_data = [
            ('text-a', [], test_knit.TEXT_1),
            ('text-b', ['text-a'], test_knit.TEXT_1),
            ('text-c', [], test_knit.TEXT_1),
            ('text-d', ['text-c'], test_knit.TEXT_1),
            ('text-m', ['text-b', 'text-d'], test_knit.TEXT_1),
            ]
        expected_data_list = [
            # version, options, parents
            ('text-a', ['fulltext'], []),
            ('text-b', ['line-delta'], ['text-a']),
            ('text-c', ['fulltext'], []),
            ('text-d', ['line-delta'], ['text-c']),
            ('text-m', ['line-delta'], ['text-b', 'text-d']),
            ]
        for version_id, parents, lines in test_data:
            k1.add_lines(version_id, parents, test_knit.split_lines(lines))

        stream_bytes = knitrepo._get_stream_as_bytes(
            k1, ['text-a', 'text-b', 'text-c', 'text-d', 'text-m'])

        data = bencode.bdecode(stream_bytes)
        # NOTE(review): the line that extracted the format marker was absent
        # from the listing; taking the first element matches the
        # [format, record] shape unpacked in test_get_stream_as_bytes.
        stream_format = data.pop(0)
        self.assertEqual('knit-plain', stream_format)

        # zip() stops at the shorter sequence, so without this check a
        # stream that dropped records would pass silently.
        self.assertEqual(len(expected_data_list), len(data))
        for expected, actual in zip(expected_data_list, data):
            expected_version, expected_options, expected_parents = expected
            version, options, parents, record_bytes = actual
            self.assertEqual(expected_version, version)
            self.assertEqual(expected_options, options)
            self.assertEqual(expected_parents, parents)
            self.assertRecordContentEqual(k1, version, record_bytes)
437
class DummyRepository(object):
    """A dummy repository for testing."""

    # NOTE(review): three lines were absent from the listing here. A class
    # level _serializer placeholder is presumed because
    # test_register_inter_repository_class assigns dummy._serializer while
    # test_get_default_inter_repository uses untouched dummies -- confirm.
    _serializer = None

    def supports_rich_root(self):
        # NOTE(review): the body was absent from the listing; False is
        # presumed -- confirm against upstream.
        return False
446
class InterDummy(repository.InterRepository):
    """An inter-repository optimised code path for DummyRepository.

    This is for use during testing where we use DummyRepository as repositories
    so that none of the default registered inter-repository classes will
    match.
    """

    # NOTE(review): the decorator line was absent from the listing; the
    # signature has no self and the tests call this a "static method".
    @staticmethod
    def is_compatible(repo_source, repo_target):
        """InterDummy is compatible with DummyRepository."""
        both_dummy = (isinstance(repo_source, DummyRepository) and
                      isinstance(repo_target, DummyRepository))
        return both_dummy
461
class TestInterRepository(TestCaseWithTransport):
    """Tests for InterRepository.get and optimiser registration."""

    def test_get_default_inter_repository(self):
        # test that the InterRepository.get(repo_a, repo_b) probes
        # for a inter_repo class where is_compatible(repo_a, repo_b) returns
        # true and returns a default inter_repo otherwise.
        # This also tests that the default registered optimised interrepository
        # classes do not barf inappropriately when a surprising repository type
        # is handed to them.
        dummy_a = DummyRepository()
        dummy_b = DummyRepository()
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)

    def assertGetsDefaultInterRepository(self, repo_a, repo_b):
        """Asserts that InterRepository.get(repo_a, repo_b) -> the default.

        The effective default is now InterSameDataRepository because there is
        no actual sane default in the presence of incompatible data models.
        """
        inter_repo = repository.InterRepository.get(repo_a, repo_b)
        self.assertEqual(repository.InterSameDataRepository,
                         inter_repo.__class__)
        self.assertEqual(repo_a, inter_repo.source)
        self.assertEqual(repo_b, inter_repo.target)

    def test_register_inter_repository_class(self):
        # test that a optimised code path provider - a
        # InterRepository subclass can be registered and unregistered
        # and that it is correctly selected when given a repository
        # pair that it returns true on for the is_compatible static method
        # check
        dummy_a = DummyRepository()
        dummy_b = DummyRepository()
        repo = self.make_repository('.')
        # hack dummies to look like repo somewhat.
        dummy_a._serializer = repo._serializer
        dummy_b._serializer = repo._serializer
        repository.InterRepository.register_optimiser(InterDummy)
        # NOTE(review): the try/finally keywords were absent from the
        # listing; restored so the optimiser is always unregistered.
        try:
            # we should get the default for something InterDummy returns False
            # to
            self.assertFalse(InterDummy.is_compatible(dummy_a, repo))
            self.assertGetsDefaultInterRepository(dummy_a, repo)
            # and we should get an InterDummy for a pair it 'likes'
            self.assertTrue(InterDummy.is_compatible(dummy_a, dummy_b))
            inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
            self.assertEqual(InterDummy, inter_repo.__class__)
            self.assertEqual(dummy_a, inter_repo.source)
            self.assertEqual(dummy_b, inter_repo.target)
        finally:
            repository.InterRepository.unregister_optimiser(InterDummy)
        # now we should get the default InterRepository object again.
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
516
class TestInterWeaveRepo(TestCaseWithTransport):
    """Tests for the InterWeaveRepo optimiser's compatibility check."""

    def test_is_compatible_and_registered(self):
        # InterWeaveRepo is compatible when either side
        # is a format 5/6/7 branch
        formats = [weaverepo.RepositoryFormat5(),
                   weaverepo.RepositoryFormat6(),
                   weaverepo.RepositoryFormat7()]
        incompatible_formats = [weaverepo.RepositoryFormat4(),
                                knitrepo.RepositoryFormatKnit1(),
                                ]
        repo_a = self.make_repository('a')
        repo_b = self.make_repository('b')
        is_compatible = repository.InterWeaveRepo.is_compatible
        for source in incompatible_formats:
            # force incompatible left then right
            repo_a._format = source
            repo_b._format = formats[0]
            self.assertFalse(is_compatible(repo_a, repo_b))
            self.assertFalse(is_compatible(repo_b, repo_a))
        for source in formats:
            repo_a._format = source
            for target in formats:
                repo_b._format = target
                self.assertTrue(is_compatible(repo_a, repo_b))
        # NOTE(review): the tail of this call was cut off in the listing;
        # comparing against the returned object's class is presumed, in
        # line with assertGetsDefaultInterRepository -- confirm.
        inter_repo = repository.InterRepository.get(repo_a, repo_b)
        self.assertEqual(repository.InterWeaveRepo, inter_repo.__class__)
547
class TestInterRemoteToOther(TestCaseWithTransport):
    """Tests for InterRemoteToOther.is_compatible."""

    def make_remote_repository(self, path, backing_format=None):
        """Make a RemoteRepository object backed by a real repository that will
        be created at the given path."""
        self.make_repository(path, format=backing_format)
        smart_server = server.SmartTCPServer_for_testing()
        # NOTE(review): a line was absent from the listing here; starting
        # the server is presumed, since its URL is used next and tearDown
        # is registered as a cleanup -- confirm.
        smart_server.setUp()
        # Register the cleanup straight away so the server is torn down
        # even if opening the remote side fails.
        self.addCleanup(smart_server.tearDown)
        remote_transport = get_transport(smart_server.get_url()).clone(path)
        remote_bzrdir = bzrdir.BzrDir.open_from_transport(remote_transport)
        remote_repo = remote_bzrdir.open_repository()
        # NOTE(review): the return was absent from the listing; every caller
        # uses the result of this method.
        return remote_repo

    def test_is_compatible_same_format(self):
        """InterRemoteToOther is compatible with a remote repository and a
        second repository that have the same format."""
        local_repo = self.make_repository('local')
        remote_repo = self.make_remote_repository('remote')
        is_compatible = repository.InterRemoteToOther.is_compatible
        # NOTE(review): the assertion heads in these three tests were absent
        # from the listing; assertTrue/assertFalse are presumed from the
        # test names and the "is false"/"is true" failure messages.
        self.assertTrue(
            is_compatible(remote_repo, local_repo),
            "InterRemoteToOther(%r, %r) is false" % (remote_repo, local_repo))

    def test_is_incompatible_different_format(self):
        """A remote and a local repository of different formats don't match."""
        local_repo = self.make_repository('local', 'dirstate')
        remote_repo = self.make_remote_repository('a', 'dirstate-with-subtree')
        is_compatible = repository.InterRemoteToOther.is_compatible
        # The message lists the repositories in the order they are passed.
        self.assertFalse(
            is_compatible(remote_repo, local_repo),
            "InterRemoteToOther(%r, %r) is true" % (remote_repo, local_repo))

    def test_is_incompatible_different_format_both_remote(self):
        """Two remote repositories of different formats don't match."""
        remote_repo_a = self.make_remote_repository(
            'a', 'dirstate-with-subtree')
        remote_repo_b = self.make_remote_repository('b', 'dirstate')
        is_compatible = repository.InterRemoteToOther.is_compatible
        self.assertFalse(
            is_compatible(remote_repo_a, remote_repo_b),
            "InterRemoteToOther(%r, %r) is true"
            % (remote_repo_a, remote_repo_b))
590
class TestRepositoryConverter(TestCaseWithTransport):
    """Tests for repository.CopyConverter."""

    def test_convert_empty(self):
        """Converting an empty format 7 repository yields a Knit1 one."""
        t = get_transport(self.get_url('.'))
        t.mkdir('repository')
        repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
        repo = weaverepo.RepositoryFormat7().initialize(repo_dir)
        target_format = knitrepo.RepositoryFormatKnit1()
        converter = repository.CopyConverter(target_format)
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
        # NOTE(review): the lines around convert() were absent from the
        # listing; finishing the nested progress bar in a finally clause is
        # presumed -- confirm.
        try:
            converter.convert(repo, pb)
        finally:
            pb.finished()
        repo = repo_dir.open_repository()
        self.assertTrue(isinstance(target_format, repo._format.__class__))
608
class TestMisc(TestCase):
    """Miscellaneous tests for helpers in bzrlib.repository."""

    def test_unescape_xml(self):
        """We get some kind of error when malformed entities are passed"""
        malformed = 'foo&bar;'
        unescape = repository._unescape_xml
        self.assertRaises(KeyError, unescape, malformed)
615
class TestRepositoryFormatKnit3(TestCaseWithTransport):
    """Concrete tests for knitrepo.RepositoryFormatKnit3."""

    def test_convert(self):
        """Ensure the upgrade adds weaves for roots"""
        format = bzrdir.BzrDirMetaFormat1()
        format.repository_format = knitrepo.RepositoryFormatKnit1()
        tree = self.make_branch_and_tree('.', format)
        tree.commit("Dull commit", rev_id="dull")
        revision_tree = tree.branch.repository.revision_tree('dull')
        # Before the upgrade the root's file lines cannot be read.
        self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
                          revision_tree.inventory.root.file_id)
        format = bzrdir.BzrDirMetaFormat1()
        format.repository_format = knitrepo.RepositoryFormatKnit3()
        upgrade.Convert('.', format)
        tree = workingtree.WorkingTree.open('.')
        revision_tree = tree.branch.repository.revision_tree('dull')
        # After the upgrade the same read must not raise.
        revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
        tree.commit("Another dull commit", rev_id='dull2')
        revision_tree = tree.branch.repository.revision_tree('dull2')
        self.assertEqual('dull', revision_tree.inventory.root.revision)

    def test_exposed_versioned_files_are_marked_dirty(self):
        """An inventory knit kept past its lock rejects further writes."""
        format = bzrdir.BzrDirMetaFormat1()
        format.repository_format = knitrepo.RepositoryFormatKnit3()
        repo = self.make_repository('.', format=format)
        # NOTE(review): lock_write()/unlock() around get_inventory_weave()
        # were absent from the listing and are presumed -- confirm.
        repo.lock_write()
        inv = repo.get_inventory_weave()
        repo.unlock()
        self.assertRaises(errors.OutSideTransaction,
                          inv.add_lines, 'foo', [], [])
647
class TestWithBrokenRepo(TestCaseWithTransport):
    """Tests run against a repository with inconsistent text parents."""

    def make_broken_repository(self):
        """Build and return 'broken-repo' with bad per-file parent data."""
        # XXX: This function is borrowed from Aaron's "Reconcile can fix bad
        # parent references" branch which is due to land in bzr.dev soon. Once
        # it does, this duplication should be removed.
        repo = self.make_repository('broken-repo')
        # NOTE(review): the three lines after make_repository, the 'finally:'
        # line, the 'cleanup()' call and the return were absent from the
        # listing. The structure below is presumed from the surviving
        # cleanups.append(...) calls, the reversed(cleanups) loop, and the
        # caller using the return value -- confirm against upstream.
        cleanups = []
        try:
            repo.lock_write()
            cleanups.append(repo.unlock)
            repo.start_write_group()
            cleanups.append(repo.commit_write_group)
            # make rev1a: A well-formed revision, containing 'file1'
            inv = inventory.Inventory(revision_id='rev1a')
            inv.root.revision = 'rev1a'
            self.add_file(repo, inv, 'file1', 'rev1a', [])
            repo.add_inventory('rev1a', inv, [])
            revision = _mod_revision.Revision('rev1a',
                committer='jrandom@example.com', timestamp=0,
                inventory_sha1='', timezone=0, message='foo', parent_ids=[])
            repo.add_revision('rev1a', revision, inv)

            # make rev1b, which has no Revision, but has an Inventory, and
            # a text for file1
            inv = inventory.Inventory(revision_id='rev1b')
            inv.root.revision = 'rev1b'
            self.add_file(repo, inv, 'file1', 'rev1b', [])
            repo.add_inventory('rev1b', inv, [])

            # make rev2, with file1 and file2
            # file1 has 'rev1b' as an ancestor, even though this is not
            # mentioned by 'rev1a', making it an unreferenced ancestor
            inv = inventory.Inventory()
            self.add_file(repo, inv, 'file1', 'rev2', ['rev1a', 'rev1b'])
            self.add_file(repo, inv, 'file2', 'rev2', [])
            self.add_revision(repo, 'rev2', inv, ['rev1a'])

            # make ghost revision rev1c
            inv = inventory.Inventory()
            self.add_file(repo, inv, 'file2', 'rev1c', [])

            # make rev3 with file2
            # file2 refers to 'rev1c', which is a ghost in this repository, so
            # file2 cannot have rev1c as its ancestor.
            inv = inventory.Inventory()
            self.add_file(repo, inv, 'file2', 'rev3', ['rev1c'])
            self.add_revision(repo, 'rev3', inv, ['rev1c'])
            return repo
        finally:
            # Undo in reverse order: commit the write group, then unlock.
            for cleanup in reversed(cleanups):
                cleanup()

    def add_revision(self, repo, revision_id, inv, parent_ids):
        """Store inv and a matching Revision under revision_id in repo."""
        inv.revision_id = revision_id
        inv.root.revision = revision_id
        repo.add_inventory(revision_id, inv, parent_ids)
        revision = _mod_revision.Revision(revision_id,
            committer='jrandom@example.com', timestamp=0, inventory_sha1='',
            timezone=0, message='foo', parent_ids=parent_ids)
        repo.add_revision(revision_id, revision, inv)

    def add_file(self, repo, inv, filename, revision, parents):
        """Add a one-line text for filename at revision with given parents."""
        file_id = filename + '-id'
        entry = inventory.InventoryFile(file_id, filename, 'TREE_ROOT')
        entry.revision = revision
        # NOTE(review): two lines were absent from the listing here. Adding
        # the entry to inv is presumed (it is otherwise unused); the
        # text_size assignment is recalled from upstream -- confirm both.
        entry.text_size = 0
        inv.add(entry)
        vf = repo.weave_store.get_weave_or_empty(file_id,
                                                 repo.get_transaction())
        vf.add_lines(revision, parents, ['line\n'])

    def test_insert_from_broken_repo(self):
        """Inserting a data stream from a broken repository won't silently
        corrupt the target repository.
        """
        broken_repo = self.make_broken_repository()
        empty_repo = self.make_repository('empty-repo')
        stream = broken_repo.get_data_stream(['rev1a', 'rev2', 'rev3'])
        # NOTE(review): the head of this call was absent from the listing;
        # assertRaises is presumed from the (exception, callable, argument)
        # tail that survived.
        self.assertRaises(
            errors.KnitCorrupt, empty_repo.insert_data_stream, stream)