1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
29
from bzrlib.inventory import (
38
from bzrlib.tests import (
40
TestCaseWithTransport,
42
from bzrlib.tests.scenarios import load_tests_apply_scenarios
45
load_tests = load_tests_apply_scenarios
48
def delta_application_scenarios():
50
('Inventory', {'apply_delta':apply_inventory_Inventory}),
52
# Working tree basis delta application
53
# Repository add_inv_by_delta.
54
# Reduced form of the per_repository test logic - that logic needs to be
55
# able to get /just/ repositories whereas these tests are fine with
56
# just creating trees.
58
for _, format in repository.format_registry.iteritems():
59
if format.supports_full_versioned_files:
60
scenarios.append((str(format.__name__), {
61
'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
63
for format in workingtree.format_registry._get_all():
64
repo_fmt = format._matchingbzrdir.repository_format
65
if not repo_fmt.supports_full_versioned_files:
68
(str(format.__class__.__name__) + ".update_basis_by_delta", {
69
'apply_delta':apply_inventory_WT_basis,
72
(str(format.__class__.__name__) + ".apply_inventory_delta", {
73
'apply_delta':apply_inventory_WT,
78
def create_texts_for_inv(repo, inv):
79
for path, ie in inv.iter_entries():
81
lines = ['a' * ie.text_size]
84
repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
87
def apply_inventory_Inventory(self, basis, delta):
88
"""Apply delta to basis and return the result.
90
:param basis: An inventory to be used as the basis.
91
:param delta: The inventory delta to apply:
92
:return: An inventory resulting from the application.
94
basis.apply_delta(delta)
98
def apply_inventory_WT(self, basis, delta):
99
"""Apply delta to basis and return the result.
101
This sets the tree state to be basis, and then calls apply_inventory_delta.
103
:param basis: An inventory to be used as the basis.
104
:param delta: The inventory delta to apply:
105
:return: An inventory resulting from the application.
107
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
108
control.create_repository()
109
control.create_branch()
110
tree = self.format.initialize(control)
113
tree._write_inventory(basis)
116
# Fresh object, reads disk again.
117
tree = tree.bzrdir.open_workingtree()
120
tree.apply_inventory_delta(delta)
123
# reload tree - ensure we get what was written.
124
tree = tree.bzrdir.open_workingtree()
126
self.addCleanup(tree.unlock)
127
# One could add 'tree._validate' here but that would cause 'early' failures
128
# as far as higher level code is concerned. Possibly adding an
129
# expect_fail parameter to this function and if that is False then do a
131
return tree.inventory
134
def apply_inventory_WT_basis(self, basis, delta):
135
"""Apply delta to basis and return the result.
137
This sets the parent and then calls update_basis_by_delta.
138
It also puts the basis in the repository under both 'basis' and 'result' to
139
allow safety checks made by the WT to succeed, and finally ensures that all
140
items in the delta with a new path are present in the WT before calling
141
update_basis_by_delta.
143
:param basis: An inventory to be used as the basis.
144
:param delta: The inventory delta to apply:
145
:return: An inventory resulting from the application.
147
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
148
control.create_repository()
149
control.create_branch()
150
tree = self.format.initialize(control)
153
repo = tree.branch.repository
154
repo.start_write_group()
156
rev = revision.Revision('basis', timestamp=0, timezone=None,
157
message="", committer="foo@example.com")
158
basis.revision_id = 'basis'
159
create_texts_for_inv(tree.branch.repository, basis)
160
repo.add_revision('basis', rev, basis)
161
# Add a revision for the result, with the basis content -
162
# update_basis_by_delta doesn't check that the delta results in
163
# result, and we want inconsistent deltas to get called on the
164
# tree, or else the code isn't actually checked.
165
rev = revision.Revision('result', timestamp=0, timezone=None,
166
message="", committer="foo@example.com")
167
basis.revision_id = 'result'
168
repo.add_revision('result', rev, basis)
169
repo.commit_write_group()
171
repo.abort_write_group()
173
# Set the basis state as the tree's current state
174
tree._write_inventory(basis)
175
# This reads basis from the repo and puts it into the tree's local
176
# cache, if it has one.
177
tree.set_parent_ids(['basis'])
180
for old, new, id, entry in delta:
181
if None in (new, entry):
183
paths[new] = (entry.file_id, entry.kind)
184
parents.add(osutils.dirname(new))
185
parents = osutils.minimum_path_selection(parents)
187
# Put place holders in the tree to permit adding the other entries.
188
for pos, parent in enumerate(parents):
189
if not tree.path2id(parent):
190
# add a synthetic directory in the tree so we can put the
191
# tree0 entries in place for dirstate.
192
tree.add([parent], ["id%d" % pos], ["directory"])
194
# Many deltas may cause this mini-apply to fail, but we want to see what
195
# the delta application code says, not the prep that we do to deal with
196
# limitations of dirstate's update_basis code.
197
for path, (file_id, kind) in sorted(paths.items()):
199
tree.add([path], [file_id], [kind])
200
except (KeyboardInterrupt, SystemExit):
206
# Fresh lock, reads disk again.
209
tree.update_basis_by_delta('result', delta)
212
# reload tree - ensure we get what was written.
213
tree = tree.bzrdir.open_workingtree()
214
basis_tree = tree.basis_tree()
215
basis_tree.lock_read()
216
self.addCleanup(basis_tree.unlock)
217
# Note, that if the tree does not have a local cache, the trick above of
218
# setting the result as the basis, will come back to bite us. That said,
219
# all the implementations in bzr do have a local cache.
220
return basis_tree.inventory
223
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
224
"""Apply delta to basis and return the result.
226
This inserts basis as a whole inventory and then uses
227
add_inventory_by_delta to add delta.
229
:param basis: An inventory to be used as the basis.
230
:param delta: The inventory delta to apply:
231
:return: An inventory resulting from the application.
233
format = self.format()
234
control = self.make_bzrdir('tree', format=format._matchingbzrdir)
235
repo = format.initialize(control)
238
repo.start_write_group()
240
rev = revision.Revision('basis', timestamp=0, timezone=None,
241
message="", committer="foo@example.com")
242
basis.revision_id = 'basis'
243
create_texts_for_inv(repo, basis)
244
repo.add_revision('basis', rev, basis)
245
repo.commit_write_group()
247
repo.abort_write_group()
253
repo.start_write_group()
255
inv_sha1 = repo.add_inventory_by_delta('basis', delta,
258
repo.abort_write_group()
261
repo.commit_write_group()
264
# Fresh lock, reads disk again.
265
repo = repo.bzrdir.open_repository()
267
self.addCleanup(repo.unlock)
268
return repo.get_inventory('result')
271
class TestInventoryUpdates(TestCase):
273
def test_creation_from_root_id(self):
    """A root id given to the constructor yields a root entry with that id."""
    root_id = 'tree-root'
    inv = inventory.Inventory(root_id=root_id)
    # iff a root id is passed to the constructor, a root directory is made
    self.assertNotEqual(None, inv.root)
    self.assertEqual(root_id, inv.root.file_id)
279
def test_add_path_of_root(self):
    """Adding the empty path to a rootless inventory installs the root."""
    # With root_id=None the inventory starts out without any root entry.
    inv = inventory.Inventory(root_id=None)
    self.assertIs(None, inv.root)
    # The root is then created by adding the path "".
    root_entry = inv.add_path("", "directory", "my-root")
    root_entry.revision = 'test-rev'
    self.assertEqual("my-root", root_entry.file_id)
    # The entry handed back must be the very object stored as inv.root.
    self.assertIs(root_entry, inv.root)
289
def test_add_path(self):
    """add_path returns an entry carrying the requested file id and kind."""
    inv = inventory.Inventory(root_id='tree_root')
    added = inv.add_path('hello', 'file', 'hello-id')
    self.assertEqual('hello-id', added.file_id)
    self.assertEqual('file', added.kind)
296
"""Make sure copy() works and creates a deep copy."""
297
inv = inventory.Inventory(root_id='some-tree-root')
298
ie = inv.add_path('hello', 'file', 'hello-id')
300
inv.root.file_id = 'some-new-root'
302
self.assertEqual('some-tree-root', inv2.root.file_id)
303
self.assertEqual('hello', inv2['hello-id'].name)
305
def test_copy_empty(self):
306
"""Make sure an empty inventory can be copied."""
307
inv = inventory.Inventory(root_id=None)
309
self.assertIs(None, inv2.root)
311
def test_copy_copies_root_revision(self):
312
"""Make sure the revision of the root gets copied."""
313
inv = inventory.Inventory(root_id='someroot')
314
inv.root.revision = 'therev'
316
self.assertEquals('someroot', inv2.root.file_id)
317
self.assertEquals('therev', inv2.root.revision)
319
def test_create_tree_reference(self):
    """A TreeReference entry can be added directly beneath the root."""
    inv = inventory.Inventory('tree-root-123')
    nested = TreeReference('nested-id', 'nested', parent_id='tree-root-123',
                           revision='rev', reference_revision='rev2')
    # The test only checks that the add call completes without raising.
    inv.add(nested)
324
def test_error_encoding(self):
    """Adding a second entry with the same non-ascii name raises cleanly.

    The InconsistentDelta produced must be convertible with str() and its
    text must contain the escaped form of the name.
    """
    inv = inventory.Inventory('tree-root')
    inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
    clashing = InventoryFile('b-id', u'\u1234', 'tree-root')
    err = self.assertRaises(errors.InconsistentDelta, inv.add, clashing)
    self.assertContainsRe(str(err), r'\\u1234')
331
def test_add_recursive(self):
332
parent = InventoryDirectory('src-id', 'src', 'tree-root')
333
child = InventoryFile('hello-id', 'hello.c', 'src-id')
334
parent.children[child.file_id] = child
335
inv = inventory.Inventory('tree-root')
337
self.assertEqual('src/hello.c', inv.id2path('hello-id'))
341
class TestDeltaApplication(TestCaseWithTransport):
343
scenarios = delta_application_scenarios()
345
def get_empty_inventory(self, reference_inv=None):
346
"""Get an empty inventory.
348
Note that tests should not depend on the revision of the root for
349
setting up test conditions, as it has to be flexible to accommodate non
350
rich root repositories.
352
:param reference_inv: If not None, get the revision for the root from
353
this inventory. This is useful for dealing with older repositories
354
that routinely discarded the root entry data. If None, the root's
355
revision is set to 'basis'.
357
inv = inventory.Inventory()
358
if reference_inv is not None:
359
inv.root.revision = reference_inv.root.revision
361
inv.root.revision = 'basis'
364
def test_empty_delta(self):
365
inv = self.get_empty_inventory()
367
inv = self.apply_delta(self, inv, delta)
368
inv2 = self.get_empty_inventory(inv)
369
self.assertEqual([], inv2._make_delta(inv))
371
def test_None_file_id(self):
372
inv = self.get_empty_inventory()
373
dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
374
dir1.revision = 'result'
375
delta = [(None, u'dir1', None, dir1)]
376
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
379
def test_unicode_file_id(self):
380
inv = self.get_empty_inventory()
381
dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
382
dir1.revision = 'result'
383
delta = [(None, u'dir1', dir1.file_id, dir1)]
384
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
387
def test_repeated_file_id(self):
388
inv = self.get_empty_inventory()
389
file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
390
file1.revision = 'result'
393
file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
394
file2.revision = 'result'
397
delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
398
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
401
def test_repeated_new_path(self):
402
inv = self.get_empty_inventory()
403
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
404
file1.revision = 'result'
407
file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
408
file2.revision = 'result'
411
delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
412
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
415
def test_repeated_old_path(self):
416
inv = self.get_empty_inventory()
417
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
418
file1.revision = 'result'
421
# We can't *create* a source inventory with the same path, but
422
# a badly generated partial delta might claim the same source twice.
423
# This would be buggy in two ways: the path is repeated in the delta,
424
# And the path for one of the file ids doesn't match the source
425
# location. Alternatively, we could have a repeated fileid, but that
426
# is separately checked for.
427
file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
428
file2.revision = 'result'
433
delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
434
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
437
def test_mismatched_id_entry_id(self):
438
inv = self.get_empty_inventory()
439
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
440
file1.revision = 'result'
443
delta = [(None, u'path', 'id', file1)]
444
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
447
def test_mismatched_new_path_entry_None(self):
448
inv = self.get_empty_inventory()
449
delta = [(None, u'path', 'id', None)]
450
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
453
def test_mismatched_new_path_None_entry(self):
454
inv = self.get_empty_inventory()
455
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
456
file1.revision = 'result'
459
delta = [(u"path", None, 'id1', file1)]
460
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
463
def test_parent_is_not_directory(self):
464
inv = self.get_empty_inventory()
465
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
466
file1.revision = 'result'
469
file2 = inventory.InventoryFile('id2', 'path2', 'id1')
470
file2.revision = 'result'
474
delta = [(None, u'path/path2', 'id2', file2)]
475
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
478
def test_parent_is_missing(self):
479
inv = self.get_empty_inventory()
480
file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
481
file2.revision = 'result'
484
delta = [(None, u'path/path2', 'id2', file2)]
485
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
488
def test_new_parent_path_has_wrong_id(self):
489
inv = self.get_empty_inventory()
490
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
491
parent1.revision = 'result'
492
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
493
parent2.revision = 'result'
494
file1 = inventory.InventoryFile('id', 'path', 'p-2')
495
file1.revision = 'result'
500
# This delta claims that file1 is at dir/path, but actually it's at
501
# dir2/path if you follow the inventory parent structure.
502
delta = [(None, u'dir/path', 'id', file1)]
503
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
506
def test_old_parent_path_is_wrong(self):
507
inv = self.get_empty_inventory()
508
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
509
parent1.revision = 'result'
510
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
511
parent2.revision = 'result'
512
file1 = inventory.InventoryFile('id', 'path', 'p-2')
513
file1.revision = 'result'
519
# This delta claims that file1 was at dir/path, but actually it was at
520
# dir2/path if you follow the inventory parent structure.
521
delta = [(u'dir/path', None, 'id', None)]
522
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
525
def test_old_parent_path_is_for_other_id(self):
526
inv = self.get_empty_inventory()
527
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
528
parent1.revision = 'result'
529
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
530
parent2.revision = 'result'
531
file1 = inventory.InventoryFile('id', 'path', 'p-2')
532
file1.revision = 'result'
535
file2 = inventory.InventoryFile('id2', 'path', 'p-1')
536
file2.revision = 'result'
543
# This delta claims that file1 was at dir/path, but actually it was at
544
# dir2/path if you follow the inventory parent structure. At dir/path
545
# is another entry we should not delete.
546
delta = [(u'dir/path', None, 'id', None)]
547
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
550
def test_add_existing_id_new_path(self):
551
inv = self.get_empty_inventory()
552
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
553
parent1.revision = 'result'
554
parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
555
parent2.revision = 'result'
557
delta = [(None, u'dir2', 'p-1', parent2)]
558
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
561
def test_add_new_id_existing_path(self):
562
inv = self.get_empty_inventory()
563
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
564
parent1.revision = 'result'
565
parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
566
parent2.revision = 'result'
568
delta = [(None, u'dir1', 'p-2', parent2)]
569
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
572
def test_remove_dir_leaving_dangling_child(self):
573
inv = self.get_empty_inventory()
574
dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
575
dir1.revision = 'result'
576
dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
577
dir2.revision = 'result'
578
dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
579
dir3.revision = 'result'
583
delta = [(u'dir1', None, 'p-1', None),
584
(u'dir1/child2', None, 'p-3', None)]
585
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
589
class TestInventory(TestCase):
591
def test_is_root(self):
592
"""Ensure our root-checking code is accurate."""
593
inv = inventory.Inventory('TREE_ROOT')
594
self.assertTrue(inv.is_root('TREE_ROOT'))
595
self.assertFalse(inv.is_root('booga'))
596
inv.root.file_id = 'booga'
597
self.assertFalse(inv.is_root('TREE_ROOT'))
598
self.assertTrue(inv.is_root('booga'))
599
# works properly even if no root is set
601
self.assertFalse(inv.is_root('TREE_ROOT'))
602
self.assertFalse(inv.is_root('booga'))
604
def test_entries_for_empty_inventory(self):
    """entries() on a rootless inventory returns an empty list."""
    rootless = Inventory(root_id=None)
    listed = rootless.entries()
    self.assertEqual([], listed)
610
class TestInventoryEntry(TestCase):
612
def test_file_kind_character(self):
    """A file entry has an empty kind character."""
    file_entry = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
    marker = file_entry.kind_character()
    self.assertEqual(marker, '')
616
def test_dir_kind_character(self):
    """A directory entry has '/' as its kind character."""
    dir_entry = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
    marker = dir_entry.kind_character()
    self.assertEqual(marker, '/')
620
def test_link_kind_character(self):
    """A symlink entry has an empty kind character."""
    link_entry = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
    marker = link_entry.kind_character()
    self.assertEqual(marker, '')
624
def test_dir_detect_changes(self):
    """Identically built directory entries report (False, False) both ways."""
    first = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
    second = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
    # Compare in both directions: first against second, then the reverse.
    for subject, other in ((first, second), (second, first)):
        self.assertEqual((False, False), subject.detect_changes(other))
630
def test_file_detect_changes(self):
631
left = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
633
right = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
634
right.text_sha1 = 123
635
self.assertEqual((False, False), left.detect_changes(right))
636
self.assertEqual((False, False), right.detect_changes(left))
637
left.executable = True
638
self.assertEqual((False, True), left.detect_changes(right))
639
self.assertEqual((False, True), right.detect_changes(left))
640
right.text_sha1 = 321
641
self.assertEqual((True, True), left.detect_changes(right))
642
self.assertEqual((True, True), right.detect_changes(left))
644
def test_symlink_detect_changes(self):
    """detect_changes on links follows the symlink target.

    Equal targets give (False, False); differing targets give
    (True, False). Each comparison is made in both directions.
    """
    left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
    left.symlink_target = 'foo'
    right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
    right.symlink_target = 'foo'
    both_ways = ((left, right), (right, left))
    for subject, other in both_ways:
        self.assertEqual((False, False), subject.detect_changes(other))
    # Point one link somewhere else and compare again.
    left.symlink_target = 'different'
    for subject, other in both_ways:
        self.assertEqual((True, False), subject.detect_changes(other))
655
def test_file_has_text(self):
    """has_text() is true for a file entry."""
    file_entry = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
    # assertTrue replaces the deprecated failUnless alias; other tests in
    # this file already use the assertTrue/assertFalse spelling.
    self.assertTrue(file_entry.has_text())
659
def test_directory_has_text(self):
    """has_text() is false for a directory entry."""
    dir_entry = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
    # assertFalse replaces the deprecated failIf alias; other tests in
    # this file already use the assertTrue/assertFalse spelling.
    self.assertFalse(dir_entry.has_text())
663
def test_link_has_text(self):
    """has_text() is false for a symlink entry."""
    link_entry = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
    # assertFalse replaces the deprecated failIf alias; other tests in
    # this file already use the assertTrue/assertFalse spelling.
    self.assertFalse(link_entry.has_text())
667
def test_make_entry(self):
    """make_entry returns the entry class that matches the requested kind."""
    expected_classes = (
        ("file", inventory.InventoryFile),
        ("symlink", inventory.InventoryLink),
        ("directory", inventory.InventoryDirectory),
        )
    for kind, entry_class in expected_classes:
        made = inventory.make_entry(kind, "name", ROOT_ID)
        self.assertIsInstance(made, entry_class)
675
def test_make_entry_non_normalized(self):
676
orig_normalized_filename = osutils.normalized_filename
679
osutils.normalized_filename = osutils._accessible_normalized_filename
680
entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
681
self.assertEqual(u'\xe5', entry.name)
682
self.assertIsInstance(entry, inventory.InventoryFile)
684
osutils.normalized_filename = osutils._inaccessible_normalized_filename
685
self.assertRaises(errors.InvalidNormalization,
686
inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
688
osutils.normalized_filename = orig_normalized_filename
691
class TestDescribeChanges(TestCase):
693
def test_describe_change(self):
694
# we need to test the following change combinations:
700
# renamed/reparented and modified
701
# change kind (perhaps can't be done yet?)
702
# also, merged in combination with all of these?
703
old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
704
old_a.text_sha1 = '123132'
706
new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
707
new_a.text_sha1 = '123132'
710
self.assertChangeDescription('unchanged', old_a, new_a)
713
new_a.text_sha1 = 'abcabc'
714
self.assertChangeDescription('modified', old_a, new_a)
716
self.assertChangeDescription('added', None, new_a)
717
self.assertChangeDescription('removed', old_a, None)
718
# perhaps a bit questionable but seems like the most reasonable thing...
719
self.assertChangeDescription('unchanged', None, None)
721
# in this case it's both renamed and modified; show a rename and
723
new_a.name = 'newfilename'
724
self.assertChangeDescription('modified and renamed', old_a, new_a)
726
# reparenting is 'renaming'
727
new_a.name = old_a.name
728
new_a.parent_id = 'somedir-id'
729
self.assertChangeDescription('modified and renamed', old_a, new_a)
731
# reset the content values so it's not modified
732
new_a.text_size = old_a.text_size
733
new_a.text_sha1 = old_a.text_sha1
734
new_a.name = old_a.name
736
new_a.name = 'newfilename'
737
self.assertChangeDescription('renamed', old_a, new_a)
739
# reparenting is 'renaming'
740
new_a.name = old_a.name
741
new_a.parent_id = 'somedir-id'
742
self.assertChangeDescription('renamed', old_a, new_a)
744
def assertChangeDescription(self, expected_change, old_ie, new_ie):
    """Assert that describe_change(old_ie, new_ie) equals expected_change.

    :param expected_change: The description that should be produced.
    :param old_ie: The old entry passed to describe_change (tests in this
        file also pass None).
    :param new_ie: The new entry passed to describe_change (tests in this
        file also pass None).
    """
    self.assertEqual(expected_change,
                     InventoryEntry.describe_change(old_ie, new_ie))
749
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
751
def get_chk_bytes(self):
    """Return a store built by a groupcompress pack factory.

    The factory comes from groupcompress.make_pack_factory(True, True, 1)
    and is applied to the transport returned by self.get_transport('').
    """
    make_store = groupcompress.make_pack_factory(True, True, 1)
    return make_store(self.get_transport(''))
756
def read_bytes(self, chk_bytes, key):
    """Return the fulltext of the first record streamed for key.

    :param chk_bytes: Object providing get_record_stream.
    :param key: The single key to request from chk_bytes.
    """
    records = chk_bytes.get_record_stream([key], 'unordered', True)
    first_record = records.next()
    return first_record.get_bytes_as("fulltext")
760
def test_deserialise_gives_CHKInventory(self):
762
inv.revision_id = "revid"
763
inv.root.revision = "rootrev"
764
chk_bytes = self.get_chk_bytes()
765
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
766
bytes = ''.join(chk_inv.to_lines())
767
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
768
self.assertEqual("revid", new_inv.revision_id)
769
self.assertEqual("directory", new_inv.root.kind)
770
self.assertEqual(inv.root.file_id, new_inv.root.file_id)
771
self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
772
self.assertEqual(inv.root.name, new_inv.root.name)
773
self.assertEqual("rootrev", new_inv.root.revision)
774
self.assertEqual('plain', new_inv._search_key_name)
776
def test_deserialise_wrong_revid(self):
778
inv.revision_id = "revid"
779
inv.root.revision = "rootrev"
780
chk_bytes = self.get_chk_bytes()
781
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
782
bytes = ''.join(chk_inv.to_lines())
783
self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
786
def test_captures_rev_root_byid(self):
788
inv.revision_id = "foo"
789
inv.root.revision = "bar"
790
chk_bytes = self.get_chk_bytes()
791
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
792
lines = chk_inv.to_lines()
795
'revision_id: foo\n',
796
'root_id: TREE_ROOT\n',
797
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
798
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
800
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
801
self.assertEqual('plain', chk_inv._search_key_name)
803
def test_captures_parent_id_basename_index(self):
805
inv.revision_id = "foo"
806
inv.root.revision = "bar"
807
chk_bytes = self.get_chk_bytes()
808
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
809
lines = chk_inv.to_lines()
812
'revision_id: foo\n',
813
'root_id: TREE_ROOT\n',
814
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
815
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
817
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
818
self.assertEqual('plain', chk_inv._search_key_name)
820
def test_captures_search_key_name(self):
822
inv.revision_id = "foo"
823
inv.root.revision = "bar"
824
chk_bytes = self.get_chk_bytes()
825
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
826
search_key_name='hash-16-way')
827
lines = chk_inv.to_lines()
830
'search_key_name: hash-16-way\n',
831
'root_id: TREE_ROOT\n',
832
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
833
'revision_id: foo\n',
834
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
836
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
837
self.assertEqual('hash-16-way', chk_inv._search_key_name)
839
def test_directory_children_on_demand(self):
841
inv.revision_id = "revid"
842
inv.root.revision = "rootrev"
843
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
844
inv["fileid"].revision = "filerev"
845
inv["fileid"].executable = True
846
inv["fileid"].text_sha1 = "ffff"
847
inv["fileid"].text_size = 1
848
chk_bytes = self.get_chk_bytes()
849
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
850
bytes = ''.join(chk_inv.to_lines())
851
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
852
root_entry = new_inv[inv.root.file_id]
853
self.assertEqual(None, root_entry._children)
854
self.assertEqual(['file'], root_entry.children.keys())
855
file_direct = new_inv["fileid"]
856
file_found = root_entry.children['file']
857
self.assertEqual(file_direct.kind, file_found.kind)
858
self.assertEqual(file_direct.file_id, file_found.file_id)
859
self.assertEqual(file_direct.parent_id, file_found.parent_id)
860
self.assertEqual(file_direct.name, file_found.name)
861
self.assertEqual(file_direct.revision, file_found.revision)
862
self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
863
self.assertEqual(file_direct.text_size, file_found.text_size)
864
self.assertEqual(file_direct.executable, file_found.executable)
866
def test_from_inventory_maximum_size(self):
867
# from_inventory supports the maximum_size parameter.
869
inv.revision_id = "revid"
870
inv.root.revision = "rootrev"
871
chk_bytes = self.get_chk_bytes()
872
chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
873
chk_inv.id_to_entry._ensure_root()
874
self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
875
self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
876
p_id_basename = chk_inv.parent_id_basename_to_file_id
877
p_id_basename._ensure_root()
878
self.assertEqual(120, p_id_basename._root_node.maximum_size)
879
self.assertEqual(2, p_id_basename._root_node._key_width)
881
def test___iter__(self):
883
inv.revision_id = "revid"
884
inv.root.revision = "rootrev"
885
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
886
inv["fileid"].revision = "filerev"
887
inv["fileid"].executable = True
888
inv["fileid"].text_sha1 = "ffff"
889
inv["fileid"].text_size = 1
890
chk_bytes = self.get_chk_bytes()
891
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
892
bytes = ''.join(chk_inv.to_lines())
893
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
894
fileids = list(new_inv.__iter__())
896
self.assertEqual([inv.root.file_id, "fileid"], fileids)
898
def test__len__(self):
900
inv.revision_id = "revid"
901
inv.root.revision = "rootrev"
902
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
903
inv["fileid"].revision = "filerev"
904
inv["fileid"].executable = True
905
inv["fileid"].text_sha1 = "ffff"
906
inv["fileid"].text_size = 1
907
chk_bytes = self.get_chk_bytes()
908
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
909
self.assertEqual(2, len(chk_inv))
911
def test___getitem__(self):
913
inv.revision_id = "revid"
914
inv.root.revision = "rootrev"
915
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
916
inv["fileid"].revision = "filerev"
917
inv["fileid"].executable = True
918
inv["fileid"].text_sha1 = "ffff"
919
inv["fileid"].text_size = 1
920
chk_bytes = self.get_chk_bytes()
921
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
922
bytes = ''.join(chk_inv.to_lines())
923
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
924
root_entry = new_inv[inv.root.file_id]
925
file_entry = new_inv["fileid"]
926
self.assertEqual("directory", root_entry.kind)
927
self.assertEqual(inv.root.file_id, root_entry.file_id)
928
self.assertEqual(inv.root.parent_id, root_entry.parent_id)
929
self.assertEqual(inv.root.name, root_entry.name)
930
self.assertEqual("rootrev", root_entry.revision)
931
self.assertEqual("file", file_entry.kind)
932
self.assertEqual("fileid", file_entry.file_id)
933
self.assertEqual(inv.root.file_id, file_entry.parent_id)
934
self.assertEqual("file", file_entry.name)
935
self.assertEqual("filerev", file_entry.revision)
936
self.assertEqual("ffff", file_entry.text_sha1)
937
self.assertEqual(1, file_entry.text_size)
938
self.assertEqual(True, file_entry.executable)
939
self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
941
def test_has_id_true(self):
943
inv.revision_id = "revid"
944
inv.root.revision = "rootrev"
945
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
946
inv["fileid"].revision = "filerev"
947
inv["fileid"].executable = True
948
inv["fileid"].text_sha1 = "ffff"
949
inv["fileid"].text_size = 1
950
chk_bytes = self.get_chk_bytes()
951
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
952
self.assertTrue(chk_inv.has_id('fileid'))
953
self.assertTrue(chk_inv.has_id(inv.root.file_id))
955
def test_has_id_not(self):
# has_id() must be false for "fileid" when no such entry was added before
# the CHKInventory was built.
# NOTE(review): `inv` is used below but the statement that creates it is not
# visible in this block -- confirm where it is bound.
957
inv.revision_id = "revid"
958
inv.root.revision = "rootrev"
959
chk_bytes = self.get_chk_bytes()
960
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
961
self.assertFalse(chk_inv.has_id('fileid'))
963
def test_id2path(self):
# After a serialise/deserialise round trip, id2path() must map the root to
# '', "dirid" to 'dir' and "fileid" to 'dir/file'.
# NOTE(review): `inv` is never assigned in the visible lines, and no visible
# statement adds `direntry` / `fileentry` to it before inv["fileid"] is
# indexed -- confirm those steps exist.
965
inv.revision_id = "revid"
966
inv.root.revision = "rootrev"
967
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
968
fileentry = InventoryFile("fileid", "file", "dirid")
971
inv["fileid"].revision = "filerev"
972
inv["fileid"].executable = True
973
inv["fileid"].text_sha1 = "ffff"
974
inv["fileid"].text_size = 1
975
inv["dirid"].revision = "filerev"
976
chk_bytes = self.get_chk_bytes()
977
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
978
bytes = ''.join(chk_inv.to_lines())
979
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
980
self.assertEqual('', new_inv.id2path(inv.root.file_id))
981
self.assertEqual('dir', new_inv.id2path('dirid'))
982
self.assertEqual('dir/file', new_inv.id2path('fileid'))
984
def test_path2id(self):
# After a serialise/deserialise round trip, path2id() must map '' to the
# root id, 'dir' to "dirid" and 'dir/file' to "fileid".
# NOTE(review): `inv` is never assigned in the visible lines, and no visible
# statement adds `direntry` / `fileentry` to it before inv["fileid"] is
# indexed -- confirm those steps exist.
986
inv.revision_id = "revid"
987
inv.root.revision = "rootrev"
988
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
989
fileentry = InventoryFile("fileid", "file", "dirid")
992
inv["fileid"].revision = "filerev"
993
inv["fileid"].executable = True
994
inv["fileid"].text_sha1 = "ffff"
995
inv["fileid"].text_size = 1
996
inv["dirid"].revision = "filerev"
997
chk_bytes = self.get_chk_bytes()
998
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
999
bytes = ''.join(chk_inv.to_lines())
1000
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1001
self.assertEqual(inv.root.file_id, new_inv.path2id(''))
1002
self.assertEqual('dirid', new_inv.path2id('dir'))
1003
self.assertEqual('fileid', new_inv.path2id('dir/file'))
1005
def test_create_by_apply_delta_sets_root(self):
    # Applying a delta that removes the base root and adds "myrootid" at
    # path "" must produce the same root entry as building the CHK
    # inventory directly from the updated plain inventory.
    # NOTE(review): `inv` is read below but the statement that creates it
    # is not visible in this block -- confirm it is bound before this point.
    inv.revision_id = "revid"
    chk_bytes = self.get_chk_bytes()
    base_inv = CHKInventory.from_inventory(chk_bytes, inv)
    inv.add_path("", "directory", "myrootid", None)
    inv.revision_id = "expectedid"
    reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
    delta = [("", None, base_inv.root.file_id, None),
             (None, "", "myrootid", inv.root)]
    new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
    # Use assertEqual like the sibling tests; assertEquals is only a
    # deprecated alias of it.
    self.assertEqual(reference_inv.root, new_inv.root)
1018
def test_create_by_apply_delta_empty_add_child(self):
# Applying a delta that adds file "A" to the base CHK inventory must yield
# the same revision id, root id and id_to_entry root key as a CHK inventory
# built directly from the reference plain inventory.
# NOTE(review): `inv` is never assigned in the visible lines, and no visible
# statement adds `a_entry` to `inv` before `reference_inv` is built --
# confirm those steps exist.
1020
inv.revision_id = "revid"
1021
inv.root.revision = "rootrev"
1022
chk_bytes = self.get_chk_bytes()
1023
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1024
a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1025
a_entry.revision = "filerev"
1026
a_entry.executable = True
1027
a_entry.text_sha1 = "ffff"
1028
a_entry.text_size = 1
1030
inv.revision_id = "expectedid"
1031
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1032
delta = [(None, "A", "A-id", a_entry)]
1033
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1034
# new_inv should be the same as reference_inv.
1035
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1036
self.assertEqual(reference_inv.root_id, new_inv.root_id)
1037
reference_inv.id_to_entry._ensure_root()
1038
new_inv.id_to_entry._ensure_root()
1039
self.assertEqual(reference_inv.id_to_entry._root_node._key,
1040
new_inv.id_to_entry._root_node._key)
1042
def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
# Same scenario as adding child "A" by delta, but additionally checks that
# the parent_id_basename_to_file_id map ends up with the same root key as
# in the reference inventory.
# NOTE(review): `inv` is never assigned in the visible lines, and no visible
# statement adds `a_entry` to `inv` before `reference_inv` is built --
# confirm those steps exist.
1044
inv.revision_id = "revid"
1045
inv.root.revision = "rootrev"
1046
chk_bytes = self.get_chk_bytes()
1047
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1048
a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1049
a_entry.revision = "filerev"
1050
a_entry.executable = True
1051
a_entry.text_sha1 = "ffff"
1052
a_entry.text_size = 1
1054
inv.revision_id = "expectedid"
1055
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1056
delta = [(None, "A", "A-id", a_entry)]
1057
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1058
reference_inv.id_to_entry._ensure_root()
1059
reference_inv.parent_id_basename_to_file_id._ensure_root()
1060
new_inv.id_to_entry._ensure_root()
1061
new_inv.parent_id_basename_to_file_id._ensure_root()
1062
# new_inv should be the same as reference_inv.
1063
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1064
self.assertEqual(reference_inv.root_id, new_inv.root_id)
1065
self.assertEqual(reference_inv.id_to_entry._root_node._key,
1066
new_inv.id_to_entry._root_node._key)
1067
self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
1068
new_inv.parent_id_basename_to_file_id._root_node._key)
1070
def test_iter_changes(self):
# Builds two inventories that differ only in the attributes of "fileid",
# round-trips both through the serialised CHK form, and compares the result
# of inv_1.iter_changes(inv_2) against a single expected change record.
# NOTE(review): neither `inv` nor `inv2` is assigned in the visible lines,
# and the end of the expected change tuple (before the list(...) argument)
# is not visible -- confirm against the full file.
1071
# Low level bootstrapping smoke test; comprehensive generic tests via
1072
# InterTree are coming.
1074
inv.revision_id = "revid"
1075
inv.root.revision = "rootrev"
1076
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1077
inv["fileid"].revision = "filerev"
1078
inv["fileid"].executable = True
1079
inv["fileid"].text_sha1 = "ffff"
1080
inv["fileid"].text_size = 1
1082
inv2.revision_id = "revid2"
1083
inv2.root.revision = "rootrev"
1084
inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
1085
inv2["fileid"].revision = "filerev2"
1086
inv2["fileid"].executable = False
1087
inv2["fileid"].text_sha1 = "bbbb"
1088
inv2["fileid"].text_size = 2
1089
# get fresh objects.
1090
chk_bytes = self.get_chk_bytes()
1091
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1092
bytes = ''.join(chk_inv.to_lines())
1093
inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1094
chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
1095
bytes = ''.join(chk_inv2.to_lines())
1096
inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
1097
self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
1098
('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
1100
list(inv_1.iter_changes(inv_2)))
1102
def test_parent_id_basename_to_file_id_index_enabled(self):
# A deserialised CHKInventory must expose parent_id_basename_to_file_id as
# a chk_map.CHKMap; the dict literal below lists the (parent_id, basename)
# -> file_id pairs for the root and the single file.
# NOTE(review): `inv` is never assigned in the visible lines, and the call
# that consumes the dict literal and the dict(...) expression is not
# visible -- confirm against the full file.
1104
inv.revision_id = "revid"
1105
inv.root.revision = "rootrev"
1106
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1107
inv["fileid"].revision = "filerev"
1108
inv["fileid"].executable = True
1109
inv["fileid"].text_sha1 = "ffff"
1110
inv["fileid"].text_size = 1
1111
# get fresh objects.
1112
chk_bytes = self.get_chk_bytes()
1113
tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
1114
bytes = ''.join(tmp_inv.to_lines())
1115
chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1116
self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
1118
{('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
1119
dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1121
def test_file_entry_to_bytes(self):
# An executable file entry must serialise to the expected newline-separated
# record, deserialise back to an equal entry with a unicode name, and yield
# the (utf8 name, file id, revision) key.
# NOTE(review): the expected bytes contain a "100" field but no visible
# line sets ie.text_size -- confirm where it is assigned.
1122
inv = CHKInventory(None)
1123
ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
1124
ie.executable = True
1125
ie.revision = 'file-rev-id'
1126
ie.text_sha1 = 'abcdefgh'
1128
bytes = inv._entry_to_bytes(ie)
1129
self.assertEqual('file: file-id\nparent-id\nfilename\n'
1130
'file-rev-id\nabcdefgh\n100\nY', bytes)
1131
ie2 = inv._bytes_to_entry(bytes)
1132
self.assertEqual(ie, ie2)
1133
self.assertIsInstance(ie2.name, unicode)
1134
self.assertEqual(('filename', 'file-id', 'file-rev-id'),
1135
inv._bytes_to_utf8name_key(bytes))
1137
def test_file2_entry_to_bytes(self):
# Variant of the file-entry serialisation test using a non-ASCII name and
# a non-executable file: the name must be utf8-encoded in the bytes and
# come back as unicode after deserialisation.
# NOTE(review): the expected bytes contain a "25" field but no visible
# line sets ie.text_size -- confirm where it is assigned.
1138
inv = CHKInventory(None)
1140
ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
1141
ie.executable = False
1142
ie.revision = 'file-rev-id'
1143
ie.text_sha1 = '123456'
1145
bytes = inv._entry_to_bytes(ie)
1146
self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
1147
'file-rev-id\n123456\n25\nN', bytes)
1148
ie2 = inv._bytes_to_entry(bytes)
1149
self.assertEqual(ie, ie2)
1150
self.assertIsInstance(ie2.name, unicode)
1151
self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
1152
inv._bytes_to_utf8name_key(bytes))
1154
def test_dir_entry_to_bytes(self):
    # A directory entry serialises to "dir: <id>\n<parent>\n<name>\n<rev>"
    # and survives a round trip through _bytes_to_entry.
    chk_inv = CHKInventory(None)
    dir_entry = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
    dir_entry.revision = 'dir-rev-id'
    serialised = chk_inv._entry_to_bytes(dir_entry)
    self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', serialised)
    restored = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(dir_entry, restored)
    # The name must come back as a unicode object.
    self.assertIsInstance(restored.name, unicode)
    expected_key = ('dirname', 'dir-id', 'dir-rev-id')
    self.assertEqual(expected_key, chk_inv._bytes_to_utf8name_key(serialised))
1166
def test_dir2_entry_to_bytes(self):
# Variant of the directory serialisation test with a non-ASCII name and no
# parent: the parent field is empty in the bytes and parent_id is None
# after deserialisation.
# NOTE(review): the InventoryDirectory(...) call is left open in the
# visible lines; its remaining argument(s) are not shown -- confirm.
1167
inv = CHKInventory(None)
1168
ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
1170
ie.revision = 'dir-rev-id'
1171
bytes = inv._entry_to_bytes(ie)
1172
self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
1173
'dir-rev-id', bytes)
1174
ie2 = inv._bytes_to_entry(bytes)
1175
self.assertEqual(ie, ie2)
1176
self.assertIsInstance(ie2.name, unicode)
1177
self.assertIs(ie2.parent_id, None)
1178
self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
1179
inv._bytes_to_utf8name_key(bytes))
1181
def test_symlink_entry_to_bytes(self):
    # A symlink entry serialises with its target as the last field and
    # round-trips through _bytes_to_entry with unicode name and target.
    chk_inv = CHKInventory(None)
    link_entry = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
    link_entry.revision = 'link-rev-id'
    link_entry.symlink_target = u'target/path'
    serialised = chk_inv._entry_to_bytes(link_entry)
    expected_bytes = ('symlink: link-id\nparent-id\nlinkname\n'
                      'link-rev-id\ntarget/path')
    self.assertEqual(expected_bytes, serialised)
    restored = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(link_entry, restored)
    self.assertIsInstance(restored.name, unicode)
    self.assertIsInstance(restored.symlink_target, unicode)
    expected_key = ('linkname', 'link-id', 'link-rev-id')
    self.assertEqual(expected_key, chk_inv._bytes_to_utf8name_key(serialised))
1196
def test_symlink2_entry_to_bytes(self):
    # Same as the plain symlink serialisation test, but with non-ASCII
    # characters in both name and target: they appear utf8-encoded in the
    # bytes and come back as unicode.
    chk_inv = CHKInventory(None)
    link_entry = inventory.InventoryLink(
        'link-id', u'link\u03a9name', 'parent-id')
    link_entry.revision = 'link-rev-id'
    link_entry.symlink_target = u'target/\u03a9path'
    serialised = chk_inv._entry_to_bytes(link_entry)
    expected_bytes = ('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
                      'link-rev-id\ntarget/\xce\xa9path')
    self.assertEqual(expected_bytes, serialised)
    restored = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(link_entry, restored)
    self.assertIsInstance(restored.name, unicode)
    self.assertIsInstance(restored.symlink_target, unicode)
    expected_key = ('link\xce\xa9name', 'link-id', 'link-rev-id')
    self.assertEqual(expected_key, chk_inv._bytes_to_utf8name_key(serialised))
1211
def test_tree_reference_entry_to_bytes(self):
# A tree-reference entry must serialise with its reference revision as the
# last field, deserialise back to an equal entry with a unicode name, and
# yield the (utf8 name, file id, revision) key.
# NOTE(review): the TreeReference(...) call is left open in the visible
# lines; its remaining argument(s) are not shown -- confirm.
1212
inv = CHKInventory(None)
1213
ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
1215
ie.revision = 'tree-rev-id'
1216
ie.reference_revision = 'ref-rev-id'
1217
bytes = inv._entry_to_bytes(ie)
1218
self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
1219
'tree-rev-id\nref-rev-id', bytes)
1220
ie2 = inv._bytes_to_entry(bytes)
1221
self.assertEqual(ie, ie2)
1222
self.assertIsInstance(ie2.name, unicode)
1223
self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1224
inv._bytes_to_utf8name_key(bytes))
1226
def make_basic_utf8_inventory(self):
# Helper: populate an inventory with a file and a directory (holding one
# child file) whose names contain non-ASCII characters, then return a
# CHKInventory freshly deserialised from its serialised form.
# NOTE(review): `inv` is used below but the statement that creates it is not
# visible in this block -- confirm where it is bound.
1228
inv.revision_id = "revid"
1229
inv.root.revision = "rootrev"
1230
root_id = inv.root.file_id
1231
inv.add(InventoryFile("fileid", u'f\xefle', root_id))
1232
inv["fileid"].revision = "filerev"
1233
inv["fileid"].text_sha1 = "ffff"
1234
inv["fileid"].text_size = 0
1235
inv.add(InventoryDirectory("dirid", u'dir-\N{EURO SIGN}', root_id))
1236
inv.add(InventoryFile("childid", u'ch\xefld', "dirid"))
1237
inv["childid"].revision = "filerev"
1238
inv["childid"].text_sha1 = "ffff"
1239
inv["childid"].text_size = 0
1240
chk_bytes = self.get_chk_bytes()
1241
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1242
bytes = ''.join(chk_inv.to_lines())
1243
return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1245
def test__preload_handles_utf8(self):
# Starting from an empty entry cache, _preload_cache() must populate the
# cache for the utf8-named inventory and fill in each directory's
# _children with unicode names.
# NOTE(review): the two sorted(...) lines after _preload_cache() look like
# arguments of a comparison whose opening call is not visible -- confirm.
1246
new_inv = self.make_basic_utf8_inventory()
1247
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1248
self.assertFalse(new_inv._fully_cached)
1249
new_inv._preload_cache()
1251
sorted([new_inv.root_id, "fileid", "dirid", "childid"]),
1252
sorted(new_inv._fileid_to_entry_cache.keys()))
1253
ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
1254
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1255
sorted(ie_root._children.keys()))
1256
ie_dir = new_inv._fileid_to_entry_cache['dirid']
1257
self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
1259
def test__preload_populates_cache(self):
# After deserialising, the entry cache is empty and _fully_cached is
# false; _preload_cache() must set _fully_cached and populate _children
# for the root and for "dirid".
# NOTE(review): `inv` is never assigned in the visible lines, and the two
# sorted(...) lines after _preload_cache() look like arguments of a
# comparison whose opening call is not visible -- confirm.
1261
inv.revision_id = "revid"
1262
inv.root.revision = "rootrev"
1263
root_id = inv.root.file_id
1264
inv.add(InventoryFile("fileid", "file", root_id))
1265
inv["fileid"].revision = "filerev"
1266
inv["fileid"].executable = True
1267
inv["fileid"].text_sha1 = "ffff"
1268
inv["fileid"].text_size = 1
1269
inv.add(InventoryDirectory("dirid", "dir", root_id))
1270
inv.add(InventoryFile("childid", "child", "dirid"))
1271
inv["childid"].revision = "filerev"
1272
inv["childid"].executable = False
1273
inv["childid"].text_sha1 = "dddd"
1274
inv["childid"].text_size = 1
1275
chk_bytes = self.get_chk_bytes()
1276
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1277
bytes = ''.join(chk_inv.to_lines())
1278
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1279
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1280
self.assertFalse(new_inv._fully_cached)
1281
new_inv._preload_cache()
1283
sorted([root_id, "fileid", "dirid", "childid"]),
1284
sorted(new_inv._fileid_to_entry_cache.keys()))
1285
self.assertTrue(new_inv._fully_cached)
1286
ie_root = new_inv._fileid_to_entry_cache[root_id]
1287
self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
1288
ie_dir = new_inv._fileid_to_entry_cache['dirid']
1289
self.assertEqual(['child'], sorted(ie_dir._children.keys()))
1291
def test__preload_handles_partially_evaluated_inventory(self):
# Load the root's children through the public .children attribute first,
# then call _preload_cache() and check that the root's _children are
# unchanged and that "dirid" now has its _children populated.
# NOTE(review): one original line between _preload_cache() and the next
# assertion is not visible in this block -- confirm nothing is missing.
1292
new_inv = self.make_basic_utf8_inventory()
1293
ie = new_inv[new_inv.root_id]
1294
self.assertIs(None, ie._children)
1295
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1296
sorted(ie.children.keys()))
1297
# Accessing .children loads _children
1298
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1299
sorted(ie._children.keys()))
1300
new_inv._preload_cache()
1302
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1303
sorted(ie._children.keys()))
1304
ie_dir = new_inv["dirid"]
1305
self.assertEqual([u'ch\xefld'],
1306
sorted(ie_dir._children.keys()))
1309
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1311
def get_chk_bytes(self):
    # Return a groupcompress pack store opened on this test's root
    # transport.
    make_store = groupcompress.make_pack_factory(True, True, 1)
    root_transport = self.get_transport('')
    return make_store(root_transport)
1316
def make_dir(self, inv, name, parent_id):
    # Add a directory called `name` under `parent_id`; its file id is the
    # name with an '-id' suffix.
    dir_entry = inv.make_entry('directory', name, parent_id, name + '-id')
    inv.add(dir_entry)
1319
def make_file(self, inv, name, parent_id, content='content\n'):
# Helper: create a file entry called `name` (file id `name + '-id'`) under
# `parent_id`, with text_sha1 and text_size derived from `content`.
# NOTE(review): no visible line adds `ie` to `inv`; the end of this helper
# may not be shown in this block -- confirm.
1320
ie = inv.make_entry('file', name, parent_id, name + '-id')
1321
ie.text_sha1 = osutils.sha_string(content)
1322
ie.text_size = len(content)
1325
def make_simple_inventory(self):
# Helper: build a small tree (two top-level directories, one nested
# directory and five files) using make_dir / make_file, convert it to a
# CHKInventory with the 'hash-255-way' search key, and return a copy
# freshly deserialised from its serialised form.
# NOTE(review): parts of the layout comment and one keyword argument line
# of from_inventory(...) are not visible in this block -- confirm.
1326
inv = Inventory('TREE_ROOT')
1327
inv.revision_id = "revid"
1328
inv.root.revision = "rootrev"
1331
# sub-file1 sub-file1-id
1332
# sub-file2 sub-file2-id
1333
# sub-dir1/ sub-dir1-id
1334
# subsub-file1 subsub-file1-id
1336
# sub2-file1 sub2-file1-id
1338
self.make_dir(inv, 'dir1', 'TREE_ROOT')
1339
self.make_dir(inv, 'dir2', 'TREE_ROOT')
1340
self.make_dir(inv, 'sub-dir1', 'dir1-id')
1341
self.make_file(inv, 'top', 'TREE_ROOT')
1342
self.make_file(inv, 'sub-file1', 'dir1-id')
1343
self.make_file(inv, 'sub-file2', 'dir1-id')
1344
self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
1345
self.make_file(inv, 'sub2-file1', 'dir2-id')
1346
chk_bytes = self.get_chk_bytes()
1347
# use a small maximum_size to force internal paging structures
1348
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
1350
search_key_name='hash-255-way')
1351
bytes = ''.join(chk_inv.to_lines())
1352
return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1354
def assert_Getitems(self, expected_fileids, inv, file_ids):
    # Check that inv._getitems(file_ids) yields exactly the entries whose
    # file ids are listed in expected_fileids, ignoring order.
    wanted_ids = sorted(expected_fileids)
    found_ids = sorted([entry.file_id for entry in inv._getitems(file_ids)])
    self.assertEqual(wanted_ids, found_ids)
1358
def assertExpand(self, all_ids, inv, file_ids):
# Check inv._expand_fileids_to_parents_and_children(file_ids): the returned
# id set must equal set(all_ids), and the returned parent -> children map
# must match one rebuilt from the parent_id of every returned entry.
# NOTE(review): the first line of the tuple unpacking that binds
# `val_all_ids` is not visible in this block -- confirm.
1360
val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
1361
self.assertEqual(set(all_ids), val_all_ids)
1362
entries = inv._getitems(val_all_ids)
1363
expected_children = {}
1364
for entry in entries:
1365
s = expected_children.setdefault(entry.parent_id, [])
1366
s.append(entry.file_id)
# Sort both child lists so the comparison ignores ordering.
1367
val_children = dict((k, sorted(v)) for k, v
1368
in val_children.iteritems())
1369
expected_children = dict((k, sorted(v)) for k, v
1370
in expected_children.iteritems())
1371
self.assertEqual(expected_children, val_children)
1373
def test_make_simple_inventory(self):
# Collects (path, file_id) pairs from iter_entries_by_dir() on the simple
# inventory; the tuples listed below are the paths and ids it should hold.
# NOTE(review): the initialisation of `layout`, the comparison call that
# consumes the tuples, and some expected tuples are not visible in this
# block -- confirm against the full file.
1374
inv = self.make_simple_inventory()
1376
for path, entry in inv.iter_entries_by_dir():
1377
layout.append((path, entry.file_id))
1380
('dir1', 'dir1-id'),
1381
('dir2', 'dir2-id'),
1383
('dir1/sub-dir1', 'sub-dir1-id'),
1384
('dir1/sub-file1', 'sub-file1-id'),
1385
('dir1/sub-file2', 'sub-file2-id'),
1386
('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
1387
('dir2/sub2-file1', 'sub2-file1-id'),
1390
def test__getitems(self):
# _getitems() must return the requested entries and record them in
# _fileid_to_entry_cache: first a single id, then the same id again, then
# a request mixing that id with one not fetched before.
# NOTE(review): a few original lines between the calls are not visible in
# this block -- confirm nothing material is missing.
1391
inv = self.make_simple_inventory()
1393
self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1394
self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1395
self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
1397
self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1399
self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
1400
['dir1-id', 'sub-file2-id'])
1401
self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1402
self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
1404
def test_single_file(self):
    # Expanding a file directly under the root yields the file and the root.
    simple_inv = self.make_simple_inventory()
    expected_ids = ['TREE_ROOT', 'top-id']
    self.assertExpand(expected_ids, simple_inv, ['top-id'])
1408
def test_get_all_parents(self):
# Expanding a deeply nested file must pull in each of its ancestor
# directories up to the root.
# NOTE(review): one line of the expected-id list is not visible in this
# block -- confirm the full list against the original file.
1409
inv = self.make_simple_inventory()
1410
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1412
], inv, ['subsub-file1-id'])
1414
def test_get_children(self):
    # Expanding a directory yields its ancestors plus everything beneath it.
    simple_inv = self.make_simple_inventory()
    expected_ids = [
        'TREE_ROOT', 'dir1-id', 'sub-dir1-id',
        'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
    ]
    self.assertExpand(expected_ids, simple_inv, ['dir1-id'])
1420
def test_from_root(self):
    # Expanding the root id yields every entry in the inventory.
    simple_inv = self.make_simple_inventory()
    expected_ids = [
        'TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
        'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
        'subsub-file1-id', 'top-id',
    ]
    self.assertExpand(expected_ids, simple_inv, ['TREE_ROOT'])
1426
def test_top_level_file(self):
    # A top-level file expands to just itself and the tree root.
    simple_inv = self.make_simple_inventory()
    expected_ids = ['TREE_ROOT', 'top-id']
    self.assertExpand(expected_ids, simple_inv, ['top-id'])
1430
def test_subsub_file(self):
    # A file two directories deep expands to itself plus both enclosing
    # directories and the root.
    simple_inv = self.make_simple_inventory()
    expected_ids = ['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'subsub-file1-id']
    self.assertExpand(expected_ids, simple_inv, ['subsub-file1-id'])
1435
def test_sub_and_root(self):
    # Expanding a top-level file together with a nested file yields the
    # union of both expansions.
    simple_inv = self.make_simple_inventory()
    expected_ids = ['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
                    'subsub-file1-id']
    requested_ids = ['top-id', 'subsub-file1-id']
    self.assertExpand(expected_ids, simple_inv, requested_ids)