1
# Copyright (C) 2005-2012, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
31
from ..bzr.inventory import (
39
mutable_inventory_from_tree,
43
TestCaseWithTransport,
45
from .scenarios import load_tests_apply_scenarios
48
load_tests = load_tests_apply_scenarios
51
def delta_application_scenarios():
53
('Inventory', {'apply_delta':apply_inventory_Inventory}),
55
# Working tree basis delta application
56
# Repository add_inv_by_delta.
57
# Reduce form of the per_repository test logic - that logic needs to be
58
# be able to get /just/ repositories whereas these tests are fine with
59
# just creating trees.
61
for _, format in repository.format_registry.iteritems():
62
if format.supports_full_versioned_files:
63
scenarios.append((str(format.__name__), {
64
'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
66
for format in workingtree.format_registry._get_all():
67
repo_fmt = format._matchingbzrdir.repository_format
68
if not repo_fmt.supports_full_versioned_files:
71
(str(format.__class__.__name__) + ".update_basis_by_delta", {
72
'apply_delta':apply_inventory_WT_basis,
75
(str(format.__class__.__name__) + ".apply_inventory_delta", {
76
'apply_delta':apply_inventory_WT,
81
def create_texts_for_inv(repo, inv):
82
for path, ie in inv.iter_entries():
84
lines = ['a' * ie.text_size]
87
repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
90
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
91
"""Apply delta to basis and return the result.
93
:param basis: An inventory to be used as the basis.
94
:param delta: The inventory delta to apply:
95
:return: An inventory resulting from the application.
97
basis.apply_delta(delta)
101
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
102
"""Apply delta to basis and return the result.
104
This sets the tree state to be basis, and then calls apply_inventory_delta.
106
:param basis: An inventory to be used as the basis.
107
:param delta: The inventory delta to apply:
108
:return: An inventory resulting from the application.
110
control = self.make_controldir('tree', format=self.format._matchingbzrdir)
111
control.create_repository()
112
control.create_branch()
113
tree = self.format.initialize(control)
116
tree._write_inventory(basis)
119
# Fresh object, reads disk again.
120
tree = tree.controldir.open_workingtree()
123
tree.apply_inventory_delta(delta)
126
# reload tree - ensure we get what was written.
127
tree = tree.controldir.open_workingtree()
129
self.addCleanup(tree.unlock)
130
if not invalid_delta:
132
return tree.root_inventory
135
def _create_repo_revisions(repo, basis, delta, invalid_delta):
136
repo.start_write_group()
138
rev = revision.Revision('basis', timestamp=0, timezone=None,
139
message="", committer="foo@example.com")
140
basis.revision_id = 'basis'
141
create_texts_for_inv(repo, basis)
142
repo.add_revision('basis', rev, basis)
144
# We don't want to apply the delta to the basis, because we expect
145
# the delta is invalid.
147
result_inv.revision_id = 'result'
148
target_entries = None
150
result_inv = basis.create_by_apply_delta(delta, 'result')
151
create_texts_for_inv(repo, result_inv)
152
target_entries = list(result_inv.iter_entries_by_dir())
153
rev = revision.Revision('result', timestamp=0, timezone=None,
154
message="", committer="foo@example.com")
155
repo.add_revision('result', rev, result_inv)
156
repo.commit_write_group()
158
repo.abort_write_group()
160
return target_entries
163
def _get_basis_entries(tree):
164
basis_tree = tree.basis_tree()
165
basis_tree.lock_read()
166
basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
168
return basis_tree_entries
171
def _populate_different_tree(tree, basis, delta):
172
"""Put all entries into tree, but at a unique location."""
175
tree.add(['unique-dir'], ['unique-dir-id'], ['directory'])
176
for path, ie in basis.iter_entries_by_dir():
177
if ie.file_id in added_ids:
179
# We want a unique path for each of these, we use the file-id
180
tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
181
added_ids.add(ie.file_id)
182
for old_path, new_path, file_id, ie in delta:
183
if file_id in added_ids:
185
tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
188
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
189
"""Apply delta to basis and return the result.
191
This sets the parent and then calls update_basis_by_delta.
192
It also puts the basis in the repository under both 'basis' and 'result' to
193
allow safety checks made by the WT to succeed, and finally ensures that all
194
items in the delta with a new path are present in the WT before calling
195
update_basis_by_delta.
197
:param basis: An inventory to be used as the basis.
198
:param delta: The inventory delta to apply:
199
:return: An inventory resulting from the application.
201
control = test.make_controldir('tree', format=test.format._matchingbzrdir)
202
control.create_repository()
203
control.create_branch()
204
tree = test.format.initialize(control)
207
target_entries = _create_repo_revisions(tree.branch.repository, basis,
208
delta, invalid_delta)
209
# Set the basis state as the trees current state
210
tree._write_inventory(basis)
211
# This reads basis from the repo and puts it into the tree's local
212
# cache, if it has one.
213
tree.set_parent_ids(['basis'])
216
# Fresh lock, reads disk again.
219
tree.update_basis_by_delta('result', delta)
220
if not invalid_delta:
224
# reload tree - ensure we get what was written.
225
tree = tree.controldir.open_workingtree()
226
basis_tree = tree.basis_tree()
227
basis_tree.lock_read()
228
test.addCleanup(basis_tree.unlock)
229
basis_inv = basis_tree.root_inventory
231
basis_entries = list(basis_inv.iter_entries_by_dir())
232
test.assertEqual(target_entries, basis_entries)
236
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
238
"""Apply delta to basis and return the result.
240
This inserts basis as a whole inventory and then uses
241
add_inventory_by_delta to add delta.
243
:param basis: An inventory to be used as the basis.
244
:param delta: The inventory delta to apply:
245
:return: An inventory resulting from the application.
247
format = self.format()
248
control = self.make_controldir('tree', format=format._matchingbzrdir)
249
repo = format.initialize(control)
252
repo.start_write_group()
254
rev = revision.Revision('basis', timestamp=0, timezone=None,
255
message="", committer="foo@example.com")
256
basis.revision_id = 'basis'
257
create_texts_for_inv(repo, basis)
258
repo.add_revision('basis', rev, basis)
259
repo.commit_write_group()
261
repo.abort_write_group()
267
repo.start_write_group()
269
inv_sha1 = repo.add_inventory_by_delta('basis', delta,
272
repo.abort_write_group()
275
repo.commit_write_group()
278
# Fresh lock, reads disk again.
279
repo = repo.controldir.open_repository()
281
self.addCleanup(repo.unlock)
282
return repo.get_inventory('result')
285
class TestInventoryUpdates(TestCase):
287
def test_creation_from_root_id(self):
288
# iff a root id is passed to the constructor, a root directory is made
289
inv = inventory.Inventory(root_id=b'tree-root')
290
self.assertNotEqual(None, inv.root)
291
self.assertEqual(b'tree-root', inv.root.file_id)
293
def test_add_path_of_root(self):
294
# if no root id is given at creation time, there is no root directory
295
inv = inventory.Inventory(root_id=None)
296
self.assertIs(None, inv.root)
297
# add a root entry by adding its path
298
ie = inv.add_path(u"", "directory", b"my-root")
299
ie.revision = b'test-rev'
300
self.assertEqual(b"my-root", ie.file_id)
301
self.assertIs(ie, inv.root)
303
def test_add_path(self):
304
inv = inventory.Inventory(root_id=b'tree_root')
305
ie = inv.add_path(u'hello', 'file', b'hello-id')
306
self.assertEqual(b'hello-id', ie.file_id)
307
self.assertEqual('file', ie.kind)
310
"""Make sure copy() works and creates a deep copy."""
311
inv = inventory.Inventory(root_id=b'some-tree-root')
312
ie = inv.add_path(u'hello', 'file', b'hello-id')
314
inv.root.file_id = b'some-new-root'
316
self.assertEqual(b'some-tree-root', inv2.root.file_id)
317
self.assertEqual(u'hello', inv2[b'hello-id'].name)
319
def test_copy_empty(self):
320
"""Make sure an empty inventory can be copied."""
321
inv = inventory.Inventory(root_id=None)
323
self.assertIs(None, inv2.root)
325
def test_copy_copies_root_revision(self):
326
"""Make sure the revision of the root gets copied."""
327
inv = inventory.Inventory(root_id=b'someroot')
328
inv.root.revision = b'therev'
330
self.assertEqual(b'someroot', inv2.root.file_id)
331
self.assertEqual(b'therev', inv2.root.revision)
333
def test_create_tree_reference(self):
334
inv = inventory.Inventory(b'tree-root-123')
335
inv.add(TreeReference(
336
b'nested-id', 'nested', parent_id=b'tree-root-123',
337
revision=b'rev', reference_revision=b'rev2'))
339
def test_error_encoding(self):
340
inv = inventory.Inventory('tree-root')
341
inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
342
e = self.assertRaises(errors.InconsistentDelta, inv.add,
343
InventoryFile('b-id', u'\u1234', 'tree-root'))
344
self.assertContainsRe(str(e), r'\\u1234')
346
def test_add_recursive(self):
347
parent = InventoryDirectory('src-id', 'src', 'tree-root')
348
child = InventoryFile('hello-id', 'hello.c', 'src-id')
349
parent.children[child.file_id] = child
350
inv = inventory.Inventory('tree-root')
352
self.assertEqual('src/hello.c', inv.id2path('hello-id'))
356
class TestDeltaApplication(TestCaseWithTransport):
358
scenarios = delta_application_scenarios()
360
def get_empty_inventory(self, reference_inv=None):
361
"""Get an empty inventory.
363
Note that tests should not depend on the revision of the root for
364
setting up test conditions, as it has to be flexible to accomodate non
365
rich root repositories.
367
:param reference_inv: If not None, get the revision for the root from
368
this inventory. This is useful for dealing with older repositories
369
that routinely discarded the root entry data. If None, the root's
370
revision is set to 'basis'.
372
inv = inventory.Inventory()
373
if reference_inv is not None:
374
inv.root.revision = reference_inv.root.revision
376
inv.root.revision = 'basis'
379
def make_file_ie(self, file_id='file-id', name='name', parent_id=None):
380
ie_file = inventory.InventoryFile(file_id, name, parent_id)
381
ie_file.revision = 'result'
382
ie_file.text_size = 0
383
ie_file.text_sha1 = ''
386
def test_empty_delta(self):
387
inv = self.get_empty_inventory()
389
inv = self.apply_delta(self, inv, delta)
390
inv2 = self.get_empty_inventory(inv)
391
self.assertEqual([], inv2._make_delta(inv))
393
def test_None_file_id(self):
394
inv = self.get_empty_inventory()
395
dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
396
dir1.revision = 'result'
397
delta = [(None, u'dir1', None, dir1)]
398
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
401
def test_unicode_file_id(self):
402
inv = self.get_empty_inventory()
403
dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
404
dir1.revision = 'result'
405
delta = [(None, u'dir1', dir1.file_id, dir1)]
406
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
409
def test_repeated_file_id(self):
410
inv = self.get_empty_inventory()
411
file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
412
file1.revision = 'result'
417
delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
418
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
421
def test_repeated_new_path(self):
422
inv = self.get_empty_inventory()
423
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
424
file1.revision = 'result'
428
file2.file_id = 'id2'
429
delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
430
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
433
def test_repeated_old_path(self):
434
inv = self.get_empty_inventory()
435
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
436
file1.revision = 'result'
439
# We can't *create* a source inventory with the same path, but
440
# a badly generated partial delta might claim the same source twice.
441
# This would be buggy in two ways: the path is repeated in the delta,
442
# And the path for one of the file ids doesn't match the source
443
# location. Alternatively, we could have a repeated fileid, but that
444
# is separately checked for.
445
file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
446
file2.revision = 'result'
451
delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
452
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
455
def test_mismatched_id_entry_id(self):
456
inv = self.get_empty_inventory()
457
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
458
file1.revision = 'result'
461
delta = [(None, u'path', 'id', file1)]
462
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
465
def test_mismatched_new_path_entry_None(self):
466
inv = self.get_empty_inventory()
467
delta = [(None, u'path', 'id', None)]
468
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
471
def test_mismatched_new_path_None_entry(self):
472
inv = self.get_empty_inventory()
473
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
474
file1.revision = 'result'
477
delta = [(u"path", None, 'id1', file1)]
478
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
481
def test_parent_is_not_directory(self):
482
inv = self.get_empty_inventory()
483
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
484
file1.revision = 'result'
487
file2 = inventory.InventoryFile('id2', 'path2', 'id1')
488
file2.revision = 'result'
492
delta = [(None, u'path/path2', 'id2', file2)]
493
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
496
def test_parent_is_missing(self):
497
inv = self.get_empty_inventory()
498
file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
499
file2.revision = 'result'
502
delta = [(None, u'path/path2', 'id2', file2)]
503
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
506
def test_new_parent_path_has_wrong_id(self):
507
inv = self.get_empty_inventory()
508
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
509
parent1.revision = 'result'
510
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
511
parent2.revision = 'result'
512
file1 = inventory.InventoryFile('id', 'path', 'p-2')
513
file1.revision = 'result'
518
# This delta claims that file1 is at dir/path, but actually its at
519
# dir2/path if you follow the inventory parent structure.
520
delta = [(None, u'dir/path', 'id', file1)]
521
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
524
def test_old_parent_path_is_wrong(self):
525
inv = self.get_empty_inventory()
526
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
527
parent1.revision = 'result'
528
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
529
parent2.revision = 'result'
530
file1 = inventory.InventoryFile('id', 'path', 'p-2')
531
file1.revision = 'result'
537
# This delta claims that file1 was at dir/path, but actually it was at
538
# dir2/path if you follow the inventory parent structure.
539
delta = [(u'dir/path', None, 'id', None)]
540
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
543
def test_old_parent_path_is_for_other_id(self):
544
inv = self.get_empty_inventory()
545
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
546
parent1.revision = 'result'
547
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
548
parent2.revision = 'result'
549
file1 = inventory.InventoryFile('id', 'path', 'p-2')
550
file1.revision = 'result'
553
file2 = inventory.InventoryFile('id2', 'path', 'p-1')
554
file2.revision = 'result'
561
# This delta claims that file1 was at dir/path, but actually it was at
562
# dir2/path if you follow the inventory parent structure. At dir/path
563
# is another entry we should not delete.
564
delta = [(u'dir/path', None, 'id', None)]
565
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
568
def test_add_existing_id_new_path(self):
569
inv = self.get_empty_inventory()
570
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
571
parent1.revision = 'result'
572
parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
573
parent2.revision = 'result'
575
delta = [(None, u'dir2', 'p-1', parent2)]
576
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
579
def test_add_new_id_existing_path(self):
580
inv = self.get_empty_inventory()
581
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
582
parent1.revision = 'result'
583
parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
584
parent2.revision = 'result'
586
delta = [(None, u'dir1', 'p-2', parent2)]
587
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
590
def test_remove_dir_leaving_dangling_child(self):
591
inv = self.get_empty_inventory()
592
dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
593
dir1.revision = 'result'
594
dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
595
dir2.revision = 'result'
596
dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
597
dir3.revision = 'result'
601
delta = [(u'dir1', None, 'p-1', None),
602
(u'dir1/child2', None, 'p-3', None)]
603
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
606
def test_add_file(self):
607
inv = self.get_empty_inventory()
608
file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
609
file1.revision = 'result'
612
delta = [(None, u'path', 'file-id', file1)]
613
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
614
self.assertEqual('file-id', res_inv['file-id'].file_id)
616
def test_remove_file(self):
617
inv = self.get_empty_inventory()
618
file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
619
file1.revision = 'result'
623
delta = [(u'path', None, 'file-id', None)]
624
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
625
self.assertEqual(None, res_inv.path2id('path'))
626
self.assertRaises(errors.NoSuchId, res_inv.id2path, 'file-id')
628
def test_rename_file(self):
629
inv = self.get_empty_inventory()
630
file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
632
file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
633
delta = [(u'path', 'path2', 'file-id', file2)]
634
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
635
self.assertEqual(None, res_inv.path2id('path'))
636
self.assertEqual('file-id', res_inv.path2id('path2'))
638
def test_replaced_at_new_path(self):
639
inv = self.get_empty_inventory()
640
file1 = self.make_file_ie(file_id='id1', parent_id=inv.root.file_id)
642
file2 = self.make_file_ie(file_id='id2', parent_id=inv.root.file_id)
643
delta = [(u'name', None, 'id1', None),
644
(None, u'name', 'id2', file2)]
645
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
646
self.assertEqual('id2', res_inv.path2id('name'))
648
def test_rename_dir(self):
649
inv = self.get_empty_inventory()
650
dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
651
dir1.revision = 'basis'
652
file1 = self.make_file_ie(parent_id='dir-id')
655
dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
656
dir2.revision = 'result'
657
delta = [('dir1', 'dir2', 'dir-id', dir2)]
658
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
659
# The file should be accessible under the new path
660
self.assertEqual('file-id', res_inv.path2id('dir2/name'))
662
def test_renamed_dir_with_renamed_child(self):
663
inv = self.get_empty_inventory()
664
dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
665
dir1.revision = 'basis'
666
file1 = self.make_file_ie('file-id-1', 'name1', parent_id='dir-id')
667
file2 = self.make_file_ie('file-id-2', 'name2', parent_id='dir-id')
671
dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
672
dir2.revision = 'result'
673
file2b = self.make_file_ie('file-id-2', 'name2', inv.root.file_id)
674
delta = [('dir1', 'dir2', 'dir-id', dir2),
675
('dir1/name2', 'name2', 'file-id-2', file2b)]
676
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
677
# The file should be accessible under the new path
678
self.assertEqual('file-id-1', res_inv.path2id('dir2/name1'))
679
self.assertEqual(None, res_inv.path2id('dir2/name2'))
680
self.assertEqual('file-id-2', res_inv.path2id('name2'))
682
def test_is_root(self):
683
"""Ensure our root-checking code is accurate."""
684
inv = inventory.Inventory('TREE_ROOT')
685
self.assertTrue(inv.is_root('TREE_ROOT'))
686
self.assertFalse(inv.is_root('booga'))
687
inv.root.file_id = 'booga'
688
self.assertFalse(inv.is_root('TREE_ROOT'))
689
self.assertTrue(inv.is_root('booga'))
690
# works properly even if no root is set
692
self.assertFalse(inv.is_root('TREE_ROOT'))
693
self.assertFalse(inv.is_root('booga'))
695
def test_entries_for_empty_inventory(self):
696
"""Test that entries() will not fail for an empty inventory"""
697
inv = Inventory(root_id=None)
698
self.assertEqual([], inv.entries())
701
class TestInventoryEntry(TestCase):
703
def test_file_kind_character(self):
704
file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
705
self.assertEqual(file.kind_character(), '')
707
def test_dir_kind_character(self):
708
dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
709
self.assertEqual(dir.kind_character(), '/')
711
def test_link_kind_character(self):
712
dir = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
713
self.assertEqual(dir.kind_character(), '')
715
def test_dir_detect_changes(self):
716
left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
717
right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
718
self.assertEqual((False, False), left.detect_changes(right))
719
self.assertEqual((False, False), right.detect_changes(left))
721
def test_file_detect_changes(self):
722
left = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
724
right = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
725
right.text_sha1 = 123
726
self.assertEqual((False, False), left.detect_changes(right))
727
self.assertEqual((False, False), right.detect_changes(left))
728
left.executable = True
729
self.assertEqual((False, True), left.detect_changes(right))
730
self.assertEqual((False, True), right.detect_changes(left))
731
right.text_sha1 = 321
732
self.assertEqual((True, True), left.detect_changes(right))
733
self.assertEqual((True, True), right.detect_changes(left))
735
def test_symlink_detect_changes(self):
736
left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
737
left.symlink_target='foo'
738
right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
739
right.symlink_target='foo'
740
self.assertEqual((False, False), left.detect_changes(right))
741
self.assertEqual((False, False), right.detect_changes(left))
742
left.symlink_target = 'different'
743
self.assertEqual((True, False), left.detect_changes(right))
744
self.assertEqual((True, False), right.detect_changes(left))
746
def test_file_has_text(self):
747
file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
748
self.assertTrue(file.has_text())
750
def test_directory_has_text(self):
751
dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
752
self.assertFalse(dir.has_text())
754
def test_link_has_text(self):
755
link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
756
self.assertFalse(link.has_text())
758
def test_make_entry(self):
759
self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
760
inventory.InventoryFile)
761
self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
762
inventory.InventoryLink)
763
self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
764
inventory.InventoryDirectory)
766
def test_make_entry_non_normalized(self):
767
orig_normalized_filename = osutils.normalized_filename
770
osutils.normalized_filename = osutils._accessible_normalized_filename
771
entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
772
self.assertEqual(u'\xe5', entry.name)
773
self.assertIsInstance(entry, inventory.InventoryFile)
775
osutils.normalized_filename = osutils._inaccessible_normalized_filename
776
self.assertRaises(errors.InvalidNormalization,
777
inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
779
osutils.normalized_filename = orig_normalized_filename
782
class TestDescribeChanges(TestCase):
784
def test_describe_change(self):
785
# we need to test the following change combinations:
791
# renamed/reparented and modified
792
# change kind (perhaps can't be done yet?)
793
# also, merged in combination with all of these?
794
old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
795
old_a.text_sha1 = '123132'
797
new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
798
new_a.text_sha1 = '123132'
801
self.assertChangeDescription('unchanged', old_a, new_a)
804
new_a.text_sha1 = 'abcabc'
805
self.assertChangeDescription('modified', old_a, new_a)
807
self.assertChangeDescription('added', None, new_a)
808
self.assertChangeDescription('removed', old_a, None)
809
# perhaps a bit questionable but seems like the most reasonable thing...
810
self.assertChangeDescription('unchanged', None, None)
812
# in this case it's both renamed and modified; show a rename and
814
new_a.name = 'newfilename'
815
self.assertChangeDescription('modified and renamed', old_a, new_a)
817
# reparenting is 'renaming'
818
new_a.name = old_a.name
819
new_a.parent_id = 'somedir-id'
820
self.assertChangeDescription('modified and renamed', old_a, new_a)
822
# reset the content values so its not modified
823
new_a.text_size = old_a.text_size
824
new_a.text_sha1 = old_a.text_sha1
825
new_a.name = old_a.name
827
new_a.name = 'newfilename'
828
self.assertChangeDescription('renamed', old_a, new_a)
830
# reparenting is 'renaming'
831
new_a.name = old_a.name
832
new_a.parent_id = 'somedir-id'
833
self.assertChangeDescription('renamed', old_a, new_a)
835
def assertChangeDescription(self, expected_change, old_ie, new_ie):
836
change = InventoryEntry.describe_change(old_ie, new_ie)
837
self.assertEqual(expected_change, change)
840
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
842
def get_chk_bytes(self):
843
factory = groupcompress.make_pack_factory(True, True, 1)
844
trans = self.get_transport('')
845
return factory(trans)
847
def read_bytes(self, chk_bytes, key):
848
stream = chk_bytes.get_record_stream([key], 'unordered', True)
849
return stream.next().get_bytes_as("fulltext")
851
def test_deserialise_gives_CHKInventory(self):
853
inv.revision_id = "revid"
854
inv.root.revision = "rootrev"
855
chk_bytes = self.get_chk_bytes()
856
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
857
bytes = ''.join(chk_inv.to_lines())
858
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
859
self.assertEqual("revid", new_inv.revision_id)
860
self.assertEqual("directory", new_inv.root.kind)
861
self.assertEqual(inv.root.file_id, new_inv.root.file_id)
862
self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
863
self.assertEqual(inv.root.name, new_inv.root.name)
864
self.assertEqual("rootrev", new_inv.root.revision)
865
self.assertEqual('plain', new_inv._search_key_name)
867
def test_deserialise_wrong_revid(self):
869
inv.revision_id = "revid"
870
inv.root.revision = "rootrev"
871
chk_bytes = self.get_chk_bytes()
872
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
873
bytes = ''.join(chk_inv.to_lines())
874
self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
877
def test_captures_rev_root_byid(self):
879
inv.revision_id = "foo"
880
inv.root.revision = "bar"
881
chk_bytes = self.get_chk_bytes()
882
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
883
lines = chk_inv.to_lines()
886
'revision_id: foo\n',
887
'root_id: TREE_ROOT\n',
888
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
889
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
891
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
892
self.assertEqual('plain', chk_inv._search_key_name)
894
def test_captures_parent_id_basename_index(self):
896
inv.revision_id = "foo"
897
inv.root.revision = "bar"
898
chk_bytes = self.get_chk_bytes()
899
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
900
lines = chk_inv.to_lines()
903
'revision_id: foo\n',
904
'root_id: TREE_ROOT\n',
905
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
906
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
908
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
909
self.assertEqual('plain', chk_inv._search_key_name)
911
def test_captures_search_key_name(self):
913
inv.revision_id = "foo"
914
inv.root.revision = "bar"
915
chk_bytes = self.get_chk_bytes()
916
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
917
search_key_name='hash-16-way')
918
lines = chk_inv.to_lines()
921
'search_key_name: hash-16-way\n',
922
'root_id: TREE_ROOT\n',
923
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
924
'revision_id: foo\n',
925
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
927
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
928
self.assertEqual('hash-16-way', chk_inv._search_key_name)
930
def test_directory_children_on_demand(self):
932
inv.revision_id = "revid"
933
inv.root.revision = "rootrev"
934
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
935
inv["fileid"].revision = "filerev"
936
inv["fileid"].executable = True
937
inv["fileid"].text_sha1 = "ffff"
938
inv["fileid"].text_size = 1
939
chk_bytes = self.get_chk_bytes()
940
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
941
bytes = ''.join(chk_inv.to_lines())
942
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
943
root_entry = new_inv[inv.root.file_id]
944
self.assertEqual(None, root_entry._children)
945
self.assertEqual({'file'}, set(root_entry.children))
946
file_direct = new_inv["fileid"]
947
file_found = root_entry.children['file']
948
self.assertEqual(file_direct.kind, file_found.kind)
949
self.assertEqual(file_direct.file_id, file_found.file_id)
950
self.assertEqual(file_direct.parent_id, file_found.parent_id)
951
self.assertEqual(file_direct.name, file_found.name)
952
self.assertEqual(file_direct.revision, file_found.revision)
953
self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
954
self.assertEqual(file_direct.text_size, file_found.text_size)
955
self.assertEqual(file_direct.executable, file_found.executable)
957
def test_from_inventory_maximum_size(self):
958
# from_inventory supports the maximum_size parameter.
960
inv.revision_id = "revid"
961
inv.root.revision = "rootrev"
962
chk_bytes = self.get_chk_bytes()
963
chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
964
chk_inv.id_to_entry._ensure_root()
965
self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
966
self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
967
p_id_basename = chk_inv.parent_id_basename_to_file_id
968
p_id_basename._ensure_root()
969
self.assertEqual(120, p_id_basename._root_node.maximum_size)
970
self.assertEqual(2, p_id_basename._root_node._key_width)
972
def test___iter__(self):
974
inv.revision_id = "revid"
975
inv.root.revision = "rootrev"
976
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
977
inv["fileid"].revision = "filerev"
978
inv["fileid"].executable = True
979
inv["fileid"].text_sha1 = "ffff"
980
inv["fileid"].text_size = 1
981
chk_bytes = self.get_chk_bytes()
982
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
983
bytes = ''.join(chk_inv.to_lines())
984
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
985
fileids = sorted(new_inv.__iter__())
986
self.assertEqual([inv.root.file_id, "fileid"], fileids)
988
def test__len__(self):
990
inv.revision_id = "revid"
991
inv.root.revision = "rootrev"
992
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
993
inv["fileid"].revision = "filerev"
994
inv["fileid"].executable = True
995
inv["fileid"].text_sha1 = "ffff"
996
inv["fileid"].text_size = 1
997
chk_bytes = self.get_chk_bytes()
998
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
999
self.assertEqual(2, len(chk_inv))
1001
def test___getitem__(self):
1003
inv.revision_id = b"revid"
1004
inv.root.revision = b"rootrev"
1005
inv.add(InventoryFile(b"fileid", u"file", inv.root.file_id))
1006
inv[b"fileid"].revision = b"filerev"
1007
inv[b"fileid"].executable = True
1008
inv[b"fileid"].text_sha1 = b"ffff"
1009
inv[b"fileid"].text_size = 1
1010
chk_bytes = self.get_chk_bytes()
1011
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1012
data = b''.join(chk_inv.to_lines())
1013
new_inv = CHKInventory.deserialise(chk_bytes, data, (b"revid",))
1014
root_entry = new_inv[inv.root.file_id]
1015
file_entry = new_inv[b"fileid"]
1016
self.assertEqual("directory", root_entry.kind)
1017
self.assertEqual(inv.root.file_id, root_entry.file_id)
1018
self.assertEqual(inv.root.parent_id, root_entry.parent_id)
1019
self.assertEqual(inv.root.name, root_entry.name)
1020
self.assertEqual(b"rootrev", root_entry.revision)
1021
self.assertEqual("file", file_entry.kind)
1022
self.assertEqual(b"fileid", file_entry.file_id)
1023
self.assertEqual(inv.root.file_id, file_entry.parent_id)
1024
self.assertEqual(u"file", file_entry.name)
1025
self.assertEqual(b"filerev", file_entry.revision)
1026
self.assertEqual(b"ffff", file_entry.text_sha1)
1027
self.assertEqual(1, file_entry.text_size)
1028
self.assertEqual(True, file_entry.executable)
1029
self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
1031
def test_has_id_true(self):
1033
inv.revision_id = "revid"
1034
inv.root.revision = "rootrev"
1035
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1036
inv["fileid"].revision = "filerev"
1037
inv["fileid"].executable = True
1038
inv["fileid"].text_sha1 = "ffff"
1039
inv["fileid"].text_size = 1
1040
chk_bytes = self.get_chk_bytes()
1041
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1042
self.assertTrue(chk_inv.has_id('fileid'))
1043
self.assertTrue(chk_inv.has_id(inv.root.file_id))
1045
def test_has_id_not(self):
1047
inv.revision_id = "revid"
1048
inv.root.revision = "rootrev"
1049
chk_bytes = self.get_chk_bytes()
1050
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1051
self.assertFalse(chk_inv.has_id('fileid'))
1053
def test_id2path(self):
1055
inv.revision_id = "revid"
1056
inv.root.revision = "rootrev"
1057
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
1058
fileentry = InventoryFile("fileid", "file", "dirid")
1061
inv["fileid"].revision = "filerev"
1062
inv["fileid"].executable = True
1063
inv["fileid"].text_sha1 = "ffff"
1064
inv["fileid"].text_size = 1
1065
inv["dirid"].revision = "filerev"
1066
chk_bytes = self.get_chk_bytes()
1067
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1068
bytes = ''.join(chk_inv.to_lines())
1069
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1070
self.assertEqual('', new_inv.id2path(inv.root.file_id))
1071
self.assertEqual('dir', new_inv.id2path('dirid'))
1072
self.assertEqual('dir/file', new_inv.id2path('fileid'))
1074
def test_path2id(self):
1076
inv.revision_id = "revid"
1077
inv.root.revision = "rootrev"
1078
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
1079
fileentry = InventoryFile("fileid", "file", "dirid")
1082
inv["fileid"].revision = "filerev"
1083
inv["fileid"].executable = True
1084
inv["fileid"].text_sha1 = "ffff"
1085
inv["fileid"].text_size = 1
1086
inv["dirid"].revision = "filerev"
1087
chk_bytes = self.get_chk_bytes()
1088
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1089
bytes = ''.join(chk_inv.to_lines())
1090
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1091
self.assertEqual(inv.root.file_id, new_inv.path2id(''))
1092
self.assertEqual('dirid', new_inv.path2id('dir'))
1093
self.assertEqual('fileid', new_inv.path2id('dir/file'))
1095
def test_create_by_apply_delta_sets_root(self):
1097
inv.revision_id = "revid"
1098
chk_bytes = self.get_chk_bytes()
1099
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1100
inv.add_path("", "directory", "myrootid", None)
1101
inv.revision_id = "expectedid"
1102
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1103
delta = [("", None, base_inv.root.file_id, None),
1104
(None, "", "myrootid", inv.root)]
1105
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1106
self.assertEqual(reference_inv.root, new_inv.root)
1108
def test_create_by_apply_delta_empty_add_child(self):
1110
inv.revision_id = "revid"
1111
inv.root.revision = "rootrev"
1112
chk_bytes = self.get_chk_bytes()
1113
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1114
a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1115
a_entry.revision = "filerev"
1116
a_entry.executable = True
1117
a_entry.text_sha1 = "ffff"
1118
a_entry.text_size = 1
1120
inv.revision_id = "expectedid"
1121
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1122
delta = [(None, "A", "A-id", a_entry)]
1123
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1124
# new_inv should be the same as reference_inv.
1125
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1126
self.assertEqual(reference_inv.root_id, new_inv.root_id)
1127
reference_inv.id_to_entry._ensure_root()
1128
new_inv.id_to_entry._ensure_root()
1129
self.assertEqual(reference_inv.id_to_entry._root_node._key,
1130
new_inv.id_to_entry._root_node._key)
1132
def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
1134
inv.revision_id = "revid"
1135
inv.root.revision = "rootrev"
1136
chk_bytes = self.get_chk_bytes()
1137
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1138
a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1139
a_entry.revision = "filerev"
1140
a_entry.executable = True
1141
a_entry.text_sha1 = "ffff"
1142
a_entry.text_size = 1
1144
inv.revision_id = "expectedid"
1145
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1146
delta = [(None, "A", "A-id", a_entry)]
1147
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1148
reference_inv.id_to_entry._ensure_root()
1149
reference_inv.parent_id_basename_to_file_id._ensure_root()
1150
new_inv.id_to_entry._ensure_root()
1151
new_inv.parent_id_basename_to_file_id._ensure_root()
1152
# new_inv should be the same as reference_inv.
1153
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1154
self.assertEqual(reference_inv.root_id, new_inv.root_id)
1155
self.assertEqual(reference_inv.id_to_entry._root_node._key,
1156
new_inv.id_to_entry._root_node._key)
1157
self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
1158
new_inv.parent_id_basename_to_file_id._root_node._key)
1160
def test_iter_changes(self):
1161
# Low level bootstrapping smoke test; comprehensive generic tests via
1162
# InterTree are coming.
1164
inv.revision_id = "revid"
1165
inv.root.revision = "rootrev"
1166
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1167
inv["fileid"].revision = "filerev"
1168
inv["fileid"].executable = True
1169
inv["fileid"].text_sha1 = "ffff"
1170
inv["fileid"].text_size = 1
1172
inv2.revision_id = "revid2"
1173
inv2.root.revision = "rootrev"
1174
inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
1175
inv2["fileid"].revision = "filerev2"
1176
inv2["fileid"].executable = False
1177
inv2["fileid"].text_sha1 = "bbbb"
1178
inv2["fileid"].text_size = 2
1179
# get fresh objects.
1180
chk_bytes = self.get_chk_bytes()
1181
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1182
bytes = ''.join(chk_inv.to_lines())
1183
inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1184
chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
1185
bytes = ''.join(chk_inv2.to_lines())
1186
inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
1187
self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
1188
('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
1190
list(inv_1.iter_changes(inv_2)))
1192
def test_parent_id_basename_to_file_id_index_enabled(self):
1194
inv.revision_id = "revid"
1195
inv.root.revision = "rootrev"
1196
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1197
inv["fileid"].revision = "filerev"
1198
inv["fileid"].executable = True
1199
inv["fileid"].text_sha1 = "ffff"
1200
inv["fileid"].text_size = 1
1201
# get fresh objects.
1202
chk_bytes = self.get_chk_bytes()
1203
tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
1204
bytes = ''.join(tmp_inv.to_lines())
1205
chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1206
self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
1208
{('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
1209
dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1211
def test_file_entry_to_bytes(self):
1212
inv = CHKInventory(None)
1213
ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
1214
ie.executable = True
1215
ie.revision = 'file-rev-id'
1216
ie.text_sha1 = 'abcdefgh'
1218
bytes = inv._entry_to_bytes(ie)
1219
self.assertEqual('file: file-id\nparent-id\nfilename\n'
1220
'file-rev-id\nabcdefgh\n100\nY', bytes)
1221
ie2 = inv._bytes_to_entry(bytes)
1222
self.assertEqual(ie, ie2)
1223
self.assertIsInstance(ie2.name, unicode)
1224
self.assertEqual(('filename', 'file-id', 'file-rev-id'),
1225
inv._bytes_to_utf8name_key(bytes))
1227
def test_file2_entry_to_bytes(self):
1228
inv = CHKInventory(None)
1230
ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
1231
ie.executable = False
1232
ie.revision = 'file-rev-id'
1233
ie.text_sha1 = '123456'
1235
bytes = inv._entry_to_bytes(ie)
1236
self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
1237
'file-rev-id\n123456\n25\nN', bytes)
1238
ie2 = inv._bytes_to_entry(bytes)
1239
self.assertEqual(ie, ie2)
1240
self.assertIsInstance(ie2.name, unicode)
1241
self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
1242
inv._bytes_to_utf8name_key(bytes))
1244
def test_dir_entry_to_bytes(self):
1245
inv = CHKInventory(None)
1246
ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
1247
ie.revision = 'dir-rev-id'
1248
bytes = inv._entry_to_bytes(ie)
1249
self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
1250
ie2 = inv._bytes_to_entry(bytes)
1251
self.assertEqual(ie, ie2)
1252
self.assertIsInstance(ie2.name, unicode)
1253
self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
1254
inv._bytes_to_utf8name_key(bytes))
1256
def test_dir2_entry_to_bytes(self):
1257
inv = CHKInventory(None)
1258
ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
1260
ie.revision = 'dir-rev-id'
1261
bytes = inv._entry_to_bytes(ie)
1262
self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
1263
'dir-rev-id', bytes)
1264
ie2 = inv._bytes_to_entry(bytes)
1265
self.assertEqual(ie, ie2)
1266
self.assertIsInstance(ie2.name, unicode)
1267
self.assertIs(ie2.parent_id, None)
1268
self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
1269
inv._bytes_to_utf8name_key(bytes))
1271
def test_symlink_entry_to_bytes(self):
1272
inv = CHKInventory(None)
1273
ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
1274
ie.revision = 'link-rev-id'
1275
ie.symlink_target = u'target/path'
1276
bytes = inv._entry_to_bytes(ie)
1277
self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
1278
'link-rev-id\ntarget/path', bytes)
1279
ie2 = inv._bytes_to_entry(bytes)
1280
self.assertEqual(ie, ie2)
1281
self.assertIsInstance(ie2.name, unicode)
1282
self.assertIsInstance(ie2.symlink_target, unicode)
1283
self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
1284
inv._bytes_to_utf8name_key(bytes))
1286
def test_symlink2_entry_to_bytes(self):
1287
inv = CHKInventory(None)
1288
ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
1289
ie.revision = 'link-rev-id'
1290
ie.symlink_target = u'target/\u03a9path'
1291
bytes = inv._entry_to_bytes(ie)
1292
self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
1293
'link-rev-id\ntarget/\xce\xa9path', bytes)
1294
ie2 = inv._bytes_to_entry(bytes)
1295
self.assertEqual(ie, ie2)
1296
self.assertIsInstance(ie2.name, unicode)
1297
self.assertIsInstance(ie2.symlink_target, unicode)
1298
self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
1299
inv._bytes_to_utf8name_key(bytes))
1301
def test_tree_reference_entry_to_bytes(self):
1302
inv = CHKInventory(None)
1303
ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
1305
ie.revision = 'tree-rev-id'
1306
ie.reference_revision = 'ref-rev-id'
1307
bytes = inv._entry_to_bytes(ie)
1308
self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
1309
'tree-rev-id\nref-rev-id', bytes)
1310
ie2 = inv._bytes_to_entry(bytes)
1311
self.assertEqual(ie, ie2)
1312
self.assertIsInstance(ie2.name, unicode)
1313
self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1314
inv._bytes_to_utf8name_key(bytes))
1316
def make_basic_utf8_inventory(self):
1318
inv.revision_id = "revid"
1319
inv.root.revision = "rootrev"
1320
root_id = inv.root.file_id
1321
inv.add(InventoryFile("fileid", u'f\xefle', root_id))
1322
inv["fileid"].revision = "filerev"
1323
inv["fileid"].text_sha1 = "ffff"
1324
inv["fileid"].text_size = 0
1325
inv.add(InventoryDirectory("dirid", u'dir-\N{EURO SIGN}', root_id))
1326
inv.add(InventoryFile("childid", u'ch\xefld', "dirid"))
1327
inv["childid"].revision = "filerev"
1328
inv["childid"].text_sha1 = "ffff"
1329
inv["childid"].text_size = 0
1330
chk_bytes = self.get_chk_bytes()
1331
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1332
bytes = ''.join(chk_inv.to_lines())
1333
return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1335
def test__preload_handles_utf8(self):
1336
new_inv = self.make_basic_utf8_inventory()
1337
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1338
self.assertFalse(new_inv._fully_cached)
1339
new_inv._preload_cache()
1341
sorted([new_inv.root_id, "fileid", "dirid", "childid"]),
1342
sorted(new_inv._fileid_to_entry_cache.keys()))
1343
ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
1344
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1345
sorted(ie_root._children.keys()))
1346
ie_dir = new_inv._fileid_to_entry_cache['dirid']
1347
self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
1349
def test__preload_populates_cache(self):
1351
inv.revision_id = "revid"
1352
inv.root.revision = "rootrev"
1353
root_id = inv.root.file_id
1354
inv.add(InventoryFile("fileid", "file", root_id))
1355
inv["fileid"].revision = "filerev"
1356
inv["fileid"].executable = True
1357
inv["fileid"].text_sha1 = "ffff"
1358
inv["fileid"].text_size = 1
1359
inv.add(InventoryDirectory("dirid", "dir", root_id))
1360
inv.add(InventoryFile("childid", "child", "dirid"))
1361
inv["childid"].revision = "filerev"
1362
inv["childid"].executable = False
1363
inv["childid"].text_sha1 = "dddd"
1364
inv["childid"].text_size = 1
1365
chk_bytes = self.get_chk_bytes()
1366
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1367
bytes = ''.join(chk_inv.to_lines())
1368
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1369
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1370
self.assertFalse(new_inv._fully_cached)
1371
new_inv._preload_cache()
1373
sorted([root_id, "fileid", "dirid", "childid"]),
1374
sorted(new_inv._fileid_to_entry_cache.keys()))
1375
self.assertTrue(new_inv._fully_cached)
1376
ie_root = new_inv._fileid_to_entry_cache[root_id]
1377
self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
1378
ie_dir = new_inv._fileid_to_entry_cache['dirid']
1379
self.assertEqual(['child'], sorted(ie_dir._children.keys()))
1381
def test__preload_handles_partially_evaluated_inventory(self):
1382
new_inv = self.make_basic_utf8_inventory()
1383
ie = new_inv[new_inv.root_id]
1384
self.assertIs(None, ie._children)
1385
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1386
sorted(ie.children.keys()))
1387
# Accessing .children loads _children
1388
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1389
sorted(ie._children.keys()))
1390
new_inv._preload_cache()
1392
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1393
sorted(ie._children.keys()))
1394
ie_dir = new_inv["dirid"]
1395
self.assertEqual([u'ch\xefld'],
1396
sorted(ie_dir._children.keys()))
1398
def test_filter_change_in_renamed_subfolder(self):
1399
inv = Inventory('tree-root')
1400
src_ie = inv.add_path('src', 'directory', 'src-id')
1401
inv.add_path('src/sub/', 'directory', 'sub-id')
1402
a_ie = inv.add_path('src/sub/a', 'file', 'a-id')
1403
a_ie.text_sha1 = osutils.sha_string('content\n')
1404
a_ie.text_size = len('content\n')
1405
chk_bytes = self.get_chk_bytes()
1406
inv = CHKInventory.from_inventory(chk_bytes, inv)
1407
inv = inv.create_by_apply_delta([
1408
("src/sub/a", "src/sub/a", "a-id", a_ie),
1409
("src", "src2", "src-id", src_ie),
1411
new_inv = inv.filter(['a-id', 'src-id'])
1415
('src/sub', 'sub-id'),
1416
('src/sub/a', 'a-id'),
1417
], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
1419
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1421
def get_chk_bytes(self):
1422
factory = groupcompress.make_pack_factory(True, True, 1)
1423
trans = self.get_transport('')
1424
return factory(trans)
1426
def make_dir(self, inv, name, parent_id):
1427
inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
1429
def make_file(self, inv, name, parent_id, content='content\n'):
1430
ie = inv.make_entry('file', name, parent_id, name + '-id')
1431
ie.text_sha1 = osutils.sha_string(content)
1432
ie.text_size = len(content)
1435
def make_simple_inventory(self):
1436
inv = Inventory('TREE_ROOT')
1437
inv.revision_id = "revid"
1438
inv.root.revision = "rootrev"
1441
# sub-file1 sub-file1-id
1442
# sub-file2 sub-file2-id
1443
# sub-dir1/ sub-dir1-id
1444
# subsub-file1 subsub-file1-id
1446
# sub2-file1 sub2-file1-id
1448
self.make_dir(inv, 'dir1', 'TREE_ROOT')
1449
self.make_dir(inv, 'dir2', 'TREE_ROOT')
1450
self.make_dir(inv, 'sub-dir1', 'dir1-id')
1451
self.make_file(inv, 'top', 'TREE_ROOT')
1452
self.make_file(inv, 'sub-file1', 'dir1-id')
1453
self.make_file(inv, 'sub-file2', 'dir1-id')
1454
self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
1455
self.make_file(inv, 'sub2-file1', 'dir2-id')
1456
chk_bytes = self.get_chk_bytes()
1457
# use a small maximum_size to force internal paging structures
1458
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
1460
search_key_name='hash-255-way')
1461
bytes = ''.join(chk_inv.to_lines())
1462
return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1464
def assert_Getitems(self, expected_fileids, inv, file_ids):
1465
self.assertEqual(sorted(expected_fileids),
1466
sorted([ie.file_id for ie in inv._getitems(file_ids)]))
1468
def assertExpand(self, all_ids, inv, file_ids):
1470
val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
1471
self.assertEqual(set(all_ids), val_all_ids)
1472
entries = inv._getitems(val_all_ids)
1473
expected_children = {}
1474
for entry in entries:
1475
s = expected_children.setdefault(entry.parent_id, [])
1476
s.append(entry.file_id)
1477
val_children = dict((k, sorted(v)) for k, v
1478
in val_children.items())
1479
expected_children = dict((k, sorted(v)) for k, v
1480
in expected_children.items())
1481
self.assertEqual(expected_children, val_children)
1483
def test_make_simple_inventory(self):
1484
inv = self.make_simple_inventory()
1486
for path, entry in inv.iter_entries_by_dir():
1487
layout.append((path, entry.file_id))
1490
('dir1', 'dir1-id'),
1491
('dir2', 'dir2-id'),
1493
('dir1/sub-dir1', 'sub-dir1-id'),
1494
('dir1/sub-file1', 'sub-file1-id'),
1495
('dir1/sub-file2', 'sub-file2-id'),
1496
('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
1497
('dir2/sub2-file1', 'sub2-file1-id'),
1500
def test__getitems(self):
1501
inv = self.make_simple_inventory()
1503
self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1504
self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1505
self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
1507
self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1509
self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
1510
['dir1-id', 'sub-file2-id'])
1511
self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1512
self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
1514
def test_single_file(self):
1515
inv = self.make_simple_inventory()
1516
self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1518
def test_get_all_parents(self):
1519
inv = self.make_simple_inventory()
1520
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1522
], inv, ['subsub-file1-id'])
1524
def test_get_children(self):
1525
inv = self.make_simple_inventory()
1526
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1527
'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
1528
], inv, ['dir1-id'])
1530
def test_from_root(self):
1531
inv = self.make_simple_inventory()
1532
self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
1533
'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
1534
'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
1536
def test_top_level_file(self):
1537
inv = self.make_simple_inventory()
1538
self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1540
def test_subsub_file(self):
1541
inv = self.make_simple_inventory()
1542
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1543
'subsub-file1-id'], inv, ['subsub-file1-id'])
1545
def test_sub_and_root(self):
1546
inv = self.make_simple_inventory()
1547
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
1548
'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])
1551
class TestMutableInventoryFromTree(TestCaseWithTransport):
1553
def test_empty(self):
1554
repository = self.make_repository('.')
1555
tree = repository.revision_tree(revision.NULL_REVISION)
1556
inv = mutable_inventory_from_tree(tree)
1557
self.assertEqual(revision.NULL_REVISION, inv.revision_id)
1558
self.assertEqual(0, len(inv))
1560
def test_some_files(self):
1561
wt = self.make_branch_and_tree('.')
1562
self.build_tree(['a'])
1563
wt.add(['a'], ['thefileid'])
1564
revid = wt.commit("commit")
1565
tree = wt.branch.repository.revision_tree(revid)
1566
inv = mutable_inventory_from_tree(tree)
1567
self.assertEqual(revid, inv.revision_id)
1568
self.assertEqual(2, len(inv))
1569
self.assertEqual("a", inv['thefileid'].name)
1570
# The inventory should be mutable and independent of
1572
self.assertFalse(tree.root_inventory['thefileid'].executable)
1573
inv['thefileid'].executable = True
1574
self.assertFalse(tree.root_inventory['thefileid'].executable)