1
# Copyright (C) 2005-2012, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
31
from ..bzr.inventory import (
39
mutable_inventory_from_tree,
43
TestCaseWithTransport,
45
from .scenarios import load_tests_apply_scenarios
48
load_tests = load_tests_apply_scenarios
51
def delta_application_scenarios():
53
('Inventory', {'apply_delta':apply_inventory_Inventory}),
55
# Working tree basis delta application
56
# Repository add_inv_by_delta.
57
# Reduce form of the per_repository test logic - that logic needs to be
58
# be able to get /just/ repositories whereas these tests are fine with
59
# just creating trees.
61
for _, format in repository.format_registry.iteritems():
62
if format.supports_full_versioned_files:
63
scenarios.append((str(format.__name__), {
64
'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
66
for format in workingtree.format_registry._get_all():
67
repo_fmt = format._matchingbzrdir.repository_format
68
if not repo_fmt.supports_full_versioned_files:
71
(str(format.__class__.__name__) + ".update_basis_by_delta", {
72
'apply_delta':apply_inventory_WT_basis,
75
(str(format.__class__.__name__) + ".apply_inventory_delta", {
76
'apply_delta':apply_inventory_WT,
81
def create_texts_for_inv(repo, inv):
82
for path, ie in inv.iter_entries():
84
lines = ['a' * ie.text_size]
87
repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
90
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
91
"""Apply delta to basis and return the result.
93
:param basis: An inventory to be used as the basis.
94
:param delta: The inventory delta to apply:
95
:return: An inventory resulting from the application.
97
basis.apply_delta(delta)
101
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
102
"""Apply delta to basis and return the result.
104
This sets the tree state to be basis, and then calls apply_inventory_delta.
106
:param basis: An inventory to be used as the basis.
107
:param delta: The inventory delta to apply:
108
:return: An inventory resulting from the application.
110
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
111
control.create_repository()
112
control.create_branch()
113
tree = self.format.initialize(control)
116
tree._write_inventory(basis)
119
# Fresh object, reads disk again.
120
tree = tree.bzrdir.open_workingtree()
123
tree.apply_inventory_delta(delta)
126
# reload tree - ensure we get what was written.
127
tree = tree.bzrdir.open_workingtree()
129
self.addCleanup(tree.unlock)
130
if not invalid_delta:
132
return tree.root_inventory
135
def _create_repo_revisions(repo, basis, delta, invalid_delta):
136
repo.start_write_group()
138
rev = revision.Revision('basis', timestamp=0, timezone=None,
139
message="", committer="foo@example.com")
140
basis.revision_id = 'basis'
141
create_texts_for_inv(repo, basis)
142
repo.add_revision('basis', rev, basis)
144
# We don't want to apply the delta to the basis, because we expect
145
# the delta is invalid.
147
result_inv.revision_id = 'result'
148
target_entries = None
150
result_inv = basis.create_by_apply_delta(delta, 'result')
151
create_texts_for_inv(repo, result_inv)
152
target_entries = list(result_inv.iter_entries_by_dir())
153
rev = revision.Revision('result', timestamp=0, timezone=None,
154
message="", committer="foo@example.com")
155
repo.add_revision('result', rev, result_inv)
156
repo.commit_write_group()
158
repo.abort_write_group()
160
return target_entries
163
def _get_basis_entries(tree):
164
basis_tree = tree.basis_tree()
165
basis_tree.lock_read()
166
basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
168
return basis_tree_entries
171
def _populate_different_tree(tree, basis, delta):
172
"""Put all entries into tree, but at a unique location."""
175
tree.add(['unique-dir'], ['unique-dir-id'], ['directory'])
176
for path, ie in basis.iter_entries_by_dir():
177
if ie.file_id in added_ids:
179
# We want a unique path for each of these, we use the file-id
180
tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
181
added_ids.add(ie.file_id)
182
for old_path, new_path, file_id, ie in delta:
183
if file_id in added_ids:
185
tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
188
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
189
"""Apply delta to basis and return the result.
191
This sets the parent and then calls update_basis_by_delta.
192
It also puts the basis in the repository under both 'basis' and 'result' to
193
allow safety checks made by the WT to succeed, and finally ensures that all
194
items in the delta with a new path are present in the WT before calling
195
update_basis_by_delta.
197
:param basis: An inventory to be used as the basis.
198
:param delta: The inventory delta to apply:
199
:return: An inventory resulting from the application.
201
control = test.make_bzrdir('tree', format=test.format._matchingbzrdir)
202
control.create_repository()
203
control.create_branch()
204
tree = test.format.initialize(control)
207
target_entries = _create_repo_revisions(tree.branch.repository, basis,
208
delta, invalid_delta)
209
# Set the basis state as the trees current state
210
tree._write_inventory(basis)
211
# This reads basis from the repo and puts it into the tree's local
212
# cache, if it has one.
213
tree.set_parent_ids(['basis'])
216
# Fresh lock, reads disk again.
219
tree.update_basis_by_delta('result', delta)
220
if not invalid_delta:
224
# reload tree - ensure we get what was written.
225
tree = tree.bzrdir.open_workingtree()
226
basis_tree = tree.basis_tree()
227
basis_tree.lock_read()
228
test.addCleanup(basis_tree.unlock)
229
basis_inv = basis_tree.root_inventory
231
basis_entries = list(basis_inv.iter_entries_by_dir())
232
test.assertEqual(target_entries, basis_entries)
236
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
238
"""Apply delta to basis and return the result.
240
This inserts basis as a whole inventory and then uses
241
add_inventory_by_delta to add delta.
243
:param basis: An inventory to be used as the basis.
244
:param delta: The inventory delta to apply:
245
:return: An inventory resulting from the application.
247
format = self.format()
248
control = self.make_bzrdir('tree', format=format._matchingbzrdir)
249
repo = format.initialize(control)
252
repo.start_write_group()
254
rev = revision.Revision('basis', timestamp=0, timezone=None,
255
message="", committer="foo@example.com")
256
basis.revision_id = 'basis'
257
create_texts_for_inv(repo, basis)
258
repo.add_revision('basis', rev, basis)
259
repo.commit_write_group()
261
repo.abort_write_group()
267
repo.start_write_group()
269
inv_sha1 = repo.add_inventory_by_delta('basis', delta,
272
repo.abort_write_group()
275
repo.commit_write_group()
278
# Fresh lock, reads disk again.
279
repo = repo.bzrdir.open_repository()
281
self.addCleanup(repo.unlock)
282
return repo.get_inventory('result')
285
class TestInventoryUpdates(TestCase):
287
def test_creation_from_root_id(self):
288
# iff a root id is passed to the constructor, a root directory is made
289
inv = inventory.Inventory(root_id='tree-root')
290
self.assertNotEqual(None, inv.root)
291
self.assertEqual('tree-root', inv.root.file_id)
293
def test_add_path_of_root(self):
294
# if no root id is given at creation time, there is no root directory
295
inv = inventory.Inventory(root_id=None)
296
self.assertIs(None, inv.root)
297
# add a root entry by adding its path
298
ie = inv.add_path("", "directory", "my-root")
299
ie.revision = 'test-rev'
300
self.assertEqual("my-root", ie.file_id)
301
self.assertIs(ie, inv.root)
303
def test_add_path(self):
304
inv = inventory.Inventory(root_id='tree_root')
305
ie = inv.add_path('hello', 'file', 'hello-id')
306
self.assertEqual('hello-id', ie.file_id)
307
self.assertEqual('file', ie.kind)
310
"""Make sure copy() works and creates a deep copy."""
311
inv = inventory.Inventory(root_id='some-tree-root')
312
ie = inv.add_path('hello', 'file', 'hello-id')
314
inv.root.file_id = 'some-new-root'
316
self.assertEqual('some-tree-root', inv2.root.file_id)
317
self.assertEqual('hello', inv2['hello-id'].name)
319
def test_copy_empty(self):
320
"""Make sure an empty inventory can be copied."""
321
inv = inventory.Inventory(root_id=None)
323
self.assertIs(None, inv2.root)
325
def test_copy_copies_root_revision(self):
326
"""Make sure the revision of the root gets copied."""
327
inv = inventory.Inventory(root_id='someroot')
328
inv.root.revision = 'therev'
330
self.assertEqual('someroot', inv2.root.file_id)
331
self.assertEqual('therev', inv2.root.revision)
333
def test_create_tree_reference(self):
334
inv = inventory.Inventory('tree-root-123')
335
inv.add(TreeReference('nested-id', 'nested', parent_id='tree-root-123',
336
revision='rev', reference_revision='rev2'))
338
def test_error_encoding(self):
339
inv = inventory.Inventory('tree-root')
340
inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
341
e = self.assertRaises(errors.InconsistentDelta, inv.add,
342
InventoryFile('b-id', u'\u1234', 'tree-root'))
343
self.assertContainsRe(str(e), r'\\u1234')
345
def test_add_recursive(self):
346
parent = InventoryDirectory('src-id', 'src', 'tree-root')
347
child = InventoryFile('hello-id', 'hello.c', 'src-id')
348
parent.children[child.file_id] = child
349
inv = inventory.Inventory('tree-root')
351
self.assertEqual('src/hello.c', inv.id2path('hello-id'))
355
class TestDeltaApplication(TestCaseWithTransport):
357
scenarios = delta_application_scenarios()
359
def get_empty_inventory(self, reference_inv=None):
360
"""Get an empty inventory.
362
Note that tests should not depend on the revision of the root for
363
setting up test conditions, as it has to be flexible to accomodate non
364
rich root repositories.
366
:param reference_inv: If not None, get the revision for the root from
367
this inventory. This is useful for dealing with older repositories
368
that routinely discarded the root entry data. If None, the root's
369
revision is set to 'basis'.
371
inv = inventory.Inventory()
372
if reference_inv is not None:
373
inv.root.revision = reference_inv.root.revision
375
inv.root.revision = 'basis'
378
def make_file_ie(self, file_id='file-id', name='name', parent_id=None):
379
ie_file = inventory.InventoryFile(file_id, name, parent_id)
380
ie_file.revision = 'result'
381
ie_file.text_size = 0
382
ie_file.text_sha1 = ''
385
def test_empty_delta(self):
386
inv = self.get_empty_inventory()
388
inv = self.apply_delta(self, inv, delta)
389
inv2 = self.get_empty_inventory(inv)
390
self.assertEqual([], inv2._make_delta(inv))
392
def test_None_file_id(self):
393
inv = self.get_empty_inventory()
394
dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
395
dir1.revision = 'result'
396
delta = [(None, u'dir1', None, dir1)]
397
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
400
def test_unicode_file_id(self):
401
inv = self.get_empty_inventory()
402
dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
403
dir1.revision = 'result'
404
delta = [(None, u'dir1', dir1.file_id, dir1)]
405
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
408
def test_repeated_file_id(self):
409
inv = self.get_empty_inventory()
410
file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
411
file1.revision = 'result'
416
delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
417
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
420
def test_repeated_new_path(self):
421
inv = self.get_empty_inventory()
422
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
423
file1.revision = 'result'
427
file2.file_id = 'id2'
428
delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
429
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
432
def test_repeated_old_path(self):
433
inv = self.get_empty_inventory()
434
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
435
file1.revision = 'result'
438
# We can't *create* a source inventory with the same path, but
439
# a badly generated partial delta might claim the same source twice.
440
# This would be buggy in two ways: the path is repeated in the delta,
441
# And the path for one of the file ids doesn't match the source
442
# location. Alternatively, we could have a repeated fileid, but that
443
# is separately checked for.
444
file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
445
file2.revision = 'result'
450
delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
451
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
454
def test_mismatched_id_entry_id(self):
455
inv = self.get_empty_inventory()
456
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
457
file1.revision = 'result'
460
delta = [(None, u'path', 'id', file1)]
461
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
464
def test_mismatched_new_path_entry_None(self):
465
inv = self.get_empty_inventory()
466
delta = [(None, u'path', 'id', None)]
467
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
470
def test_mismatched_new_path_None_entry(self):
471
inv = self.get_empty_inventory()
472
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
473
file1.revision = 'result'
476
delta = [(u"path", None, 'id1', file1)]
477
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
480
def test_parent_is_not_directory(self):
481
inv = self.get_empty_inventory()
482
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
483
file1.revision = 'result'
486
file2 = inventory.InventoryFile('id2', 'path2', 'id1')
487
file2.revision = 'result'
491
delta = [(None, u'path/path2', 'id2', file2)]
492
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
495
def test_parent_is_missing(self):
496
inv = self.get_empty_inventory()
497
file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
498
file2.revision = 'result'
501
delta = [(None, u'path/path2', 'id2', file2)]
502
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
505
def test_new_parent_path_has_wrong_id(self):
506
inv = self.get_empty_inventory()
507
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
508
parent1.revision = 'result'
509
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
510
parent2.revision = 'result'
511
file1 = inventory.InventoryFile('id', 'path', 'p-2')
512
file1.revision = 'result'
517
# This delta claims that file1 is at dir/path, but actually its at
518
# dir2/path if you follow the inventory parent structure.
519
delta = [(None, u'dir/path', 'id', file1)]
520
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
523
def test_old_parent_path_is_wrong(self):
524
inv = self.get_empty_inventory()
525
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
526
parent1.revision = 'result'
527
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
528
parent2.revision = 'result'
529
file1 = inventory.InventoryFile('id', 'path', 'p-2')
530
file1.revision = 'result'
536
# This delta claims that file1 was at dir/path, but actually it was at
537
# dir2/path if you follow the inventory parent structure.
538
delta = [(u'dir/path', None, 'id', None)]
539
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
542
def test_old_parent_path_is_for_other_id(self):
543
inv = self.get_empty_inventory()
544
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
545
parent1.revision = 'result'
546
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
547
parent2.revision = 'result'
548
file1 = inventory.InventoryFile('id', 'path', 'p-2')
549
file1.revision = 'result'
552
file2 = inventory.InventoryFile('id2', 'path', 'p-1')
553
file2.revision = 'result'
560
# This delta claims that file1 was at dir/path, but actually it was at
561
# dir2/path if you follow the inventory parent structure. At dir/path
562
# is another entry we should not delete.
563
delta = [(u'dir/path', None, 'id', None)]
564
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
567
def test_add_existing_id_new_path(self):
568
inv = self.get_empty_inventory()
569
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
570
parent1.revision = 'result'
571
parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
572
parent2.revision = 'result'
574
delta = [(None, u'dir2', 'p-1', parent2)]
575
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
578
def test_add_new_id_existing_path(self):
579
inv = self.get_empty_inventory()
580
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
581
parent1.revision = 'result'
582
parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
583
parent2.revision = 'result'
585
delta = [(None, u'dir1', 'p-2', parent2)]
586
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
589
def test_remove_dir_leaving_dangling_child(self):
590
inv = self.get_empty_inventory()
591
dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
592
dir1.revision = 'result'
593
dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
594
dir2.revision = 'result'
595
dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
596
dir3.revision = 'result'
600
delta = [(u'dir1', None, 'p-1', None),
601
(u'dir1/child2', None, 'p-3', None)]
602
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
605
def test_add_file(self):
606
inv = self.get_empty_inventory()
607
file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
608
file1.revision = 'result'
611
delta = [(None, u'path', 'file-id', file1)]
612
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
613
self.assertEqual('file-id', res_inv['file-id'].file_id)
615
def test_remove_file(self):
616
inv = self.get_empty_inventory()
617
file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
618
file1.revision = 'result'
622
delta = [(u'path', None, 'file-id', None)]
623
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
624
self.assertEqual(None, res_inv.path2id('path'))
625
self.assertRaises(errors.NoSuchId, res_inv.id2path, 'file-id')
627
def test_rename_file(self):
628
inv = self.get_empty_inventory()
629
file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
631
file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
632
delta = [(u'path', 'path2', 'file-id', file2)]
633
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
634
self.assertEqual(None, res_inv.path2id('path'))
635
self.assertEqual('file-id', res_inv.path2id('path2'))
637
def test_replaced_at_new_path(self):
638
inv = self.get_empty_inventory()
639
file1 = self.make_file_ie(file_id='id1', parent_id=inv.root.file_id)
641
file2 = self.make_file_ie(file_id='id2', parent_id=inv.root.file_id)
642
delta = [(u'name', None, 'id1', None),
643
(None, u'name', 'id2', file2)]
644
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
645
self.assertEqual('id2', res_inv.path2id('name'))
647
def test_rename_dir(self):
648
inv = self.get_empty_inventory()
649
dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
650
dir1.revision = 'basis'
651
file1 = self.make_file_ie(parent_id='dir-id')
654
dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
655
dir2.revision = 'result'
656
delta = [('dir1', 'dir2', 'dir-id', dir2)]
657
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
658
# The file should be accessible under the new path
659
self.assertEqual('file-id', res_inv.path2id('dir2/name'))
661
def test_renamed_dir_with_renamed_child(self):
662
inv = self.get_empty_inventory()
663
dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
664
dir1.revision = 'basis'
665
file1 = self.make_file_ie('file-id-1', 'name1', parent_id='dir-id')
666
file2 = self.make_file_ie('file-id-2', 'name2', parent_id='dir-id')
670
dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
671
dir2.revision = 'result'
672
file2b = self.make_file_ie('file-id-2', 'name2', inv.root.file_id)
673
delta = [('dir1', 'dir2', 'dir-id', dir2),
674
('dir1/name2', 'name2', 'file-id-2', file2b)]
675
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
676
# The file should be accessible under the new path
677
self.assertEqual('file-id-1', res_inv.path2id('dir2/name1'))
678
self.assertEqual(None, res_inv.path2id('dir2/name2'))
679
self.assertEqual('file-id-2', res_inv.path2id('name2'))
681
def test_is_root(self):
682
"""Ensure our root-checking code is accurate."""
683
inv = inventory.Inventory('TREE_ROOT')
684
self.assertTrue(inv.is_root('TREE_ROOT'))
685
self.assertFalse(inv.is_root('booga'))
686
inv.root.file_id = 'booga'
687
self.assertFalse(inv.is_root('TREE_ROOT'))
688
self.assertTrue(inv.is_root('booga'))
689
# works properly even if no root is set
691
self.assertFalse(inv.is_root('TREE_ROOT'))
692
self.assertFalse(inv.is_root('booga'))
694
def test_entries_for_empty_inventory(self):
695
"""Test that entries() will not fail for an empty inventory"""
696
inv = Inventory(root_id=None)
697
self.assertEqual([], inv.entries())
700
class TestInventoryEntry(TestCase):
702
def test_file_kind_character(self):
703
file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
704
self.assertEqual(file.kind_character(), '')
706
def test_dir_kind_character(self):
707
dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
708
self.assertEqual(dir.kind_character(), '/')
710
def test_link_kind_character(self):
711
dir = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
712
self.assertEqual(dir.kind_character(), '')
714
def test_dir_detect_changes(self):
715
left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
716
right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
717
self.assertEqual((False, False), left.detect_changes(right))
718
self.assertEqual((False, False), right.detect_changes(left))
720
def test_file_detect_changes(self):
721
left = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
723
right = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
724
right.text_sha1 = 123
725
self.assertEqual((False, False), left.detect_changes(right))
726
self.assertEqual((False, False), right.detect_changes(left))
727
left.executable = True
728
self.assertEqual((False, True), left.detect_changes(right))
729
self.assertEqual((False, True), right.detect_changes(left))
730
right.text_sha1 = 321
731
self.assertEqual((True, True), left.detect_changes(right))
732
self.assertEqual((True, True), right.detect_changes(left))
734
def test_symlink_detect_changes(self):
735
left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
736
left.symlink_target='foo'
737
right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
738
right.symlink_target='foo'
739
self.assertEqual((False, False), left.detect_changes(right))
740
self.assertEqual((False, False), right.detect_changes(left))
741
left.symlink_target = 'different'
742
self.assertEqual((True, False), left.detect_changes(right))
743
self.assertEqual((True, False), right.detect_changes(left))
745
def test_file_has_text(self):
746
file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
747
self.assertTrue(file.has_text())
749
def test_directory_has_text(self):
750
dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
751
self.assertFalse(dir.has_text())
753
def test_link_has_text(self):
754
link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
755
self.assertFalse(link.has_text())
757
def test_make_entry(self):
758
self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
759
inventory.InventoryFile)
760
self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
761
inventory.InventoryLink)
762
self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
763
inventory.InventoryDirectory)
765
def test_make_entry_non_normalized(self):
766
orig_normalized_filename = osutils.normalized_filename
769
osutils.normalized_filename = osutils._accessible_normalized_filename
770
entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
771
self.assertEqual(u'\xe5', entry.name)
772
self.assertIsInstance(entry, inventory.InventoryFile)
774
osutils.normalized_filename = osutils._inaccessible_normalized_filename
775
self.assertRaises(errors.InvalidNormalization,
776
inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
778
osutils.normalized_filename = orig_normalized_filename
781
class TestDescribeChanges(TestCase):
783
def test_describe_change(self):
784
# we need to test the following change combinations:
790
# renamed/reparented and modified
791
# change kind (perhaps can't be done yet?)
792
# also, merged in combination with all of these?
793
old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
794
old_a.text_sha1 = '123132'
796
new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
797
new_a.text_sha1 = '123132'
800
self.assertChangeDescription('unchanged', old_a, new_a)
803
new_a.text_sha1 = 'abcabc'
804
self.assertChangeDescription('modified', old_a, new_a)
806
self.assertChangeDescription('added', None, new_a)
807
self.assertChangeDescription('removed', old_a, None)
808
# perhaps a bit questionable but seems like the most reasonable thing...
809
self.assertChangeDescription('unchanged', None, None)
811
# in this case it's both renamed and modified; show a rename and
813
new_a.name = 'newfilename'
814
self.assertChangeDescription('modified and renamed', old_a, new_a)
816
# reparenting is 'renaming'
817
new_a.name = old_a.name
818
new_a.parent_id = 'somedir-id'
819
self.assertChangeDescription('modified and renamed', old_a, new_a)
821
# reset the content values so its not modified
822
new_a.text_size = old_a.text_size
823
new_a.text_sha1 = old_a.text_sha1
824
new_a.name = old_a.name
826
new_a.name = 'newfilename'
827
self.assertChangeDescription('renamed', old_a, new_a)
829
# reparenting is 'renaming'
830
new_a.name = old_a.name
831
new_a.parent_id = 'somedir-id'
832
self.assertChangeDescription('renamed', old_a, new_a)
834
def assertChangeDescription(self, expected_change, old_ie, new_ie):
835
change = InventoryEntry.describe_change(old_ie, new_ie)
836
self.assertEqual(expected_change, change)
839
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
841
def get_chk_bytes(self):
842
factory = groupcompress.make_pack_factory(True, True, 1)
843
trans = self.get_transport('')
844
return factory(trans)
846
def read_bytes(self, chk_bytes, key):
847
stream = chk_bytes.get_record_stream([key], 'unordered', True)
848
return stream.next().get_bytes_as("fulltext")
850
def test_deserialise_gives_CHKInventory(self):
852
inv.revision_id = "revid"
853
inv.root.revision = "rootrev"
854
chk_bytes = self.get_chk_bytes()
855
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
856
bytes = ''.join(chk_inv.to_lines())
857
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
858
self.assertEqual("revid", new_inv.revision_id)
859
self.assertEqual("directory", new_inv.root.kind)
860
self.assertEqual(inv.root.file_id, new_inv.root.file_id)
861
self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
862
self.assertEqual(inv.root.name, new_inv.root.name)
863
self.assertEqual("rootrev", new_inv.root.revision)
864
self.assertEqual('plain', new_inv._search_key_name)
866
def test_deserialise_wrong_revid(self):
868
inv.revision_id = "revid"
869
inv.root.revision = "rootrev"
870
chk_bytes = self.get_chk_bytes()
871
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
872
bytes = ''.join(chk_inv.to_lines())
873
self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
876
def test_captures_rev_root_byid(self):
878
inv.revision_id = "foo"
879
inv.root.revision = "bar"
880
chk_bytes = self.get_chk_bytes()
881
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
882
lines = chk_inv.to_lines()
885
'revision_id: foo\n',
886
'root_id: TREE_ROOT\n',
887
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
888
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
890
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
891
self.assertEqual('plain', chk_inv._search_key_name)
893
def test_captures_parent_id_basename_index(self):
895
inv.revision_id = "foo"
896
inv.root.revision = "bar"
897
chk_bytes = self.get_chk_bytes()
898
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
899
lines = chk_inv.to_lines()
902
'revision_id: foo\n',
903
'root_id: TREE_ROOT\n',
904
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
905
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
907
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
908
self.assertEqual('plain', chk_inv._search_key_name)
910
def test_captures_search_key_name(self):
912
inv.revision_id = "foo"
913
inv.root.revision = "bar"
914
chk_bytes = self.get_chk_bytes()
915
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
916
search_key_name='hash-16-way')
917
lines = chk_inv.to_lines()
920
'search_key_name: hash-16-way\n',
921
'root_id: TREE_ROOT\n',
922
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
923
'revision_id: foo\n',
924
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
926
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
927
self.assertEqual('hash-16-way', chk_inv._search_key_name)
929
def test_directory_children_on_demand(self):
931
inv.revision_id = "revid"
932
inv.root.revision = "rootrev"
933
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
934
inv["fileid"].revision = "filerev"
935
inv["fileid"].executable = True
936
inv["fileid"].text_sha1 = "ffff"
937
inv["fileid"].text_size = 1
938
chk_bytes = self.get_chk_bytes()
939
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
940
bytes = ''.join(chk_inv.to_lines())
941
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
942
root_entry = new_inv[inv.root.file_id]
943
self.assertEqual(None, root_entry._children)
944
self.assertEqual({'file'}, set(root_entry.children))
945
file_direct = new_inv["fileid"]
946
file_found = root_entry.children['file']
947
self.assertEqual(file_direct.kind, file_found.kind)
948
self.assertEqual(file_direct.file_id, file_found.file_id)
949
self.assertEqual(file_direct.parent_id, file_found.parent_id)
950
self.assertEqual(file_direct.name, file_found.name)
951
self.assertEqual(file_direct.revision, file_found.revision)
952
self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
953
self.assertEqual(file_direct.text_size, file_found.text_size)
954
self.assertEqual(file_direct.executable, file_found.executable)
956
def test_from_inventory_maximum_size(self):
957
# from_inventory supports the maximum_size parameter.
959
inv.revision_id = "revid"
960
inv.root.revision = "rootrev"
961
chk_bytes = self.get_chk_bytes()
962
chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
963
chk_inv.id_to_entry._ensure_root()
964
self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
965
self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
966
p_id_basename = chk_inv.parent_id_basename_to_file_id
967
p_id_basename._ensure_root()
968
self.assertEqual(120, p_id_basename._root_node.maximum_size)
969
self.assertEqual(2, p_id_basename._root_node._key_width)
971
def test___iter__(self):
973
inv.revision_id = "revid"
974
inv.root.revision = "rootrev"
975
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
976
inv["fileid"].revision = "filerev"
977
inv["fileid"].executable = True
978
inv["fileid"].text_sha1 = "ffff"
979
inv["fileid"].text_size = 1
980
chk_bytes = self.get_chk_bytes()
981
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
982
bytes = ''.join(chk_inv.to_lines())
983
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
984
fileids = sorted(new_inv.__iter__())
985
self.assertEqual([inv.root.file_id, "fileid"], fileids)
987
def test__len__(self):
989
inv.revision_id = "revid"
990
inv.root.revision = "rootrev"
991
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
992
inv["fileid"].revision = "filerev"
993
inv["fileid"].executable = True
994
inv["fileid"].text_sha1 = "ffff"
995
inv["fileid"].text_size = 1
996
chk_bytes = self.get_chk_bytes()
997
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
998
self.assertEqual(2, len(chk_inv))
1000
def test___getitem__(self):
1002
inv.revision_id = "revid"
1003
inv.root.revision = "rootrev"
1004
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1005
inv["fileid"].revision = "filerev"
1006
inv["fileid"].executable = True
1007
inv["fileid"].text_sha1 = "ffff"
1008
inv["fileid"].text_size = 1
1009
chk_bytes = self.get_chk_bytes()
1010
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1011
bytes = ''.join(chk_inv.to_lines())
1012
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1013
root_entry = new_inv[inv.root.file_id]
1014
file_entry = new_inv["fileid"]
1015
self.assertEqual("directory", root_entry.kind)
1016
self.assertEqual(inv.root.file_id, root_entry.file_id)
1017
self.assertEqual(inv.root.parent_id, root_entry.parent_id)
1018
self.assertEqual(inv.root.name, root_entry.name)
1019
self.assertEqual("rootrev", root_entry.revision)
1020
self.assertEqual("file", file_entry.kind)
1021
self.assertEqual("fileid", file_entry.file_id)
1022
self.assertEqual(inv.root.file_id, file_entry.parent_id)
1023
self.assertEqual("file", file_entry.name)
1024
self.assertEqual("filerev", file_entry.revision)
1025
self.assertEqual("ffff", file_entry.text_sha1)
1026
self.assertEqual(1, file_entry.text_size)
1027
self.assertEqual(True, file_entry.executable)
1028
self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
1030
def test_has_id_true(self):
1032
inv.revision_id = "revid"
1033
inv.root.revision = "rootrev"
1034
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1035
inv["fileid"].revision = "filerev"
1036
inv["fileid"].executable = True
1037
inv["fileid"].text_sha1 = "ffff"
1038
inv["fileid"].text_size = 1
1039
chk_bytes = self.get_chk_bytes()
1040
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1041
self.assertTrue(chk_inv.has_id('fileid'))
1042
self.assertTrue(chk_inv.has_id(inv.root.file_id))
1044
def test_has_id_not(self):
1046
inv.revision_id = "revid"
1047
inv.root.revision = "rootrev"
1048
chk_bytes = self.get_chk_bytes()
1049
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1050
self.assertFalse(chk_inv.has_id('fileid'))
1052
def test_id2path(self):
1054
inv.revision_id = "revid"
1055
inv.root.revision = "rootrev"
1056
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
1057
fileentry = InventoryFile("fileid", "file", "dirid")
1060
inv["fileid"].revision = "filerev"
1061
inv["fileid"].executable = True
1062
inv["fileid"].text_sha1 = "ffff"
1063
inv["fileid"].text_size = 1
1064
inv["dirid"].revision = "filerev"
1065
chk_bytes = self.get_chk_bytes()
1066
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1067
bytes = ''.join(chk_inv.to_lines())
1068
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1069
self.assertEqual('', new_inv.id2path(inv.root.file_id))
1070
self.assertEqual('dir', new_inv.id2path('dirid'))
1071
self.assertEqual('dir/file', new_inv.id2path('fileid'))
1073
def test_path2id(self):
1075
inv.revision_id = "revid"
1076
inv.root.revision = "rootrev"
1077
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
1078
fileentry = InventoryFile("fileid", "file", "dirid")
1081
inv["fileid"].revision = "filerev"
1082
inv["fileid"].executable = True
1083
inv["fileid"].text_sha1 = "ffff"
1084
inv["fileid"].text_size = 1
1085
inv["dirid"].revision = "filerev"
1086
chk_bytes = self.get_chk_bytes()
1087
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1088
bytes = ''.join(chk_inv.to_lines())
1089
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1090
self.assertEqual(inv.root.file_id, new_inv.path2id(''))
1091
self.assertEqual('dirid', new_inv.path2id('dir'))
1092
self.assertEqual('fileid', new_inv.path2id('dir/file'))
1094
def test_create_by_apply_delta_sets_root(self):
1096
inv.revision_id = "revid"
1097
chk_bytes = self.get_chk_bytes()
1098
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1099
inv.add_path("", "directory", "myrootid", None)
1100
inv.revision_id = "expectedid"
1101
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1102
delta = [("", None, base_inv.root.file_id, None),
1103
(None, "", "myrootid", inv.root)]
1104
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1105
self.assertEqual(reference_inv.root, new_inv.root)
1107
def test_create_by_apply_delta_empty_add_child(self):
1109
inv.revision_id = "revid"
1110
inv.root.revision = "rootrev"
1111
chk_bytes = self.get_chk_bytes()
1112
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1113
a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1114
a_entry.revision = "filerev"
1115
a_entry.executable = True
1116
a_entry.text_sha1 = "ffff"
1117
a_entry.text_size = 1
1119
inv.revision_id = "expectedid"
1120
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1121
delta = [(None, "A", "A-id", a_entry)]
1122
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1123
# new_inv should be the same as reference_inv.
1124
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1125
self.assertEqual(reference_inv.root_id, new_inv.root_id)
1126
reference_inv.id_to_entry._ensure_root()
1127
new_inv.id_to_entry._ensure_root()
1128
self.assertEqual(reference_inv.id_to_entry._root_node._key,
1129
new_inv.id_to_entry._root_node._key)
1131
def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
1133
inv.revision_id = "revid"
1134
inv.root.revision = "rootrev"
1135
chk_bytes = self.get_chk_bytes()
1136
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1137
a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1138
a_entry.revision = "filerev"
1139
a_entry.executable = True
1140
a_entry.text_sha1 = "ffff"
1141
a_entry.text_size = 1
1143
inv.revision_id = "expectedid"
1144
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1145
delta = [(None, "A", "A-id", a_entry)]
1146
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1147
reference_inv.id_to_entry._ensure_root()
1148
reference_inv.parent_id_basename_to_file_id._ensure_root()
1149
new_inv.id_to_entry._ensure_root()
1150
new_inv.parent_id_basename_to_file_id._ensure_root()
1151
# new_inv should be the same as reference_inv.
1152
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1153
self.assertEqual(reference_inv.root_id, new_inv.root_id)
1154
self.assertEqual(reference_inv.id_to_entry._root_node._key,
1155
new_inv.id_to_entry._root_node._key)
1156
self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
1157
new_inv.parent_id_basename_to_file_id._root_node._key)
1159
def test_iter_changes(self):
1160
# Low level bootstrapping smoke test; comprehensive generic tests via
1161
# InterTree are coming.
1163
inv.revision_id = "revid"
1164
inv.root.revision = "rootrev"
1165
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1166
inv["fileid"].revision = "filerev"
1167
inv["fileid"].executable = True
1168
inv["fileid"].text_sha1 = "ffff"
1169
inv["fileid"].text_size = 1
1171
inv2.revision_id = "revid2"
1172
inv2.root.revision = "rootrev"
1173
inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
1174
inv2["fileid"].revision = "filerev2"
1175
inv2["fileid"].executable = False
1176
inv2["fileid"].text_sha1 = "bbbb"
1177
inv2["fileid"].text_size = 2
1178
# get fresh objects.
1179
chk_bytes = self.get_chk_bytes()
1180
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1181
bytes = ''.join(chk_inv.to_lines())
1182
inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1183
chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
1184
bytes = ''.join(chk_inv2.to_lines())
1185
inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
1186
self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
1187
('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
1189
list(inv_1.iter_changes(inv_2)))
1191
def test_parent_id_basename_to_file_id_index_enabled(self):
1193
inv.revision_id = "revid"
1194
inv.root.revision = "rootrev"
1195
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1196
inv["fileid"].revision = "filerev"
1197
inv["fileid"].executable = True
1198
inv["fileid"].text_sha1 = "ffff"
1199
inv["fileid"].text_size = 1
1200
# get fresh objects.
1201
chk_bytes = self.get_chk_bytes()
1202
tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
1203
bytes = ''.join(tmp_inv.to_lines())
1204
chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1205
self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
1207
{('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
1208
dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1210
def test_file_entry_to_bytes(self):
1211
inv = CHKInventory(None)
1212
ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
1213
ie.executable = True
1214
ie.revision = 'file-rev-id'
1215
ie.text_sha1 = 'abcdefgh'
1217
bytes = inv._entry_to_bytes(ie)
1218
self.assertEqual('file: file-id\nparent-id\nfilename\n'
1219
'file-rev-id\nabcdefgh\n100\nY', bytes)
1220
ie2 = inv._bytes_to_entry(bytes)
1221
self.assertEqual(ie, ie2)
1222
self.assertIsInstance(ie2.name, unicode)
1223
self.assertEqual(('filename', 'file-id', 'file-rev-id'),
1224
inv._bytes_to_utf8name_key(bytes))
1226
def test_file2_entry_to_bytes(self):
1227
inv = CHKInventory(None)
1229
ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
1230
ie.executable = False
1231
ie.revision = 'file-rev-id'
1232
ie.text_sha1 = '123456'
1234
bytes = inv._entry_to_bytes(ie)
1235
self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
1236
'file-rev-id\n123456\n25\nN', bytes)
1237
ie2 = inv._bytes_to_entry(bytes)
1238
self.assertEqual(ie, ie2)
1239
self.assertIsInstance(ie2.name, unicode)
1240
self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
1241
inv._bytes_to_utf8name_key(bytes))
1243
def test_dir_entry_to_bytes(self):
1244
inv = CHKInventory(None)
1245
ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
1246
ie.revision = 'dir-rev-id'
1247
bytes = inv._entry_to_bytes(ie)
1248
self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
1249
ie2 = inv._bytes_to_entry(bytes)
1250
self.assertEqual(ie, ie2)
1251
self.assertIsInstance(ie2.name, unicode)
1252
self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
1253
inv._bytes_to_utf8name_key(bytes))
1255
def test_dir2_entry_to_bytes(self):
1256
inv = CHKInventory(None)
1257
ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
1259
ie.revision = 'dir-rev-id'
1260
bytes = inv._entry_to_bytes(ie)
1261
self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
1262
'dir-rev-id', bytes)
1263
ie2 = inv._bytes_to_entry(bytes)
1264
self.assertEqual(ie, ie2)
1265
self.assertIsInstance(ie2.name, unicode)
1266
self.assertIs(ie2.parent_id, None)
1267
self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
1268
inv._bytes_to_utf8name_key(bytes))
1270
def test_symlink_entry_to_bytes(self):
1271
inv = CHKInventory(None)
1272
ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
1273
ie.revision = 'link-rev-id'
1274
ie.symlink_target = u'target/path'
1275
bytes = inv._entry_to_bytes(ie)
1276
self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
1277
'link-rev-id\ntarget/path', bytes)
1278
ie2 = inv._bytes_to_entry(bytes)
1279
self.assertEqual(ie, ie2)
1280
self.assertIsInstance(ie2.name, unicode)
1281
self.assertIsInstance(ie2.symlink_target, unicode)
1282
self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
1283
inv._bytes_to_utf8name_key(bytes))
1285
def test_symlink2_entry_to_bytes(self):
1286
inv = CHKInventory(None)
1287
ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
1288
ie.revision = 'link-rev-id'
1289
ie.symlink_target = u'target/\u03a9path'
1290
bytes = inv._entry_to_bytes(ie)
1291
self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
1292
'link-rev-id\ntarget/\xce\xa9path', bytes)
1293
ie2 = inv._bytes_to_entry(bytes)
1294
self.assertEqual(ie, ie2)
1295
self.assertIsInstance(ie2.name, unicode)
1296
self.assertIsInstance(ie2.symlink_target, unicode)
1297
self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
1298
inv._bytes_to_utf8name_key(bytes))
1300
def test_tree_reference_entry_to_bytes(self):
1301
inv = CHKInventory(None)
1302
ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
1304
ie.revision = 'tree-rev-id'
1305
ie.reference_revision = 'ref-rev-id'
1306
bytes = inv._entry_to_bytes(ie)
1307
self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
1308
'tree-rev-id\nref-rev-id', bytes)
1309
ie2 = inv._bytes_to_entry(bytes)
1310
self.assertEqual(ie, ie2)
1311
self.assertIsInstance(ie2.name, unicode)
1312
self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1313
inv._bytes_to_utf8name_key(bytes))
1315
def make_basic_utf8_inventory(self):
1317
inv.revision_id = "revid"
1318
inv.root.revision = "rootrev"
1319
root_id = inv.root.file_id
1320
inv.add(InventoryFile("fileid", u'f\xefle', root_id))
1321
inv["fileid"].revision = "filerev"
1322
inv["fileid"].text_sha1 = "ffff"
1323
inv["fileid"].text_size = 0
1324
inv.add(InventoryDirectory("dirid", u'dir-\N{EURO SIGN}', root_id))
1325
inv.add(InventoryFile("childid", u'ch\xefld', "dirid"))
1326
inv["childid"].revision = "filerev"
1327
inv["childid"].text_sha1 = "ffff"
1328
inv["childid"].text_size = 0
1329
chk_bytes = self.get_chk_bytes()
1330
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1331
bytes = ''.join(chk_inv.to_lines())
1332
return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1334
def test__preload_handles_utf8(self):
1335
new_inv = self.make_basic_utf8_inventory()
1336
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1337
self.assertFalse(new_inv._fully_cached)
1338
new_inv._preload_cache()
1340
sorted([new_inv.root_id, "fileid", "dirid", "childid"]),
1341
sorted(new_inv._fileid_to_entry_cache.keys()))
1342
ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
1343
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1344
sorted(ie_root._children.keys()))
1345
ie_dir = new_inv._fileid_to_entry_cache['dirid']
1346
self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
1348
def test__preload_populates_cache(self):
1350
inv.revision_id = "revid"
1351
inv.root.revision = "rootrev"
1352
root_id = inv.root.file_id
1353
inv.add(InventoryFile("fileid", "file", root_id))
1354
inv["fileid"].revision = "filerev"
1355
inv["fileid"].executable = True
1356
inv["fileid"].text_sha1 = "ffff"
1357
inv["fileid"].text_size = 1
1358
inv.add(InventoryDirectory("dirid", "dir", root_id))
1359
inv.add(InventoryFile("childid", "child", "dirid"))
1360
inv["childid"].revision = "filerev"
1361
inv["childid"].executable = False
1362
inv["childid"].text_sha1 = "dddd"
1363
inv["childid"].text_size = 1
1364
chk_bytes = self.get_chk_bytes()
1365
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1366
bytes = ''.join(chk_inv.to_lines())
1367
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1368
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1369
self.assertFalse(new_inv._fully_cached)
1370
new_inv._preload_cache()
1372
sorted([root_id, "fileid", "dirid", "childid"]),
1373
sorted(new_inv._fileid_to_entry_cache.keys()))
1374
self.assertTrue(new_inv._fully_cached)
1375
ie_root = new_inv._fileid_to_entry_cache[root_id]
1376
self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
1377
ie_dir = new_inv._fileid_to_entry_cache['dirid']
1378
self.assertEqual(['child'], sorted(ie_dir._children.keys()))
1380
def test__preload_handles_partially_evaluated_inventory(self):
1381
new_inv = self.make_basic_utf8_inventory()
1382
ie = new_inv[new_inv.root_id]
1383
self.assertIs(None, ie._children)
1384
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1385
sorted(ie.children.keys()))
1386
# Accessing .children loads _children
1387
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1388
sorted(ie._children.keys()))
1389
new_inv._preload_cache()
1391
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1392
sorted(ie._children.keys()))
1393
ie_dir = new_inv["dirid"]
1394
self.assertEqual([u'ch\xefld'],
1395
sorted(ie_dir._children.keys()))
1397
def test_filter_change_in_renamed_subfolder(self):
1398
inv = Inventory('tree-root')
1399
src_ie = inv.add_path('src', 'directory', 'src-id')
1400
inv.add_path('src/sub/', 'directory', 'sub-id')
1401
a_ie = inv.add_path('src/sub/a', 'file', 'a-id')
1402
a_ie.text_sha1 = osutils.sha_string('content\n')
1403
a_ie.text_size = len('content\n')
1404
chk_bytes = self.get_chk_bytes()
1405
inv = CHKInventory.from_inventory(chk_bytes, inv)
1406
inv = inv.create_by_apply_delta([
1407
("src/sub/a", "src/sub/a", "a-id", a_ie),
1408
("src", "src2", "src-id", src_ie),
1410
new_inv = inv.filter(['a-id', 'src-id'])
1414
('src/sub', 'sub-id'),
1415
('src/sub/a', 'a-id'),
1416
], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
1418
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1420
def get_chk_bytes(self):
1421
factory = groupcompress.make_pack_factory(True, True, 1)
1422
trans = self.get_transport('')
1423
return factory(trans)
1425
def make_dir(self, inv, name, parent_id):
1426
inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
1428
def make_file(self, inv, name, parent_id, content='content\n'):
1429
ie = inv.make_entry('file', name, parent_id, name + '-id')
1430
ie.text_sha1 = osutils.sha_string(content)
1431
ie.text_size = len(content)
1434
def make_simple_inventory(self):
1435
inv = Inventory('TREE_ROOT')
1436
inv.revision_id = "revid"
1437
inv.root.revision = "rootrev"
1440
# sub-file1 sub-file1-id
1441
# sub-file2 sub-file2-id
1442
# sub-dir1/ sub-dir1-id
1443
# subsub-file1 subsub-file1-id
1445
# sub2-file1 sub2-file1-id
1447
self.make_dir(inv, 'dir1', 'TREE_ROOT')
1448
self.make_dir(inv, 'dir2', 'TREE_ROOT')
1449
self.make_dir(inv, 'sub-dir1', 'dir1-id')
1450
self.make_file(inv, 'top', 'TREE_ROOT')
1451
self.make_file(inv, 'sub-file1', 'dir1-id')
1452
self.make_file(inv, 'sub-file2', 'dir1-id')
1453
self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
1454
self.make_file(inv, 'sub2-file1', 'dir2-id')
1455
chk_bytes = self.get_chk_bytes()
1456
# use a small maximum_size to force internal paging structures
1457
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
1459
search_key_name='hash-255-way')
1460
bytes = ''.join(chk_inv.to_lines())
1461
return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1463
def assert_Getitems(self, expected_fileids, inv, file_ids):
1464
self.assertEqual(sorted(expected_fileids),
1465
sorted([ie.file_id for ie in inv._getitems(file_ids)]))
1467
def assertExpand(self, all_ids, inv, file_ids):
1469
val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
1470
self.assertEqual(set(all_ids), val_all_ids)
1471
entries = inv._getitems(val_all_ids)
1472
expected_children = {}
1473
for entry in entries:
1474
s = expected_children.setdefault(entry.parent_id, [])
1475
s.append(entry.file_id)
1476
val_children = dict((k, sorted(v)) for k, v
1477
in val_children.items())
1478
expected_children = dict((k, sorted(v)) for k, v
1479
in expected_children.items())
1480
self.assertEqual(expected_children, val_children)
1482
def test_make_simple_inventory(self):
1483
inv = self.make_simple_inventory()
1485
for path, entry in inv.iter_entries_by_dir():
1486
layout.append((path, entry.file_id))
1489
('dir1', 'dir1-id'),
1490
('dir2', 'dir2-id'),
1492
('dir1/sub-dir1', 'sub-dir1-id'),
1493
('dir1/sub-file1', 'sub-file1-id'),
1494
('dir1/sub-file2', 'sub-file2-id'),
1495
('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
1496
('dir2/sub2-file1', 'sub2-file1-id'),
1499
def test__getitems(self):
1500
inv = self.make_simple_inventory()
1502
self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1503
self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1504
self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
1506
self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1508
self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
1509
['dir1-id', 'sub-file2-id'])
1510
self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1511
self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
1513
def test_single_file(self):
1514
inv = self.make_simple_inventory()
1515
self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1517
def test_get_all_parents(self):
1518
inv = self.make_simple_inventory()
1519
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1521
], inv, ['subsub-file1-id'])
1523
def test_get_children(self):
1524
inv = self.make_simple_inventory()
1525
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1526
'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
1527
], inv, ['dir1-id'])
1529
def test_from_root(self):
1530
inv = self.make_simple_inventory()
1531
self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
1532
'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
1533
'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
1535
def test_top_level_file(self):
1536
inv = self.make_simple_inventory()
1537
self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1539
def test_subsub_file(self):
1540
inv = self.make_simple_inventory()
1541
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1542
'subsub-file1-id'], inv, ['subsub-file1-id'])
1544
def test_sub_and_root(self):
1545
inv = self.make_simple_inventory()
1546
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
1547
'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])
1550
class TestMutableInventoryFromTree(TestCaseWithTransport):
1552
def test_empty(self):
1553
repository = self.make_repository('.')
1554
tree = repository.revision_tree(revision.NULL_REVISION)
1555
inv = mutable_inventory_from_tree(tree)
1556
self.assertEqual(revision.NULL_REVISION, inv.revision_id)
1557
self.assertEqual(0, len(inv))
1559
def test_some_files(self):
1560
wt = self.make_branch_and_tree('.')
1561
self.build_tree(['a'])
1562
wt.add(['a'], ['thefileid'])
1563
revid = wt.commit("commit")
1564
tree = wt.branch.repository.revision_tree(revid)
1565
inv = mutable_inventory_from_tree(tree)
1566
self.assertEqual(revid, inv.revision_id)
1567
self.assertEqual(2, len(inv))
1568
self.assertEqual("a", inv['thefileid'].name)
1569
# The inventory should be mutable and independent of
1571
self.assertFalse(tree.root_inventory['thefileid'].executable)
1572
inv['thefileid'].executable = True
1573
self.assertFalse(tree.root_inventory['thefileid'].executable)