1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the Repository facility that are not interface tests.
19
For interface tests see tests/repository_implementations/*.py.
21
For concrete class tests see this file, and for storage formats tests
25
from stat import S_ISDIR
26
from StringIO import StringIO
29
from bzrlib.errors import (NotBranchError,
32
UnsupportedFormatError,
34
from bzrlib.repository import RepositoryFormat
35
from bzrlib.smart import server
36
from bzrlib.tests import (
38
TestCaseWithTransport,
41
from bzrlib.transport import get_transport
42
from bzrlib.transport.memory import MemoryServer
43
from bzrlib.util import bencode
49
revision as _mod_revision,
54
from bzrlib.repofmt import knitrepo, weaverepo
57
class TestDefaultFormat(TestCase):
59
def test_get_set_default_format(self):
60
old_default = bzrdir.format_registry.get('default')
61
private_default = old_default().repository_format.__class__
62
old_format = repository.RepositoryFormat.get_default_format()
63
self.assertTrue(isinstance(old_format, private_default))
64
def make_sample_bzrdir():
65
my_bzrdir = bzrdir.BzrDirMetaFormat1()
66
my_bzrdir.repository_format = SampleRepositoryFormat()
68
bzrdir.format_registry.remove('default')
69
bzrdir.format_registry.register('sample', make_sample_bzrdir, '')
70
bzrdir.format_registry.set_default('sample')
71
# creating a repository should now create an instrumented dir.
73
# the default branch format is used by the meta dir format
74
# which is not the default bzrdir format at this point
75
dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
76
result = dir.create_repository()
77
self.assertEqual(result, 'A bzr repository dir')
79
bzrdir.format_registry.remove('default')
80
bzrdir.format_registry.remove('sample')
81
bzrdir.format_registry.register('default', old_default, '')
82
self.assertIsInstance(repository.RepositoryFormat.get_default_format(),
86
class SampleRepositoryFormat(repository.RepositoryFormat):
89
this format is initializable, unsupported to aid in testing the
90
open and open(unsupported=True) routines.
93
def get_format_string(self):
    """See RepositoryFormat.get_format_string().

    :return: the fixed format string of the sample repository format.
    """
    return "Sample .bzr repository format."
97
def initialize(self, a_bzrdir, shared=False):
    """Initialize a repository in a BzrDir.

    Writes this format's format string into the 'format' file on the
    repository transport obtained from a_bzrdir.

    :param a_bzrdir: control directory providing get_repository_transport().
    :param shared: accepted but not used by this implementation.
    :return: the literal string 'A bzr repository dir' (a marker value,
        not a repository object).
    """
    repo_transport = a_bzrdir.get_repository_transport(self)
    repo_transport.put_bytes('format', self.get_format_string())
    return 'A bzr repository dir'
103
def is_supported(self):
106
def open(self, a_bzrdir, _found=False):
    """Pretend to open a repository.

    Both arguments are ignored.

    :return: the literal string "opened repository." (a marker value,
        not a repository object).
    """
    return "opened repository."
110
class TestRepositoryFormat(TestCaseWithTransport):
111
"""Tests for the Repository format detection used by the bzr meta dir facility."""
113
def test_find_format(self):
    """find_format reports the format a repository was created with.

    A RepositoryFormat7 repository is initialized under "bar", and
    RepositoryFormat.find_format must return an instance of that same
    format class when probing its control directory.
    """
    # is the right format object found for a repository?
    # create a branch with a few known format objects.
    self.build_tree(["foo/", "bar/"])

    def check_format(repo_format, url):
        # Create the control directory that pairs with this format, put a
        # repository in it, then probe what find_format detects there.
        control = repo_format._matchingbzrdir.initialize(url)
        repo_format.initialize(control)
        found_format = repository.RepositoryFormat.find_format(control)
        self.assertIsInstance(found_format, repo_format.__class__)

    check_format(weaverepo.RepositoryFormat7(), "bar")
126
def test_find_format_no_repository(self):
127
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
128
self.assertRaises(errors.NoRepositoryPresent,
129
repository.RepositoryFormat.find_format,
132
def test_find_format_unknown_format(self):
133
dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
134
SampleRepositoryFormat().initialize(dir)
135
self.assertRaises(UnknownFormatError,
136
repository.RepositoryFormat.find_format,
139
def test_register_unregister_format(self):
    """A registered but unsupported format is refused by Repository.open.

    The sample format is registered for the duration of the checks and is
    always unregistered afterwards, even when an assertion fails.
    """
    sample_format = SampleRepositoryFormat()
    # make a control dir, then a repository of the sample format inside it
    control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
    sample_format.initialize(control)
    # register a format for it.
    repository.RepositoryFormat.register_format(sample_format)
    try:
        # which repository.Open will refuse (not supported)
        self.assertRaises(UnsupportedFormatError,
                          repository.Repository.open, self.get_url())
        # but calling the format's own open() directly will work
        self.assertEqual(sample_format.open(control), "opened repository.")
    finally:
        # unregister the format; done in a finally block so a failed
        # assertion cannot leave it registered for later tests.
        repository.RepositoryFormat.unregister_format(sample_format)
155
class TestFormat6(TestCaseWithTransport):
157
def test_no_ancestry_weave(self):
158
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
159
repo = weaverepo.RepositoryFormat6().initialize(control)
160
# We no longer need to create the ancestry.weave file
161
# since it is *never* used.
162
self.assertRaises(NoSuchFile,
163
control.transport.get,
166
def test_exposed_versioned_files_are_marked_dirty(self):
167
control = bzrdir.BzrDirFormat6().initialize(self.get_url())
168
repo = weaverepo.RepositoryFormat6().initialize(control)
170
inv = repo.get_inventory_weave()
172
self.assertRaises(errors.OutSideTransaction,
173
inv.add_lines, 'foo', [], [])
176
class TestFormat7(TestCaseWithTransport):
178
def test_disk_layout(self):
179
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
180
repo = weaverepo.RepositoryFormat7().initialize(control)
181
# in case of side effects of locking.
185
# format 'Bazaar-NG Repository format 7'
187
# inventory.weave == empty_weave
188
# empty revision-store directory
189
# empty weaves directory
190
t = control.get_repository_transport(None)
191
self.assertEqualDiff('Bazaar-NG Repository format 7',
192
t.get('format').read())
193
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
194
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
195
self.assertEqualDiff('# bzr weave file v5\n'
198
t.get('inventory.weave').read())
200
def test_shared_disk_layout(self):
201
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
202
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
204
# format 'Bazaar-NG Repository format 7'
205
# inventory.weave == empty_weave
206
# empty revision-store directory
207
# empty weaves directory
208
# a 'shared-storage' marker file.
209
# lock is not present when unlocked
210
t = control.get_repository_transport(None)
211
self.assertEqualDiff('Bazaar-NG Repository format 7',
212
t.get('format').read())
213
self.assertEqualDiff('', t.get('shared-storage').read())
214
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
215
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
216
self.assertEqualDiff('# bzr weave file v5\n'
219
t.get('inventory.weave').read())
220
self.assertFalse(t.has('branch-lock'))
222
def test_creates_lockdir(self):
223
"""Make sure it appears to be controlled by a LockDir existence"""
224
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
225
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
226
t = control.get_repository_transport(None)
227
# TODO: Should check there is a 'lock' toplevel directory,
228
# regardless of contents
229
self.assertFalse(t.has('lock/held/info'))
232
self.assertTrue(t.has('lock/held/info'))
234
# unlock so we don't get a warning about failing to do so
237
def test_uses_lockdir(self):
238
"""repo format 7 actually locks on lockdir"""
239
base_url = self.get_url()
240
control = bzrdir.BzrDirMetaFormat1().initialize(base_url)
241
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
242
t = control.get_repository_transport(None)
246
# make sure the same lock is created by opening it
247
repo = repository.Repository.open(base_url)
249
self.assertTrue(t.has('lock/held/info'))
251
self.assertFalse(t.has('lock/held/info'))
253
def test_shared_no_tree_disk_layout(self):
254
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
255
repo = weaverepo.RepositoryFormat7().initialize(control, shared=True)
256
repo.set_make_working_trees(False)
258
# format 'Bazaar-NG Repository format 7'
260
# inventory.weave == empty_weave
261
# empty revision-store directory
262
# empty weaves directory
263
# a 'shared-storage' marker file.
264
t = control.get_repository_transport(None)
265
self.assertEqualDiff('Bazaar-NG Repository format 7',
266
t.get('format').read())
267
## self.assertEqualDiff('', t.get('lock').read())
268
self.assertEqualDiff('', t.get('shared-storage').read())
269
self.assertEqualDiff('', t.get('no-working-trees').read())
270
repo.set_make_working_trees(True)
271
self.assertFalse(t.has('no-working-trees'))
272
self.assertTrue(S_ISDIR(t.stat('revision-store').st_mode))
273
self.assertTrue(S_ISDIR(t.stat('weaves').st_mode))
274
self.assertEqualDiff('# bzr weave file v5\n'
277
t.get('inventory.weave').read())
279
def test_exposed_versioned_files_are_marked_dirty(self):
280
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
281
repo = weaverepo.RepositoryFormat7().initialize(control)
283
inv = repo.get_inventory_weave()
285
self.assertRaises(errors.OutSideTransaction,
286
inv.add_lines, 'foo', [], [])
289
class TestFormatKnit1(TestCaseWithTransport):
291
def test_disk_layout(self):
292
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
293
repo = knitrepo.RepositoryFormatKnit1().initialize(control)
294
# in case of side effects of locking.
298
# format 'Bazaar-NG Knit Repository Format 1'
299
# lock: is a directory
300
# inventory.weave == empty_weave
301
# empty revision-store directory
302
# empty weaves directory
303
t = control.get_repository_transport(None)
304
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
305
t.get('format').read())
306
# XXX: no locks left when unlocked at the moment
307
# self.assertEqualDiff('', t.get('lock').read())
308
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
311
def assertHasKnit(self, t, knit_name):
    """Assert that knit_name exists on t.

    :param t: transport-like object providing get() and has().
    :param knit_name: base name of the knit, without extension.

    The '<knit_name>.kndx' file must contain exactly the knit index header
    line, and a '<knit_name>.knit' file must be present.
    """
    index_text = t.get(knit_name + '.kndx').read()
    self.assertEqualDiff('# bzr knit index 8\n', index_text)
    # only presence of the data file is checked, not its content
    self.assertTrue(t.has(knit_name + '.knit'))
318
def check_knits(self, t):
    """check knit content for a repository.

    :param t: transport passed through to assertHasKnit for each of the
        'inventory', 'revisions' and 'signatures' knits, in that order.
    """
    for knit_name in ('inventory', 'revisions', 'signatures'):
        self.assertHasKnit(t, knit_name)
324
def test_shared_disk_layout(self):
325
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
326
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
328
# format 'Bazaar-NG Knit Repository Format 1'
329
# lock: is a directory
330
# inventory.weave == empty_weave
331
# empty revision-store directory
332
# empty weaves directory
333
# a 'shared-storage' marker file.
334
t = control.get_repository_transport(None)
335
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
336
t.get('format').read())
337
# XXX: no locks left when unlocked at the moment
338
# self.assertEqualDiff('', t.get('lock').read())
339
self.assertEqualDiff('', t.get('shared-storage').read())
340
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
343
def test_shared_no_tree_disk_layout(self):
344
control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
345
repo = knitrepo.RepositoryFormatKnit1().initialize(control, shared=True)
346
repo.set_make_working_trees(False)
348
# format 'Bazaar-NG Knit Repository Format 1'
350
# inventory.weave == empty_weave
351
# empty revision-store directory
352
# empty weaves directory
353
# a 'shared-storage' marker file.
354
t = control.get_repository_transport(None)
355
self.assertEqualDiff('Bazaar-NG Knit Repository Format 1',
356
t.get('format').read())
357
# XXX: no locks left when unlocked at the moment
358
# self.assertEqualDiff('', t.get('lock').read())
359
self.assertEqualDiff('', t.get('shared-storage').read())
360
self.assertEqualDiff('', t.get('no-working-trees').read())
361
repo.set_make_working_trees(True)
362
self.assertFalse(t.has('no-working-trees'))
363
self.assertTrue(S_ISDIR(t.stat('knits').st_mode))
366
def test_exposed_versioned_files_are_marked_dirty(self):
367
format = bzrdir.BzrDirMetaFormat1()
368
format.repository_format = knitrepo.RepositoryFormatKnit1()
369
repo = self.make_repository('.', format=format)
371
inv = repo.get_inventory_weave()
373
self.assertRaises(errors.OutSideTransaction,
374
inv.add_lines, 'foo', [], [])
376
def test_deserialise_sets_root_revision(self):
    """We must have an inventory.root.revision

    Old versions of the XML5 serializer did not set the revision_id for
    the whole inventory. So we grab the one from the expected text. Which
    is valid when the api is not being abused.
    """
    knit_format = bzrdir.format_registry.get('knit')()
    repo = self.make_repository('.', format=knit_format)
    # The inventory XML carries no revision_id attribute, so the id passed
    # to deserialise_inventory is expected to end up on the root.
    inv_xml = '<inventory format="5">\n</inventory>\n'
    inv = repo.deserialise_inventory('test-rev-id', inv_xml)
    self.assertEqual('test-rev-id', inv.root.revision)
389
def test_deserialise_uses_global_revision_id(self):
390
"""If it is set, then we re-use the global revision id"""
391
repo = self.make_repository('.',
392
format=bzrdir.format_registry.get('knit')())
393
inv_xml = ('<inventory format="5" revision_id="other-rev-id">\n'
395
# Arguably, the deserialise_inventory should detect a mismatch, and
396
# raise an error, rather than silently using one revision_id over the
398
inv = repo.deserialise_inventory('test-rev-id', inv_xml)
399
self.assertEqual('other-rev-id', inv.root.revision)
402
class KnitRepositoryStreamTests(test_knit.KnitTests):
403
"""Tests for knitrepo._get_stream_as_bytes."""
405
def test_get_stream_as_bytes(self):
    """A single-version knit serialises to a bencoded 'knit-plain' stream."""
    # Make a simple knit holding one full-text version.
    k1 = self.make_test_knit()
    k1.add_lines('text-a', [], test_knit.split_lines(test_knit.TEXT_1))

    # Serialise it, check the output.
    stream_bytes = knitrepo._get_stream_as_bytes(k1, ['text-a'])
    data = bencode.bdecode(stream_bytes)
    stream_format, record = data
    self.assertEqual('knit-plain', stream_format)
    # record is (version, options, parents, content)
    self.assertEqual(['text-a', ['fulltext'], []], record[:3])
    self.assertRecordContentEqual(k1, 'text-a', record[3])
418
def test_get_stream_as_bytes_all(self):
419
"""Get a serialised data stream for all the records in a knit.
421
Much like test_get_stream_all, except for get_stream_as_bytes.
423
k1 = self.make_test_knit()
424
# Insert the same data as BasicKnitTests.test_knit_join, as they seem
425
# to cover a range of cases (no parents, one parent, multiple parents).
427
('text-a', [], test_knit.TEXT_1),
428
('text-b', ['text-a'], test_knit.TEXT_1),
429
('text-c', [], test_knit.TEXT_1),
430
('text-d', ['text-c'], test_knit.TEXT_1),
431
('text-m', ['text-b', 'text-d'], test_knit.TEXT_1),
433
expected_data_list = [
434
# version, options, parents
435
('text-a', ['fulltext'], []),
436
('text-b', ['line-delta'], ['text-a']),
437
('text-c', ['fulltext'], []),
438
('text-d', ['line-delta'], ['text-c']),
439
('text-m', ['line-delta'], ['text-b', 'text-d']),
441
for version_id, parents, lines in test_data:
442
k1.add_lines(version_id, parents, test_knit.split_lines(lines))
444
bytes = knitrepo._get_stream_as_bytes(
445
k1, ['text-a', 'text-b', 'text-c', 'text-d', 'text-m'])
447
data = bencode.bdecode(bytes)
449
self.assertEqual('knit-plain', format)
451
for expected, actual in zip(expected_data_list, data):
452
expected_version = expected[0]
453
expected_options = expected[1]
454
expected_parents = expected[2]
455
version, options, parents, bytes = actual
456
self.assertEqual(expected_version, version)
457
self.assertEqual(expected_options, options)
458
self.assertEqual(expected_parents, parents)
459
self.assertRecordContentEqual(k1, version, bytes)
462
class DummyRepository(object):
463
"""A dummy repository for testing."""
467
def supports_rich_root(self):
471
class InterDummy(repository.InterRepository):
472
"""An inter-repository optimised code path for DummyRepository.
474
This is for use during testing where we use DummyRepository as repositories
475
so that none of the default registered inter-repository classes will
480
def is_compatible(repo_source, repo_target):
    """InterDummy is compatible with DummyRepository.

    :return: True only when both repo_source and repo_target are
        DummyRepository instances.
    """
    return (isinstance(repo_source, DummyRepository) and
            isinstance(repo_target, DummyRepository))
486
class TestInterRepository(TestCaseWithTransport):
488
def test_get_default_inter_repository(self):
    """InterRepository.get falls back to the default for unknown repos."""
    # test that the InterRepository.get(repo_a, repo_b) probes
    # for a inter_repo class where is_compatible(repo_a, repo_b) returns
    # true and returns a default inter_repo otherwise.
    # This also tests that the default registered optimised interrepository
    # classes do not barf inappropriately when a surprising repository type
    # (here a DummyRepository) is passed in.
    dummy_a = DummyRepository()
    dummy_b = DummyRepository()
    self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
499
def assertGetsDefaultInterRepository(self, repo_a, repo_b):
    """Asserts that InterRepository.get(repo_a, repo_b) -> the default.

    The effective default is now InterSameDataRepository because there is
    no actual sane default in the presence of incompatible data models.

    Also checks that the returned object has repo_a as its source and
    repo_b as its target.
    """
    inter_repo = repository.InterRepository.get(repo_a, repo_b)
    self.assertEqual(repository.InterSameDataRepository,
                     inter_repo.__class__)
    self.assertEqual(repo_a, inter_repo.source)
    self.assertEqual(repo_b, inter_repo.target)
511
def test_register_inter_repository_class(self):
512
# test that a optimised code path provider - a
513
# InterRepository subclass can be registered and unregistered
514
# and that it is correctly selected when given a repository
515
# pair that it returns true on for the is_compatible static method
517
dummy_a = DummyRepository()
518
dummy_b = DummyRepository()
519
repo = self.make_repository('.')
520
# hack dummies to look like repo somewhat.
521
dummy_a._serializer = repo._serializer
522
dummy_b._serializer = repo._serializer
523
repository.InterRepository.register_optimiser(InterDummy)
525
# we should get the default for something InterDummy returns False
527
self.assertFalse(InterDummy.is_compatible(dummy_a, repo))
528
self.assertGetsDefaultInterRepository(dummy_a, repo)
529
# and we should get an InterDummy for a pair it 'likes'
530
self.assertTrue(InterDummy.is_compatible(dummy_a, dummy_b))
531
inter_repo = repository.InterRepository.get(dummy_a, dummy_b)
532
self.assertEqual(InterDummy, inter_repo.__class__)
533
self.assertEqual(dummy_a, inter_repo.source)
534
self.assertEqual(dummy_b, inter_repo.target)
536
repository.InterRepository.unregister_optimiser(InterDummy)
537
# now we should get the default InterRepository object again.
538
self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
541
class TestInterWeaveRepo(TestCaseWithTransport):
543
def test_is_compatible_and_registered(self):
544
# InterWeaveRepo is compatible when either side
545
# is a format 5/6/7 branch
546
from bzrlib.repofmt import knitrepo, weaverepo
547
formats = [weaverepo.RepositoryFormat5(),
548
weaverepo.RepositoryFormat6(),
549
weaverepo.RepositoryFormat7()]
550
incompatible_formats = [weaverepo.RepositoryFormat4(),
551
knitrepo.RepositoryFormatKnit1(),
553
repo_a = self.make_repository('a')
554
repo_b = self.make_repository('b')
555
is_compatible = repository.InterWeaveRepo.is_compatible
556
for source in incompatible_formats:
557
# force incompatible left then right
558
repo_a._format = source
559
repo_b._format = formats[0]
560
self.assertFalse(is_compatible(repo_a, repo_b))
561
self.assertFalse(is_compatible(repo_b, repo_a))
562
for source in formats:
563
repo_a._format = source
564
for target in formats:
565
repo_b._format = target
566
self.assertTrue(is_compatible(repo_a, repo_b))
567
self.assertEqual(repository.InterWeaveRepo,
568
repository.InterRepository.get(repo_a,
572
class TestInterRemoteToOther(TestCaseWithTransport):
574
def make_remote_repository(self, path, backing_format=None):
575
"""Make a RemoteRepository object backed by a real repository that will
576
be created at the given path."""
577
self.make_repository(path, format=backing_format)
578
smart_server = server.SmartTCPServer_for_testing()
580
remote_transport = get_transport(smart_server.get_url()).clone(path)
581
self.addCleanup(smart_server.tearDown)
582
remote_bzrdir = bzrdir.BzrDir.open_from_transport(remote_transport)
583
remote_repo = remote_bzrdir.open_repository()
586
def test_is_compatible_same_format(self):
587
"""InterRemoteToOther is compatible with a remote repository and a
588
second repository that have the same format."""
589
local_repo = self.make_repository('local')
590
remote_repo = self.make_remote_repository('remote')
591
is_compatible = repository.InterRemoteToOther.is_compatible
593
is_compatible(remote_repo, local_repo),
594
"InterRemoteToOther(%r, %r) is false" % (remote_repo, local_repo))
596
def test_is_incompatible_different_format(self):
597
local_repo = self.make_repository('local', 'dirstate')
598
remote_repo = self.make_remote_repository('a', 'dirstate-with-subtree')
599
is_compatible = repository.InterRemoteToOther.is_compatible
601
is_compatible(remote_repo, local_repo),
602
"InterRemoteToOther(%r, %r) is true" % (local_repo, remote_repo))
604
def test_is_incompatible_different_format_both_remote(self):
605
remote_repo_a = self.make_remote_repository(
606
'a', 'dirstate-with-subtree')
607
remote_repo_b = self.make_remote_repository('b', 'dirstate')
608
is_compatible = repository.InterRemoteToOther.is_compatible
610
is_compatible(remote_repo_a, remote_repo_b),
611
"InterRemoteToOther(%r, %r) is true"
612
% (remote_repo_a, remote_repo_b))
615
class TestRepositoryConverter(TestCaseWithTransport):
617
def test_convert_empty(self):
618
t = get_transport(self.get_url('.'))
619
t.mkdir('repository')
620
repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
621
repo = weaverepo.RepositoryFormat7().initialize(repo_dir)
622
target_format = knitrepo.RepositoryFormatKnit1()
623
converter = repository.CopyConverter(target_format)
624
pb = bzrlib.ui.ui_factory.nested_progress_bar()
626
converter.convert(repo, pb)
629
repo = repo_dir.open_repository()
630
self.assertTrue(isinstance(target_format, repo._format.__class__))
633
class TestMisc(TestCase):
    """Miscellaneous repository helper tests."""

    def test_unescape_xml(self):
        """We get some kind of error when malformed entities are passed"""
        # 'bar' is not a known entity name, so the lookup is expected to
        # surface as a KeyError.
        self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;')
640
class TestRepositoryFormatKnit3(TestCaseWithTransport):
642
def test_convert(self):
    """Ensure the upgrade adds weaves for roots"""
    # Start from a Knit1 repository with a single commit.
    knit1_format = bzrdir.BzrDirMetaFormat1()
    knit1_format.repository_format = knitrepo.RepositoryFormatKnit1()
    tree = self.make_branch_and_tree('.', knit1_format)
    tree.commit("Dull commit", rev_id="dull")
    revision_tree = tree.branch.repository.revision_tree('dull')
    # Before the upgrade, asking for the root's lines raises NoSuchFile.
    self.assertRaises(errors.NoSuchFile, revision_tree.get_file_lines,
                      revision_tree.inventory.root.file_id)

    # Upgrade in place to Knit3.
    knit3_format = bzrdir.BzrDirMetaFormat1()
    knit3_format.repository_format = knitrepo.RepositoryFormatKnit3()
    upgrade.Convert('.', knit3_format)

    tree = workingtree.WorkingTree.open('.')
    revision_tree = tree.branch.repository.revision_tree('dull')
    # After the upgrade the same lookup must succeed; an exception here
    # fails the test.
    revision_tree.get_file_lines(revision_tree.inventory.root.file_id)
    tree.commit("Another dull commit", rev_id='dull2')
    revision_tree = tree.branch.repository.revision_tree('dull2')
    # The root was not changed by the second commit, so its revision is
    # still the first one.
    self.assertEqual('dull', revision_tree.inventory.root.revision)
661
def test_exposed_versioned_files_are_marked_dirty(self):
662
format = bzrdir.BzrDirMetaFormat1()
663
format.repository_format = knitrepo.RepositoryFormatKnit3()
664
repo = self.make_repository('.', format=format)
666
inv = repo.get_inventory_weave()
668
self.assertRaises(errors.OutSideTransaction,
669
inv.add_lines, 'foo', [], [])
672
class TestWithBrokenRepo(TestCaseWithTransport):
674
def make_broken_repository(self):
675
# XXX: This function is borrowed from Aaron's "Reconcile can fix bad
676
# parent references" branch which is due to land in bzr.dev soon. Once
677
# it does, this duplication should be removed.
678
repo = self.make_repository('broken-repo')
682
cleanups.append(repo.unlock)
683
repo.start_write_group()
684
cleanups.append(repo.commit_write_group)
685
# make rev1a: A well-formed revision, containing 'file1'
686
inv = inventory.Inventory(revision_id='rev1a')
687
inv.root.revision = 'rev1a'
688
self.add_file(repo, inv, 'file1', 'rev1a', [])
689
repo.add_inventory('rev1a', inv, [])
690
revision = _mod_revision.Revision('rev1a',
691
committer='jrandom@example.com', timestamp=0,
692
inventory_sha1='', timezone=0, message='foo', parent_ids=[])
693
repo.add_revision('rev1a',revision, inv)
695
# make rev1b, which has no Revision, but has an Inventory, and
697
inv = inventory.Inventory(revision_id='rev1b')
698
inv.root.revision = 'rev1b'
699
self.add_file(repo, inv, 'file1', 'rev1b', [])
700
repo.add_inventory('rev1b', inv, [])
702
# make rev2, with file1 and file2
704
# file1 has 'rev1b' as an ancestor, even though this is not
705
# mentioned by 'rev1a', making it an unreferenced ancestor
706
inv = inventory.Inventory()
707
self.add_file(repo, inv, 'file1', 'rev2', ['rev1a', 'rev1b'])
708
self.add_file(repo, inv, 'file2', 'rev2', [])
709
self.add_revision(repo, 'rev2', inv, ['rev1a'])
711
# make ghost revision rev1c
712
inv = inventory.Inventory()
713
self.add_file(repo, inv, 'file2', 'rev1c', [])
715
# make rev3 with file2
716
# file2 refers to 'rev1c', which is a ghost in this repository, so
717
# file2 cannot have rev1c as its ancestor.
718
inv = inventory.Inventory()
719
self.add_file(repo, inv, 'file2', 'rev3', ['rev1c'])
720
self.add_revision(repo, 'rev3', inv, ['rev1c'])
723
for cleanup in reversed(cleanups):
726
def add_revision(self, repo, revision_id, inv, parent_ids):
    """Add inv and a matching Revision object to repo as revision_id.

    :param repo: repository receiving the inventory and the revision.
    :param revision_id: id stamped on the inventory, on its root entry and
        on the new Revision.
    :param inv: inventory to store; it is modified in place.
    :param parent_ids: parent revision ids recorded for both the inventory
        and the Revision.
    """
    inv.revision_id = revision_id
    inv.root.revision = revision_id
    repo.add_inventory(revision_id, inv, parent_ids)
    # Fixed placeholder metadata; only the ids and parents vary per call.
    revision = _mod_revision.Revision(
        revision_id,
        committer='jrandom@example.com', timestamp=0, inventory_sha1='',
        timezone=0, message='foo', parent_ids=parent_ids)
    repo.add_revision(revision_id, revision, inv)
735
def add_file(self, repo, inv, filename, revision, parents):
736
file_id = filename + '-id'
737
entry = inventory.InventoryFile(file_id, filename, 'TREE_ROOT')
738
entry.revision = revision
741
vf = repo.weave_store.get_weave_or_empty(file_id,
742
repo.get_transaction())
743
vf.add_lines(revision, parents, ['line\n'])
745
def test_insert_from_broken_repo(self):
746
"""Inserting a data stream from a broken repository won't silently
747
corrupt the target repository.
749
broken_repo = self.make_broken_repository()
750
empty_repo = self.make_repository('empty-repo')
751
stream = broken_repo.get_data_stream(['rev1a', 'rev2', 'rev3'])
753
errors.KnitCorrupt, empty_repo.insert_data_stream, stream)