1
# Copyright (C) 2005-2012, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
26
from ..sixish import text_type
32
from ..bzr.inventory import (
40
mutable_inventory_from_tree,
44
TestCaseWithTransport,
46
from .scenarios import load_tests_apply_scenarios
49
load_tests = load_tests_apply_scenarios
52
def delta_application_scenarios():
54
('Inventory', {'apply_delta':apply_inventory_Inventory}),
56
# Working tree basis delta application
57
# Repository add_inv_by_delta.
58
# Reduce form of the per_repository test logic - that logic needs to be
59
# be able to get /just/ repositories whereas these tests are fine with
60
# just creating trees.
62
for _, format in repository.format_registry.iteritems():
63
if format.supports_full_versioned_files:
64
scenarios.append((str(format.__name__), {
65
'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
67
for getter in workingtree.format_registry._get_all_lazy():
73
pass # Format with unmet dependency
74
repo_fmt = format._matchingcontroldir.repository_format
75
if not repo_fmt.supports_full_versioned_files:
78
(str(format.__class__.__name__) + ".update_basis_by_delta", {
79
'apply_delta':apply_inventory_WT_basis,
82
(str(format.__class__.__name__) + ".apply_inventory_delta", {
83
'apply_delta':apply_inventory_WT,
88
def create_texts_for_inv(repo, inv):
89
for path, ie in inv.iter_entries():
91
lines = [b'a' * ie.text_size]
94
repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
97
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
98
"""Apply delta to basis and return the result.
100
:param basis: An inventory to be used as the basis.
101
:param delta: The inventory delta to apply:
102
:return: An inventory resulting from the application.
104
basis.apply_delta(delta)
108
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
109
"""Apply delta to basis and return the result.
111
This sets the tree state to be basis, and then calls apply_inventory_delta.
113
:param basis: An inventory to be used as the basis.
114
:param delta: The inventory delta to apply:
115
:return: An inventory resulting from the application.
117
control = self.make_controldir('tree', format=self.format._matchingcontroldir)
118
control.create_repository()
119
control.create_branch()
120
tree = self.format.initialize(control)
123
tree._write_inventory(basis)
126
# Fresh object, reads disk again.
127
tree = tree.controldir.open_workingtree()
130
tree.apply_inventory_delta(delta)
133
# reload tree - ensure we get what was written.
134
tree = tree.controldir.open_workingtree()
136
self.addCleanup(tree.unlock)
137
if not invalid_delta:
139
return tree.root_inventory
142
def _create_repo_revisions(repo, basis, delta, invalid_delta):
143
repo.start_write_group()
145
rev = revision.Revision(b'basis', timestamp=0, timezone=None,
146
message="", committer="foo@example.com")
147
basis.revision_id = b'basis'
148
create_texts_for_inv(repo, basis)
149
repo.add_revision(b'basis', rev, basis)
151
# We don't want to apply the delta to the basis, because we expect
152
# the delta is invalid.
154
result_inv.revision_id = b'result'
155
target_entries = None
157
result_inv = basis.create_by_apply_delta(delta, b'result')
158
create_texts_for_inv(repo, result_inv)
159
target_entries = list(result_inv.iter_entries_by_dir())
160
rev = revision.Revision(b'result', timestamp=0, timezone=None,
161
message="", committer="foo@example.com")
162
repo.add_revision(b'result', rev, result_inv)
163
repo.commit_write_group()
165
repo.abort_write_group()
167
return target_entries
170
def _get_basis_entries(tree):
171
basis_tree = tree.basis_tree()
172
basis_tree.lock_read()
173
basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
175
return basis_tree_entries
178
def _populate_different_tree(tree, basis, delta):
179
"""Put all entries into tree, but at a unique location."""
182
tree.add(['unique-dir'], [b'unique-dir-id'], ['directory'])
183
for path, ie in basis.iter_entries_by_dir():
184
if ie.file_id in added_ids:
186
# We want a unique path for each of these, we use the file-id
187
tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
188
added_ids.add(ie.file_id)
189
for old_path, new_path, file_id, ie in delta:
190
if file_id in added_ids:
192
tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
195
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
196
"""Apply delta to basis and return the result.
198
This sets the parent and then calls update_basis_by_delta.
199
It also puts the basis in the repository under both 'basis' and 'result' to
200
allow safety checks made by the WT to succeed, and finally ensures that all
201
items in the delta with a new path are present in the WT before calling
202
update_basis_by_delta.
204
:param basis: An inventory to be used as the basis.
205
:param delta: The inventory delta to apply:
206
:return: An inventory resulting from the application.
208
control = test.make_controldir('tree', format=test.format._matchingcontroldir)
209
control.create_repository()
210
control.create_branch()
211
tree = test.format.initialize(control)
214
target_entries = _create_repo_revisions(tree.branch.repository, basis,
215
delta, invalid_delta)
216
# Set the basis state as the trees current state
217
tree._write_inventory(basis)
218
# This reads basis from the repo and puts it into the tree's local
219
# cache, if it has one.
220
tree.set_parent_ids([b'basis'])
223
# Fresh lock, reads disk again.
224
with tree.lock_write():
225
tree.update_basis_by_delta(b'result', delta)
226
if not invalid_delta:
228
# reload tree - ensure we get what was written.
229
tree = tree.controldir.open_workingtree()
230
basis_tree = tree.basis_tree()
231
basis_tree.lock_read()
232
test.addCleanup(basis_tree.unlock)
233
basis_inv = basis_tree.root_inventory
235
basis_entries = list(basis_inv.iter_entries_by_dir())
236
test.assertEqual(target_entries, basis_entries)
240
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
242
"""Apply delta to basis and return the result.
244
This inserts basis as a whole inventory and then uses
245
add_inventory_by_delta to add delta.
247
:param basis: An inventory to be used as the basis.
248
:param delta: The inventory delta to apply:
249
:return: An inventory resulting from the application.
251
format = self.format()
252
control = self.make_controldir('tree', format=format._matchingcontroldir)
253
repo = format.initialize(control)
254
with repo.lock_write():
255
repo.start_write_group()
257
rev = revision.Revision(b'basis', timestamp=0, timezone=None,
258
message="", committer="foo@example.com")
259
basis.revision_id = b'basis'
260
create_texts_for_inv(repo, basis)
261
repo.add_revision(b'basis', rev, basis)
262
repo.commit_write_group()
264
repo.abort_write_group()
266
with repo.lock_write():
267
repo.start_write_group()
269
inv_sha1 = repo.add_inventory_by_delta(b'basis', delta,
270
b'result', [b'basis'])
272
repo.abort_write_group()
275
repo.commit_write_group()
276
# Fresh lock, reads disk again.
277
repo = repo.controldir.open_repository()
279
self.addCleanup(repo.unlock)
280
return repo.get_inventory(b'result')
283
class TestInventoryUpdates(TestCase):
285
def test_creation_from_root_id(self):
286
# iff a root id is passed to the constructor, a root directory is made
287
inv = inventory.Inventory(root_id=b'tree-root')
288
self.assertNotEqual(None, inv.root)
289
self.assertEqual(b'tree-root', inv.root.file_id)
291
def test_add_path_of_root(self):
292
# if no root id is given at creation time, there is no root directory
293
inv = inventory.Inventory(root_id=None)
294
self.assertIs(None, inv.root)
295
# add a root entry by adding its path
296
ie = inv.add_path(u"", "directory", b"my-root")
297
ie.revision = b'test-rev'
298
self.assertEqual(b"my-root", ie.file_id)
299
self.assertIs(ie, inv.root)
301
def test_add_path(self):
302
inv = inventory.Inventory(root_id=b'tree_root')
303
ie = inv.add_path(u'hello', 'file', b'hello-id')
304
self.assertEqual(b'hello-id', ie.file_id)
305
self.assertEqual('file', ie.kind)
308
"""Make sure copy() works and creates a deep copy."""
309
inv = inventory.Inventory(root_id=b'some-tree-root')
310
ie = inv.add_path(u'hello', 'file', b'hello-id')
312
inv.root.file_id = b'some-new-root'
314
self.assertEqual(b'some-tree-root', inv2.root.file_id)
315
self.assertEqual(u'hello', inv2.get_entry(b'hello-id').name)
317
def test_copy_empty(self):
318
"""Make sure an empty inventory can be copied."""
319
inv = inventory.Inventory(root_id=None)
321
self.assertIs(None, inv2.root)
323
def test_copy_copies_root_revision(self):
324
"""Make sure the revision of the root gets copied."""
325
inv = inventory.Inventory(root_id=b'someroot')
326
inv.root.revision = b'therev'
328
self.assertEqual(b'someroot', inv2.root.file_id)
329
self.assertEqual(b'therev', inv2.root.revision)
331
def test_create_tree_reference(self):
332
inv = inventory.Inventory(b'tree-root-123')
333
inv.add(TreeReference(
334
b'nested-id', 'nested', parent_id=b'tree-root-123',
335
revision=b'rev', reference_revision=b'rev2'))
337
def test_error_encoding(self):
338
inv = inventory.Inventory(b'tree-root')
339
inv.add(InventoryFile(b'a-id', u'\u1234', b'tree-root'))
340
e = self.assertRaises(errors.InconsistentDelta, inv.add,
341
InventoryFile(b'b-id', u'\u1234', b'tree-root'))
342
self.assertContainsRe(str(e), '\\u1234')
344
def test_add_recursive(self):
345
parent = InventoryDirectory(b'src-id', 'src', b'tree-root')
346
child = InventoryFile(b'hello-id', 'hello.c', b'src-id')
347
parent.children[child.file_id] = child
348
inv = inventory.Inventory(b'tree-root')
350
self.assertEqual('src/hello.c', inv.id2path(b'hello-id'))
354
class TestDeltaApplication(TestCaseWithTransport):
356
scenarios = delta_application_scenarios()
358
def get_empty_inventory(self, reference_inv=None):
359
"""Get an empty inventory.
361
Note that tests should not depend on the revision of the root for
362
setting up test conditions, as it has to be flexible to accomodate non
363
rich root repositories.
365
:param reference_inv: If not None, get the revision for the root from
366
this inventory. This is useful for dealing with older repositories
367
that routinely discarded the root entry data. If None, the root's
368
revision is set to 'basis'.
370
inv = inventory.Inventory()
371
if reference_inv is not None:
372
inv.root.revision = reference_inv.root.revision
374
inv.root.revision = b'basis'
377
def make_file_ie(self, file_id=b'file-id', name='name', parent_id=None):
378
ie_file = inventory.InventoryFile(file_id, name, parent_id)
379
ie_file.revision = b'result'
380
ie_file.text_size = 0
381
ie_file.text_sha1 = b''
384
def test_empty_delta(self):
385
inv = self.get_empty_inventory()
387
inv = self.apply_delta(self, inv, delta)
388
inv2 = self.get_empty_inventory(inv)
389
self.assertEqual([], inv2._make_delta(inv))
391
def test_None_file_id(self):
392
inv = self.get_empty_inventory()
393
dir1 = inventory.InventoryDirectory(b'dirid', 'dir1', inv.root.file_id)
395
dir1.revision = b'result'
396
delta = [(None, u'dir1', None, dir1)]
397
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
400
def test_unicode_file_id(self):
401
inv = self.get_empty_inventory()
402
dir1 = inventory.InventoryDirectory(b'dirid', 'dir1', inv.root.file_id)
403
dir1.file_id = u'dirid'
404
dir1.revision = b'result'
405
delta = [(None, u'dir1', dir1.file_id, dir1)]
406
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
409
def test_repeated_file_id(self):
410
inv = self.get_empty_inventory()
411
file1 = inventory.InventoryFile(b'id', 'path1', inv.root.file_id)
412
file1.revision = b'result'
414
file1.text_sha1 = b""
417
delta = [(None, u'path1', b'id', file1), (None, u'path2', b'id', file2)]
418
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
421
def test_repeated_new_path(self):
422
inv = self.get_empty_inventory()
423
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
424
file1.revision = b'result'
426
file1.text_sha1 = b""
428
file2.file_id = b'id2'
429
delta = [(None, u'path', b'id1', file1), (None, u'path', b'id2', file2)]
430
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
433
def test_repeated_old_path(self):
434
inv = self.get_empty_inventory()
435
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
436
file1.revision = b'result'
438
file1.text_sha1 = b""
439
# We can't *create* a source inventory with the same path, but
440
# a badly generated partial delta might claim the same source twice.
441
# This would be buggy in two ways: the path is repeated in the delta,
442
# And the path for one of the file ids doesn't match the source
443
# location. Alternatively, we could have a repeated fileid, but that
444
# is separately checked for.
445
file2 = inventory.InventoryFile(b'id2', 'path2', inv.root.file_id)
446
file2.revision = b'result'
448
file2.text_sha1 = b""
451
delta = [(u'path', None, b'id1', None), (u'path', None, b'id2', None)]
452
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
455
def test_mismatched_id_entry_id(self):
456
inv = self.get_empty_inventory()
457
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
458
file1.revision = b'result'
460
file1.text_sha1 = b""
461
delta = [(None, u'path', b'id', file1)]
462
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
465
def test_mismatched_new_path_entry_None(self):
466
inv = self.get_empty_inventory()
467
delta = [(None, u'path', b'id', None)]
468
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
471
def test_mismatched_new_path_None_entry(self):
472
inv = self.get_empty_inventory()
473
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
474
file1.revision = b'result'
476
file1.text_sha1 = b""
477
delta = [(u"path", None, b'id1', file1)]
478
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
481
def test_parent_is_not_directory(self):
482
inv = self.get_empty_inventory()
483
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
484
file1.revision = b'result'
486
file1.text_sha1 = b""
487
file2 = inventory.InventoryFile(b'id2', 'path2', b'id1')
488
file2.revision = b'result'
490
file2.text_sha1 = b""
492
delta = [(None, u'path/path2', b'id2', file2)]
493
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
496
def test_parent_is_missing(self):
497
inv = self.get_empty_inventory()
498
file2 = inventory.InventoryFile(b'id2', 'path2', b'missingparent')
499
file2.revision = b'result'
501
file2.text_sha1 = b""
502
delta = [(None, u'path/path2', b'id2', file2)]
503
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
506
def test_new_parent_path_has_wrong_id(self):
507
inv = self.get_empty_inventory()
508
parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
509
parent1.revision = b'result'
510
parent2 = inventory.InventoryDirectory(b'p-2', 'dir2', inv.root.file_id)
511
parent2.revision = b'result'
512
file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
513
file1.revision = b'result'
515
file1.text_sha1 = b""
518
# This delta claims that file1 is at dir/path, but actually its at
519
# dir2/path if you follow the inventory parent structure.
520
delta = [(None, u'dir/path', b'id', file1)]
521
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
524
def test_old_parent_path_is_wrong(self):
525
inv = self.get_empty_inventory()
526
parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
527
parent1.revision = b'result'
528
parent2 = inventory.InventoryDirectory(b'p-2', 'dir2', inv.root.file_id)
529
parent2.revision = b'result'
530
file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
531
file1.revision = b'result'
533
file1.text_sha1 = b""
537
# This delta claims that file1 was at dir/path, but actually it was at
538
# dir2/path if you follow the inventory parent structure.
539
delta = [(u'dir/path', None, b'id', None)]
540
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
543
def test_old_parent_path_is_for_other_id(self):
544
inv = self.get_empty_inventory()
545
parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
546
parent1.revision = b'result'
547
parent2 = inventory.InventoryDirectory(b'p-2', 'dir2', inv.root.file_id)
548
parent2.revision = b'result'
549
file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
550
file1.revision = b'result'
552
file1.text_sha1 = b""
553
file2 = inventory.InventoryFile(b'id2', 'path', b'p-1')
554
file2.revision = b'result'
556
file2.text_sha1 = b""
561
# This delta claims that file1 was at dir/path, but actually it was at
562
# dir2/path if you follow the inventory parent structure. At dir/path
563
# is another entry we should not delete.
564
delta = [(u'dir/path', None, b'id', None)]
565
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
568
def test_add_existing_id_new_path(self):
569
inv = self.get_empty_inventory()
570
parent1 = inventory.InventoryDirectory(b'p-1', 'dir1', inv.root.file_id)
571
parent1.revision = b'result'
572
parent2 = inventory.InventoryDirectory(b'p-1', 'dir2', inv.root.file_id)
573
parent2.revision = b'result'
575
delta = [(None, u'dir2', b'p-1', parent2)]
576
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
579
def test_add_new_id_existing_path(self):
580
inv = self.get_empty_inventory()
581
parent1 = inventory.InventoryDirectory(b'p-1', 'dir1', inv.root.file_id)
582
parent1.revision = b'result'
583
parent2 = inventory.InventoryDirectory(b'p-2', 'dir1', inv.root.file_id)
584
parent2.revision = b'result'
586
delta = [(None, u'dir1', b'p-2', parent2)]
587
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
590
def test_remove_dir_leaving_dangling_child(self):
591
inv = self.get_empty_inventory()
592
dir1 = inventory.InventoryDirectory(b'p-1', 'dir1', inv.root.file_id)
593
dir1.revision = b'result'
594
dir2 = inventory.InventoryDirectory(b'p-2', 'child1', b'p-1')
595
dir2.revision = b'result'
596
dir3 = inventory.InventoryDirectory(b'p-3', 'child2', b'p-1')
597
dir3.revision = b'result'
601
delta = [(u'dir1', None, b'p-1', None),
602
(u'dir1/child2', None, b'p-3', None)]
603
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
606
def test_add_file(self):
607
inv = self.get_empty_inventory()
608
file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
609
file1.revision = b'result'
611
file1.text_sha1 = b''
612
delta = [(None, u'path', b'file-id', file1)]
613
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
614
self.assertEqual(b'file-id', res_inv.get_entry(b'file-id').file_id)
616
def test_remove_file(self):
617
inv = self.get_empty_inventory()
618
file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
619
file1.revision = b'result'
621
file1.text_sha1 = b''
623
delta = [(u'path', None, b'file-id', None)]
624
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
625
self.assertEqual(None, res_inv.path2id('path'))
626
self.assertRaises(errors.NoSuchId, res_inv.id2path, b'file-id')
628
def test_rename_file(self):
629
inv = self.get_empty_inventory()
630
file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
632
file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
633
delta = [(u'path', 'path2', b'file-id', file2)]
634
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
635
self.assertEqual(None, res_inv.path2id('path'))
636
self.assertEqual(b'file-id', res_inv.path2id('path2'))
638
def test_replaced_at_new_path(self):
639
inv = self.get_empty_inventory()
640
file1 = self.make_file_ie(file_id=b'id1', parent_id=inv.root.file_id)
642
file2 = self.make_file_ie(file_id=b'id2', parent_id=inv.root.file_id)
643
delta = [(u'name', None, b'id1', None),
644
(None, u'name', b'id2', file2)]
645
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
646
self.assertEqual(b'id2', res_inv.path2id('name'))
648
def test_rename_dir(self):
649
inv = self.get_empty_inventory()
650
dir1 = inventory.InventoryDirectory(b'dir-id', 'dir1', inv.root.file_id)
651
dir1.revision = b'basis'
652
file1 = self.make_file_ie(parent_id=b'dir-id')
655
dir2 = inventory.InventoryDirectory(b'dir-id', 'dir2', inv.root.file_id)
656
dir2.revision = b'result'
657
delta = [('dir1', 'dir2', b'dir-id', dir2)]
658
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
659
# The file should be accessible under the new path
660
self.assertEqual(b'file-id', res_inv.path2id('dir2/name'))
662
def test_renamed_dir_with_renamed_child(self):
663
inv = self.get_empty_inventory()
664
dir1 = inventory.InventoryDirectory(b'dir-id', 'dir1', inv.root.file_id)
665
dir1.revision = b'basis'
666
file1 = self.make_file_ie(b'file-id-1', 'name1', parent_id=b'dir-id')
667
file2 = self.make_file_ie(b'file-id-2', 'name2', parent_id=b'dir-id')
671
dir2 = inventory.InventoryDirectory(b'dir-id', 'dir2', inv.root.file_id)
672
dir2.revision = b'result'
673
file2b = self.make_file_ie(b'file-id-2', 'name2', inv.root.file_id)
674
delta = [('dir1', 'dir2', b'dir-id', dir2),
675
('dir1/name2', 'name2', b'file-id-2', file2b)]
676
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
677
# The file should be accessible under the new path
678
self.assertEqual(b'file-id-1', res_inv.path2id('dir2/name1'))
679
self.assertEqual(None, res_inv.path2id('dir2/name2'))
680
self.assertEqual(b'file-id-2', res_inv.path2id('name2'))
682
def test_is_root(self):
683
"""Ensure our root-checking code is accurate."""
684
inv = inventory.Inventory(b'TREE_ROOT')
685
self.assertTrue(inv.is_root(b'TREE_ROOT'))
686
self.assertFalse(inv.is_root(b'booga'))
687
inv.root.file_id = b'booga'
688
self.assertFalse(inv.is_root(b'TREE_ROOT'))
689
self.assertTrue(inv.is_root(b'booga'))
690
# works properly even if no root is set
692
self.assertFalse(inv.is_root(b'TREE_ROOT'))
693
self.assertFalse(inv.is_root(b'booga'))
695
def test_entries_for_empty_inventory(self):
696
"""Test that entries() will not fail for an empty inventory"""
697
inv = Inventory(root_id=None)
698
self.assertEqual([], inv.entries())
701
class TestInventoryEntry(TestCase):
703
def test_file_invalid_entry_name(self):
704
self.assertRaises(errors.InvalidEntryName, inventory.InventoryFile,
705
b'123', 'a/hello.c', ROOT_ID)
707
def test_file_backslash(self):
708
file = inventory.InventoryFile(b'123', 'h\\ello.c', ROOT_ID)
709
self.assertEquals(file.name, 'h\\ello.c')
711
def test_file_kind_character(self):
712
file = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
713
self.assertEqual(file.kind_character(), '')
715
def test_dir_kind_character(self):
716
dir = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
717
self.assertEqual(dir.kind_character(), '/')
719
def test_link_kind_character(self):
720
dir = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
721
self.assertEqual(dir.kind_character(), '')
723
def test_link_kind_character(self):
724
dir = TreeReference(b'123', 'hello.c', ROOT_ID)
725
self.assertEqual(dir.kind_character(), '+')
727
def test_dir_detect_changes(self):
728
left = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
729
right = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
730
self.assertEqual((False, False), left.detect_changes(right))
731
self.assertEqual((False, False), right.detect_changes(left))
733
def test_file_detect_changes(self):
734
left = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
736
right = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
737
right.text_sha1 = 123
738
self.assertEqual((False, False), left.detect_changes(right))
739
self.assertEqual((False, False), right.detect_changes(left))
740
left.executable = True
741
self.assertEqual((False, True), left.detect_changes(right))
742
self.assertEqual((False, True), right.detect_changes(left))
743
right.text_sha1 = 321
744
self.assertEqual((True, True), left.detect_changes(right))
745
self.assertEqual((True, True), right.detect_changes(left))
747
def test_symlink_detect_changes(self):
748
left = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
749
left.symlink_target='foo'
750
right = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
751
right.symlink_target='foo'
752
self.assertEqual((False, False), left.detect_changes(right))
753
self.assertEqual((False, False), right.detect_changes(left))
754
left.symlink_target = 'different'
755
self.assertEqual((True, False), left.detect_changes(right))
756
self.assertEqual((True, False), right.detect_changes(left))
758
def test_file_has_text(self):
759
file = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
760
self.assertTrue(file.has_text())
762
def test_directory_has_text(self):
763
dir = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
764
self.assertFalse(dir.has_text())
766
def test_link_has_text(self):
767
link = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
768
self.assertFalse(link.has_text())
770
def test_make_entry(self):
771
self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
772
inventory.InventoryFile)
773
self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
774
inventory.InventoryLink)
775
self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
776
inventory.InventoryDirectory)
778
def test_make_entry_non_normalized(self):
779
orig_normalized_filename = osutils.normalized_filename
782
osutils.normalized_filename = osutils._accessible_normalized_filename
783
entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
784
self.assertEqual(u'\xe5', entry.name)
785
self.assertIsInstance(entry, inventory.InventoryFile)
787
osutils.normalized_filename = osutils._inaccessible_normalized_filename
788
self.assertRaises(errors.InvalidNormalization,
789
inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
791
osutils.normalized_filename = orig_normalized_filename
794
class TestDescribeChanges(TestCase):
796
def test_describe_change(self):
797
# we need to test the following change combinations:
803
# renamed/reparented and modified
804
# change kind (perhaps can't be done yet?)
805
# also, merged in combination with all of these?
806
old_a = InventoryFile(b'a-id', 'a_file', ROOT_ID)
807
old_a.text_sha1 = b'123132'
809
new_a = InventoryFile(b'a-id', 'a_file', ROOT_ID)
810
new_a.text_sha1 = b'123132'
813
self.assertChangeDescription('unchanged', old_a, new_a)
816
new_a.text_sha1 = b'abcabc'
817
self.assertChangeDescription('modified', old_a, new_a)
819
self.assertChangeDescription('added', None, new_a)
820
self.assertChangeDescription('removed', old_a, None)
821
# perhaps a bit questionable but seems like the most reasonable thing...
822
self.assertChangeDescription('unchanged', None, None)
824
# in this case it's both renamed and modified; show a rename and
826
new_a.name = 'newfilename'
827
self.assertChangeDescription('modified and renamed', old_a, new_a)
829
# reparenting is 'renaming'
830
new_a.name = old_a.name
831
new_a.parent_id = b'somedir-id'
832
self.assertChangeDescription('modified and renamed', old_a, new_a)
834
# reset the content values so its not modified
835
new_a.text_size = old_a.text_size
836
new_a.text_sha1 = old_a.text_sha1
837
new_a.name = old_a.name
839
new_a.name = 'newfilename'
840
self.assertChangeDescription('renamed', old_a, new_a)
842
# reparenting is 'renaming'
843
new_a.name = old_a.name
844
new_a.parent_id = b'somedir-id'
845
self.assertChangeDescription('renamed', old_a, new_a)
847
def assertChangeDescription(self, expected_change, old_ie, new_ie):
848
change = InventoryEntry.describe_change(old_ie, new_ie)
849
self.assertEqual(expected_change, change)
852
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
854
def get_chk_bytes(self):
855
factory = groupcompress.make_pack_factory(True, True, 1)
856
trans = self.get_transport('')
857
return factory(trans)
859
def read_bytes(self, chk_bytes, key):
860
stream = chk_bytes.get_record_stream([key], 'unordered', True)
861
return next(stream).get_bytes_as("fulltext")
863
def test_deserialise_gives_CHKInventory(self):
865
inv.revision_id = b"revid"
866
inv.root.revision = b"rootrev"
867
chk_bytes = self.get_chk_bytes()
868
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
869
bytes = b''.join(chk_inv.to_lines())
870
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
871
self.assertEqual(b"revid", new_inv.revision_id)
872
self.assertEqual("directory", new_inv.root.kind)
873
self.assertEqual(inv.root.file_id, new_inv.root.file_id)
874
self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
875
self.assertEqual(inv.root.name, new_inv.root.name)
876
self.assertEqual(b"rootrev", new_inv.root.revision)
877
self.assertEqual(b'plain', new_inv._search_key_name)
879
def test_deserialise_wrong_revid(self):
881
inv.revision_id = b"revid"
882
inv.root.revision = b"rootrev"
883
chk_bytes = self.get_chk_bytes()
884
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
885
bytes = b''.join(chk_inv.to_lines())
886
self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
889
def test_captures_rev_root_byid(self):
891
inv.revision_id = b"foo"
892
inv.root.revision = b"bar"
893
chk_bytes = self.get_chk_bytes()
894
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
895
lines = chk_inv.to_lines()
898
b'revision_id: foo\n',
899
b'root_id: TREE_ROOT\n',
900
b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
901
b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
903
chk_inv = CHKInventory.deserialise(chk_bytes, b''.join(lines), (b'foo',))
904
self.assertEqual(b'plain', chk_inv._search_key_name)
906
def test_captures_parent_id_basename_index(self):
908
inv.revision_id = b"foo"
909
inv.root.revision = b"bar"
910
chk_bytes = self.get_chk_bytes()
911
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
912
lines = chk_inv.to_lines()
915
b'revision_id: foo\n',
916
b'root_id: TREE_ROOT\n',
917
b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
918
b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
920
chk_inv = CHKInventory.deserialise(chk_bytes, b''.join(lines), (b'foo',))
921
self.assertEqual(b'plain', chk_inv._search_key_name)
923
def test_captures_search_key_name(self):
925
inv.revision_id = b"foo"
926
inv.root.revision = b"bar"
927
chk_bytes = self.get_chk_bytes()
928
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
929
search_key_name=b'hash-16-way')
930
lines = chk_inv.to_lines()
933
b'search_key_name: hash-16-way\n',
934
b'root_id: TREE_ROOT\n',
935
b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
936
b'revision_id: foo\n',
937
b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
939
chk_inv = CHKInventory.deserialise(chk_bytes, b''.join(lines), (b'foo',))
940
self.assertEqual(b'hash-16-way', chk_inv._search_key_name)
942
def test_directory_children_on_demand(self):
944
inv.revision_id = b"revid"
945
inv.root.revision = b"rootrev"
946
inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
947
inv.get_entry(b"fileid").revision = b"filerev"
948
inv.get_entry(b"fileid").executable = True
949
inv.get_entry(b"fileid").text_sha1 = b"ffff"
950
inv.get_entry(b"fileid").text_size = 1
951
chk_bytes = self.get_chk_bytes()
952
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
953
bytes = b''.join(chk_inv.to_lines())
954
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
955
root_entry = new_inv.get_entry(inv.root.file_id)
956
self.assertEqual(None, root_entry._children)
957
self.assertEqual({'file'}, set(root_entry.children))
958
file_direct = new_inv.get_entry(b"fileid")
959
file_found = root_entry.children['file']
960
self.assertEqual(file_direct.kind, file_found.kind)
961
self.assertEqual(file_direct.file_id, file_found.file_id)
962
self.assertEqual(file_direct.parent_id, file_found.parent_id)
963
self.assertEqual(file_direct.name, file_found.name)
964
self.assertEqual(file_direct.revision, file_found.revision)
965
self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
966
self.assertEqual(file_direct.text_size, file_found.text_size)
967
self.assertEqual(file_direct.executable, file_found.executable)
969
def test_from_inventory_maximum_size(self):
970
# from_inventory supports the maximum_size parameter.
972
inv.revision_id = b"revid"
973
inv.root.revision = b"rootrev"
974
chk_bytes = self.get_chk_bytes()
975
chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
976
chk_inv.id_to_entry._ensure_root()
977
self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
978
self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
979
p_id_basename = chk_inv.parent_id_basename_to_file_id
980
p_id_basename._ensure_root()
981
self.assertEqual(120, p_id_basename._root_node.maximum_size)
982
self.assertEqual(2, p_id_basename._root_node._key_width)
984
def test_iter_all_ids(self):
986
inv.revision_id = b"revid"
987
inv.root.revision = b"rootrev"
988
inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
989
inv.get_entry(b"fileid").revision = b"filerev"
990
inv.get_entry(b"fileid").executable = True
991
inv.get_entry(b"fileid").text_sha1 = b"ffff"
992
inv.get_entry(b"fileid").text_size = 1
993
chk_bytes = self.get_chk_bytes()
994
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
995
bytes = b''.join(chk_inv.to_lines())
996
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
997
fileids = sorted(new_inv.iter_all_ids())
998
self.assertEqual([inv.root.file_id, b"fileid"], fileids)
1000
def test__len__(self):
1002
inv.revision_id = b"revid"
1003
inv.root.revision = b"rootrev"
1004
inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
1005
inv.get_entry(b"fileid").revision = b"filerev"
1006
inv.get_entry(b"fileid").executable = True
1007
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1008
inv.get_entry(b"fileid").text_size = 1
1009
chk_bytes = self.get_chk_bytes()
1010
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1011
self.assertEqual(2, len(chk_inv))
1013
def test_get_entry(self):
1015
inv.revision_id = b"revid"
1016
inv.root.revision = b"rootrev"
1017
inv.add(InventoryFile(b"fileid", u"file", inv.root.file_id))
1018
inv.get_entry(b"fileid").revision = b"filerev"
1019
inv.get_entry(b"fileid").executable = True
1020
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1021
inv.get_entry(b"fileid").text_size = 1
1022
chk_bytes = self.get_chk_bytes()
1023
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1024
data = b''.join(chk_inv.to_lines())
1025
new_inv = CHKInventory.deserialise(chk_bytes, data, (b"revid",))
1026
root_entry = new_inv.get_entry(inv.root.file_id)
1027
file_entry = new_inv.get_entry(b"fileid")
1028
self.assertEqual("directory", root_entry.kind)
1029
self.assertEqual(inv.root.file_id, root_entry.file_id)
1030
self.assertEqual(inv.root.parent_id, root_entry.parent_id)
1031
self.assertEqual(inv.root.name, root_entry.name)
1032
self.assertEqual(b"rootrev", root_entry.revision)
1033
self.assertEqual("file", file_entry.kind)
1034
self.assertEqual(b"fileid", file_entry.file_id)
1035
self.assertEqual(inv.root.file_id, file_entry.parent_id)
1036
self.assertEqual(u"file", file_entry.name)
1037
self.assertEqual(b"filerev", file_entry.revision)
1038
self.assertEqual(b"ffff", file_entry.text_sha1)
1039
self.assertEqual(1, file_entry.text_size)
1040
self.assertEqual(True, file_entry.executable)
1041
self.assertRaises(errors.NoSuchId, new_inv.get_entry, 'missing')
1043
def test_has_id_true(self):
1045
inv.revision_id = b"revid"
1046
inv.root.revision = b"rootrev"
1047
inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
1048
inv.get_entry(b"fileid").revision = b"filerev"
1049
inv.get_entry(b"fileid").executable = True
1050
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1051
inv.get_entry(b"fileid").text_size = 1
1052
chk_bytes = self.get_chk_bytes()
1053
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1054
self.assertTrue(chk_inv.has_id(b'fileid'))
1055
self.assertTrue(chk_inv.has_id(inv.root.file_id))
1057
def test_has_id_not(self):
1059
inv.revision_id = b"revid"
1060
inv.root.revision = b"rootrev"
1061
chk_bytes = self.get_chk_bytes()
1062
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1063
self.assertFalse(chk_inv.has_id(b'fileid'))
1065
def test_id2path(self):
1067
inv.revision_id = b"revid"
1068
inv.root.revision = b"rootrev"
1069
direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
1070
fileentry = InventoryFile(b"fileid", "file", b"dirid")
1073
inv.get_entry(b"fileid").revision = b"filerev"
1074
inv.get_entry(b"fileid").executable = True
1075
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1076
inv.get_entry(b"fileid").text_size = 1
1077
inv.get_entry(b"dirid").revision = b"filerev"
1078
chk_bytes = self.get_chk_bytes()
1079
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1080
bytes = b''.join(chk_inv.to_lines())
1081
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1082
self.assertEqual('', new_inv.id2path(inv.root.file_id))
1083
self.assertEqual('dir', new_inv.id2path(b'dirid'))
1084
self.assertEqual('dir/file', new_inv.id2path(b'fileid'))
1086
def test_path2id(self):
1088
inv.revision_id = b"revid"
1089
inv.root.revision = b"rootrev"
1090
direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
1091
fileentry = InventoryFile(b"fileid", "file", b"dirid")
1094
inv.get_entry(b"fileid").revision = b"filerev"
1095
inv.get_entry(b"fileid").executable = True
1096
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1097
inv.get_entry(b"fileid").text_size = 1
1098
inv.get_entry(b"dirid").revision = b"filerev"
1099
chk_bytes = self.get_chk_bytes()
1100
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1101
bytes = b''.join(chk_inv.to_lines())
1102
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1103
self.assertEqual(inv.root.file_id, new_inv.path2id(''))
1104
self.assertEqual(b'dirid', new_inv.path2id('dir'))
1105
self.assertEqual(b'fileid', new_inv.path2id('dir/file'))
1107
def test_create_by_apply_delta_sets_root(self):
1109
inv.root.revision = b"myrootrev"
1110
inv.revision_id = b"revid"
1111
chk_bytes = self.get_chk_bytes()
1112
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1113
inv.add_path("", "directory", b"myrootid", None)
1114
inv.revision_id = b"expectedid"
1115
inv.root.revision = b"myrootrev"
1116
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1117
delta = [("", None, base_inv.root.file_id, None),
1118
(None, "", b"myrootid", inv.root)]
1119
new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
1120
self.assertEqual(reference_inv.root, new_inv.root)
1122
def test_create_by_apply_delta_empty_add_child(self):
1124
inv.revision_id = b"revid"
1125
inv.root.revision = b"rootrev"
1126
chk_bytes = self.get_chk_bytes()
1127
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1128
a_entry = InventoryFile(b"A-id", "A", inv.root.file_id)
1129
a_entry.revision = b"filerev"
1130
a_entry.executable = True
1131
a_entry.text_sha1 = b"ffff"
1132
a_entry.text_size = 1
1134
inv.revision_id = b"expectedid"
1135
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1136
delta = [(None, "A", b"A-id", a_entry)]
1137
new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
1138
# new_inv should be the same as reference_inv.
1139
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1140
self.assertEqual(reference_inv.root_id, new_inv.root_id)
1141
reference_inv.id_to_entry._ensure_root()
1142
new_inv.id_to_entry._ensure_root()
1143
self.assertEqual(reference_inv.id_to_entry._root_node._key,
1144
new_inv.id_to_entry._root_node._key)
1146
def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
1148
inv.revision_id = b"revid"
1149
inv.root.revision = b"rootrev"
1150
chk_bytes = self.get_chk_bytes()
1151
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1152
a_entry = InventoryFile(b"A-id", "A", inv.root.file_id)
1153
a_entry.revision = b"filerev"
1154
a_entry.executable = True
1155
a_entry.text_sha1 = b"ffff"
1156
a_entry.text_size = 1
1158
inv.revision_id = b"expectedid"
1159
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1160
delta = [(None, "A", b"A-id", a_entry)]
1161
new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
1162
reference_inv.id_to_entry._ensure_root()
1163
reference_inv.parent_id_basename_to_file_id._ensure_root()
1164
new_inv.id_to_entry._ensure_root()
1165
new_inv.parent_id_basename_to_file_id._ensure_root()
1166
# new_inv should be the same as reference_inv.
1167
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1168
self.assertEqual(reference_inv.root_id, new_inv.root_id)
1169
self.assertEqual(reference_inv.id_to_entry._root_node._key,
1170
new_inv.id_to_entry._root_node._key)
1171
self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
1172
new_inv.parent_id_basename_to_file_id._root_node._key)
1174
def test_iter_changes(self):
1175
# Low level bootstrapping smoke test; comprehensive generic tests via
1176
# InterTree are coming.
1178
inv.revision_id = b"revid"
1179
inv.root.revision = b"rootrev"
1180
inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
1181
inv.get_entry(b"fileid").revision = b"filerev"
1182
inv.get_entry(b"fileid").executable = True
1183
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1184
inv.get_entry(b"fileid").text_size = 1
1186
inv2.revision_id = b"revid2"
1187
inv2.root.revision = b"rootrev"
1188
inv2.add(InventoryFile(b"fileid", "file", inv.root.file_id))
1189
inv2.get_entry(b"fileid").revision = b"filerev2"
1190
inv2.get_entry(b"fileid").executable = False
1191
inv2.get_entry(b"fileid").text_sha1 = b"bbbb"
1192
inv2.get_entry(b"fileid").text_size = 2
1193
# get fresh objects.
1194
chk_bytes = self.get_chk_bytes()
1195
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1196
bytes = b''.join(chk_inv.to_lines())
1197
inv_1 = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1198
chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
1199
bytes = b''.join(chk_inv2.to_lines())
1200
inv_2 = CHKInventory.deserialise(chk_bytes, bytes, (b"revid2",))
1201
self.assertEqual([(b'fileid', (u'file', u'file'), True, (True, True),
1202
(b'TREE_ROOT', b'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
1204
list(inv_1.iter_changes(inv_2)))
1206
def test_parent_id_basename_to_file_id_index_enabled(self):
1208
inv.revision_id = b"revid"
1209
inv.root.revision = b"rootrev"
1210
inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
1211
inv.get_entry(b"fileid").revision = b"filerev"
1212
inv.get_entry(b"fileid").executable = True
1213
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1214
inv.get_entry(b"fileid").text_size = 1
1215
# get fresh objects.
1216
chk_bytes = self.get_chk_bytes()
1217
tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
1218
bytes = b''.join(tmp_inv.to_lines())
1219
chk_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1220
self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
1222
{(b'', b''): b'TREE_ROOT', (b'TREE_ROOT', b'file'): b'fileid'},
1223
dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1225
def test_file_entry_to_bytes(self):
1226
inv = CHKInventory(None)
1227
ie = inventory.InventoryFile(b'file-id', 'filename', b'parent-id')
1228
ie.executable = True
1229
ie.revision = b'file-rev-id'
1230
ie.text_sha1 = b'abcdefgh'
1232
bytes = inv._entry_to_bytes(ie)
1233
self.assertEqual(b'file: file-id\nparent-id\nfilename\n'
1234
b'file-rev-id\nabcdefgh\n100\nY', bytes)
1235
ie2 = inv._bytes_to_entry(bytes)
1236
self.assertEqual(ie, ie2)
1237
self.assertIsInstance(ie2.name, text_type)
1238
self.assertEqual((b'filename', b'file-id', b'file-rev-id'),
1239
inv._bytes_to_utf8name_key(bytes))
1241
def test_file2_entry_to_bytes(self):
1242
inv = CHKInventory(None)
1244
ie = inventory.InventoryFile(b'file-id', u'\u03a9name', b'parent-id')
1245
ie.executable = False
1246
ie.revision = b'file-rev-id'
1247
ie.text_sha1 = b'123456'
1249
bytes = inv._entry_to_bytes(ie)
1250
self.assertEqual(b'file: file-id\nparent-id\n\xce\xa9name\n'
1251
b'file-rev-id\n123456\n25\nN', bytes)
1252
ie2 = inv._bytes_to_entry(bytes)
1253
self.assertEqual(ie, ie2)
1254
self.assertIsInstance(ie2.name, text_type)
1255
self.assertEqual((b'\xce\xa9name', b'file-id', b'file-rev-id'),
1256
inv._bytes_to_utf8name_key(bytes))
1258
def test_dir_entry_to_bytes(self):
1259
inv = CHKInventory(None)
1260
ie = inventory.InventoryDirectory(b'dir-id', 'dirname', b'parent-id')
1261
ie.revision = b'dir-rev-id'
1262
bytes = inv._entry_to_bytes(ie)
1263
self.assertEqual(b'dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
1264
ie2 = inv._bytes_to_entry(bytes)
1265
self.assertEqual(ie, ie2)
1266
self.assertIsInstance(ie2.name, text_type)
1267
self.assertEqual((b'dirname', b'dir-id', b'dir-rev-id'),
1268
inv._bytes_to_utf8name_key(bytes))
1270
def test_dir2_entry_to_bytes(self):
1271
inv = CHKInventory(None)
1272
ie = inventory.InventoryDirectory(b'dir-id', u'dir\u03a9name',
1274
ie.revision = b'dir-rev-id'
1275
bytes = inv._entry_to_bytes(ie)
1276
self.assertEqual(b'dir: dir-id\n\ndir\xce\xa9name\n'
1277
b'dir-rev-id', bytes)
1278
ie2 = inv._bytes_to_entry(bytes)
1279
self.assertEqual(ie, ie2)
1280
self.assertIsInstance(ie2.name, text_type)
1281
self.assertIs(ie2.parent_id, None)
1282
self.assertEqual((b'dir\xce\xa9name', b'dir-id', b'dir-rev-id'),
1283
inv._bytes_to_utf8name_key(bytes))
1285
def test_symlink_entry_to_bytes(self):
1286
inv = CHKInventory(None)
1287
ie = inventory.InventoryLink(b'link-id', 'linkname', b'parent-id')
1288
ie.revision = b'link-rev-id'
1289
ie.symlink_target = u'target/path'
1290
bytes = inv._entry_to_bytes(ie)
1291
self.assertEqual(b'symlink: link-id\nparent-id\nlinkname\n'
1292
b'link-rev-id\ntarget/path', bytes)
1293
ie2 = inv._bytes_to_entry(bytes)
1294
self.assertEqual(ie, ie2)
1295
self.assertIsInstance(ie2.name, text_type)
1296
self.assertIsInstance(ie2.symlink_target, text_type)
1297
self.assertEqual((b'linkname', b'link-id', b'link-rev-id'),
1298
inv._bytes_to_utf8name_key(bytes))
1300
def test_symlink2_entry_to_bytes(self):
1301
inv = CHKInventory(None)
1302
ie = inventory.InventoryLink(b'link-id', u'link\u03a9name', b'parent-id')
1303
ie.revision = b'link-rev-id'
1304
ie.symlink_target = u'target/\u03a9path'
1305
bytes = inv._entry_to_bytes(ie)
1306
self.assertEqual(b'symlink: link-id\nparent-id\nlink\xce\xa9name\n'
1307
b'link-rev-id\ntarget/\xce\xa9path', bytes)
1308
ie2 = inv._bytes_to_entry(bytes)
1309
self.assertEqual(ie, ie2)
1310
self.assertIsInstance(ie2.name, text_type)
1311
self.assertIsInstance(ie2.symlink_target, text_type)
1312
self.assertEqual((b'link\xce\xa9name', b'link-id', b'link-rev-id'),
1313
inv._bytes_to_utf8name_key(bytes))
1315
def test_tree_reference_entry_to_bytes(self):
1316
inv = CHKInventory(None)
1317
ie = inventory.TreeReference(b'tree-root-id', u'tree\u03a9name',
1319
ie.revision = b'tree-rev-id'
1320
ie.reference_revision = b'ref-rev-id'
1321
bytes = inv._entry_to_bytes(ie)
1322
self.assertEqual(b'tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
1323
b'tree-rev-id\nref-rev-id', bytes)
1324
ie2 = inv._bytes_to_entry(bytes)
1325
self.assertEqual(ie, ie2)
1326
self.assertIsInstance(ie2.name, text_type)
1327
self.assertEqual((b'tree\xce\xa9name', b'tree-root-id', b'tree-rev-id'),
1328
inv._bytes_to_utf8name_key(bytes))
1330
def make_basic_utf8_inventory(self):
1332
inv.revision_id = b"revid"
1333
inv.root.revision = b"rootrev"
1334
root_id = inv.root.file_id
1335
inv.add(InventoryFile(b"fileid", u'f\xefle', root_id))
1336
inv.get_entry(b"fileid").revision = b"filerev"
1337
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1338
inv.get_entry(b"fileid").text_size = 0
1339
inv.add(InventoryDirectory(b"dirid", u'dir-\N{EURO SIGN}', root_id))
1340
inv.get_entry(b"dirid").revision = b"dirrev"
1341
inv.add(InventoryFile(b"childid", u'ch\xefld', b"dirid"))
1342
inv.get_entry(b"childid").revision = b"filerev"
1343
inv.get_entry(b"childid").text_sha1 = b"ffff"
1344
inv.get_entry(b"childid").text_size = 0
1345
chk_bytes = self.get_chk_bytes()
1346
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1347
bytes = b''.join(chk_inv.to_lines())
1348
return CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1350
def test__preload_handles_utf8(self):
1351
new_inv = self.make_basic_utf8_inventory()
1352
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1353
self.assertFalse(new_inv._fully_cached)
1354
new_inv._preload_cache()
1356
sorted([new_inv.root_id, b"fileid", b"dirid", b"childid"]),
1357
sorted(new_inv._fileid_to_entry_cache.keys()))
1358
ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
1359
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1360
sorted(ie_root._children.keys()))
1361
ie_dir = new_inv._fileid_to_entry_cache[b'dirid']
1362
self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
1364
def test__preload_populates_cache(self):
1366
inv.revision_id = b"revid"
1367
inv.root.revision = b"rootrev"
1368
root_id = inv.root.file_id
1369
inv.add(InventoryFile(b"fileid", "file", root_id))
1370
inv.get_entry(b"fileid").revision = b"filerev"
1371
inv.get_entry(b"fileid").executable = True
1372
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1373
inv.get_entry(b"fileid").text_size = 1
1374
inv.add(InventoryDirectory(b"dirid", "dir", root_id))
1375
inv.get_entry(b"dirid").revision = b"dirrev"
1376
inv.add(InventoryFile(b"childid", "child", b"dirid"))
1377
inv.get_entry(b"childid").revision = b"filerev"
1378
inv.get_entry(b"childid").executable = False
1379
inv.get_entry(b"childid").text_sha1 = b"dddd"
1380
inv.get_entry(b"childid").text_size = 1
1381
chk_bytes = self.get_chk_bytes()
1382
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1383
bytes = b''.join(chk_inv.to_lines())
1384
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1385
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1386
self.assertFalse(new_inv._fully_cached)
1387
new_inv._preload_cache()
1389
sorted([root_id, b"fileid", b"dirid", b"childid"]),
1390
sorted(new_inv._fileid_to_entry_cache.keys()))
1391
self.assertTrue(new_inv._fully_cached)
1392
ie_root = new_inv._fileid_to_entry_cache[root_id]
1393
self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
1394
ie_dir = new_inv._fileid_to_entry_cache[b'dirid']
1395
self.assertEqual(['child'], sorted(ie_dir._children.keys()))
1397
def test__preload_handles_partially_evaluated_inventory(self):
1398
new_inv = self.make_basic_utf8_inventory()
1399
ie = new_inv.get_entry(new_inv.root_id)
1400
self.assertIs(None, ie._children)
1401
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1402
sorted(ie.children.keys()))
1403
# Accessing .children loads _children
1404
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1405
sorted(ie._children.keys()))
1406
new_inv._preload_cache()
1408
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1409
sorted(ie._children.keys()))
1410
ie_dir = new_inv.get_entry(b"dirid")
1411
self.assertEqual([u'ch\xefld'],
1412
sorted(ie_dir._children.keys()))
1414
def test_filter_change_in_renamed_subfolder(self):
1415
inv = Inventory(b'tree-root')
1416
inv.root.revision = b'rootrev'
1417
src_ie = inv.add_path('src', 'directory', b'src-id')
1418
src_ie.revision = b'srcrev'
1419
sub_ie = inv.add_path('src/sub/', 'directory', b'sub-id')
1420
sub_ie.revision = b'subrev'
1421
a_ie = inv.add_path('src/sub/a', 'file', b'a-id')
1422
a_ie.revision = b'filerev'
1423
a_ie.text_sha1 = osutils.sha_string(b'content\n')
1424
a_ie.text_size = len(b'content\n')
1425
chk_bytes = self.get_chk_bytes()
1426
inv = CHKInventory.from_inventory(chk_bytes, inv)
1427
inv = inv.create_by_apply_delta([
1428
("src/sub/a", "src/sub/a", b"a-id", a_ie),
1429
("src", "src2", b"src-id", src_ie),
1431
new_inv = inv.filter([b'a-id', b'src-id'])
1435
('src/sub', b'sub-id'),
1436
('src/sub/a', b'a-id'),
1437
], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
1439
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1441
def get_chk_bytes(self):
1442
factory = groupcompress.make_pack_factory(True, True, 1)
1443
trans = self.get_transport('')
1444
return factory(trans)
1446
def make_dir(self, inv, name, parent_id, revision):
1447
ie = inv.make_entry('directory', name, parent_id, name.encode('utf-8') + b'-id')
1448
ie.revision = revision
1451
def make_file(self, inv, name, parent_id, revision, content=b'content\n'):
1452
ie = inv.make_entry('file', name, parent_id, name.encode('utf-8') + b'-id')
1453
ie.text_sha1 = osutils.sha_string(content)
1454
ie.text_size = len(content)
1455
ie.revision = revision
1458
def make_simple_inventory(self):
1459
inv = Inventory(b'TREE_ROOT')
1460
inv.revision_id = b"revid"
1461
inv.root.revision = b"rootrev"
1464
# sub-file1 sub-file1-id
1465
# sub-file2 sub-file2-id
1466
# sub-dir1/ sub-dir1-id
1467
# subsub-file1 subsub-file1-id
1469
# sub2-file1 sub2-file1-id
1471
self.make_dir(inv, 'dir1', b'TREE_ROOT', b'dirrev')
1472
self.make_dir(inv, 'dir2', b'TREE_ROOT', b'dirrev')
1473
self.make_dir(inv, 'sub-dir1', b'dir1-id', b'dirrev')
1474
self.make_file(inv, 'top', b'TREE_ROOT', b'filerev')
1475
self.make_file(inv, 'sub-file1', b'dir1-id', b'filerev')
1476
self.make_file(inv, 'sub-file2', b'dir1-id', b'filerev')
1477
self.make_file(inv, 'subsub-file1', b'sub-dir1-id', b'filerev')
1478
self.make_file(inv, 'sub2-file1', b'dir2-id', b'filerev')
1479
chk_bytes = self.get_chk_bytes()
1480
# use a small maximum_size to force internal paging structures
1481
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
1483
search_key_name=b'hash-255-way')
1484
bytes = b''.join(chk_inv.to_lines())
1485
return CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1487
def assert_Getitems(self, expected_fileids, inv, file_ids):
1488
self.assertEqual(sorted(expected_fileids),
1489
sorted([ie.file_id for ie in inv._getitems(file_ids)]))
1491
def assertExpand(self, all_ids, inv, file_ids):
1493
val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
1494
self.assertEqual(set(all_ids), val_all_ids)
1495
entries = inv._getitems(val_all_ids)
1496
expected_children = {}
1497
for entry in entries:
1498
s = expected_children.setdefault(entry.parent_id, [])
1499
s.append(entry.file_id)
1501
k: sorted(v) for k, v in val_children.items()}
1502
expected_children = {
1503
k: sorted(v) for k, v in expected_children.items()}
1504
self.assertEqual(expected_children, val_children)
1506
def test_make_simple_inventory(self):
1507
inv = self.make_simple_inventory()
1509
for path, entry in inv.iter_entries_by_dir():
1510
layout.append((path, entry.file_id))
1513
('dir1', b'dir1-id'),
1514
('dir2', b'dir2-id'),
1516
('dir1/sub-dir1', b'sub-dir1-id'),
1517
('dir1/sub-file1', b'sub-file1-id'),
1518
('dir1/sub-file2', b'sub-file2-id'),
1519
('dir1/sub-dir1/subsub-file1', b'subsub-file1-id'),
1520
('dir2/sub2-file1', b'sub2-file1-id'),
1523
def test__getitems(self):
1524
inv = self.make_simple_inventory()
1526
self.assert_Getitems([b'dir1-id'], inv, [b'dir1-id'])
1527
self.assertTrue(b'dir1-id' in inv._fileid_to_entry_cache)
1528
self.assertFalse(b'sub-file2-id' in inv._fileid_to_entry_cache)
1530
self.assert_Getitems([b'dir1-id'], inv, [b'dir1-id'])
1532
self.assert_Getitems([b'dir1-id', b'sub-file2-id'], inv,
1533
[b'dir1-id', b'sub-file2-id'])
1534
self.assertTrue(b'dir1-id' in inv._fileid_to_entry_cache)
1535
self.assertTrue(b'sub-file2-id' in inv._fileid_to_entry_cache)
1537
def test_single_file(self):
1538
inv = self.make_simple_inventory()
1539
self.assertExpand([b'TREE_ROOT', b'top-id'], inv, [b'top-id'])
1541
def test_get_all_parents(self):
1542
inv = self.make_simple_inventory()
1543
self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
1545
], inv, [b'subsub-file1-id'])
1547
def test_get_children(self):
1548
inv = self.make_simple_inventory()
1549
self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
1550
b'sub-file1-id', b'sub-file2-id', b'subsub-file1-id',
1551
], inv, [b'dir1-id'])
1553
def test_from_root(self):
1554
inv = self.make_simple_inventory()
1555
self.assertExpand([b'TREE_ROOT', b'dir1-id', b'dir2-id', b'sub-dir1-id',
1556
b'sub-file1-id', b'sub-file2-id', b'sub2-file1-id',
1557
b'subsub-file1-id', b'top-id'], inv, [b'TREE_ROOT'])
1559
def test_top_level_file(self):
1560
inv = self.make_simple_inventory()
1561
self.assertExpand([b'TREE_ROOT', b'top-id'], inv, [b'top-id'])
1563
def test_subsub_file(self):
1564
inv = self.make_simple_inventory()
1565
self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
1566
b'subsub-file1-id'], inv, [b'subsub-file1-id'])
1568
def test_sub_and_root(self):
1569
inv = self.make_simple_inventory()
1570
self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id', b'top-id',
1571
b'subsub-file1-id'], inv, [b'top-id', b'subsub-file1-id'])
1574
class TestMutableInventoryFromTree(TestCaseWithTransport):
1576
def test_empty(self):
1577
repository = self.make_repository('.')
1578
tree = repository.revision_tree(revision.NULL_REVISION)
1579
inv = mutable_inventory_from_tree(tree)
1580
self.assertEqual(revision.NULL_REVISION, inv.revision_id)
1581
self.assertEqual(0, len(inv))
1583
def test_some_files(self):
1584
wt = self.make_branch_and_tree('.')
1585
self.build_tree(['a'])
1586
wt.add(['a'], [b'thefileid'])
1587
revid = wt.commit("commit")
1588
tree = wt.branch.repository.revision_tree(revid)
1589
inv = mutable_inventory_from_tree(tree)
1590
self.assertEqual(revid, inv.revision_id)
1591
self.assertEqual(2, len(inv))
1592
self.assertEqual("a", inv.get_entry(b'thefileid').name)
1593
# The inventory should be mutable and independent of
1595
self.assertFalse(tree.root_inventory.get_entry(b'thefileid').executable)
1596
inv.get_entry(b'thefileid').executable = True
1597
self.assertFalse(tree.root_inventory.get_entry(b'thefileid').executable)