/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_inv.py

  • Committer: Breezy landing bot
  • Author(s): Jelmer Vernooij
  • Date: 2018-11-16 18:26:22 UTC
  • mfrom: (7167.1.4 run-flake8)
  • Revision ID: breezy.the.bot@gmail.com-20181116182622-qw3gan3hz78a2imw
Add a flake8 test.

Merged from https://code.launchpad.net/~jelmer/brz/run-flake8/+merge/358902

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2012, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from .. import (
 
19
    errors,
 
20
    osutils,
 
21
    repository,
 
22
    revision,
 
23
    tests,
 
24
    workingtree,
 
25
    )
 
26
from ..sixish import text_type
 
27
from ..bzr import (
 
28
    chk_map,
 
29
    groupcompress,
 
30
    inventory,
 
31
    )
 
32
from ..bzr.inventory import (
 
33
    CHKInventory,
 
34
    Inventory,
 
35
    ROOT_ID,
 
36
    InventoryFile,
 
37
    InventoryDirectory,
 
38
    InventoryEntry,
 
39
    TreeReference,
 
40
    mutable_inventory_from_tree,
 
41
    )
 
42
from . import (
 
43
    TestCase,
 
44
    TestCaseWithTransport,
 
45
    )
 
46
from .scenarios import load_tests_apply_scenarios
 
47
 
 
48
 
 
49
load_tests = load_tests_apply_scenarios
 
50
 
 
51
 
 
52
def delta_application_scenarios():
 
53
    scenarios = [
 
54
        ('Inventory', {'apply_delta':apply_inventory_Inventory}),
 
55
        ]
 
56
    # Working tree basis delta application
 
57
    # Repository add_inv_by_delta.
 
58
    # Reduce form of the per_repository test logic - that logic needs to be
 
59
    # be able to get /just/ repositories whereas these tests are fine with
 
60
    # just creating trees.
 
61
    formats = set()
 
62
    for _, format in repository.format_registry.iteritems():
 
63
        if format.supports_full_versioned_files:
 
64
            scenarios.append((str(format.__name__), {
 
65
                'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
 
66
                'format':format}))
 
67
    for getter in workingtree.format_registry._get_all_lazy():
 
68
        try:
 
69
            format = getter()
 
70
            if callable(format):
 
71
                format = format()
 
72
        except ImportError:
 
73
            pass  # Format with unmet dependency
 
74
        repo_fmt = format._matchingcontroldir.repository_format
 
75
        if not repo_fmt.supports_full_versioned_files:
 
76
            continue
 
77
        scenarios.append(
 
78
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
 
79
            'apply_delta':apply_inventory_WT_basis,
 
80
            'format':format}))
 
81
        scenarios.append(
 
82
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
 
83
            'apply_delta':apply_inventory_WT,
 
84
            'format':format}))
 
85
    return scenarios
 
86
 
 
87
 
 
88
def create_texts_for_inv(repo, inv):
 
89
    for path, ie in inv.iter_entries():
 
90
        if ie.text_size:
 
91
            lines = [b'a' * ie.text_size]
 
92
        else:
 
93
            lines = []
 
94
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
 
95
 
 
96
 
 
97
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
 
98
    """Apply delta to basis and return the result.
 
99
 
 
100
    :param basis: An inventory to be used as the basis.
 
101
    :param delta: The inventory delta to apply:
 
102
    :return: An inventory resulting from the application.
 
103
    """
 
104
    basis.apply_delta(delta)
 
105
    return basis
 
106
 
 
107
 
 
108
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
 
109
    """Apply delta to basis and return the result.
 
110
 
 
111
    This sets the tree state to be basis, and then calls apply_inventory_delta.
 
112
 
 
113
    :param basis: An inventory to be used as the basis.
 
114
    :param delta: The inventory delta to apply:
 
115
    :return: An inventory resulting from the application.
 
116
    """
 
117
    control = self.make_controldir('tree', format=self.format._matchingcontroldir)
 
118
    control.create_repository()
 
119
    control.create_branch()
 
120
    tree = self.format.initialize(control)
 
121
    tree.lock_write()
 
122
    try:
 
123
        tree._write_inventory(basis)
 
124
    finally:
 
125
        tree.unlock()
 
126
    # Fresh object, reads disk again.
 
127
    tree = tree.controldir.open_workingtree()
 
128
    tree.lock_write()
 
129
    try:
 
130
        tree.apply_inventory_delta(delta)
 
131
    finally:
 
132
        tree.unlock()
 
133
    # reload tree - ensure we get what was written.
 
134
    tree = tree.controldir.open_workingtree()
 
135
    tree.lock_read()
 
136
    self.addCleanup(tree.unlock)
 
137
    if not invalid_delta:
 
138
        tree._validate()
 
139
    return tree.root_inventory
 
140
 
 
141
 
 
142
def _create_repo_revisions(repo, basis, delta, invalid_delta):
 
143
    repo.start_write_group()
 
144
    try:
 
145
        rev = revision.Revision(b'basis', timestamp=0, timezone=None,
 
146
            message="", committer="foo@example.com")
 
147
        basis.revision_id = b'basis'
 
148
        create_texts_for_inv(repo, basis)
 
149
        repo.add_revision(b'basis', rev, basis)
 
150
        if invalid_delta:
 
151
            # We don't want to apply the delta to the basis, because we expect
 
152
            # the delta is invalid.
 
153
            result_inv = basis
 
154
            result_inv.revision_id = b'result'
 
155
            target_entries = None
 
156
        else:
 
157
            result_inv = basis.create_by_apply_delta(delta, b'result')
 
158
            create_texts_for_inv(repo, result_inv)
 
159
            target_entries = list(result_inv.iter_entries_by_dir())
 
160
        rev = revision.Revision(b'result', timestamp=0, timezone=None,
 
161
            message="", committer="foo@example.com")
 
162
        repo.add_revision(b'result', rev, result_inv)
 
163
        repo.commit_write_group()
 
164
    except:
 
165
        repo.abort_write_group()
 
166
        raise
 
167
    return target_entries
 
168
 
 
169
 
 
170
def _get_basis_entries(tree):
 
171
    basis_tree = tree.basis_tree()
 
172
    basis_tree.lock_read()
 
173
    basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
 
174
    basis_tree.unlock()
 
175
    return basis_tree_entries
 
176
 
 
177
 
 
178
def _populate_different_tree(tree, basis, delta):
 
179
    """Put all entries into tree, but at a unique location."""
 
180
    added_ids = set()
 
181
    added_paths = set()
 
182
    tree.add(['unique-dir'], [b'unique-dir-id'], ['directory'])
 
183
    for path, ie in basis.iter_entries_by_dir():
 
184
        if ie.file_id in added_ids:
 
185
            continue
 
186
        # We want a unique path for each of these, we use the file-id
 
187
        tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
 
188
        added_ids.add(ie.file_id)
 
189
    for old_path, new_path, file_id, ie in delta:
 
190
        if file_id in added_ids:
 
191
            continue
 
192
        tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
 
193
 
 
194
 
 
195
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
 
196
    """Apply delta to basis and return the result.
 
197
 
 
198
    This sets the parent and then calls update_basis_by_delta.
 
199
    It also puts the basis in the repository under both 'basis' and 'result' to
 
200
    allow safety checks made by the WT to succeed, and finally ensures that all
 
201
    items in the delta with a new path are present in the WT before calling
 
202
    update_basis_by_delta.
 
203
 
 
204
    :param basis: An inventory to be used as the basis.
 
205
    :param delta: The inventory delta to apply:
 
206
    :return: An inventory resulting from the application.
 
207
    """
 
208
    control = test.make_controldir('tree', format=test.format._matchingcontroldir)
 
209
    control.create_repository()
 
210
    control.create_branch()
 
211
    tree = test.format.initialize(control)
 
212
    tree.lock_write()
 
213
    try:
 
214
        target_entries = _create_repo_revisions(tree.branch.repository, basis,
 
215
                                                delta, invalid_delta)
 
216
        # Set the basis state as the trees current state
 
217
        tree._write_inventory(basis)
 
218
        # This reads basis from the repo and puts it into the tree's local
 
219
        # cache, if it has one.
 
220
        tree.set_parent_ids([b'basis'])
 
221
    finally:
 
222
        tree.unlock()
 
223
    # Fresh lock, reads disk again.
 
224
    with tree.lock_write():
 
225
        tree.update_basis_by_delta(b'result', delta)
 
226
        if not invalid_delta:
 
227
            tree._validate()
 
228
    # reload tree - ensure we get what was written.
 
229
    tree = tree.controldir.open_workingtree()
 
230
    basis_tree = tree.basis_tree()
 
231
    basis_tree.lock_read()
 
232
    test.addCleanup(basis_tree.unlock)
 
233
    basis_inv = basis_tree.root_inventory
 
234
    if target_entries:
 
235
        basis_entries = list(basis_inv.iter_entries_by_dir())
 
236
        test.assertEqual(target_entries, basis_entries)
 
237
    return basis_inv
 
238
 
 
239
 
 
240
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
 
241
                                                      invalid_delta=True):
 
242
    """Apply delta to basis and return the result.
 
243
    
 
244
    This inserts basis as a whole inventory and then uses
 
245
    add_inventory_by_delta to add delta.
 
246
 
 
247
    :param basis: An inventory to be used as the basis.
 
248
    :param delta: The inventory delta to apply:
 
249
    :return: An inventory resulting from the application.
 
250
    """
 
251
    format = self.format()
 
252
    control = self.make_controldir('tree', format=format._matchingcontroldir)
 
253
    repo = format.initialize(control)
 
254
    with repo.lock_write():
 
255
        repo.start_write_group()
 
256
        try:
 
257
            rev = revision.Revision(b'basis', timestamp=0, timezone=None,
 
258
                message="", committer="foo@example.com")
 
259
            basis.revision_id = b'basis'
 
260
            create_texts_for_inv(repo, basis)
 
261
            repo.add_revision(b'basis', rev, basis)
 
262
            repo.commit_write_group()
 
263
        except:
 
264
            repo.abort_write_group()
 
265
            raise
 
266
    with repo.lock_write():
 
267
        repo.start_write_group()
 
268
        try:
 
269
            inv_sha1 = repo.add_inventory_by_delta(b'basis', delta,
 
270
                b'result', [b'basis'])
 
271
        except:
 
272
            repo.abort_write_group()
 
273
            raise
 
274
        else:
 
275
            repo.commit_write_group()
 
276
    # Fresh lock, reads disk again.
 
277
    repo = repo.controldir.open_repository()
 
278
    repo.lock_read()
 
279
    self.addCleanup(repo.unlock)
 
280
    return repo.get_inventory(b'result')
 
281
 
 
282
 
 
283
class TestInventoryUpdates(TestCase):
 
284
 
 
285
    def test_creation_from_root_id(self):
 
286
        # iff a root id is passed to the constructor, a root directory is made
 
287
        inv = inventory.Inventory(root_id=b'tree-root')
 
288
        self.assertNotEqual(None, inv.root)
 
289
        self.assertEqual(b'tree-root', inv.root.file_id)
 
290
 
 
291
    def test_add_path_of_root(self):
 
292
        # if no root id is given at creation time, there is no root directory
 
293
        inv = inventory.Inventory(root_id=None)
 
294
        self.assertIs(None, inv.root)
 
295
        # add a root entry by adding its path
 
296
        ie = inv.add_path(u"", "directory", b"my-root")
 
297
        ie.revision = b'test-rev'
 
298
        self.assertEqual(b"my-root", ie.file_id)
 
299
        self.assertIs(ie, inv.root)
 
300
 
 
301
    def test_add_path(self):
 
302
        inv = inventory.Inventory(root_id=b'tree_root')
 
303
        ie = inv.add_path(u'hello', 'file', b'hello-id')
 
304
        self.assertEqual(b'hello-id', ie.file_id)
 
305
        self.assertEqual('file', ie.kind)
 
306
 
 
307
    def test_copy(self):
 
308
        """Make sure copy() works and creates a deep copy."""
 
309
        inv = inventory.Inventory(root_id=b'some-tree-root')
 
310
        ie = inv.add_path(u'hello', 'file', b'hello-id')
 
311
        inv2 = inv.copy()
 
312
        inv.root.file_id = b'some-new-root'
 
313
        ie.name = u'file2'
 
314
        self.assertEqual(b'some-tree-root', inv2.root.file_id)
 
315
        self.assertEqual(u'hello', inv2.get_entry(b'hello-id').name)
 
316
 
 
317
    def test_copy_empty(self):
 
318
        """Make sure an empty inventory can be copied."""
 
319
        inv = inventory.Inventory(root_id=None)
 
320
        inv2 = inv.copy()
 
321
        self.assertIs(None, inv2.root)
 
322
 
 
323
    def test_copy_copies_root_revision(self):
 
324
        """Make sure the revision of the root gets copied."""
 
325
        inv = inventory.Inventory(root_id=b'someroot')
 
326
        inv.root.revision = b'therev'
 
327
        inv2 = inv.copy()
 
328
        self.assertEqual(b'someroot', inv2.root.file_id)
 
329
        self.assertEqual(b'therev', inv2.root.revision)
 
330
 
 
331
    def test_create_tree_reference(self):
 
332
        inv = inventory.Inventory(b'tree-root-123')
 
333
        inv.add(TreeReference(
 
334
            b'nested-id', 'nested', parent_id=b'tree-root-123',
 
335
            revision=b'rev', reference_revision=b'rev2'))
 
336
 
 
337
    def test_error_encoding(self):
 
338
        inv = inventory.Inventory(b'tree-root')
 
339
        inv.add(InventoryFile(b'a-id', u'\u1234', b'tree-root'))
 
340
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
 
341
            InventoryFile(b'b-id', u'\u1234', b'tree-root'))
 
342
        self.assertContainsRe(str(e), '\\u1234')
 
343
 
 
344
    def test_add_recursive(self):
 
345
        parent = InventoryDirectory(b'src-id', 'src', b'tree-root')
 
346
        child = InventoryFile(b'hello-id', 'hello.c', b'src-id')
 
347
        parent.children[child.file_id] = child
 
348
        inv = inventory.Inventory(b'tree-root')
 
349
        inv.add(parent)
 
350
        self.assertEqual('src/hello.c', inv.id2path(b'hello-id'))
 
351
 
 
352
 
 
353
 
 
354
class TestDeltaApplication(TestCaseWithTransport):
 
355
 
 
356
    scenarios = delta_application_scenarios()
 
357
 
 
358
    def get_empty_inventory(self, reference_inv=None):
 
359
        """Get an empty inventory.
 
360
 
 
361
        Note that tests should not depend on the revision of the root for
 
362
        setting up test conditions, as it has to be flexible to accomodate non
 
363
        rich root repositories.
 
364
 
 
365
        :param reference_inv: If not None, get the revision for the root from
 
366
            this inventory. This is useful for dealing with older repositories
 
367
            that routinely discarded the root entry data. If None, the root's
 
368
            revision is set to 'basis'.
 
369
        """
 
370
        inv = inventory.Inventory()
 
371
        if reference_inv is not None:
 
372
            inv.root.revision = reference_inv.root.revision
 
373
        else:
 
374
            inv.root.revision = b'basis'
 
375
        return inv
 
376
 
 
377
    def make_file_ie(self, file_id=b'file-id', name='name', parent_id=None):
 
378
        ie_file = inventory.InventoryFile(file_id, name, parent_id)
 
379
        ie_file.revision = b'result'
 
380
        ie_file.text_size = 0
 
381
        ie_file.text_sha1 = b''
 
382
        return ie_file
 
383
 
 
384
    def test_empty_delta(self):
 
385
        inv = self.get_empty_inventory()
 
386
        delta = []
 
387
        inv = self.apply_delta(self, inv, delta)
 
388
        inv2 = self.get_empty_inventory(inv)
 
389
        self.assertEqual([], inv2._make_delta(inv))
 
390
 
 
391
    def test_None_file_id(self):
 
392
        inv = self.get_empty_inventory()
 
393
        dir1 = inventory.InventoryDirectory(b'dirid', 'dir1', inv.root.file_id)
 
394
        dir1.file_id = None
 
395
        dir1.revision = b'result'
 
396
        delta = [(None, u'dir1', None, dir1)]
 
397
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
398
            inv, delta)
 
399
 
 
400
    def test_unicode_file_id(self):
 
401
        inv = self.get_empty_inventory()
 
402
        dir1 = inventory.InventoryDirectory(b'dirid', 'dir1', inv.root.file_id)
 
403
        dir1.file_id = u'dirid'
 
404
        dir1.revision = b'result'
 
405
        delta = [(None, u'dir1', dir1.file_id, dir1)]
 
406
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
407
            inv, delta)
 
408
 
 
409
    def test_repeated_file_id(self):
 
410
        inv = self.get_empty_inventory()
 
411
        file1 = inventory.InventoryFile(b'id', 'path1', inv.root.file_id)
 
412
        file1.revision = b'result'
 
413
        file1.text_size = 0
 
414
        file1.text_sha1 = b""
 
415
        file2 = file1.copy()
 
416
        file2.name = 'path2'
 
417
        delta = [(None, u'path1', b'id', file1), (None, u'path2', b'id', file2)]
 
418
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
419
            inv, delta)
 
420
 
 
421
    def test_repeated_new_path(self):
 
422
        inv = self.get_empty_inventory()
 
423
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
424
        file1.revision = b'result'
 
425
        file1.text_size = 0
 
426
        file1.text_sha1 = b""
 
427
        file2 = file1.copy()
 
428
        file2.file_id = b'id2'
 
429
        delta = [(None, u'path', b'id1', file1), (None, u'path', b'id2', file2)]
 
430
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
431
            inv, delta)
 
432
 
 
433
    def test_repeated_old_path(self):
 
434
        inv = self.get_empty_inventory()
 
435
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
436
        file1.revision = b'result'
 
437
        file1.text_size = 0
 
438
        file1.text_sha1 = b""
 
439
        # We can't *create* a source inventory with the same path, but
 
440
        # a badly generated partial delta might claim the same source twice.
 
441
        # This would be buggy in two ways: the path is repeated in the delta,
 
442
        # And the path for one of the file ids doesn't match the source
 
443
        # location. Alternatively, we could have a repeated fileid, but that
 
444
        # is separately checked for.
 
445
        file2 = inventory.InventoryFile(b'id2', 'path2', inv.root.file_id)
 
446
        file2.revision = b'result'
 
447
        file2.text_size = 0
 
448
        file2.text_sha1 = b""
 
449
        inv.add(file1)
 
450
        inv.add(file2)
 
451
        delta = [(u'path', None, b'id1', None), (u'path', None, b'id2', None)]
 
452
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
453
            inv, delta)
 
454
 
 
455
    def test_mismatched_id_entry_id(self):
 
456
        inv = self.get_empty_inventory()
 
457
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
458
        file1.revision = b'result'
 
459
        file1.text_size = 0
 
460
        file1.text_sha1 = b""
 
461
        delta = [(None, u'path', b'id', file1)]
 
462
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
463
            inv, delta)
 
464
 
 
465
    def test_mismatched_new_path_entry_None(self):
 
466
        inv = self.get_empty_inventory()
 
467
        delta = [(None, u'path', b'id', None)]
 
468
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
469
            inv, delta)
 
470
 
 
471
    def test_mismatched_new_path_None_entry(self):
 
472
        inv = self.get_empty_inventory()
 
473
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
474
        file1.revision = b'result'
 
475
        file1.text_size = 0
 
476
        file1.text_sha1 = b""
 
477
        delta = [(u"path", None, b'id1', file1)]
 
478
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
479
            inv, delta)
 
480
 
 
481
    def test_parent_is_not_directory(self):
 
482
        inv = self.get_empty_inventory()
 
483
        file1 = inventory.InventoryFile(b'id1', 'path', inv.root.file_id)
 
484
        file1.revision = b'result'
 
485
        file1.text_size = 0
 
486
        file1.text_sha1 = b""
 
487
        file2 = inventory.InventoryFile(b'id2', 'path2', b'id1')
 
488
        file2.revision = b'result'
 
489
        file2.text_size = 0
 
490
        file2.text_sha1 = b""
 
491
        inv.add(file1)
 
492
        delta = [(None, u'path/path2', b'id2', file2)]
 
493
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
494
            inv, delta)
 
495
 
 
496
    def test_parent_is_missing(self):
 
497
        inv = self.get_empty_inventory()
 
498
        file2 = inventory.InventoryFile(b'id2', 'path2', b'missingparent')
 
499
        file2.revision = b'result'
 
500
        file2.text_size = 0
 
501
        file2.text_sha1 = b""
 
502
        delta = [(None, u'path/path2', b'id2', file2)]
 
503
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
504
            inv, delta)
 
505
 
 
506
    def test_new_parent_path_has_wrong_id(self):
 
507
        inv = self.get_empty_inventory()
 
508
        parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
 
509
        parent1.revision = b'result'
 
510
        parent2 = inventory.InventoryDirectory(b'p-2', 'dir2', inv.root.file_id)
 
511
        parent2.revision = b'result'
 
512
        file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
 
513
        file1.revision = b'result'
 
514
        file1.text_size = 0
 
515
        file1.text_sha1 = b""
 
516
        inv.add(parent1)
 
517
        inv.add(parent2)
 
518
        # This delta claims that file1 is at dir/path, but actually its at
 
519
        # dir2/path if you follow the inventory parent structure.
 
520
        delta = [(None, u'dir/path', b'id', file1)]
 
521
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
522
            inv, delta)
 
523
 
 
524
    def test_old_parent_path_is_wrong(self):
 
525
        inv = self.get_empty_inventory()
 
526
        parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
 
527
        parent1.revision = b'result'
 
528
        parent2 = inventory.InventoryDirectory(b'p-2', 'dir2', inv.root.file_id)
 
529
        parent2.revision = b'result'
 
530
        file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
 
531
        file1.revision = b'result'
 
532
        file1.text_size = 0
 
533
        file1.text_sha1 = b""
 
534
        inv.add(parent1)
 
535
        inv.add(parent2)
 
536
        inv.add(file1)
 
537
        # This delta claims that file1 was at dir/path, but actually it was at
 
538
        # dir2/path if you follow the inventory parent structure.
 
539
        delta = [(u'dir/path', None, b'id', None)]
 
540
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
541
            inv, delta)
 
542
 
 
543
    def test_old_parent_path_is_for_other_id(self):
 
544
        inv = self.get_empty_inventory()
 
545
        parent1 = inventory.InventoryDirectory(b'p-1', 'dir', inv.root.file_id)
 
546
        parent1.revision = b'result'
 
547
        parent2 = inventory.InventoryDirectory(b'p-2', 'dir2', inv.root.file_id)
 
548
        parent2.revision = b'result'
 
549
        file1 = inventory.InventoryFile(b'id', 'path', b'p-2')
 
550
        file1.revision = b'result'
 
551
        file1.text_size = 0
 
552
        file1.text_sha1 = b""
 
553
        file2 = inventory.InventoryFile(b'id2', 'path', b'p-1')
 
554
        file2.revision = b'result'
 
555
        file2.text_size = 0
 
556
        file2.text_sha1 = b""
 
557
        inv.add(parent1)
 
558
        inv.add(parent2)
 
559
        inv.add(file1)
 
560
        inv.add(file2)
 
561
        # This delta claims that file1 was at dir/path, but actually it was at
 
562
        # dir2/path if you follow the inventory parent structure. At dir/path
 
563
        # is another entry we should not delete.
 
564
        delta = [(u'dir/path', None, b'id', None)]
 
565
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
566
            inv, delta)
 
567
 
 
568
    def test_add_existing_id_new_path(self):
 
569
        inv = self.get_empty_inventory()
 
570
        parent1 = inventory.InventoryDirectory(b'p-1', 'dir1', inv.root.file_id)
 
571
        parent1.revision = b'result'
 
572
        parent2 = inventory.InventoryDirectory(b'p-1', 'dir2', inv.root.file_id)
 
573
        parent2.revision = b'result'
 
574
        inv.add(parent1)
 
575
        delta = [(None, u'dir2', b'p-1', parent2)]
 
576
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
577
            inv, delta)
 
578
 
 
579
    def test_add_new_id_existing_path(self):
 
580
        inv = self.get_empty_inventory()
 
581
        parent1 = inventory.InventoryDirectory(b'p-1', 'dir1', inv.root.file_id)
 
582
        parent1.revision = b'result'
 
583
        parent2 = inventory.InventoryDirectory(b'p-2', 'dir1', inv.root.file_id)
 
584
        parent2.revision = b'result'
 
585
        inv.add(parent1)
 
586
        delta = [(None, u'dir1', b'p-2', parent2)]
 
587
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
588
            inv, delta)
 
589
 
 
590
    def test_remove_dir_leaving_dangling_child(self):
 
591
        inv = self.get_empty_inventory()
 
592
        dir1 = inventory.InventoryDirectory(b'p-1', 'dir1', inv.root.file_id)
 
593
        dir1.revision = b'result'
 
594
        dir2 = inventory.InventoryDirectory(b'p-2', 'child1', b'p-1')
 
595
        dir2.revision = b'result'
 
596
        dir3 = inventory.InventoryDirectory(b'p-3', 'child2', b'p-1')
 
597
        dir3.revision = b'result'
 
598
        inv.add(dir1)
 
599
        inv.add(dir2)
 
600
        inv.add(dir3)
 
601
        delta = [(u'dir1', None, b'p-1', None),
 
602
            (u'dir1/child2', None, b'p-3', None)]
 
603
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
604
            inv, delta)
 
605
 
 
606
    def test_add_file(self):
 
607
        inv = self.get_empty_inventory()
 
608
        file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
 
609
        file1.revision = b'result'
 
610
        file1.text_size = 0
 
611
        file1.text_sha1 = b''
 
612
        delta = [(None, u'path', b'file-id', file1)]
 
613
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
614
        self.assertEqual(b'file-id', res_inv.get_entry(b'file-id').file_id)
 
615
 
 
616
    def test_remove_file(self):
 
617
        inv = self.get_empty_inventory()
 
618
        file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
 
619
        file1.revision = b'result'
 
620
        file1.text_size = 0
 
621
        file1.text_sha1 = b''
 
622
        inv.add(file1)
 
623
        delta = [(u'path', None, b'file-id', None)]
 
624
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
625
        self.assertEqual(None, res_inv.path2id('path'))
 
626
        self.assertRaises(errors.NoSuchId, res_inv.id2path, b'file-id')
 
627
 
 
628
    def test_rename_file(self):
 
629
        inv = self.get_empty_inventory()
 
630
        file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
 
631
        inv.add(file1)
 
632
        file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
 
633
        delta = [(u'path', 'path2', b'file-id', file2)]
 
634
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
635
        self.assertEqual(None, res_inv.path2id('path'))
 
636
        self.assertEqual(b'file-id', res_inv.path2id('path2'))
 
637
 
 
638
    def test_replaced_at_new_path(self):
 
639
        inv = self.get_empty_inventory()
 
640
        file1 = self.make_file_ie(file_id=b'id1', parent_id=inv.root.file_id)
 
641
        inv.add(file1)
 
642
        file2 = self.make_file_ie(file_id=b'id2', parent_id=inv.root.file_id)
 
643
        delta = [(u'name', None, b'id1', None),
 
644
                 (None, u'name', b'id2', file2)]
 
645
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
646
        self.assertEqual(b'id2', res_inv.path2id('name'))
 
647
 
 
648
    def test_rename_dir(self):
 
649
        inv = self.get_empty_inventory()
 
650
        dir1 = inventory.InventoryDirectory(b'dir-id', 'dir1', inv.root.file_id)
 
651
        dir1.revision = b'basis'
 
652
        file1 = self.make_file_ie(parent_id=b'dir-id')
 
653
        inv.add(dir1)
 
654
        inv.add(file1)
 
655
        dir2 = inventory.InventoryDirectory(b'dir-id', 'dir2', inv.root.file_id)
 
656
        dir2.revision = b'result'
 
657
        delta = [('dir1', 'dir2', b'dir-id', dir2)]
 
658
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
659
        # The file should be accessible under the new path
 
660
        self.assertEqual(b'file-id', res_inv.path2id('dir2/name'))
 
661
 
 
662
    def test_renamed_dir_with_renamed_child(self):
 
663
        inv = self.get_empty_inventory()
 
664
        dir1 = inventory.InventoryDirectory(b'dir-id', 'dir1', inv.root.file_id)
 
665
        dir1.revision = b'basis'
 
666
        file1 = self.make_file_ie(b'file-id-1', 'name1', parent_id=b'dir-id')
 
667
        file2 = self.make_file_ie(b'file-id-2', 'name2', parent_id=b'dir-id')
 
668
        inv.add(dir1)
 
669
        inv.add(file1)
 
670
        inv.add(file2)
 
671
        dir2 = inventory.InventoryDirectory(b'dir-id', 'dir2', inv.root.file_id)
 
672
        dir2.revision = b'result'
 
673
        file2b = self.make_file_ie(b'file-id-2', 'name2', inv.root.file_id)
 
674
        delta = [('dir1', 'dir2', b'dir-id', dir2),
 
675
                 ('dir1/name2', 'name2', b'file-id-2', file2b)]
 
676
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
677
        # The file should be accessible under the new path
 
678
        self.assertEqual(b'file-id-1', res_inv.path2id('dir2/name1'))
 
679
        self.assertEqual(None, res_inv.path2id('dir2/name2'))
 
680
        self.assertEqual(b'file-id-2', res_inv.path2id('name2'))
 
681
 
 
682
    def test_is_root(self):
 
683
        """Ensure our root-checking code is accurate."""
 
684
        inv = inventory.Inventory(b'TREE_ROOT')
 
685
        self.assertTrue(inv.is_root(b'TREE_ROOT'))
 
686
        self.assertFalse(inv.is_root(b'booga'))
 
687
        inv.root.file_id = b'booga'
 
688
        self.assertFalse(inv.is_root(b'TREE_ROOT'))
 
689
        self.assertTrue(inv.is_root(b'booga'))
 
690
        # works properly even if no root is set
 
691
        inv.root = None
 
692
        self.assertFalse(inv.is_root(b'TREE_ROOT'))
 
693
        self.assertFalse(inv.is_root(b'booga'))
 
694
 
 
695
    def test_entries_for_empty_inventory(self):
 
696
        """Test that entries() will not fail for an empty inventory"""
 
697
        inv = Inventory(root_id=None)
 
698
        self.assertEqual([], inv.entries())
 
699
 
 
700
 
 
701
class TestInventoryEntry(TestCase):
 
702
 
 
703
    def test_file_invalid_entry_name(self):
 
704
        self.assertRaises(errors.InvalidEntryName, inventory.InventoryFile,
 
705
            b'123', 'a/hello.c', ROOT_ID)
 
706
 
 
707
    def test_file_backslash(self):
 
708
        file = inventory.InventoryFile(b'123', 'h\\ello.c', ROOT_ID)
 
709
        self.assertEquals(file.name, 'h\\ello.c')
 
710
 
 
711
    def test_file_kind_character(self):
 
712
        file = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
 
713
        self.assertEqual(file.kind_character(), '')
 
714
 
 
715
    def test_dir_kind_character(self):
 
716
        dir = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
 
717
        self.assertEqual(dir.kind_character(), '/')
 
718
 
 
719
    def test_link_kind_character(self):
 
720
        dir = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
 
721
        self.assertEqual(dir.kind_character(), '')
 
722
 
 
723
    def test_link_kind_character(self):
 
724
        dir = TreeReference(b'123', 'hello.c', ROOT_ID)
 
725
        self.assertEqual(dir.kind_character(), '+')
 
726
 
 
727
    def test_dir_detect_changes(self):
 
728
        left = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
 
729
        right = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
 
730
        self.assertEqual((False, False), left.detect_changes(right))
 
731
        self.assertEqual((False, False), right.detect_changes(left))
 
732
 
 
733
    def test_file_detect_changes(self):
 
734
        left = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
 
735
        left.text_sha1 = 123
 
736
        right = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
 
737
        right.text_sha1 = 123
 
738
        self.assertEqual((False, False), left.detect_changes(right))
 
739
        self.assertEqual((False, False), right.detect_changes(left))
 
740
        left.executable = True
 
741
        self.assertEqual((False, True), left.detect_changes(right))
 
742
        self.assertEqual((False, True), right.detect_changes(left))
 
743
        right.text_sha1 = 321
 
744
        self.assertEqual((True, True), left.detect_changes(right))
 
745
        self.assertEqual((True, True), right.detect_changes(left))
 
746
 
 
747
    def test_symlink_detect_changes(self):
 
748
        left = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
 
749
        left.symlink_target='foo'
 
750
        right = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
 
751
        right.symlink_target='foo'
 
752
        self.assertEqual((False, False), left.detect_changes(right))
 
753
        self.assertEqual((False, False), right.detect_changes(left))
 
754
        left.symlink_target = 'different'
 
755
        self.assertEqual((True, False), left.detect_changes(right))
 
756
        self.assertEqual((True, False), right.detect_changes(left))
 
757
 
 
758
    def test_file_has_text(self):
 
759
        file = inventory.InventoryFile(b'123', 'hello.c', ROOT_ID)
 
760
        self.assertTrue(file.has_text())
 
761
 
 
762
    def test_directory_has_text(self):
 
763
        dir = inventory.InventoryDirectory(b'123', 'hello.c', ROOT_ID)
 
764
        self.assertFalse(dir.has_text())
 
765
 
 
766
    def test_link_has_text(self):
 
767
        link = inventory.InventoryLink(b'123', 'hello.c', ROOT_ID)
 
768
        self.assertFalse(link.has_text())
 
769
 
 
770
    def test_make_entry(self):
 
771
        self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
 
772
            inventory.InventoryFile)
 
773
        self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
 
774
            inventory.InventoryLink)
 
775
        self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
 
776
            inventory.InventoryDirectory)
 
777
 
 
778
    def test_make_entry_non_normalized(self):
 
779
        orig_normalized_filename = osutils.normalized_filename
 
780
 
 
781
        try:
 
782
            osutils.normalized_filename = osutils._accessible_normalized_filename
 
783
            entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
 
784
            self.assertEqual(u'\xe5', entry.name)
 
785
            self.assertIsInstance(entry, inventory.InventoryFile)
 
786
 
 
787
            osutils.normalized_filename = osutils._inaccessible_normalized_filename
 
788
            self.assertRaises(errors.InvalidNormalization,
 
789
                    inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
 
790
        finally:
 
791
            osutils.normalized_filename = orig_normalized_filename
 
792
 
 
793
 
 
794
class TestDescribeChanges(TestCase):
 
795
 
 
796
    def test_describe_change(self):
 
797
        # we need to test the following change combinations:
 
798
        # rename
 
799
        # reparent
 
800
        # modify
 
801
        # gone
 
802
        # added
 
803
        # renamed/reparented and modified
 
804
        # change kind (perhaps can't be done yet?)
 
805
        # also, merged in combination with all of these?
 
806
        old_a = InventoryFile(b'a-id', 'a_file', ROOT_ID)
 
807
        old_a.text_sha1 = b'123132'
 
808
        old_a.text_size = 0
 
809
        new_a = InventoryFile(b'a-id', 'a_file', ROOT_ID)
 
810
        new_a.text_sha1 = b'123132'
 
811
        new_a.text_size = 0
 
812
 
 
813
        self.assertChangeDescription('unchanged', old_a, new_a)
 
814
 
 
815
        new_a.text_size = 10
 
816
        new_a.text_sha1 = b'abcabc'
 
817
        self.assertChangeDescription('modified', old_a, new_a)
 
818
 
 
819
        self.assertChangeDescription('added', None, new_a)
 
820
        self.assertChangeDescription('removed', old_a, None)
 
821
        # perhaps a bit questionable but seems like the most reasonable thing...
 
822
        self.assertChangeDescription('unchanged', None, None)
 
823
 
 
824
        # in this case it's both renamed and modified; show a rename and
 
825
        # modification:
 
826
        new_a.name = 'newfilename'
 
827
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
828
 
 
829
        # reparenting is 'renaming'
 
830
        new_a.name = old_a.name
 
831
        new_a.parent_id = b'somedir-id'
 
832
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
833
 
 
834
        # reset the content values so its not modified
 
835
        new_a.text_size = old_a.text_size
 
836
        new_a.text_sha1 = old_a.text_sha1
 
837
        new_a.name = old_a.name
 
838
 
 
839
        new_a.name = 'newfilename'
 
840
        self.assertChangeDescription('renamed', old_a, new_a)
 
841
 
 
842
        # reparenting is 'renaming'
 
843
        new_a.name = old_a.name
 
844
        new_a.parent_id = b'somedir-id'
 
845
        self.assertChangeDescription('renamed', old_a, new_a)
 
846
 
 
847
    def assertChangeDescription(self, expected_change, old_ie, new_ie):
 
848
        change = InventoryEntry.describe_change(old_ie, new_ie)
 
849
        self.assertEqual(expected_change, change)
 
850
 
 
851
 
 
852
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
 
853
 
 
854
    def get_chk_bytes(self):
 
855
        factory = groupcompress.make_pack_factory(True, True, 1)
 
856
        trans = self.get_transport('')
 
857
        return factory(trans)
 
858
 
 
859
    def read_bytes(self, chk_bytes, key):
 
860
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
 
861
        return next(stream).get_bytes_as("fulltext")
 
862
 
 
863
    def test_deserialise_gives_CHKInventory(self):
 
864
        inv = Inventory()
 
865
        inv.revision_id = b"revid"
 
866
        inv.root.revision = b"rootrev"
 
867
        chk_bytes = self.get_chk_bytes()
 
868
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
869
        bytes = b''.join(chk_inv.to_lines())
 
870
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
871
        self.assertEqual(b"revid", new_inv.revision_id)
 
872
        self.assertEqual("directory", new_inv.root.kind)
 
873
        self.assertEqual(inv.root.file_id, new_inv.root.file_id)
 
874
        self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
 
875
        self.assertEqual(inv.root.name, new_inv.root.name)
 
876
        self.assertEqual(b"rootrev", new_inv.root.revision)
 
877
        self.assertEqual(b'plain', new_inv._search_key_name)
 
878
 
 
879
    def test_deserialise_wrong_revid(self):
 
880
        inv = Inventory()
 
881
        inv.revision_id = b"revid"
 
882
        inv.root.revision = b"rootrev"
 
883
        chk_bytes = self.get_chk_bytes()
 
884
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
885
        bytes = b''.join(chk_inv.to_lines())
 
886
        self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
 
887
            bytes, (b"revid2",))
 
888
 
 
889
    def test_captures_rev_root_byid(self):
 
890
        inv = Inventory()
 
891
        inv.revision_id = b"foo"
 
892
        inv.root.revision = b"bar"
 
893
        chk_bytes = self.get_chk_bytes()
 
894
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
895
        lines = chk_inv.to_lines()
 
896
        self.assertEqual([
 
897
            b'chkinventory:\n',
 
898
            b'revision_id: foo\n',
 
899
            b'root_id: TREE_ROOT\n',
 
900
            b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
901
            b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
902
            ], lines)
 
903
        chk_inv = CHKInventory.deserialise(chk_bytes, b''.join(lines), (b'foo',))
 
904
        self.assertEqual(b'plain', chk_inv._search_key_name)
 
905
 
 
906
    def test_captures_parent_id_basename_index(self):
 
907
        inv = Inventory()
 
908
        inv.revision_id = b"foo"
 
909
        inv.root.revision = b"bar"
 
910
        chk_bytes = self.get_chk_bytes()
 
911
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
912
        lines = chk_inv.to_lines()
 
913
        self.assertEqual([
 
914
            b'chkinventory:\n',
 
915
            b'revision_id: foo\n',
 
916
            b'root_id: TREE_ROOT\n',
 
917
            b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
918
            b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
919
            ], lines)
 
920
        chk_inv = CHKInventory.deserialise(chk_bytes, b''.join(lines), (b'foo',))
 
921
        self.assertEqual(b'plain', chk_inv._search_key_name)
 
922
 
 
923
    def test_captures_search_key_name(self):
 
924
        inv = Inventory()
 
925
        inv.revision_id = b"foo"
 
926
        inv.root.revision = b"bar"
 
927
        chk_bytes = self.get_chk_bytes()
 
928
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
929
                                              search_key_name=b'hash-16-way')
 
930
        lines = chk_inv.to_lines()
 
931
        self.assertEqual([
 
932
            b'chkinventory:\n',
 
933
            b'search_key_name: hash-16-way\n',
 
934
            b'root_id: TREE_ROOT\n',
 
935
            b'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
936
            b'revision_id: foo\n',
 
937
            b'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
938
            ], lines)
 
939
        chk_inv = CHKInventory.deserialise(chk_bytes, b''.join(lines), (b'foo',))
 
940
        self.assertEqual(b'hash-16-way', chk_inv._search_key_name)
 
941
 
 
942
    def test_directory_children_on_demand(self):
 
943
        inv = Inventory()
 
944
        inv.revision_id = b"revid"
 
945
        inv.root.revision = b"rootrev"
 
946
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
947
        inv.get_entry(b"fileid").revision = b"filerev"
 
948
        inv.get_entry(b"fileid").executable = True
 
949
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
950
        inv.get_entry(b"fileid").text_size = 1
 
951
        chk_bytes = self.get_chk_bytes()
 
952
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
953
        bytes = b''.join(chk_inv.to_lines())
 
954
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
955
        root_entry = new_inv.get_entry(inv.root.file_id)
 
956
        self.assertEqual(None, root_entry._children)
 
957
        self.assertEqual({'file'}, set(root_entry.children))
 
958
        file_direct = new_inv.get_entry(b"fileid")
 
959
        file_found = root_entry.children['file']
 
960
        self.assertEqual(file_direct.kind, file_found.kind)
 
961
        self.assertEqual(file_direct.file_id, file_found.file_id)
 
962
        self.assertEqual(file_direct.parent_id, file_found.parent_id)
 
963
        self.assertEqual(file_direct.name, file_found.name)
 
964
        self.assertEqual(file_direct.revision, file_found.revision)
 
965
        self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
 
966
        self.assertEqual(file_direct.text_size, file_found.text_size)
 
967
        self.assertEqual(file_direct.executable, file_found.executable)
 
968
 
 
969
    def test_from_inventory_maximum_size(self):
 
970
        # from_inventory supports the maximum_size parameter.
 
971
        inv = Inventory()
 
972
        inv.revision_id = b"revid"
 
973
        inv.root.revision = b"rootrev"
 
974
        chk_bytes = self.get_chk_bytes()
 
975
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
 
976
        chk_inv.id_to_entry._ensure_root()
 
977
        self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
 
978
        self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
 
979
        p_id_basename = chk_inv.parent_id_basename_to_file_id
 
980
        p_id_basename._ensure_root()
 
981
        self.assertEqual(120, p_id_basename._root_node.maximum_size)
 
982
        self.assertEqual(2, p_id_basename._root_node._key_width)
 
983
 
 
984
    def test_iter_all_ids(self):
 
985
        inv = Inventory()
 
986
        inv.revision_id = b"revid"
 
987
        inv.root.revision = b"rootrev"
 
988
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
989
        inv.get_entry(b"fileid").revision = b"filerev"
 
990
        inv.get_entry(b"fileid").executable = True
 
991
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
992
        inv.get_entry(b"fileid").text_size = 1
 
993
        chk_bytes = self.get_chk_bytes()
 
994
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
995
        bytes = b''.join(chk_inv.to_lines())
 
996
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
997
        fileids = sorted(new_inv.iter_all_ids())
 
998
        self.assertEqual([inv.root.file_id, b"fileid"], fileids)
 
999
 
 
1000
    def test__len__(self):
 
1001
        inv = Inventory()
 
1002
        inv.revision_id = b"revid"
 
1003
        inv.root.revision = b"rootrev"
 
1004
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1005
        inv.get_entry(b"fileid").revision = b"filerev"
 
1006
        inv.get_entry(b"fileid").executable = True
 
1007
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1008
        inv.get_entry(b"fileid").text_size = 1
 
1009
        chk_bytes = self.get_chk_bytes()
 
1010
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1011
        self.assertEqual(2, len(chk_inv))
 
1012
 
 
1013
    def test_get_entry(self):
 
1014
        inv = Inventory()
 
1015
        inv.revision_id = b"revid"
 
1016
        inv.root.revision = b"rootrev"
 
1017
        inv.add(InventoryFile(b"fileid", u"file", inv.root.file_id))
 
1018
        inv.get_entry(b"fileid").revision = b"filerev"
 
1019
        inv.get_entry(b"fileid").executable = True
 
1020
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1021
        inv.get_entry(b"fileid").text_size = 1
 
1022
        chk_bytes = self.get_chk_bytes()
 
1023
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1024
        data = b''.join(chk_inv.to_lines())
 
1025
        new_inv = CHKInventory.deserialise(chk_bytes, data, (b"revid",))
 
1026
        root_entry = new_inv.get_entry(inv.root.file_id)
 
1027
        file_entry = new_inv.get_entry(b"fileid")
 
1028
        self.assertEqual("directory", root_entry.kind)
 
1029
        self.assertEqual(inv.root.file_id, root_entry.file_id)
 
1030
        self.assertEqual(inv.root.parent_id, root_entry.parent_id)
 
1031
        self.assertEqual(inv.root.name, root_entry.name)
 
1032
        self.assertEqual(b"rootrev", root_entry.revision)
 
1033
        self.assertEqual("file", file_entry.kind)
 
1034
        self.assertEqual(b"fileid", file_entry.file_id)
 
1035
        self.assertEqual(inv.root.file_id, file_entry.parent_id)
 
1036
        self.assertEqual(u"file", file_entry.name)
 
1037
        self.assertEqual(b"filerev", file_entry.revision)
 
1038
        self.assertEqual(b"ffff", file_entry.text_sha1)
 
1039
        self.assertEqual(1, file_entry.text_size)
 
1040
        self.assertEqual(True, file_entry.executable)
 
1041
        self.assertRaises(errors.NoSuchId, new_inv.get_entry, 'missing')
 
1042
 
 
1043
    def test_has_id_true(self):
 
1044
        inv = Inventory()
 
1045
        inv.revision_id = b"revid"
 
1046
        inv.root.revision = b"rootrev"
 
1047
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1048
        inv.get_entry(b"fileid").revision = b"filerev"
 
1049
        inv.get_entry(b"fileid").executable = True
 
1050
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1051
        inv.get_entry(b"fileid").text_size = 1
 
1052
        chk_bytes = self.get_chk_bytes()
 
1053
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1054
        self.assertTrue(chk_inv.has_id(b'fileid'))
 
1055
        self.assertTrue(chk_inv.has_id(inv.root.file_id))
 
1056
 
 
1057
    def test_has_id_not(self):
 
1058
        inv = Inventory()
 
1059
        inv.revision_id = b"revid"
 
1060
        inv.root.revision = b"rootrev"
 
1061
        chk_bytes = self.get_chk_bytes()
 
1062
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1063
        self.assertFalse(chk_inv.has_id(b'fileid'))
 
1064
 
 
1065
    def test_id2path(self):
 
1066
        inv = Inventory()
 
1067
        inv.revision_id = b"revid"
 
1068
        inv.root.revision = b"rootrev"
 
1069
        direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
 
1070
        fileentry = InventoryFile(b"fileid", "file", b"dirid")
 
1071
        inv.add(direntry)
 
1072
        inv.add(fileentry)
 
1073
        inv.get_entry(b"fileid").revision = b"filerev"
 
1074
        inv.get_entry(b"fileid").executable = True
 
1075
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1076
        inv.get_entry(b"fileid").text_size = 1
 
1077
        inv.get_entry(b"dirid").revision = b"filerev"
 
1078
        chk_bytes = self.get_chk_bytes()
 
1079
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1080
        bytes = b''.join(chk_inv.to_lines())
 
1081
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1082
        self.assertEqual('', new_inv.id2path(inv.root.file_id))
 
1083
        self.assertEqual('dir', new_inv.id2path(b'dirid'))
 
1084
        self.assertEqual('dir/file', new_inv.id2path(b'fileid'))
 
1085
 
 
1086
    def test_path2id(self):
 
1087
        inv = Inventory()
 
1088
        inv.revision_id = b"revid"
 
1089
        inv.root.revision = b"rootrev"
 
1090
        direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
 
1091
        fileentry = InventoryFile(b"fileid", "file", b"dirid")
 
1092
        inv.add(direntry)
 
1093
        inv.add(fileentry)
 
1094
        inv.get_entry(b"fileid").revision = b"filerev"
 
1095
        inv.get_entry(b"fileid").executable = True
 
1096
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1097
        inv.get_entry(b"fileid").text_size = 1
 
1098
        inv.get_entry(b"dirid").revision = b"filerev"
 
1099
        chk_bytes = self.get_chk_bytes()
 
1100
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1101
        bytes = b''.join(chk_inv.to_lines())
 
1102
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1103
        self.assertEqual(inv.root.file_id, new_inv.path2id(''))
 
1104
        self.assertEqual(b'dirid', new_inv.path2id('dir'))
 
1105
        self.assertEqual(b'fileid', new_inv.path2id('dir/file'))
 
1106
 
 
1107
    def test_create_by_apply_delta_sets_root(self):
 
1108
        inv = Inventory()
 
1109
        inv.root.revision = b"myrootrev"
 
1110
        inv.revision_id = b"revid"
 
1111
        chk_bytes = self.get_chk_bytes()
 
1112
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1113
        inv.add_path("", "directory", b"myrootid", None)
 
1114
        inv.revision_id = b"expectedid"
 
1115
        inv.root.revision = b"myrootrev"
 
1116
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1117
        delta = [("", None, base_inv.root.file_id, None),
 
1118
            (None, "",  b"myrootid", inv.root)]
 
1119
        new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
 
1120
        self.assertEqual(reference_inv.root, new_inv.root)
 
1121
 
 
1122
    def test_create_by_apply_delta_empty_add_child(self):
 
1123
        inv = Inventory()
 
1124
        inv.revision_id = b"revid"
 
1125
        inv.root.revision = b"rootrev"
 
1126
        chk_bytes = self.get_chk_bytes()
 
1127
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1128
        a_entry = InventoryFile(b"A-id", "A", inv.root.file_id)
 
1129
        a_entry.revision = b"filerev"
 
1130
        a_entry.executable = True
 
1131
        a_entry.text_sha1 = b"ffff"
 
1132
        a_entry.text_size = 1
 
1133
        inv.add(a_entry)
 
1134
        inv.revision_id = b"expectedid"
 
1135
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1136
        delta = [(None, "A",  b"A-id", a_entry)]
 
1137
        new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
 
1138
        # new_inv should be the same as reference_inv.
 
1139
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1140
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1141
        reference_inv.id_to_entry._ensure_root()
 
1142
        new_inv.id_to_entry._ensure_root()
 
1143
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1144
            new_inv.id_to_entry._root_node._key)
 
1145
 
 
1146
    def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
 
1147
        inv = Inventory()
 
1148
        inv.revision_id = b"revid"
 
1149
        inv.root.revision = b"rootrev"
 
1150
        chk_bytes = self.get_chk_bytes()
 
1151
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1152
        a_entry = InventoryFile(b"A-id", "A", inv.root.file_id)
 
1153
        a_entry.revision = b"filerev"
 
1154
        a_entry.executable = True
 
1155
        a_entry.text_sha1 = b"ffff"
 
1156
        a_entry.text_size = 1
 
1157
        inv.add(a_entry)
 
1158
        inv.revision_id = b"expectedid"
 
1159
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1160
        delta = [(None, "A", b"A-id", a_entry)]
 
1161
        new_inv = base_inv.create_by_apply_delta(delta, b"expectedid")
 
1162
        reference_inv.id_to_entry._ensure_root()
 
1163
        reference_inv.parent_id_basename_to_file_id._ensure_root()
 
1164
        new_inv.id_to_entry._ensure_root()
 
1165
        new_inv.parent_id_basename_to_file_id._ensure_root()
 
1166
        # new_inv should be the same as reference_inv.
 
1167
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1168
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1169
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1170
            new_inv.id_to_entry._root_node._key)
 
1171
        self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
 
1172
            new_inv.parent_id_basename_to_file_id._root_node._key)
 
1173
 
 
1174
    def test_iter_changes(self):
 
1175
        # Low level bootstrapping smoke test; comprehensive generic tests via
 
1176
        # InterTree are coming.
 
1177
        inv = Inventory()
 
1178
        inv.revision_id = b"revid"
 
1179
        inv.root.revision = b"rootrev"
 
1180
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1181
        inv.get_entry(b"fileid").revision = b"filerev"
 
1182
        inv.get_entry(b"fileid").executable = True
 
1183
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1184
        inv.get_entry(b"fileid").text_size = 1
 
1185
        inv2 = Inventory()
 
1186
        inv2.revision_id = b"revid2"
 
1187
        inv2.root.revision = b"rootrev"
 
1188
        inv2.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1189
        inv2.get_entry(b"fileid").revision = b"filerev2"
 
1190
        inv2.get_entry(b"fileid").executable = False
 
1191
        inv2.get_entry(b"fileid").text_sha1 = b"bbbb"
 
1192
        inv2.get_entry(b"fileid").text_size = 2
 
1193
        # get fresh objects.
 
1194
        chk_bytes = self.get_chk_bytes()
 
1195
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1196
        bytes = b''.join(chk_inv.to_lines())
 
1197
        inv_1 = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1198
        chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
 
1199
        bytes = b''.join(chk_inv2.to_lines())
 
1200
        inv_2 = CHKInventory.deserialise(chk_bytes, bytes, (b"revid2",))
 
1201
        self.assertEqual([(b'fileid', (u'file', u'file'), True, (True, True),
 
1202
            (b'TREE_ROOT', b'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
 
1203
            (False, True))],
 
1204
            list(inv_1.iter_changes(inv_2)))
 
1205
 
 
1206
    def test_parent_id_basename_to_file_id_index_enabled(self):
 
1207
        inv = Inventory()
 
1208
        inv.revision_id = b"revid"
 
1209
        inv.root.revision = b"rootrev"
 
1210
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1211
        inv.get_entry(b"fileid").revision = b"filerev"
 
1212
        inv.get_entry(b"fileid").executable = True
 
1213
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1214
        inv.get_entry(b"fileid").text_size = 1
 
1215
        # get fresh objects.
 
1216
        chk_bytes = self.get_chk_bytes()
 
1217
        tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1218
        bytes = b''.join(tmp_inv.to_lines())
 
1219
        chk_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1220
        self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
 
1221
        self.assertEqual(
 
1222
            {(b'', b''): b'TREE_ROOT', (b'TREE_ROOT', b'file'): b'fileid'},
 
1223
            dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
 
1224
 
 
1225
    def test_file_entry_to_bytes(self):
 
1226
        inv = CHKInventory(None)
 
1227
        ie = inventory.InventoryFile(b'file-id', 'filename', b'parent-id')
 
1228
        ie.executable = True
 
1229
        ie.revision = b'file-rev-id'
 
1230
        ie.text_sha1 = b'abcdefgh'
 
1231
        ie.text_size = 100
 
1232
        bytes = inv._entry_to_bytes(ie)
 
1233
        self.assertEqual(b'file: file-id\nparent-id\nfilename\n'
 
1234
                         b'file-rev-id\nabcdefgh\n100\nY', bytes)
 
1235
        ie2 = inv._bytes_to_entry(bytes)
 
1236
        self.assertEqual(ie, ie2)
 
1237
        self.assertIsInstance(ie2.name, text_type)
 
1238
        self.assertEqual((b'filename', b'file-id', b'file-rev-id'),
 
1239
                         inv._bytes_to_utf8name_key(bytes))
 
1240
 
 
1241
    def test_file2_entry_to_bytes(self):
 
1242
        inv = CHKInventory(None)
 
1243
        # \u30a9 == 'omega'
 
1244
        ie = inventory.InventoryFile(b'file-id', u'\u03a9name', b'parent-id')
 
1245
        ie.executable = False
 
1246
        ie.revision = b'file-rev-id'
 
1247
        ie.text_sha1 = b'123456'
 
1248
        ie.text_size = 25
 
1249
        bytes = inv._entry_to_bytes(ie)
 
1250
        self.assertEqual(b'file: file-id\nparent-id\n\xce\xa9name\n'
 
1251
                         b'file-rev-id\n123456\n25\nN', bytes)
 
1252
        ie2 = inv._bytes_to_entry(bytes)
 
1253
        self.assertEqual(ie, ie2)
 
1254
        self.assertIsInstance(ie2.name, text_type)
 
1255
        self.assertEqual((b'\xce\xa9name', b'file-id', b'file-rev-id'),
 
1256
                         inv._bytes_to_utf8name_key(bytes))
 
1257
 
 
1258
    def test_dir_entry_to_bytes(self):
 
1259
        inv = CHKInventory(None)
 
1260
        ie = inventory.InventoryDirectory(b'dir-id', 'dirname', b'parent-id')
 
1261
        ie.revision = b'dir-rev-id'
 
1262
        bytes = inv._entry_to_bytes(ie)
 
1263
        self.assertEqual(b'dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
 
1264
        ie2 = inv._bytes_to_entry(bytes)
 
1265
        self.assertEqual(ie, ie2)
 
1266
        self.assertIsInstance(ie2.name, text_type)
 
1267
        self.assertEqual((b'dirname', b'dir-id', b'dir-rev-id'),
 
1268
                         inv._bytes_to_utf8name_key(bytes))
 
1269
 
 
1270
    def test_dir2_entry_to_bytes(self):
 
1271
        inv = CHKInventory(None)
 
1272
        ie = inventory.InventoryDirectory(b'dir-id', u'dir\u03a9name',
 
1273
                                          None)
 
1274
        ie.revision = b'dir-rev-id'
 
1275
        bytes = inv._entry_to_bytes(ie)
 
1276
        self.assertEqual(b'dir: dir-id\n\ndir\xce\xa9name\n'
 
1277
                         b'dir-rev-id', bytes)
 
1278
        ie2 = inv._bytes_to_entry(bytes)
 
1279
        self.assertEqual(ie, ie2)
 
1280
        self.assertIsInstance(ie2.name, text_type)
 
1281
        self.assertIs(ie2.parent_id, None)
 
1282
        self.assertEqual((b'dir\xce\xa9name', b'dir-id', b'dir-rev-id'),
 
1283
                         inv._bytes_to_utf8name_key(bytes))
 
1284
 
 
1285
    def test_symlink_entry_to_bytes(self):
 
1286
        inv = CHKInventory(None)
 
1287
        ie = inventory.InventoryLink(b'link-id', 'linkname', b'parent-id')
 
1288
        ie.revision = b'link-rev-id'
 
1289
        ie.symlink_target = u'target/path'
 
1290
        bytes = inv._entry_to_bytes(ie)
 
1291
        self.assertEqual(b'symlink: link-id\nparent-id\nlinkname\n'
 
1292
                         b'link-rev-id\ntarget/path', bytes)
 
1293
        ie2 = inv._bytes_to_entry(bytes)
 
1294
        self.assertEqual(ie, ie2)
 
1295
        self.assertIsInstance(ie2.name, text_type)
 
1296
        self.assertIsInstance(ie2.symlink_target, text_type)
 
1297
        self.assertEqual((b'linkname', b'link-id', b'link-rev-id'),
 
1298
                         inv._bytes_to_utf8name_key(bytes))
 
1299
 
 
1300
    def test_symlink2_entry_to_bytes(self):
 
1301
        inv = CHKInventory(None)
 
1302
        ie = inventory.InventoryLink(b'link-id', u'link\u03a9name', b'parent-id')
 
1303
        ie.revision = b'link-rev-id'
 
1304
        ie.symlink_target = u'target/\u03a9path'
 
1305
        bytes = inv._entry_to_bytes(ie)
 
1306
        self.assertEqual(b'symlink: link-id\nparent-id\nlink\xce\xa9name\n'
 
1307
                         b'link-rev-id\ntarget/\xce\xa9path', bytes)
 
1308
        ie2 = inv._bytes_to_entry(bytes)
 
1309
        self.assertEqual(ie, ie2)
 
1310
        self.assertIsInstance(ie2.name, text_type)
 
1311
        self.assertIsInstance(ie2.symlink_target, text_type)
 
1312
        self.assertEqual((b'link\xce\xa9name', b'link-id', b'link-rev-id'),
 
1313
                         inv._bytes_to_utf8name_key(bytes))
 
1314
 
 
1315
    def test_tree_reference_entry_to_bytes(self):
 
1316
        inv = CHKInventory(None)
 
1317
        ie = inventory.TreeReference(b'tree-root-id', u'tree\u03a9name',
 
1318
                                     b'parent-id')
 
1319
        ie.revision = b'tree-rev-id'
 
1320
        ie.reference_revision = b'ref-rev-id'
 
1321
        bytes = inv._entry_to_bytes(ie)
 
1322
        self.assertEqual(b'tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
 
1323
                         b'tree-rev-id\nref-rev-id', bytes)
 
1324
        ie2 = inv._bytes_to_entry(bytes)
 
1325
        self.assertEqual(ie, ie2)
 
1326
        self.assertIsInstance(ie2.name, text_type)
 
1327
        self.assertEqual((b'tree\xce\xa9name', b'tree-root-id', b'tree-rev-id'),
 
1328
                         inv._bytes_to_utf8name_key(bytes))
 
1329
 
 
1330
    def make_basic_utf8_inventory(self):
 
1331
        inv = Inventory()
 
1332
        inv.revision_id = b"revid"
 
1333
        inv.root.revision = b"rootrev"
 
1334
        root_id = inv.root.file_id
 
1335
        inv.add(InventoryFile(b"fileid", u'f\xefle', root_id))
 
1336
        inv.get_entry(b"fileid").revision = b"filerev"
 
1337
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1338
        inv.get_entry(b"fileid").text_size = 0
 
1339
        inv.add(InventoryDirectory(b"dirid", u'dir-\N{EURO SIGN}', root_id))
 
1340
        inv.get_entry(b"dirid").revision = b"dirrev"
 
1341
        inv.add(InventoryFile(b"childid", u'ch\xefld', b"dirid"))
 
1342
        inv.get_entry(b"childid").revision = b"filerev"
 
1343
        inv.get_entry(b"childid").text_sha1 = b"ffff"
 
1344
        inv.get_entry(b"childid").text_size = 0
 
1345
        chk_bytes = self.get_chk_bytes()
 
1346
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1347
        bytes = b''.join(chk_inv.to_lines())
 
1348
        return CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1349
 
 
1350
    def test__preload_handles_utf8(self):
 
1351
        new_inv = self.make_basic_utf8_inventory()
 
1352
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
 
1353
        self.assertFalse(new_inv._fully_cached)
 
1354
        new_inv._preload_cache()
 
1355
        self.assertEqual(
 
1356
            sorted([new_inv.root_id, b"fileid", b"dirid", b"childid"]),
 
1357
            sorted(new_inv._fileid_to_entry_cache.keys()))
 
1358
        ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
 
1359
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1360
                         sorted(ie_root._children.keys()))
 
1361
        ie_dir = new_inv._fileid_to_entry_cache[b'dirid']
 
1362
        self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
 
1363
 
 
1364
    def test__preload_populates_cache(self):
 
1365
        inv = Inventory()
 
1366
        inv.revision_id = b"revid"
 
1367
        inv.root.revision = b"rootrev"
 
1368
        root_id = inv.root.file_id
 
1369
        inv.add(InventoryFile(b"fileid", "file", root_id))
 
1370
        inv.get_entry(b"fileid").revision = b"filerev"
 
1371
        inv.get_entry(b"fileid").executable = True
 
1372
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1373
        inv.get_entry(b"fileid").text_size = 1
 
1374
        inv.add(InventoryDirectory(b"dirid", "dir", root_id))
 
1375
        inv.get_entry(b"dirid").revision = b"dirrev"
 
1376
        inv.add(InventoryFile(b"childid", "child", b"dirid"))
 
1377
        inv.get_entry(b"childid").revision = b"filerev"
 
1378
        inv.get_entry(b"childid").executable = False
 
1379
        inv.get_entry(b"childid").text_sha1 = b"dddd"
 
1380
        inv.get_entry(b"childid").text_size = 1
 
1381
        chk_bytes = self.get_chk_bytes()
 
1382
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1383
        bytes = b''.join(chk_inv.to_lines())
 
1384
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1385
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
 
1386
        self.assertFalse(new_inv._fully_cached)
 
1387
        new_inv._preload_cache()
 
1388
        self.assertEqual(
 
1389
            sorted([root_id, b"fileid", b"dirid", b"childid"]),
 
1390
            sorted(new_inv._fileid_to_entry_cache.keys()))
 
1391
        self.assertTrue(new_inv._fully_cached)
 
1392
        ie_root = new_inv._fileid_to_entry_cache[root_id]
 
1393
        self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
 
1394
        ie_dir = new_inv._fileid_to_entry_cache[b'dirid']
 
1395
        self.assertEqual(['child'], sorted(ie_dir._children.keys()))
 
1396
 
 
1397
    def test__preload_handles_partially_evaluated_inventory(self):
 
1398
        new_inv = self.make_basic_utf8_inventory()
 
1399
        ie = new_inv.get_entry(new_inv.root_id)
 
1400
        self.assertIs(None, ie._children)
 
1401
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1402
                         sorted(ie.children.keys()))
 
1403
        # Accessing .children loads _children
 
1404
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1405
                         sorted(ie._children.keys()))
 
1406
        new_inv._preload_cache()
 
1407
        # No change
 
1408
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1409
                         sorted(ie._children.keys()))
 
1410
        ie_dir = new_inv.get_entry(b"dirid")
 
1411
        self.assertEqual([u'ch\xefld'],
 
1412
                         sorted(ie_dir._children.keys()))
 
1413
 
 
1414
    def test_filter_change_in_renamed_subfolder(self):
 
1415
        inv = Inventory(b'tree-root')
 
1416
        inv.root.revision = b'rootrev'
 
1417
        src_ie = inv.add_path('src', 'directory', b'src-id')
 
1418
        src_ie.revision = b'srcrev'
 
1419
        sub_ie = inv.add_path('src/sub/', 'directory', b'sub-id')
 
1420
        sub_ie.revision = b'subrev'
 
1421
        a_ie = inv.add_path('src/sub/a', 'file', b'a-id')
 
1422
        a_ie.revision = b'filerev'
 
1423
        a_ie.text_sha1 = osutils.sha_string(b'content\n')
 
1424
        a_ie.text_size = len(b'content\n')
 
1425
        chk_bytes = self.get_chk_bytes()
 
1426
        inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1427
        inv = inv.create_by_apply_delta([
 
1428
            ("src/sub/a", "src/sub/a", b"a-id", a_ie),
 
1429
            ("src", "src2", b"src-id", src_ie),
 
1430
            ], b'new-rev-2')
 
1431
        new_inv = inv.filter([b'a-id', b'src-id'])
 
1432
        self.assertEqual([
 
1433
            ('', b'tree-root'),
 
1434
            ('src', b'src-id'),
 
1435
            ('src/sub', b'sub-id'),
 
1436
            ('src/sub/a', b'a-id'),
 
1437
            ], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
 
1438
 
 
1439
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
 
1440
 
 
1441
    def get_chk_bytes(self):
 
1442
        factory = groupcompress.make_pack_factory(True, True, 1)
 
1443
        trans = self.get_transport('')
 
1444
        return factory(trans)
 
1445
 
 
1446
    def make_dir(self, inv, name, parent_id, revision):
 
1447
        ie = inv.make_entry('directory', name, parent_id, name.encode('utf-8') + b'-id')
 
1448
        ie.revision = revision
 
1449
        inv.add(ie)
 
1450
 
 
1451
    def make_file(self, inv, name, parent_id, revision, content=b'content\n'):
 
1452
        ie = inv.make_entry('file', name, parent_id, name.encode('utf-8') + b'-id')
 
1453
        ie.text_sha1 = osutils.sha_string(content)
 
1454
        ie.text_size = len(content)
 
1455
        ie.revision = revision
 
1456
        inv.add(ie)
 
1457
 
 
1458
    def make_simple_inventory(self):
 
1459
        inv = Inventory(b'TREE_ROOT')
 
1460
        inv.revision_id = b"revid"
 
1461
        inv.root.revision = b"rootrev"
 
1462
        # /                 TREE_ROOT
 
1463
        # dir1/             dir1-id
 
1464
        #   sub-file1       sub-file1-id
 
1465
        #   sub-file2       sub-file2-id
 
1466
        #   sub-dir1/       sub-dir1-id
 
1467
        #     subsub-file1  subsub-file1-id
 
1468
        # dir2/             dir2-id
 
1469
        #   sub2-file1      sub2-file1-id
 
1470
        # top               top-id
 
1471
        self.make_dir(inv, 'dir1', b'TREE_ROOT', b'dirrev')
 
1472
        self.make_dir(inv, 'dir2', b'TREE_ROOT', b'dirrev')
 
1473
        self.make_dir(inv, 'sub-dir1', b'dir1-id', b'dirrev')
 
1474
        self.make_file(inv, 'top', b'TREE_ROOT', b'filerev')
 
1475
        self.make_file(inv, 'sub-file1', b'dir1-id', b'filerev')
 
1476
        self.make_file(inv, 'sub-file2', b'dir1-id', b'filerev')
 
1477
        self.make_file(inv, 'subsub-file1', b'sub-dir1-id', b'filerev')
 
1478
        self.make_file(inv, 'sub2-file1', b'dir2-id', b'filerev')
 
1479
        chk_bytes = self.get_chk_bytes()
 
1480
        #  use a small maximum_size to force internal paging structures
 
1481
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
1482
                        maximum_size=100,
 
1483
                        search_key_name=b'hash-255-way')
 
1484
        bytes = b''.join(chk_inv.to_lines())
 
1485
        return CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1486
 
 
1487
    def assert_Getitems(self, expected_fileids, inv, file_ids):
 
1488
        self.assertEqual(sorted(expected_fileids),
 
1489
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
 
1490
 
 
1491
    def assertExpand(self, all_ids, inv, file_ids):
 
1492
        (val_all_ids,
 
1493
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
 
1494
        self.assertEqual(set(all_ids), val_all_ids)
 
1495
        entries = inv._getitems(val_all_ids)
 
1496
        expected_children = {}
 
1497
        for entry in entries:
 
1498
            s = expected_children.setdefault(entry.parent_id, [])
 
1499
            s.append(entry.file_id)
 
1500
        val_children = {
 
1501
            k: sorted(v) for k, v in val_children.items()}
 
1502
        expected_children = {
 
1503
            k: sorted(v) for k, v in expected_children.items()}
 
1504
        self.assertEqual(expected_children, val_children)
 
1505
 
 
1506
    def test_make_simple_inventory(self):
 
1507
        inv = self.make_simple_inventory()
 
1508
        layout = []
 
1509
        for path, entry in inv.iter_entries_by_dir():
 
1510
            layout.append((path, entry.file_id))
 
1511
        self.assertEqual([
 
1512
            ('', b'TREE_ROOT'),
 
1513
            ('dir1', b'dir1-id'),
 
1514
            ('dir2', b'dir2-id'),
 
1515
            ('top', b'top-id'),
 
1516
            ('dir1/sub-dir1', b'sub-dir1-id'),
 
1517
            ('dir1/sub-file1', b'sub-file1-id'),
 
1518
            ('dir1/sub-file2', b'sub-file2-id'),
 
1519
            ('dir1/sub-dir1/subsub-file1', b'subsub-file1-id'),
 
1520
            ('dir2/sub2-file1', b'sub2-file1-id'),
 
1521
            ], layout)
 
1522
 
 
1523
    def test__getitems(self):
 
1524
        inv = self.make_simple_inventory()
 
1525
        # Reading from disk
 
1526
        self.assert_Getitems([b'dir1-id'], inv, [b'dir1-id'])
 
1527
        self.assertTrue(b'dir1-id' in inv._fileid_to_entry_cache)
 
1528
        self.assertFalse(b'sub-file2-id' in inv._fileid_to_entry_cache)
 
1529
        # From cache
 
1530
        self.assert_Getitems([b'dir1-id'], inv, [b'dir1-id'])
 
1531
        # Mixed
 
1532
        self.assert_Getitems([b'dir1-id', b'sub-file2-id'], inv,
 
1533
                             [b'dir1-id', b'sub-file2-id'])
 
1534
        self.assertTrue(b'dir1-id' in inv._fileid_to_entry_cache)
 
1535
        self.assertTrue(b'sub-file2-id' in inv._fileid_to_entry_cache)
 
1536
 
 
1537
    def test_single_file(self):
 
1538
        inv = self.make_simple_inventory()
 
1539
        self.assertExpand([b'TREE_ROOT', b'top-id'], inv, [b'top-id'])
 
1540
 
 
1541
    def test_get_all_parents(self):
 
1542
        inv = self.make_simple_inventory()
 
1543
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
 
1544
                           b'subsub-file1-id',
 
1545
                          ], inv, [b'subsub-file1-id'])
 
1546
 
 
1547
    def test_get_children(self):
 
1548
        inv = self.make_simple_inventory()
 
1549
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
 
1550
                           b'sub-file1-id', b'sub-file2-id', b'subsub-file1-id',
 
1551
                          ], inv, [b'dir1-id'])
 
1552
 
 
1553
    def test_from_root(self):
 
1554
        inv = self.make_simple_inventory()
 
1555
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'dir2-id', b'sub-dir1-id',
 
1556
                           b'sub-file1-id', b'sub-file2-id', b'sub2-file1-id',
 
1557
                           b'subsub-file1-id', b'top-id'], inv, [b'TREE_ROOT'])
 
1558
 
 
1559
    def test_top_level_file(self):
 
1560
        inv = self.make_simple_inventory()
 
1561
        self.assertExpand([b'TREE_ROOT', b'top-id'], inv, [b'top-id'])
 
1562
 
 
1563
    def test_subsub_file(self):
 
1564
        inv = self.make_simple_inventory()
 
1565
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id',
 
1566
                           b'subsub-file1-id'], inv, [b'subsub-file1-id'])
 
1567
 
 
1568
    def test_sub_and_root(self):
 
1569
        inv = self.make_simple_inventory()
 
1570
        self.assertExpand([b'TREE_ROOT', b'dir1-id', b'sub-dir1-id', b'top-id',
 
1571
                           b'subsub-file1-id'], inv, [b'top-id', b'subsub-file1-id'])
 
1572
 
 
1573
 
 
1574
class TestMutableInventoryFromTree(TestCaseWithTransport):
 
1575
 
 
1576
    def test_empty(self):
 
1577
        repository = self.make_repository('.')
 
1578
        tree = repository.revision_tree(revision.NULL_REVISION)
 
1579
        inv = mutable_inventory_from_tree(tree)
 
1580
        self.assertEqual(revision.NULL_REVISION, inv.revision_id)
 
1581
        self.assertEqual(0, len(inv))
 
1582
 
 
1583
    def test_some_files(self):
 
1584
        wt = self.make_branch_and_tree('.')
 
1585
        self.build_tree(['a'])
 
1586
        wt.add(['a'], [b'thefileid'])
 
1587
        revid = wt.commit("commit")
 
1588
        tree = wt.branch.repository.revision_tree(revid)
 
1589
        inv = mutable_inventory_from_tree(tree)
 
1590
        self.assertEqual(revid, inv.revision_id)
 
1591
        self.assertEqual(2, len(inv))
 
1592
        self.assertEqual("a", inv.get_entry(b'thefileid').name)
 
1593
        # The inventory should be mutable and independent of
 
1594
        # the original tree
 
1595
        self.assertFalse(tree.root_inventory.get_entry(b'thefileid').executable)
 
1596
        inv.get_entry(b'thefileid').executable = True
 
1597
        self.assertFalse(tree.root_inventory.get_entry(b'thefileid').executable)