/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_inv.py

  • Committer: Breezy landing bot
  • Author(s): Jelmer Vernooij
  • Date: 2019-02-14 03:30:18 UTC
  • mfrom: (6745.1.3 test-file-ids)
  • Revision ID: breezy.the.bot@gmail.com-20190214033018-4mhv416kiuozgned
Fix a commonly typoed word: compatibility.

Merged from https://code.launchpad.net/~jelmer/brz/compatibility-typos/+merge/363008

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2012, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from .. import (
 
19
    errors,
 
20
    osutils,
 
21
    repository,
 
22
    revision,
 
23
    tests,
 
24
    workingtree,
 
25
    )
 
26
from ..sixish import text_type
 
27
from ..bzr import (
 
28
    chk_map,
 
29
    groupcompress,
 
30
    inventory,
 
31
    )
 
32
from ..bzr.inventory import (
 
33
    CHKInventory,
 
34
    Inventory,
 
35
    ROOT_ID,
 
36
    InventoryFile,
 
37
    InventoryDirectory,
 
38
    InventoryEntry,
 
39
    TreeReference,
 
40
    mutable_inventory_from_tree,
 
41
    )
 
42
from . import (
 
43
    TestCase,
 
44
    TestCaseWithTransport,
 
45
    )
 
46
from .scenarios import load_tests_apply_scenarios
 
47
 
 
48
 
 
49
load_tests = load_tests_apply_scenarios
 
50
 
 
51
 
 
52
def delta_application_scenarios():
 
53
    scenarios = [
 
54
        ('Inventory', {'apply_delta': apply_inventory_Inventory}),
 
55
        ]
 
56
    # Working tree basis delta application
 
57
    # Repository add_inv_by_delta.
 
58
    # Reduce form of the per_repository test logic - that logic needs to be
 
59
    # be able to get /just/ repositories whereas these tests are fine with
 
60
    # just creating trees.
 
61
    formats = set()
 
62
    for _, format in repository.format_registry.iteritems():
 
63
        if format.supports_full_versioned_files:
 
64
            scenarios.append((str(format.__name__), {
 
65
                'apply_delta': apply_inventory_Repository_add_inventory_by_delta,
 
66
                'format': format}))
 
67
    for getter in workingtree.format_registry._get_all_lazy():
 
68
        try:
 
69
            format = getter()
 
70
            if callable(format):
 
71
                format = format()
 
72
        except ImportError:
 
73
            pass  # Format with unmet dependency
 
74
        repo_fmt = format._matchingcontroldir.repository_format
 
75
        if not repo_fmt.supports_full_versioned_files:
 
76
            continue
 
77
        scenarios.append(
 
78
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
 
79
                'apply_delta': apply_inventory_WT_basis,
 
80
                'format': format}))
 
81
        scenarios.append(
 
82
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
 
83
                'apply_delta': apply_inventory_WT,
 
84
                'format': format}))
 
85
    return scenarios
 
86
 
 
87
 
 
88
def create_texts_for_inv(repo, inv):
 
89
    for path, ie in inv.iter_entries():
 
90
        if ie.text_size:
 
91
            lines = [b'a' * ie.text_size]
 
92
        else:
 
93
            lines = []
 
94
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
 
95
 
 
96
 
 
97
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
 
98
    """Apply delta to basis and return the result.
 
99
 
 
100
    :param basis: An inventory to be used as the basis.
 
101
    :param delta: The inventory delta to apply:
 
102
    :return: An inventory resulting from the application.
 
103
    """
 
104
    basis.apply_delta(delta)
 
105
    return basis
 
106
 
 
107
 
 
108
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
 
109
    """Apply delta to basis and return the result.
 
110
 
 
111
    This sets the tree state to be basis, and then calls apply_inventory_delta.
 
112
 
 
113
    :param basis: An inventory to be used as the basis.
 
114
    :param delta: The inventory delta to apply:
 
115
    :return: An inventory resulting from the application.
 
116
    """
 
117
    control = self.make_controldir(
 
118
        'tree', format=self.format._matchingcontroldir)
 
119
    control.create_repository()
 
120
    control.create_branch()
 
121
    tree = self.format.initialize(control)
 
122
    tree.lock_write()
 
123
    try:
 
124
        tree._write_inventory(basis)
 
125
    finally:
 
126
        tree.unlock()
 
127
    # Fresh object, reads disk again.
 
128
    tree = tree.controldir.open_workingtree()
 
129
    tree.lock_write()
 
130
    try:
 
131
        tree.apply_inventory_delta(delta)
 
132
    finally:
 
133
        tree.unlock()
 
134
    # reload tree - ensure we get what was written.
 
135
    tree = tree.controldir.open_workingtree()
 
136
    tree.lock_read()
 
137
    self.addCleanup(tree.unlock)
 
138
    if not invalid_delta:
 
139
        tree._validate()
 
140
    return tree.root_inventory
 
141
 
 
142
 
 
143
def _create_repo_revisions(repo, basis, delta, invalid_delta):
 
144
    repo.start_write_group()
 
145
    try:
 
146
        rev = revision.Revision(b'basis', timestamp=0, timezone=None,
 
147
                                message="", committer="foo@example.com")
 
148
        basis.revision_id = b'basis'
 
149
        create_texts_for_inv(repo, basis)
 
150
        repo.add_revision(b'basis', rev, basis)
 
151
        if invalid_delta:
 
152
            # We don't want to apply the delta to the basis, because we expect
 
153
            # the delta is invalid.
 
154
            result_inv = basis
 
155
            result_inv.revision_id = b'result'
 
156
            target_entries = None
 
157
        else:
 
158
            result_inv = basis.create_by_apply_delta(delta, b'result')
 
159
            create_texts_for_inv(repo, result_inv)
 
160
            target_entries = list(result_inv.iter_entries_by_dir())
 
161
        rev = revision.Revision(b'result', timestamp=0, timezone=None,
 
162
                                message="", committer="foo@example.com")
 
163
        repo.add_revision(b'result', rev, result_inv)
 
164
        repo.commit_write_group()
 
165
    except:
 
166
        repo.abort_write_group()
 
167
        raise
 
168
    return target_entries
 
169
 
 
170
 
 
171
def _get_basis_entries(tree):
 
172
    basis_tree = tree.basis_tree()
 
173
    basis_tree.lock_read()
 
174
    basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
 
175
    basis_tree.unlock()
 
176
    return basis_tree_entries
 
177
 
 
178
 
 
179
def _populate_different_tree(tree, basis, delta):
 
180
    """Put all entries into tree, but at a unique location."""
 
181
    added_ids = set()
 
182
    added_paths = set()
 
183
    tree.add(['unique-dir'], [b'unique-dir-id'], ['directory'])
 
184
    for path, ie in basis.iter_entries_by_dir():
 
185
        if ie.file_id in added_ids:
 
186
            continue
 
187
        # We want a unique path for each of these, we use the file-id
 
188
        tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
 
189
        added_ids.add(ie.file_id)
 
190
    for old_path, new_path, file_id, ie in delta:
 
191
        if file_id in added_ids:
 
192
            continue
 
193
        tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
 
194
 
 
195
 
 
196
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
 
197
    """Apply delta to basis and return the result.
 
198
 
 
199
    This sets the parent and then calls update_basis_by_delta.
 
200
    It also puts the basis in the repository under both 'basis' and 'result' to
 
201
    allow safety checks made by the WT to succeed, and finally ensures that all
 
202
    items in the delta with a new path are present in the WT before calling
 
203
    update_basis_by_delta.
 
204
 
 
205
    :param basis: An inventory to be used as the basis.
 
206
    :param delta: The inventory delta to apply:
 
207
    :return: An inventory resulting from the application.
 
208
    """
 
209
    control = test.make_controldir(
 
210
        'tree', format=test.format._matchingcontroldir)
 
211
    control.create_repository()
 
212
    control.create_branch()
 
213
    tree = test.format.initialize(control)
 
214
    tree.lock_write()
 
215
    try:
 
216
        target_entries = _create_repo_revisions(tree.branch.repository, basis,
 
217
                                                delta, invalid_delta)
 
218
        # Set the basis state as the trees current state
 
219
        tree._write_inventory(basis)
 
220
        # This reads basis from the repo and puts it into the tree's local
 
221
        # cache, if it has one.
 
222
        tree.set_parent_ids([b'basis'])
 
223
    finally:
 
224
        tree.unlock()
 
225
    # Fresh lock, reads disk again.
 
226
    with tree.lock_write():
 
227
        tree.update_basis_by_delta(b'result', delta)
 
228
        if not invalid_delta:
 
229
            tree._validate()
 
230
    # reload tree - ensure we get what was written.
 
231
    tree = tree.controldir.open_workingtree()
 
232
    basis_tree = tree.basis_tree()
 
233
    basis_tree.lock_read()
 
234
    test.addCleanup(basis_tree.unlock)
 
235
    basis_inv = basis_tree.root_inventory
 
236
    if target_entries:
 
237
        basis_entries = list(basis_inv.iter_entries_by_dir())
 
238
        test.assertEqual(target_entries, basis_entries)
 
239
    return basis_inv
 
240
 
 
241
 
 
242
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
 
243
                                                      invalid_delta=True):
 
244
    """Apply delta to basis and return the result.
 
245
 
 
246
    This inserts basis as a whole inventory and then uses
 
247
    add_inventory_by_delta to add delta.
 
248
 
 
249
    :param basis: An inventory to be used as the basis.
 
250
    :param delta: The inventory delta to apply:
 
251
    :return: An inventory resulting from the application.
 
252
    """
 
253
    format = self.format()
 
254
    control = self.make_controldir('tree', format=format._matchingcontroldir)
 
255
    repo = format.initialize(control)
 
256
    with repo.lock_write():
 
257
        repo.start_write_group()
 
258
        try:
 
259
            rev = revision.Revision(b'basis', timestamp=0, timezone=None,
 
260
                                    message="", committer="foo@example.com")
 
261
            basis.revision_id = b'basis'
 
262
            create_texts_for_inv(repo, basis)
 
263
            repo.add_revision(b'basis', rev, basis)
 
264
            repo.commit_write_group()
 
265
        except:
 
266
            repo.abort_write_group()
 
267
            raise
 
268
    with repo.lock_write():
 
269
        repo.start_write_group()
 
270
        try:
 
271
            inv_sha1 = repo.add_inventory_by_delta(b'basis', delta,
 
272
                                                   b'result', [b'basis'])
 
273
        except:
 
274
            repo.abort_write_group()
 
275
            raise
 
276
        else:
 
277
            repo.commit_write_group()
 
278
    # Fresh lock, reads disk again.
 
279
    repo = repo.controldir.open_repository()
 
280
    repo.lock_read()
 
281
    self.addCleanup(repo.unlock)
 
282
    return repo.get_inventory(b'result')
 
283
 
 
284
 
 
285
class TestInventoryUpdates(TestCase):
 
286
 
 
287
    def test_creation_from_root_id(self):
 
288
        # iff a root id is passed to the constructor, a root directory is made
 
289
        inv = inventory.Inventory(root_id=b'tree-root')
 
290
        self.assertNotEqual(None, inv.root)
 
291
        self.assertEqual(b'tree-root', inv.root.file_id)
 
292
 
 
293
    def test_add_path_of_root(self):
 
294
        # if no root id is given at creation time, there is no root directory
 
295
        inv = inventory.Inventory(root_id=None)
 
296
        self.assertIs(None, inv.root)
 
297
        # add a root entry by adding its path
 
298
        ie = inv.add_path(u"", "directory", b"my-root")
 
299
        ie.revision = b'test-rev'
 
300
        self.assertEqual(b"my-root", ie.file_id)
 
301
        self.assertIs(ie, inv.root)
 
302
 
 
303
    def test_add_path(self):
 
304
        inv = inventory.Inventory(root_id=b'tree_root')
 
305
        ie = inv.add_path(u'hello', 'file', b'hello-id')
 
306
        self.assertEqual(b'hello-id', ie.file_id)
 
307
        self.assertEqual('file', ie.kind)
 
308
 
 
309
    def test_copy(self):
 
310
        """Make sure copy() works and creates a deep copy."""
 
311
        inv = inventory.Inventory(root_id=b'some-tree-root')
 
312
        ie = inv.add_path(u'hello', 'file', b'hello-id')
 
313
        inv2 = inv.copy()
 
314
        inv.root.file_id = b'some-new-root'
 
315
        ie.name = u'file2'
 
316
        self.assertEqual(b'some-tree-root', inv2.root.file_id)
 
317
        self.assertEqual(u'hello', inv2.get_entry(b'hello-id').name)
 
318
 
 
319
    def test_copy_empty(self):
 
320
        """Make sure an empty inventory can be copied."""
 
321
        inv = inventory.Inventory(root_id=None)
 
322
        inv2 = inv.copy()
 
323
        self.assertIs(None, inv2.root)
 
324
 
 
325
    def test_copy_copies_root_revision(self):
 
326
        """Make sure the revision of the root gets copied."""
 
327
        inv = inventory.Inventory(root_id=b'someroot')
 
328
        inv.root.revision = b'therev'
 
329
        inv2 = inv.copy()
 
330
        self.assertEqual(b'someroot', inv2.root.file_id)
 
331
        self.assertEqual(b'therev', inv2.root.revision)
 
332
 
 
333
    def test_create_tree_reference(self):
 
334
        inv = inventory.Inventory(b'tree-root-123')
 
335
        inv.add(TreeReference(
 
336
            b'nested-id', 'nested', parent_id=b'tree-root-123',
 
337
            revision=b'rev', reference_revision=b'rev2'))
 
338
 
 
339
    def test_error_encoding(self):
 
340
        inv = inventory.Inventory(b'tree-root')
 
341
        inv.add(InventoryFile(b'a-id', u'\u1234', b'tree-root'))
 
342
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
 
343
                              InventoryFile(b'b-id', u'\u1234', b'tree-root'))
 
344
        self.assertContainsRe(str(e), '\\u1234')
 
345
 
 
346
    def test_add_recursive(self):
 
347
        parent = InventoryDirectory(b'src-id', 'src', b'tree-root')
 
348
        child = InventoryFile(b'hello-id', 'hello.c', b'src-id')
 
349
        parent.children[child.file_id] = child
 
350
        inv = inventory.Inventory(b'tree-root')
 
351
        inv.add(parent)
 
352
        self.assertEqual('src/hello.c', inv.id2path(b'hello-id'))
 
353
 
 
354
 
 
355
class TestDeltaApplication(TestCaseWithTransport):
 
356
 
 
357
    scenarios = delta_application_scenarios()
 
358
 
 
359
    def get_empty_inventory(self, reference_inv=None):
 
360
        """Get an empty inventory.
 
361
 
 
362
        Note that tests should not depend on the revision of the root for
 
363
        setting up test conditions, as it has to be flexible to accomodate non
 
364
        rich root repositories.
 
365
 
 
366
        :param reference_inv: If not None, get the revision for the root from
 
367
            this inventory. This is useful for dealing with older repositories
 
368
            that routinely discarded the root entry data. If None, the root's
 
369
            revision is set to 'basis'.
 
370
        """
 
371
        inv = inventory.Inventory()
 
372
        if reference_inv is not None:
 
373
            inv.root.revision = reference_inv.root.revision
 
374
        else:
 
375
            inv.root.revision = b'basis'
 
376
        return inv
 
377
 
 
378
    def make_file_ie(self, file_id=b'file-id', name='name', parent_id=None):
 
379
        ie_file = inventory.InventoryFile(file_id, name, parent_id)
 
380
        ie_file.revision = b'result'
 
381
        ie_file.text_size = 0
 
382
        ie_file.text_sha1 = b''
 
383
        return ie_file
 
384
 
 
385
    def test_empty_delta(self):
 
386
        inv = self.get_empty_inventory()
 
387
        delta = []
 
388
        inv = self.apply_delta(self, inv, delta)
 
389
        inv2 = self.get_empty_inventory(inv)
 
390
        self.assertEqual([], inv2._make_delta(inv))
 
391
 
 
392
    def test_None_file_id(self):
 
393
        inv = self.get_empty_inventory()
 
394
        dir1 = inventory.InventoryDirectory(b'dirid', 'dir1', inv.root.file_id)
 
395
        dir1.file_id = None
 
396
        dir1.revision = b'result'
 
397
        delta = [(None, u'dir1', None, dir1)]
 
398
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
399
                          inv, delta)
 
400
 
 
401
    def test_unicode_file_id(self):
 
402
        inv = self.get_empty_inventory()
 
403
        dir1 = inventory.InventoryDirectory(b'dirid', 'dir1', inv.root.file_id)
 
404
        dir1.file_id = u'dirid'
 
405
        dir1.revision = b'result'
 
406
        delta = [(None, u'dir1', dir1.file_id, dir1)]
 
407
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
408
                          inv, delta)
 
409
 
 
410
    def test_repeated_file_id(self):
 
411
        inv = self.get_empty_inventory()
 
412
        file1 = inventory.InventoryFile(b'id', 'path1', inv.root.file_id)
 
413
        file1.revision = b'result'
 
414
        file1.text_size = 0
 
415
        file1.text_sha1 = b""
 
416
        file2 = file1.copy()
 
417
        file2.name = 'path2'
 
418
        delta = [(None, u'path1', b'id', file1),
 
419
                 (None, u'path2', b'id', file2)]
 
420
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
421
                          inv, delta)
 
422
 
 
423
    def test_repeated_new_path(self):
 
424
        inv = self.get_empty_inventory()
 
425
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
426
        file1.revision = b'result'
 
427
        file1.text_size = 0
 
428
        file1.text_sha1 = b""
 
429
        file2 = file1.copy()
 
430
        file2.file_id = b'id2'
 
431
        delta = [(None, u'path', b'id1', file1),
 
432
                 (None, u'path', b'id2', file2)]
 
433
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
434
                          inv, delta)
 
435
 
 
436
    def test_repeated_old_path(self):
 
437
        inv = self.get_empty_inventory()
 
438
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
439
        file1.revision = b'result'
 
440
        file1.text_size = 0
 
441
        file1.text_sha1 = b""
 
442
        # We can't *create* a source inventory with the same path, but
 
443
        # a badly generated partial delta might claim the same source twice.
 
444
        # This would be buggy in two ways: the path is repeated in the delta,
 
445
        # And the path for one of the file ids doesn't match the source
 
446
        # location. Alternatively, we could have a repeated fileid, but that
 
447
        # is separately checked for.
 
448
        file2 = inventory.InventoryFile(b'id2', 'path2', inv.root.file_id)
 
449
        file2.revision = b'result'
 
450
        file2.text_size = 0
 
451
        file2.text_sha1 = b""
 
452
        inv.add(file1)
 
453
        inv.add(file2)
 
454
        delta = [(u'path', None, b'id1', None), (u'path', None, b'id2', None)]
 
455
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
456
                          inv, delta)
 
457
 
 
458
    def test_mismatched_id_entry_id(self):
 
459
        inv = self.get_empty_inventory()
 
460
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
461
        file1.revision = b'result'
 
462
        file1.text_size = 0
 
463
        file1.text_sha1 = b""
 
464
        delta = [(None, u'path', b'id', file1)]
 
465
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
466
                          inv, delta)
 
467
 
 
468
    def test_mismatched_new_path_entry_None(self):
 
469
        inv = self.get_empty_inventory()
 
470
        delta = [(None, u'path', b'id', None)]
 
471
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
472
                          inv, delta)
 
473
 
 
474
    def test_mismatched_new_path_None_entry(self):
 
475
        inv = self.get_empty_inventory()
 
476
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
477
        file1.revision = b'result'
 
478
        file1.text_size = 0
 
479
        file1.text_sha1 = b""
 
480
        delta = [(u"path", None, b'id1', file1)]
 
481
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
482
                          inv, delta)
 
483
 
 
484
    def test_parent_is_not_directory(self):
 
485
        inv = self.get_empty_inventory()
 
486
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
487
        file1.revision = b'result'
 
488
        file1.text_size = 0
 
489
        file1.text_sha1 = b""
 
490
        file2 = inventory.InventoryFile(b'id2', 'path2', b'id1')
 
491
        file2.revision = b'result'
 
492
        file2.text_size = 0
 
493
        file2.text_sha1 = b""
 
494
        inv.add(file1)
 
495
        delta = [(None, u'path/path2', b'id2', file2)]
 
496
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
497
                          inv, delta)
 
498
 
 
499
    def test_parent_is_missing(self):
 
500
        inv = self.get_empty_inventory()
 
501
        file2 = inventory.InventoryFile(b'id2', 'path2', b'missingparent')
 
502
        file2.revision = b'result'
 
503
        file2.text_size = 0
 
504
        file2.text_sha1 = b""
 
505
        delta = [(None, u'path/path2', b'id2', file2)]
 
506
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
507
                          inv, delta)
 
508
 
 
509
    def test_new_parent_path_has_wrong_id(self):
 
510
        inv = self.get_empty_inventory()
 
511
        parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
 
512
        parent1.revision = b'result'
 
513
        parent2 = inventory.InventoryDirectory(
 
514
            b'p-2', 'dir2', inv.root.file_id)
 
515
        parent2.revision = b'result'
 
516
        file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
 
517
        file1.revision = b'result'
 
518
        file1.text_size = 0
 
519
        file1.text_sha1 = b""
 
520
        inv.add(parent1)
 
521
        inv.add(parent2)
 
522
        # This delta claims that file1 is at dir/path, but actually its at
 
523
        # dir2/path if you follow the inventory parent structure.
 
524
        delta = [(None, u'dir/path', b'id', file1)]
 
525
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
526
                          inv, delta)
 
527
 
 
528
    def test_old_parent_path_is_wrong(self):
 
529
        inv = self.get_empty_inventory()
 
530
        parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
 
531
        parent1.revision = b'result'
 
532
        parent2 = inventory.InventoryDirectory(
 
533
            b'p-2', 'dir2', inv.root.file_id)
 
534
        parent2.revision = b'result'
 
535
        file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
 
536
        file1.revision = b'result'
 
537
        file1.text_size = 0
 
538
        file1.text_sha1 = b""
 
539
        inv.add(parent1)
 
540
        inv.add(parent2)
 
541
        inv.add(file1)
 
542
        # This delta claims that file1 was at dir/path, but actually it was at
 
543
        # dir2/path if you follow the inventory parent structure.
 
544
        delta = [(u'dir/path', None, b'id', None)]
 
545
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
546
                          inv, delta)
 
547
 
 
548
    def test_old_parent_path_is_for_other_id(self):
 
549
        inv = self.get_empty_inventory()
 
550
        parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
 
551
        parent1.revision = b'result'
 
552
        parent2 = inventory.InventoryDirectory(
 
553
            b'p-2', 'dir2', inv.root.file_id)
 
554
        parent2.revision = b'result'
 
555
        file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
 
556
        file1.revision = b'result'
 
557
        file1.text_size = 0
 
558
        file1.text_sha1 = b""
 
559
        file2 = inventory.InventoryFile(b'id2', 'path', b'p-1')
 
560
        file2.revision = b'result'
 
561
        file2.text_size = 0
 
562
        file2.text_sha1 = b""
 
563
        inv.add(parent1)
 
564
        inv.add(parent2)
 
565
        inv.add(file1)
 
566
        inv.add(file2)
 
567
        # This delta claims that file1 was at dir/path, but actually it was at
 
568
        # dir2/path if you follow the inventory parent structure. At dir/path
 
569
        # is another entry we should not delete.
 
570
        delta = [(u'dir/path', None, b'id', None)]
 
571
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
572
                          inv, delta)
 
573
 
 
574
    def test_add_existing_id_new_path(self):
 
575
        inv = self.get_empty_inventory()
 
576
        parent1 = inventory.InventoryDirectory(
 
577
            b'p-1', 'dir1', inv.root.file_id)
 
578
        parent1.revision = b'result'
 
579
        parent2 = inventory.InventoryDirectory(
 
580
            b'p-1', 'dir2', inv.root.file_id)
 
581
        parent2.revision = b'result'
 
582
        inv.add(parent1)
 
583
        delta = [(None, u'dir2', b'p-1', parent2)]
 
584
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
585
                          inv, delta)
 
586
 
 
587
    def test_add_new_id_existing_path(self):
 
588
        inv = self.get_empty_inventory()
 
589
        parent1 = inventory.InventoryDirectory(
 
590
            b'p-1', 'dir1', inv.root.file_id)
 
591
        parent1.revision = b'result'
 
592
        parent2 = inventory.InventoryDirectory(
 
593
            b'p-2', 'dir1', inv.root.file_id)
 
594
        parent2.revision = b'result'
 
595
        inv.add(parent1)
 
596
        delta = [(None, u'dir1', b'p-2', parent2)]
 
597
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
598
                          inv, delta)
 
599
 
 
600
    def test_remove_dir_leaving_dangling_child(self):
 
601
        inv = self.get_empty_inventory()
 
602
        dir1 = inventory.InventoryDirectory(b'p-1', 'dir1', inv.root.file_id)
 
603
        dir1.revision = b'result'
 
604
        dir2 = inventory.InventoryDirectory(b'p-2', 'child1', b'p-1')
 
605
        dir2.revision = b'result'
 
606
        dir3 = inventory.InventoryDirectory(b'p-3', 'child2', b'p-1')
 
607
        dir3.revision = b'result'
 
608
        inv.add(dir1)
 
609
        inv.add(dir2)
 
610
        inv.add(dir3)
 
611
        delta = [(u'dir1', None, b'p-1', None),
 
612
                 (u'dir1/child2', None, b'p-3', None)]
 
613
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
614
                          inv, delta)
 
615
 
 
616
    def test_add_file(self):
 
617
        inv = self.get_empty_inventory()
 
618
        file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
 
619
        file1.revision = b'result'
 
620
        file1.text_size = 0
 
621
        file1.text_sha1 = b''
 
622
        delta = [(None, u'path', b'file-id', file1)]
 
623
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
624
        self.assertEqual(b'file-id', res_inv.get_entry(b'file-id').file_id)
 
625
 
 
626
    def test_remove_file(self):
 
627
        inv = self.get_empty_inventory()
 
628
        file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
 
629
        file1.revision = b'result'
 
630
        file1.text_size = 0
 
631
        file1.text_sha1 = b''
 
632
        inv.add(file1)
 
633
        delta = [(u'path', None, b'file-id', None)]
 
634
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
635
        self.assertEqual(None, res_inv.path2id('path'))
 
636
        self.assertRaises(errors.NoSuchId, res_inv.id2path, b'file-id')
 
637
 
 
638
    def test_rename_file(self):
 
639
        inv = self.get_empty_inventory()
 
640
        file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
 
641
        inv.add(file1)
 
642
        file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
 
643
        delta = [(u'path', 'path2', b'file-id', file2)]
 
644
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
645
        self.assertEqual(None, res_inv.path2id('path'))
 
646
        self.assertEqual(b'file-id', res_inv.path2id('path2'))
 
647
 
 
648
    def test_replaced_at_new_path(self):
 
649
        inv = self.get_empty_inventory()
 
650
        file1 = self.make_file_ie(file_id=b'id1', parent_id=inv.root.file_id)
 
651
        inv.add(file1)
 
652
        file2 = self.make_file_ie(file_id=b'id2', parent_id=inv.root.file_id)
 
653
        delta = [(u'name', None, b'id1', None),
 
654
                 (None, u'name', b'id2', file2)]
 
655
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
656
        self.assertEqual(b'id2', res_inv.path2id('name'))
 
657
 
 
658
    def test_rename_dir(self):
 
659
        inv = self.get_empty_inventory()
 
660
        dir1 = inventory.InventoryDirectory(
 
661
            b'dir-id', 'dir1', inv.root.file_id)
 
662
        dir1.revision = b'basis'
 
663
        file1 = self.make_file_ie(parent_id=b'dir-id')
 
664
        inv.add(dir1)
 
665
        inv.add(file1)
 
666
        dir2 = inventory.InventoryDirectory(
 
667
            b'dir-id', 'dir2', inv.root.file_id)
 
668
        dir2.revision = b'result'
 
669
        delta = [('dir1', 'dir2', b'dir-id', dir2)]
 
670
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
671
        # The file should be accessible under the new path
 
672
        self.assertEqual(b'file-id', res_inv.path2id('dir2/name'))
 
673
 
 
674
    def test_renamed_dir_with_renamed_child(self):
 
675
        inv = self.get_empty_inventory()
 
676
        dir1 = inventory.InventoryDirectory(
 
677
            b'dir-id', 'dir1', inv.root.file_id)
 
678
        dir1.revision = b'basis'
 
679
        file1 = self.make_file_ie(b'file-id-1', 'name1', parent_id=b'dir-id')
 
680
        file2 = self.make_file_ie(b'file-id-2', 'name2', parent_id=b'dir-id')
 
681
        inv.add(dir1)
 
682
        inv.add(file1)
 
683
        inv.add(file2)
 
684
        dir2 = inventory.InventoryDirectory(
 
685
            b'dir-id', 'dir2', inv.root.file_id)
 
686
        dir2.revision = b'result'
 
687
        file2b = self.make_file_ie(b'file-id-2', 'name2', inv.root.file_id)
 
688
        delta = [('dir1', 'dir2', b'dir-id', dir2),
 
689
                 ('dir1/name2', 'name2', b'file-id-2', file2b)]
 
690
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
691
        # The file should be accessible under the new path
 
692
        self.assertEqual(b'file-id-1', res_inv.path2id('dir2/name1'))
 
693
        self.assertEqual(None, res_inv.path2id('dir2/name2'))
 
694
        self.assertEqual(b'file-id-2', res_inv.path2id('name2'))
 
695
 
 
696
    def test_is_root(self):
 
697
        """Ensure our root-checking code is accurate."""
 
698
        inv = inventory.Inventory(b'TREE_ROOT')
 
699
        self.assertTrue(inv.is_root(b'TREE_ROOT'))
 
700
        self.assertFalse(inv.is_root(b'booga'))
 
701
        inv.root.file_id = b'booga'
 
702
        self.assertFalse(inv.is_root(b'TREE_ROOT'))
 
703
        self.assertTrue(inv.is_root(b'booga'))
 
704
        # works properly even if no root is set
 
705
        inv.root = None
 
706
        self.assertFalse(inv.is_root(b'TREE_ROOT'))
 
707
        self.assertFalse(inv.is_root(b'booga'))
 
708
 
 
709
    def test_entries_for_empty_inventory(self):
 
710
        """Test that entries() will not fail for an empty inventory"""
 
711
        inv = Inventory(root_id=None)
 
712
        self.assertEqual([], inv.entries())
 
713
 
 
714
 
 
715
class TestInventoryEntry(TestCase):
 
716
 
 
717
    def test_file_invalid_entry_name(self):
 
718
        self.assertRaises(errors.InvalidEntryName, inventory.InventoryFile,
 
719
                          b'123', 'a/hello.c', ROOT_ID)
 
720
 
 
721
    def test_file_backslash(self):
 
722
        file = inventory.InventoryFile(b'123', 'h\\ello.c', ROOT_ID)
 
723
        self.assertEquals(file.name, 'h\\ello.c')
 
724
 
 
725
    def test_file_kind_character(self):
 
726
        file = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
 
727
        self.assertEqual(file.kind_character(), '')
 
728
 
 
729
    def test_dir_kind_character(self):
 
730
        dir = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
 
731
        self.assertEqual(dir.kind_character(), '/')
 
732
 
 
733
    def test_link_kind_character(self):
 
734
        dir = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
 
735
        self.assertEqual(dir.kind_character(), '')
 
736
 
 
737
    def test_link_kind_character(self):
 
738
        dir = TreeReference(b'123', 'hello.c', ROOT_ID)
 
739
        self.assertEqual(dir.kind_character(), '+')
 
740
 
 
741
    def test_dir_detect_changes(self):
 
742
        left = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
 
743
        right = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
 
744
        self.assertEqual((False, False), left.detect_changes(right))
 
745
        self.assertEqual((False, False), right.detect_changes(left))
 
746
 
 
747
    def test_file_detect_changes(self):
 
748
        left = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
 
749
        left.text_sha1 = 123
 
750
        right = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
 
751
        right.text_sha1 = 123
 
752
        self.assertEqual((False, False), left.detect_changes(right))
 
753
        self.assertEqual((False, False), right.detect_changes(left))
 
754
        left.executable = True
 
755
        self.assertEqual((False, True), left.detect_changes(right))
 
756
        self.assertEqual((False, True), right.detect_changes(left))
 
757
        right.text_sha1 = 321
 
758
        self.assertEqual((True, True), left.detect_changes(right))
 
759
        self.assertEqual((True, True), right.detect_changes(left))
 
760
 
 
761
    def test_symlink_detect_changes(self):
 
762
        left = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
 
763
        left.symlink_target = 'foo'
 
764
        right = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
 
765
        right.symlink_target = 'foo'
 
766
        self.assertEqual((False, False), left.detect_changes(right))
 
767
        self.assertEqual((False, False), right.detect_changes(left))
 
768
        left.symlink_target = 'different'
 
769
        self.assertEqual((True, False), left.detect_changes(right))
 
770
        self.assertEqual((True, False), right.detect_changes(left))
 
771
 
 
772
    def test_file_has_text(self):
 
773
        file = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
 
774
        self.assertTrue(file.has_text())
 
775
 
 
776
    def test_directory_has_text(self):
 
777
        dir = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
 
778
        self.assertFalse(dir.has_text())
 
779
 
 
780
    def test_link_has_text(self):
 
781
        link = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
 
782
        self.assertFalse(link.has_text())
 
783
 
 
784
    def test_make_entry(self):
 
785
        self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
 
786
                              inventory.InventoryFile)
 
787
        self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
 
788
                              inventory.InventoryLink)
 
789
        self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
 
790
                              inventory.InventoryDirectory)
 
791
 
 
792
    def test_make_entry_non_normalized(self):
 
793
        orig_normalized_filename = osutils.normalized_filename
 
794
 
 
795
        try:
 
796
            osutils.normalized_filename = osutils._accessible_normalized_filename
 
797
            entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
 
798
            self.assertEqual(u'\xe5', entry.name)
 
799
            self.assertIsInstance(entry, inventory.InventoryFile)
 
800
 
 
801
            osutils.normalized_filename = osutils._inaccessible_normalized_filename
 
802
            self.assertRaises(errors.InvalidNormalization,
 
803
                              inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
 
804
        finally:
 
805
            osutils.normalized_filename = orig_normalized_filename
 
806
 
 
807
 
 
808
class TestDescribeChanges(TestCase):
 
809
 
 
810
    def test_describe_change(self):
 
811
        # we need to test the following change combinations:
 
812
        # rename
 
813
        # reparent
 
814
        # modify
 
815
        # gone
 
816
        # added
 
817
        # renamed/reparented and modified
 
818
        # change kind (perhaps can't be done yet?)
 
819
        # also, merged in combination with all of these?
 
820
        old_a = InventoryFile(b'a-id', 'a_file', ROOT_ID)
 
821
        old_a.text_sha1 = b'123132'
 
822
        old_a.text_size = 0
 
823
        new_a = InventoryFile(b'a-id', 'a_file', ROOT_ID)
 
824
        new_a.text_sha1 = b'123132'
 
825
        new_a.text_size = 0
 
826
 
 
827
        self.assertChangeDescription('unchanged', old_a, new_a)
 
828
 
 
829
        new_a.text_size = 10
 
830
        new_a.text_sha1 = b'abcabc'
 
831
        self.assertChangeDescription('modified', old_a, new_a)
 
832
 
 
833
        self.assertChangeDescription('added', None, new_a)
 
834
        self.assertChangeDescription('removed', old_a, None)
 
835
        # perhaps a bit questionable but seems like the most reasonable thing...
 
836
        self.assertChangeDescription('unchanged', None, None)
 
837
 
 
838
        # in this case it's both renamed and modified; show a rename and
 
839
        # modification:
 
840
        new_a.name = 'newfilename'
 
841
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
842
 
 
843
        # reparenting is 'renaming'
 
844
        new_a.name = old_a.name
 
845
        new_a.parent_id = b'somedir-id'
 
846
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
847
 
 
848
        # reset the content values so its not modified
 
849
        new_a.text_size = old_a.text_size
 
850
        new_a.text_sha1 = old_a.text_sha1
 
851
        new_a.name = old_a.name
 
852
 
 
853
        new_a.name = 'newfilename'
 
854
        self.assertChangeDescription('renamed', old_a, new_a)
 
855
 
 
856
        # reparenting is 'renaming'
 
857
        new_a.name = old_a.name
 
858
        new_a.parent_id = b'somedir-id'
 
859
        self.assertChangeDescription('renamed', old_a, new_a)
 
860
 
 
861
    def assertChangeDescription(self, expected_change, old_ie, new_ie):
 
862
        change = InventoryEntry.describe_change(old_ie, new_ie)
 
863
        self.assertEqual(expected_change, change)
 
864
 
 
865
 
 
866
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
 
867
 
 
868
    def get_chk_bytes(self):
 
869
        factory = groupcompress.make_pack_factory(True, True, 1)
 
870
        trans = self.get_transport('')
 
871
        return factory(trans)
 
872
 
 
873
    def read_bytes(self, chk_bytes, key):
 
874
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
 
875
        return next(stream).get_bytes_as("fulltext")
 
876
 
 
877
    def test_deserialise_gives_CHKInventory(self):
 
878
        inv = Inventory()
 
879
        inv.revision_id = b"revid"
 
880
        inv.root.revision = b"rootrev"
 
881
        chk_bytes = self.get_chk_bytes()
 
882
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
883
        bytes = b''.join(chk_inv.to_lines())
 
884
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
885
        self.assertEqual(b"revid", new_inv.revision_id)
 
886
        self.assertEqual("directory", new_inv.root.kind)
 
887
        self.assertEqual(inv.root.file_id, new_inv.root.file_id)
 
888
        self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
 
889
        self.assertEqual(inv.root.name, new_inv.root.name)
 
890
        self.assertEqual(b"rootrev", new_inv.root.revision)
 
891
        self.assertEqual(b'plain', new_inv._search_key_name)
 
892
 
 
893
    def test_deserialise_wrong_revid(self):
 
894
        inv = Inventory()
 
895
        inv.revision_id = b"revid"
 
896
        inv.root.revision = b"rootrev"
 
897
        chk_bytes = self.get_chk_bytes()
 
898
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
899
        bytes = b''.join(chk_inv.to_lines())
 
900
        self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
 
901
                          bytes, (b"revid2",))
 
902
 
 
903
    def test_captures_rev_root_byid(self):
 
904
        inv = Inventory()
 
905
        inv.revision_id = b"foo"
 
906
        inv.root.revision = b"bar"
 
907
        chk_bytes = self.get_chk_bytes()
 
908
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
909
        lines = chk_inv.to_lines()
 
910
        self.assertEqual([
 
911
            b'chkinventory:\n',
 
912
            b'revision_id: foo\n',
 
913
            b'root_id: TREE_ROOT\n',
 
914
            b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
915
            b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
916
            ], lines)
 
917
        chk_inv = CHKInventory.deserialise(
 
918
            chk_bytes, b''.join(lines), (b'foo',))
 
919
        self.assertEqual(b'plain', chk_inv._search_key_name)
 
920
 
 
921
    def test_captures_parent_id_basename_index(self):
 
922
        inv = Inventory()
 
923
        inv.revision_id = b"foo"
 
924
        inv.root.revision = b"bar"
 
925
        chk_bytes = self.get_chk_bytes()
 
926
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
927
        lines = chk_inv.to_lines()
 
928
        self.assertEqual([
 
929
            b'chkinventory:\n',
 
930
            b'revision_id: foo\n',
 
931
            b'root_id: TREE_ROOT\n',
 
932
            b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
933
            b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
934
            ], lines)
 
935
        chk_inv = CHKInventory.deserialise(
 
936
            chk_bytes, b''.join(lines), (b'foo',))
 
937
        self.assertEqual(b'plain', chk_inv._search_key_name)
 
938
 
 
939
    def test_captures_search_key_name(self):
 
940
        inv = Inventory()
 
941
        inv.revision_id = b"foo"
 
942
        inv.root.revision = b"bar"
 
943
        chk_bytes = self.get_chk_bytes()
 
944
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
945
                                              search_key_name=b'hash-16-way')
 
946
        lines = chk_inv.to_lines()
 
947
        self.assertEqual([
 
948
            b'chkinventory:\n',
 
949
            b'search_key_name: hash-16-way\n',
 
950
            b'root_id: TREE_ROOT\n',
 
951
            b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
952
            b'revision_id: foo\n',
 
953
            b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
954
            ], lines)
 
955
        chk_inv = CHKInventory.deserialise(
 
956
            chk_bytes, b''.join(lines), (b'foo',))
 
957
        self.assertEqual(b'hash-16-way', chk_inv._search_key_name)
 
958
 
 
959
    def test_directory_children_on_demand(self):
 
960
        inv = Inventory()
 
961
        inv.revision_id = b"revid"
 
962
        inv.root.revision = b"rootrev"
 
963
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
964
        inv.get_entry(b"fileid").revision = b"filerev"
 
965
        inv.get_entry(b"fileid").executable = True
 
966
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
967
        inv.get_entry(b"fileid").text_size = 1
 
968
        chk_bytes = self.get_chk_bytes()
 
969
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
970
        bytes = b''.join(chk_inv.to_lines())
 
971
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
972
        root_entry = new_inv.get_entry(inv.root.file_id)
 
973
        self.assertEqual(None, root_entry._children)
 
974
        self.assertEqual({'file'}, set(root_entry.children))
 
975
        file_direct = new_inv.get_entry(b"fileid")
 
976
        file_found = root_entry.children['file']
 
977
        self.assertEqual(file_direct.kind, file_found.kind)
 
978
        self.assertEqual(file_direct.file_id, file_found.file_id)
 
979
        self.assertEqual(file_direct.parent_id, file_found.parent_id)
 
980
        self.assertEqual(file_direct.name, file_found.name)
 
981
        self.assertEqual(file_direct.revision, file_found.revision)
 
982
        self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
 
983
        self.assertEqual(file_direct.text_size, file_found.text_size)
 
984
        self.assertEqual(file_direct.executable, file_found.executable)
 
985
 
 
986
    def test_from_inventory_maximum_size(self):
 
987
        # from_inventory supports the maximum_size parameter.
 
988
        inv = Inventory()
 
989
        inv.revision_id = b"revid"
 
990
        inv.root.revision = b"rootrev"
 
991
        chk_bytes = self.get_chk_bytes()
 
992
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
 
993
        chk_inv.id_to_entry._ensure_root()
 
994
        self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
 
995
        self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
 
996
        p_id_basename = chk_inv.parent_id_basename_to_file_id
 
997
        p_id_basename._ensure_root()
 
998
        self.assertEqual(120, p_id_basename._root_node.maximum_size)
 
999
        self.assertEqual(2, p_id_basename._root_node._key_width)
 
1000
 
 
1001
    def test_iter_all_ids(self):
 
1002
        inv = Inventory()
 
1003
        inv.revision_id = b"revid"
 
1004
        inv.root.revision = b"rootrev"
 
1005
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1006
        inv.get_entry(b"fileid").revision = b"filerev"
 
1007
        inv.get_entry(b"fileid").executable = True
 
1008
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1009
        inv.get_entry(b"fileid").text_size = 1
 
1010
        chk_bytes = self.get_chk_bytes()
 
1011
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1012
        bytes = b''.join(chk_inv.to_lines())
 
1013
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1014
        fileids = sorted(new_inv.iter_all_ids())
 
1015
        self.assertEqual([inv.root.file_id, b"fileid"], fileids)
 
1016
 
 
1017
    def test__len__(self):
 
1018
        inv = Inventory()
 
1019
        inv.revision_id = b"revid"
 
1020
        inv.root.revision = b"rootrev"
 
1021
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1022
        inv.get_entry(b"fileid").revision = b"filerev"
 
1023
        inv.get_entry(b"fileid").executable = True
 
1024
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1025
        inv.get_entry(b"fileid").text_size = 1
 
1026
        chk_bytes = self.get_chk_bytes()
 
1027
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1028
        self.assertEqual(2, len(chk_inv))
 
1029
 
 
1030
    def test_get_entry(self):
 
1031
        inv = Inventory()
 
1032
        inv.revision_id = b"revid"
 
1033
        inv.root.revision = b"rootrev"
 
1034
        inv.add(InventoryFile(b"fileid", u"file", inv.root.file_id))
 
1035
        inv.get_entry(b"fileid").revision = b"filerev"
 
1036
        inv.get_entry(b"fileid").executable = True
 
1037
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1038
        inv.get_entry(b"fileid").text_size = 1
 
1039
        chk_bytes = self.get_chk_bytes()
 
1040
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1041
        data = b''.join(chk_inv.to_lines())
 
1042
        new_inv = CHKInventory.deserialise(chk_bytes, data, (b"revid",))
 
1043
        root_entry = new_inv.get_entry(inv.root.file_id)
 
1044
        file_entry = new_inv.get_entry(b"fileid")
 
1045
        self.assertEqual("directory", root_entry.kind)
 
1046
        self.assertEqual(inv.root.file_id, root_entry.file_id)
 
1047
        self.assertEqual(inv.root.parent_id, root_entry.parent_id)
 
1048
        self.assertEqual(inv.root.name, root_entry.name)
 
1049
        self.assertEqual(b"rootrev", root_entry.revision)
 
1050
        self.assertEqual("file", file_entry.kind)
 
1051
        self.assertEqual(b"fileid", file_entry.file_id)
 
1052
        self.assertEqual(inv.root.file_id, file_entry.parent_id)
 
1053
        self.assertEqual(u"file", file_entry.name)
 
1054
        self.assertEqual(b"filerev", file_entry.revision)
 
1055
        self.assertEqual(b"ffff", file_entry.text_sha1)
 
1056
        self.assertEqual(1, file_entry.text_size)
 
1057
        self.assertEqual(True, file_entry.executable)
 
1058
        self.assertRaises(errors.NoSuchId, new_inv.get_entry, 'missing')
 
1059
 
 
1060
    def test_has_id_true(self):
 
1061
        inv = Inventory()
 
1062
        inv.revision_id = b"revid"
 
1063
        inv.root.revision = b"rootrev"
 
1064
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1065
        inv.get_entry(b"fileid").revision = b"filerev"
 
1066
        inv.get_entry(b"fileid").executable = True
 
1067
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1068
        inv.get_entry(b"fileid").text_size = 1
 
1069
        chk_bytes = self.get_chk_bytes()
 
1070
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1071
        self.assertTrue(chk_inv.has_id(b'fileid'))
 
1072
        self.assertTrue(chk_inv.has_id(inv.root.file_id))
 
1073
 
 
1074
    def test_has_id_not(self):
 
1075
        inv = Inventory()
 
1076
        inv.revision_id = b"revid"
 
1077
        inv.root.revision = b"rootrev"
 
1078
        chk_bytes = self.get_chk_bytes()
 
1079
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1080
        self.assertFalse(chk_inv.has_id(b'fileid'))
 
1081
 
 
1082
    def test_id2path(self):
 
1083
        inv = Inventory()
 
1084
        inv.revision_id = b"revid"
 
1085
        inv.root.revision = b"rootrev"
 
1086
        direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
 
1087
        fileentry = InventoryFile(b"fileid", "file", b"dirid")
 
1088
        inv.add(direntry)
 
1089
        inv.add(fileentry)
 
1090
        inv.get_entry(b"fileid").revision = b"filerev"
 
1091
        inv.get_entry(b"fileid").executable = True
 
1092
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1093
        inv.get_entry(b"fileid").text_size = 1
 
1094
        inv.get_entry(b"dirid").revision = b"filerev"
 
1095
        chk_bytes = self.get_chk_bytes()
 
1096
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1097
        bytes = b''.join(chk_inv.to_lines())
 
1098
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1099
        self.assertEqual('', new_inv.id2path(inv.root.file_id))
 
1100
        self.assertEqual('dir', new_inv.id2path(b'dirid'))
 
1101
        self.assertEqual('dir/file', new_inv.id2path(b'fileid'))
 
1102
 
 
1103
    def test_path2id(self):
 
1104
        inv = Inventory()
 
1105
        inv.revision_id = b"revid"
 
1106
        inv.root.revision = b"rootrev"
 
1107
        direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
 
1108
        fileentry = InventoryFile(b"fileid", "file", b"dirid")
 
1109
        inv.add(direntry)
 
1110
        inv.add(fileentry)
 
1111
        inv.get_entry(b"fileid").revision = b"filerev"
 
1112
        inv.get_entry(b"fileid").executable = True
 
1113
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1114
        inv.get_entry(b"fileid").text_size = 1
 
1115
        inv.get_entry(b"dirid").revision = b"filerev"
 
1116
        chk_bytes = self.get_chk_bytes()
 
1117
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1118
        bytes = b''.join(chk_inv.to_lines())
 
1119
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1120
        self.assertEqual(inv.root.file_id, new_inv.path2id(''))
 
1121
        self.assertEqual(b'dirid', new_inv.path2id('dir'))
 
1122
        self.assertEqual(b'fileid', new_inv.path2id('dir/file'))
 
1123
 
 
1124
    def test_create_by_apply_delta_sets_root(self):
 
1125
        inv = Inventory()
 
1126
        inv.root.revision = b"myrootrev"
 
1127
        inv.revision_id = b"revid"
 
1128
        chk_bytes = self.get_chk_bytes()
 
1129
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1130
        inv.add_path("", "directory", b"myrootid", None)
 
1131
        inv.revision_id = b"expectedid"
 
1132
        inv.root.revision = b"myrootrev"
 
1133
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1134
        delta = [("", None, base_inv.root.file_id, None),
 
1135
                 (None, "", b"myrootid", inv.root)]
 
1136
        new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
 
1137
        self.assertEqual(reference_inv.root, new_inv.root)
 
1138
 
 
1139
    def test_create_by_apply_delta_empty_add_child(self):
 
1140
        inv = Inventory()
 
1141
        inv.revision_id = b"revid"
 
1142
        inv.root.revision = b"rootrev"
 
1143
        chk_bytes = self.get_chk_bytes()
 
1144
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1145
        a_entry = InventoryFile(b"A-id", "A", inv.root.file_id)
 
1146
        a_entry.revision = b"filerev"
 
1147
        a_entry.executable = True
 
1148
        a_entry.text_sha1 = b"ffff"
 
1149
        a_entry.text_size = 1
 
1150
        inv.add(a_entry)
 
1151
        inv.revision_id = b"expectedid"
 
1152
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1153
        delta = [(None, "A", b"A-id", a_entry)]
 
1154
        new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
 
1155
        # new_inv should be the same as reference_inv.
 
1156
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1157
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1158
        reference_inv.id_to_entry._ensure_root()
 
1159
        new_inv.id_to_entry._ensure_root()
 
1160
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1161
                         new_inv.id_to_entry._root_node._key)
 
1162
 
 
1163
    def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
 
1164
        inv = Inventory()
 
1165
        inv.revision_id = b"revid"
 
1166
        inv.root.revision = b"rootrev"
 
1167
        chk_bytes = self.get_chk_bytes()
 
1168
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1169
        a_entry = InventoryFile(b"A-id", "A", inv.root.file_id)
 
1170
        a_entry.revision = b"filerev"
 
1171
        a_entry.executable = True
 
1172
        a_entry.text_sha1 = b"ffff"
 
1173
        a_entry.text_size = 1
 
1174
        inv.add(a_entry)
 
1175
        inv.revision_id = b"expectedid"
 
1176
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1177
        delta = [(None, "A", b"A-id", a_entry)]
 
1178
        new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
 
1179
        reference_inv.id_to_entry._ensure_root()
 
1180
        reference_inv.parent_id_basename_to_file_id._ensure_root()
 
1181
        new_inv.id_to_entry._ensure_root()
 
1182
        new_inv.parent_id_basename_to_file_id._ensure_root()
 
1183
        # new_inv should be the same as reference_inv.
 
1184
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1185
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1186
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1187
                         new_inv.id_to_entry._root_node._key)
 
1188
        self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
 
1189
                         new_inv.parent_id_basename_to_file_id._root_node._key)
 
1190
 
 
1191
    def test_iter_changes(self):
 
1192
        # Low level bootstrapping smoke test; comprehensive generic tests via
 
1193
        # InterTree are coming.
 
1194
        inv = Inventory()
 
1195
        inv.revision_id = b"revid"
 
1196
        inv.root.revision = b"rootrev"
 
1197
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1198
        inv.get_entry(b"fileid").revision = b"filerev"
 
1199
        inv.get_entry(b"fileid").executable = True
 
1200
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1201
        inv.get_entry(b"fileid").text_size = 1
 
1202
        inv2 = Inventory()
 
1203
        inv2.revision_id = b"revid2"
 
1204
        inv2.root.revision = b"rootrev"
 
1205
        inv2.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1206
        inv2.get_entry(b"fileid").revision = b"filerev2"
 
1207
        inv2.get_entry(b"fileid").executable = False
 
1208
        inv2.get_entry(b"fileid").text_sha1 = b"bbbb"
 
1209
        inv2.get_entry(b"fileid").text_size = 2
 
1210
        # get fresh objects.
 
1211
        chk_bytes = self.get_chk_bytes()
 
1212
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1213
        bytes = b''.join(chk_inv.to_lines())
 
1214
        inv_1 = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1215
        chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
 
1216
        bytes = b''.join(chk_inv2.to_lines())
 
1217
        inv_2 = CHKInventory.deserialise(chk_bytes, bytes, (b"revid2",))
 
1218
        self.assertEqual([(b'fileid', (u'file', u'file'), True, (True, True),
 
1219
                           (b'TREE_ROOT', b'TREE_ROOT'), (u'file',
 
1220
                                                          u'file'), ('file', 'file'),
 
1221
                           (False, True))],
 
1222
                         list(inv_1.iter_changes(inv_2)))
 
1223
 
 
1224
    def test_parent_id_basename_to_file_id_index_enabled(self):
 
1225
        inv = Inventory()
 
1226
        inv.revision_id = b"revid"
 
1227
        inv.root.revision = b"rootrev"
 
1228
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1229
        inv.get_entry(b"fileid").revision = b"filerev"
 
1230
        inv.get_entry(b"fileid").executable = True
 
1231
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1232
        inv.get_entry(b"fileid").text_size = 1
 
1233
        # get fresh objects.
 
1234
        chk_bytes = self.get_chk_bytes()
 
1235
        tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1236
        bytes = b''.join(tmp_inv.to_lines())
 
1237
        chk_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1238
        self.assertIsInstance(
 
1239
            chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
 
1240
        self.assertEqual(
 
1241
            {(b'', b''): b'TREE_ROOT', (b'TREE_ROOT', b'file'): b'fileid'},
 
1242
            dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
 
1243
 
 
1244
    def test_file_entry_to_bytes(self):
 
1245
        inv = CHKInventory(None)
 
1246
        ie = inventory.InventoryFile(b'file-id', 'filename', b'parent-id')
 
1247
        ie.executable = True
 
1248
        ie.revision = b'file-rev-id'
 
1249
        ie.text_sha1 = b'abcdefgh'
 
1250
        ie.text_size = 100
 
1251
        bytes = inv._entry_to_bytes(ie)
 
1252
        self.assertEqual(b'file: file-id\nparent-id\nfilename\n'
 
1253
                         b'file-rev-id\nabcdefgh\n100\nY', bytes)
 
1254
        ie2 = inv._bytes_to_entry(bytes)
 
1255
        self.assertEqual(ie, ie2)
 
1256
        self.assertIsInstance(ie2.name, text_type)
 
1257
        self.assertEqual((b'filename', b'file-id', b'file-rev-id'),
 
1258
                         inv._bytes_to_utf8name_key(bytes))
 
1259
 
 
1260
    def test_file2_entry_to_bytes(self):
 
1261
        inv = CHKInventory(None)
 
1262
        # \u30a9 == 'omega'
 
1263
        ie = inventory.InventoryFile(b'file-id', u'\u03a9name', b'parent-id')
 
1264
        ie.executable = False
 
1265
        ie.revision = b'file-rev-id'
 
1266
        ie.text_sha1 = b'123456'
 
1267
        ie.text_size = 25
 
1268
        bytes = inv._entry_to_bytes(ie)
 
1269
        self.assertEqual(b'file: file-id\nparent-id\n\xce\xa9name\n'
 
1270
                         b'file-rev-id\n123456\n25\nN', bytes)
 
1271
        ie2 = inv._bytes_to_entry(bytes)
 
1272
        self.assertEqual(ie, ie2)
 
1273
        self.assertIsInstance(ie2.name, text_type)
 
1274
        self.assertEqual((b'\xce\xa9name', b'file-id', b'file-rev-id'),
 
1275
                         inv._bytes_to_utf8name_key(bytes))
 
1276
 
 
1277
    def test_dir_entry_to_bytes(self):
 
1278
        inv = CHKInventory(None)
 
1279
        ie = inventory.InventoryDirectory(b'dir-id', 'dirname', b'parent-id')
 
1280
        ie.revision = b'dir-rev-id'
 
1281
        bytes = inv._entry_to_bytes(ie)
 
1282
        self.assertEqual(b'dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
 
1283
        ie2 = inv._bytes_to_entry(bytes)
 
1284
        self.assertEqual(ie, ie2)
 
1285
        self.assertIsInstance(ie2.name, text_type)
 
1286
        self.assertEqual((b'dirname', b'dir-id', b'dir-rev-id'),
 
1287
                         inv._bytes_to_utf8name_key(bytes))
 
1288
 
 
1289
    def test_dir2_entry_to_bytes(self):
 
1290
        inv = CHKInventory(None)
 
1291
        ie = inventory.InventoryDirectory(b'dir-id', u'dir\u03a9name',
 
1292
                                          None)
 
1293
        ie.revision = b'dir-rev-id'
 
1294
        bytes = inv._entry_to_bytes(ie)
 
1295
        self.assertEqual(b'dir: dir-id\n\ndir\xce\xa9name\n'
 
1296
                         b'dir-rev-id', bytes)
 
1297
        ie2 = inv._bytes_to_entry(bytes)
 
1298
        self.assertEqual(ie, ie2)
 
1299
        self.assertIsInstance(ie2.name, text_type)
 
1300
        self.assertIs(ie2.parent_id, None)
 
1301
        self.assertEqual((b'dir\xce\xa9name', b'dir-id', b'dir-rev-id'),
 
1302
                         inv._bytes_to_utf8name_key(bytes))
 
1303
 
 
1304
    def test_symlink_entry_to_bytes(self):
 
1305
        inv = CHKInventory(None)
 
1306
        ie = inventory.InventoryLink(b'link-id', 'linkname', b'parent-id')
 
1307
        ie.revision = b'link-rev-id'
 
1308
        ie.symlink_target = u'target/path'
 
1309
        bytes = inv._entry_to_bytes(ie)
 
1310
        self.assertEqual(b'symlink: link-id\nparent-id\nlinkname\n'
 
1311
                         b'link-rev-id\ntarget/path', bytes)
 
1312
        ie2 = inv._bytes_to_entry(bytes)
 
1313
        self.assertEqual(ie, ie2)
 
1314
        self.assertIsInstance(ie2.name, text_type)
 
1315
        self.assertIsInstance(ie2.symlink_target, text_type)
 
1316
        self.assertEqual((b'linkname', b'link-id', b'link-rev-id'),
 
1317
                         inv._bytes_to_utf8name_key(bytes))
 
1318
 
 
1319
    def test_symlink2_entry_to_bytes(self):
 
1320
        inv = CHKInventory(None)
 
1321
        ie = inventory.InventoryLink(
 
1322
            b'link-id', u'link\u03a9name', b'parent-id')
 
1323
        ie.revision = b'link-rev-id'
 
1324
        ie.symlink_target = u'target/\u03a9path'
 
1325
        bytes = inv._entry_to_bytes(ie)
 
1326
        self.assertEqual(b'symlink: link-id\nparent-id\nlink\xce\xa9name\n'
 
1327
                         b'link-rev-id\ntarget/\xce\xa9path', bytes)
 
1328
        ie2 = inv._bytes_to_entry(bytes)
 
1329
        self.assertEqual(ie, ie2)
 
1330
        self.assertIsInstance(ie2.name, text_type)
 
1331
        self.assertIsInstance(ie2.symlink_target, text_type)
 
1332
        self.assertEqual((b'link\xce\xa9name', b'link-id', b'link-rev-id'),
 
1333
                         inv._bytes_to_utf8name_key(bytes))
 
1334
 
 
1335
    def test_tree_reference_entry_to_bytes(self):
 
1336
        inv = CHKInventory(None)
 
1337
        ie = inventory.TreeReference(b'tree-root-id', u'tree\u03a9name',
 
1338
                                     b'parent-id')
 
1339
        ie.revision = b'tree-rev-id'
 
1340
        ie.reference_revision = b'ref-rev-id'
 
1341
        bytes = inv._entry_to_bytes(ie)
 
1342
        self.assertEqual(b'tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
 
1343
                         b'tree-rev-id\nref-rev-id', bytes)
 
1344
        ie2 = inv._bytes_to_entry(bytes)
 
1345
        self.assertEqual(ie, ie2)
 
1346
        self.assertIsInstance(ie2.name, text_type)
 
1347
        self.assertEqual((b'tree\xce\xa9name', b'tree-root-id', b'tree-rev-id'),
 
1348
                         inv._bytes_to_utf8name_key(bytes))
 
1349
 
 
1350
    def make_basic_utf8_inventory(self):
 
1351
        inv = Inventory()
 
1352
        inv.revision_id = b"revid"
 
1353
        inv.root.revision = b"rootrev"
 
1354
        root_id = inv.root.file_id
 
1355
        inv.add(InventoryFile(b"fileid", u'f\xefle', root_id))
 
1356
        inv.get_entry(b"fileid").revision = b"filerev"
 
1357
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1358
        inv.get_entry(b"fileid").text_size = 0
 
1359
        inv.add(InventoryDirectory(b"dirid", u'dir-\N{EURO SIGN}', root_id))
 
1360
        inv.get_entry(b"dirid").revision = b"dirrev"
 
1361
        inv.add(InventoryFile(b"childid", u'ch\xefld', b"dirid"))
 
1362
        inv.get_entry(b"childid").revision = b"filerev"
 
1363
        inv.get_entry(b"childid").text_sha1 = b"ffff"
 
1364
        inv.get_entry(b"childid").text_size = 0
 
1365
        chk_bytes = self.get_chk_bytes()
 
1366
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1367
        bytes = b''.join(chk_inv.to_lines())
 
1368
        return CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1369
 
 
1370
    def test__preload_handles_utf8(self):
 
1371
        new_inv = self.make_basic_utf8_inventory()
 
1372
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
 
1373
        self.assertFalse(new_inv._fully_cached)
 
1374
        new_inv._preload_cache()
 
1375
        self.assertEqual(
 
1376
            sorted([new_inv.root_id, b"fileid", b"dirid", b"childid"]),
 
1377
            sorted(new_inv._fileid_to_entry_cache.keys()))
 
1378
        ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
 
1379
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1380
                         sorted(ie_root._children.keys()))
 
1381
        ie_dir = new_inv._fileid_to_entry_cache[b'dirid']
 
1382
        self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
 
1383
 
 
1384
    def test__preload_populates_cache(self):
 
1385
        inv = Inventory()
 
1386
        inv.revision_id = b"revid"
 
1387
        inv.root.revision = b"rootrev"
 
1388
        root_id = inv.root.file_id
 
1389
        inv.add(InventoryFile(b"fileid", "file", root_id))
 
1390
        inv.get_entry(b"fileid").revision = b"filerev"
 
1391
        inv.get_entry(b"fileid").executable = True
 
1392
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1393
        inv.get_entry(b"fileid").text_size = 1
 
1394
        inv.add(InventoryDirectory(b"dirid", "dir", root_id))
 
1395
        inv.get_entry(b"dirid").revision = b"dirrev"
 
1396
        inv.add(InventoryFile(b"childid", "child", b"dirid"))
 
1397
        inv.get_entry(b"childid").revision = b"filerev"
 
1398
        inv.get_entry(b"childid").executable = False
 
1399
        inv.get_entry(b"childid").text_sha1 = b"dddd"
 
1400
        inv.get_entry(b"childid").text_size = 1
 
1401
        chk_bytes = self.get_chk_bytes()
 
1402
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1403
        bytes = b''.join(chk_inv.to_lines())
 
1404
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1405
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
 
1406
        self.assertFalse(new_inv._fully_cached)
 
1407
        new_inv._preload_cache()
 
1408
        self.assertEqual(
 
1409
            sorted([root_id, b"fileid", b"dirid", b"childid"]),
 
1410
            sorted(new_inv._fileid_to_entry_cache.keys()))
 
1411
        self.assertTrue(new_inv._fully_cached)
 
1412
        ie_root = new_inv._fileid_to_entry_cache[root_id]
 
1413
        self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
 
1414
        ie_dir = new_inv._fileid_to_entry_cache[b'dirid']
 
1415
        self.assertEqual(['child'], sorted(ie_dir._children.keys()))
 
1416
 
 
1417
    def test__preload_handles_partially_evaluated_inventory(self):
 
1418
        new_inv = self.make_basic_utf8_inventory()
 
1419
        ie = new_inv.get_entry(new_inv.root_id)
 
1420
        self.assertIs(None, ie._children)
 
1421
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1422
                         sorted(ie.children.keys()))
 
1423
        # Accessing .children loads _children
 
1424
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1425
                         sorted(ie._children.keys()))
 
1426
        new_inv._preload_cache()
 
1427
        # No change
 
1428
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1429
                         sorted(ie._children.keys()))
 
1430
        ie_dir = new_inv.get_entry(b"dirid")
 
1431
        self.assertEqual([u'ch\xefld'],
 
1432
                         sorted(ie_dir._children.keys()))
 
1433
 
 
1434
    def test_filter_change_in_renamed_subfolder(self):
 
1435
        inv = Inventory(b'tree-root')
 
1436
        inv.root.revision = b'rootrev'
 
1437
        src_ie = inv.add_path('src', 'directory', b'src-id')
 
1438
        src_ie.revision = b'srcrev'
 
1439
        sub_ie = inv.add_path('src/sub/', 'directory', b'sub-id')
 
1440
        sub_ie.revision = b'subrev'
 
1441
        a_ie = inv.add_path('src/sub/a', 'file', b'a-id')
 
1442
        a_ie.revision = b'filerev'
 
1443
        a_ie.text_sha1 = osutils.sha_string(b'content\n')
 
1444
        a_ie.text_size = len(b'content\n')
 
1445
        chk_bytes = self.get_chk_bytes()
 
1446
        inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1447
        inv = inv.create_by_apply_delta([
 
1448
            ("src/sub/a", "src/sub/a", b"a-id", a_ie),
 
1449
            ("src", "src2", b"src-id", src_ie),
 
1450
            ], b'new-rev-2')
 
1451
        new_inv = inv.filter([b'a-id', b'src-id'])
 
1452
        self.assertEqual([
 
1453
            ('', b'tree-root'),
 
1454
            ('src', b'src-id'),
 
1455
            ('src/sub', b'sub-id'),
 
1456
            ('src/sub/a', b'a-id'),
 
1457
            ], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
 
1458
 
 
1459
 
 
1460
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
 
1461
 
 
1462
    def get_chk_bytes(self):
 
1463
        factory = groupcompress.make_pack_factory(True, True, 1)
 
1464
        trans = self.get_transport('')
 
1465
        return factory(trans)
 
1466
 
 
1467
    def make_dir(self, inv, name, parent_id, revision):
 
1468
        ie = inv.make_entry('directory', name, parent_id,
 
1469
                            name.encode('utf-8') + b'-id')
 
1470
        ie.revision = revision
 
1471
        inv.add(ie)
 
1472
 
 
1473
    def make_file(self, inv, name, parent_id, revision, content=b'content\n'):
 
1474
        ie = inv.make_entry('file', name, parent_id,
 
1475
                            name.encode('utf-8') + b'-id')
 
1476
        ie.text_sha1 = osutils.sha_string(content)
 
1477
        ie.text_size = len(content)
 
1478
        ie.revision = revision
 
1479
        inv.add(ie)
 
1480
 
 
1481
    def make_simple_inventory(self):
 
1482
        inv = Inventory(b'TREE_ROOT')
 
1483
        inv.revision_id = b"revid"
 
1484
        inv.root.revision = b"rootrev"
 
1485
        # /                 TREE_ROOT
 
1486
        # dir1/             dir1-id
 
1487
        #   sub-file1       sub-file1-id
 
1488
        #   sub-file2       sub-file2-id
 
1489
        #   sub-dir1/       sub-dir1-id
 
1490
        #     subsub-file1  subsub-file1-id
 
1491
        # dir2/             dir2-id
 
1492
        #   sub2-file1      sub2-file1-id
 
1493
        # top               top-id
 
1494
        self.make_dir(inv, 'dir1', b'TREE_ROOT', b'dirrev')
 
1495
        self.make_dir(inv, 'dir2', b'TREE_ROOT', b'dirrev')
 
1496
        self.make_dir(inv, 'sub-dir1', b'dir1-id', b'dirrev')
 
1497
        self.make_file(inv, 'top', b'TREE_ROOT', b'filerev')
 
1498
        self.make_file(inv, 'sub-file1', b'dir1-id', b'filerev')
 
1499
        self.make_file(inv, 'sub-file2', b'dir1-id', b'filerev')
 
1500
        self.make_file(inv, 'subsub-file1', b'sub-dir1-id', b'filerev')
 
1501
        self.make_file(inv, 'sub2-file1', b'dir2-id', b'filerev')
 
1502
        chk_bytes = self.get_chk_bytes()
 
1503
        #  use a small maximum_size to force internal paging structures
 
1504
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
1505
                                              maximum_size=100,
 
1506
                                              search_key_name=b'hash-255-way')
 
1507
        bytes = b''.join(chk_inv.to_lines())
 
1508
        return CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1509
 
 
1510
    def assert_Getitems(self, expected_fileids, inv, file_ids):
 
1511
        self.assertEqual(sorted(expected_fileids),
 
1512
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
 
1513
 
 
1514
    def assertExpand(self, all_ids, inv, file_ids):
 
1515
        (val_all_ids,
 
1516
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
 
1517
        self.assertEqual(set(all_ids), val_all_ids)
 
1518
        entries = inv._getitems(val_all_ids)
 
1519
        expected_children = {}
 
1520
        for entry in entries:
 
1521
            s = expected_children.setdefault(entry.parent_id, [])
 
1522
            s.append(entry.file_id)
 
1523
        val_children = {
 
1524
            k: sorted(v) for k, v in val_children.items()}
 
1525
        expected_children = {
 
1526
            k: sorted(v) for k, v in expected_children.items()}
 
1527
        self.assertEqual(expected_children, val_children)
 
1528
 
 
1529
    def test_make_simple_inventory(self):
 
1530
        inv = self.make_simple_inventory()
 
1531
        layout = []
 
1532
        for path, entry in inv.iter_entries_by_dir():
 
1533
            layout.append((path, entry.file_id))
 
1534
        self.assertEqual([
 
1535
            ('', b'TREE_ROOT'),
 
1536
            ('dir1', b'dir1-id'),
 
1537
            ('dir2', b'dir2-id'),
 
1538
            ('top', b'top-id'),
 
1539
            ('dir1/sub-dir1', b'sub-dir1-id'),
 
1540
            ('dir1/sub-file1', b'sub-file1-id'),
 
1541
            ('dir1/sub-file2', b'sub-file2-id'),
 
1542
            ('dir1/sub-dir1/subsub-file1', b'subsub-file1-id'),
 
1543
            ('dir2/sub2-file1', b'sub2-file1-id'),
 
1544
            ], layout)
 
1545
 
 
1546
    def test__getitems(self):
 
1547
        inv = self.make_simple_inventory()
 
1548
        # Reading from disk
 
1549
        self.assert_Getitems([b'dir1-id'], inv, [b'dir1-id'])
 
1550
        self.assertTrue(b'dir1-id' in inv._fileid_to_entry_cache)
 
1551
        self.assertFalse(b'sub-file2-id' in inv._fileid_to_entry_cache)
 
1552
        # From cache
 
1553
        self.assert_Getitems([b'dir1-id'], inv, [b'dir1-id'])
 
1554
        # Mixed
 
1555
        self.assert_Getitems([b'dir1-id', b'sub-file2-id'], inv,
 
1556
                             [b'dir1-id', b'sub-file2-id'])
 
1557
        self.assertTrue(b'dir1-id' in inv._fileid_to_entry_cache)
 
1558
        self.assertTrue(b'sub-file2-id' in inv._fileid_to_entry_cache)
 
1559
 
 
1560
    def test_single_file(self):
 
1561
        inv = self.make_simple_inventory()
 
1562
        self.assertExpand([b'TREE_ROOT', b'top-id'], inv, [b'top-id'])
 
1563
 
 
1564
    def test_get_all_parents(self):
 
1565
        inv = self.make_simple_inventory()
 
1566
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
 
1567
                           b'subsub-file1-id',
 
1568
                           ], inv, [b'subsub-file1-id'])
 
1569
 
 
1570
    def test_get_children(self):
 
1571
        inv = self.make_simple_inventory()
 
1572
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
 
1573
                           b'sub-file1-id', b'sub-file2-id', b'subsub-file1-id',
 
1574
                           ], inv, [b'dir1-id'])
 
1575
 
 
1576
    def test_from_root(self):
 
1577
        inv = self.make_simple_inventory()
 
1578
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'dir2-id', b'sub-dir1-id',
 
1579
                           b'sub-file1-id', b'sub-file2-id', b'sub2-file1-id',
 
1580
                           b'subsub-file1-id', b'top-id'], inv, [b'TREE_ROOT'])
 
1581
 
 
1582
    def test_top_level_file(self):
 
1583
        inv = self.make_simple_inventory()
 
1584
        self.assertExpand([b'TREE_ROOT', b'top-id'], inv, [b'top-id'])
 
1585
 
 
1586
    def test_subsub_file(self):
 
1587
        inv = self.make_simple_inventory()
 
1588
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
 
1589
                           b'subsub-file1-id'], inv, [b'subsub-file1-id'])
 
1590
 
 
1591
    def test_sub_and_root(self):
 
1592
        inv = self.make_simple_inventory()
 
1593
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id', b'top-id',
 
1594
                           b'subsub-file1-id'], inv, [b'top-id', b'subsub-file1-id'])
 
1595
 
 
1596
 
 
1597
class TestMutableInventoryFromTree(TestCaseWithTransport):
 
1598
 
 
1599
    def test_empty(self):
 
1600
        repository = self.make_repository('.')
 
1601
        tree = repository.revision_tree(revision.NULL_REVISION)
 
1602
        inv = mutable_inventory_from_tree(tree)
 
1603
        self.assertEqual(revision.NULL_REVISION, inv.revision_id)
 
1604
        self.assertEqual(0, len(inv))
 
1605
 
 
1606
    def test_some_files(self):
 
1607
        wt = self.make_branch_and_tree('.')
 
1608
        self.build_tree(['a'])
 
1609
        wt.add(['a'], [b'thefileid'])
 
1610
        revid = wt.commit("commit")
 
1611
        tree = wt.branch.repository.revision_tree(revid)
 
1612
        inv = mutable_inventory_from_tree(tree)
 
1613
        self.assertEqual(revid, inv.revision_id)
 
1614
        self.assertEqual(2, len(inv))
 
1615
        self.assertEqual("a", inv.get_entry(b'thefileid').name)
 
1616
        # The inventory should be mutable and independent of
 
1617
        # the original tree
 
1618
        self.assertFalse(tree.root_inventory.get_entry(
 
1619
            b'thefileid').executable)
 
1620
        inv.get_entry(b'thefileid').executable = True
 
1621
        self.assertFalse(tree.root_inventory.get_entry(
 
1622
            b'thefileid').executable)