/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_inv.py

  • Committer: Jelmer Vernooij
  • Date: 2018-06-11 20:48:24 UTC
  • mfrom: (6974.2.1 python3-graph)
  • Revision ID: jelmer@jelmer.uk-20180611204824-zll0a3pzyciekd1f
merge lp:~jelmer/brz/python3-graph

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2012, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from .. import (
 
19
    errors,
 
20
    osutils,
 
21
    repository,
 
22
    revision,
 
23
    tests,
 
24
    workingtree,
 
25
    )
 
26
from ..bzr import (
 
27
    chk_map,
 
28
    groupcompress,
 
29
    inventory,
 
30
    )
 
31
from ..bzr.inventory import (
 
32
    CHKInventory,
 
33
    Inventory,
 
34
    ROOT_ID,
 
35
    InventoryFile,
 
36
    InventoryDirectory,
 
37
    InventoryEntry,
 
38
    TreeReference,
 
39
    mutable_inventory_from_tree,
 
40
    )
 
41
from . import (
 
42
    TestCase,
 
43
    TestCaseWithTransport,
 
44
    )
 
45
from .scenarios import load_tests_apply_scenarios
 
46
 
 
47
 
 
48
load_tests = load_tests_apply_scenarios
 
49
 
 
50
 
 
51
def delta_application_scenarios():
 
52
    scenarios = [
 
53
        ('Inventory', {'apply_delta':apply_inventory_Inventory}),
 
54
        ]
 
55
    # Working tree basis delta application
 
56
    # Repository add_inv_by_delta.
 
57
    # Reduce form of the per_repository test logic - that logic needs to be
 
58
    # be able to get /just/ repositories whereas these tests are fine with
 
59
    # just creating trees.
 
60
    formats = set()
 
61
    for _, format in repository.format_registry.iteritems():
 
62
        if format.supports_full_versioned_files:
 
63
            scenarios.append((str(format.__name__), {
 
64
                'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
 
65
                'format':format}))
 
66
    for format in workingtree.format_registry._get_all():
 
67
        repo_fmt = format._matchingcontroldir.repository_format
 
68
        if not repo_fmt.supports_full_versioned_files:
 
69
            continue
 
70
        scenarios.append(
 
71
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
 
72
            'apply_delta':apply_inventory_WT_basis,
 
73
            'format':format}))
 
74
        scenarios.append(
 
75
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
 
76
            'apply_delta':apply_inventory_WT,
 
77
            'format':format}))
 
78
    return scenarios
 
79
 
 
80
 
 
81
def create_texts_for_inv(repo, inv):
 
82
    for path, ie in inv.iter_entries():
 
83
        if ie.text_size:
 
84
            lines = ['a' * ie.text_size]
 
85
        else:
 
86
            lines = []
 
87
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
 
88
 
 
89
 
 
90
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
 
91
    """Apply delta to basis and return the result.
 
92
 
 
93
    :param basis: An inventory to be used as the basis.
 
94
    :param delta: The inventory delta to apply:
 
95
    :return: An inventory resulting from the application.
 
96
    """
 
97
    basis.apply_delta(delta)
 
98
    return basis
 
99
 
 
100
 
 
101
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
 
102
    """Apply delta to basis and return the result.
 
103
 
 
104
    This sets the tree state to be basis, and then calls apply_inventory_delta.
 
105
 
 
106
    :param basis: An inventory to be used as the basis.
 
107
    :param delta: The inventory delta to apply:
 
108
    :return: An inventory resulting from the application.
 
109
    """
 
110
    control = self.make_controldir('tree', format=self.format._matchingcontroldir)
 
111
    control.create_repository()
 
112
    control.create_branch()
 
113
    tree = self.format.initialize(control)
 
114
    tree.lock_write()
 
115
    try:
 
116
        tree._write_inventory(basis)
 
117
    finally:
 
118
        tree.unlock()
 
119
    # Fresh object, reads disk again.
 
120
    tree = tree.controldir.open_workingtree()
 
121
    tree.lock_write()
 
122
    try:
 
123
        tree.apply_inventory_delta(delta)
 
124
    finally:
 
125
        tree.unlock()
 
126
    # reload tree - ensure we get what was written.
 
127
    tree = tree.controldir.open_workingtree()
 
128
    tree.lock_read()
 
129
    self.addCleanup(tree.unlock)
 
130
    if not invalid_delta:
 
131
        tree._validate()
 
132
    return tree.root_inventory
 
133
 
 
134
 
 
135
def _create_repo_revisions(repo, basis, delta, invalid_delta):
 
136
    repo.start_write_group()
 
137
    try:
 
138
        rev = revision.Revision('basis', timestamp=0, timezone=None,
 
139
            message="", committer="foo@example.com")
 
140
        basis.revision_id = 'basis'
 
141
        create_texts_for_inv(repo, basis)
 
142
        repo.add_revision('basis', rev, basis)
 
143
        if invalid_delta:
 
144
            # We don't want to apply the delta to the basis, because we expect
 
145
            # the delta is invalid.
 
146
            result_inv = basis
 
147
            result_inv.revision_id = b'result'
 
148
            target_entries = None
 
149
        else:
 
150
            result_inv = basis.create_by_apply_delta(delta, 'result')
 
151
            create_texts_for_inv(repo, result_inv)
 
152
            target_entries = list(result_inv.iter_entries_by_dir())
 
153
        rev = revision.Revision('result', timestamp=0, timezone=None,
 
154
            message="", committer="foo@example.com")
 
155
        repo.add_revision('result', rev, result_inv)
 
156
        repo.commit_write_group()
 
157
    except:
 
158
        repo.abort_write_group()
 
159
        raise
 
160
    return target_entries
 
161
 
 
162
 
 
163
def _get_basis_entries(tree):
 
164
    basis_tree = tree.basis_tree()
 
165
    basis_tree.lock_read()
 
166
    basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
 
167
    basis_tree.unlock()
 
168
    return basis_tree_entries
 
169
 
 
170
 
 
171
def _populate_different_tree(tree, basis, delta):
 
172
    """Put all entries into tree, but at a unique location."""
 
173
    added_ids = set()
 
174
    added_paths = set()
 
175
    tree.add(['unique-dir'], [b'unique-dir-id'], ['directory'])
 
176
    for path, ie in basis.iter_entries_by_dir():
 
177
        if ie.file_id in added_ids:
 
178
            continue
 
179
        # We want a unique path for each of these, we use the file-id
 
180
        tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
 
181
        added_ids.add(ie.file_id)
 
182
    for old_path, new_path, file_id, ie in delta:
 
183
        if file_id in added_ids:
 
184
            continue
 
185
        tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
 
186
 
 
187
 
 
188
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
 
189
    """Apply delta to basis and return the result.
 
190
 
 
191
    This sets the parent and then calls update_basis_by_delta.
 
192
    It also puts the basis in the repository under both 'basis' and 'result' to
 
193
    allow safety checks made by the WT to succeed, and finally ensures that all
 
194
    items in the delta with a new path are present in the WT before calling
 
195
    update_basis_by_delta.
 
196
 
 
197
    :param basis: An inventory to be used as the basis.
 
198
    :param delta: The inventory delta to apply:
 
199
    :return: An inventory resulting from the application.
 
200
    """
 
201
    control = test.make_controldir('tree', format=test.format._matchingcontroldir)
 
202
    control.create_repository()
 
203
    control.create_branch()
 
204
    tree = test.format.initialize(control)
 
205
    tree.lock_write()
 
206
    try:
 
207
        target_entries = _create_repo_revisions(tree.branch.repository, basis,
 
208
                                                delta, invalid_delta)
 
209
        # Set the basis state as the trees current state
 
210
        tree._write_inventory(basis)
 
211
        # This reads basis from the repo and puts it into the tree's local
 
212
        # cache, if it has one.
 
213
        tree.set_parent_ids(['basis'])
 
214
    finally:
 
215
        tree.unlock()
 
216
    # Fresh lock, reads disk again.
 
217
    tree.lock_write()
 
218
    try:
 
219
        tree.update_basis_by_delta('result', delta)
 
220
        if not invalid_delta:
 
221
            tree._validate()
 
222
    finally:
 
223
        tree.unlock()
 
224
    # reload tree - ensure we get what was written.
 
225
    tree = tree.controldir.open_workingtree()
 
226
    basis_tree = tree.basis_tree()
 
227
    basis_tree.lock_read()
 
228
    test.addCleanup(basis_tree.unlock)
 
229
    basis_inv = basis_tree.root_inventory
 
230
    if target_entries:
 
231
        basis_entries = list(basis_inv.iter_entries_by_dir())
 
232
        test.assertEqual(target_entries, basis_entries)
 
233
    return basis_inv
 
234
 
 
235
 
 
236
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
 
237
                                                      invalid_delta=True):
 
238
    """Apply delta to basis and return the result.
 
239
    
 
240
    This inserts basis as a whole inventory and then uses
 
241
    add_inventory_by_delta to add delta.
 
242
 
 
243
    :param basis: An inventory to be used as the basis.
 
244
    :param delta: The inventory delta to apply:
 
245
    :return: An inventory resulting from the application.
 
246
    """
 
247
    format = self.format()
 
248
    control = self.make_controldir('tree', format=format._matchingcontroldir)
 
249
    repo = format.initialize(control)
 
250
    repo.lock_write()
 
251
    try:
 
252
        repo.start_write_group()
 
253
        try:
 
254
            rev = revision.Revision('basis', timestamp=0, timezone=None,
 
255
                message="", committer="foo@example.com")
 
256
            basis.revision_id = 'basis'
 
257
            create_texts_for_inv(repo, basis)
 
258
            repo.add_revision('basis', rev, basis)
 
259
            repo.commit_write_group()
 
260
        except:
 
261
            repo.abort_write_group()
 
262
            raise
 
263
    finally:
 
264
        repo.unlock()
 
265
    repo.lock_write()
 
266
    try:
 
267
        repo.start_write_group()
 
268
        try:
 
269
            inv_sha1 = repo.add_inventory_by_delta('basis', delta,
 
270
                'result', ['basis'])
 
271
        except:
 
272
            repo.abort_write_group()
 
273
            raise
 
274
        else:
 
275
            repo.commit_write_group()
 
276
    finally:
 
277
        repo.unlock()
 
278
    # Fresh lock, reads disk again.
 
279
    repo = repo.controldir.open_repository()
 
280
    repo.lock_read()
 
281
    self.addCleanup(repo.unlock)
 
282
    return repo.get_inventory('result')
 
283
 
 
284
 
 
285
class TestInventoryUpdates(TestCase):
 
286
 
 
287
    def test_creation_from_root_id(self):
 
288
        # iff a root id is passed to the constructor, a root directory is made
 
289
        inv = inventory.Inventory(root_id=b'tree-root')
 
290
        self.assertNotEqual(None, inv.root)
 
291
        self.assertEqual(b'tree-root', inv.root.file_id)
 
292
 
 
293
    def test_add_path_of_root(self):
 
294
        # if no root id is given at creation time, there is no root directory
 
295
        inv = inventory.Inventory(root_id=None)
 
296
        self.assertIs(None, inv.root)
 
297
        # add a root entry by adding its path
 
298
        ie = inv.add_path(u"", "directory", b"my-root")
 
299
        ie.revision = b'test-rev'
 
300
        self.assertEqual(b"my-root", ie.file_id)
 
301
        self.assertIs(ie, inv.root)
 
302
 
 
303
    def test_add_path(self):
 
304
        inv = inventory.Inventory(root_id=b'tree_root')
 
305
        ie = inv.add_path(u'hello', 'file', b'hello-id')
 
306
        self.assertEqual(b'hello-id', ie.file_id)
 
307
        self.assertEqual('file', ie.kind)
 
308
 
 
309
    def test_copy(self):
 
310
        """Make sure copy() works and creates a deep copy."""
 
311
        inv = inventory.Inventory(root_id=b'some-tree-root')
 
312
        ie = inv.add_path(u'hello', 'file', b'hello-id')
 
313
        inv2 = inv.copy()
 
314
        inv.root.file_id = b'some-new-root'
 
315
        ie.name = u'file2'
 
316
        self.assertEqual(b'some-tree-root', inv2.root.file_id)
 
317
        self.assertEqual(u'hello', inv2.get_entry(b'hello-id').name)
 
318
 
 
319
    def test_copy_empty(self):
 
320
        """Make sure an empty inventory can be copied."""
 
321
        inv = inventory.Inventory(root_id=None)
 
322
        inv2 = inv.copy()
 
323
        self.assertIs(None, inv2.root)
 
324
 
 
325
    def test_copy_copies_root_revision(self):
 
326
        """Make sure the revision of the root gets copied."""
 
327
        inv = inventory.Inventory(root_id=b'someroot')
 
328
        inv.root.revision = b'therev'
 
329
        inv2 = inv.copy()
 
330
        self.assertEqual(b'someroot', inv2.root.file_id)
 
331
        self.assertEqual(b'therev', inv2.root.revision)
 
332
 
 
333
    def test_create_tree_reference(self):
 
334
        inv = inventory.Inventory(b'tree-root-123')
 
335
        inv.add(TreeReference(
 
336
            b'nested-id', 'nested', parent_id=b'tree-root-123',
 
337
            revision=b'rev', reference_revision=b'rev2'))
 
338
 
 
339
    def test_error_encoding(self):
 
340
        inv = inventory.Inventory(b'tree-root')
 
341
        inv.add(InventoryFile(b'a-id', u'\u1234', b'tree-root'))
 
342
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
 
343
            InventoryFile(b'b-id', u'\u1234', b'tree-root'))
 
344
        self.assertContainsRe(str(e), r'\\u1234')
 
345
 
 
346
    def test_add_recursive(self):
 
347
        parent = InventoryDirectory(b'src-id', 'src', b'tree-root')
 
348
        child = InventoryFile(b'hello-id', 'hello.c', b'src-id')
 
349
        parent.children[child.file_id] = child
 
350
        inv = inventory.Inventory(b'tree-root')
 
351
        inv.add(parent)
 
352
        self.assertEqual('src/hello.c', inv.id2path(b'hello-id'))
 
353
 
 
354
 
 
355
 
 
356
class TestDeltaApplication(TestCaseWithTransport):
 
357
 
 
358
    scenarios = delta_application_scenarios()
 
359
 
 
360
    def get_empty_inventory(self, reference_inv=None):
 
361
        """Get an empty inventory.
 
362
 
 
363
        Note that tests should not depend on the revision of the root for
 
364
        setting up test conditions, as it has to be flexible to accomodate non
 
365
        rich root repositories.
 
366
 
 
367
        :param reference_inv: If not None, get the revision for the root from
 
368
            this inventory. This is useful for dealing with older repositories
 
369
            that routinely discarded the root entry data. If None, the root's
 
370
            revision is set to 'basis'.
 
371
        """
 
372
        inv = inventory.Inventory()
 
373
        if reference_inv is not None:
 
374
            inv.root.revision = reference_inv.root.revision
 
375
        else:
 
376
            inv.root.revision = 'basis'
 
377
        return inv
 
378
 
 
379
    def make_file_ie(self, file_id=b'file-id', name='name', parent_id=None):
 
380
        ie_file = inventory.InventoryFile(file_id, name, parent_id)
 
381
        ie_file.revision = b'result'
 
382
        ie_file.text_size = 0
 
383
        ie_file.text_sha1 = ''
 
384
        return ie_file
 
385
 
 
386
    def test_empty_delta(self):
 
387
        inv = self.get_empty_inventory()
 
388
        delta = []
 
389
        inv = self.apply_delta(self, inv, delta)
 
390
        inv2 = self.get_empty_inventory(inv)
 
391
        self.assertEqual([], inv2._make_delta(inv))
 
392
 
 
393
    def test_None_file_id(self):
 
394
        inv = self.get_empty_inventory()
 
395
        dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
 
396
        dir1.revision = b'result'
 
397
        delta = [(None, u'dir1', None, dir1)]
 
398
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
399
            inv, delta)
 
400
 
 
401
    def test_unicode_file_id(self):
 
402
        inv = self.get_empty_inventory()
 
403
        dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
 
404
        dir1.revision = b'result'
 
405
        delta = [(None, u'dir1', dir1.file_id, dir1)]
 
406
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
407
            inv, delta)
 
408
 
 
409
    def test_repeated_file_id(self):
 
410
        inv = self.get_empty_inventory()
 
411
        file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
 
412
        file1.revision = b'result'
 
413
        file1.text_size = 0
 
414
        file1.text_sha1 = ""
 
415
        file2 = file1.copy()
 
416
        file2.name = 'path2'
 
417
        delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
 
418
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
419
            inv, delta)
 
420
 
 
421
    def test_repeated_new_path(self):
 
422
        inv = self.get_empty_inventory()
 
423
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
424
        file1.revision = b'result'
 
425
        file1.text_size = 0
 
426
        file1.text_sha1 = ""
 
427
        file2 = file1.copy()
 
428
        file2.file_id = 'id2'
 
429
        delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
 
430
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
431
            inv, delta)
 
432
 
 
433
    def test_repeated_old_path(self):
 
434
        inv = self.get_empty_inventory()
 
435
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
436
        file1.revision = b'result'
 
437
        file1.text_size = 0
 
438
        file1.text_sha1 = ""
 
439
        # We can't *create* a source inventory with the same path, but
 
440
        # a badly generated partial delta might claim the same source twice.
 
441
        # This would be buggy in two ways: the path is repeated in the delta,
 
442
        # And the path for one of the file ids doesn't match the source
 
443
        # location. Alternatively, we could have a repeated fileid, but that
 
444
        # is separately checked for.
 
445
        file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
 
446
        file2.revision = b'result'
 
447
        file2.text_size = 0
 
448
        file2.text_sha1 = ""
 
449
        inv.add(file1)
 
450
        inv.add(file2)
 
451
        delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
 
452
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
453
            inv, delta)
 
454
 
 
455
    def test_mismatched_id_entry_id(self):
 
456
        inv = self.get_empty_inventory()
 
457
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
458
        file1.revision = b'result'
 
459
        file1.text_size = 0
 
460
        file1.text_sha1 = ""
 
461
        delta = [(None, u'path', 'id', file1)]
 
462
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
463
            inv, delta)
 
464
 
 
465
    def test_mismatched_new_path_entry_None(self):
 
466
        inv = self.get_empty_inventory()
 
467
        delta = [(None, u'path', 'id', None)]
 
468
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
469
            inv, delta)
 
470
 
 
471
    def test_mismatched_new_path_None_entry(self):
 
472
        inv = self.get_empty_inventory()
 
473
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
474
        file1.revision = b'result'
 
475
        file1.text_size = 0
 
476
        file1.text_sha1 = ""
 
477
        delta = [(u"path", None, 'id1', file1)]
 
478
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
479
            inv, delta)
 
480
 
 
481
    def test_parent_is_not_directory(self):
 
482
        inv = self.get_empty_inventory()
 
483
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
484
        file1.revision = b'result'
 
485
        file1.text_size = 0
 
486
        file1.text_sha1 = ""
 
487
        file2 = inventory.InventoryFile('id2', 'path2', 'id1')
 
488
        file2.revision = b'result'
 
489
        file2.text_size = 0
 
490
        file2.text_sha1 = ""
 
491
        inv.add(file1)
 
492
        delta = [(None, u'path/path2', 'id2', file2)]
 
493
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
494
            inv, delta)
 
495
 
 
496
    def test_parent_is_missing(self):
 
497
        inv = self.get_empty_inventory()
 
498
        file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
 
499
        file2.revision = b'result'
 
500
        file2.text_size = 0
 
501
        file2.text_sha1 = ""
 
502
        delta = [(None, u'path/path2', 'id2', file2)]
 
503
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
504
            inv, delta)
 
505
 
 
506
    def test_new_parent_path_has_wrong_id(self):
 
507
        inv = self.get_empty_inventory()
 
508
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
509
        parent1.revision = b'result'
 
510
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
511
        parent2.revision = b'result'
 
512
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
513
        file1.revision = b'result'
 
514
        file1.text_size = 0
 
515
        file1.text_sha1 = ""
 
516
        inv.add(parent1)
 
517
        inv.add(parent2)
 
518
        # This delta claims that file1 is at dir/path, but actually its at
 
519
        # dir2/path if you follow the inventory parent structure.
 
520
        delta = [(None, u'dir/path', 'id', file1)]
 
521
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
522
            inv, delta)
 
523
 
 
524
    def test_old_parent_path_is_wrong(self):
 
525
        inv = self.get_empty_inventory()
 
526
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
527
        parent1.revision = b'result'
 
528
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
529
        parent2.revision = b'result'
 
530
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
531
        file1.revision = b'result'
 
532
        file1.text_size = 0
 
533
        file1.text_sha1 = ""
 
534
        inv.add(parent1)
 
535
        inv.add(parent2)
 
536
        inv.add(file1)
 
537
        # This delta claims that file1 was at dir/path, but actually it was at
 
538
        # dir2/path if you follow the inventory parent structure.
 
539
        delta = [(u'dir/path', None, 'id', None)]
 
540
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
541
            inv, delta)
 
542
 
 
543
    def test_old_parent_path_is_for_other_id(self):
 
544
        inv = self.get_empty_inventory()
 
545
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
546
        parent1.revision = b'result'
 
547
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
548
        parent2.revision = b'result'
 
549
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
550
        file1.revision = b'result'
 
551
        file1.text_size = 0
 
552
        file1.text_sha1 = ""
 
553
        file2 = inventory.InventoryFile('id2', 'path', 'p-1')
 
554
        file2.revision = b'result'
 
555
        file2.text_size = 0
 
556
        file2.text_sha1 = ""
 
557
        inv.add(parent1)
 
558
        inv.add(parent2)
 
559
        inv.add(file1)
 
560
        inv.add(file2)
 
561
        # This delta claims that file1 was at dir/path, but actually it was at
 
562
        # dir2/path if you follow the inventory parent structure. At dir/path
 
563
        # is another entry we should not delete.
 
564
        delta = [(u'dir/path', None, 'id', None)]
 
565
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
566
            inv, delta)
 
567
 
 
568
    def test_add_existing_id_new_path(self):
 
569
        inv = self.get_empty_inventory()
 
570
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
571
        parent1.revision = b'result'
 
572
        parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
 
573
        parent2.revision = b'result'
 
574
        inv.add(parent1)
 
575
        delta = [(None, u'dir2', 'p-1', parent2)]
 
576
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
577
            inv, delta)
 
578
 
 
579
    def test_add_new_id_existing_path(self):
 
580
        inv = self.get_empty_inventory()
 
581
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
582
        parent1.revision = b'result'
 
583
        parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
 
584
        parent2.revision = b'result'
 
585
        inv.add(parent1)
 
586
        delta = [(None, u'dir1', 'p-2', parent2)]
 
587
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
588
            inv, delta)
 
589
 
 
590
    def test_remove_dir_leaving_dangling_child(self):
 
591
        inv = self.get_empty_inventory()
 
592
        dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
593
        dir1.revision = b'result'
 
594
        dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
 
595
        dir2.revision = b'result'
 
596
        dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
 
597
        dir3.revision = b'result'
 
598
        inv.add(dir1)
 
599
        inv.add(dir2)
 
600
        inv.add(dir3)
 
601
        delta = [(u'dir1', None, 'p-1', None),
 
602
            (u'dir1/child2', None, 'p-3', None)]
 
603
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
604
            inv, delta)
 
605
 
 
606
    def test_add_file(self):
 
607
        inv = self.get_empty_inventory()
 
608
        file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
 
609
        file1.revision = b'result'
 
610
        file1.text_size = 0
 
611
        file1.text_sha1 = ''
 
612
        delta = [(None, u'path', b'file-id', file1)]
 
613
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
614
        self.assertEqual(b'file-id', res_inv.get_entry(b'file-id').file_id)
 
615
 
 
616
    def test_remove_file(self):
 
617
        inv = self.get_empty_inventory()
 
618
        file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
 
619
        file1.revision = b'result'
 
620
        file1.text_size = 0
 
621
        file1.text_sha1 = ''
 
622
        inv.add(file1)
 
623
        delta = [(u'path', None, b'file-id', None)]
 
624
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
625
        self.assertEqual(None, res_inv.path2id('path'))
 
626
        self.assertRaises(errors.NoSuchId, res_inv.id2path, b'file-id')
 
627
 
 
628
    def test_rename_file(self):
 
629
        inv = self.get_empty_inventory()
 
630
        file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
 
631
        inv.add(file1)
 
632
        file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
 
633
        delta = [(u'path', 'path2', b'file-id', file2)]
 
634
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
635
        self.assertEqual(None, res_inv.path2id('path'))
 
636
        self.assertEqual(b'file-id', res_inv.path2id('path2'))
 
637
 
 
638
    def test_replaced_at_new_path(self):
 
639
        inv = self.get_empty_inventory()
 
640
        file1 = self.make_file_ie(file_id=b'id1', parent_id=inv.root.file_id)
 
641
        inv.add(file1)
 
642
        file2 = self.make_file_ie(file_id=b'id2', parent_id=inv.root.file_id)
 
643
        delta = [(u'name', None, b'id1', None),
 
644
                 (None, u'name', b'id2', file2)]
 
645
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
646
        self.assertEqual(b'id2', res_inv.path2id('name'))
 
647
 
 
648
    def test_rename_dir(self):
 
649
        inv = self.get_empty_inventory()
 
650
        dir1 = inventory.InventoryDirectory(b'dir-id', 'dir1', inv.root.file_id)
 
651
        dir1.revision = b'basis'
 
652
        file1 = self.make_file_ie(parent_id=b'dir-id')
 
653
        inv.add(dir1)
 
654
        inv.add(file1)
 
655
        dir2 = inventory.InventoryDirectory(b'dir-id', 'dir2', inv.root.file_id)
 
656
        dir2.revision = b'result'
 
657
        delta = [('dir1', 'dir2', b'dir-id', dir2)]
 
658
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
659
        # The file should be accessible under the new path
 
660
        self.assertEqual(b'file-id', res_inv.path2id('dir2/name'))
 
661
 
 
662
    def test_renamed_dir_with_renamed_child(self):
 
663
        inv = self.get_empty_inventory()
 
664
        dir1 = inventory.InventoryDirectory(b'dir-id', 'dir1', inv.root.file_id)
 
665
        dir1.revision = b'basis'
 
666
        file1 = self.make_file_ie(b'file-id-1', 'name1', parent_id=b'dir-id')
 
667
        file2 = self.make_file_ie(b'file-id-2', 'name2', parent_id=b'dir-id')
 
668
        inv.add(dir1)
 
669
        inv.add(file1)
 
670
        inv.add(file2)
 
671
        dir2 = inventory.InventoryDirectory(b'dir-id', 'dir2', inv.root.file_id)
 
672
        dir2.revision = b'result'
 
673
        file2b = self.make_file_ie(b'file-id-2', 'name2', inv.root.file_id)
 
674
        delta = [('dir1', 'dir2', b'dir-id', dir2),
 
675
                 ('dir1/name2', 'name2', b'file-id-2', file2b)]
 
676
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
677
        # The file should be accessible under the new path
 
678
        self.assertEqual(b'file-id-1', res_inv.path2id('dir2/name1'))
 
679
        self.assertEqual(None, res_inv.path2id('dir2/name2'))
 
680
        self.assertEqual(b'file-id-2', res_inv.path2id('name2'))
 
681
 
 
682
    def test_is_root(self):
 
683
        """Ensure our root-checking code is accurate."""
 
684
        inv = inventory.Inventory('TREE_ROOT')
 
685
        self.assertTrue(inv.is_root('TREE_ROOT'))
 
686
        self.assertFalse(inv.is_root('booga'))
 
687
        inv.root.file_id = 'booga'
 
688
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
689
        self.assertTrue(inv.is_root('booga'))
 
690
        # works properly even if no root is set
 
691
        inv.root = None
 
692
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
693
        self.assertFalse(inv.is_root('booga'))
 
694
 
 
695
    def test_entries_for_empty_inventory(self):
 
696
        """Test that entries() will not fail for an empty inventory"""
 
697
        inv = Inventory(root_id=None)
 
698
        self.assertEqual([], inv.entries())
 
699
 
 
700
 
 
701
class TestInventoryEntry(TestCase):
 
702
 
 
703
    def test_file_invalid_entry_name(self):
 
704
        self.assertRaises(errors.InvalidEntryName, inventory.InventoryFile,
 
705
            '123', 'a/hello.c', ROOT_ID)
 
706
 
 
707
    def test_file_backslash(self):
 
708
        file = inventory.InventoryFile('123', 'h\\ello.c', ROOT_ID)
 
709
        self.assertEquals(file.name, 'h\\ello.c')
 
710
 
 
711
    def test_file_kind_character(self):
 
712
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
713
        self.assertEqual(file.kind_character(), '')
 
714
 
 
715
    def test_dir_kind_character(self):
 
716
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
717
        self.assertEqual(dir.kind_character(), '/')
 
718
 
 
719
    def test_link_kind_character(self):
 
720
        dir = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
721
        self.assertEqual(dir.kind_character(), '')
 
722
 
 
723
    def test_dir_detect_changes(self):
 
724
        left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
725
        right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
726
        self.assertEqual((False, False), left.detect_changes(right))
 
727
        self.assertEqual((False, False), right.detect_changes(left))
 
728
 
 
729
    def test_file_detect_changes(self):
 
730
        left = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
731
        left.text_sha1 = 123
 
732
        right = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
733
        right.text_sha1 = 123
 
734
        self.assertEqual((False, False), left.detect_changes(right))
 
735
        self.assertEqual((False, False), right.detect_changes(left))
 
736
        left.executable = True
 
737
        self.assertEqual((False, True), left.detect_changes(right))
 
738
        self.assertEqual((False, True), right.detect_changes(left))
 
739
        right.text_sha1 = 321
 
740
        self.assertEqual((True, True), left.detect_changes(right))
 
741
        self.assertEqual((True, True), right.detect_changes(left))
 
742
 
 
743
    def test_symlink_detect_changes(self):
 
744
        left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
745
        left.symlink_target='foo'
 
746
        right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
747
        right.symlink_target='foo'
 
748
        self.assertEqual((False, False), left.detect_changes(right))
 
749
        self.assertEqual((False, False), right.detect_changes(left))
 
750
        left.symlink_target = 'different'
 
751
        self.assertEqual((True, False), left.detect_changes(right))
 
752
        self.assertEqual((True, False), right.detect_changes(left))
 
753
 
 
754
    def test_file_has_text(self):
 
755
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
756
        self.assertTrue(file.has_text())
 
757
 
 
758
    def test_directory_has_text(self):
 
759
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
760
        self.assertFalse(dir.has_text())
 
761
 
 
762
    def test_link_has_text(self):
 
763
        link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
764
        self.assertFalse(link.has_text())
 
765
 
 
766
    def test_make_entry(self):
 
767
        self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
 
768
            inventory.InventoryFile)
 
769
        self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
 
770
            inventory.InventoryLink)
 
771
        self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
 
772
            inventory.InventoryDirectory)
 
773
 
 
774
    def test_make_entry_non_normalized(self):
 
775
        orig_normalized_filename = osutils.normalized_filename
 
776
 
 
777
        try:
 
778
            osutils.normalized_filename = osutils._accessible_normalized_filename
 
779
            entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
 
780
            self.assertEqual(u'\xe5', entry.name)
 
781
            self.assertIsInstance(entry, inventory.InventoryFile)
 
782
 
 
783
            osutils.normalized_filename = osutils._inaccessible_normalized_filename
 
784
            self.assertRaises(errors.InvalidNormalization,
 
785
                    inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
 
786
        finally:
 
787
            osutils.normalized_filename = orig_normalized_filename
 
788
 
 
789
 
 
790
class TestDescribeChanges(TestCase):
 
791
 
 
792
    def test_describe_change(self):
 
793
        # we need to test the following change combinations:
 
794
        # rename
 
795
        # reparent
 
796
        # modify
 
797
        # gone
 
798
        # added
 
799
        # renamed/reparented and modified
 
800
        # change kind (perhaps can't be done yet?)
 
801
        # also, merged in combination with all of these?
 
802
        old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
 
803
        old_a.text_sha1 = '123132'
 
804
        old_a.text_size = 0
 
805
        new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
 
806
        new_a.text_sha1 = '123132'
 
807
        new_a.text_size = 0
 
808
 
 
809
        self.assertChangeDescription('unchanged', old_a, new_a)
 
810
 
 
811
        new_a.text_size = 10
 
812
        new_a.text_sha1 = 'abcabc'
 
813
        self.assertChangeDescription('modified', old_a, new_a)
 
814
 
 
815
        self.assertChangeDescription('added', None, new_a)
 
816
        self.assertChangeDescription('removed', old_a, None)
 
817
        # perhaps a bit questionable but seems like the most reasonable thing...
 
818
        self.assertChangeDescription('unchanged', None, None)
 
819
 
 
820
        # in this case it's both renamed and modified; show a rename and
 
821
        # modification:
 
822
        new_a.name = 'newfilename'
 
823
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
824
 
 
825
        # reparenting is 'renaming'
 
826
        new_a.name = old_a.name
 
827
        new_a.parent_id = 'somedir-id'
 
828
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
829
 
 
830
        # reset the content values so its not modified
 
831
        new_a.text_size = old_a.text_size
 
832
        new_a.text_sha1 = old_a.text_sha1
 
833
        new_a.name = old_a.name
 
834
 
 
835
        new_a.name = 'newfilename'
 
836
        self.assertChangeDescription('renamed', old_a, new_a)
 
837
 
 
838
        # reparenting is 'renaming'
 
839
        new_a.name = old_a.name
 
840
        new_a.parent_id = 'somedir-id'
 
841
        self.assertChangeDescription('renamed', old_a, new_a)
 
842
 
 
843
    def assertChangeDescription(self, expected_change, old_ie, new_ie):
 
844
        change = InventoryEntry.describe_change(old_ie, new_ie)
 
845
        self.assertEqual(expected_change, change)
 
846
 
 
847
 
 
848
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
 
849
 
 
850
    def get_chk_bytes(self):
 
851
        factory = groupcompress.make_pack_factory(True, True, 1)
 
852
        trans = self.get_transport('')
 
853
        return factory(trans)
 
854
 
 
855
    def read_bytes(self, chk_bytes, key):
 
856
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
 
857
        return stream.next().get_bytes_as("fulltext")
 
858
 
 
859
    def test_deserialise_gives_CHKInventory(self):
 
860
        inv = Inventory()
 
861
        inv.revision_id = "revid"
 
862
        inv.root.revision = "rootrev"
 
863
        chk_bytes = self.get_chk_bytes()
 
864
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
865
        bytes = ''.join(chk_inv.to_lines())
 
866
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
867
        self.assertEqual("revid", new_inv.revision_id)
 
868
        self.assertEqual("directory", new_inv.root.kind)
 
869
        self.assertEqual(inv.root.file_id, new_inv.root.file_id)
 
870
        self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
 
871
        self.assertEqual(inv.root.name, new_inv.root.name)
 
872
        self.assertEqual("rootrev", new_inv.root.revision)
 
873
        self.assertEqual('plain', new_inv._search_key_name)
 
874
 
 
875
    def test_deserialise_wrong_revid(self):
 
876
        inv = Inventory()
 
877
        inv.revision_id = "revid"
 
878
        inv.root.revision = "rootrev"
 
879
        chk_bytes = self.get_chk_bytes()
 
880
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
881
        bytes = ''.join(chk_inv.to_lines())
 
882
        self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
 
883
            bytes, ("revid2",))
 
884
 
 
885
    def test_captures_rev_root_byid(self):
 
886
        inv = Inventory()
 
887
        inv.revision_id = "foo"
 
888
        inv.root.revision = "bar"
 
889
        chk_bytes = self.get_chk_bytes()
 
890
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
891
        lines = chk_inv.to_lines()
 
892
        self.assertEqual([
 
893
            'chkinventory:\n',
 
894
            'revision_id: foo\n',
 
895
            'root_id: TREE_ROOT\n',
 
896
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
897
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
898
            ], lines)
 
899
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
900
        self.assertEqual('plain', chk_inv._search_key_name)
 
901
 
 
902
    def test_captures_parent_id_basename_index(self):
 
903
        inv = Inventory()
 
904
        inv.revision_id = "foo"
 
905
        inv.root.revision = "bar"
 
906
        chk_bytes = self.get_chk_bytes()
 
907
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
908
        lines = chk_inv.to_lines()
 
909
        self.assertEqual([
 
910
            'chkinventory:\n',
 
911
            'revision_id: foo\n',
 
912
            'root_id: TREE_ROOT\n',
 
913
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
914
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
915
            ], lines)
 
916
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
917
        self.assertEqual('plain', chk_inv._search_key_name)
 
918
 
 
919
    def test_captures_search_key_name(self):
 
920
        inv = Inventory()
 
921
        inv.revision_id = "foo"
 
922
        inv.root.revision = "bar"
 
923
        chk_bytes = self.get_chk_bytes()
 
924
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
925
                                              search_key_name='hash-16-way')
 
926
        lines = chk_inv.to_lines()
 
927
        self.assertEqual([
 
928
            'chkinventory:\n',
 
929
            'search_key_name: hash-16-way\n',
 
930
            'root_id: TREE_ROOT\n',
 
931
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
932
            'revision_id: foo\n',
 
933
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
934
            ], lines)
 
935
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
936
        self.assertEqual('hash-16-way', chk_inv._search_key_name)
 
937
 
 
938
    def test_directory_children_on_demand(self):
 
939
        inv = Inventory()
 
940
        inv.revision_id = "revid"
 
941
        inv.root.revision = "rootrev"
 
942
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
943
        inv.get_entry("fileid").revision = "filerev"
 
944
        inv.get_entry("fileid").executable = True
 
945
        inv.get_entry("fileid").text_sha1 = "ffff"
 
946
        inv.get_entry("fileid").text_size = 1
 
947
        chk_bytes = self.get_chk_bytes()
 
948
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
949
        bytes = ''.join(chk_inv.to_lines())
 
950
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
951
        root_entry = new_inv.get_entry(inv.root.file_id)
 
952
        self.assertEqual(None, root_entry._children)
 
953
        self.assertEqual({'file'}, set(root_entry.children))
 
954
        file_direct = new_inv.get_entry("fileid")
 
955
        file_found = root_entry.children['file']
 
956
        self.assertEqual(file_direct.kind, file_found.kind)
 
957
        self.assertEqual(file_direct.file_id, file_found.file_id)
 
958
        self.assertEqual(file_direct.parent_id, file_found.parent_id)
 
959
        self.assertEqual(file_direct.name, file_found.name)
 
960
        self.assertEqual(file_direct.revision, file_found.revision)
 
961
        self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
 
962
        self.assertEqual(file_direct.text_size, file_found.text_size)
 
963
        self.assertEqual(file_direct.executable, file_found.executable)
 
964
 
 
965
    def test_from_inventory_maximum_size(self):
 
966
        # from_inventory supports the maximum_size parameter.
 
967
        inv = Inventory()
 
968
        inv.revision_id = "revid"
 
969
        inv.root.revision = "rootrev"
 
970
        chk_bytes = self.get_chk_bytes()
 
971
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
 
972
        chk_inv.id_to_entry._ensure_root()
 
973
        self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
 
974
        self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
 
975
        p_id_basename = chk_inv.parent_id_basename_to_file_id
 
976
        p_id_basename._ensure_root()
 
977
        self.assertEqual(120, p_id_basename._root_node.maximum_size)
 
978
        self.assertEqual(2, p_id_basename._root_node._key_width)
 
979
 
 
980
    def test_iter_all_ids(self):
 
981
        inv = Inventory()
 
982
        inv.revision_id = "revid"
 
983
        inv.root.revision = "rootrev"
 
984
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
985
        inv.get_entry("fileid").revision = "filerev"
 
986
        inv.get_entry("fileid").executable = True
 
987
        inv.get_entry("fileid").text_sha1 = "ffff"
 
988
        inv.get_entry("fileid").text_size = 1
 
989
        chk_bytes = self.get_chk_bytes()
 
990
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
991
        bytes = ''.join(chk_inv.to_lines())
 
992
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
993
        fileids = sorted(new_inv.iter_all_ids())
 
994
        self.assertEqual([inv.root.file_id, "fileid"], fileids)
 
995
 
 
996
    def test__len__(self):
 
997
        inv = Inventory()
 
998
        inv.revision_id = "revid"
 
999
        inv.root.revision = "rootrev"
 
1000
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1001
        inv.get_entry("fileid").revision = "filerev"
 
1002
        inv.get_entry("fileid").executable = True
 
1003
        inv.get_entry("fileid").text_sha1 = "ffff"
 
1004
        inv.get_entry("fileid").text_size = 1
 
1005
        chk_bytes = self.get_chk_bytes()
 
1006
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1007
        self.assertEqual(2, len(chk_inv))
 
1008
 
 
1009
    def test_get_entry(self):
 
1010
        inv = Inventory()
 
1011
        inv.revision_id = b"revid"
 
1012
        inv.root.revision = b"rootrev"
 
1013
        inv.add(InventoryFile(b"fileid", u"file", inv.root.file_id))
 
1014
        inv.get_entry(b"fileid").revision = b"filerev"
 
1015
        inv.get_entry(b"fileid").executable = True
 
1016
        inv.get_entry(b"fileid").text_sha1 = b"ffff"
 
1017
        inv.get_entry(b"fileid").text_size = 1
 
1018
        chk_bytes = self.get_chk_bytes()
 
1019
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1020
        data = b''.join(chk_inv.to_lines())
 
1021
        new_inv = CHKInventory.deserialise(chk_bytes, data, (b"revid",))
 
1022
        root_entry = new_inv.get_entry(inv.root.file_id)
 
1023
        file_entry = new_inv.get_entry(b"fileid")
 
1024
        self.assertEqual("directory", root_entry.kind)
 
1025
        self.assertEqual(inv.root.file_id, root_entry.file_id)
 
1026
        self.assertEqual(inv.root.parent_id, root_entry.parent_id)
 
1027
        self.assertEqual(inv.root.name, root_entry.name)
 
1028
        self.assertEqual(b"rootrev", root_entry.revision)
 
1029
        self.assertEqual("file", file_entry.kind)
 
1030
        self.assertEqual(b"fileid", file_entry.file_id)
 
1031
        self.assertEqual(inv.root.file_id, file_entry.parent_id)
 
1032
        self.assertEqual(u"file", file_entry.name)
 
1033
        self.assertEqual(b"filerev", file_entry.revision)
 
1034
        self.assertEqual(b"ffff", file_entry.text_sha1)
 
1035
        self.assertEqual(1, file_entry.text_size)
 
1036
        self.assertEqual(True, file_entry.executable)
 
1037
        self.assertRaises(errors.NoSuchId, new_inv.get_entry, 'missing')
 
1038
 
 
1039
    def test_has_id_true(self):
 
1040
        inv = Inventory()
 
1041
        inv.revision_id = b"revid"
 
1042
        inv.root.revision = b"rootrev"
 
1043
        inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
 
1044
        inv.get_entry(b"fileid").revision = b"filerev"
 
1045
        inv.get_entry(b"fileid").executable = True
 
1046
        inv.get_entry(b"fileid").text_sha1 = "ffff"
 
1047
        inv.get_entry(b"fileid").text_size = 1
 
1048
        chk_bytes = self.get_chk_bytes()
 
1049
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1050
        self.assertTrue(chk_inv.has_id(b'fileid'))
 
1051
        self.assertTrue(chk_inv.has_id(inv.root.file_id))
 
1052
 
 
1053
    def test_has_id_not(self):
 
1054
        inv = Inventory()
 
1055
        inv.revision_id = b"revid"
 
1056
        inv.root.revision = b"rootrev"
 
1057
        chk_bytes = self.get_chk_bytes()
 
1058
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1059
        self.assertFalse(chk_inv.has_id(b'fileid'))
 
1060
 
 
1061
    def test_id2path(self):
 
1062
        inv = Inventory()
 
1063
        inv.revision_id = b"revid"
 
1064
        inv.root.revision = b"rootrev"
 
1065
        direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
 
1066
        fileentry = InventoryFile("fileid", "file", "dirid")
 
1067
        inv.add(direntry)
 
1068
        inv.add(fileentry)
 
1069
        inv.get_entry(b"fileid").revision = b"filerev"
 
1070
        inv.get_entry(b"fileid").executable = True
 
1071
        inv.get_entry(b"fileid").text_sha1 = "ffff"
 
1072
        inv.get_entry(b"fileid").text_size = 1
 
1073
        inv.get_entry(b"dirid").revision = b"filerev"
 
1074
        chk_bytes = self.get_chk_bytes()
 
1075
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1076
        bytes = ''.join(chk_inv.to_lines())
 
1077
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1078
        self.assertEqual('', new_inv.id2path(inv.root.file_id))
 
1079
        self.assertEqual('dir', new_inv.id2path(b'dirid'))
 
1080
        self.assertEqual('dir/file', new_inv.id2path(b'fileid'))
 
1081
 
 
1082
    def test_path2id(self):
 
1083
        inv = Inventory()
 
1084
        inv.revision_id = b"revid"
 
1085
        inv.root.revision = b"rootrev"
 
1086
        direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
 
1087
        fileentry = InventoryFile(b"fileid", "file", b"dirid")
 
1088
        inv.add(direntry)
 
1089
        inv.add(fileentry)
 
1090
        inv.get_entry(b"fileid").revision = b"filerev"
 
1091
        inv.get_entry(b"fileid").executable = True
 
1092
        inv.get_entry(b"fileid").text_sha1 = "ffff"
 
1093
        inv.get_entry(b"fileid").text_size = 1
 
1094
        inv.get_entry(b"dirid").revision = b"filerev"
 
1095
        chk_bytes = self.get_chk_bytes()
 
1096
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1097
        bytes = ''.join(chk_inv.to_lines())
 
1098
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1099
        self.assertEqual(inv.root.file_id, new_inv.path2id(''))
 
1100
        self.assertEqual(b'dirid', new_inv.path2id('dir'))
 
1101
        self.assertEqual(b'fileid', new_inv.path2id('dir/file'))
 
1102
 
 
1103
    def test_create_by_apply_delta_sets_root(self):
 
1104
        inv = Inventory()
 
1105
        inv.revision_id = "revid"
 
1106
        chk_bytes = self.get_chk_bytes()
 
1107
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1108
        inv.add_path("", "directory", "myrootid", None)
 
1109
        inv.revision_id = "expectedid"
 
1110
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1111
        delta = [("", None, base_inv.root.file_id, None),
 
1112
            (None, "",  "myrootid", inv.root)]
 
1113
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1114
        self.assertEqual(reference_inv.root, new_inv.root)
 
1115
 
 
1116
    def test_create_by_apply_delta_empty_add_child(self):
 
1117
        inv = Inventory()
 
1118
        inv.revision_id = "revid"
 
1119
        inv.root.revision = "rootrev"
 
1120
        chk_bytes = self.get_chk_bytes()
 
1121
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1122
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
 
1123
        a_entry.revision = "filerev"
 
1124
        a_entry.executable = True
 
1125
        a_entry.text_sha1 = "ffff"
 
1126
        a_entry.text_size = 1
 
1127
        inv.add(a_entry)
 
1128
        inv.revision_id = "expectedid"
 
1129
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1130
        delta = [(None, "A",  "A-id", a_entry)]
 
1131
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1132
        # new_inv should be the same as reference_inv.
 
1133
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1134
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1135
        reference_inv.id_to_entry._ensure_root()
 
1136
        new_inv.id_to_entry._ensure_root()
 
1137
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1138
            new_inv.id_to_entry._root_node._key)
 
1139
 
 
1140
    def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
 
1141
        inv = Inventory()
 
1142
        inv.revision_id = "revid"
 
1143
        inv.root.revision = "rootrev"
 
1144
        chk_bytes = self.get_chk_bytes()
 
1145
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1146
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
 
1147
        a_entry.revision = "filerev"
 
1148
        a_entry.executable = True
 
1149
        a_entry.text_sha1 = "ffff"
 
1150
        a_entry.text_size = 1
 
1151
        inv.add(a_entry)
 
1152
        inv.revision_id = "expectedid"
 
1153
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1154
        delta = [(None, "A",  "A-id", a_entry)]
 
1155
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1156
        reference_inv.id_to_entry._ensure_root()
 
1157
        reference_inv.parent_id_basename_to_file_id._ensure_root()
 
1158
        new_inv.id_to_entry._ensure_root()
 
1159
        new_inv.parent_id_basename_to_file_id._ensure_root()
 
1160
        # new_inv should be the same as reference_inv.
 
1161
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1162
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1163
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1164
            new_inv.id_to_entry._root_node._key)
 
1165
        self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
 
1166
            new_inv.parent_id_basename_to_file_id._root_node._key)
 
1167
 
 
1168
    def test_iter_changes(self):
 
1169
        # Low level bootstrapping smoke test; comprehensive generic tests via
 
1170
        # InterTree are coming.
 
1171
        inv = Inventory()
 
1172
        inv.revision_id = "revid"
 
1173
        inv.root.revision = "rootrev"
 
1174
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1175
        inv.get_entry("fileid").revision = "filerev"
 
1176
        inv.get_entry("fileid").executable = True
 
1177
        inv.get_entry("fileid").text_sha1 = "ffff"
 
1178
        inv.get_entry("fileid").text_size = 1
 
1179
        inv2 = Inventory()
 
1180
        inv2.revision_id = "revid2"
 
1181
        inv2.root.revision = "rootrev"
 
1182
        inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1183
        inv2.get_entry("fileid").revision = "filerev2"
 
1184
        inv2.get_entry("fileid").executable = False
 
1185
        inv2.get_entry("fileid").text_sha1 = "bbbb"
 
1186
        inv2.get_entry("fileid").text_size = 2
 
1187
        # get fresh objects.
 
1188
        chk_bytes = self.get_chk_bytes()
 
1189
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1190
        bytes = ''.join(chk_inv.to_lines())
 
1191
        inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1192
        chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
 
1193
        bytes = ''.join(chk_inv2.to_lines())
 
1194
        inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
 
1195
        self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
 
1196
            ('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
 
1197
            (False, True))],
 
1198
            list(inv_1.iter_changes(inv_2)))
 
1199
 
 
1200
    def test_parent_id_basename_to_file_id_index_enabled(self):
 
1201
        inv = Inventory()
 
1202
        inv.revision_id = "revid"
 
1203
        inv.root.revision = "rootrev"
 
1204
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1205
        inv.get_entry("fileid").revision = "filerev"
 
1206
        inv.get_entry("fileid").executable = True
 
1207
        inv.get_entry("fileid").text_sha1 = "ffff"
 
1208
        inv.get_entry("fileid").text_size = 1
 
1209
        # get fresh objects.
 
1210
        chk_bytes = self.get_chk_bytes()
 
1211
        tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1212
        bytes = ''.join(tmp_inv.to_lines())
 
1213
        chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1214
        self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
 
1215
        self.assertEqual(
 
1216
            {('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
 
1217
            dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
 
1218
 
 
1219
    def test_file_entry_to_bytes(self):
 
1220
        inv = CHKInventory(None)
 
1221
        ie = inventory.InventoryFile(b'file-id', 'filename', 'parent-id')
 
1222
        ie.executable = True
 
1223
        ie.revision = b'file-rev-id'
 
1224
        ie.text_sha1 = 'abcdefgh'
 
1225
        ie.text_size = 100
 
1226
        bytes = inv._entry_to_bytes(ie)
 
1227
        self.assertEqual(b'file: file-id\nparent-id\nfilename\n'
 
1228
                         b'file-rev-id\nabcdefgh\n100\nY', bytes)
 
1229
        ie2 = inv._bytes_to_entry(bytes)
 
1230
        self.assertEqual(ie, ie2)
 
1231
        self.assertIsInstance(ie2.name, unicode)
 
1232
        self.assertEqual(('filename', b'file-id', b'file-rev-id'),
 
1233
                         inv._bytes_to_utf8name_key(bytes))
 
1234
 
 
1235
    def test_file2_entry_to_bytes(self):
 
1236
        inv = CHKInventory(None)
 
1237
        # \u30a9 == 'omega'
 
1238
        ie = inventory.InventoryFile(b'file-id', u'\u03a9name', b'parent-id')
 
1239
        ie.executable = False
 
1240
        ie.revision = b'file-rev-id'
 
1241
        ie.text_sha1 = '123456'
 
1242
        ie.text_size = 25
 
1243
        bytes = inv._entry_to_bytes(ie)
 
1244
        self.assertEqual(b'file: file-id\nparent-id\n\xce\xa9name\n'
 
1245
                         b'file-rev-id\n123456\n25\nN', bytes)
 
1246
        ie2 = inv._bytes_to_entry(bytes)
 
1247
        self.assertEqual(ie, ie2)
 
1248
        self.assertIsInstance(ie2.name, unicode)
 
1249
        self.assertEqual((b'\xce\xa9name', b'file-id', b'file-rev-id'),
 
1250
                         inv._bytes_to_utf8name_key(bytes))
 
1251
 
 
1252
    def test_dir_entry_to_bytes(self):
 
1253
        inv = CHKInventory(None)
 
1254
        ie = inventory.InventoryDirectory(b'dir-id', 'dirname', b'parent-id')
 
1255
        ie.revision = b'dir-rev-id'
 
1256
        bytes = inv._entry_to_bytes(ie)
 
1257
        self.assertEqual(b'dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
 
1258
        ie2 = inv._bytes_to_entry(bytes)
 
1259
        self.assertEqual(ie, ie2)
 
1260
        self.assertIsInstance(ie2.name, unicode)
 
1261
        self.assertEqual(('dirname', b'dir-id', b'dir-rev-id'),
 
1262
                         inv._bytes_to_utf8name_key(bytes))
 
1263
 
 
1264
    def test_dir2_entry_to_bytes(self):
 
1265
        inv = CHKInventory(None)
 
1266
        ie = inventory.InventoryDirectory(b'dir-id', u'dir\u03a9name',
 
1267
                                          None)
 
1268
        ie.revision = b'dir-rev-id'
 
1269
        bytes = inv._entry_to_bytes(ie)
 
1270
        self.assertEqual(b'dir: dir-id\n\ndir\xce\xa9name\n'
 
1271
                         b'dir-rev-id', bytes)
 
1272
        ie2 = inv._bytes_to_entry(bytes)
 
1273
        self.assertEqual(ie, ie2)
 
1274
        self.assertIsInstance(ie2.name, unicode)
 
1275
        self.assertIs(ie2.parent_id, None)
 
1276
        self.assertEqual(('dir\xce\xa9name', b'dir-id', b'dir-rev-id'),
 
1277
                         inv._bytes_to_utf8name_key(bytes))
 
1278
 
 
1279
    def test_symlink_entry_to_bytes(self):
 
1280
        inv = CHKInventory(None)
 
1281
        ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
 
1282
        ie.revision = 'link-rev-id'
 
1283
        ie.symlink_target = u'target/path'
 
1284
        bytes = inv._entry_to_bytes(ie)
 
1285
        self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
 
1286
                         'link-rev-id\ntarget/path', bytes)
 
1287
        ie2 = inv._bytes_to_entry(bytes)
 
1288
        self.assertEqual(ie, ie2)
 
1289
        self.assertIsInstance(ie2.name, unicode)
 
1290
        self.assertIsInstance(ie2.symlink_target, unicode)
 
1291
        self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
 
1292
                         inv._bytes_to_utf8name_key(bytes))
 
1293
 
 
1294
    def test_symlink2_entry_to_bytes(self):
 
1295
        inv = CHKInventory(None)
 
1296
        ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
 
1297
        ie.revision = 'link-rev-id'
 
1298
        ie.symlink_target = u'target/\u03a9path'
 
1299
        bytes = inv._entry_to_bytes(ie)
 
1300
        self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
 
1301
                         'link-rev-id\ntarget/\xce\xa9path', bytes)
 
1302
        ie2 = inv._bytes_to_entry(bytes)
 
1303
        self.assertEqual(ie, ie2)
 
1304
        self.assertIsInstance(ie2.name, unicode)
 
1305
        self.assertIsInstance(ie2.symlink_target, unicode)
 
1306
        self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
 
1307
                         inv._bytes_to_utf8name_key(bytes))
 
1308
 
 
1309
    def test_tree_reference_entry_to_bytes(self):
 
1310
        inv = CHKInventory(None)
 
1311
        ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
 
1312
                                     'parent-id')
 
1313
        ie.revision = 'tree-rev-id'
 
1314
        ie.reference_revision = b'ref-rev-id'
 
1315
        bytes = inv._entry_to_bytes(ie)
 
1316
        self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
 
1317
                         'tree-rev-id\nref-rev-id', bytes)
 
1318
        ie2 = inv._bytes_to_entry(bytes)
 
1319
        self.assertEqual(ie, ie2)
 
1320
        self.assertIsInstance(ie2.name, unicode)
 
1321
        self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
 
1322
                         inv._bytes_to_utf8name_key(bytes))
 
1323
 
 
1324
    def make_basic_utf8_inventory(self):
 
1325
        inv = Inventory()
 
1326
        inv.revision_id = "revid"
 
1327
        inv.root.revision = "rootrev"
 
1328
        root_id = inv.root.file_id
 
1329
        inv.add(InventoryFile("fileid", u'f\xefle', root_id))
 
1330
        inv.get_entry("fileid").revision = "filerev"
 
1331
        inv.get_entry("fileid").text_sha1 = "ffff"
 
1332
        inv.get_entry("fileid").text_size = 0
 
1333
        inv.add(InventoryDirectory("dirid", u'dir-\N{EURO SIGN}', root_id))
 
1334
        inv.add(InventoryFile("childid", u'ch\xefld', "dirid"))
 
1335
        inv.get_entry("childid").revision = "filerev"
 
1336
        inv.get_entry("childid").text_sha1 = "ffff"
 
1337
        inv.get_entry("childid").text_size = 0
 
1338
        chk_bytes = self.get_chk_bytes()
 
1339
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1340
        bytes = ''.join(chk_inv.to_lines())
 
1341
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1342
 
 
1343
    def test__preload_handles_utf8(self):
 
1344
        new_inv = self.make_basic_utf8_inventory()
 
1345
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
 
1346
        self.assertFalse(new_inv._fully_cached)
 
1347
        new_inv._preload_cache()
 
1348
        self.assertEqual(
 
1349
            sorted([new_inv.root_id, "fileid", "dirid", "childid"]),
 
1350
            sorted(new_inv._fileid_to_entry_cache.keys()))
 
1351
        ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
 
1352
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1353
                         sorted(ie_root._children.keys()))
 
1354
        ie_dir = new_inv._fileid_to_entry_cache['dirid']
 
1355
        self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
 
1356
 
 
1357
    def test__preload_populates_cache(self):
 
1358
        inv = Inventory()
 
1359
        inv.revision_id = "revid"
 
1360
        inv.root.revision = "rootrev"
 
1361
        root_id = inv.root.file_id
 
1362
        inv.add(InventoryFile("fileid", "file", root_id))
 
1363
        inv.get_entry("fileid").revision = "filerev"
 
1364
        inv.get_entry("fileid").executable = True
 
1365
        inv.get_entry("fileid").text_sha1 = "ffff"
 
1366
        inv.get_entry("fileid").text_size = 1
 
1367
        inv.add(InventoryDirectory("dirid", "dir", root_id))
 
1368
        inv.add(InventoryFile("childid", "child", "dirid"))
 
1369
        inv.get_entry("childid").revision = "filerev"
 
1370
        inv.get_entry("childid").executable = False
 
1371
        inv.get_entry("childid").text_sha1 = "dddd"
 
1372
        inv.get_entry("childid").text_size = 1
 
1373
        chk_bytes = self.get_chk_bytes()
 
1374
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1375
        bytes = ''.join(chk_inv.to_lines())
 
1376
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1377
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
 
1378
        self.assertFalse(new_inv._fully_cached)
 
1379
        new_inv._preload_cache()
 
1380
        self.assertEqual(
 
1381
            sorted([root_id, "fileid", "dirid", "childid"]),
 
1382
            sorted(new_inv._fileid_to_entry_cache.keys()))
 
1383
        self.assertTrue(new_inv._fully_cached)
 
1384
        ie_root = new_inv._fileid_to_entry_cache[root_id]
 
1385
        self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
 
1386
        ie_dir = new_inv._fileid_to_entry_cache['dirid']
 
1387
        self.assertEqual(['child'], sorted(ie_dir._children.keys()))
 
1388
 
 
1389
    def test__preload_handles_partially_evaluated_inventory(self):
 
1390
        new_inv = self.make_basic_utf8_inventory()
 
1391
        ie = new_inv.get_entry(new_inv.root_id)
 
1392
        self.assertIs(None, ie._children)
 
1393
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1394
                         sorted(ie.children.keys()))
 
1395
        # Accessing .children loads _children
 
1396
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1397
                         sorted(ie._children.keys()))
 
1398
        new_inv._preload_cache()
 
1399
        # No change
 
1400
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1401
                         sorted(ie._children.keys()))
 
1402
        ie_dir = new_inv.get_entry("dirid")
 
1403
        self.assertEqual([u'ch\xefld'],
 
1404
                         sorted(ie_dir._children.keys()))
 
1405
 
 
1406
    def test_filter_change_in_renamed_subfolder(self):
 
1407
        inv = Inventory('tree-root')
 
1408
        src_ie = inv.add_path('src', 'directory', 'src-id')
 
1409
        inv.add_path('src/sub/', 'directory', 'sub-id')
 
1410
        a_ie = inv.add_path('src/sub/a', 'file', 'a-id')
 
1411
        a_ie.text_sha1 = osutils.sha_string('content\n')
 
1412
        a_ie.text_size = len('content\n')
 
1413
        chk_bytes = self.get_chk_bytes()
 
1414
        inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1415
        inv = inv.create_by_apply_delta([
 
1416
            ("src/sub/a", "src/sub/a", "a-id", a_ie),
 
1417
            ("src", "src2", "src-id", src_ie),
 
1418
            ], 'new-rev-2')
 
1419
        new_inv = inv.filter(['a-id', 'src-id'])
 
1420
        self.assertEqual([
 
1421
            ('', 'tree-root'),
 
1422
            ('src', 'src-id'),
 
1423
            ('src/sub', 'sub-id'),
 
1424
            ('src/sub/a', 'a-id'),
 
1425
            ], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
 
1426
 
 
1427
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
 
1428
 
 
1429
    def get_chk_bytes(self):
 
1430
        factory = groupcompress.make_pack_factory(True, True, 1)
 
1431
        trans = self.get_transport('')
 
1432
        return factory(trans)
 
1433
 
 
1434
    def make_dir(self, inv, name, parent_id):
 
1435
        inv.add(inv.make_entry('directory', name, parent_id, name.encode('utf-8') + b'-id'))
 
1436
 
 
1437
    def make_file(self, inv, name, parent_id, content='content\n'):
 
1438
        ie = inv.make_entry('file', name, parent_id, name + '-id')
 
1439
        ie.text_sha1 = osutils.sha_string(content)
 
1440
        ie.text_size = len(content)
 
1441
        inv.add(ie)
 
1442
 
 
1443
    def make_simple_inventory(self):
 
1444
        inv = Inventory('TREE_ROOT')
 
1445
        inv.revision_id = "revid"
 
1446
        inv.root.revision = "rootrev"
 
1447
        # /                 TREE_ROOT
 
1448
        # dir1/             dir1-id
 
1449
        #   sub-file1       sub-file1-id
 
1450
        #   sub-file2       sub-file2-id
 
1451
        #   sub-dir1/       sub-dir1-id
 
1452
        #     subsub-file1  subsub-file1-id
 
1453
        # dir2/             dir2-id
 
1454
        #   sub2-file1      sub2-file1-id
 
1455
        # top               top-id
 
1456
        self.make_dir(inv, 'dir1', 'TREE_ROOT')
 
1457
        self.make_dir(inv, 'dir2', 'TREE_ROOT')
 
1458
        self.make_dir(inv, 'sub-dir1', 'dir1-id')
 
1459
        self.make_file(inv, 'top', 'TREE_ROOT')
 
1460
        self.make_file(inv, 'sub-file1', 'dir1-id')
 
1461
        self.make_file(inv, 'sub-file2', 'dir1-id')
 
1462
        self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
 
1463
        self.make_file(inv, 'sub2-file1', 'dir2-id')
 
1464
        chk_bytes = self.get_chk_bytes()
 
1465
        #  use a small maximum_size to force internal paging structures
 
1466
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
1467
                        maximum_size=100,
 
1468
                        search_key_name=b'hash-255-way')
 
1469
        bytes = b''.join(chk_inv.to_lines())
 
1470
        return CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
 
1471
 
 
1472
    def assert_Getitems(self, expected_fileids, inv, file_ids):
 
1473
        self.assertEqual(sorted(expected_fileids),
 
1474
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
 
1475
 
 
1476
    def assertExpand(self, all_ids, inv, file_ids):
 
1477
        (val_all_ids,
 
1478
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
 
1479
        self.assertEqual(set(all_ids), val_all_ids)
 
1480
        entries = inv._getitems(val_all_ids)
 
1481
        expected_children = {}
 
1482
        for entry in entries:
 
1483
            s = expected_children.setdefault(entry.parent_id, [])
 
1484
            s.append(entry.file_id)
 
1485
        val_children = dict((k, sorted(v)) for k, v
 
1486
                            in val_children.items())
 
1487
        expected_children = dict((k, sorted(v)) for k, v
 
1488
                            in expected_children.items())
 
1489
        self.assertEqual(expected_children, val_children)
 
1490
 
 
1491
    def test_make_simple_inventory(self):
 
1492
        inv = self.make_simple_inventory()
 
1493
        layout = []
 
1494
        for path, entry in inv.iter_entries_by_dir():
 
1495
            layout.append((path, entry.file_id))
 
1496
        self.assertEqual([
 
1497
            ('', 'TREE_ROOT'),
 
1498
            ('dir1', 'dir1-id'),
 
1499
            ('dir2', 'dir2-id'),
 
1500
            ('top', 'top-id'),
 
1501
            ('dir1/sub-dir1', 'sub-dir1-id'),
 
1502
            ('dir1/sub-file1', 'sub-file1-id'),
 
1503
            ('dir1/sub-file2', 'sub-file2-id'),
 
1504
            ('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
 
1505
            ('dir2/sub2-file1', 'sub2-file1-id'),
 
1506
            ], layout)
 
1507
 
 
1508
    def test__getitems(self):
 
1509
        inv = self.make_simple_inventory()
 
1510
        # Reading from disk
 
1511
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1512
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1513
        self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
 
1514
        # From cache
 
1515
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1516
        # Mixed
 
1517
        self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
 
1518
                             ['dir1-id', 'sub-file2-id'])
 
1519
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1520
        self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
 
1521
 
 
1522
    def test_single_file(self):
 
1523
        inv = self.make_simple_inventory()
 
1524
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1525
 
 
1526
    def test_get_all_parents(self):
 
1527
        inv = self.make_simple_inventory()
 
1528
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1529
                           'subsub-file1-id',
 
1530
                          ], inv, ['subsub-file1-id'])
 
1531
 
 
1532
    def test_get_children(self):
 
1533
        inv = self.make_simple_inventory()
 
1534
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1535
                           'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
 
1536
                          ], inv, ['dir1-id'])
 
1537
 
 
1538
    def test_from_root(self):
 
1539
        inv = self.make_simple_inventory()
 
1540
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
 
1541
                           'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
 
1542
                           'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
 
1543
 
 
1544
    def test_top_level_file(self):
 
1545
        inv = self.make_simple_inventory()
 
1546
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1547
 
 
1548
    def test_subsub_file(self):
 
1549
        inv = self.make_simple_inventory()
 
1550
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1551
                           'subsub-file1-id'], inv, ['subsub-file1-id'])
 
1552
 
 
1553
    def test_sub_and_root(self):
 
1554
        inv = self.make_simple_inventory()
 
1555
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
 
1556
                           'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])
 
1557
 
 
1558
 
 
1559
class TestMutableInventoryFromTree(TestCaseWithTransport):
 
1560
 
 
1561
    def test_empty(self):
 
1562
        repository = self.make_repository('.')
 
1563
        tree = repository.revision_tree(revision.NULL_REVISION)
 
1564
        inv = mutable_inventory_from_tree(tree)
 
1565
        self.assertEqual(revision.NULL_REVISION, inv.revision_id)
 
1566
        self.assertEqual(0, len(inv))
 
1567
 
 
1568
    def test_some_files(self):
 
1569
        wt = self.make_branch_and_tree('.')
 
1570
        self.build_tree(['a'])
 
1571
        wt.add(['a'], ['thefileid'])
 
1572
        revid = wt.commit("commit")
 
1573
        tree = wt.branch.repository.revision_tree(revid)
 
1574
        inv = mutable_inventory_from_tree(tree)
 
1575
        self.assertEqual(revid, inv.revision_id)
 
1576
        self.assertEqual(2, len(inv))
 
1577
        self.assertEqual("a", inv.get_entry('thefileid').name)
 
1578
        # The inventory should be mutable and independent of
 
1579
        # the original tree
 
1580
        self.assertFalse(tree.root_inventory.get_entry('thefileid').executable)
 
1581
        inv.get_entry('thefileid').executable = True
 
1582
        self.assertFalse(tree.root_inventory.get_entry('thefileid').executable)