1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
27
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
28
InventoryDirectory, InventoryEntry, TreeReference)
29
from bzrlib.tests import (
31
TestCaseWithTransport,
34
split_suite_by_condition,
36
from bzrlib.tests.per_workingtree import workingtree_formats
39
def load_tests(standard_tests, module, loader):
40
"""Parameterise some inventory tests."""
41
to_adapt, result = split_suite_by_condition(standard_tests,
42
condition_isinstance(TestDeltaApplication))
44
('Inventory', {'apply_delta':apply_inventory_Inventory}),
46
# Working tree basis delta application
47
# Repository add_inv_by_delta.
48
# Reduce form of the per_repository test logic - that logic needs to be
49
# able to get /just/ repositories whereas these tests are fine with
50
# just creating trees.
52
for _, format in repository.format_registry.iteritems():
53
scenarios.append((str(format.__name__), {
54
'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
56
for format in workingtree_formats():
58
(str(format.__class__.__name__) + ".update_basis_by_delta", {
59
'apply_delta':apply_inventory_WT_basis,
62
(str(format.__class__.__name__) + ".apply_inventory_delta", {
63
'apply_delta':apply_inventory_WT,
65
return multiply_tests(to_adapt, scenarios, result)
68
def create_texts_for_inv(repo, inv):
69
for path, ie in inv.iter_entries():
71
lines = ['a' * ie.text_size]
74
repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
76
def apply_inventory_Inventory(self, basis, delta):
77
"""Apply delta to basis and return the result.
79
:param basis: An inventory to be used as the basis.
80
:param delta: The inventory delta to apply:
81
:return: An inventory resulting from the application.
83
basis.apply_delta(delta)
87
def apply_inventory_WT(self, basis, delta):
88
"""Apply delta to basis and return the result.
90
This sets the tree state to be basis, and then calls apply_inventory_delta.
92
:param basis: An inventory to be used as the basis.
93
:param delta: The inventory delta to apply:
94
:return: An inventory resulting from the application.
96
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
97
control.create_repository()
98
control.create_branch()
99
tree = self.format.initialize(control)
102
tree._write_inventory(basis)
105
# Fresh object, reads disk again.
106
tree = tree.bzrdir.open_workingtree()
109
tree.apply_inventory_delta(delta)
112
# reload tree - ensure we get what was written.
113
tree = tree.bzrdir.open_workingtree()
115
self.addCleanup(tree.unlock)
116
# One could add 'tree._validate' here but that would cause 'early' failures
117
# as far as higher level code is concerned. Possibly adding an
118
# expect_fail parameter to this function and if that is False then do a
120
return tree.inventory
123
def apply_inventory_WT_basis(self, basis, delta):
124
"""Apply delta to basis and return the result.
126
This sets the parent and then calls update_basis_by_delta.
127
It also puts the basis in the repository under both 'basis' and 'result' to
128
allow safety checks made by the WT to succeed, and finally ensures that all
129
items in the delta with a new path are present in the WT before calling
130
update_basis_by_delta.
132
:param basis: An inventory to be used as the basis.
133
:param delta: The inventory delta to apply:
134
:return: An inventory resulting from the application.
136
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
137
control.create_repository()
138
control.create_branch()
139
tree = self.format.initialize(control)
142
repo = tree.branch.repository
143
repo.start_write_group()
145
rev = revision.Revision('basis', timestamp=0, timezone=None,
146
message="", committer="foo@example.com")
147
basis.revision_id = 'basis'
148
create_texts_for_inv(tree.branch.repository, basis)
149
repo.add_revision('basis', rev, basis)
150
# Add a revision for the result, with the basis content -
151
# update_basis_by_delta doesn't check that the delta results in
152
# result, and we want inconsistent deltas to get called on the
153
# tree, or else the code isn't actually checked.
154
rev = revision.Revision('result', timestamp=0, timezone=None,
155
message="", committer="foo@example.com")
156
basis.revision_id = 'result'
157
repo.add_revision('result', rev, basis)
158
repo.commit_write_group()
160
repo.abort_write_group()
162
# Set the basis state as the tree's current state
163
tree._write_inventory(basis)
164
# This reads basis from the repo and puts it into the tree's local
165
# cache, if it has one.
166
tree.set_parent_ids(['basis'])
169
for old, new, id, entry in delta:
170
if None in (new, entry):
172
paths[new] = (entry.file_id, entry.kind)
173
parents.add(osutils.dirname(new))
174
parents = osutils.minimum_path_selection(parents)
176
# Put place holders in the tree to permit adding the other entries.
177
for pos, parent in enumerate(parents):
178
if not tree.path2id(parent):
179
# add a synthetic directory in the tree so we can put the
180
# tree0 entries in place for dirstate.
181
tree.add([parent], ["id%d" % pos], ["directory"])
183
# Many deltas may cause this mini-apply to fail, but we want to see what
184
# the delta application code says, not the prep that we do to deal with
185
# limitations of dirstate's update_basis code.
186
for path, (file_id, kind) in sorted(paths.items()):
188
tree.add([path], [file_id], [kind])
189
except (KeyboardInterrupt, SystemExit):
195
# Fresh lock, reads disk again.
198
tree.update_basis_by_delta('result', delta)
201
# reload tree - ensure we get what was written.
202
tree = tree.bzrdir.open_workingtree()
203
basis_tree = tree.basis_tree()
204
basis_tree.lock_read()
205
self.addCleanup(basis_tree.unlock)
206
# Note, that if the tree does not have a local cache, the trick above of
207
# setting the result as the basis, will come back to bite us. That said,
208
# all the implementations in bzr do have a local cache.
209
return basis_tree.inventory
212
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
213
"""Apply delta to basis and return the result.
215
This inserts basis as a whole inventory and then uses
216
add_inventory_by_delta to add delta.
218
:param basis: An inventory to be used as the basis.
219
:param delta: The inventory delta to apply:
220
:return: An inventory resulting from the application.
222
format = self.format()
223
control = self.make_bzrdir('tree', format=format._matchingbzrdir)
224
repo = format.initialize(control)
227
repo.start_write_group()
229
rev = revision.Revision('basis', timestamp=0, timezone=None,
230
message="", committer="foo@example.com")
231
basis.revision_id = 'basis'
232
create_texts_for_inv(repo, basis)
233
repo.add_revision('basis', rev, basis)
234
repo.commit_write_group()
236
repo.abort_write_group()
242
repo.start_write_group()
244
inv_sha1 = repo.add_inventory_by_delta('basis', delta,
247
repo.abort_write_group()
250
repo.commit_write_group()
253
# Fresh lock, reads disk again.
254
repo = repo.bzrdir.open_repository()
256
self.addCleanup(repo.unlock)
257
return repo.get_inventory('result')
260
class TestDeltaApplication(TestCaseWithTransport):
262
def get_empty_inventory(self, reference_inv=None):
263
"""Get an empty inventory.
265
Note that tests should not depend on the revision of the root for
266
setting up test conditions, as it has to be flexible to accommodate non
267
rich root repositories.
269
:param reference_inv: If not None, get the revision for the root from
270
this inventory. This is useful for dealing with older repositories
271
that routinely discarded the root entry data. If None, the root's
272
revision is set to 'basis'.
274
inv = inventory.Inventory()
275
if reference_inv is not None:
276
inv.root.revision = reference_inv.root.revision
278
inv.root.revision = 'basis'
281
def test_empty_delta(self):
282
inv = self.get_empty_inventory()
284
inv = self.apply_delta(self, inv, delta)
285
inv2 = self.get_empty_inventory(inv)
286
self.assertEqual([], inv2._make_delta(inv))
288
def test_None_file_id(self):
289
inv = self.get_empty_inventory()
290
dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
291
dir1.revision = 'result'
292
delta = [(None, u'dir1', None, dir1)]
293
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
296
def test_unicode_file_id(self):
297
inv = self.get_empty_inventory()
298
dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
299
dir1.revision = 'result'
300
delta = [(None, u'dir1', dir1.file_id, dir1)]
301
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
304
def test_repeated_file_id(self):
305
inv = self.get_empty_inventory()
306
file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
307
file1.revision = 'result'
310
file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
311
file2.revision = 'result'
314
delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
315
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
318
def test_repeated_new_path(self):
319
inv = self.get_empty_inventory()
320
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
321
file1.revision = 'result'
324
file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
325
file2.revision = 'result'
328
delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
329
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
332
def test_repeated_old_path(self):
333
inv = self.get_empty_inventory()
334
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
335
file1.revision = 'result'
338
# We can't *create* a source inventory with the same path, but
339
# a badly generated partial delta might claim the same source twice.
340
# This would be buggy in two ways: the path is repeated in the delta,
341
# And the path for one of the file ids doesn't match the source
342
# location. Alternatively, we could have a repeated fileid, but that
343
# is separately checked for.
344
file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
345
file2.revision = 'result'
350
delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
351
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
354
def test_mismatched_id_entry_id(self):
355
inv = self.get_empty_inventory()
356
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
357
file1.revision = 'result'
360
delta = [(None, u'path', 'id', file1)]
361
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
364
def test_mismatched_new_path_entry_None(self):
365
inv = self.get_empty_inventory()
366
delta = [(None, u'path', 'id', None)]
367
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
370
def test_mismatched_new_path_None_entry(self):
371
inv = self.get_empty_inventory()
372
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
373
file1.revision = 'result'
376
delta = [(u"path", None, 'id1', file1)]
377
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
380
def test_parent_is_not_directory(self):
381
inv = self.get_empty_inventory()
382
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
383
file1.revision = 'result'
386
file2 = inventory.InventoryFile('id2', 'path2', 'id1')
387
file2.revision = 'result'
391
delta = [(None, u'path/path2', 'id2', file2)]
392
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
395
def test_parent_is_missing(self):
396
inv = self.get_empty_inventory()
397
file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
398
file2.revision = 'result'
401
delta = [(None, u'path/path2', 'id2', file2)]
402
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
405
def test_new_parent_path_has_wrong_id(self):
406
inv = self.get_empty_inventory()
407
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
408
parent1.revision = 'result'
409
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
410
parent2.revision = 'result'
411
file1 = inventory.InventoryFile('id', 'path', 'p-2')
412
file1.revision = 'result'
417
# This delta claims that file1 is at dir/path, but actually it's at
418
# dir2/path if you follow the inventory parent structure.
419
delta = [(None, u'dir/path', 'id', file1)]
420
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
423
def test_old_parent_path_is_wrong(self):
424
inv = self.get_empty_inventory()
425
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
426
parent1.revision = 'result'
427
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
428
parent2.revision = 'result'
429
file1 = inventory.InventoryFile('id', 'path', 'p-2')
430
file1.revision = 'result'
436
# This delta claims that file1 was at dir/path, but actually it was at
437
# dir2/path if you follow the inventory parent structure.
438
delta = [(u'dir/path', None, 'id', None)]
439
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
442
def test_old_parent_path_is_for_other_id(self):
443
inv = self.get_empty_inventory()
444
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
445
parent1.revision = 'result'
446
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
447
parent2.revision = 'result'
448
file1 = inventory.InventoryFile('id', 'path', 'p-2')
449
file1.revision = 'result'
452
file2 = inventory.InventoryFile('id2', 'path', 'p-1')
453
file2.revision = 'result'
460
# This delta claims that file1 was at dir/path, but actually it was at
461
# dir2/path if you follow the inventory parent structure. At dir/path
462
# is another entry we should not delete.
463
delta = [(u'dir/path', None, 'id', None)]
464
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
467
def test_add_existing_id_new_path(self):
468
inv = self.get_empty_inventory()
469
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
470
parent1.revision = 'result'
471
parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
472
parent2.revision = 'result'
474
delta = [(None, u'dir2', 'p-1', parent2)]
475
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
478
def test_add_new_id_existing_path(self):
479
inv = self.get_empty_inventory()
480
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
481
parent1.revision = 'result'
482
parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
483
parent2.revision = 'result'
485
delta = [(None, u'dir1', 'p-2', parent2)]
486
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
489
def test_remove_dir_leaving_dangling_child(self):
490
inv = self.get_empty_inventory()
491
dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
492
dir1.revision = 'result'
493
dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
494
dir2.revision = 'result'
495
dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
496
dir3.revision = 'result'
500
delta = [(u'dir1', None, 'p-1', None),
501
(u'dir1/child2', None, 'p-3', None)]
502
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
506
class TestInventoryEntry(TestCase):
508
def test_file_kind_character(self):
    """A file entry reports an empty string as its kind character."""
    entry = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
    kind_char = entry.kind_character()
    self.assertEqual(kind_char, '')
512
def test_dir_kind_character(self):
    """A directory entry reports '/' as its kind character."""
    entry = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
    kind_char = entry.kind_character()
    self.assertEqual(kind_char, '/')
516
def test_link_kind_character(self):
    """A symlink entry reports an empty string as its kind character."""
    link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
    kind_char = link.kind_character()
    self.assertEqual(kind_char, '')
520
def test_dir_detect_changes(self):
521
left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
523
left.executable = True
524
left.symlink_target='foo'
525
right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
526
right.text_sha1 = 321
527
right.symlink_target='bar'
528
self.assertEqual((False, False), left.detect_changes(right))
529
self.assertEqual((False, False), right.detect_changes(left))
531
def test_file_detect_changes(self):
532
left = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
534
right = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
535
right.text_sha1 = 123
536
self.assertEqual((False, False), left.detect_changes(right))
537
self.assertEqual((False, False), right.detect_changes(left))
538
left.executable = True
539
self.assertEqual((False, True), left.detect_changes(right))
540
self.assertEqual((False, True), right.detect_changes(left))
541
right.text_sha1 = 321
542
self.assertEqual((True, True), left.detect_changes(right))
543
self.assertEqual((True, True), right.detect_changes(left))
545
def test_symlink_detect_changes(self):
546
left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
548
left.executable = True
549
left.symlink_target='foo'
550
right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
551
right.text_sha1 = 321
552
right.symlink_target='foo'
553
self.assertEqual((False, False), left.detect_changes(right))
554
self.assertEqual((False, False), right.detect_changes(left))
555
left.symlink_target = 'different'
556
self.assertEqual((True, False), left.detect_changes(right))
557
self.assertEqual((True, False), right.detect_changes(left))
559
def test_file_has_text(self):
    """A file entry reports that it has text."""
    entry = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
    # assertTrue replaces the deprecated failUnless alias; other tests in
    # this file already use assertTrue/assertFalse.
    self.assertTrue(entry.has_text())
563
def test_directory_has_text(self):
    """A directory entry reports that it has no text."""
    entry = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
    # assertFalse replaces the deprecated failIf alias; other tests in
    # this file already use assertTrue/assertFalse.
    self.assertFalse(entry.has_text())
567
def test_link_has_text(self):
    """A symlink entry reports that it has no text."""
    link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
    # assertFalse replaces the deprecated failIf alias; other tests in
    # this file already use assertTrue/assertFalse.
    self.assertFalse(link.has_text())
571
def test_make_entry(self):
    """make_entry returns the entry class matching the requested kind."""
    # Checked in the same order as before: file, symlink, directory.
    expected_classes = (
        ("file", inventory.InventoryFile),
        ("symlink", inventory.InventoryLink),
        ("directory", inventory.InventoryDirectory),
    )
    for kind, entry_class in expected_classes:
        entry = inventory.make_entry(kind, "name", ROOT_ID)
        self.assertIsInstance(entry, entry_class)
579
def test_make_entry_non_normalized(self):
580
orig_normalized_filename = osutils.normalized_filename
583
osutils.normalized_filename = osutils._accessible_normalized_filename
584
entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
585
self.assertEqual(u'\xe5', entry.name)
586
self.assertIsInstance(entry, inventory.InventoryFile)
588
osutils.normalized_filename = osutils._inaccessible_normalized_filename
589
self.assertRaises(errors.InvalidNormalization,
590
inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
592
osutils.normalized_filename = orig_normalized_filename
595
class TestDescribeChanges(TestCase):
597
def test_describe_change(self):
598
# we need to test the following change combinations:
604
# renamed/reparented and modified
605
# change kind (perhaps can't be done yet?)
606
# also, merged in combination with all of these?
607
old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
608
old_a.text_sha1 = '123132'
610
new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
611
new_a.text_sha1 = '123132'
614
self.assertChangeDescription('unchanged', old_a, new_a)
617
new_a.text_sha1 = 'abcabc'
618
self.assertChangeDescription('modified', old_a, new_a)
620
self.assertChangeDescription('added', None, new_a)
621
self.assertChangeDescription('removed', old_a, None)
622
# perhaps a bit questionable but seems like the most reasonable thing...
623
self.assertChangeDescription('unchanged', None, None)
625
# in this case it's both renamed and modified; show a rename and
627
new_a.name = 'newfilename'
628
self.assertChangeDescription('modified and renamed', old_a, new_a)
630
# reparenting is 'renaming'
631
new_a.name = old_a.name
632
new_a.parent_id = 'somedir-id'
633
self.assertChangeDescription('modified and renamed', old_a, new_a)
635
# reset the content values so it's not modified
636
new_a.text_size = old_a.text_size
637
new_a.text_sha1 = old_a.text_sha1
638
new_a.name = old_a.name
640
new_a.name = 'newfilename'
641
self.assertChangeDescription('renamed', old_a, new_a)
643
# reparenting is 'renaming'
644
new_a.name = old_a.name
645
new_a.parent_id = 'somedir-id'
646
self.assertChangeDescription('renamed', old_a, new_a)
648
def assertChangeDescription(self, expected_change, old_ie, new_ie):
    """Assert that describing the change old_ie -> new_ie gives expected_change.

    :param expected_change: The description expected from describe_change.
    :param old_ie: The old inventory entry passed to describe_change.
    :param new_ie: The new inventory entry passed to describe_change.
    """
    self.assertEqual(expected_change,
                     InventoryEntry.describe_change(old_ie, new_ie))
653
class TestCHKInventory(TestCaseWithTransport):
655
def get_chk_bytes(self):
656
# The easiest way to get a CHK store is a development6 repository and
657
# then work with the chk_bytes attribute directly.
658
repo = self.make_repository(".", format="development6-rich-root")
660
self.addCleanup(repo.unlock)
661
repo.start_write_group()
662
self.addCleanup(repo.abort_write_group)
663
return repo.chk_bytes
665
def read_bytes(self, chk_bytes, key):
    """Return the fulltext of the record stored under key in chk_bytes.

    :param chk_bytes: A store providing get_record_stream().
    :param key: The key of the single record to read.
    :return: The result of get_bytes_as("fulltext") on the first record
        yielded by the stream.
    """
    stream = chk_bytes.get_record_stream([key], 'unordered', True)
    # Use the next() builtin rather than the iterator's .next() method:
    # that method only exists on Python 2 iterators, while the builtin
    # works with both the old and the new iterator protocol.
    record = next(stream)
    return record.get_bytes_as("fulltext")
669
def test_deserialise_gives_CHKInventory(self):
671
inv.revision_id = "revid"
672
inv.root.revision = "rootrev"
673
chk_bytes = self.get_chk_bytes()
674
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
675
bytes = ''.join(chk_inv.to_lines())
676
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
677
self.assertEqual("revid", new_inv.revision_id)
678
self.assertEqual("directory", new_inv.root.kind)
679
self.assertEqual(inv.root.file_id, new_inv.root.file_id)
680
self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
681
self.assertEqual(inv.root.name, new_inv.root.name)
682
self.assertEqual("rootrev", new_inv.root.revision)
683
self.assertEqual('plain', new_inv._search_key_name)
685
def test_deserialise_wrong_revid(self):
687
inv.revision_id = "revid"
688
inv.root.revision = "rootrev"
689
chk_bytes = self.get_chk_bytes()
690
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
691
bytes = ''.join(chk_inv.to_lines())
692
self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
695
def test_captures_rev_root_byid(self):
697
inv.revision_id = "foo"
698
inv.root.revision = "bar"
699
chk_bytes = self.get_chk_bytes()
700
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
701
lines = chk_inv.to_lines()
704
'revision_id: foo\n',
705
'root_id: TREE_ROOT\n',
706
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
707
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
709
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
710
self.assertEqual('plain', chk_inv._search_key_name)
712
def test_captures_parent_id_basename_index(self):
714
inv.revision_id = "foo"
715
inv.root.revision = "bar"
716
chk_bytes = self.get_chk_bytes()
717
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
718
lines = chk_inv.to_lines()
721
'revision_id: foo\n',
722
'root_id: TREE_ROOT\n',
723
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
724
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
726
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
727
self.assertEqual('plain', chk_inv._search_key_name)
729
def test_captures_search_key_name(self):
731
inv.revision_id = "foo"
732
inv.root.revision = "bar"
733
chk_bytes = self.get_chk_bytes()
734
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
735
search_key_name='hash-16-way')
736
lines = chk_inv.to_lines()
739
'search_key_name: hash-16-way\n',
740
'root_id: TREE_ROOT\n',
741
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
742
'revision_id: foo\n',
743
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
745
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
746
self.assertEqual('hash-16-way', chk_inv._search_key_name)
748
def test_directory_children_on_demand(self):
750
inv.revision_id = "revid"
751
inv.root.revision = "rootrev"
752
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
753
inv["fileid"].revision = "filerev"
754
inv["fileid"].executable = True
755
inv["fileid"].text_sha1 = "ffff"
756
inv["fileid"].text_size = 1
757
chk_bytes = self.get_chk_bytes()
758
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
759
bytes = ''.join(chk_inv.to_lines())
760
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
761
root_entry = new_inv[inv.root.file_id]
762
self.assertEqual(None, root_entry._children)
763
self.assertEqual(['file'], root_entry.children.keys())
764
file_direct = new_inv["fileid"]
765
file_found = root_entry.children['file']
766
self.assertEqual(file_direct.kind, file_found.kind)
767
self.assertEqual(file_direct.file_id, file_found.file_id)
768
self.assertEqual(file_direct.parent_id, file_found.parent_id)
769
self.assertEqual(file_direct.name, file_found.name)
770
self.assertEqual(file_direct.revision, file_found.revision)
771
self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
772
self.assertEqual(file_direct.text_size, file_found.text_size)
773
self.assertEqual(file_direct.executable, file_found.executable)
775
def test_from_inventory_maximum_size(self):
776
# from_inventory supports the maximum_size parameter.
778
inv.revision_id = "revid"
779
inv.root.revision = "rootrev"
780
chk_bytes = self.get_chk_bytes()
781
chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
782
chk_inv.id_to_entry._ensure_root()
783
self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
784
self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
785
p_id_basename = chk_inv.parent_id_basename_to_file_id
786
p_id_basename._ensure_root()
787
self.assertEqual(120, p_id_basename._root_node.maximum_size)
788
self.assertEqual(2, p_id_basename._root_node._key_width)
790
def test___iter__(self):
792
inv.revision_id = "revid"
793
inv.root.revision = "rootrev"
794
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
795
inv["fileid"].revision = "filerev"
796
inv["fileid"].executable = True
797
inv["fileid"].text_sha1 = "ffff"
798
inv["fileid"].text_size = 1
799
chk_bytes = self.get_chk_bytes()
800
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
801
bytes = ''.join(chk_inv.to_lines())
802
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
803
fileids = list(new_inv.__iter__())
805
self.assertEqual([inv.root.file_id, "fileid"], fileids)
807
def test__len__(self):
809
inv.revision_id = "revid"
810
inv.root.revision = "rootrev"
811
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
812
inv["fileid"].revision = "filerev"
813
inv["fileid"].executable = True
814
inv["fileid"].text_sha1 = "ffff"
815
inv["fileid"].text_size = 1
816
chk_bytes = self.get_chk_bytes()
817
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
818
self.assertEqual(2, len(chk_inv))
820
def test___getitem__(self):
822
inv.revision_id = "revid"
823
inv.root.revision = "rootrev"
824
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
825
inv["fileid"].revision = "filerev"
826
inv["fileid"].executable = True
827
inv["fileid"].text_sha1 = "ffff"
828
inv["fileid"].text_size = 1
829
chk_bytes = self.get_chk_bytes()
830
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
831
bytes = ''.join(chk_inv.to_lines())
832
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
833
root_entry = new_inv[inv.root.file_id]
834
file_entry = new_inv["fileid"]
835
self.assertEqual("directory", root_entry.kind)
836
self.assertEqual(inv.root.file_id, root_entry.file_id)
837
self.assertEqual(inv.root.parent_id, root_entry.parent_id)
838
self.assertEqual(inv.root.name, root_entry.name)
839
self.assertEqual("rootrev", root_entry.revision)
840
self.assertEqual("file", file_entry.kind)
841
self.assertEqual("fileid", file_entry.file_id)
842
self.assertEqual(inv.root.file_id, file_entry.parent_id)
843
self.assertEqual("file", file_entry.name)
844
self.assertEqual("filerev", file_entry.revision)
845
self.assertEqual("ffff", file_entry.text_sha1)
846
self.assertEqual(1, file_entry.text_size)
847
self.assertEqual(True, file_entry.executable)
848
self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
850
def test_has_id_true(self):
852
inv.revision_id = "revid"
853
inv.root.revision = "rootrev"
854
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
855
inv["fileid"].revision = "filerev"
856
inv["fileid"].executable = True
857
inv["fileid"].text_sha1 = "ffff"
858
inv["fileid"].text_size = 1
859
chk_bytes = self.get_chk_bytes()
860
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
861
self.assertTrue(chk_inv.has_id('fileid'))
862
self.assertTrue(chk_inv.has_id(inv.root.file_id))
864
def test_has_id_not(self):
866
inv.revision_id = "revid"
867
inv.root.revision = "rootrev"
868
chk_bytes = self.get_chk_bytes()
869
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
870
self.assertFalse(chk_inv.has_id('fileid'))
872
def test_id2path(self):
874
inv.revision_id = "revid"
875
inv.root.revision = "rootrev"
876
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
877
fileentry = InventoryFile("fileid", "file", "dirid")
880
inv["fileid"].revision = "filerev"
881
inv["fileid"].executable = True
882
inv["fileid"].text_sha1 = "ffff"
883
inv["fileid"].text_size = 1
884
inv["dirid"].revision = "filerev"
885
chk_bytes = self.get_chk_bytes()
886
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
887
bytes = ''.join(chk_inv.to_lines())
888
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
889
self.assertEqual('', new_inv.id2path(inv.root.file_id))
890
self.assertEqual('dir', new_inv.id2path('dirid'))
891
self.assertEqual('dir/file', new_inv.id2path('fileid'))
893
def test_path2id(self):
895
inv.revision_id = "revid"
896
inv.root.revision = "rootrev"
897
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
898
fileentry = InventoryFile("fileid", "file", "dirid")
901
inv["fileid"].revision = "filerev"
902
inv["fileid"].executable = True
903
inv["fileid"].text_sha1 = "ffff"
904
inv["fileid"].text_size = 1
905
inv["dirid"].revision = "filerev"
906
chk_bytes = self.get_chk_bytes()
907
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
908
bytes = ''.join(chk_inv.to_lines())
909
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
910
self.assertEqual(inv.root.file_id, new_inv.path2id(''))
911
self.assertEqual('dirid', new_inv.path2id('dir'))
912
self.assertEqual('fileid', new_inv.path2id('dir/file'))
914
def test_create_by_apply_delta_sets_root(self):
916
inv.revision_id = "revid"
917
chk_bytes = self.get_chk_bytes()
918
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
919
inv.add_path("", "directory", "myrootid", None)
920
inv.revision_id = "expectedid"
921
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
922
delta = [("", None, base_inv.root.file_id, None),
923
(None, "", "myrootid", inv.root)]
924
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
925
self.assertEquals(reference_inv.root, new_inv.root)
927
def test_create_by_apply_delta_empty_add_child(self):
    """Adding one file by delta matches building the inventory directly.

    The base CHKInventory comes from an inventory containing only the root.
    The reference CHKInventory comes from the same inventory after file "A"
    has been added. Applying the equivalent delta to the base must produce
    the same revision id, root id and id_to_entry root node key.
    """
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    chk_bytes = self.get_chk_bytes()
    base_inv = CHKInventory.from_inventory(chk_bytes, inv)
    a_entry = InventoryFile("A-id", "A", inv.root.file_id)
    a_entry.revision = "filerev"
    a_entry.executable = True
    a_entry.text_sha1 = "ffff"
    a_entry.text_size = 1
    # The reference inventory must actually contain the entry that the
    # delta is going to add to the base.
    inv.add(a_entry)
    inv.revision_id = "expectedid"
    reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
    delta = [(None, "A", "A-id", a_entry)]
    new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
    # new_inv should be the same as reference_inv.
    self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
    self.assertEqual(reference_inv.root_id, new_inv.root_id)
    # _ensure_root() is called on both maps before their _root_node is
    # inspected.
    reference_inv.id_to_entry._ensure_root()
    new_inv.id_to_entry._ensure_root()
    self.assertEqual(reference_inv.id_to_entry._root_node._key,
                     new_inv.id_to_entry._root_node._key)
951
def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
    """Adding a file by delta also updates parent_id_basename_to_file_id.

    Same scenario as adding a single child "A" to a root-only inventory,
    but in addition to id_to_entry this compares the root node key of the
    parent_id_basename_to_file_id map between the delta-built inventory and
    the directly built reference.
    """
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    chk_bytes = self.get_chk_bytes()
    base_inv = CHKInventory.from_inventory(chk_bytes, inv)
    a_entry = InventoryFile("A-id", "A", inv.root.file_id)
    a_entry.revision = "filerev"
    a_entry.executable = True
    a_entry.text_sha1 = "ffff"
    a_entry.text_size = 1
    # The reference inventory must actually contain the entry that the
    # delta is going to add to the base.
    inv.add(a_entry)
    inv.revision_id = "expectedid"
    reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
    delta = [(None, "A", "A-id", a_entry)]
    new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
    # _ensure_root() is called on every map before its _root_node is
    # inspected.
    reference_inv.id_to_entry._ensure_root()
    reference_inv.parent_id_basename_to_file_id._ensure_root()
    new_inv.id_to_entry._ensure_root()
    new_inv.parent_id_basename_to_file_id._ensure_root()
    # new_inv should be the same as reference_inv.
    self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
    self.assertEqual(reference_inv.root_id, new_inv.root_id)
    self.assertEqual(reference_inv.id_to_entry._root_node._key,
                     new_inv.id_to_entry._root_node._key)
    self.assertEqual(
        reference_inv.parent_id_basename_to_file_id._root_node._key,
        new_inv.parent_id_basename_to_file_id._root_node._key)
979
def test_iter_changes(self):
    """iter_changes reports a modified file between two CHKInventories.

    Two inventories share the same file id and path but differ in revision,
    executable bit, sha1 and size. Both are converted to CHKInventory and
    round-tripped through their serialised form before being compared.
    """
    # Low level bootstrapping smoke test; comprehensive generic tests via
    # InterTree are coming.
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    inv.add(InventoryFile("fileid", "file", inv.root.file_id))
    inv["fileid"].revision = "filerev"
    inv["fileid"].executable = True
    inv["fileid"].text_sha1 = "ffff"
    inv["fileid"].text_size = 1
    inv2 = Inventory()
    inv2.revision_id = "revid2"
    inv2.root.revision = "rootrev"
    inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
    inv2["fileid"].revision = "filerev2"
    inv2["fileid"].executable = False
    inv2["fileid"].text_sha1 = "bbbb"
    inv2["fileid"].text_size = 2
    # Get fresh objects by serialising and deserialising both inventories.
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    serialised = ''.join(chk_inv.to_lines())
    inv_1 = CHKInventory.deserialise(chk_bytes, serialised, ("revid",))
    chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
    serialised = ''.join(chk_inv2.to_lines())
    inv_2 = CHKInventory.deserialise(chk_bytes, serialised, ("revid2",))
    # Expected change tuple: file id, paths, changed-content flag,
    # versioned, parent ids, names, kinds, executable bits.
    # NOTE(review): the executable pair is taken to be (basis, self), i.e.
    # (inv_2, inv_1) = (False, True) - confirm against
    # CHKInventory.iter_changes.
    self.assertEqual(
        [('fileid', (u'file', u'file'), True, (True, True),
          ('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
          (False, True))],
        list(inv_1.iter_changes(inv_2)))
1011
def test_parent_id_basename_to_file_id_index_enabled(self):
    """A deserialised CHKInventory exposes parent_id_basename_to_file_id.

    Checks that the attribute is a chk_map.CHKMap and that its items map
    (parent_id, basename) keys to file ids for the root and the single file.
    """
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    inv.add(InventoryFile("fileid", "file", inv.root.file_id))
    inv["fileid"].revision = "filerev"
    inv["fileid"].executable = True
    inv["fileid"].text_sha1 = "ffff"
    inv["fileid"].text_size = 1
    # get fresh objects.
    chk_bytes = self.get_chk_bytes()
    tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
    serialised = ''.join(tmp_inv.to_lines())
    chk_inv = CHKInventory.deserialise(chk_bytes, serialised, ("revid",))
    self.assertIsInstance(chk_inv.parent_id_basename_to_file_id,
                          chk_map.CHKMap)
    self.assertEqual(
        {('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
        dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1030
def test_file_entry_to_bytes(self):
    """An executable file entry round-trips through the CHK byte form.

    Checks the exact serialised text, that _bytes_to_entry gives back an
    equal entry with a unicode name, and that _bytes_to_utf8name_key
    extracts (name, file id, revision).
    """
    inv = CHKInventory(None)
    ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
    ie.executable = True
    ie.revision = 'file-rev-id'
    ie.text_sha1 = 'abcdefgh'
    # The size has to match the "100" field in the expected bytes below.
    ie.text_size = 100
    entry_bytes = inv._entry_to_bytes(ie)
    self.assertEqual('file: file-id\nparent-id\nfilename\n'
                     'file-rev-id\nabcdefgh\n100\nY', entry_bytes)
    ie2 = inv._bytes_to_entry(entry_bytes)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, unicode)
    self.assertEqual(('filename', 'file-id', 'file-rev-id'),
                     inv._bytes_to_utf8name_key(entry_bytes))
1046
def test_file2_entry_to_bytes(self):
    """A non-executable file entry with a non-ASCII name round-trips.

    Same checks as the ASCII file case, but the name starts with U+03A9,
    which appears as the UTF-8 bytes \\xce\\xa9 in the serialised text and
    in the utf8 name key.
    """
    inv = CHKInventory(None)
    ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
    ie.executable = False
    ie.revision = 'file-rev-id'
    ie.text_sha1 = '123456'
    # The size has to match the "25" field in the expected bytes below.
    ie.text_size = 25
    entry_bytes = inv._entry_to_bytes(ie)
    self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
                     'file-rev-id\n123456\n25\nN', entry_bytes)
    ie2 = inv._bytes_to_entry(entry_bytes)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, unicode)
    self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
                     inv._bytes_to_utf8name_key(entry_bytes))
1063
def test_dir_entry_to_bytes(self):
    """A directory entry round-trips through the CHK byte form.

    Verifies the serialised text, that decoding gives back an equal entry
    whose name is unicode, and the (name, file id, revision) key extracted
    from the bytes.
    """
    chk_inv = CHKInventory(None)
    entry = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
    entry.revision = 'dir-rev-id'

    serialised = chk_inv._entry_to_bytes(entry)
    self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id',
                     serialised)

    decoded = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(entry, decoded)
    self.assertIsInstance(decoded.name, unicode)

    name_key = chk_inv._bytes_to_utf8name_key(serialised)
    self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'), name_key)
1075
def test_dir2_entry_to_bytes(self):
    """A parentless directory entry with a non-ASCII name round-trips.

    The parent id is None, which serialises as an empty field and must
    come back as None (asserted with assertIs). The name contains U+03A9,
    which appears as the UTF-8 bytes \\xce\\xa9.
    """
    inv = CHKInventory(None)
    ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
                                      None)
    ie.revision = 'dir-rev-id'
    entry_bytes = inv._entry_to_bytes(ie)
    self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
                     'dir-rev-id', entry_bytes)
    ie2 = inv._bytes_to_entry(entry_bytes)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, unicode)
    self.assertIs(ie2.parent_id, None)
    self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
                     inv._bytes_to_utf8name_key(entry_bytes))
1090
def test_symlink_entry_to_bytes(self):
    """A symlink entry round-trips through the CHK byte form.

    Verifies the serialised text, that decoding gives back an equal entry
    whose name and symlink target are unicode, and the
    (name, file id, revision) key extracted from the bytes.
    """
    chk_inv = CHKInventory(None)
    link = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
    link.revision = 'link-rev-id'
    link.symlink_target = u'target/path'

    serialised = chk_inv._entry_to_bytes(link)
    self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
                     'link-rev-id\ntarget/path', serialised)

    decoded = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(link, decoded)
    self.assertIsInstance(decoded.name, unicode)
    self.assertIsInstance(decoded.symlink_target, unicode)

    name_key = chk_inv._bytes_to_utf8name_key(serialised)
    self.assertEqual(('linkname', 'link-id', 'link-rev-id'), name_key)
1105
def test_symlink2_entry_to_bytes(self):
    """A symlink entry with non-ASCII name and target round-trips.

    Both the name and the symlink target contain U+03A9, which appears as
    the UTF-8 bytes \\xce\\xa9 in the serialised text and in the name key.
    The decoded name and target must be unicode.
    """
    chk_inv = CHKInventory(None)
    link = inventory.InventoryLink('link-id', u'link\u03a9name',
                                   'parent-id')
    link.revision = 'link-rev-id'
    link.symlink_target = u'target/\u03a9path'

    serialised = chk_inv._entry_to_bytes(link)
    self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
                     'link-rev-id\ntarget/\xce\xa9path', serialised)

    decoded = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(link, decoded)
    self.assertIsInstance(decoded.name, unicode)
    self.assertIsInstance(decoded.symlink_target, unicode)

    name_key = chk_inv._bytes_to_utf8name_key(serialised)
    self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
                     name_key)
1120
def test_tree_reference_entry_to_bytes(self):
    """A tree-reference entry round-trips through the CHK byte form.

    Checks the exact serialised text (including the reference revision),
    that _bytes_to_entry gives back an equal entry with a unicode name, and
    that _bytes_to_utf8name_key extracts (utf8 name, file id, revision).
    """
    inv = CHKInventory(None)
    # The parent id has to match the "parent-id" field in the expected
    # bytes below.
    ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
                                 'parent-id')
    ie.revision = 'tree-rev-id'
    ie.reference_revision = 'ref-rev-id'
    entry_bytes = inv._entry_to_bytes(ie)
    self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
                     'tree-rev-id\nref-rev-id', entry_bytes)
    ie2 = inv._bytes_to_entry(entry_bytes)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, unicode)
    self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
                     inv._bytes_to_utf8name_key(entry_bytes))