52
52
def test_attribute__fetch_order(self):
53
53
"""Test the _fetch_order attribute."""
54
self.assertFormatAttribute('_fetch_order', ('topological', 'unordered'))
54
self.assertFormatAttribute(
55
'_fetch_order', ('topological', 'unordered'))
56
57
def test_attribute__fetch_uses_deltas(self):
57
58
"""Test the _fetch_uses_deltas attribute."""
77
78
tree = self.make_branch_and_tree('tree')
78
79
repo = tree.branch.repository
79
80
self.assertIsInstance(repo.revisions,
80
versionedfile.VersionedFiles)
81
versionedfile.VersionedFiles)
82
83
def test_attribute_revision_store_basics(self):
83
84
"""Test the basic behaviour of the revisions attribute."""
88
89
self.assertEqual(set(), set(repo.revisions.keys()))
89
90
revid = (tree.commit("foo"),)
90
91
self.assertEqual({revid}, set(repo.revisions.keys()))
91
self.assertEqual({revid:()},
92
repo.revisions.get_parent_map([revid]))
92
self.assertEqual({revid: ()},
93
repo.revisions.get_parent_map([revid]))
95
96
tree2 = self.make_branch_and_tree('tree2')
102
103
self.addCleanup(repo.unlock)
103
104
self.assertEqual({revid, left_id, right_id, merge_id},
104
set(repo.revisions.keys()))
105
self.assertEqual({revid:(), left_id:(revid,), right_id:(revid,),
106
merge_id:(right_id, left_id)},
107
repo.revisions.get_parent_map(repo.revisions.keys()))
105
set(repo.revisions.keys()))
106
self.assertEqual({revid: (), left_id: (revid,), right_id: (revid,),
107
merge_id: (right_id, left_id)},
108
repo.revisions.get_parent_map(repo.revisions.keys()))
109
110
def test_attribute_signature_store(self):
110
111
"""Test the existence of the signatures attribute."""
111
112
tree = self.make_branch_and_tree('tree')
112
113
repo = tree.branch.repository
113
114
self.assertIsInstance(repo.signatures,
114
versionedfile.VersionedFiles)
115
versionedfile.VersionedFiles)
116
117
def test_exposed_versioned_files_are_marked_dirty(self):
117
118
repo = self.make_repository('.')
121
122
inventories = repo.inventories
123
124
self.assertRaises(errors.ObjectNotLocked,
125
self.assertRaises(errors.ObjectNotLocked,
127
self.assertRaises(errors.ObjectNotLocked,
129
self.assertRaises(errors.ObjectNotLocked,
130
signatures.add_lines, ('foo',), [], [])
131
self.assertRaises(errors.ObjectNotLocked,
132
revisions.add_lines, ('foo',), [], [])
133
self.assertRaises(errors.ObjectNotLocked,
134
inventories.add_lines, ('foo',), [], [])
126
self.assertRaises(errors.ObjectNotLocked,
128
self.assertRaises(errors.ObjectNotLocked,
130
self.assertRaises(errors.ObjectNotLocked,
131
signatures.add_lines, ('foo',), [], [])
132
self.assertRaises(errors.ObjectNotLocked,
133
revisions.add_lines, ('foo',), [], [])
134
self.assertRaises(errors.ObjectNotLocked,
135
inventories.add_lines, ('foo',), [], [])
136
137
def test__get_sink(self):
137
138
repo = self.make_repository('repo')
163
164
root_id = inv.root.file_id
164
165
repo.texts.add_lines((b'fixed-root', b'A'), [], [])
165
166
repo.add_revision(b'A', _mod_revision.Revision(
166
b'A', committer='B', timestamp=0,
167
timezone=0, message='C'), inv=inv)
167
b'A', committer='B', timestamp=0,
168
timezone=0, message='C'), inv=inv)
168
169
repo.commit_write_group()
197
198
tree = self.make_branch_and_tree('tree')
198
199
repo = tree.branch.repository
199
200
self.assertIsInstance(repo.texts,
200
versionedfile.VersionedFiles)
201
versionedfile.VersionedFiles)
202
203
def test_iter_inventories_is_ordered(self):
203
204
# just a smoke test
270
271
with tree.lock_write():
271
272
self.assertEqual(set(), set(repo.texts.keys()))
272
273
tree.add(['foo'], [file_id], ['file'])
273
tree.put_file_bytes_non_atomic('foo', b'content\n', file_id=file_id)
274
tree.put_file_bytes_non_atomic(
275
'foo', b'content\n', file_id=file_id)
275
277
rev_key = (tree.commit("foo"),)
276
278
except errors.IllegalPath:
280
282
if repo._format.rich_root_data:
281
283
root_commit = (tree.get_root_id(),) + rev_key
282
284
keys = {root_commit}
283
parents = {root_commit:()}
285
parents = {root_commit: ()}
288
290
parents[file_key + rev_key] = ()
289
291
self.assertEqual(keys, set(repo.texts.keys()))
290
292
self.assertEqual(parents,
291
repo.texts.get_parent_map(repo.texts.keys()))
293
repo.texts.get_parent_map(repo.texts.keys()))
292
294
tree2 = self.make_branch_and_tree('tree2')
293
295
tree2.pull(tree.branch)
294
296
tree2.put_file_bytes_non_atomic('foo', b'right\n', file_id=b'Foo:Bar')
379
381
def test_all_revision_ids(self):
380
382
# all_revision_ids -> all revisions
381
383
self.assertEqual({b'rev1', b'rev2', b'rev3', b'rev4'},
382
set(self.controldir.open_repository().all_revision_ids()))
384
set(self.controldir.open_repository().all_revision_ids()))
384
386
def test_reserved_id(self):
385
387
repo = self.make_repository('repository')
387
389
repo.start_write_group()
389
391
self.assertRaises(errors.ReservedId, repo.add_inventory,
390
b'reserved:', None, None)
392
b'reserved:', None, None)
391
393
self.assertRaises(errors.ReservedId, repo.add_inventory_by_delta,
392
"foo", [], b'reserved:', None)
394
"foo", [], b'reserved:', None)
393
395
self.assertRaises(errors.ReservedId, repo.add_revision,
396
398
repo.abort_write_group()
408
410
repo = self.make_repository('inventory_with_unnecessary_ghost')
409
411
repo.lock_write()
410
412
repo.start_write_group()
411
inv = inventory.Inventory(revision_id = b'ghost')
413
inv = inventory.Inventory(revision_id=b'ghost')
412
414
inv.root.revision = b'ghost'
413
415
if repo.supports_rich_root():
414
416
root_id = inv.root.file_id
424
426
raise tests.TestNotApplicable(
425
427
"Cannot test with ghosts for this format.")
427
inv = inventory.Inventory(revision_id = b'the_ghost')
429
inv = inventory.Inventory(revision_id=b'the_ghost')
428
430
inv.root.revision = b'the_ghost'
429
431
if repo.supports_rich_root():
430
432
root_id = inv.root.file_id
439
441
inv_weave = repo.inventories
440
442
possible_parents = (None, ((b'ghost',),))
441
443
self.assertSubset(inv_weave.get_parent_map([(b'ghost',)])[(b'ghost',)],
443
445
repo.commit_write_group()
457
459
if not reported_wrong:
459
self.assertRaises(errors.CorruptRepository, repo.get_revision, b'ghost')
461
self.assertRaises(errors.CorruptRepository,
462
repo.get_revision, b'ghost')
461
464
def test_corrupt_revision_get_revision_reconcile(self):
462
465
repo_url = self.get_url('inventory_with_unnecessary_ghost')