/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_inv.py

  • Committer: Jelmer Vernooij
  • Date: 2020-03-22 01:35:14 UTC
  • mfrom: (7490.7.6 work)
  • mto: This revision was merged to the branch mainline in revision 7499.
  • Revision ID: jelmer@jelmer.uk-20200322013514-7vw1ntwho04rcuj3
merge lp:brz/3.1.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2012, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from .. import (
 
19
    errors,
 
20
    osutils,
 
21
    repository,
 
22
    revision,
 
23
    tests,
 
24
    workingtree,
 
25
    )
 
26
from ..bzr import (
 
27
    chk_map,
 
28
    groupcompress,
 
29
    inventory,
 
30
    )
 
31
from ..bzr.inventory import (
 
32
    CHKInventory,
 
33
    Inventory,
 
34
    ROOT_ID,
 
35
    InventoryFile,
 
36
    InventoryDirectory,
 
37
    InventoryEntry,
 
38
    InvalidEntryName,
 
39
    TreeReference,
 
40
    mutable_inventory_from_tree,
 
41
    )
 
42
from . import (
 
43
    TestCase,
 
44
    TestCaseWithTransport,
 
45
    )
 
46
from .scenarios import load_tests_apply_scenarios
 
47
 
 
48
 
 
49
load_tests = load_tests_apply_scenarios
 
50
 
 
51
 
 
52
def delta_application_scenarios():
 
53
    scenarios = [
 
54
        ('Inventory', {'apply_delta': apply_inventory_Inventory}),
 
55
        ]
 
56
    # Working tree basis delta application
 
57
    # Repository add_inv_by_delta.
 
58
    # Reduce form of the per_repository test logic - that logic needs to be
 
59
    # be able to get /just/ repositories whereas these tests are fine with
 
60
    # just creating trees.
 
61
    formats = set()
 
62
    for _, format in repository.format_registry.iteritems():
 
63
        if format.supports_full_versioned_files:
 
64
            scenarios.append((str(format.__name__), {
 
65
                'apply_delta': apply_inventory_Repository_add_inventory_by_delta,
 
66
                'format': format}))
 
67
    for getter in workingtree.format_registry._get_all_lazy():
 
68
        try:
 
69
            format = getter()
 
70
            if callable(format):
 
71
                format = format()
 
72
        except ImportError:
 
73
            pass  # Format with unmet dependency
 
74
        repo_fmt = format._matchingcontroldir.repository_format
 
75
        if not repo_fmt.supports_full_versioned_files:
 
76
            continue
 
77
        scenarios.append(
 
78
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
 
79
                'apply_delta': apply_inventory_WT_basis,
 
80
                'format': format}))
 
81
        scenarios.append(
 
82
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
 
83
                'apply_delta': apply_inventory_WT,
 
84
                'format': format}))
 
85
    return scenarios
 
86
 
 
87
 
 
88
def create_texts_for_inv(repo, inv):
 
89
    for path, ie in inv.iter_entries():
 
90
        if ie.text_size:
 
91
            lines = [b'a' * ie.text_size]
 
92
        else:
 
93
            lines = []
 
94
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
 
95
 
 
96
 
 
97
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
 
98
    """Apply delta to basis and return the result.
 
99
 
 
100
    :param basis: An inventory to be used as the basis.
 
101
    :param delta: The inventory delta to apply:
 
102
    :return: An inventory resulting from the application.
 
103
    """
 
104
    basis.apply_delta(delta)
 
105
    return basis
 
106
 
 
107
 
 
108
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
 
109
    """Apply delta to basis and return the result.
 
110
 
 
111
    This sets the tree state to be basis, and then calls apply_inventory_delta.
 
112
 
 
113
    :param basis: An inventory to be used as the basis.
 
114
    :param delta: The inventory delta to apply:
 
115
    :return: An inventory resulting from the application.
 
116
    """
 
117
    control = self.make_controldir(
 
118
        'tree', format=self.format._matchingcontroldir)
 
119
    control.create_repository()
 
120
    control.create_branch()
 
121
    tree = self.format.initialize(control)
 
122
    tree.lock_write()
 
123
    try:
 
124
        tree._write_inventory(basis)
 
125
    finally:
 
126
        tree.unlock()
 
127
    # Fresh object, reads disk again.
 
128
    tree = tree.controldir.open_workingtree()
 
129
    tree.lock_write()
 
130
    try:
 
131
        tree.apply_inventory_delta(delta)
 
132
    finally:
 
133
        tree.unlock()
 
134
    # reload tree - ensure we get what was written.
 
135
    tree = tree.controldir.open_workingtree()
 
136
    tree.lock_read()
 
137
    self.addCleanup(tree.unlock)
 
138
    if not invalid_delta:
 
139
        tree._validate()
 
140
    return tree.root_inventory
 
141
 
 
142
 
 
143
def _create_repo_revisions(repo, basis, delta, invalid_delta):
 
144
    with repository.WriteGroup(repo):
 
145
        rev = revision.Revision(b'basis', timestamp=0, timezone=None,
 
146
                                message="", committer="foo@example.com")
 
147
        basis.revision_id = b'basis'
 
148
        create_texts_for_inv(repo, basis)
 
149
        repo.add_revision(b'basis', rev, basis)
 
150
        if invalid_delta:
 
151
            # We don't want to apply the delta to the basis, because we expect
 
152
            # the delta is invalid.
 
153
            result_inv = basis
 
154
            result_inv.revision_id = b'result'
 
155
            target_entries = None
 
156
        else:
 
157
            result_inv = basis.create_by_apply_delta(delta, b'result')
 
158
            create_texts_for_inv(repo, result_inv)
 
159
            target_entries = list(result_inv.iter_entries_by_dir())
 
160
        rev = revision.Revision(b'result', timestamp=0, timezone=None,
 
161
                                message="", committer="foo@example.com")
 
162
        repo.add_revision(b'result', rev, result_inv)
 
163
    return target_entries
 
164
 
 
165
 
 
166
def _get_basis_entries(tree):
 
167
    basis_tree = tree.basis_tree()
 
168
    with basis_tree.lock_read():
 
169
        return list(basis_tree.inventory.iter_entries_by_dir())
 
170
 
 
171
 
 
172
def _populate_different_tree(tree, basis, delta):
 
173
    """Put all entries into tree, but at a unique location."""
 
174
    added_ids = set()
 
175
    added_paths = set()
 
176
    tree.add(['unique-dir'], [b'unique-dir-id'], ['directory'])
 
177
    for path, ie in basis.iter_entries_by_dir():
 
178
        if ie.file_id in added_ids:
 
179
            continue
 
180
        # We want a unique path for each of these, we use the file-id
 
181
        tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
 
182
        added_ids.add(ie.file_id)
 
183
    for old_path, new_path, file_id, ie in delta:
 
184
        if file_id in added_ids:
 
185
            continue
 
186
        tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
 
187
 
 
188
 
 
189
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
 
190
    """Apply delta to basis and return the result.
 
191
 
 
192
    This sets the parent and then calls update_basis_by_delta.
 
193
    It also puts the basis in the repository under both 'basis' and 'result' to
 
194
    allow safety checks made by the WT to succeed, and finally ensures that all
 
195
    items in the delta with a new path are present in the WT before calling
 
196
    update_basis_by_delta.
 
197
 
 
198
    :param basis: An inventory to be used as the basis.
 
199
    :param delta: The inventory delta to apply:
 
200
    :return: An inventory resulting from the application.
 
201
    """
 
202
    control = test.make_controldir(
 
203
        'tree', format=test.format._matchingcontroldir)
 
204
    control.create_repository()
 
205
    control.create_branch()
 
206
    tree = test.format.initialize(control)
 
207
    tree.lock_write()
 
208
    try:
 
209
        target_entries = _create_repo_revisions(tree.branch.repository, basis,
 
210
                                                delta, invalid_delta)
 
211
        # Set the basis state as the trees current state
 
212
        tree._write_inventory(basis)
 
213
        # This reads basis from the repo and puts it into the tree's local
 
214
        # cache, if it has one.
 
215
        tree.set_parent_ids([b'basis'])
 
216
    finally:
 
217
        tree.unlock()
 
218
    # Fresh lock, reads disk again.
 
219
    with tree.lock_write():
 
220
        tree.update_basis_by_delta(b'result', delta)
 
221
        if not invalid_delta:
 
222
            tree._validate()
 
223
    # reload tree - ensure we get what was written.
 
224
    tree = tree.controldir.open_workingtree()
 
225
    basis_tree = tree.basis_tree()
 
226
    basis_tree.lock_read()
 
227
    test.addCleanup(basis_tree.unlock)
 
228
    basis_inv = basis_tree.root_inventory
 
229
    if target_entries:
 
230
        basis_entries = list(basis_inv.iter_entries_by_dir())
 
231
        test.assertEqual(target_entries, basis_entries)
 
232
    return basis_inv
 
233
 
 
234
 
 
235
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
 
236
                                                      invalid_delta=True):
 
237
    """Apply delta to basis and return the result.
 
238
 
 
239
    This inserts basis as a whole inventory and then uses
 
240
    add_inventory_by_delta to add delta.
 
241
 
 
242
    :param basis: An inventory to be used as the basis.
 
243
    :param delta: The inventory delta to apply:
 
244
    :return: An inventory resulting from the application.
 
245
    """
 
246
    format = self.format()
 
247
    control = self.make_controldir('tree', format=format._matchingcontroldir)
 
248
    repo = format.initialize(control)
 
249
    with repo.lock_write(), repository.WriteGroup(repo):
 
250
        rev = revision.Revision(
 
251
            b'basis', timestamp=0, timezone=None, message="",
 
252
            committer="foo@example.com")
 
253
        basis.revision_id = b'basis'
 
254
        create_texts_for_inv(repo, basis)
 
255
        repo.add_revision(b'basis', rev, basis)
 
256
    with repo.lock_write(), repository.WriteGroup(repo):
 
257
        inv_sha1 = repo.add_inventory_by_delta(
 
258
            b'basis', delta, b'result', [b'basis'])
 
259
    # Fresh lock, reads disk again.
 
260
    repo = repo.controldir.open_repository()
 
261
    repo.lock_read()
 
262
    self.addCleanup(repo.unlock)
 
263
    return repo.get_inventory(b'result')
 
264
 
 
265
 
 
266
class TestInventoryUpdates(TestCase):
 
267
 
 
268
    def test_creation_from_root_id(self):
 
269
        # iff a root id is passed to the constructor, a root directory is made
 
270
        inv = inventory.Inventory(root_id=b'tree-root')
 
271
        self.assertNotEqual(None, inv.root)
 
272
        self.assertEqual(b'tree-root', inv.root.file_id)
 
273
 
 
274
    def test_add_path_of_root(self):
 
275
        # if no root id is given at creation time, there is no root directory
 
276
        inv = inventory.Inventory(root_id=None)
 
277
        self.assertIs(None, inv.root)
 
278
        # add a root entry by adding its path
 
279
        ie = inv.add_path(u"", "directory", b"my-root")
 
280
        ie.revision = b'test-rev'
 
281
        self.assertEqual(b"my-root", ie.file_id)
 
282
        self.assertIs(ie, inv.root)
 
283
 
 
284
    def test_add_path(self):
 
285
        inv = inventory.Inventory(root_id=b'tree_root')
 
286
        ie = inv.add_path(u'hello', 'file', b'hello-id')
 
287
        self.assertEqual(b'hello-id', ie.file_id)
 
288
        self.assertEqual('file', ie.kind)
 
289
 
 
290
    def test_copy(self):
 
291
        """Make sure copy() works and creates a deep copy."""
 
292
        inv = inventory.Inventory(root_id=b'some-tree-root')
 
293
        ie = inv.add_path(u'hello', 'file', b'hello-id')
 
294
        inv2 = inv.copy()
 
295
        inv.root.file_id = b'some-new-root'
 
296
        ie.name = u'file2'
 
297
        self.assertEqual(b'some-tree-root', inv2.root.file_id)
 
298
        self.assertEqual(u'hello', inv2.get_entry(b'hello-id').name)
 
299
 
 
300
    def test_copy_empty(self):
 
301
        """Make sure an empty inventory can be copied."""
 
302
        inv = inventory.Inventory(root_id=None)
 
303
        inv2 = inv.copy()
 
304
        self.assertIs(None, inv2.root)
 
305
 
 
306
    def test_copy_copies_root_revision(self):
 
307
        """Make sure the revision of the root gets copied."""
 
308
        inv = inventory.Inventory(root_id=b'someroot')
 
309
        inv.root.revision = b'therev'
 
310
        inv2 = inv.copy()
 
311
        self.assertEqual(b'someroot', inv2.root.file_id)
 
312
        self.assertEqual(b'therev', inv2.root.revision)
 
313
 
 
314
    def test_create_tree_reference(self):
 
315
        inv = inventory.Inventory(b'tree-root-123')
 
316
        inv.add(TreeReference(
 
317
            b'nested-id', 'nested', parent_id=b'tree-root-123',
 
318
            revision=b'rev', reference_revision=b'rev2'))
 
319
 
 
320
    def test_error_encoding(self):
 
321
        inv = inventory.Inventory(b'tree-root')
 
322
        inv.add(InventoryFile(b'a-id', u'\u1234', b'tree-root'))
 
323
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
 
324
                              InventoryFile(b'b-id', u'\u1234', b'tree-root'))
 
325
        self.assertContainsRe(str(e), '\\u1234')
 
326
 
 
327
    def test_add_recursive(self):
 
328
        parent = InventoryDirectory(b'src-id', 'src', b'tree-root')
 
329
        child = InventoryFile(b'hello-id', 'hello.c', b'src-id')
 
330
        parent.children[child.file_id] = child
 
331
        inv = inventory.Inventory(b'tree-root')
 
332
        inv.add(parent)
 
333
        self.assertEqual('src/hello.c', inv.id2path(b'hello-id'))
 
334
 
 
335
 
 
336
class TestDeltaApplication(TestCaseWithTransport):
 
337
 
 
338
    scenarios = delta_application_scenarios()
 
339
 
 
340
    def get_empty_inventory(self, reference_inv=None):
 
341
        """Get an empty inventory.
 
342
 
 
343
        Note that tests should not depend on the revision of the root for
 
344
        setting up test conditions, as it has to be flexible to accomodate non
 
345
        rich root repositories.
 
346
 
 
347
        :param reference_inv: If not None, get the revision for the root from
 
348
            this inventory. This is useful for dealing with older repositories
 
349
            that routinely discarded the root entry data. If None, the root's
 
350
            revision is set to 'basis'.
 
351
        """
 
352
        inv = inventory.Inventory()
 
353
        if reference_inv is not None:
 
354
            inv.root.revision = reference_inv.root.revision
 
355
        else:
 
356
            inv.root.revision = b'basis'
 
357
        return inv
 
358
 
 
359
    def make_file_ie(self, file_id=b'file-id', name='name', parent_id=None):
 
360
        ie_file = inventory.InventoryFile(file_id, name, parent_id)
 
361
        ie_file.revision = b'result'
 
362
        ie_file.text_size = 0
 
363
        ie_file.text_sha1 = b''
 
364
        return ie_file
 
365
 
 
366
    def test_empty_delta(self):
 
367
        inv = self.get_empty_inventory()
 
368
        delta = []
 
369
        inv = self.apply_delta(self, inv, delta)
 
370
        inv2 = self.get_empty_inventory(inv)
 
371
        self.assertEqual([], inv2._make_delta(inv))
 
372
 
 
373
    def test_None_file_id(self):
 
374
        inv = self.get_empty_inventory()
 
375
        dir1 = inventory.InventoryDirectory(b'dirid', 'dir1', inv.root.file_id)
 
376
        dir1.file_id = None
 
377
        dir1.revision = b'result'
 
378
        delta = [(None, u'dir1', None, dir1)]
 
379
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
380
                          inv, delta)
 
381
 
 
382
    def test_unicode_file_id(self):
 
383
        inv = self.get_empty_inventory()
 
384
        dir1 = inventory.InventoryDirectory(b'dirid', 'dir1', inv.root.file_id)
 
385
        dir1.file_id = u'dirid'
 
386
        dir1.revision = b'result'
 
387
        delta = [(None, u'dir1', dir1.file_id, dir1)]
 
388
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
389
                          inv, delta)
 
390
 
 
391
    def test_repeated_file_id(self):
 
392
        inv = self.get_empty_inventory()
 
393
        file1 = inventory.InventoryFile(b'id', 'path1', inv.root.file_id)
 
394
        file1.revision = b'result'
 
395
        file1.text_size = 0
 
396
        file1.text_sha1 = b""
 
397
        file2 = file1.copy()
 
398
        file2.name = 'path2'
 
399
        delta = [(None, u'path1', b'id', file1),
 
400
                 (None, u'path2', b'id', file2)]
 
401
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
402
                          inv, delta)
 
403
 
 
404
    def test_repeated_new_path(self):
 
405
        inv = self.get_empty_inventory()
 
406
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
407
        file1.revision = b'result'
 
408
        file1.text_size = 0
 
409
        file1.text_sha1 = b""
 
410
        file2 = file1.copy()
 
411
        file2.file_id = b'id2'
 
412
        delta = [(None, u'path', b'id1', file1),
 
413
                 (None, u'path', b'id2', file2)]
 
414
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
415
                          inv, delta)
 
416
 
 
417
    def test_repeated_old_path(self):
 
418
        inv = self.get_empty_inventory()
 
419
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
420
        file1.revision = b'result'
 
421
        file1.text_size = 0
 
422
        file1.text_sha1 = b""
 
423
        # We can't *create* a source inventory with the same path, but
 
424
        # a badly generated partial delta might claim the same source twice.
 
425
        # This would be buggy in two ways: the path is repeated in the delta,
 
426
        # And the path for one of the file ids doesn't match the source
 
427
        # location. Alternatively, we could have a repeated fileid, but that
 
428
        # is separately checked for.
 
429
        file2 = inventory.InventoryFile(b'id2', 'path2', inv.root.file_id)
 
430
        file2.revision = b'result'
 
431
        file2.text_size = 0
 
432
        file2.text_sha1 = b""
 
433
        inv.add(file1)
 
434
        inv.add(file2)
 
435
        delta = [(u'path', None, b'id1', None), (u'path', None, b'id2', None)]
 
436
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
437
                          inv, delta)
 
438
 
 
439
    def test_mismatched_id_entry_id(self):
 
440
        inv = self.get_empty_inventory()
 
441
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
442
        file1.revision = b'result'
 
443
        file1.text_size = 0
 
444
        file1.text_sha1 = b""
 
445
        delta = [(None, u'path', b'id', file1)]
 
446
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
447
                          inv, delta)
 
448
 
 
449
    def test_mismatched_new_path_entry_None(self):
 
450
        inv = self.get_empty_inventory()
 
451
        delta = [(None, u'path', b'id', None)]
 
452
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
453
                          inv, delta)
 
454
 
 
455
    def test_mismatched_new_path_None_entry(self):
 
456
        inv = self.get_empty_inventory()
 
457
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
458
        file1.revision = b'result'
 
459
        file1.text_size = 0
 
460
        file1.text_sha1 = b""
 
461
        delta = [(u"path", None, b'id1', file1)]
 
462
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
463
                          inv, delta)
 
464
 
 
465
    def test_parent_is_not_directory(self):
 
466
        inv = self.get_empty_inventory()
 
467
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
468
        file1.revision = b'result'
 
469
        file1.text_size = 0
 
470
        file1.text_sha1 = b""
 
471
        file2 = inventory.InventoryFile(b'id2', 'path2', b'id1')
 
472
        file2.revision = b'result'
 
473
        file2.text_size = 0
 
474
        file2.text_sha1 = b""
 
475
        inv.add(file1)
 
476
        delta = [(None, u'path/path2', b'id2', file2)]
 
477
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
478
                          inv, delta)
 
479
 
 
480
    def test_parent_is_missing(self):
 
481
        inv = self.get_empty_inventory()
 
482
        file2 = inventory.InventoryFile(b'id2', 'path2', b'missingparent')
 
483
        file2.revision = b'result'
 
484
        file2.text_size = 0
 
485
        file2.text_sha1 = b""
 
486
        delta = [(None, u'path/path2', b'id2', file2)]
 
487
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
488
                          inv, delta)
 
489
 
 
490
    def test_new_parent_path_has_wrong_id(self):
 
491
        inv = self.get_empty_inventory()
 
492
        parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
 
493
        parent1.revision = b'result'
 
494
        parent2 = inventory.InventoryDirectory(
 
495
            b'p-2', 'dir2', inv.root.file_id)
 
496
        parent2.revision = b'result'
 
497
        file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
 
498
        file1.revision = b'result'
 
499
        file1.text_size = 0
 
500
        file1.text_sha1 = b""
 
501
        inv.add(parent1)
 
502
        inv.add(parent2)
 
503
        # This delta claims that file1 is at dir/path, but actually its at
 
504
        # dir2/path if you follow the inventory parent structure.
 
505
        delta = [(None, u'dir/path', b'id', file1)]
 
506
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
507
                          inv, delta)
 
508
 
 
509
    def test_old_parent_path_is_wrong(self):
 
510
        inv = self.get_empty_inventory()
 
511
        parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
 
512
        parent1.revision = b'result'
 
513
        parent2 = inventory.InventoryDirectory(
 
514
            b'p-2', 'dir2', inv.root.file_id)
 
515
        parent2.revision = b'result'
 
516
        file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
 
517
        file1.revision = b'result'
 
518
        file1.text_size = 0
 
519
        file1.text_sha1 = b""
 
520
        inv.add(parent1)
 
521
        inv.add(parent2)
 
522
        inv.add(file1)
 
523
        # This delta claims that file1 was at dir/path, but actually it was at
 
524
        # dir2/path if you follow the inventory parent structure.
 
525
        delta = [(u'dir/path', None, b'id', None)]
 
526
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
527
                          inv, delta)
 
528
 
 
529
    def test_old_parent_path_is_for_other_id(self):
 
530
        inv = self.get_empty_inventory()
 
531
        parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
 
532
        parent1.revision = b'result'
 
533
        parent2 = inventory.InventoryDirectory(
 
534
            b'p-2', 'dir2', inv.root.file_id)
 
535
        parent2.revision = b'result'
 
536
        file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
 
537
        file1.revision = b'result'
 
538
        file1.text_size = 0
 
539
        file1.text_sha1 = b""
 
540
        file2 = inventory.InventoryFile(b'id2', 'path', b'p-1')
 
541
        file2.revision = b'result'
 
542
        file2.text_size = 0
 
543
        file2.text_sha1 = b""
 
544
        inv.add(parent1)
 
545
        inv.add(parent2)
 
546
        inv.add(file1)
 
547
        inv.add(file2)
 
548
        # This delta claims that file1 was at dir/path, but actually it was at
 
549
        # dir2/path if you follow the inventory parent structure. At dir/path
 
550
        # is another entry we should not delete.
 
551
        delta = [(u'dir/path', None, b'id', None)]
 
552
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
553
                          inv, delta)
 
554
 
 
555
    def test_add_existing_id_new_path(self):
 
556
        inv = self.get_empty_inventory()
 
557
        parent1 = inventory.InventoryDirectory(
 
558
            b'p-1', 'dir1', inv.root.file_id)
 
559
        parent1.revision = b'result'
 
560
        parent2 = inventory.InventoryDirectory(
 
561
            b'p-1', 'dir2', inv.root.file_id)
 
562
        parent2.revision = b'result'
 
563
        inv.add(parent1)
 
564
        delta = [(None, u'dir2', b'p-1', parent2)]
 
565
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
566
                          inv, delta)
 
567
 
 
568
    def test_add_new_id_existing_path(self):
 
569
        inv = self.get_empty_inventory()
 
570
        parent1 = inventory.InventoryDirectory(
 
571
            b'p-1', 'dir1', inv.root.file_id)
 
572
        parent1.revision = b'result'
 
573
        parent2 = inventory.InventoryDirectory(
 
574
            b'p-2', 'dir1', inv.root.file_id)
 
575
        parent2.revision = b'result'
 
576
        inv.add(parent1)
 
577
        delta = [(None, u'dir1', b'p-2', parent2)]
 
578
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
579
                          inv, delta)
 
580
 
 
581
    def test_remove_dir_leaving_dangling_child(self):
 
582
        inv = self.get_empty_inventory()
 
583
        dir1 = inventory.InventoryDirectory(b'p-1', 'dir1', inv.root.file_id)
 
584
        dir1.revision = b'result'
 
585
        dir2 = inventory.InventoryDirectory(b'p-2', 'child1', b'p-1')
 
586
        dir2.revision = b'result'
 
587
        dir3 = inventory.InventoryDirectory(b'p-3', 'child2', b'p-1')
 
588
        dir3.revision = b'result'
 
589
        inv.add(dir1)
 
590
        inv.add(dir2)
 
591
        inv.add(dir3)
 
592
        delta = [(u'dir1', None, b'p-1', None),
 
593
                 (u'dir1/child2', None, b'p-3', None)]
 
594
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
595
                          inv, delta)
 
596
 
 
597
    def test_add_file(self):
 
598
        inv = self.get_empty_inventory()
 
599
        file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
 
600
        file1.revision = b'result'
 
601
        file1.text_size = 0
 
602
        file1.text_sha1 = b''
 
603
        delta = [(None, u'path', b'file-id', file1)]
 
604
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
605
        self.assertEqual(b'file-id', res_inv.get_entry(b'file-id').file_id)
 
606
 
 
607
    def test_remove_file(self):
 
608
        inv = self.get_empty_inventory()
 
609
        file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
 
610
        file1.revision = b'result'
 
611
        file1.text_size = 0
 
612
        file1.text_sha1 = b''
 
613
        inv.add(file1)
 
614
        delta = [(u'path', None, b'file-id', None)]
 
615
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
616
        self.assertEqual(None, res_inv.path2id('path'))
 
617
        self.assertRaises(errors.NoSuchId, res_inv.id2path, b'file-id')
 
618
 
 
619
    def test_rename_file(self):
 
620
        inv = self.get_empty_inventory()
 
621
        file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
 
622
        inv.add(file1)
 
623
        file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
 
624
        delta = [(u'path', 'path2', b'file-id', file2)]
 
625
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
626
        self.assertEqual(None, res_inv.path2id('path'))
 
627
        self.assertEqual(b'file-id', res_inv.path2id('path2'))
 
628
 
 
629
    def test_replaced_at_new_path(self):
 
630
        inv = self.get_empty_inventory()
 
631
        file1 = self.make_file_ie(file_id=b'id1', parent_id=inv.root.file_id)
 
632
        inv.add(file1)
 
633
        file2 = self.make_file_ie(file_id=b'id2', parent_id=inv.root.file_id)
 
634
        delta = [(u'name', None, b'id1', None),
 
635
                 (None, u'name', b'id2', file2)]
 
636
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
637
        self.assertEqual(b'id2', res_inv.path2id('name'))
 
638
 
 
639
    def test_rename_dir(self):
 
640
        inv = self.get_empty_inventory()
 
641
        dir1 = inventory.InventoryDirectory(
 
642
            b'dir-id', 'dir1', inv.root.file_id)
 
643
        dir1.revision = b'basis'
 
644
        file1 = self.make_file_ie(parent_id=b'dir-id')
 
645
        inv.add(dir1)
 
646
        inv.add(file1)
 
647
        dir2 = inventory.InventoryDirectory(
 
648
            b'dir-id', 'dir2', inv.root.file_id)
 
649
        dir2.revision = b'result'
 
650
        delta = [('dir1', 'dir2', b'dir-id', dir2)]
 
651
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
652
        # The file should be accessible under the new path
 
653
        self.assertEqual(b'file-id', res_inv.path2id('dir2/name'))
 
654
 
 
655
    def test_renamed_dir_with_renamed_child(self):
 
656
        inv = self.get_empty_inventory()
 
657
        dir1 = inventory.InventoryDirectory(
 
658
            b'dir-id', 'dir1', inv.root.file_id)
 
659
        dir1.revision = b'basis'
 
660
        file1 = self.make_file_ie(b'file-id-1', 'name1', parent_id=b'dir-id')
 
661
        file2 = self.make_file_ie(b'file-id-2', 'name2', parent_id=b'dir-id')
 
662
        inv.add(dir1)
 
663
        inv.add(file1)
 
664
        inv.add(file2)
 
665
        dir2 = inventory.InventoryDirectory(
 
666
            b'dir-id', 'dir2', inv.root.file_id)
 
667
        dir2.revision = b'result'
 
668
        file2b = self.make_file_ie(b'file-id-2', 'name2', inv.root.file_id)
 
669
        delta = [('dir1', 'dir2', b'dir-id', dir2),
 
670
                 ('dir1/name2', 'name2', b'file-id-2', file2b)]
 
671
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
672
        # The file should be accessible under the new path
 
673
        self.assertEqual(b'file-id-1', res_inv.path2id('dir2/name1'))
 
674
        self.assertEqual(None, res_inv.path2id('dir2/name2'))
 
675
        self.assertEqual(b'file-id-2', res_inv.path2id('name2'))
 
676
 
 
677
    def test_is_root(self):
 
678
        """Ensure our root-checking code is accurate."""
 
679
        inv = inventory.Inventory(b'TREE_ROOT')
 
680
        self.assertTrue(inv.is_root(b'TREE_ROOT'))
 
681
        self.assertFalse(inv.is_root(b'booga'))
 
682
        inv.root.file_id = b'booga'
 
683
        self.assertFalse(inv.is_root(b'TREE_ROOT'))
 
684
        self.assertTrue(inv.is_root(b'booga'))
 
685
        # works properly even if no root is set
 
686
        inv.root = None
 
687
        self.assertFalse(inv.is_root(b'TREE_ROOT'))
 
688
        self.assertFalse(inv.is_root(b'booga'))
 
689
 
 
690
    def test_entries_for_empty_inventory(self):
 
691
        """Test that entries() will not fail for an empty inventory"""
 
692
        inv = Inventory(root_id=None)
 
693
        self.assertEqual([], inv.entries())
 
694
 
 
695
 
 
696
class TestInventoryEntry(TestCase):
 
697
 
 
698
    def test_file_invalid_entry_name(self):
 
699
        self.assertRaises(InvalidEntryName, inventory.InventoryFile,
 
700
                          b'123', 'a/hello.c', ROOT_ID)
 
701
 
 
702
    def test_file_backslash(self):
 
703
        file = inventory.InventoryFile(b'123', 'h\\ello.c', ROOT_ID)
 
704
        self.assertEquals(file.name, 'h\\ello.c')
 
705
 
 
706
    def test_file_kind_character(self):
 
707
        file = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
 
708
        self.assertEqual(file.kind_character(), '')
 
709
 
 
710
    def test_dir_kind_character(self):
 
711
        dir = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
 
712
        self.assertEqual(dir.kind_character(), '/')
 
713
 
 
714
    def test_link_kind_character(self):
 
715
        dir = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
 
716
        self.assertEqual(dir.kind_character(), '')
 
717
 
 
718
    def test_link_kind_character(self):
 
719
        dir = TreeReference(b'123', 'hello.c', ROOT_ID)
 
720
        self.assertEqual(dir.kind_character(), '+')
 
721
 
 
722
    def test_dir_detect_changes(self):
 
723
        left = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
 
724
        right = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
 
725
        self.assertEqual((False, False), left.detect_changes(right))
 
726
        self.assertEqual((False, False), right.detect_changes(left))
 
727
 
 
728
    def test_file_detect_changes(self):
 
729
        left = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
 
730
        left.text_sha1 = 123
 
731
        right = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
 
732
        right.text_sha1 = 123
 
733
        self.assertEqual((False, False), left.detect_changes(right))
 
734
        self.assertEqual((False, False), right.detect_changes(left))
 
735
        left.executable = True
 
736
        self.assertEqual((False, True), left.detect_changes(right))
 
737
        self.assertEqual((False, True), right.detect_changes(left))
 
738
        right.text_sha1 = 321
 
739
        self.assertEqual((True, True), left.detect_changes(right))
 
740
        self.assertEqual((True, True), right.detect_changes(left))
 
741
 
 
742
    def test_symlink_detect_changes(self):
 
743
        left = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
 
744
        left.symlink_target = 'foo'
 
745
        right = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
 
746
        right.symlink_target = 'foo'
 
747
        self.assertEqual((False, False), left.detect_changes(right))
 
748
        self.assertEqual((False, False), right.detect_changes(left))
 
749
        left.symlink_target = 'different'
 
750
        self.assertEqual((True, False), left.detect_changes(right))
 
751
        self.assertEqual((True, False), right.detect_changes(left))
 
752
 
 
753
    def test_file_has_text(self):
 
754
        file = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
 
755
        self.assertTrue(file.has_text())
 
756
 
 
757
    def test_directory_has_text(self):
 
758
        dir = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
 
759
        self.assertFalse(dir.has_text())
 
760
 
 
761
    def test_link_has_text(self):
 
762
        link = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
 
763
        self.assertFalse(link.has_text())
 
764
 
 
765
    def test_make_entry(self):
 
766
        self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
 
767
                              inventory.InventoryFile)
 
768
        self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
 
769
                              inventory.InventoryLink)
 
770
        self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
 
771
                              inventory.InventoryDirectory)
 
772
 
 
773
    def test_make_entry_non_normalized(self):
 
774
        orig_normalized_filename = osutils.normalized_filename
 
775
 
 
776
        try:
 
777
            osutils.normalized_filename = osutils._accessible_normalized_filename
 
778
            entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
 
779
            self.assertEqual(u'\xe5', entry.name)
 
780
            self.assertIsInstance(entry, inventory.InventoryFile)
 
781
 
 
782
            osutils.normalized_filename = osutils._inaccessible_normalized_filename
 
783
            self.assertRaises(errors.InvalidNormalization,
 
784
                              inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
 
785
        finally:
 
786
            osutils.normalized_filename = orig_normalized_filename
 
787
 
 
788
 
 
789
class TestDescribeChanges(TestCase):
 
790
 
 
791
    def test_describe_change(self):
 
792
        # we need to test the following change combinations:
 
793
        # rename
 
794
        # reparent
 
795
        # modify
 
796
        # gone
 
797
        # added
 
798
        # renamed/reparented and modified
 
799
        # change kind (perhaps can't be done yet?)
 
800
        # also, merged in combination with all of these?
 
801
        old_a = InventoryFile(b'a-id', 'a_file', ROOT_ID)
 
802
        old_a.text_sha1 = b'123132'
 
803
        old_a.text_size = 0
 
804
        new_a = InventoryFile(b'a-id', 'a_file', ROOT_ID)
 
805
        new_a.text_sha1 = b'123132'
 
806
        new_a.text_size = 0
 
807
 
 
808
        self.assertChangeDescription('unchanged', old_a, new_a)
 
809
 
 
810
        new_a.text_size = 10
 
811
        new_a.text_sha1 = b'abcabc'
 
812
        self.assertChangeDescription('modified', old_a, new_a)
 
813
 
 
814
        self.assertChangeDescription('added', None, new_a)
 
815
        self.assertChangeDescription('removed', old_a, None)
 
816
        # perhaps a bit questionable but seems like the most reasonable thing...
 
817
        self.assertChangeDescription('unchanged', None, None)
 
818
 
 
819
        # in this case it's both renamed and modified; show a rename and
 
820
        # modification:
 
821
        new_a.name = 'newfilename'
 
822
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
823
 
 
824
        # reparenting is 'renaming'
 
825
        new_a.name = old_a.name
 
826
        new_a.parent_id = b'somedir-id'
 
827
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
828
 
 
829
        # reset the content values so its not modified
 
830
        new_a.text_size = old_a.text_size
 
831
        new_a.text_sha1 = old_a.text_sha1
 
832
        new_a.name = old_a.name
 
833
 
 
834
        new_a.name = 'newfilename'
 
835
        self.assertChangeDescription('renamed', old_a, new_a)
 
836
 
 
837
        # reparenting is 'renaming'
 
838
        new_a.name = old_a.name
 
839
        new_a.parent_id = b'somedir-id'
 
840
        self.assertChangeDescription('renamed', old_a, new_a)
 
841
 
 
842
    def assertChangeDescription(self, expected_change, old_ie, new_ie):
 
843
        change = InventoryEntry.describe_change(old_ie, new_ie)
 
844
        self.assertEqual(expected_change, change)
 
845
 
 
846
 
 
847
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
 
848
 
 
849
    def get_chk_bytes(self):
 
850
        factory = groupcompress.make_pack_factory(True, True, 1)
 
851
        trans = self.get_transport('')
 
852
        return factory(trans)
 
853
 
 
854
    def read_bytes(self, chk_bytes, key):
 
855
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
 
856
        return next(stream).get_bytes_as("fulltext")
 
857
 
 
858
    def test_deserialise_gives_CHKInventory(self):
 
859
        inv = Inventory()
 
860
        inv.revision_id = b"revid"
 
861
        inv.root.revision = b"rootrev"
 
862
        chk_bytes = self.get_chk_bytes()
 
863
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
864
        lines = chk_inv.to_lines()
 
865
        new_inv = CHKInventory.deserialise(chk_bytes, lines, (b"revid",))
 
866
        self.assertEqual(b"revid", new_inv.revision_id)
 
867
        self.assertEqual("directory", new_inv.root.kind)
 
868
        self.assertEqual(inv.root.file_id, new_inv.root.file_id)
 
869
        self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
 
870
        self.assertEqual(inv.root.name, new_inv.root.name)
 
871
        self.assertEqual(b"rootrev", new_inv.root.revision)
 
872
        self.assertEqual(b'plain', new_inv._search_key_name)
 
873
 
 
874
    def test_deserialise_wrong_revid(self):
 
875
        inv = Inventory()
 
876
        inv.revision_id = b"revid"
 
877
        inv.root.revision = b"rootrev"
 
878
        chk_bytes = self.get_chk_bytes()
 
879
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
880
        lines = chk_inv.to_lines()
 
881
        self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
 
882
                          lines, (b"revid2",))
 
883
 
 
884
    def test_captures_rev_root_byid(self):
 
885
        inv = Inventory()
 
886
        inv.revision_id = b"foo"
 
887
        inv.root.revision = b"bar"
 
888
        chk_bytes = self.get_chk_bytes()
 
889
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
890
        lines = chk_inv.to_lines()
 
891
        self.assertEqual([
 
892
            b'chkinventory:\n',
 
893
            b'revision_id: foo\n',
 
894
            b'root_id: TREE_ROOT\n',
 
895
            b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
896
            b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
897
            ], lines)
 
898
        chk_inv = CHKInventory.deserialise(
 
899
            chk_bytes, lines, (b'foo',))
 
900
        self.assertEqual(b'plain', chk_inv._search_key_name)
 
901
 
 
902
    def test_captures_parent_id_basename_index(self):
 
903
        inv = Inventory()
 
904
        inv.revision_id = b"foo"
 
905
        inv.root.revision = b"bar"
 
906
        chk_bytes = self.get_chk_bytes()
 
907
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
908
        lines = chk_inv.to_lines()
 
909
        self.assertEqual([
 
910
            b'chkinventory:\n',
 
911
            b'revision_id: foo\n',
 
912
            b'root_id: TREE_ROOT\n',
 
913
            b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
914
            b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
915
            ], lines)
 
916
        chk_inv = CHKInventory.deserialise(
 
917
            chk_bytes, lines, (b'foo',))
 
918
        self.assertEqual(b'plain', chk_inv._search_key_name)
 
919
 
 
920
    def test_captures_search_key_name(self):
 
921
        inv = Inventory()
 
922
        inv.revision_id = b"foo"
 
923
        inv.root.revision = b"bar"
 
924
        chk_bytes = self.get_chk_bytes()
 
925
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
926
                                              search_key_name=b'hash-16-way')
 
927
        lines = chk_inv.to_lines()
 
928
        self.assertEqual([
 
929
            b'chkinventory:\n',
 
930
            b'search_key_name: hash-16-way\n',
 
931
            b'root_id: TREE_ROOT\n',
 
932
            b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
933
            b'revision_id: foo\n',
 
934
            b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
935
            ], lines)
 
936
        chk_inv = CHKInventory.deserialise(
 
937
            chk_bytes, lines, (b'foo',))
 
938
        self.assertEqual(b'hash-16-way', chk_inv._search_key_name)
 
939
 
 
940
    def test_directory_children_on_demand(self):
 
941
        inv = Inventory()
 
942
        inv.revision_id = b"revid"
 
943
        inv.root.revision = b"rootrev"
 
944
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
945
        inv.get_entry(b"fileid").revision = b"filerev"
 
946
        inv.get_entry(b"fileid").executable = True
 
947
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
948
        inv.get_entry(b"fileid").text_size = 1
 
949
        chk_bytes = self.get_chk_bytes()
 
950
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
951
        lines = chk_inv.to_lines()
 
952
        new_inv = CHKInventory.deserialise(chk_bytes, lines, (b"revid",))
 
953
        root_entry = new_inv.get_entry(inv.root.file_id)
 
954
        self.assertEqual(None, root_entry._children)
 
955
        self.assertEqual({'file'}, set(root_entry.children))
 
956
        file_direct = new_inv.get_entry(b"fileid")
 
957
        file_found = root_entry.children['file']
 
958
        self.assertEqual(file_direct.kind, file_found.kind)
 
959
        self.assertEqual(file_direct.file_id, file_found.file_id)
 
960
        self.assertEqual(file_direct.parent_id, file_found.parent_id)
 
961
        self.assertEqual(file_direct.name, file_found.name)
 
962
        self.assertEqual(file_direct.revision, file_found.revision)
 
963
        self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
 
964
        self.assertEqual(file_direct.text_size, file_found.text_size)
 
965
        self.assertEqual(file_direct.executable, file_found.executable)
 
966
 
 
967
    def test_from_inventory_maximum_size(self):
 
968
        # from_inventory supports the maximum_size parameter.
 
969
        inv = Inventory()
 
970
        inv.revision_id = b"revid"
 
971
        inv.root.revision = b"rootrev"
 
972
        chk_bytes = self.get_chk_bytes()
 
973
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
 
974
        chk_inv.id_to_entry._ensure_root()
 
975
        self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
 
976
        self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
 
977
        p_id_basename = chk_inv.parent_id_basename_to_file_id
 
978
        p_id_basename._ensure_root()
 
979
        self.assertEqual(120, p_id_basename._root_node.maximum_size)
 
980
        self.assertEqual(2, p_id_basename._root_node._key_width)
 
981
 
 
982
    def test_iter_all_ids(self):
 
983
        inv = Inventory()
 
984
        inv.revision_id = b"revid"
 
985
        inv.root.revision = b"rootrev"
 
986
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
987
        inv.get_entry(b"fileid").revision = b"filerev"
 
988
        inv.get_entry(b"fileid").executable = True
 
989
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
990
        inv.get_entry(b"fileid").text_size = 1
 
991
        chk_bytes = self.get_chk_bytes()
 
992
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
993
        lines = chk_inv.to_lines()
 
994
        new_inv = CHKInventory.deserialise(chk_bytes, lines, (b"revid",))
 
995
        fileids = sorted(new_inv.iter_all_ids())
 
996
        self.assertEqual([inv.root.file_id, b"fileid"], fileids)
 
997
 
 
998
    def test__len__(self):
 
999
        inv = Inventory()
 
1000
        inv.revision_id = b"revid"
 
1001
        inv.root.revision = b"rootrev"
 
1002
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1003
        inv.get_entry(b"fileid").revision = b"filerev"
 
1004
        inv.get_entry(b"fileid").executable = True
 
1005
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1006
        inv.get_entry(b"fileid").text_size = 1
 
1007
        chk_bytes = self.get_chk_bytes()
 
1008
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1009
        self.assertEqual(2, len(chk_inv))
 
1010
 
 
1011
    def test_get_entry(self):
 
1012
        inv = Inventory()
 
1013
        inv.revision_id = b"revid"
 
1014
        inv.root.revision = b"rootrev"
 
1015
        inv.add(InventoryFile(b"fileid", u"file", inv.root.file_id))
 
1016
        inv.get_entry(b"fileid").revision = b"filerev"
 
1017
        inv.get_entry(b"fileid").executable = True
 
1018
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1019
        inv.get_entry(b"fileid").text_size = 1
 
1020
        chk_bytes = self.get_chk_bytes()
 
1021
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1022
        lines = chk_inv.to_lines()
 
1023
        new_inv = CHKInventory.deserialise(chk_bytes, lines, (b"revid",))
 
1024
        root_entry = new_inv.get_entry(inv.root.file_id)
 
1025
        file_entry = new_inv.get_entry(b"fileid")
 
1026
        self.assertEqual("directory", root_entry.kind)
 
1027
        self.assertEqual(inv.root.file_id, root_entry.file_id)
 
1028
        self.assertEqual(inv.root.parent_id, root_entry.parent_id)
 
1029
        self.assertEqual(inv.root.name, root_entry.name)
 
1030
        self.assertEqual(b"rootrev", root_entry.revision)
 
1031
        self.assertEqual("file", file_entry.kind)
 
1032
        self.assertEqual(b"fileid", file_entry.file_id)
 
1033
        self.assertEqual(inv.root.file_id, file_entry.parent_id)
 
1034
        self.assertEqual(u"file", file_entry.name)
 
1035
        self.assertEqual(b"filerev", file_entry.revision)
 
1036
        self.assertEqual(b"ffff", file_entry.text_sha1)
 
1037
        self.assertEqual(1, file_entry.text_size)
 
1038
        self.assertEqual(True, file_entry.executable)
 
1039
        self.assertRaises(errors.NoSuchId, new_inv.get_entry, 'missing')
 
1040
 
 
1041
    def test_has_id_true(self):
 
1042
        inv = Inventory()
 
1043
        inv.revision_id = b"revid"
 
1044
        inv.root.revision = b"rootrev"
 
1045
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1046
        inv.get_entry(b"fileid").revision = b"filerev"
 
1047
        inv.get_entry(b"fileid").executable = True
 
1048
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1049
        inv.get_entry(b"fileid").text_size = 1
 
1050
        chk_bytes = self.get_chk_bytes()
 
1051
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1052
        self.assertTrue(chk_inv.has_id(b'fileid'))
 
1053
        self.assertTrue(chk_inv.has_id(inv.root.file_id))
 
1054
 
 
1055
    def test_has_id_not(self):
 
1056
        inv = Inventory()
 
1057
        inv.revision_id = b"revid"
 
1058
        inv.root.revision = b"rootrev"
 
1059
        chk_bytes = self.get_chk_bytes()
 
1060
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1061
        self.assertFalse(chk_inv.has_id(b'fileid'))
 
1062
 
 
1063
    def test_id2path(self):
 
1064
        inv = Inventory()
 
1065
        inv.revision_id = b"revid"
 
1066
        inv.root.revision = b"rootrev"
 
1067
        direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
 
1068
        fileentry = InventoryFile(b"fileid", "file", b"dirid")
 
1069
        inv.add(direntry)
 
1070
        inv.add(fileentry)
 
1071
        inv.get_entry(b"fileid").revision = b"filerev"
 
1072
        inv.get_entry(b"fileid").executable = True
 
1073
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1074
        inv.get_entry(b"fileid").text_size = 1
 
1075
        inv.get_entry(b"dirid").revision = b"filerev"
 
1076
        chk_bytes = self.get_chk_bytes()
 
1077
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1078
        lines = chk_inv.to_lines()
 
1079
        new_inv = CHKInventory.deserialise(chk_bytes, lines, (b"revid",))
 
1080
        self.assertEqual('', new_inv.id2path(inv.root.file_id))
 
1081
        self.assertEqual('dir', new_inv.id2path(b'dirid'))
 
1082
        self.assertEqual('dir/file', new_inv.id2path(b'fileid'))
 
1083
 
 
1084
    def test_path2id(self):
 
1085
        inv = Inventory()
 
1086
        inv.revision_id = b"revid"
 
1087
        inv.root.revision = b"rootrev"
 
1088
        direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
 
1089
        fileentry = InventoryFile(b"fileid", "file", b"dirid")
 
1090
        inv.add(direntry)
 
1091
        inv.add(fileentry)
 
1092
        inv.get_entry(b"fileid").revision = b"filerev"
 
1093
        inv.get_entry(b"fileid").executable = True
 
1094
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1095
        inv.get_entry(b"fileid").text_size = 1
 
1096
        inv.get_entry(b"dirid").revision = b"filerev"
 
1097
        chk_bytes = self.get_chk_bytes()
 
1098
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1099
        lines = chk_inv.to_lines()
 
1100
        new_inv = CHKInventory.deserialise(chk_bytes, lines, (b"revid",))
 
1101
        self.assertEqual(inv.root.file_id, new_inv.path2id(''))
 
1102
        self.assertEqual(b'dirid', new_inv.path2id('dir'))
 
1103
        self.assertEqual(b'fileid', new_inv.path2id('dir/file'))
 
1104
 
 
1105
    def test_create_by_apply_delta_sets_root(self):
 
1106
        inv = Inventory()
 
1107
        inv.root.revision = b"myrootrev"
 
1108
        inv.revision_id = b"revid"
 
1109
        chk_bytes = self.get_chk_bytes()
 
1110
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1111
        inv.add_path("", "directory", b"myrootid", None)
 
1112
        inv.revision_id = b"expectedid"
 
1113
        inv.root.revision = b"myrootrev"
 
1114
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1115
        delta = [("", None, base_inv.root.file_id, None),
 
1116
                 (None, "", b"myrootid", inv.root)]
 
1117
        new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
 
1118
        self.assertEqual(reference_inv.root, new_inv.root)
 
1119
 
 
1120
    def test_create_by_apply_delta_empty_add_child(self):
 
1121
        inv = Inventory()
 
1122
        inv.revision_id = b"revid"
 
1123
        inv.root.revision = b"rootrev"
 
1124
        chk_bytes = self.get_chk_bytes()
 
1125
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1126
        a_entry = InventoryFile(b"A-id", "A", inv.root.file_id)
 
1127
        a_entry.revision = b"filerev"
 
1128
        a_entry.executable = True
 
1129
        a_entry.text_sha1 = b"ffff"
 
1130
        a_entry.text_size = 1
 
1131
        inv.add(a_entry)
 
1132
        inv.revision_id = b"expectedid"
 
1133
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1134
        delta = [(None, "A", b"A-id", a_entry)]
 
1135
        new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
 
1136
        # new_inv should be the same as reference_inv.
 
1137
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1138
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1139
        reference_inv.id_to_entry._ensure_root()
 
1140
        new_inv.id_to_entry._ensure_root()
 
1141
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1142
                         new_inv.id_to_entry._root_node._key)
 
1143
 
 
1144
    def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
 
1145
        inv = Inventory()
 
1146
        inv.revision_id = b"revid"
 
1147
        inv.root.revision = b"rootrev"
 
1148
        chk_bytes = self.get_chk_bytes()
 
1149
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1150
        a_entry = InventoryFile(b"A-id", "A", inv.root.file_id)
 
1151
        a_entry.revision = b"filerev"
 
1152
        a_entry.executable = True
 
1153
        a_entry.text_sha1 = b"ffff"
 
1154
        a_entry.text_size = 1
 
1155
        inv.add(a_entry)
 
1156
        inv.revision_id = b"expectedid"
 
1157
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1158
        delta = [(None, "A", b"A-id", a_entry)]
 
1159
        new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
 
1160
        reference_inv.id_to_entry._ensure_root()
 
1161
        reference_inv.parent_id_basename_to_file_id._ensure_root()
 
1162
        new_inv.id_to_entry._ensure_root()
 
1163
        new_inv.parent_id_basename_to_file_id._ensure_root()
 
1164
        # new_inv should be the same as reference_inv.
 
1165
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1166
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1167
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1168
                         new_inv.id_to_entry._root_node._key)
 
1169
        self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
 
1170
                         new_inv.parent_id_basename_to_file_id._root_node._key)
 
1171
 
 
1172
    def test_iter_changes(self):
 
1173
        # Low level bootstrapping smoke test; comprehensive generic tests via
 
1174
        # InterTree are coming.
 
1175
        inv = Inventory()
 
1176
        inv.revision_id = b"revid"
 
1177
        inv.root.revision = b"rootrev"
 
1178
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1179
        inv.get_entry(b"fileid").revision = b"filerev"
 
1180
        inv.get_entry(b"fileid").executable = True
 
1181
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1182
        inv.get_entry(b"fileid").text_size = 1
 
1183
        inv2 = Inventory()
 
1184
        inv2.revision_id = b"revid2"
 
1185
        inv2.root.revision = b"rootrev"
 
1186
        inv2.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1187
        inv2.get_entry(b"fileid").revision = b"filerev2"
 
1188
        inv2.get_entry(b"fileid").executable = False
 
1189
        inv2.get_entry(b"fileid").text_sha1 = b"bbbb"
 
1190
        inv2.get_entry(b"fileid").text_size = 2
 
1191
        # get fresh objects.
 
1192
        chk_bytes = self.get_chk_bytes()
 
1193
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1194
        lines = chk_inv.to_lines()
 
1195
        inv_1 = CHKInventory.deserialise(chk_bytes, lines, (b"revid",))
 
1196
        chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
 
1197
        lines = chk_inv2.to_lines()
 
1198
        inv_2 = CHKInventory.deserialise(chk_bytes, lines, (b"revid2",))
 
1199
        self.assertEqual([(b'fileid', (u'file', u'file'), True, (True, True),
 
1200
                           (b'TREE_ROOT', b'TREE_ROOT'), (u'file',
 
1201
                                                          u'file'), ('file', 'file'),
 
1202
                           (False, True))],
 
1203
                         list(inv_1.iter_changes(inv_2)))
 
1204
 
 
1205
    def test_parent_id_basename_to_file_id_index_enabled(self):
 
1206
        inv = Inventory()
 
1207
        inv.revision_id = b"revid"
 
1208
        inv.root.revision = b"rootrev"
 
1209
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1210
        inv.get_entry(b"fileid").revision = b"filerev"
 
1211
        inv.get_entry(b"fileid").executable = True
 
1212
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1213
        inv.get_entry(b"fileid").text_size = 1
 
1214
        # get fresh objects.
 
1215
        chk_bytes = self.get_chk_bytes()
 
1216
        tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1217
        lines = tmp_inv.to_lines()
 
1218
        chk_inv = CHKInventory.deserialise(chk_bytes, lines, (b"revid",))
 
1219
        self.assertIsInstance(
 
1220
            chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
 
1221
        self.assertEqual(
 
1222
            {(b'', b''): b'TREE_ROOT', (b'TREE_ROOT', b'file'): b'fileid'},
 
1223
            dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
 
1224
 
 
1225
    def test_file_entry_to_bytes(self):
 
1226
        inv = CHKInventory(None)
 
1227
        ie = inventory.InventoryFile(b'file-id', 'filename', b'parent-id')
 
1228
        ie.executable = True
 
1229
        ie.revision = b'file-rev-id'
 
1230
        ie.text_sha1 = b'abcdefgh'
 
1231
        ie.text_size = 100
 
1232
        bytes = inv._entry_to_bytes(ie)
 
1233
        self.assertEqual(b'file: file-id\nparent-id\nfilename\n'
 
1234
                         b'file-rev-id\nabcdefgh\n100\nY', bytes)
 
1235
        ie2 = inv._bytes_to_entry(bytes)
 
1236
        self.assertEqual(ie, ie2)
 
1237
        self.assertIsInstance(ie2.name, str)
 
1238
        self.assertEqual((b'filename', b'file-id', b'file-rev-id'),
 
1239
                         inv._bytes_to_utf8name_key(bytes))
 
1240
 
 
1241
    def test_file2_entry_to_bytes(self):
 
1242
        inv = CHKInventory(None)
 
1243
        # \u30a9 == 'omega'
 
1244
        ie = inventory.InventoryFile(b'file-id', u'\u03a9name', b'parent-id')
 
1245
        ie.executable = False
 
1246
        ie.revision = b'file-rev-id'
 
1247
        ie.text_sha1 = b'123456'
 
1248
        ie.text_size = 25
 
1249
        bytes = inv._entry_to_bytes(ie)
 
1250
        self.assertEqual(b'file: file-id\nparent-id\n\xce\xa9name\n'
 
1251
                         b'file-rev-id\n123456\n25\nN', bytes)
 
1252
        ie2 = inv._bytes_to_entry(bytes)
 
1253
        self.assertEqual(ie, ie2)
 
1254
        self.assertIsInstance(ie2.name, str)
 
1255
        self.assertEqual((b'\xce\xa9name', b'file-id', b'file-rev-id'),
 
1256
                         inv._bytes_to_utf8name_key(bytes))
 
1257
 
 
1258
    def test_dir_entry_to_bytes(self):
 
1259
        inv = CHKInventory(None)
 
1260
        ie = inventory.InventoryDirectory(b'dir-id', 'dirname', b'parent-id')
 
1261
        ie.revision = b'dir-rev-id'
 
1262
        bytes = inv._entry_to_bytes(ie)
 
1263
        self.assertEqual(b'dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
 
1264
        ie2 = inv._bytes_to_entry(bytes)
 
1265
        self.assertEqual(ie, ie2)
 
1266
        self.assertIsInstance(ie2.name, str)
 
1267
        self.assertEqual((b'dirname', b'dir-id', b'dir-rev-id'),
 
1268
                         inv._bytes_to_utf8name_key(bytes))
 
1269
 
 
1270
    def test_dir2_entry_to_bytes(self):
 
1271
        inv = CHKInventory(None)
 
1272
        ie = inventory.InventoryDirectory(b'dir-id', u'dir\u03a9name',
 
1273
                                          None)
 
1274
        ie.revision = b'dir-rev-id'
 
1275
        bytes = inv._entry_to_bytes(ie)
 
1276
        self.assertEqual(b'dir: dir-id\n\ndir\xce\xa9name\n'
 
1277
                         b'dir-rev-id', bytes)
 
1278
        ie2 = inv._bytes_to_entry(bytes)
 
1279
        self.assertEqual(ie, ie2)
 
1280
        self.assertIsInstance(ie2.name, str)
 
1281
        self.assertIs(ie2.parent_id, None)
 
1282
        self.assertEqual((b'dir\xce\xa9name', b'dir-id', b'dir-rev-id'),
 
1283
                         inv._bytes_to_utf8name_key(bytes))
 
1284
 
 
1285
    def test_symlink_entry_to_bytes(self):
 
1286
        inv = CHKInventory(None)
 
1287
        ie = inventory.InventoryLink(b'link-id', 'linkname', b'parent-id')
 
1288
        ie.revision = b'link-rev-id'
 
1289
        ie.symlink_target = u'target/path'
 
1290
        bytes = inv._entry_to_bytes(ie)
 
1291
        self.assertEqual(b'symlink: link-id\nparent-id\nlinkname\n'
 
1292
                         b'link-rev-id\ntarget/path', bytes)
 
1293
        ie2 = inv._bytes_to_entry(bytes)
 
1294
        self.assertEqual(ie, ie2)
 
1295
        self.assertIsInstance(ie2.name, str)
 
1296
        self.assertIsInstance(ie2.symlink_target, str)
 
1297
        self.assertEqual((b'linkname', b'link-id', b'link-rev-id'),
 
1298
                         inv._bytes_to_utf8name_key(bytes))
 
1299
 
 
1300
    def test_symlink2_entry_to_bytes(self):
 
1301
        inv = CHKInventory(None)
 
1302
        ie = inventory.InventoryLink(
 
1303
            b'link-id', u'link\u03a9name', b'parent-id')
 
1304
        ie.revision = b'link-rev-id'
 
1305
        ie.symlink_target = u'target/\u03a9path'
 
1306
        bytes = inv._entry_to_bytes(ie)
 
1307
        self.assertEqual(b'symlink: link-id\nparent-id\nlink\xce\xa9name\n'
 
1308
                         b'link-rev-id\ntarget/\xce\xa9path', bytes)
 
1309
        ie2 = inv._bytes_to_entry(bytes)
 
1310
        self.assertEqual(ie, ie2)
 
1311
        self.assertIsInstance(ie2.name, str)
 
1312
        self.assertIsInstance(ie2.symlink_target, str)
 
1313
        self.assertEqual((b'link\xce\xa9name', b'link-id', b'link-rev-id'),
 
1314
                         inv._bytes_to_utf8name_key(bytes))
 
1315
 
 
1316
    def test_tree_reference_entry_to_bytes(self):
 
1317
        inv = CHKInventory(None)
 
1318
        ie = inventory.TreeReference(b'tree-root-id', u'tree\u03a9name',
 
1319
                                     b'parent-id')
 
1320
        ie.revision = b'tree-rev-id'
 
1321
        ie.reference_revision = b'ref-rev-id'
 
1322
        bytes = inv._entry_to_bytes(ie)
 
1323
        self.assertEqual(b'tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
 
1324
                         b'tree-rev-id\nref-rev-id', bytes)
 
1325
        ie2 = inv._bytes_to_entry(bytes)
 
1326
        self.assertEqual(ie, ie2)
 
1327
        self.assertIsInstance(ie2.name, str)
 
1328
        self.assertEqual((b'tree\xce\xa9name', b'tree-root-id', b'tree-rev-id'),
 
1329
                         inv._bytes_to_utf8name_key(bytes))
 
1330
 
 
1331
    def make_basic_utf8_inventory(self):
 
1332
        inv = Inventory()
 
1333
        inv.revision_id = b"revid"
 
1334
        inv.root.revision = b"rootrev"
 
1335
        root_id = inv.root.file_id
 
1336
        inv.add(InventoryFile(b"fileid", u'f\xefle', root_id))
 
1337
        inv.get_entry(b"fileid").revision = b"filerev"
 
1338
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1339
        inv.get_entry(b"fileid").text_size = 0
 
1340
        inv.add(InventoryDirectory(b"dirid", u'dir-\N{EURO SIGN}', root_id))
 
1341
        inv.get_entry(b"dirid").revision = b"dirrev"
 
1342
        inv.add(InventoryFile(b"childid", u'ch\xefld', b"dirid"))
 
1343
        inv.get_entry(b"childid").revision = b"filerev"
 
1344
        inv.get_entry(b"childid").text_sha1 = b"ffff"
 
1345
        inv.get_entry(b"childid").text_size = 0
 
1346
        chk_bytes = self.get_chk_bytes()
 
1347
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1348
        lines = chk_inv.to_lines()
 
1349
        return CHKInventory.deserialise(chk_bytes, lines, (b"revid",))
 
1350
 
 
1351
    def test__preload_handles_utf8(self):
 
1352
        new_inv = self.make_basic_utf8_inventory()
 
1353
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
 
1354
        self.assertFalse(new_inv._fully_cached)
 
1355
        new_inv._preload_cache()
 
1356
        self.assertEqual(
 
1357
            sorted([new_inv.root_id, b"fileid", b"dirid", b"childid"]),
 
1358
            sorted(new_inv._fileid_to_entry_cache.keys()))
 
1359
        ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
 
1360
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1361
                         sorted(ie_root._children.keys()))
 
1362
        ie_dir = new_inv._fileid_to_entry_cache[b'dirid']
 
1363
        self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
 
1364
 
 
1365
    def test__preload_populates_cache(self):
 
1366
        inv = Inventory()
 
1367
        inv.revision_id = b"revid"
 
1368
        inv.root.revision = b"rootrev"
 
1369
        root_id = inv.root.file_id
 
1370
        inv.add(InventoryFile(b"fileid", "file", root_id))
 
1371
        inv.get_entry(b"fileid").revision = b"filerev"
 
1372
        inv.get_entry(b"fileid").executable = True
 
1373
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1374
        inv.get_entry(b"fileid").text_size = 1
 
1375
        inv.add(InventoryDirectory(b"dirid", "dir", root_id))
 
1376
        inv.get_entry(b"dirid").revision = b"dirrev"
 
1377
        inv.add(InventoryFile(b"childid", "child", b"dirid"))
 
1378
        inv.get_entry(b"childid").revision = b"filerev"
 
1379
        inv.get_entry(b"childid").executable = False
 
1380
        inv.get_entry(b"childid").text_sha1 = b"dddd"
 
1381
        inv.get_entry(b"childid").text_size = 1
 
1382
        chk_bytes = self.get_chk_bytes()
 
1383
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1384
        lines = chk_inv.to_lines()
 
1385
        new_inv = CHKInventory.deserialise(chk_bytes, lines, (b"revid",))
 
1386
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
 
1387
        self.assertFalse(new_inv._fully_cached)
 
1388
        new_inv._preload_cache()
 
1389
        self.assertEqual(
 
1390
            sorted([root_id, b"fileid", b"dirid", b"childid"]),
 
1391
            sorted(new_inv._fileid_to_entry_cache.keys()))
 
1392
        self.assertTrue(new_inv._fully_cached)
 
1393
        ie_root = new_inv._fileid_to_entry_cache[root_id]
 
1394
        self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
 
1395
        ie_dir = new_inv._fileid_to_entry_cache[b'dirid']
 
1396
        self.assertEqual(['child'], sorted(ie_dir._children.keys()))
 
1397
 
 
1398
    def test__preload_handles_partially_evaluated_inventory(self):
 
1399
        new_inv = self.make_basic_utf8_inventory()
 
1400
        ie = new_inv.get_entry(new_inv.root_id)
 
1401
        self.assertIs(None, ie._children)
 
1402
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1403
                         sorted(ie.children.keys()))
 
1404
        # Accessing .children loads _children
 
1405
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1406
                         sorted(ie._children.keys()))
 
1407
        new_inv._preload_cache()
 
1408
        # No change
 
1409
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1410
                         sorted(ie._children.keys()))
 
1411
        ie_dir = new_inv.get_entry(b"dirid")
 
1412
        self.assertEqual([u'ch\xefld'],
 
1413
                         sorted(ie_dir._children.keys()))
 
1414
 
 
1415
    def test_filter_change_in_renamed_subfolder(self):
 
1416
        inv = Inventory(b'tree-root')
 
1417
        inv.root.revision = b'rootrev'
 
1418
        src_ie = inv.add_path('src', 'directory', b'src-id')
 
1419
        src_ie.revision = b'srcrev'
 
1420
        sub_ie = inv.add_path('src/sub/', 'directory', b'sub-id')
 
1421
        sub_ie.revision = b'subrev'
 
1422
        a_ie = inv.add_path('src/sub/a', 'file', b'a-id')
 
1423
        a_ie.revision = b'filerev'
 
1424
        a_ie.text_sha1 = osutils.sha_string(b'content\n')
 
1425
        a_ie.text_size = len(b'content\n')
 
1426
        chk_bytes = self.get_chk_bytes()
 
1427
        inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1428
        inv = inv.create_by_apply_delta([
 
1429
            ("src/sub/a", "src/sub/a", b"a-id", a_ie),
 
1430
            ("src", "src2", b"src-id", src_ie),
 
1431
            ], b'new-rev-2')
 
1432
        new_inv = inv.filter([b'a-id', b'src-id'])
 
1433
        self.assertEqual([
 
1434
            ('', b'tree-root'),
 
1435
            ('src', b'src-id'),
 
1436
            ('src/sub', b'sub-id'),
 
1437
            ('src/sub/a', b'a-id'),
 
1438
            ], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
 
1439
 
 
1440
 
 
1441
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
 
1442
 
 
1443
    def get_chk_bytes(self):
 
1444
        factory = groupcompress.make_pack_factory(True, True, 1)
 
1445
        trans = self.get_transport('')
 
1446
        return factory(trans)
 
1447
 
 
1448
    def make_dir(self, inv, name, parent_id, revision):
 
1449
        ie = inv.make_entry('directory', name, parent_id,
 
1450
                            name.encode('utf-8') + b'-id')
 
1451
        ie.revision = revision
 
1452
        inv.add(ie)
 
1453
 
 
1454
    def make_file(self, inv, name, parent_id, revision, content=b'content\n'):
 
1455
        ie = inv.make_entry('file', name, parent_id,
 
1456
                            name.encode('utf-8') + b'-id')
 
1457
        ie.text_sha1 = osutils.sha_string(content)
 
1458
        ie.text_size = len(content)
 
1459
        ie.revision = revision
 
1460
        inv.add(ie)
 
1461
 
 
1462
    def make_simple_inventory(self):
 
1463
        inv = Inventory(b'TREE_ROOT')
 
1464
        inv.revision_id = b"revid"
 
1465
        inv.root.revision = b"rootrev"
 
1466
        # /                 TREE_ROOT
 
1467
        # dir1/             dir1-id
 
1468
        #   sub-file1       sub-file1-id
 
1469
        #   sub-file2       sub-file2-id
 
1470
        #   sub-dir1/       sub-dir1-id
 
1471
        #     subsub-file1  subsub-file1-id
 
1472
        # dir2/             dir2-id
 
1473
        #   sub2-file1      sub2-file1-id
 
1474
        # top               top-id
 
1475
        self.make_dir(inv, 'dir1', b'TREE_ROOT', b'dirrev')
 
1476
        self.make_dir(inv, 'dir2', b'TREE_ROOT', b'dirrev')
 
1477
        self.make_dir(inv, 'sub-dir1', b'dir1-id', b'dirrev')
 
1478
        self.make_file(inv, 'top', b'TREE_ROOT', b'filerev')
 
1479
        self.make_file(inv, 'sub-file1', b'dir1-id', b'filerev')
 
1480
        self.make_file(inv, 'sub-file2', b'dir1-id', b'filerev')
 
1481
        self.make_file(inv, 'subsub-file1', b'sub-dir1-id', b'filerev')
 
1482
        self.make_file(inv, 'sub2-file1', b'dir2-id', b'filerev')
 
1483
        chk_bytes = self.get_chk_bytes()
 
1484
        #  use a small maximum_size to force internal paging structures
 
1485
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
1486
                                              maximum_size=100,
 
1487
                                              search_key_name=b'hash-255-way')
 
1488
        lines = chk_inv.to_lines()
 
1489
        return CHKInventory.deserialise(chk_bytes, lines, (b"revid",))
 
1490
 
 
1491
    def assert_Getitems(self, expected_fileids, inv, file_ids):
 
1492
        self.assertEqual(sorted(expected_fileids),
 
1493
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
 
1494
 
 
1495
    def assertExpand(self, all_ids, inv, file_ids):
 
1496
        (val_all_ids,
 
1497
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
 
1498
        self.assertEqual(set(all_ids), val_all_ids)
 
1499
        entries = inv._getitems(val_all_ids)
 
1500
        expected_children = {}
 
1501
        for entry in entries:
 
1502
            s = expected_children.setdefault(entry.parent_id, [])
 
1503
            s.append(entry.file_id)
 
1504
        val_children = {
 
1505
            k: sorted(v) for k, v in val_children.items()}
 
1506
        expected_children = {
 
1507
            k: sorted(v) for k, v in expected_children.items()}
 
1508
        self.assertEqual(expected_children, val_children)
 
1509
 
 
1510
    def test_make_simple_inventory(self):
 
1511
        inv = self.make_simple_inventory()
 
1512
        layout = []
 
1513
        for path, entry in inv.iter_entries_by_dir():
 
1514
            layout.append((path, entry.file_id))
 
1515
        self.assertEqual([
 
1516
            ('', b'TREE_ROOT'),
 
1517
            ('dir1', b'dir1-id'),
 
1518
            ('dir2', b'dir2-id'),
 
1519
            ('top', b'top-id'),
 
1520
            ('dir1/sub-dir1', b'sub-dir1-id'),
 
1521
            ('dir1/sub-file1', b'sub-file1-id'),
 
1522
            ('dir1/sub-file2', b'sub-file2-id'),
 
1523
            ('dir1/sub-dir1/subsub-file1', b'subsub-file1-id'),
 
1524
            ('dir2/sub2-file1', b'sub2-file1-id'),
 
1525
            ], layout)
 
1526
 
 
1527
    def test__getitems(self):
 
1528
        inv = self.make_simple_inventory()
 
1529
        # Reading from disk
 
1530
        self.assert_Getitems([b'dir1-id'], inv, [b'dir1-id'])
 
1531
        self.assertTrue(b'dir1-id' in inv._fileid_to_entry_cache)
 
1532
        self.assertFalse(b'sub-file2-id' in inv._fileid_to_entry_cache)
 
1533
        # From cache
 
1534
        self.assert_Getitems([b'dir1-id'], inv, [b'dir1-id'])
 
1535
        # Mixed
 
1536
        self.assert_Getitems([b'dir1-id', b'sub-file2-id'], inv,
 
1537
                             [b'dir1-id', b'sub-file2-id'])
 
1538
        self.assertTrue(b'dir1-id' in inv._fileid_to_entry_cache)
 
1539
        self.assertTrue(b'sub-file2-id' in inv._fileid_to_entry_cache)
 
1540
 
 
1541
    def test_single_file(self):
 
1542
        inv = self.make_simple_inventory()
 
1543
        self.assertExpand([b'TREE_ROOT', b'top-id'], inv, [b'top-id'])
 
1544
 
 
1545
    def test_get_all_parents(self):
 
1546
        inv = self.make_simple_inventory()
 
1547
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
 
1548
                           b'subsub-file1-id',
 
1549
                           ], inv, [b'subsub-file1-id'])
 
1550
 
 
1551
    def test_get_children(self):
 
1552
        inv = self.make_simple_inventory()
 
1553
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
 
1554
                           b'sub-file1-id', b'sub-file2-id', b'subsub-file1-id',
 
1555
                           ], inv, [b'dir1-id'])
 
1556
 
 
1557
    def test_from_root(self):
 
1558
        inv = self.make_simple_inventory()
 
1559
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'dir2-id', b'sub-dir1-id',
 
1560
                           b'sub-file1-id', b'sub-file2-id', b'sub2-file1-id',
 
1561
                           b'subsub-file1-id', b'top-id'], inv, [b'TREE_ROOT'])
 
1562
 
 
1563
    def test_top_level_file(self):
 
1564
        inv = self.make_simple_inventory()
 
1565
        self.assertExpand([b'TREE_ROOT', b'top-id'], inv, [b'top-id'])
 
1566
 
 
1567
    def test_subsub_file(self):
 
1568
        inv = self.make_simple_inventory()
 
1569
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
 
1570
                           b'subsub-file1-id'], inv, [b'subsub-file1-id'])
 
1571
 
 
1572
    def test_sub_and_root(self):
 
1573
        inv = self.make_simple_inventory()
 
1574
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id', b'top-id',
 
1575
                           b'subsub-file1-id'], inv, [b'top-id', b'subsub-file1-id'])
 
1576
 
 
1577
 
 
1578
class TestMutableInventoryFromTree(TestCaseWithTransport):
 
1579
 
 
1580
    def test_empty(self):
 
1581
        repository = self.make_repository('.')
 
1582
        tree = repository.revision_tree(revision.NULL_REVISION)
 
1583
        inv = mutable_inventory_from_tree(tree)
 
1584
        self.assertEqual(revision.NULL_REVISION, inv.revision_id)
 
1585
        self.assertEqual(0, len(inv))
 
1586
 
 
1587
    def test_some_files(self):
 
1588
        wt = self.make_branch_and_tree('.')
 
1589
        self.build_tree(['a'])
 
1590
        wt.add(['a'], [b'thefileid'])
 
1591
        revid = wt.commit("commit")
 
1592
        tree = wt.branch.repository.revision_tree(revid)
 
1593
        inv = mutable_inventory_from_tree(tree)
 
1594
        self.assertEqual(revid, inv.revision_id)
 
1595
        self.assertEqual(2, len(inv))
 
1596
        self.assertEqual("a", inv.get_entry(b'thefileid').name)
 
1597
        # The inventory should be mutable and independent of
 
1598
        # the original tree
 
1599
        self.assertFalse(tree.root_inventory.get_entry(
 
1600
            b'thefileid').executable)
 
1601
        inv.get_entry(b'thefileid').executable = True
 
1602
        self.assertFalse(tree.root_inventory.get_entry(
 
1603
            b'thefileid').executable)