1
# Copyright (C) 2005-2012, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
26
from ..sixish import text_type
32
from ..bzr.inventory import (
40
mutable_inventory_from_tree,
44
TestCaseWithTransport,
46
from .scenarios import load_tests_apply_scenarios
49
load_tests = load_tests_apply_scenarios
52
def delta_application_scenarios():
54
('Inventory', {'apply_delta': apply_inventory_Inventory}),
56
# Working tree basis delta application
57
# Repository add_inv_by_delta.
58
# Reduce form of the per_repository test logic - that logic needs to be
59
# be able to get /just/ repositories whereas these tests are fine with
60
# just creating trees.
62
for _, format in repository.format_registry.iteritems():
63
if format.supports_full_versioned_files:
64
scenarios.append((str(format.__name__), {
65
'apply_delta': apply_inventory_Repository_add_inventory_by_delta,
67
for getter in workingtree.format_registry._get_all_lazy():
73
pass # Format with unmet dependency
74
repo_fmt = format._matchingcontroldir.repository_format
75
if not repo_fmt.supports_full_versioned_files:
78
(str(format.__class__.__name__) + ".update_basis_by_delta", {
79
'apply_delta': apply_inventory_WT_basis,
82
(str(format.__class__.__name__) + ".apply_inventory_delta", {
83
'apply_delta': apply_inventory_WT,
88
def create_texts_for_inv(repo, inv):
89
for path, ie in inv.iter_entries():
91
lines = [b'a' * ie.text_size]
94
repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
97
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
98
"""Apply delta to basis and return the result.
100
:param basis: An inventory to be used as the basis.
101
:param delta: The inventory delta to apply:
102
:return: An inventory resulting from the application.
104
basis.apply_delta(delta)
108
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
109
"""Apply delta to basis and return the result.
111
This sets the tree state to be basis, and then calls apply_inventory_delta.
113
:param basis: An inventory to be used as the basis.
114
:param delta: The inventory delta to apply:
115
:return: An inventory resulting from the application.
117
control = self.make_controldir(
118
'tree', format=self.format._matchingcontroldir)
119
control.create_repository()
120
control.create_branch()
121
tree = self.format.initialize(control)
124
tree._write_inventory(basis)
127
# Fresh object, reads disk again.
128
tree = tree.controldir.open_workingtree()
131
tree.apply_inventory_delta(delta)
134
# reload tree - ensure we get what was written.
135
tree = tree.controldir.open_workingtree()
137
self.addCleanup(tree.unlock)
138
if not invalid_delta:
140
return tree.root_inventory
143
def _create_repo_revisions(repo, basis, delta, invalid_delta):
144
repo.start_write_group()
146
rev = revision.Revision(b'basis', timestamp=0, timezone=None,
147
message="", committer="foo@example.com")
148
basis.revision_id = b'basis'
149
create_texts_for_inv(repo, basis)
150
repo.add_revision(b'basis', rev, basis)
152
# We don't want to apply the delta to the basis, because we expect
153
# the delta is invalid.
155
result_inv.revision_id = b'result'
156
target_entries = None
158
result_inv = basis.create_by_apply_delta(delta, b'result')
159
create_texts_for_inv(repo, result_inv)
160
target_entries = list(result_inv.iter_entries_by_dir())
161
rev = revision.Revision(b'result', timestamp=0, timezone=None,
162
message="", committer="foo@example.com")
163
repo.add_revision(b'result', rev, result_inv)
164
repo.commit_write_group()
166
repo.abort_write_group()
168
return target_entries
171
def _get_basis_entries(tree):
172
basis_tree = tree.basis_tree()
173
basis_tree.lock_read()
174
basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
176
return basis_tree_entries
179
def _populate_different_tree(tree, basis, delta):
180
"""Put all entries into tree, but at a unique location."""
183
tree.add(['unique-dir'], [b'unique-dir-id'], ['directory'])
184
for path, ie in basis.iter_entries_by_dir():
185
if ie.file_id in added_ids:
187
# We want a unique path for each of these, we use the file-id
188
tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
189
added_ids.add(ie.file_id)
190
for old_path, new_path, file_id, ie in delta:
191
if file_id in added_ids:
193
tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
196
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
197
"""Apply delta to basis and return the result.
199
This sets the parent and then calls update_basis_by_delta.
200
It also puts the basis in the repository under both 'basis' and 'result' to
201
allow safety checks made by the WT to succeed, and finally ensures that all
202
items in the delta with a new path are present in the WT before calling
203
update_basis_by_delta.
205
:param basis: An inventory to be used as the basis.
206
:param delta: The inventory delta to apply:
207
:return: An inventory resulting from the application.
209
control = test.make_controldir(
210
'tree', format=test.format._matchingcontroldir)
211
control.create_repository()
212
control.create_branch()
213
tree = test.format.initialize(control)
216
target_entries = _create_repo_revisions(tree.branch.repository, basis,
217
delta, invalid_delta)
218
# Set the basis state as the trees current state
219
tree._write_inventory(basis)
220
# This reads basis from the repo and puts it into the tree's local
221
# cache, if it has one.
222
tree.set_parent_ids([b'basis'])
225
# Fresh lock, reads disk again.
226
with tree.lock_write():
227
tree.update_basis_by_delta(b'result', delta)
228
if not invalid_delta:
230
# reload tree - ensure we get what was written.
231
tree = tree.controldir.open_workingtree()
232
basis_tree = tree.basis_tree()
233
basis_tree.lock_read()
234
test.addCleanup(basis_tree.unlock)
235
basis_inv = basis_tree.root_inventory
237
basis_entries = list(basis_inv.iter_entries_by_dir())
238
test.assertEqual(target_entries, basis_entries)
242
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
244
"""Apply delta to basis and return the result.
246
This inserts basis as a whole inventory and then uses
247
add_inventory_by_delta to add delta.
249
:param basis: An inventory to be used as the basis.
250
:param delta: The inventory delta to apply:
251
:return: An inventory resulting from the application.
253
format = self.format()
254
control = self.make_controldir('tree', format=format._matchingcontroldir)
255
repo = format.initialize(control)
256
with repo.lock_write():
257
repo.start_write_group()
259
rev = revision.Revision(b'basis', timestamp=0, timezone=None,
260
message="", committer="foo@example.com")
261
basis.revision_id = b'basis'
262
create_texts_for_inv(repo, basis)
263
repo.add_revision(b'basis', rev, basis)
264
repo.commit_write_group()
266
repo.abort_write_group()
268
with repo.lock_write():
269
repo.start_write_group()
271
inv_sha1 = repo.add_inventory_by_delta(b'basis', delta,
272
b'result', [b'basis'])
274
repo.abort_write_group()
277
repo.commit_write_group()
278
# Fresh lock, reads disk again.
279
repo = repo.controldir.open_repository()
281
self.addCleanup(repo.unlock)
282
return repo.get_inventory(b'result')
285
class TestInventoryUpdates(TestCase):
287
def test_creation_from_root_id(self):
288
# iff a root id is passed to the constructor, a root directory is made
289
inv = inventory.Inventory(root_id=b'tree-root')
290
self.assertNotEqual(None, inv.root)
291
self.assertEqual(b'tree-root', inv.root.file_id)
293
def test_add_path_of_root(self):
294
# if no root id is given at creation time, there is no root directory
295
inv = inventory.Inventory(root_id=None)
296
self.assertIs(None, inv.root)
297
# add a root entry by adding its path
298
ie = inv.add_path(u"", "directory", b"my-root")
299
ie.revision = b'test-rev'
300
self.assertEqual(b"my-root", ie.file_id)
301
self.assertIs(ie, inv.root)
303
def test_add_path(self):
304
inv = inventory.Inventory(root_id=b'tree_root')
305
ie = inv.add_path(u'hello', 'file', b'hello-id')
306
self.assertEqual(b'hello-id', ie.file_id)
307
self.assertEqual('file', ie.kind)
310
"""Make sure copy() works and creates a deep copy."""
311
inv = inventory.Inventory(root_id=b'some-tree-root')
312
ie = inv.add_path(u'hello', 'file', b'hello-id')
314
inv.root.file_id = b'some-new-root'
316
self.assertEqual(b'some-tree-root', inv2.root.file_id)
317
self.assertEqual(u'hello', inv2.get_entry(b'hello-id').name)
319
def test_copy_empty(self):
320
"""Make sure an empty inventory can be copied."""
321
inv = inventory.Inventory(root_id=None)
323
self.assertIs(None, inv2.root)
325
def test_copy_copies_root_revision(self):
326
"""Make sure the revision of the root gets copied."""
327
inv = inventory.Inventory(root_id=b'someroot')
328
inv.root.revision = b'therev'
330
self.assertEqual(b'someroot', inv2.root.file_id)
331
self.assertEqual(b'therev', inv2.root.revision)
333
def test_create_tree_reference(self):
334
inv = inventory.Inventory(b'tree-root-123')
335
inv.add(TreeReference(
336
b'nested-id', 'nested', parent_id=b'tree-root-123',
337
revision=b'rev', reference_revision=b'rev2'))
339
def test_error_encoding(self):
340
inv = inventory.Inventory(b'tree-root')
341
inv.add(InventoryFile(b'a-id', u'\u1234', b'tree-root'))
342
e = self.assertRaises(errors.InconsistentDelta, inv.add,
343
InventoryFile(b'b-id', u'\u1234', b'tree-root'))
344
self.assertContainsRe(str(e), '\\u1234')
346
def test_add_recursive(self):
347
parent = InventoryDirectory(b'src-id', 'src', b'tree-root')
348
child = InventoryFile(b'hello-id', 'hello.c', b'src-id')
349
parent.children[child.file_id] = child
350
inv = inventory.Inventory(b'tree-root')
352
self.assertEqual('src/hello.c', inv.id2path(b'hello-id'))
355
class TestDeltaApplication(TestCaseWithTransport):
357
scenarios = delta_application_scenarios()
359
def get_empty_inventory(self, reference_inv=None):
360
"""Get an empty inventory.
362
Note that tests should not depend on the revision of the root for
363
setting up test conditions, as it has to be flexible to accomodate non
364
rich root repositories.
366
:param reference_inv: If not None, get the revision for the root from
367
this inventory. This is useful for dealing with older repositories
368
that routinely discarded the root entry data. If None, the root's
369
revision is set to 'basis'.
371
inv = inventory.Inventory()
372
if reference_inv is not None:
373
inv.root.revision = reference_inv.root.revision
375
inv.root.revision = b'basis'
378
def make_file_ie(self, file_id=b'file-id', name='name', parent_id=None):
379
ie_file = inventory.InventoryFile(file_id, name, parent_id)
380
ie_file.revision = b'result'
381
ie_file.text_size = 0
382
ie_file.text_sha1 = b''
385
def test_empty_delta(self):
386
inv = self.get_empty_inventory()
388
inv = self.apply_delta(self, inv, delta)
389
inv2 = self.get_empty_inventory(inv)
390
self.assertEqual([], inv2._make_delta(inv))
392
def test_None_file_id(self):
393
inv = self.get_empty_inventory()
394
dir1 = inventory.InventoryDirectory(b'dirid', 'dir1', inv.root.file_id)
396
dir1.revision = b'result'
397
delta = [(None, u'dir1', None, dir1)]
398
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
401
def test_unicode_file_id(self):
402
inv = self.get_empty_inventory()
403
dir1 = inventory.InventoryDirectory(b'dirid', 'dir1', inv.root.file_id)
404
dir1.file_id = u'dirid'
405
dir1.revision = b'result'
406
delta = [(None, u'dir1', dir1.file_id, dir1)]
407
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
410
def test_repeated_file_id(self):
411
inv = self.get_empty_inventory()
412
file1 = inventory.InventoryFile(b'id', 'path1', inv.root.file_id)
413
file1.revision = b'result'
415
file1.text_sha1 = b""
418
delta = [(None, u'path1', b'id', file1),
419
(None, u'path2', b'id', file2)]
420
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
423
def test_repeated_new_path(self):
424
inv = self.get_empty_inventory()
425
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
426
file1.revision = b'result'
428
file1.text_sha1 = b""
430
file2.file_id = b'id2'
431
delta = [(None, u'path', b'id1', file1),
432
(None, u'path', b'id2', file2)]
433
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
436
def test_repeated_old_path(self):
437
inv = self.get_empty_inventory()
438
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
439
file1.revision = b'result'
441
file1.text_sha1 = b""
442
# We can't *create* a source inventory with the same path, but
443
# a badly generated partial delta might claim the same source twice.
444
# This would be buggy in two ways: the path is repeated in the delta,
445
# And the path for one of the file ids doesn't match the source
446
# location. Alternatively, we could have a repeated fileid, but that
447
# is separately checked for.
448
file2 = inventory.InventoryFile(b'id2', 'path2', inv.root.file_id)
449
file2.revision = b'result'
451
file2.text_sha1 = b""
454
delta = [(u'path', None, b'id1', None), (u'path', None, b'id2', None)]
455
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
458
def test_mismatched_id_entry_id(self):
459
inv = self.get_empty_inventory()
460
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
461
file1.revision = b'result'
463
file1.text_sha1 = b""
464
delta = [(None, u'path', b'id', file1)]
465
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
468
def test_mismatched_new_path_entry_None(self):
469
inv = self.get_empty_inventory()
470
delta = [(None, u'path', b'id', None)]
471
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
474
def test_mismatched_new_path_None_entry(self):
475
inv = self.get_empty_inventory()
476
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
477
file1.revision = b'result'
479
file1.text_sha1 = b""
480
delta = [(u"path", None, b'id1', file1)]
481
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
484
def test_parent_is_not_directory(self):
485
inv = self.get_empty_inventory()
486
file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
487
file1.revision = b'result'
489
file1.text_sha1 = b""
490
file2 = inventory.InventoryFile(b'id2', 'path2', b'id1')
491
file2.revision = b'result'
493
file2.text_sha1 = b""
495
delta = [(None, u'path/path2', b'id2', file2)]
496
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
499
def test_parent_is_missing(self):
500
inv = self.get_empty_inventory()
501
file2 = inventory.InventoryFile(b'id2', 'path2', b'missingparent')
502
file2.revision = b'result'
504
file2.text_sha1 = b""
505
delta = [(None, u'path/path2', b'id2', file2)]
506
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
509
def test_new_parent_path_has_wrong_id(self):
510
inv = self.get_empty_inventory()
511
parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
512
parent1.revision = b'result'
513
parent2 = inventory.InventoryDirectory(
514
b'p-2', 'dir2', inv.root.file_id)
515
parent2.revision = b'result'
516
file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
517
file1.revision = b'result'
519
file1.text_sha1 = b""
522
# This delta claims that file1 is at dir/path, but actually its at
523
# dir2/path if you follow the inventory parent structure.
524
delta = [(None, u'dir/path', b'id', file1)]
525
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
528
def test_old_parent_path_is_wrong(self):
529
inv = self.get_empty_inventory()
530
parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
531
parent1.revision = b'result'
532
parent2 = inventory.InventoryDirectory(
533
b'p-2', 'dir2', inv.root.file_id)
534
parent2.revision = b'result'
535
file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
536
file1.revision = b'result'
538
file1.text_sha1 = b""
542
# This delta claims that file1 was at dir/path, but actually it was at
543
# dir2/path if you follow the inventory parent structure.
544
delta = [(u'dir/path', None, b'id', None)]
545
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
548
def test_old_parent_path_is_for_other_id(self):
549
inv = self.get_empty_inventory()
550
parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
551
parent1.revision = b'result'
552
parent2 = inventory.InventoryDirectory(
553
b'p-2', 'dir2', inv.root.file_id)
554
parent2.revision = b'result'
555
file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
556
file1.revision = b'result'
558
file1.text_sha1 = b""
559
file2 = inventory.InventoryFile(b'id2', 'path', b'p-1')
560
file2.revision = b'result'
562
file2.text_sha1 = b""
567
# This delta claims that file1 was at dir/path, but actually it was at
568
# dir2/path if you follow the inventory parent structure. At dir/path
569
# is another entry we should not delete.
570
delta = [(u'dir/path', None, b'id', None)]
571
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
574
def test_add_existing_id_new_path(self):
575
inv = self.get_empty_inventory()
576
parent1 = inventory.InventoryDirectory(
577
b'p-1', 'dir1', inv.root.file_id)
578
parent1.revision = b'result'
579
parent2 = inventory.InventoryDirectory(
580
b'p-1', 'dir2', inv.root.file_id)
581
parent2.revision = b'result'
583
delta = [(None, u'dir2', b'p-1', parent2)]
584
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
587
def test_add_new_id_existing_path(self):
588
inv = self.get_empty_inventory()
589
parent1 = inventory.InventoryDirectory(
590
b'p-1', 'dir1', inv.root.file_id)
591
parent1.revision = b'result'
592
parent2 = inventory.InventoryDirectory(
593
b'p-2', 'dir1', inv.root.file_id)
594
parent2.revision = b'result'
596
delta = [(None, u'dir1', b'p-2', parent2)]
597
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
600
def test_remove_dir_leaving_dangling_child(self):
601
inv = self.get_empty_inventory()
602
dir1 = inventory.InventoryDirectory(b'p-1', 'dir1', inv.root.file_id)
603
dir1.revision = b'result'
604
dir2 = inventory.InventoryDirectory(b'p-2', 'child1', b'p-1')
605
dir2.revision = b'result'
606
dir3 = inventory.InventoryDirectory(b'p-3', 'child2', b'p-1')
607
dir3.revision = b'result'
611
delta = [(u'dir1', None, b'p-1', None),
612
(u'dir1/child2', None, b'p-3', None)]
613
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
616
def test_add_file(self):
617
inv = self.get_empty_inventory()
618
file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
619
file1.revision = b'result'
621
file1.text_sha1 = b''
622
delta = [(None, u'path', b'file-id', file1)]
623
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
624
self.assertEqual(b'file-id', res_inv.get_entry(b'file-id').file_id)
626
def test_remove_file(self):
627
inv = self.get_empty_inventory()
628
file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
629
file1.revision = b'result'
631
file1.text_sha1 = b''
633
delta = [(u'path', None, b'file-id', None)]
634
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
635
self.assertEqual(None, res_inv.path2id('path'))
636
self.assertRaises(errors.NoSuchId, res_inv.id2path, b'file-id')
638
def test_rename_file(self):
639
inv = self.get_empty_inventory()
640
file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
642
file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
643
delta = [(u'path', 'path2', b'file-id', file2)]
644
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
645
self.assertEqual(None, res_inv.path2id('path'))
646
self.assertEqual(b'file-id', res_inv.path2id('path2'))
648
def test_replaced_at_new_path(self):
649
inv = self.get_empty_inventory()
650
file1 = self.make_file_ie(file_id=b'id1', parent_id=inv.root.file_id)
652
file2 = self.make_file_ie(file_id=b'id2', parent_id=inv.root.file_id)
653
delta = [(u'name', None, b'id1', None),
654
(None, u'name', b'id2', file2)]
655
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
656
self.assertEqual(b'id2', res_inv.path2id('name'))
658
def test_rename_dir(self):
659
inv = self.get_empty_inventory()
660
dir1 = inventory.InventoryDirectory(
661
b'dir-id', 'dir1', inv.root.file_id)
662
dir1.revision = b'basis'
663
file1 = self.make_file_ie(parent_id=b'dir-id')
666
dir2 = inventory.InventoryDirectory(
667
b'dir-id', 'dir2', inv.root.file_id)
668
dir2.revision = b'result'
669
delta = [('dir1', 'dir2', b'dir-id', dir2)]
670
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
671
# The file should be accessible under the new path
672
self.assertEqual(b'file-id', res_inv.path2id('dir2/name'))
674
def test_renamed_dir_with_renamed_child(self):
675
inv = self.get_empty_inventory()
676
dir1 = inventory.InventoryDirectory(
677
b'dir-id', 'dir1', inv.root.file_id)
678
dir1.revision = b'basis'
679
file1 = self.make_file_ie(b'file-id-1', 'name1', parent_id=b'dir-id')
680
file2 = self.make_file_ie(b'file-id-2', 'name2', parent_id=b'dir-id')
684
dir2 = inventory.InventoryDirectory(
685
b'dir-id', 'dir2', inv.root.file_id)
686
dir2.revision = b'result'
687
file2b = self.make_file_ie(b'file-id-2', 'name2', inv.root.file_id)
688
delta = [('dir1', 'dir2', b'dir-id', dir2),
689
('dir1/name2', 'name2', b'file-id-2', file2b)]
690
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
691
# The file should be accessible under the new path
692
self.assertEqual(b'file-id-1', res_inv.path2id('dir2/name1'))
693
self.assertEqual(None, res_inv.path2id('dir2/name2'))
694
self.assertEqual(b'file-id-2', res_inv.path2id('name2'))
696
def test_is_root(self):
697
"""Ensure our root-checking code is accurate."""
698
inv = inventory.Inventory(b'TREE_ROOT')
699
self.assertTrue(inv.is_root(b'TREE_ROOT'))
700
self.assertFalse(inv.is_root(b'booga'))
701
inv.root.file_id = b'booga'
702
self.assertFalse(inv.is_root(b'TREE_ROOT'))
703
self.assertTrue(inv.is_root(b'booga'))
704
# works properly even if no root is set
706
self.assertFalse(inv.is_root(b'TREE_ROOT'))
707
self.assertFalse(inv.is_root(b'booga'))
709
def test_entries_for_empty_inventory(self):
710
"""Test that entries() will not fail for an empty inventory"""
711
inv = Inventory(root_id=None)
712
self.assertEqual([], inv.entries())
715
class TestInventoryEntry(TestCase):
717
def test_file_invalid_entry_name(self):
718
self.assertRaises(errors.InvalidEntryName, inventory.InventoryFile,
719
b'123', 'a/hello.c', ROOT_ID)
721
def test_file_backslash(self):
722
file = inventory.InventoryFile(b'123', 'h\\ello.c', ROOT_ID)
723
self.assertEquals(file.name, 'h\\ello.c')
725
def test_file_kind_character(self):
726
file = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
727
self.assertEqual(file.kind_character(), '')
729
def test_dir_kind_character(self):
730
dir = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
731
self.assertEqual(dir.kind_character(), '/')
733
def test_link_kind_character(self):
734
dir = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
735
self.assertEqual(dir.kind_character(), '')
737
def test_link_kind_character(self):
738
dir = TreeReference(b'123', 'hello.c', ROOT_ID)
739
self.assertEqual(dir.kind_character(), '+')
741
def test_dir_detect_changes(self):
742
left = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
743
right = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
744
self.assertEqual((False, False), left.detect_changes(right))
745
self.assertEqual((False, False), right.detect_changes(left))
747
def test_file_detect_changes(self):
748
left = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
750
right = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
751
right.text_sha1 = 123
752
self.assertEqual((False, False), left.detect_changes(right))
753
self.assertEqual((False, False), right.detect_changes(left))
754
left.executable = True
755
self.assertEqual((False, True), left.detect_changes(right))
756
self.assertEqual((False, True), right.detect_changes(left))
757
right.text_sha1 = 321
758
self.assertEqual((True, True), left.detect_changes(right))
759
self.assertEqual((True, True), right.detect_changes(left))
761
def test_symlink_detect_changes(self):
762
left = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
763
left.symlink_target = 'foo'
764
right = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
765
right.symlink_target = 'foo'
766
self.assertEqual((False, False), left.detect_changes(right))
767
self.assertEqual((False, False), right.detect_changes(left))
768
left.symlink_target = 'different'
769
self.assertEqual((True, False), left.detect_changes(right))
770
self.assertEqual((True, False), right.detect_changes(left))
772
def test_file_has_text(self):
773
file = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
774
self.assertTrue(file.has_text())
776
def test_directory_has_text(self):
777
dir = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
778
self.assertFalse(dir.has_text())
780
def test_link_has_text(self):
781
link = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
782
self.assertFalse(link.has_text())
784
def test_make_entry(self):
785
self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
786
inventory.InventoryFile)
787
self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
788
inventory.InventoryLink)
789
self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
790
inventory.InventoryDirectory)
792
def test_make_entry_non_normalized(self):
793
orig_normalized_filename = osutils.normalized_filename
796
osutils.normalized_filename = osutils._accessible_normalized_filename
797
entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
798
self.assertEqual(u'\xe5', entry.name)
799
self.assertIsInstance(entry, inventory.InventoryFile)
801
osutils.normalized_filename = osutils._inaccessible_normalized_filename
802
self.assertRaises(errors.InvalidNormalization,
803
inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
805
osutils.normalized_filename = orig_normalized_filename
808
class TestDescribeChanges(TestCase):
810
def test_describe_change(self):
811
# we need to test the following change combinations:
817
# renamed/reparented and modified
818
# change kind (perhaps can't be done yet?)
819
# also, merged in combination with all of these?
820
old_a = InventoryFile(b'a-id', 'a_file', ROOT_ID)
821
old_a.text_sha1 = b'123132'
823
new_a = InventoryFile(b'a-id', 'a_file', ROOT_ID)
824
new_a.text_sha1 = b'123132'
827
self.assertChangeDescription('unchanged', old_a, new_a)
830
new_a.text_sha1 = b'abcabc'
831
self.assertChangeDescription('modified', old_a, new_a)
833
self.assertChangeDescription('added', None, new_a)
834
self.assertChangeDescription('removed', old_a, None)
835
# perhaps a bit questionable but seems like the most reasonable thing...
836
self.assertChangeDescription('unchanged', None, None)
838
# in this case it's both renamed and modified; show a rename and
840
new_a.name = 'newfilename'
841
self.assertChangeDescription('modified and renamed', old_a, new_a)
843
# reparenting is 'renaming'
844
new_a.name = old_a.name
845
new_a.parent_id = b'somedir-id'
846
self.assertChangeDescription('modified and renamed', old_a, new_a)
848
# reset the content values so its not modified
849
new_a.text_size = old_a.text_size
850
new_a.text_sha1 = old_a.text_sha1
851
new_a.name = old_a.name
853
new_a.name = 'newfilename'
854
self.assertChangeDescription('renamed', old_a, new_a)
856
# reparenting is 'renaming'
857
new_a.name = old_a.name
858
new_a.parent_id = b'somedir-id'
859
self.assertChangeDescription('renamed', old_a, new_a)
861
def assertChangeDescription(self, expected_change, old_ie, new_ie):
862
change = InventoryEntry.describe_change(old_ie, new_ie)
863
self.assertEqual(expected_change, change)
866
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
868
def get_chk_bytes(self):
869
factory = groupcompress.make_pack_factory(True, True, 1)
870
trans = self.get_transport('')
871
return factory(trans)
873
def read_bytes(self, chk_bytes, key):
874
stream = chk_bytes.get_record_stream([key], 'unordered', True)
875
return next(stream).get_bytes_as("fulltext")
877
def test_deserialise_gives_CHKInventory(self):
879
inv.revision_id = b"revid"
880
inv.root.revision = b"rootrev"
881
chk_bytes = self.get_chk_bytes()
882
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
883
bytes = b''.join(chk_inv.to_lines())
884
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
885
self.assertEqual(b"revid", new_inv.revision_id)
886
self.assertEqual("directory", new_inv.root.kind)
887
self.assertEqual(inv.root.file_id, new_inv.root.file_id)
888
self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
889
self.assertEqual(inv.root.name, new_inv.root.name)
890
self.assertEqual(b"rootrev", new_inv.root.revision)
891
self.assertEqual(b'plain', new_inv._search_key_name)
893
def test_deserialise_wrong_revid(self):
895
inv.revision_id = b"revid"
896
inv.root.revision = b"rootrev"
897
chk_bytes = self.get_chk_bytes()
898
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
899
bytes = b''.join(chk_inv.to_lines())
900
self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
903
def test_captures_rev_root_byid(self):
905
inv.revision_id = b"foo"
906
inv.root.revision = b"bar"
907
chk_bytes = self.get_chk_bytes()
908
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
909
lines = chk_inv.to_lines()
912
b'revision_id: foo\n',
913
b'root_id: TREE_ROOT\n',
914
b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
915
b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
917
chk_inv = CHKInventory.deserialise(
918
chk_bytes, b''.join(lines), (b'foo',))
919
self.assertEqual(b'plain', chk_inv._search_key_name)
921
def test_captures_parent_id_basename_index(self):
923
inv.revision_id = b"foo"
924
inv.root.revision = b"bar"
925
chk_bytes = self.get_chk_bytes()
926
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
927
lines = chk_inv.to_lines()
930
b'revision_id: foo\n',
931
b'root_id: TREE_ROOT\n',
932
b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
933
b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
935
chk_inv = CHKInventory.deserialise(
936
chk_bytes, b''.join(lines), (b'foo',))
937
self.assertEqual(b'plain', chk_inv._search_key_name)
939
def test_captures_search_key_name(self):
941
inv.revision_id = b"foo"
942
inv.root.revision = b"bar"
943
chk_bytes = self.get_chk_bytes()
944
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
945
search_key_name=b'hash-16-way')
946
lines = chk_inv.to_lines()
949
b'search_key_name: hash-16-way\n',
950
b'root_id: TREE_ROOT\n',
951
b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
952
b'revision_id: foo\n',
953
b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
955
chk_inv = CHKInventory.deserialise(
956
chk_bytes, b''.join(lines), (b'foo',))
957
self.assertEqual(b'hash-16-way', chk_inv._search_key_name)
959
def test_directory_children_on_demand(self):
961
inv.revision_id = b"revid"
962
inv.root.revision = b"rootrev"
963
inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
964
inv.get_entry(b"fileid").revision = b"filerev"
965
inv.get_entry(b"fileid").executable = True
966
inv.get_entry(b"fileid").text_sha1 = b"ffff"
967
inv.get_entry(b"fileid").text_size = 1
968
chk_bytes = self.get_chk_bytes()
969
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
970
bytes = b''.join(chk_inv.to_lines())
971
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
972
root_entry = new_inv.get_entry(inv.root.file_id)
973
self.assertEqual(None, root_entry._children)
974
self.assertEqual({'file'}, set(root_entry.children))
975
file_direct = new_inv.get_entry(b"fileid")
976
file_found = root_entry.children['file']
977
self.assertEqual(file_direct.kind, file_found.kind)
978
self.assertEqual(file_direct.file_id, file_found.file_id)
979
self.assertEqual(file_direct.parent_id, file_found.parent_id)
980
self.assertEqual(file_direct.name, file_found.name)
981
self.assertEqual(file_direct.revision, file_found.revision)
982
self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
983
self.assertEqual(file_direct.text_size, file_found.text_size)
984
self.assertEqual(file_direct.executable, file_found.executable)
986
def test_from_inventory_maximum_size(self):
987
# from_inventory supports the maximum_size parameter.
989
inv.revision_id = b"revid"
990
inv.root.revision = b"rootrev"
991
chk_bytes = self.get_chk_bytes()
992
chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
993
chk_inv.id_to_entry._ensure_root()
994
self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
995
self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
996
p_id_basename = chk_inv.parent_id_basename_to_file_id
997
p_id_basename._ensure_root()
998
self.assertEqual(120, p_id_basename._root_node.maximum_size)
999
self.assertEqual(2, p_id_basename._root_node._key_width)
1001
def test_iter_all_ids(self):
1003
inv.revision_id = b"revid"
1004
inv.root.revision = b"rootrev"
1005
inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
1006
inv.get_entry(b"fileid").revision = b"filerev"
1007
inv.get_entry(b"fileid").executable = True
1008
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1009
inv.get_entry(b"fileid").text_size = 1
1010
chk_bytes = self.get_chk_bytes()
1011
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1012
bytes = b''.join(chk_inv.to_lines())
1013
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1014
fileids = sorted(new_inv.iter_all_ids())
1015
self.assertEqual([inv.root.file_id, b"fileid"], fileids)
1017
def test__len__(self):
1019
inv.revision_id = b"revid"
1020
inv.root.revision = b"rootrev"
1021
inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
1022
inv.get_entry(b"fileid").revision = b"filerev"
1023
inv.get_entry(b"fileid").executable = True
1024
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1025
inv.get_entry(b"fileid").text_size = 1
1026
chk_bytes = self.get_chk_bytes()
1027
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1028
self.assertEqual(2, len(chk_inv))
1030
def test_get_entry(self):
1032
inv.revision_id = b"revid"
1033
inv.root.revision = b"rootrev"
1034
inv.add(InventoryFile(b"fileid", u"file", inv.root.file_id))
1035
inv.get_entry(b"fileid").revision = b"filerev"
1036
inv.get_entry(b"fileid").executable = True
1037
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1038
inv.get_entry(b"fileid").text_size = 1
1039
chk_bytes = self.get_chk_bytes()
1040
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1041
data = b''.join(chk_inv.to_lines())
1042
new_inv = CHKInventory.deserialise(chk_bytes, data, (b"revid",))
1043
root_entry = new_inv.get_entry(inv.root.file_id)
1044
file_entry = new_inv.get_entry(b"fileid")
1045
self.assertEqual("directory", root_entry.kind)
1046
self.assertEqual(inv.root.file_id, root_entry.file_id)
1047
self.assertEqual(inv.root.parent_id, root_entry.parent_id)
1048
self.assertEqual(inv.root.name, root_entry.name)
1049
self.assertEqual(b"rootrev", root_entry.revision)
1050
self.assertEqual("file", file_entry.kind)
1051
self.assertEqual(b"fileid", file_entry.file_id)
1052
self.assertEqual(inv.root.file_id, file_entry.parent_id)
1053
self.assertEqual(u"file", file_entry.name)
1054
self.assertEqual(b"filerev", file_entry.revision)
1055
self.assertEqual(b"ffff", file_entry.text_sha1)
1056
self.assertEqual(1, file_entry.text_size)
1057
self.assertEqual(True, file_entry.executable)
1058
self.assertRaises(errors.NoSuchId, new_inv.get_entry, 'missing')
1060
def test_has_id_true(self):
1062
inv.revision_id = b"revid"
1063
inv.root.revision = b"rootrev"
1064
inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
1065
inv.get_entry(b"fileid").revision = b"filerev"
1066
inv.get_entry(b"fileid").executable = True
1067
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1068
inv.get_entry(b"fileid").text_size = 1
1069
chk_bytes = self.get_chk_bytes()
1070
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1071
self.assertTrue(chk_inv.has_id(b'fileid'))
1072
self.assertTrue(chk_inv.has_id(inv.root.file_id))
1074
def test_has_id_not(self):
1076
inv.revision_id = b"revid"
1077
inv.root.revision = b"rootrev"
1078
chk_bytes = self.get_chk_bytes()
1079
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1080
self.assertFalse(chk_inv.has_id(b'fileid'))
1082
def test_id2path(self):
1084
inv.revision_id = b"revid"
1085
inv.root.revision = b"rootrev"
1086
direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
1087
fileentry = InventoryFile(b"fileid", "file", b"dirid")
1090
inv.get_entry(b"fileid").revision = b"filerev"
1091
inv.get_entry(b"fileid").executable = True
1092
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1093
inv.get_entry(b"fileid").text_size = 1
1094
inv.get_entry(b"dirid").revision = b"filerev"
1095
chk_bytes = self.get_chk_bytes()
1096
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1097
bytes = b''.join(chk_inv.to_lines())
1098
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1099
self.assertEqual('', new_inv.id2path(inv.root.file_id))
1100
self.assertEqual('dir', new_inv.id2path(b'dirid'))
1101
self.assertEqual('dir/file', new_inv.id2path(b'fileid'))
1103
def test_path2id(self):
1105
inv.revision_id = b"revid"
1106
inv.root.revision = b"rootrev"
1107
direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
1108
fileentry = InventoryFile(b"fileid", "file", b"dirid")
1111
inv.get_entry(b"fileid").revision = b"filerev"
1112
inv.get_entry(b"fileid").executable = True
1113
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1114
inv.get_entry(b"fileid").text_size = 1
1115
inv.get_entry(b"dirid").revision = b"filerev"
1116
chk_bytes = self.get_chk_bytes()
1117
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1118
bytes = b''.join(chk_inv.to_lines())
1119
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1120
self.assertEqual(inv.root.file_id, new_inv.path2id(''))
1121
self.assertEqual(b'dirid', new_inv.path2id('dir'))
1122
self.assertEqual(b'fileid', new_inv.path2id('dir/file'))
1124
def test_create_by_apply_delta_sets_root(self):
1126
inv.root.revision = b"myrootrev"
1127
inv.revision_id = b"revid"
1128
chk_bytes = self.get_chk_bytes()
1129
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1130
inv.add_path("", "directory", b"myrootid", None)
1131
inv.revision_id = b"expectedid"
1132
inv.root.revision = b"myrootrev"
1133
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1134
delta = [("", None, base_inv.root.file_id, None),
1135
(None, "", b"myrootid", inv.root)]
1136
new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
1137
self.assertEqual(reference_inv.root, new_inv.root)
1139
def test_create_by_apply_delta_empty_add_child(self):
1141
inv.revision_id = b"revid"
1142
inv.root.revision = b"rootrev"
1143
chk_bytes = self.get_chk_bytes()
1144
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1145
a_entry = InventoryFile(b"A-id", "A", inv.root.file_id)
1146
a_entry.revision = b"filerev"
1147
a_entry.executable = True
1148
a_entry.text_sha1 = b"ffff"
1149
a_entry.text_size = 1
1151
inv.revision_id = b"expectedid"
1152
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1153
delta = [(None, "A", b"A-id", a_entry)]
1154
new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
1155
# new_inv should be the same as reference_inv.
1156
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1157
self.assertEqual(reference_inv.root_id, new_inv.root_id)
1158
reference_inv.id_to_entry._ensure_root()
1159
new_inv.id_to_entry._ensure_root()
1160
self.assertEqual(reference_inv.id_to_entry._root_node._key,
1161
new_inv.id_to_entry._root_node._key)
1163
def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
1165
inv.revision_id = b"revid"
1166
inv.root.revision = b"rootrev"
1167
chk_bytes = self.get_chk_bytes()
1168
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1169
a_entry = InventoryFile(b"A-id", "A", inv.root.file_id)
1170
a_entry.revision = b"filerev"
1171
a_entry.executable = True
1172
a_entry.text_sha1 = b"ffff"
1173
a_entry.text_size = 1
1175
inv.revision_id = b"expectedid"
1176
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1177
delta = [(None, "A", b"A-id", a_entry)]
1178
new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
1179
reference_inv.id_to_entry._ensure_root()
1180
reference_inv.parent_id_basename_to_file_id._ensure_root()
1181
new_inv.id_to_entry._ensure_root()
1182
new_inv.parent_id_basename_to_file_id._ensure_root()
1183
# new_inv should be the same as reference_inv.
1184
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1185
self.assertEqual(reference_inv.root_id, new_inv.root_id)
1186
self.assertEqual(reference_inv.id_to_entry._root_node._key,
1187
new_inv.id_to_entry._root_node._key)
1188
self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
1189
new_inv.parent_id_basename_to_file_id._root_node._key)
1191
def test_iter_changes(self):
1192
# Low level bootstrapping smoke test; comprehensive generic tests via
1193
# InterTree are coming.
1195
inv.revision_id = b"revid"
1196
inv.root.revision = b"rootrev"
1197
inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
1198
inv.get_entry(b"fileid").revision = b"filerev"
1199
inv.get_entry(b"fileid").executable = True
1200
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1201
inv.get_entry(b"fileid").text_size = 1
1203
inv2.revision_id = b"revid2"
1204
inv2.root.revision = b"rootrev"
1205
inv2.add(InventoryFile(b"fileid", "file", inv.root.file_id))
1206
inv2.get_entry(b"fileid").revision = b"filerev2"
1207
inv2.get_entry(b"fileid").executable = False
1208
inv2.get_entry(b"fileid").text_sha1 = b"bbbb"
1209
inv2.get_entry(b"fileid").text_size = 2
1210
# get fresh objects.
1211
chk_bytes = self.get_chk_bytes()
1212
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1213
bytes = b''.join(chk_inv.to_lines())
1214
inv_1 = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1215
chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
1216
bytes = b''.join(chk_inv2.to_lines())
1217
inv_2 = CHKInventory.deserialise(chk_bytes, bytes, (b"revid2",))
1218
self.assertEqual([(b'fileid', (u'file', u'file'), True, (True, True),
1219
(b'TREE_ROOT', b'TREE_ROOT'), (u'file',
1220
u'file'), ('file', 'file'),
1222
list(inv_1.iter_changes(inv_2)))
1224
def test_parent_id_basename_to_file_id_index_enabled(self):
1226
inv.revision_id = b"revid"
1227
inv.root.revision = b"rootrev"
1228
inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
1229
inv.get_entry(b"fileid").revision = b"filerev"
1230
inv.get_entry(b"fileid").executable = True
1231
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1232
inv.get_entry(b"fileid").text_size = 1
1233
# get fresh objects.
1234
chk_bytes = self.get_chk_bytes()
1235
tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
1236
bytes = b''.join(tmp_inv.to_lines())
1237
chk_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1238
self.assertIsInstance(
1239
chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
1241
{(b'', b''): b'TREE_ROOT', (b'TREE_ROOT', b'file'): b'fileid'},
1242
dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1244
def test_file_entry_to_bytes(self):
1245
inv = CHKInventory(None)
1246
ie = inventory.InventoryFile(b'file-id', 'filename', b'parent-id')
1247
ie.executable = True
1248
ie.revision = b'file-rev-id'
1249
ie.text_sha1 = b'abcdefgh'
1251
bytes = inv._entry_to_bytes(ie)
1252
self.assertEqual(b'file: file-id\nparent-id\nfilename\n'
1253
b'file-rev-id\nabcdefgh\n100\nY', bytes)
1254
ie2 = inv._bytes_to_entry(bytes)
1255
self.assertEqual(ie, ie2)
1256
self.assertIsInstance(ie2.name, text_type)
1257
self.assertEqual((b'filename', b'file-id', b'file-rev-id'),
1258
inv._bytes_to_utf8name_key(bytes))
1260
def test_file2_entry_to_bytes(self):
1261
inv = CHKInventory(None)
1263
ie = inventory.InventoryFile(b'file-id', u'\u03a9name', b'parent-id')
1264
ie.executable = False
1265
ie.revision = b'file-rev-id'
1266
ie.text_sha1 = b'123456'
1268
bytes = inv._entry_to_bytes(ie)
1269
self.assertEqual(b'file: file-id\nparent-id\n\xce\xa9name\n'
1270
b'file-rev-id\n123456\n25\nN', bytes)
1271
ie2 = inv._bytes_to_entry(bytes)
1272
self.assertEqual(ie, ie2)
1273
self.assertIsInstance(ie2.name, text_type)
1274
self.assertEqual((b'\xce\xa9name', b'file-id', b'file-rev-id'),
1275
inv._bytes_to_utf8name_key(bytes))
1277
def test_dir_entry_to_bytes(self):
1278
inv = CHKInventory(None)
1279
ie = inventory.InventoryDirectory(b'dir-id', 'dirname', b'parent-id')
1280
ie.revision = b'dir-rev-id'
1281
bytes = inv._entry_to_bytes(ie)
1282
self.assertEqual(b'dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
1283
ie2 = inv._bytes_to_entry(bytes)
1284
self.assertEqual(ie, ie2)
1285
self.assertIsInstance(ie2.name, text_type)
1286
self.assertEqual((b'dirname', b'dir-id', b'dir-rev-id'),
1287
inv._bytes_to_utf8name_key(bytes))
1289
def test_dir2_entry_to_bytes(self):
1290
inv = CHKInventory(None)
1291
ie = inventory.InventoryDirectory(b'dir-id', u'dir\u03a9name',
1293
ie.revision = b'dir-rev-id'
1294
bytes = inv._entry_to_bytes(ie)
1295
self.assertEqual(b'dir: dir-id\n\ndir\xce\xa9name\n'
1296
b'dir-rev-id', bytes)
1297
ie2 = inv._bytes_to_entry(bytes)
1298
self.assertEqual(ie, ie2)
1299
self.assertIsInstance(ie2.name, text_type)
1300
self.assertIs(ie2.parent_id, None)
1301
self.assertEqual((b'dir\xce\xa9name', b'dir-id', b'dir-rev-id'),
1302
inv._bytes_to_utf8name_key(bytes))
1304
def test_symlink_entry_to_bytes(self):
1305
inv = CHKInventory(None)
1306
ie = inventory.InventoryLink(b'link-id', 'linkname', b'parent-id')
1307
ie.revision = b'link-rev-id'
1308
ie.symlink_target = u'target/path'
1309
bytes = inv._entry_to_bytes(ie)
1310
self.assertEqual(b'symlink: link-id\nparent-id\nlinkname\n'
1311
b'link-rev-id\ntarget/path', bytes)
1312
ie2 = inv._bytes_to_entry(bytes)
1313
self.assertEqual(ie, ie2)
1314
self.assertIsInstance(ie2.name, text_type)
1315
self.assertIsInstance(ie2.symlink_target, text_type)
1316
self.assertEqual((b'linkname', b'link-id', b'link-rev-id'),
1317
inv._bytes_to_utf8name_key(bytes))
1319
def test_symlink2_entry_to_bytes(self):
1320
inv = CHKInventory(None)
1321
ie = inventory.InventoryLink(
1322
b'link-id', u'link\u03a9name', b'parent-id')
1323
ie.revision = b'link-rev-id'
1324
ie.symlink_target = u'target/\u03a9path'
1325
bytes = inv._entry_to_bytes(ie)
1326
self.assertEqual(b'symlink: link-id\nparent-id\nlink\xce\xa9name\n'
1327
b'link-rev-id\ntarget/\xce\xa9path', bytes)
1328
ie2 = inv._bytes_to_entry(bytes)
1329
self.assertEqual(ie, ie2)
1330
self.assertIsInstance(ie2.name, text_type)
1331
self.assertIsInstance(ie2.symlink_target, text_type)
1332
self.assertEqual((b'link\xce\xa9name', b'link-id', b'link-rev-id'),
1333
inv._bytes_to_utf8name_key(bytes))
1335
def test_tree_reference_entry_to_bytes(self):
1336
inv = CHKInventory(None)
1337
ie = inventory.TreeReference(b'tree-root-id', u'tree\u03a9name',
1339
ie.revision = b'tree-rev-id'
1340
ie.reference_revision = b'ref-rev-id'
1341
bytes = inv._entry_to_bytes(ie)
1342
self.assertEqual(b'tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
1343
b'tree-rev-id\nref-rev-id', bytes)
1344
ie2 = inv._bytes_to_entry(bytes)
1345
self.assertEqual(ie, ie2)
1346
self.assertIsInstance(ie2.name, text_type)
1347
self.assertEqual((b'tree\xce\xa9name', b'tree-root-id', b'tree-rev-id'),
1348
inv._bytes_to_utf8name_key(bytes))
1350
def make_basic_utf8_inventory(self):
1352
inv.revision_id = b"revid"
1353
inv.root.revision = b"rootrev"
1354
root_id = inv.root.file_id
1355
inv.add(InventoryFile(b"fileid", u'f\xefle', root_id))
1356
inv.get_entry(b"fileid").revision = b"filerev"
1357
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1358
inv.get_entry(b"fileid").text_size = 0
1359
inv.add(InventoryDirectory(b"dirid", u'dir-\N{EURO SIGN}', root_id))
1360
inv.get_entry(b"dirid").revision = b"dirrev"
1361
inv.add(InventoryFile(b"childid", u'ch\xefld', b"dirid"))
1362
inv.get_entry(b"childid").revision = b"filerev"
1363
inv.get_entry(b"childid").text_sha1 = b"ffff"
1364
inv.get_entry(b"childid").text_size = 0
1365
chk_bytes = self.get_chk_bytes()
1366
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1367
bytes = b''.join(chk_inv.to_lines())
1368
return CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1370
def test__preload_handles_utf8(self):
1371
new_inv = self.make_basic_utf8_inventory()
1372
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1373
self.assertFalse(new_inv._fully_cached)
1374
new_inv._preload_cache()
1376
sorted([new_inv.root_id, b"fileid", b"dirid", b"childid"]),
1377
sorted(new_inv._fileid_to_entry_cache.keys()))
1378
ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
1379
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1380
sorted(ie_root._children.keys()))
1381
ie_dir = new_inv._fileid_to_entry_cache[b'dirid']
1382
self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
1384
def test__preload_populates_cache(self):
1386
inv.revision_id = b"revid"
1387
inv.root.revision = b"rootrev"
1388
root_id = inv.root.file_id
1389
inv.add(InventoryFile(b"fileid", "file", root_id))
1390
inv.get_entry(b"fileid").revision = b"filerev"
1391
inv.get_entry(b"fileid").executable = True
1392
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1393
inv.get_entry(b"fileid").text_size = 1
1394
inv.add(InventoryDirectory(b"dirid", "dir", root_id))
1395
inv.get_entry(b"dirid").revision = b"dirrev"
1396
inv.add(InventoryFile(b"childid", "child", b"dirid"))
1397
inv.get_entry(b"childid").revision = b"filerev"
1398
inv.get_entry(b"childid").executable = False
1399
inv.get_entry(b"childid").text_sha1 = b"dddd"
1400
inv.get_entry(b"childid").text_size = 1
1401
chk_bytes = self.get_chk_bytes()
1402
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1403
bytes = b''.join(chk_inv.to_lines())
1404
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1405
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1406
self.assertFalse(new_inv._fully_cached)
1407
new_inv._preload_cache()
1409
sorted([root_id, b"fileid", b"dirid", b"childid"]),
1410
sorted(new_inv._fileid_to_entry_cache.keys()))
1411
self.assertTrue(new_inv._fully_cached)
1412
ie_root = new_inv._fileid_to_entry_cache[root_id]
1413
self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
1414
ie_dir = new_inv._fileid_to_entry_cache[b'dirid']
1415
self.assertEqual(['child'], sorted(ie_dir._children.keys()))
1417
def test__preload_handles_partially_evaluated_inventory(self):
1418
new_inv = self.make_basic_utf8_inventory()
1419
ie = new_inv.get_entry(new_inv.root_id)
1420
self.assertIs(None, ie._children)
1421
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1422
sorted(ie.children.keys()))
1423
# Accessing .children loads _children
1424
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1425
sorted(ie._children.keys()))
1426
new_inv._preload_cache()
1428
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1429
sorted(ie._children.keys()))
1430
ie_dir = new_inv.get_entry(b"dirid")
1431
self.assertEqual([u'ch\xefld'],
1432
sorted(ie_dir._children.keys()))
1434
def test_filter_change_in_renamed_subfolder(self):
1435
inv = Inventory(b'tree-root')
1436
inv.root.revision = b'rootrev'
1437
src_ie = inv.add_path('src', 'directory', b'src-id')
1438
src_ie.revision = b'srcrev'
1439
sub_ie = inv.add_path('src/sub/', 'directory', b'sub-id')
1440
sub_ie.revision = b'subrev'
1441
a_ie = inv.add_path('src/sub/a', 'file', b'a-id')
1442
a_ie.revision = b'filerev'
1443
a_ie.text_sha1 = osutils.sha_string(b'content\n')
1444
a_ie.text_size = len(b'content\n')
1445
chk_bytes = self.get_chk_bytes()
1446
inv = CHKInventory.from_inventory(chk_bytes, inv)
1447
inv = inv.create_by_apply_delta([
1448
("src/sub/a", "src/sub/a", b"a-id", a_ie),
1449
("src", "src2", b"src-id", src_ie),
1451
new_inv = inv.filter([b'a-id', b'src-id'])
1455
('src/sub', b'sub-id'),
1456
('src/sub/a', b'a-id'),
1457
], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
1460
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1462
def get_chk_bytes(self):
1463
factory = groupcompress.make_pack_factory(True, True, 1)
1464
trans = self.get_transport('')
1465
return factory(trans)
1467
def make_dir(self, inv, name, parent_id, revision):
1468
ie = inv.make_entry('directory', name, parent_id,
1469
name.encode('utf-8') + b'-id')
1470
ie.revision = revision
1473
def make_file(self, inv, name, parent_id, revision, content=b'content\n'):
1474
ie = inv.make_entry('file', name, parent_id,
1475
name.encode('utf-8') + b'-id')
1476
ie.text_sha1 = osutils.sha_string(content)
1477
ie.text_size = len(content)
1478
ie.revision = revision
1481
def make_simple_inventory(self):
1482
inv = Inventory(b'TREE_ROOT')
1483
inv.revision_id = b"revid"
1484
inv.root.revision = b"rootrev"
1487
# sub-file1 sub-file1-id
1488
# sub-file2 sub-file2-id
1489
# sub-dir1/ sub-dir1-id
1490
# subsub-file1 subsub-file1-id
1492
# sub2-file1 sub2-file1-id
1494
self.make_dir(inv, 'dir1', b'TREE_ROOT', b'dirrev')
1495
self.make_dir(inv, 'dir2', b'TREE_ROOT', b'dirrev')
1496
self.make_dir(inv, 'sub-dir1', b'dir1-id', b'dirrev')
1497
self.make_file(inv, 'top', b'TREE_ROOT', b'filerev')
1498
self.make_file(inv, 'sub-file1', b'dir1-id', b'filerev')
1499
self.make_file(inv, 'sub-file2', b'dir1-id', b'filerev')
1500
self.make_file(inv, 'subsub-file1', b'sub-dir1-id', b'filerev')
1501
self.make_file(inv, 'sub2-file1', b'dir2-id', b'filerev')
1502
chk_bytes = self.get_chk_bytes()
1503
# use a small maximum_size to force internal paging structures
1504
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
1506
search_key_name=b'hash-255-way')
1507
bytes = b''.join(chk_inv.to_lines())
1508
return CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
1510
def assert_Getitems(self, expected_fileids, inv, file_ids):
1511
self.assertEqual(sorted(expected_fileids),
1512
sorted([ie.file_id for ie in inv._getitems(file_ids)]))
1514
def assertExpand(self, all_ids, inv, file_ids):
1516
val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
1517
self.assertEqual(set(all_ids), val_all_ids)
1518
entries = inv._getitems(val_all_ids)
1519
expected_children = {}
1520
for entry in entries:
1521
s = expected_children.setdefault(entry.parent_id, [])
1522
s.append(entry.file_id)
1524
k: sorted(v) for k, v in val_children.items()}
1525
expected_children = {
1526
k: sorted(v) for k, v in expected_children.items()}
1527
self.assertEqual(expected_children, val_children)
1529
def test_make_simple_inventory(self):
1530
inv = self.make_simple_inventory()
1532
for path, entry in inv.iter_entries_by_dir():
1533
layout.append((path, entry.file_id))
1536
('dir1', b'dir1-id'),
1537
('dir2', b'dir2-id'),
1539
('dir1/sub-dir1', b'sub-dir1-id'),
1540
('dir1/sub-file1', b'sub-file1-id'),
1541
('dir1/sub-file2', b'sub-file2-id'),
1542
('dir1/sub-dir1/subsub-file1', b'subsub-file1-id'),
1543
('dir2/sub2-file1', b'sub2-file1-id'),
1546
def test__getitems(self):
1547
inv = self.make_simple_inventory()
1549
self.assert_Getitems([b'dir1-id'], inv, [b'dir1-id'])
1550
self.assertTrue(b'dir1-id' in inv._fileid_to_entry_cache)
1551
self.assertFalse(b'sub-file2-id' in inv._fileid_to_entry_cache)
1553
self.assert_Getitems([b'dir1-id'], inv, [b'dir1-id'])
1555
self.assert_Getitems([b'dir1-id', b'sub-file2-id'], inv,
1556
[b'dir1-id', b'sub-file2-id'])
1557
self.assertTrue(b'dir1-id' in inv._fileid_to_entry_cache)
1558
self.assertTrue(b'sub-file2-id' in inv._fileid_to_entry_cache)
1560
def test_single_file(self):
1561
inv = self.make_simple_inventory()
1562
self.assertExpand([b'TREE_ROOT', b'top-id'], inv, [b'top-id'])
1564
def test_get_all_parents(self):
1565
inv = self.make_simple_inventory()
1566
self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
1568
], inv, [b'subsub-file1-id'])
1570
def test_get_children(self):
1571
inv = self.make_simple_inventory()
1572
self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
1573
b'sub-file1-id', b'sub-file2-id', b'subsub-file1-id',
1574
], inv, [b'dir1-id'])
1576
def test_from_root(self):
1577
inv = self.make_simple_inventory()
1578
self.assertExpand([b'TREE_ROOT', b'dir1-id', b'dir2-id', b'sub-dir1-id',
1579
b'sub-file1-id', b'sub-file2-id', b'sub2-file1-id',
1580
b'subsub-file1-id', b'top-id'], inv, [b'TREE_ROOT'])
1582
def test_top_level_file(self):
1583
inv = self.make_simple_inventory()
1584
self.assertExpand([b'TREE_ROOT', b'top-id'], inv, [b'top-id'])
1586
def test_subsub_file(self):
1587
inv = self.make_simple_inventory()
1588
self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
1589
b'subsub-file1-id'], inv, [b'subsub-file1-id'])
1591
def test_sub_and_root(self):
1592
inv = self.make_simple_inventory()
1593
self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id', b'top-id',
1594
b'subsub-file1-id'], inv, [b'top-id', b'subsub-file1-id'])
1597
class TestMutableInventoryFromTree(TestCaseWithTransport):
1599
def test_empty(self):
1600
repository = self.make_repository('.')
1601
tree = repository.revision_tree(revision.NULL_REVISION)
1602
inv = mutable_inventory_from_tree(tree)
1603
self.assertEqual(revision.NULL_REVISION, inv.revision_id)
1604
self.assertEqual(0, len(inv))
1606
def test_some_files(self):
1607
wt = self.make_branch_and_tree('.')
1608
self.build_tree(['a'])
1609
wt.add(['a'], [b'thefileid'])
1610
revid = wt.commit("commit")
1611
tree = wt.branch.repository.revision_tree(revid)
1612
inv = mutable_inventory_from_tree(tree)
1613
self.assertEqual(revid, inv.revision_id)
1614
self.assertEqual(2, len(inv))
1615
self.assertEqual("a", inv.get_entry(b'thefileid').name)
1616
# The inventory should be mutable and independent of
1618
self.assertFalse(tree.root_inventory.get_entry(
1619
b'thefileid').executable)
1620
inv.get_entry(b'thefileid').executable = True
1621
self.assertFalse(tree.root_inventory.get_entry(
1622
b'thefileid').executable)