1
# Copyright (C) 2005-2012, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
26
from ..sixish import text_type
32
from ..bzr.inventory import (
40
mutable_inventory_from_tree,
44
TestCaseWithTransport,
46
from .scenarios import load_tests_apply_scenarios
49
load_tests = load_tests_apply_scenarios
52
def delta_application_scenarios():
54
('Inventory', {'apply_delta':apply_inventory_Inventory}),
56
# Working tree basis delta application
57
# Repository add_inv_by_delta.
58
# Reduced form of the per_repository test logic - that logic needs to
59
# be able to get /just/ repositories whereas these tests are fine with
60
# just creating trees.
62
for _, format in repository.format_registry.iteritems():
63
if format.supports_full_versioned_files:
64
scenarios.append((str(format.__name__), {
65
'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
67
for getter in workingtree.format_registry._get_all_lazy():
73
pass # Format with unmet dependency
74
repo_fmt = format._matchingcontroldir.repository_format
75
if not repo_fmt.supports_full_versioned_files:
78
(str(format.__class__.__name__) + ".update_basis_by_delta", {
79
'apply_delta':apply_inventory_WT_basis,
82
(str(format.__class__.__name__) + ".apply_inventory_delta", {
83
'apply_delta':apply_inventory_WT,
88
def create_texts_for_inv(repo, inv):
    """Store a placeholder text in ``repo.texts`` for every entry of ``inv``.

    Each text is added under the key ``(file_id, revision)`` with no parents.
    Its content is a single line made of ``text_size`` ``b'a'`` bytes.

    :param repo: Object exposing ``texts.add_lines(key, parents, lines)``.
    :param inv: Inventory whose ``iter_entries()`` yields ``(path, entry)``.
    """
    for _path, ie in inv.iter_entries():
        # An entry with no recorded size (None) or a size of 0 gets an empty
        # text: ``b'a' * None`` raises TypeError, and ``[b'']`` would be one
        # empty line rather than zero lines.
        if ie.text_size:
            lines = [b'a' * ie.text_size]
        else:
            lines = []
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
97
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
    """Apply delta to basis and return the result.

    :param self: The running test case; not used by this variant.
    :param basis: An inventory to be used as the basis.
    :param delta: The inventory delta to apply:
    :param invalid_delta: Not used by this variant; kept so every
        apply_inventory_* helper can be called the same way.
    :return: An inventory resulting from the application.
    """
    # The return value of apply_delta is not used; basis itself is handed
    # back as the documented result.
    basis.apply_delta(delta)
    return basis
108
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
109
"""Apply delta to basis and return the result.
111
This sets the tree state to be basis, and then calls apply_inventory_delta.
113
:param basis: An inventory to be used as the basis.
114
:param delta: The inventory delta to apply:
115
:return: An inventory resulting from the application.
117
control = self.make_controldir('tree', format=self.format._matchingcontroldir)
118
control.create_repository()
119
control.create_branch()
120
tree = self.format.initialize(control)
123
tree._write_inventory(basis)
126
# Fresh object, reads disk again.
127
tree = tree.controldir.open_workingtree()
130
tree.apply_inventory_delta(delta)
133
# reload tree - ensure we get what was written.
134
tree = tree.controldir.open_workingtree()
136
self.addCleanup(tree.unlock)
137
if not invalid_delta:
139
return tree.root_inventory
142
def _create_repo_revisions(repo, basis, delta, invalid_delta):
143
repo.start_write_group()
145
rev = revision.Revision(b'basis', timestamp=0, timezone=None,
146
message="", committer="foo@example.com")
147
basis.revision_id = b'basis'
148
create_texts_for_inv(repo, basis)
149
repo.add_revision(b'basis', rev, basis)
151
# We don't want to apply the delta to the basis, because we expect
152
# the delta is invalid.
154
result_inv.revision_id = b'result'
155
target_entries = None
157
result_inv = basis.create_by_apply_delta(delta, b'result')
158
create_texts_for_inv(repo, result_inv)
159
target_entries = list(result_inv.iter_entries_by_dir())
160
rev = revision.Revision(b'result', timestamp=0, timezone=None,
161
message="", committer="foo@example.com")
162
repo.add_revision(b'result', rev, result_inv)
163
repo.commit_write_group()
165
repo.abort_write_group()
167
return target_entries
170
def _get_basis_entries(tree):
171
basis_tree = tree.basis_tree()
172
basis_tree.lock_read()
173
basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
175
return basis_tree_entries
178
def _populate_different_tree(tree, basis, delta):
179
"""Put all entries into tree, but at a unique location."""
182
tree.add(['unique-dir'], [b'unique-dir-id'], ['directory'])
183
for path, ie in basis.iter_entries_by_dir():
184
if ie.file_id in added_ids:
186
# We want a unique path for each of these, we use the file-id
187
tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
188
added_ids.add(ie.file_id)
189
for old_path, new_path, file_id, ie in delta:
190
if file_id in added_ids:
192
tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
195
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
196
"""Apply delta to basis and return the result.
198
This sets the parent and then calls update_basis_by_delta.
199
It also puts the basis in the repository under both 'basis' and 'result' to
200
allow safety checks made by the WT to succeed, and finally ensures that all
201
items in the delta with a new path are present in the WT before calling
202
update_basis_by_delta.
204
:param basis: An inventory to be used as the basis.
205
:param delta: The inventory delta to apply:
206
:return: An inventory resulting from the application.
208
control = test.make_controldir('tree', format=test.format._matchingcontroldir)
209
control.create_repository()
210
control.create_branch()
211
tree = test.format.initialize(control)
214
target_entries = _create_repo_revisions(tree.branch.repository, basis,
215
delta, invalid_delta)
216
# Set the basis state as the tree's current state
217
tree._write_inventory(basis)
218
# This reads basis from the repo and puts it into the tree's local
219
# cache, if it has one.
220
tree.set_parent_ids([b'basis'])
223
# Fresh lock, reads disk again.
224
with tree.lock_write():
225
tree.update_basis_by_delta(b'result', delta)
226
if not invalid_delta:
228
# reload tree - ensure we get what was written.
229
tree = tree.controldir.open_workingtree()
230
basis_tree = tree.basis_tree()
231
basis_tree.lock_read()
232
test.addCleanup(basis_tree.unlock)
233
basis_inv = basis_tree.root_inventory
235
basis_entries = list(basis_inv.iter_entries_by_dir())
236
test.assertEqual(target_entries, basis_entries)
240
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
242
"""Apply delta to basis and return the result.
244
This inserts basis as a whole inventory and then uses
245
add_inventory_by_delta to add delta.
247
:param basis: An inventory to be used as the basis.
248
:param delta: The inventory delta to apply:
249
:return: An inventory resulting from the application.
251
format = self.format()
252
control = self.make_controldir('tree', format=format._matchingcontroldir)
253
repo = format.initialize(control)
254
with repo.lock_write():
255
repo.start_write_group()
257
rev = revision.Revision(b'basis', timestamp=0, timezone=None,
258
message="", committer="foo@example.com")
259
basis.revision_id = b'basis'
260
create_texts_for_inv(repo, basis)
261
repo.add_revision(b'basis', rev, basis)
262
repo.commit_write_group()
264
repo.abort_write_group()
266
with repo.lock_write():
267
repo.start_write_group()
269
inv_sha1 = repo.add_inventory_by_delta(b'basis', delta,
270
b'result', [b'basis'])
272
repo.abort_write_group()
275
repo.commit_write_group()
276
# Fresh lock, reads disk again.
277
repo = repo.controldir.open_repository()
279
self.addCleanup(repo.unlock)
280
return repo.get_inventory(b'result')
283
class TestInventoryUpdates(TestCase):
285
def test_creation_from_root_id(self):
    # Passing a root id to the constructor must produce a root directory
    # carrying exactly that id.
    new_inv = inventory.Inventory(root_id=b'tree-root')
    root = new_inv.root
    self.assertNotEqual(None, root)
    self.assertEqual(b'tree-root', root.file_id)
291
def test_add_path_of_root(self):
    # An inventory built without a root id starts with no root directory.
    rootless = inventory.Inventory(root_id=None)
    self.assertIs(None, rootless.root)
    # Adding the empty path as a directory installs the root entry.
    root_entry = rootless.add_path(u"", "directory", b"my-root")
    root_entry.revision = b'test-rev'
    self.assertEqual(b"my-root", root_entry.file_id)
    self.assertIs(root_entry, rootless.root)
301
def test_add_path(self):
    # add_path hands back the new entry with the requested id and kind.
    tree_inv = inventory.Inventory(root_id=b'tree_root')
    added = tree_inv.add_path(u'hello', 'file', b'hello-id')
    self.assertEqual(b'hello-id', added.file_id)
    self.assertEqual('file', added.kind)
308
"""Make sure copy() works and creates a deep copy."""
309
inv = inventory.Inventory(root_id=b'some-tree-root')
310
ie = inv.add_path(u'hello', 'file', b'hello-id')
312
inv.root.file_id = b'some-new-root'
314
self.assertEqual(b'some-tree-root', inv2.root.file_id)
315
self.assertEqual(u'hello', inv2.get_entry(b'hello-id').name)
317
def test_copy_empty(self):
    """Make sure an empty inventory can be copied."""
    inv = inventory.Inventory(root_id=None)
    # Copy the rootless inventory; the copy must be rootless as well.
    inv2 = inv.copy()
    self.assertIs(None, inv2.root)
323
def test_copy_copies_root_revision(self):
    """Make sure the revision of the root gets copied."""
    inv = inventory.Inventory(root_id=b'someroot')
    inv.root.revision = b'therev'
    # The copy's root must carry both the id and the revision of the source.
    inv2 = inv.copy()
    self.assertEqual(b'someroot', inv2.root.file_id)
    self.assertEqual(b'therev', inv2.root.revision)
331
def test_create_tree_reference(self):
    # Adding a TreeReference entry beneath the root must not raise.
    inv = inventory.Inventory(b'tree-root-123')
    nested = TreeReference(
        b'nested-id', 'nested', parent_id=b'tree-root-123',
        revision=b'rev', reference_revision=b'rev2')
    inv.add(nested)
337
def test_error_encoding(self):
    # A second entry with the same non-ASCII name under the same parent
    # must raise InconsistentDelta, and the name must appear in str(error).
    inv = inventory.Inventory(b'tree-root')
    inv.add(InventoryFile(b'a-id', u'\u1234', b'tree-root'))
    clashing = InventoryFile(b'b-id', u'\u1234', b'tree-root')
    err = self.assertRaises(errors.InconsistentDelta, inv.add, clashing)
    self.assertContainsRe(str(err), '\\u1234')
344
def test_add_recursive(self):
345
parent = InventoryDirectory(b'src-id', 'src', b'tree-root')
346
child = InventoryFile(b'hello-id', 'hello.c', b'src-id')
347
parent.children[child.file_id] = child
348
inv = inventory.Inventory(b'tree-root')
350
self.assertEqual('src/hello.c', inv.id2path(b'hello-id'))
354
class TestDeltaApplication(TestCaseWithTransport):
356
scenarios = delta_application_scenarios()
358
def get_empty_inventory(self, reference_inv=None):
    """Get an empty inventory.

    Note that tests should not depend on the revision of the root for
    setting up test conditions, as it has to be flexible to accommodate non
    rich root repositories.

    :param reference_inv: If not None, get the revision for the root from
        this inventory. This is useful for dealing with older repositories
        that routinely discarded the root entry data. If None, the root's
        revision is set to 'basis'.
    :return: The newly created inventory.
    """
    inv = inventory.Inventory()
    if reference_inv is not None:
        inv.root.revision = reference_inv.root.revision
    else:
        # No reference supplied: fall back to the documented default.
        inv.root.revision = b'basis'
    return inv
377
def make_file_ie(self, file_id=b'file-id', name='name', parent_id=None):
    """Build a file entry with revision b'result' and empty text metadata.

    :param file_id: File id for the new entry.
    :param name: Name of the new entry.
    :param parent_id: File id of the parent directory.
    :return: The new ``inventory.InventoryFile`` with ``text_size`` 0 and
        ``text_sha1`` b''.
    """
    ie_file = inventory.InventoryFile(file_id, name, parent_id)
    ie_file.revision = b'result'
    ie_file.text_size = 0
    ie_file.text_sha1 = b''
    # Callers place the result straight into deltas, so it must be returned.
    return ie_file
384
def test_empty_delta(self):
    # Applying a delta with no items must leave the inventory unchanged.
    inv = self.get_empty_inventory()
    delta = []
    inv = self.apply_delta(self, inv, delta)
    # Build the comparison inventory with the same root revision as the
    # result, so only real differences show up in the computed delta.
    inv2 = self.get_empty_inventory(inv)
    self.assertEqual([], inv2._make_delta(inv))
391
def test_None_file_id(self):
392
inv = self.get_empty_inventory()
393
dir1 = inventory.InventoryDirectory(b'dirid', 'dir1', inv.root.file_id)
395
dir1.revision = b'result'
396
delta = [(None, u'dir1', None, dir1)]
397
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
400
def test_unicode_file_id(self):
401
inv = self.get_empty_inventory()
402
dir1 = inventory.InventoryDirectory(b'dirid', 'dir1', inv.root.file_id)
403
dir1.file_id = u'dirid'
404
dir1.revision = b'result'
405
delta = [(None, u'dir1', dir1.file_id, dir1)]
406
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
409
def test_repeated_file_id(self):
410
inv = self.get_empty_inventory()
411
file1 = inventory.InventoryFile(b'id', 'path1', inv.root.file_id)
412
file1.revision = b'result'
414
file1.text_sha1 = b""
417
delta = [(None, u'path1', b'id', file1), (None, u'path2', b'id', file2)]
418
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
421
def test_repeated_new_path(self):
422
inv = self.get_empty_inventory()
423
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
424
file1.revision = b'result'
426
file1.text_sha1 = b""
428
file2.file_id = b'id2'
429
delta = [(None, u'path', b'id1', file1), (None, u'path', b'id2', file2)]
430
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
433
def test_repeated_old_path(self):
434
inv = self.get_empty_inventory()
435
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
436
file1.revision = b'result'
438
file1.text_sha1 = b""
439
# We can't *create* a source inventory with the same path, but
440
# a badly generated partial delta might claim the same source twice.
441
# This would be buggy in two ways: the path is repeated in the delta,
442
# And the path for one of the file ids doesn't match the source
443
# location. Alternatively, we could have a repeated fileid, but that
444
# is separately checked for.
445
file2 = inventory.InventoryFile(b'id2', 'path2', inv.root.file_id)
446
file2.revision = b'result'
448
file2.text_sha1 = b""
451
delta = [(u'path', None, b'id1', None), (u'path', None, b'id2', None)]
452
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
455
def test_mismatched_id_entry_id(self):
456
inv = self.get_empty_inventory()
457
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
458
file1.revision = b'result'
460
file1.text_sha1 = b""
461
delta = [(None, u'path', b'id', file1)]
462
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
465
def test_mismatched_new_path_entry_None(self):
466
inv = self.get_empty_inventory()
467
delta = [(None, u'path', b'id', None)]
468
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
471
def test_mismatched_new_path_None_entry(self):
472
inv = self.get_empty_inventory()
473
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
474
file1.revision = b'result'
476
file1.text_sha1 = b""
477
delta = [(u"path", None, b'id1', file1)]
478
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
481
def test_parent_is_not_directory(self):
482
inv = self.get_empty_inventory()
483
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
484
file1.revision = b'result'
486
file1.text_sha1 = b""
487
file2 = inventory.InventoryFile(b'id2', 'path2', b'id1')
488
file2.revision = b'result'
490
file2.text_sha1 = b""
492
delta = [(None, u'path/path2', b'id2', file2)]
493
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
496
def test_parent_is_missing(self):
497
inv = self.get_empty_inventory()
498
file2 = inventory.InventoryFile(b'id2', 'path2', b'missingparent')
499
file2.revision = b'result'
501
file2.text_sha1 = b""
502
delta = [(None, u'path/path2', b'id2', file2)]
503
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
506
def test_new_parent_path_has_wrong_id(self):
507
inv = self.get_empty_inventory()
508
parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
509
parent1.revision = b'result'
510
parent2 = inventory.InventoryDirectory(b'p-2', 'dir2', inv.root.file_id)
511
parent2.revision = b'result'
512
file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
513
file1.revision = b'result'
515
file1.text_sha1 = b""
518
# This delta claims that file1 is at dir/path, but actually it's at
519
# dir2/path if you follow the inventory parent structure.
520
delta = [(None, u'dir/path', b'id', file1)]
521
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
524
def test_old_parent_path_is_wrong(self):
525
inv = self.get_empty_inventory()
526
parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
527
parent1.revision = b'result'
528
parent2 = inventory.InventoryDirectory(b'p-2', 'dir2', inv.root.file_id)
529
parent2.revision = b'result'
530
file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
531
file1.revision = b'result'
533
file1.text_sha1 = b""
537
# This delta claims that file1 was at dir/path, but actually it was at
538
# dir2/path if you follow the inventory parent structure.
539
delta = [(u'dir/path', None, b'id', None)]
540
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
543
def test_old_parent_path_is_for_other_id(self):
544
inv = self.get_empty_inventory()
545
parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
546
parent1.revision = b'result'
547
parent2 = inventory.InventoryDirectory(b'p-2', 'dir2', inv.root.file_id)
548
parent2.revision = b'result'
549
file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
550
file1.revision = b'result'
552
file1.text_sha1 = b""
553
file2 = inventory.InventoryFile(b'id2', 'path', b'p-1')
554
file2.revision = b'result'
556
file2.text_sha1 = b""
561
# This delta claims that file1 was at dir/path, but actually it was at
562
# dir2/path if you follow the inventory parent structure. At dir/path
563
# is another entry we should not delete.
564
delta = [(u'dir/path', None, b'id', None)]
565
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
568
def test_add_existing_id_new_path(self):
569
inv = self.get_empty_inventory()
570
parent1 = inventory.InventoryDirectory(b'p-1', 'dir1', inv.root.file_id)
571
parent1.revision = b'result'
572
parent2 = inventory.InventoryDirectory(b'p-1', 'dir2', inv.root.file_id)
573
parent2.revision = b'result'
575
delta = [(None, u'dir2', b'p-1', parent2)]
576
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
579
def test_add_new_id_existing_path(self):
580
inv = self.get_empty_inventory()
581
parent1 = inventory.InventoryDirectory(b'p-1', 'dir1', inv.root.file_id)
582
parent1.revision = b'result'
583
parent2 = inventory.InventoryDirectory(b'p-2', 'dir1', inv.root.file_id)
584
parent2.revision = b'result'
586
delta = [(None, u'dir1', b'p-2', parent2)]
587
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
590
def test_remove_dir_leaving_dangling_child(self):
591
inv = self.get_empty_inventory()
592
dir1 = inventory.InventoryDirectory(b'p-1', 'dir1', inv.root.file_id)
593
dir1.revision = b'result'
594
dir2 = inventory.InventoryDirectory(b'p-2', 'child1', b'p-1')
595
dir2.revision = b'result'
596
dir3 = inventory.InventoryDirectory(b'p-3', 'child2', b'p-1')
597
dir3.revision = b'result'
601
delta = [(u'dir1', None, b'p-1', None),
602
(u'dir1/child2', None, b'p-3', None)]
603
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
606
def test_add_file(self):
607
inv = self.get_empty_inventory()
608
file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
609
file1.revision = b'result'
611
file1.text_sha1 = b''
612
delta = [(None, u'path', b'file-id', file1)]
613
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
614
self.assertEqual(b'file-id', res_inv.get_entry(b'file-id').file_id)
616
def test_remove_file(self):
617
inv = self.get_empty_inventory()
618
file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
619
file1.revision = b'result'
621
file1.text_sha1 = b''
623
delta = [(u'path', None, b'file-id', None)]
624
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
625
self.assertEqual(None, res_inv.path2id('path'))
626
self.assertRaises(errors.NoSuchId, res_inv.id2path, b'file-id')
628
def test_rename_file(self):
629
inv = self.get_empty_inventory()
630
file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
632
file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
633
delta = [(u'path', 'path2', b'file-id', file2)]
634
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
635
self.assertEqual(None, res_inv.path2id('path'))
636
self.assertEqual(b'file-id', res_inv.path2id('path2'))
638
def test_replaced_at_new_path(self):
639
inv = self.get_empty_inventory()
640
file1 = self.make_file_ie(file_id=b'id1', parent_id=inv.root.file_id)
642
file2 = self.make_file_ie(file_id=b'id2', parent_id=inv.root.file_id)
643
delta = [(u'name', None, b'id1', None),
644
(None, u'name', b'id2', file2)]
645
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
646
self.assertEqual(b'id2', res_inv.path2id('name'))
648
def test_rename_dir(self):
649
inv = self.get_empty_inventory()
650
dir1 = inventory.InventoryDirectory(b'dir-id', 'dir1', inv.root.file_id)
651
dir1.revision = b'basis'
652
file1 = self.make_file_ie(parent_id=b'dir-id')
655
dir2 = inventory.InventoryDirectory(b'dir-id', 'dir2', inv.root.file_id)
656
dir2.revision = b'result'
657
delta = [('dir1', 'dir2', b'dir-id', dir2)]
658
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
659
# The file should be accessible under the new path
660
self.assertEqual(b'file-id', res_inv.path2id('dir2/name'))
662
def test_renamed_dir_with_renamed_child(self):
663
inv = self.get_empty_inventory()
664
dir1 = inventory.InventoryDirectory(b'dir-id', 'dir1', inv.root.file_id)
665
dir1.revision = b'basis'
666
file1 = self.make_file_ie(b'file-id-1', 'name1', parent_id=b'dir-id')
667
file2 = self.make_file_ie(b'file-id-2', 'name2', parent_id=b'dir-id')
671
dir2 = inventory.InventoryDirectory(b'dir-id', 'dir2', inv.root.file_id)
672
dir2.revision = b'result'
673
file2b = self.make_file_ie(b'file-id-2', 'name2', inv.root.file_id)
674
delta = [('dir1', 'dir2', b'dir-id', dir2),
675
('dir1/name2', 'name2', b'file-id-2', file2b)]
676
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
677
# The file should be accessible under the new path
678
self.assertEqual(b'file-id-1', res_inv.path2id('dir2/name1'))
679
self.assertEqual(None, res_inv.path2id('dir2/name2'))
680
self.assertEqual(b'file-id-2', res_inv.path2id('name2'))
682
def test_is_root(self):
683
"""Ensure our root-checking code is accurate."""
684
inv = inventory.Inventory(b'TREE_ROOT')
685
self.assertTrue(inv.is_root(b'TREE_ROOT'))
686
self.assertFalse(inv.is_root(b'booga'))
687
inv.root.file_id = b'booga'
688
self.assertFalse(inv.is_root(b'TREE_ROOT'))
689
self.assertTrue(inv.is_root(b'booga'))
690
# works properly even if no root is set
692
self.assertFalse(inv.is_root(b'TREE_ROOT'))
693
self.assertFalse(inv.is_root(b'booga'))
695
def test_entries_for_empty_inventory(self):
    """entries() on an inventory without a root yields an empty list."""
    rootless = Inventory(root_id=None)
    listed = rootless.entries()
    self.assertEqual([], listed)
701
class TestInventoryEntry(TestCase):
703
def test_file_invalid_entry_name(self):
    # 'a/hello.c' must be refused as an entry name by the constructor.
    constructor_args = (b'123', 'a/hello.c', ROOT_ID)
    self.assertRaises(errors.InvalidEntryName, inventory.InventoryFile,
                      *constructor_args)
707
def test_file_backslash(self):
    # A name containing a backslash must be accepted and kept verbatim.
    entry = inventory.InventoryFile(b'123', 'h\\ello.c', ROOT_ID)
    # assertEqual rather than the deprecated assertEquals alias, which the
    # standard unittest.TestCase no longer provides from Python 3.12 on.
    self.assertEqual(entry.name, 'h\\ello.c')
711
def test_file_kind_character(self):
    # A file entry reports an empty kind character.
    entry = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
    marker = entry.kind_character()
    self.assertEqual(marker, '')
715
def test_dir_kind_character(self):
    # A directory entry reports '/' as its kind character.
    entry = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
    marker = entry.kind_character()
    self.assertEqual(marker, '/')
719
def test_link_kind_character(self):
    # A symlink entry reports an empty kind character.
    link = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
    marker = link.kind_character()
    self.assertEqual(marker, '')
723
def test_dir_detect_changes(self):
    # Two identically constructed directories report no change, whichever
    # one is used as the receiver.
    first = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
    second = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
    no_change = (False, False)
    self.assertEqual(no_change, first.detect_changes(second))
    self.assertEqual(no_change, second.detect_changes(first))
729
def test_file_detect_changes(self):
730
left = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
732
right = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
733
right.text_sha1 = 123
734
self.assertEqual((False, False), left.detect_changes(right))
735
self.assertEqual((False, False), right.detect_changes(left))
736
left.executable = True
737
self.assertEqual((False, True), left.detect_changes(right))
738
self.assertEqual((False, True), right.detect_changes(left))
739
right.text_sha1 = 321
740
self.assertEqual((True, True), left.detect_changes(right))
741
self.assertEqual((True, True), right.detect_changes(left))
743
def test_symlink_detect_changes(self):
    # Build two links that start out with the same target.
    first = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
    first.symlink_target = 'foo'
    second = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
    second.symlink_target = 'foo'
    # Same target: nothing is reported, in either direction.
    for receiver, other in ((first, second), (second, first)):
        self.assertEqual((False, False), receiver.detect_changes(other))
    # Different target: only the first element of the pair becomes True.
    first.symlink_target = 'different'
    for receiver, other in ((first, second), (second, first)):
        self.assertEqual((True, False), receiver.detect_changes(other))
754
def test_file_has_text(self):
    # File entries report that they have text.
    entry = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
    answer = entry.has_text()
    self.assertTrue(answer)
758
def test_directory_has_text(self):
    # Directory entries report that they have no text.
    entry = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
    answer = entry.has_text()
    self.assertFalse(answer)
762
def test_link_has_text(self):
    # Symlink entries report that they have no text.
    entry = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
    answer = entry.has_text()
    self.assertFalse(answer)
766
def test_make_entry(self):
    # make_entry must return an instance of the class matching each kind.
    for kind, class_name in (("file", "InventoryFile"),
                             ("symlink", "InventoryLink"),
                             ("directory", "InventoryDirectory")):
        made = inventory.make_entry(kind, "name", ROOT_ID)
        self.assertIsInstance(made, getattr(inventory, class_name))
774
def test_make_entry_non_normalized(self):
775
orig_normalized_filename = osutils.normalized_filename
778
osutils.normalized_filename = osutils._accessible_normalized_filename
779
entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
780
self.assertEqual(u'\xe5', entry.name)
781
self.assertIsInstance(entry, inventory.InventoryFile)
783
osutils.normalized_filename = osutils._inaccessible_normalized_filename
784
self.assertRaises(errors.InvalidNormalization,
785
inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
787
osutils.normalized_filename = orig_normalized_filename
790
class TestDescribeChanges(TestCase):
792
def test_describe_change(self):
793
# we need to test the following change combinations:
799
# renamed/reparented and modified
800
# change kind (perhaps can't be done yet?)
801
# also, merged in combination with all of these?
802
old_a = InventoryFile(b'a-id', 'a_file', ROOT_ID)
803
old_a.text_sha1 = b'123132'
805
new_a = InventoryFile(b'a-id', 'a_file', ROOT_ID)
806
new_a.text_sha1 = b'123132'
809
self.assertChangeDescription('unchanged', old_a, new_a)
812
new_a.text_sha1 = b'abcabc'
813
self.assertChangeDescription('modified', old_a, new_a)
815
self.assertChangeDescription('added', None, new_a)
816
self.assertChangeDescription('removed', old_a, None)
817
# perhaps a bit questionable but seems like the most reasonable thing...
818
self.assertChangeDescription('unchanged', None, None)
820
# in this case it's both renamed and modified; show a rename and
822
new_a.name = 'newfilename'
823
self.assertChangeDescription('modified and renamed', old_a, new_a)
825
# reparenting is 'renaming'
826
new_a.name = old_a.name
827
new_a.parent_id = b'somedir-id'
828
self.assertChangeDescription('modified and renamed', old_a, new_a)
830
# reset the content values so it's not modified
831
new_a.text_size = old_a.text_size
832
new_a.text_sha1 = old_a.text_sha1
833
new_a.name = old_a.name
835
new_a.name = 'newfilename'
836
self.assertChangeDescription('renamed', old_a, new_a)
838
# reparenting is 'renaming'
839
new_a.name = old_a.name
840
new_a.parent_id = b'somedir-id'
841
self.assertChangeDescription('renamed', old_a, new_a)
843
def assertChangeDescription(self, expected_change, old_ie, new_ie):
    """Assert that describe_change(old_ie, new_ie) equals expected_change.

    :param expected_change: The description the test expects.
    :param old_ie: The old entry, or None.
    :param new_ie: The new entry, or None.
    """
    self.assertEqual(
        expected_change, InventoryEntry.describe_change(old_ie, new_ie))
848
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
850
def get_chk_bytes(self):
    """Return the store the groupcompress pack factory builds for this test.

    The factory is created with the arguments ``(True, True, 1)`` and is
    called with the transport returned by ``self.get_transport('')``.
    """
    make_store = groupcompress.make_pack_factory(True, True, 1)
    return make_store(self.get_transport(''))
855
def read_bytes(self, chk_bytes, key):
    """Return the fulltext of the first record streamed for ``key``.

    :param chk_bytes: Store offering ``get_record_stream(keys, order, flag)``.
    :param key: The single key to request.
    :return: ``get_bytes_as("fulltext")`` of the first record in the stream.
    """
    records = chk_bytes.get_record_stream([key], 'unordered', True)
    first_record = next(records)
    return first_record.get_bytes_as("fulltext")
859
def test_deserialise_gives_CHKInventory(self):
861
inv.revision_id = b"revid"
862
inv.root.revision = b"rootrev"
863
chk_bytes = self.get_chk_bytes()
864
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
865
bytes = b''.join(chk_inv.to_lines())
866
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
867
self.assertEqual(b"revid", new_inv.revision_id)
868
self.assertEqual("directory", new_inv.root.kind)
869
self.assertEqual(inv.root.file_id, new_inv.root.file_id)
870
self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
871
self.assertEqual(inv.root.name, new_inv.root.name)
872
self.assertEqual(b"rootrev", new_inv.root.revision)
873
self.assertEqual(b'plain', new_inv._search_key_name)
875
def test_deserialise_wrong_revid(self):
877
inv.revision_id = b"revid"
878
inv.root.revision = b"rootrev"
879
chk_bytes = self.get_chk_bytes()
880
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
881
bytes = b''.join(chk_inv.to_lines())
882
self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
885
def test_captures_rev_root_byid(self):
887
inv.revision_id = b"foo"
888
inv.root.revision = b"bar"
889
chk_bytes = self.get_chk_bytes()
890
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
891
lines = chk_inv.to_lines()
894
b'revision_id: foo\n',
895
b'root_id: TREE_ROOT\n',
896
b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
897
b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
899
chk_inv = CHKInventory.deserialise(chk_bytes, b''.join(lines), (b'foo',))
900
self.assertEqual(b'plain', chk_inv._search_key_name)
902
def test_captures_parent_id_basename_index(self):
904
inv.revision_id = b"foo"
905
inv.root.revision = b"bar"
906
chk_bytes = self.get_chk_bytes()
907
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
908
lines = chk_inv.to_lines()
911
b'revision_id: foo\n',
912
b'root_id: TREE_ROOT\n',
913
b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
914
b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
916
chk_inv = CHKInventory.deserialise(chk_bytes, b''.join(lines), (b'foo',))
917
self.assertEqual(b'plain', chk_inv._search_key_name)
919
def test_captures_search_key_name(self):
921
inv.revision_id = b"foo"
922
inv.root.revision = b"bar"
923
chk_bytes = self.get_chk_bytes()
924
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
925
search_key_name=b'hash-16-way')
926
lines = chk_inv.to_lines()
929
b'search_key_name: hash-16-way\n',
930
b'root_id: TREE_ROOT\n',
931
b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
932
b'revision_id: foo\n',
933
b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
935
chk_inv = CHKInventory.deserialise(chk_bytes, b''.join(lines), (b'foo',))
936
self.assertEqual(b'hash-16-way', chk_inv._search_key_name)
938
def test_directory_children_on_demand(self):
    """A deserialised directory entry loads its children only when asked.

    The root entry of a freshly deserialised inventory starts with
    ``_children`` unset; reading ``.children`` yields the file entry, which
    must match the entry fetched directly by file id.
    """
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.revision_id = b"revid"
    inv.root.revision = b"rootrev"
    inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
    file_ie = inv.get_entry(b"fileid")
    file_ie.revision = b"filerev"
    file_ie.executable = True
    file_ie.text_sha1 = b"ffff"
    file_ie.text_size = 1
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # Named ``serialised`` rather than ``bytes`` to avoid hiding the builtin.
    serialised = b''.join(chk_inv.to_lines())
    new_inv = CHKInventory.deserialise(chk_bytes, serialised, (b"revid",))
    root_entry = new_inv.get_entry(inv.root.file_id)
    # Nothing has touched .children yet, so the backing field is still None.
    self.assertEqual(None, root_entry._children)
    self.assertEqual({'file'}, set(root_entry.children))
    file_direct = new_inv.get_entry(b"fileid")
    file_found = root_entry.children['file']
    self.assertEqual(file_direct.kind, file_found.kind)
    self.assertEqual(file_direct.file_id, file_found.file_id)
    self.assertEqual(file_direct.parent_id, file_found.parent_id)
    self.assertEqual(file_direct.name, file_found.name)
    self.assertEqual(file_direct.revision, file_found.revision)
    self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
    self.assertEqual(file_direct.text_size, file_found.text_size)
    self.assertEqual(file_direct.executable, file_found.executable)
965
def test_from_inventory_maximum_size(self):
    """from_inventory applies maximum_size to both of its CHK maps."""
    # from_inventory supports the maximum_size parameter.
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.revision_id = b"revid"
    inv.root.revision = b"rootrev"
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
    chk_inv.id_to_entry._ensure_root()
    self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
    self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
    p_id_basename = chk_inv.parent_id_basename_to_file_id
    p_id_basename._ensure_root()
    self.assertEqual(120, p_id_basename._root_node.maximum_size)
    self.assertEqual(2, p_id_basename._root_node._key_width)
980
def test_iter_all_ids(self):
    """iter_all_ids yields the root id and the id of the added file."""
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.revision_id = b"revid"
    inv.root.revision = b"rootrev"
    inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
    file_ie = inv.get_entry(b"fileid")
    file_ie.revision = b"filerev"
    file_ie.executable = True
    file_ie.text_sha1 = b"ffff"
    file_ie.text_size = 1
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # Named ``serialised`` rather than ``bytes`` to avoid hiding the builtin.
    serialised = b''.join(chk_inv.to_lines())
    new_inv = CHKInventory.deserialise(chk_bytes, serialised, (b"revid",))
    fileids = sorted(new_inv.iter_all_ids())
    self.assertEqual([inv.root.file_id, b"fileid"], fileids)
996
def test__len__(self):
    """len() of a CHKInventory counts the root plus the single file."""
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.revision_id = b"revid"
    inv.root.revision = b"rootrev"
    inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
    file_ie = inv.get_entry(b"fileid")
    file_ie.revision = b"filerev"
    file_ie.executable = True
    file_ie.text_sha1 = b"ffff"
    file_ie.text_size = 1
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    self.assertEqual(2, len(chk_inv))
1009
def test_get_entry(self):
    """get_entry returns faithful root and file entries after a round trip.

    Also checks that asking for an id that was never added raises
    errors.NoSuchId.
    """
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.revision_id = b"revid"
    inv.root.revision = b"rootrev"
    inv.add(InventoryFile(b"fileid", u"file", inv.root.file_id))
    file_ie = inv.get_entry(b"fileid")
    file_ie.revision = b"filerev"
    file_ie.executable = True
    file_ie.text_sha1 = b"ffff"
    file_ie.text_size = 1
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    data = b''.join(chk_inv.to_lines())
    new_inv = CHKInventory.deserialise(chk_bytes, data, (b"revid",))
    root_entry = new_inv.get_entry(inv.root.file_id)
    file_entry = new_inv.get_entry(b"fileid")
    self.assertEqual("directory", root_entry.kind)
    self.assertEqual(inv.root.file_id, root_entry.file_id)
    self.assertEqual(inv.root.parent_id, root_entry.parent_id)
    self.assertEqual(inv.root.name, root_entry.name)
    self.assertEqual(b"rootrev", root_entry.revision)
    self.assertEqual("file", file_entry.kind)
    self.assertEqual(b"fileid", file_entry.file_id)
    self.assertEqual(inv.root.file_id, file_entry.parent_id)
    self.assertEqual(u"file", file_entry.name)
    self.assertEqual(b"filerev", file_entry.revision)
    self.assertEqual(b"ffff", file_entry.text_sha1)
    self.assertEqual(1, file_entry.text_size)
    self.assertEqual(True, file_entry.executable)
    # File ids are bytes everywhere else in this test, so probe with bytes.
    self.assertRaises(errors.NoSuchId, new_inv.get_entry, b'missing')
1039
def test_has_id_true(self):
    """has_id is true for both the added file and the root directory."""
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.revision_id = b"revid"
    inv.root.revision = b"rootrev"
    inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
    file_ie = inv.get_entry(b"fileid")
    file_ie.revision = b"filerev"
    file_ie.executable = True
    file_ie.text_sha1 = b"ffff"
    file_ie.text_size = 1
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    self.assertTrue(chk_inv.has_id(b'fileid'))
    self.assertTrue(chk_inv.has_id(inv.root.file_id))
1053
def test_has_id_not(self):
    """has_id is false for an id that was never added to the inventory."""
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.revision_id = b"revid"
    inv.root.revision = b"rootrev"
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    self.assertFalse(chk_inv.has_id(b'fileid'))
1061
def test_id2path(self):
    """id2path maps the root, a directory and a nested file to their paths."""
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.revision_id = b"revid"
    inv.root.revision = b"rootrev"
    direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
    fileentry = InventoryFile(b"fileid", "file", b"dirid")
    # The entries were built but never inserted, so the get_entry() calls
    # below could not find them. Add the parent directory first.
    inv.add(direntry)
    inv.add(fileentry)
    file_ie = inv.get_entry(b"fileid")
    file_ie.revision = b"filerev"
    file_ie.executable = True
    file_ie.text_sha1 = b"ffff"
    file_ie.text_size = 1
    inv.get_entry(b"dirid").revision = b"filerev"
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # Named ``serialised`` rather than ``bytes`` to avoid hiding the builtin.
    serialised = b''.join(chk_inv.to_lines())
    new_inv = CHKInventory.deserialise(chk_bytes, serialised, (b"revid",))
    self.assertEqual('', new_inv.id2path(inv.root.file_id))
    self.assertEqual('dir', new_inv.id2path(b'dirid'))
    self.assertEqual('dir/file', new_inv.id2path(b'fileid'))
1082
def test_path2id(self):
    """path2id maps '', 'dir' and 'dir/file' back to their file ids."""
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.revision_id = b"revid"
    inv.root.revision = b"rootrev"
    direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
    fileentry = InventoryFile(b"fileid", "file", b"dirid")
    # The entries were built but never inserted, so the get_entry() calls
    # below could not find them. Add the parent directory first.
    inv.add(direntry)
    inv.add(fileentry)
    file_ie = inv.get_entry(b"fileid")
    file_ie.revision = b"filerev"
    file_ie.executable = True
    file_ie.text_sha1 = b"ffff"
    file_ie.text_size = 1
    inv.get_entry(b"dirid").revision = b"filerev"
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # Named ``serialised`` rather than ``bytes`` to avoid hiding the builtin.
    serialised = b''.join(chk_inv.to_lines())
    new_inv = CHKInventory.deserialise(chk_bytes, serialised, (b"revid",))
    self.assertEqual(inv.root.file_id, new_inv.path2id(''))
    self.assertEqual(b'dirid', new_inv.path2id('dir'))
    self.assertEqual(b'fileid', new_inv.path2id('dir/file'))
1103
def test_create_by_apply_delta_sets_root(self):
    """Applying a delta that swaps the root yields the expected new root.

    The reference inventory is built directly from the mutated plain
    inventory; the delta-built inventory must report an equal root.
    """
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.root.revision = b"myrootrev"
    inv.revision_id = b"revid"
    chk_bytes = self.get_chk_bytes()
    base_inv = CHKInventory.from_inventory(chk_bytes, inv)
    inv.add_path("", "directory", b"myrootid", None)
    inv.revision_id = b"expectedid"
    inv.root.revision = b"myrootrev"
    reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # First item removes the old root, second introduces the new one.
    delta = [("", None, base_inv.root.file_id, None),
             (None, "", b"myrootid", inv.root)]
    new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
    self.assertEqual(reference_inv.root, new_inv.root)
1118
def test_create_by_apply_delta_empty_add_child(self):
    """Adding a child by delta matches building the inventory directly."""
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.revision_id = b"revid"
    inv.root.revision = b"rootrev"
    chk_bytes = self.get_chk_bytes()
    base_inv = CHKInventory.from_inventory(chk_bytes, inv)
    a_entry = InventoryFile(b"A-id", "A", inv.root.file_id)
    a_entry.revision = b"filerev"
    a_entry.executable = True
    a_entry.text_sha1 = b"ffff"
    a_entry.text_size = 1
    # The reference inventory must contain the entry the delta adds,
    # otherwise the two id_to_entry maps cannot be equal.
    inv.add(a_entry)
    inv.revision_id = b"expectedid"
    reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
    delta = [(None, "A", b"A-id", a_entry)]
    new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
    # new_inv should be the same as reference_inv.
    self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
    self.assertEqual(reference_inv.root_id, new_inv.root_id)
    reference_inv.id_to_entry._ensure_root()
    new_inv.id_to_entry._ensure_root()
    self.assertEqual(reference_inv.id_to_entry._root_node._key,
                     new_inv.id_to_entry._root_node._key)
1142
def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
    """Adding a child by delta also updates the parent_id/basename map."""
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.revision_id = b"revid"
    inv.root.revision = b"rootrev"
    chk_bytes = self.get_chk_bytes()
    base_inv = CHKInventory.from_inventory(chk_bytes, inv)
    a_entry = InventoryFile(b"A-id", "A", inv.root.file_id)
    a_entry.revision = b"filerev"
    a_entry.executable = True
    a_entry.text_sha1 = b"ffff"
    a_entry.text_size = 1
    # The reference inventory must contain the entry the delta adds,
    # otherwise the compared root keys cannot be equal.
    inv.add(a_entry)
    inv.revision_id = b"expectedid"
    reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
    delta = [(None, "A", b"A-id", a_entry)]
    new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
    reference_inv.id_to_entry._ensure_root()
    reference_inv.parent_id_basename_to_file_id._ensure_root()
    new_inv.id_to_entry._ensure_root()
    new_inv.parent_id_basename_to_file_id._ensure_root()
    # new_inv should be the same as reference_inv.
    self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
    self.assertEqual(reference_inv.root_id, new_inv.root_id)
    self.assertEqual(reference_inv.id_to_entry._root_node._key,
                     new_inv.id_to_entry._root_node._key)
    self.assertEqual(
        reference_inv.parent_id_basename_to_file_id._root_node._key,
        new_inv.parent_id_basename_to_file_id._root_node._key)
1170
def test_iter_changes(self):
# Builds two inventories holding the same file id with different revision,
# executable flag, sha1 and size, round-trips both through CHKInventory
# serialisation, and compares list(inv_1.iter_changes(inv_2)) against a
# single expected change tuple.
# NOTE(review): ``inv`` and ``inv2`` are never bound in this copy, and the
# expected tuple below is cut off before its closing brackets -- statements
# appear to be missing; TODO restore them from the canonical file.
1171
# Low level bootstrapping smoke test; comprehensive generic tests via
1172
# InterTree are coming.
1174
inv.revision_id = b"revid"
1175
inv.root.revision = b"rootrev"
1176
inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
1177
inv.get_entry(b"fileid").revision = b"filerev"
1178
inv.get_entry(b"fileid").executable = True
1179
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1180
inv.get_entry(b"fileid").text_size = 1
1182
inv2.revision_id = b"revid2"
1183
inv2.root.revision = b"rootrev"
1184
inv2.add(InventoryFile(b"fileid", "file", inv.root.file_id))
1185
inv2.get_entry(b"fileid").revision = b"filerev2"
1186
inv2.get_entry(b"fileid").executable = False
1187
inv2.get_entry(b"fileid").text_sha1 = b"bbbb"
1188
inv2.get_entry(b"fileid").text_size = 2
1189
# get fresh objects.
1190
chk_bytes = self.get_chk_bytes()
1191
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1192
bytes = b''.join(chk_inv.to_lines())
1193
inv_1 = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1194
chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
1195
bytes = b''.join(chk_inv2.to_lines())
1196
inv_2 = CHKInventory.deserialise(chk_bytes, bytes, (b"revid2",))
1197
self.assertEqual([(b'fileid', (u'file', u'file'), True, (True, True),
1198
(b'TREE_ROOT', b'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
1200
list(inv_1.iter_changes(inv_2)))
1202
def test_parent_id_basename_to_file_id_index_enabled(self):
    """A deserialised inventory exposes the parent_id/basename CHK map.

    The map must be a chk_map.CHKMap keyed by (parent_id, basename) and
    hold one item for the root and one for the file.
    """
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.revision_id = b"revid"
    inv.root.revision = b"rootrev"
    inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
    file_ie = inv.get_entry(b"fileid")
    file_ie.revision = b"filerev"
    file_ie.executable = True
    file_ie.text_sha1 = b"ffff"
    file_ie.text_size = 1
    # get fresh objects.
    chk_bytes = self.get_chk_bytes()
    tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # Named ``serialised`` rather than ``bytes`` to avoid hiding the builtin.
    serialised = b''.join(tmp_inv.to_lines())
    chk_inv = CHKInventory.deserialise(chk_bytes, serialised, (b"revid",))
    self.assertIsInstance(chk_inv.parent_id_basename_to_file_id,
                          chk_map.CHKMap)
    # The expected/actual pair had lost its enclosing assertion call.
    self.assertEqual(
        {(b'', b''): b'TREE_ROOT', (b'TREE_ROOT', b'file'): b'fileid'},
        dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1221
def test_file_entry_to_bytes(self):
    """Round-trip an executable file entry through its bytes form."""
    inv = CHKInventory(None)
    ie = inventory.InventoryFile(b'file-id', 'filename', b'parent-id')
    ie.executable = True
    ie.revision = b'file-rev-id'
    ie.text_sha1 = b'abcdefgh'
    # The expected serialisation below carries a size field of 100, but the
    # entry's text_size was never assigned.
    ie.text_size = 100
    # Named ``serialised`` rather than ``bytes`` to avoid hiding the builtin.
    serialised = inv._entry_to_bytes(ie)
    self.assertEqual(b'file: file-id\nparent-id\nfilename\n'
                     b'file-rev-id\nabcdefgh\n100\nY', serialised)
    ie2 = inv._bytes_to_entry(serialised)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, text_type)
    self.assertEqual((b'filename', b'file-id', b'file-rev-id'),
                     inv._bytes_to_utf8name_key(serialised))
1237
def test_file2_entry_to_bytes(self):
    """Round-trip a non-executable file entry with a non-ASCII name."""
    inv = CHKInventory(None)
    # u'\u03a9' encodes to b'\xce\xa9' in the expected UTF-8 bytes below.
    ie = inventory.InventoryFile(b'file-id', u'\u03a9name', b'parent-id')
    ie.executable = False
    ie.revision = b'file-rev-id'
    ie.text_sha1 = b'123456'
    # The expected serialisation below carries a size field of 25, but the
    # entry's text_size was never assigned.
    ie.text_size = 25
    # Named ``serialised`` rather than ``bytes`` to avoid hiding the builtin.
    serialised = inv._entry_to_bytes(ie)
    self.assertEqual(b'file: file-id\nparent-id\n\xce\xa9name\n'
                     b'file-rev-id\n123456\n25\nN', serialised)
    ie2 = inv._bytes_to_entry(serialised)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, text_type)
    self.assertEqual((b'\xce\xa9name', b'file-id', b'file-rev-id'),
                     inv._bytes_to_utf8name_key(serialised))
1254
def test_dir_entry_to_bytes(self):
    """Round-trip a directory entry with an ASCII name through bytes."""
    chk_inv = CHKInventory(None)
    entry = inventory.InventoryDirectory(b'dir-id', 'dirname', b'parent-id')
    entry.revision = b'dir-rev-id'
    serialised = chk_inv._entry_to_bytes(entry)
    expected_bytes = b'dir: dir-id\nparent-id\ndirname\ndir-rev-id'
    self.assertEqual(expected_bytes, serialised)
    # Parsing the bytes again must give back an equal entry with a text name.
    restored = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(entry, restored)
    self.assertIsInstance(restored.name, text_type)
    expected_key = (b'dirname', b'dir-id', b'dir-rev-id')
    self.assertEqual(expected_key,
                     chk_inv._bytes_to_utf8name_key(serialised))
1266
def test_dir2_entry_to_bytes(self):
    """Round-trip a parentless directory entry with a non-ASCII name."""
    inv = CHKInventory(None)
    # The constructor call had lost its final argument. The empty parent
    # line in the expected bytes and the assertIs() below both require the
    # parent id to be None.
    ie = inventory.InventoryDirectory(b'dir-id', u'dir\u03a9name',
                                      None)
    ie.revision = b'dir-rev-id'
    # Named ``serialised`` rather than ``bytes`` to avoid hiding the builtin.
    serialised = inv._entry_to_bytes(ie)
    self.assertEqual(b'dir: dir-id\n\ndir\xce\xa9name\n'
                     b'dir-rev-id', serialised)
    ie2 = inv._bytes_to_entry(serialised)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, text_type)
    self.assertIs(ie2.parent_id, None)
    self.assertEqual((b'dir\xce\xa9name', b'dir-id', b'dir-rev-id'),
                     inv._bytes_to_utf8name_key(serialised))
1281
def test_symlink_entry_to_bytes(self):
    """Round-trip a symlink entry with ASCII name and target through bytes."""
    chk_inv = CHKInventory(None)
    link = inventory.InventoryLink(b'link-id', 'linkname', b'parent-id')
    link.revision = b'link-rev-id'
    link.symlink_target = u'target/path'
    serialised = chk_inv._entry_to_bytes(link)
    expected_bytes = (b'symlink: link-id\nparent-id\nlinkname\n'
                      b'link-rev-id\ntarget/path')
    self.assertEqual(expected_bytes, serialised)
    # Both the name and the target must come back as text, not bytes.
    restored = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(link, restored)
    self.assertIsInstance(restored.name, text_type)
    self.assertIsInstance(restored.symlink_target, text_type)
    expected_key = (b'linkname', b'link-id', b'link-rev-id')
    self.assertEqual(expected_key,
                     chk_inv._bytes_to_utf8name_key(serialised))
1296
def test_symlink2_entry_to_bytes(self):
    """Round-trip a symlink entry whose name and target are non-ASCII."""
    chk_inv = CHKInventory(None)
    link = inventory.InventoryLink(
        b'link-id', u'link\u03a9name', b'parent-id')
    link.revision = b'link-rev-id'
    link.symlink_target = u'target/\u03a9path'
    serialised = chk_inv._entry_to_bytes(link)
    expected_bytes = (b'symlink: link-id\nparent-id\nlink\xce\xa9name\n'
                      b'link-rev-id\ntarget/\xce\xa9path')
    self.assertEqual(expected_bytes, serialised)
    # Both the name and the target must come back as text, not bytes.
    restored = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(link, restored)
    self.assertIsInstance(restored.name, text_type)
    self.assertIsInstance(restored.symlink_target, text_type)
    expected_key = (b'link\xce\xa9name', b'link-id', b'link-rev-id')
    self.assertEqual(expected_key,
                     chk_inv._bytes_to_utf8name_key(serialised))
1311
def test_tree_reference_entry_to_bytes(self):
    """Round-trip a tree-reference entry with a non-ASCII name."""
    inv = CHKInventory(None)
    # The constructor call had lost its final argument. The expected bytes
    # below name b'parent-id' as the parent.
    ie = inventory.TreeReference(b'tree-root-id', u'tree\u03a9name',
                                 b'parent-id')
    ie.revision = b'tree-rev-id'
    ie.reference_revision = b'ref-rev-id'
    # Named ``serialised`` rather than ``bytes`` to avoid hiding the builtin.
    serialised = inv._entry_to_bytes(ie)
    self.assertEqual(b'tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
                     b'tree-rev-id\nref-rev-id', serialised)
    ie2 = inv._bytes_to_entry(serialised)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, text_type)
    self.assertEqual((b'tree\xce\xa9name', b'tree-root-id', b'tree-rev-id'),
                     inv._bytes_to_utf8name_key(serialised))
1326
def make_basic_utf8_inventory(self):
    """Return a deserialised CHKInventory whose entries have non-ASCII names.

    The inventory holds a file and a directory under the root, plus one
    file inside that directory.
    """
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.revision_id = b"revid"
    inv.root.revision = b"rootrev"
    root_id = inv.root.file_id
    inv.add(InventoryFile(b"fileid", u'f\xefle', root_id))
    file_ie = inv.get_entry(b"fileid")
    file_ie.revision = b"filerev"
    file_ie.text_sha1 = b"ffff"
    file_ie.text_size = 0
    inv.add(InventoryDirectory(b"dirid", u'dir-\N{EURO SIGN}', root_id))
    inv.get_entry(b"dirid").revision = b"dirrev"
    inv.add(InventoryFile(b"childid", u'ch\xefld', b"dirid"))
    child_ie = inv.get_entry(b"childid")
    child_ie.revision = b"filerev"
    child_ie.text_sha1 = b"ffff"
    child_ie.text_size = 0
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # Named ``serialised`` rather than ``bytes`` to avoid hiding the builtin.
    serialised = b''.join(chk_inv.to_lines())
    return CHKInventory.deserialise(chk_bytes, serialised, (b"revid",))
1346
def test__preload_handles_utf8(self):
    """_preload_cache fills the entry cache when names are non-ASCII."""
    new_inv = self.make_basic_utf8_inventory()
    self.assertEqual({}, new_inv._fileid_to_entry_cache)
    self.assertFalse(new_inv._fully_cached)
    new_inv._preload_cache()
    # The expected/actual pair had lost its enclosing assertion call.
    self.assertEqual(
        sorted([new_inv.root_id, b"fileid", b"dirid", b"childid"]),
        sorted(new_inv._fileid_to_entry_cache.keys()))
    ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
    self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
                     sorted(ie_root._children.keys()))
    ie_dir = new_inv._fileid_to_entry_cache[b'dirid']
    self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
1360
def test__preload_populates_cache(self):
    """_preload_cache loads every entry and marks the inventory cached.

    The cache starts empty after deserialisation. After the preload it
    holds all four ids, and each cached directory has its children map
    populated.
    """
    # ``inv`` was referenced without ever being bound; create it explicitly.
    inv = Inventory(b'TREE_ROOT')
    inv.revision_id = b"revid"
    inv.root.revision = b"rootrev"
    root_id = inv.root.file_id
    inv.add(InventoryFile(b"fileid", "file", root_id))
    file_ie = inv.get_entry(b"fileid")
    file_ie.revision = b"filerev"
    file_ie.executable = True
    file_ie.text_sha1 = b"ffff"
    file_ie.text_size = 1
    inv.add(InventoryDirectory(b"dirid", "dir", root_id))
    inv.get_entry(b"dirid").revision = b"dirrev"
    inv.add(InventoryFile(b"childid", "child", b"dirid"))
    child_ie = inv.get_entry(b"childid")
    child_ie.revision = b"filerev"
    child_ie.executable = False
    child_ie.text_sha1 = b"dddd"
    child_ie.text_size = 1
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # Named ``serialised`` rather than ``bytes`` to avoid hiding the builtin.
    serialised = b''.join(chk_inv.to_lines())
    new_inv = CHKInventory.deserialise(chk_bytes, serialised, (b"revid",))
    self.assertEqual({}, new_inv._fileid_to_entry_cache)
    self.assertFalse(new_inv._fully_cached)
    new_inv._preload_cache()
    # The expected/actual pair had lost its enclosing assertion call.
    self.assertEqual(
        sorted([root_id, b"fileid", b"dirid", b"childid"]),
        sorted(new_inv._fileid_to_entry_cache.keys()))
    self.assertTrue(new_inv._fully_cached)
    ie_root = new_inv._fileid_to_entry_cache[root_id]
    self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
    ie_dir = new_inv._fileid_to_entry_cache[b'dirid']
    self.assertEqual(['child'], sorted(ie_dir._children.keys()))
1393
def test__preload_handles_partially_evaluated_inventory(self):
    """_preload_cache copes with a root whose children are already loaded."""
    new_inv = self.make_basic_utf8_inventory()
    root_ie = new_inv.get_entry(new_inv.root_id)
    root_child_names = [u'dir-\N{EURO SIGN}', u'f\xefle']
    self.assertIs(None, root_ie._children)
    self.assertEqual(root_child_names, sorted(root_ie.children.keys()))
    # Accessing .children loads _children
    self.assertEqual(root_child_names, sorted(root_ie._children.keys()))
    new_inv._preload_cache()
    # The root's child names read the same after the preload.
    self.assertEqual(root_child_names, sorted(root_ie._children.keys()))
    dir_ie = new_inv.get_entry(b"dirid")
    self.assertEqual([u'ch\xefld'], sorted(dir_ie._children.keys()))
1410
def test_filter_change_in_renamed_subfolder(self):
# Builds src/, src/sub/ and src/sub/a in a plain inventory, converts it to a
# CHKInventory, applies a delta containing an item for a-id and a
# "src" -> "src2" item for src-id, then filters on [a-id, src-id] and
# compares the (path, file_id) pairs from iter_entries().
# NOTE(review): the create_by_apply_delta([...]) call below is never closed
# and the expected list has lost its opening lines in this copy -- the
# remaining delta arguments and the first expected pairs are not visible;
# TODO restore them from the canonical file.
1411
inv = Inventory(b'tree-root')
1412
inv.root.revision = b'rootrev'
1413
src_ie = inv.add_path('src', 'directory', b'src-id')
1414
src_ie.revision = b'srcrev'
1415
sub_ie = inv.add_path('src/sub/', 'directory', b'sub-id')
1416
sub_ie.revision = b'subrev'
1417
a_ie = inv.add_path('src/sub/a', 'file', b'a-id')
1418
a_ie.revision = b'filerev'
1419
a_ie.text_sha1 = osutils.sha_string(b'content\n')
1420
a_ie.text_size = len(b'content\n')
1421
chk_bytes = self.get_chk_bytes()
1422
inv = CHKInventory.from_inventory(chk_bytes, inv)
1423
inv = inv.create_by_apply_delta([
1424
("src/sub/a", "src/sub/a", b"a-id", a_ie),
1425
("src", "src2", b"src-id", src_ie),
1427
new_inv = inv.filter([b'a-id', b'src-id'])
1431
('src/sub', b'sub-id'),
1432
('src/sub/a', b'a-id'),
1433
], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
1435
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
    """Tests for _getitems and _expand_fileids_to_parents_and_children."""

    def get_chk_bytes(self):
        """Return a groupcompress pack store on this test's transport."""
        factory = groupcompress.make_pack_factory(True, True, 1)
        trans = self.get_transport('')
        return factory(trans)

    def make_dir(self, inv, name, parent_id, revision):
        """Add a directory called ``name`` to ``inv`` with id ``<name>-id``."""
        ie = inv.make_entry('directory', name, parent_id,
                            name.encode('utf-8') + b'-id')
        ie.revision = revision
        # The entry was built but never inserted; later make_* calls use
        # these ids as parents, so it has to be part of the inventory.
        inv.add(ie)

    def make_file(self, inv, name, parent_id, revision, content=b'content\n'):
        """Add a file called ``name`` to ``inv`` with id ``<name>-id``.

        The sha1 and size recorded on the entry are derived from ``content``.
        """
        ie = inv.make_entry('file', name, parent_id,
                            name.encode('utf-8') + b'-id')
        ie.text_sha1 = osutils.sha_string(content)
        ie.text_size = len(content)
        ie.revision = revision
        # The entry was built but never inserted into the inventory.
        inv.add(ie)

    def make_simple_inventory(self):
        """Return a deserialised CHKInventory with a small nested layout."""
        inv = Inventory(b'TREE_ROOT')
        inv.revision_id = b"revid"
        inv.root.revision = b"rootrev"
        # Layout built by the calls below:
        # /                     TREE_ROOT
        # dir1/                 dir1-id
        #   sub-file1           sub-file1-id
        #   sub-file2           sub-file2-id
        #   sub-dir1/           sub-dir1-id
        #     subsub-file1      subsub-file1-id
        # dir2/                 dir2-id
        #   sub2-file1          sub2-file1-id
        # top                   top-id
        self.make_dir(inv, 'dir1', b'TREE_ROOT', b'dirrev')
        self.make_dir(inv, 'dir2', b'TREE_ROOT', b'dirrev')
        self.make_dir(inv, 'sub-dir1', b'dir1-id', b'dirrev')
        self.make_file(inv, 'top', b'TREE_ROOT', b'filerev')
        self.make_file(inv, 'sub-file1', b'dir1-id', b'filerev')
        self.make_file(inv, 'sub-file2', b'dir1-id', b'filerev')
        self.make_file(inv, 'subsub-file1', b'sub-dir1-id', b'filerev')
        self.make_file(inv, 'sub2-file1', b'dir2-id', b'filerev')
        chk_bytes = self.get_chk_bytes()
        # use a small maximum_size to force internal paging structures
        # NOTE(review): the size argument was missing from this call; 100 is
        # chosen as a small value -- TODO confirm against the canonical file.
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
                                              maximum_size=100,
                                              search_key_name=b'hash-255-way')
        serialised = b''.join(chk_inv.to_lines())
        return CHKInventory.deserialise(chk_bytes, serialised, (b"revid",))

    def assert_Getitems(self, expected_fileids, inv, file_ids):
        """Assert inv._getitems(file_ids) yields exactly expected_fileids."""
        self.assertEqual(sorted(expected_fileids),
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))

    def assertExpand(self, all_ids, inv, file_ids):
        """Assert expanding ``file_ids`` produces ``all_ids`` and its tree.

        The expected parent -> children mapping is rebuilt from the entries
        of the returned ids and compared after sorting each child list.
        """
        # The tuple target had lost its first name and opening parenthesis.
        (val_all_ids,
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
        self.assertEqual(set(all_ids), val_all_ids)
        entries = inv._getitems(val_all_ids)
        expected_children = {}
        for entry in entries:
            s = expected_children.setdefault(entry.parent_id, [])
            s.append(entry.file_id)
        # Normalise both mappings to sorted lists so ordering is irrelevant.
        val_children = {
            k: sorted(v) for k, v in val_children.items()}
        expected_children = {
            k: sorted(v) for k, v in expected_children.items()}
        self.assertEqual(expected_children, val_children)

    def test_make_simple_inventory(self):
        """The fixture inventory has the documented directory layout."""
        inv = self.make_simple_inventory()
        layout = []
        for path, entry in inv.iter_entries_by_dir():
            layout.append((path, entry.file_id))
        # NOTE(review): the root and 'top' rows were missing here; they are
        # placed where the gaps fell, assuming iter_entries_by_dir lists the
        # root first and siblings by name -- TODO confirm.
        self.assertEqual([
            ('', b'TREE_ROOT'),
            ('dir1', b'dir1-id'),
            ('dir2', b'dir2-id'),
            ('top', b'top-id'),
            ('dir1/sub-dir1', b'sub-dir1-id'),
            ('dir1/sub-file1', b'sub-file1-id'),
            ('dir1/sub-file2', b'sub-file2-id'),
            ('dir1/sub-dir1/subsub-file1', b'subsub-file1-id'),
            ('dir2/sub2-file1', b'sub2-file1-id'),
            ], layout)

    def test__getitems(self):
        """_getitems returns the requested entries and caches them."""
        inv = self.make_simple_inventory()
        # Fetching one id caches that id only.
        self.assert_Getitems([b'dir1-id'], inv, [b'dir1-id'])
        self.assertTrue(b'dir1-id' in inv._fileid_to_entry_cache)
        self.assertFalse(b'sub-file2-id' in inv._fileid_to_entry_cache)
        # Asking for the same id again gives the same answer.
        self.assert_Getitems([b'dir1-id'], inv, [b'dir1-id'])
        # Mixing an already-fetched id with a new one returns both.
        self.assert_Getitems([b'dir1-id', b'sub-file2-id'], inv,
                             [b'dir1-id', b'sub-file2-id'])
        self.assertTrue(b'dir1-id' in inv._fileid_to_entry_cache)
        self.assertTrue(b'sub-file2-id' in inv._fileid_to_entry_cache)

    def test_single_file(self):
        """Expanding a top-level file adds only the root."""
        inv = self.make_simple_inventory()
        self.assertExpand([b'TREE_ROOT', b'top-id'], inv, [b'top-id'])

    def test_get_all_parents(self):
        """Expanding a deeply nested file pulls in every ancestor."""
        inv = self.make_simple_inventory()
        # The list had lost the requested id itself; test_subsub_file expects
        # the same four ids for the same input.
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
                           b'subsub-file1-id',
                           ], inv, [b'subsub-file1-id'])

    def test_get_children(self):
        """Expanding a directory pulls in its whole subtree and the root."""
        inv = self.make_simple_inventory()
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
                           b'sub-file1-id', b'sub-file2-id', b'subsub-file1-id',
                           ], inv, [b'dir1-id'])

    def test_from_root(self):
        """Expanding the root returns every id in the inventory."""
        inv = self.make_simple_inventory()
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'dir2-id', b'sub-dir1-id',
                           b'sub-file1-id', b'sub-file2-id', b'sub2-file1-id',
                           b'subsub-file1-id', b'top-id'], inv, [b'TREE_ROOT'])

    def test_top_level_file(self):
        """Expanding a file directly under the root adds only the root."""
        inv = self.make_simple_inventory()
        self.assertExpand([b'TREE_ROOT', b'top-id'], inv, [b'top-id'])

    def test_subsub_file(self):
        """Expanding a file two levels down adds both parent directories."""
        inv = self.make_simple_inventory()
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
                           b'subsub-file1-id'], inv, [b'subsub-file1-id'])

    def test_sub_and_root(self):
        """Expanding a top-level and a nested file merges their ancestors."""
        inv = self.make_simple_inventory()
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id', b'top-id',
                           b'subsub-file1-id'], inv, [b'top-id', b'subsub-file1-id'])
1570
class TestMutableInventoryFromTree(TestCaseWithTransport):
    """Tests for mutable_inventory_from_tree."""

    def test_empty(self):
        """The null revision tree gives an empty inventory."""
        repo = self.make_repository('.')
        null_tree = repo.revision_tree(revision.NULL_REVISION)
        result = mutable_inventory_from_tree(null_tree)
        self.assertEqual(revision.NULL_REVISION, result.revision_id)
        self.assertEqual(0, len(result))

    def test_some_files(self):
        """A committed tree gives an inventory that can be edited freely."""
        file_id = b'thefileid'
        wt = self.make_branch_and_tree('.')
        self.build_tree(['a'])
        wt.add(['a'], [file_id])
        revid = wt.commit("commit")
        rev_tree = wt.branch.repository.revision_tree(revid)
        result = mutable_inventory_from_tree(rev_tree)
        self.assertEqual(revid, result.revision_id)
        self.assertEqual(2, len(result))
        self.assertEqual("a", result.get_entry(file_id).name)
        # Changing the copy must leave the revision tree's own inventory
        # entry untouched.
        self.assertFalse(
            rev_tree.root_inventory.get_entry(file_id).executable)
        result.get_entry(file_id).executable = True
        self.assertFalse(
            rev_tree.root_inventory.get_entry(file_id).executable)