/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_inv.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-08 23:30:31 UTC
  • mto: This revision was merged to the branch mainline in revision 6690.
  • Revision ID: jelmer@jelmer.uk-20170608233031-3qavls2o7a1pqllj
Update imports.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2012, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from .. import (
 
19
    errors,
 
20
    osutils,
 
21
    repository,
 
22
    revision,
 
23
    tests,
 
24
    workingtree,
 
25
    )
 
26
from ..bzr import (
 
27
    chk_map,
 
28
    groupcompress,
 
29
    inventory,
 
30
    )
 
31
from ..bzr.inventory import (
 
32
    CHKInventory,
 
33
    Inventory,
 
34
    ROOT_ID,
 
35
    InventoryFile,
 
36
    InventoryDirectory,
 
37
    InventoryEntry,
 
38
    TreeReference,
 
39
    mutable_inventory_from_tree,
 
40
    )
 
41
from . import (
 
42
    TestCase,
 
43
    TestCaseWithTransport,
 
44
    )
 
45
from .scenarios import load_tests_apply_scenarios
 
46
 
 
47
 
 
48
load_tests = load_tests_apply_scenarios
 
49
 
 
50
 
 
51
def delta_application_scenarios():
 
52
    scenarios = [
 
53
        ('Inventory', {'apply_delta':apply_inventory_Inventory}),
 
54
        ]
 
55
    # Working tree basis delta application
 
56
    # Repository add_inv_by_delta.
 
57
    # Reduce form of the per_repository test logic - that logic needs to be
 
58
    # be able to get /just/ repositories whereas these tests are fine with
 
59
    # just creating trees.
 
60
    formats = set()
 
61
    for _, format in repository.format_registry.iteritems():
 
62
        if format.supports_full_versioned_files:
 
63
            scenarios.append((str(format.__name__), {
 
64
                'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
 
65
                'format':format}))
 
66
    for format in workingtree.format_registry._get_all():
 
67
        repo_fmt = format._matchingbzrdir.repository_format
 
68
        if not repo_fmt.supports_full_versioned_files:
 
69
            continue
 
70
        scenarios.append(
 
71
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
 
72
            'apply_delta':apply_inventory_WT_basis,
 
73
            'format':format}))
 
74
        scenarios.append(
 
75
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
 
76
            'apply_delta':apply_inventory_WT,
 
77
            'format':format}))
 
78
    return scenarios
 
79
 
 
80
 
 
81
def create_texts_for_inv(repo, inv):
 
82
    for path, ie in inv.iter_entries():
 
83
        if ie.text_size:
 
84
            lines = ['a' * ie.text_size]
 
85
        else:
 
86
            lines = []
 
87
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
 
88
 
 
89
 
 
90
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
 
91
    """Apply delta to basis and return the result.
 
92
 
 
93
    :param basis: An inventory to be used as the basis.
 
94
    :param delta: The inventory delta to apply:
 
95
    :return: An inventory resulting from the application.
 
96
    """
 
97
    basis.apply_delta(delta)
 
98
    return basis
 
99
 
 
100
 
 
101
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
 
102
    """Apply delta to basis and return the result.
 
103
 
 
104
    This sets the tree state to be basis, and then calls apply_inventory_delta.
 
105
 
 
106
    :param basis: An inventory to be used as the basis.
 
107
    :param delta: The inventory delta to apply:
 
108
    :return: An inventory resulting from the application.
 
109
    """
 
110
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
 
111
    control.create_repository()
 
112
    control.create_branch()
 
113
    tree = self.format.initialize(control)
 
114
    tree.lock_write()
 
115
    try:
 
116
        tree._write_inventory(basis)
 
117
    finally:
 
118
        tree.unlock()
 
119
    # Fresh object, reads disk again.
 
120
    tree = tree.bzrdir.open_workingtree()
 
121
    tree.lock_write()
 
122
    try:
 
123
        tree.apply_inventory_delta(delta)
 
124
    finally:
 
125
        tree.unlock()
 
126
    # reload tree - ensure we get what was written.
 
127
    tree = tree.bzrdir.open_workingtree()
 
128
    tree.lock_read()
 
129
    self.addCleanup(tree.unlock)
 
130
    if not invalid_delta:
 
131
        tree._validate()
 
132
    return tree.root_inventory
 
133
 
 
134
 
 
135
def _create_repo_revisions(repo, basis, delta, invalid_delta):
 
136
    repo.start_write_group()
 
137
    try:
 
138
        rev = revision.Revision('basis', timestamp=0, timezone=None,
 
139
            message="", committer="foo@example.com")
 
140
        basis.revision_id = 'basis'
 
141
        create_texts_for_inv(repo, basis)
 
142
        repo.add_revision('basis', rev, basis)
 
143
        if invalid_delta:
 
144
            # We don't want to apply the delta to the basis, because we expect
 
145
            # the delta is invalid.
 
146
            result_inv = basis
 
147
            result_inv.revision_id = 'result'
 
148
            target_entries = None
 
149
        else:
 
150
            result_inv = basis.create_by_apply_delta(delta, 'result')
 
151
            create_texts_for_inv(repo, result_inv)
 
152
            target_entries = list(result_inv.iter_entries_by_dir())
 
153
        rev = revision.Revision('result', timestamp=0, timezone=None,
 
154
            message="", committer="foo@example.com")
 
155
        repo.add_revision('result', rev, result_inv)
 
156
        repo.commit_write_group()
 
157
    except:
 
158
        repo.abort_write_group()
 
159
        raise
 
160
    return target_entries
 
161
 
 
162
 
 
163
def _get_basis_entries(tree):
 
164
    basis_tree = tree.basis_tree()
 
165
    basis_tree.lock_read()
 
166
    basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
 
167
    basis_tree.unlock()
 
168
    return basis_tree_entries
 
169
 
 
170
 
 
171
def _populate_different_tree(tree, basis, delta):
 
172
    """Put all entries into tree, but at a unique location."""
 
173
    added_ids = set()
 
174
    added_paths = set()
 
175
    tree.add(['unique-dir'], ['unique-dir-id'], ['directory'])
 
176
    for path, ie in basis.iter_entries_by_dir():
 
177
        if ie.file_id in added_ids:
 
178
            continue
 
179
        # We want a unique path for each of these, we use the file-id
 
180
        tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
 
181
        added_ids.add(ie.file_id)
 
182
    for old_path, new_path, file_id, ie in delta:
 
183
        if file_id in added_ids:
 
184
            continue
 
185
        tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
 
186
 
 
187
 
 
188
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
 
189
    """Apply delta to basis and return the result.
 
190
 
 
191
    This sets the parent and then calls update_basis_by_delta.
 
192
    It also puts the basis in the repository under both 'basis' and 'result' to
 
193
    allow safety checks made by the WT to succeed, and finally ensures that all
 
194
    items in the delta with a new path are present in the WT before calling
 
195
    update_basis_by_delta.
 
196
 
 
197
    :param basis: An inventory to be used as the basis.
 
198
    :param delta: The inventory delta to apply:
 
199
    :return: An inventory resulting from the application.
 
200
    """
 
201
    control = test.make_bzrdir('tree', format=test.format._matchingbzrdir)
 
202
    control.create_repository()
 
203
    control.create_branch()
 
204
    tree = test.format.initialize(control)
 
205
    tree.lock_write()
 
206
    try:
 
207
        target_entries = _create_repo_revisions(tree.branch.repository, basis,
 
208
                                                delta, invalid_delta)
 
209
        # Set the basis state as the trees current state
 
210
        tree._write_inventory(basis)
 
211
        # This reads basis from the repo and puts it into the tree's local
 
212
        # cache, if it has one.
 
213
        tree.set_parent_ids(['basis'])
 
214
    finally:
 
215
        tree.unlock()
 
216
    # Fresh lock, reads disk again.
 
217
    tree.lock_write()
 
218
    try:
 
219
        tree.update_basis_by_delta('result', delta)
 
220
        if not invalid_delta:
 
221
            tree._validate()
 
222
    finally:
 
223
        tree.unlock()
 
224
    # reload tree - ensure we get what was written.
 
225
    tree = tree.bzrdir.open_workingtree()
 
226
    basis_tree = tree.basis_tree()
 
227
    basis_tree.lock_read()
 
228
    test.addCleanup(basis_tree.unlock)
 
229
    basis_inv = basis_tree.root_inventory
 
230
    if target_entries:
 
231
        basis_entries = list(basis_inv.iter_entries_by_dir())
 
232
        test.assertEqual(target_entries, basis_entries)
 
233
    return basis_inv
 
234
 
 
235
 
 
236
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
 
237
                                                      invalid_delta=True):
 
238
    """Apply delta to basis and return the result.
 
239
    
 
240
    This inserts basis as a whole inventory and then uses
 
241
    add_inventory_by_delta to add delta.
 
242
 
 
243
    :param basis: An inventory to be used as the basis.
 
244
    :param delta: The inventory delta to apply:
 
245
    :return: An inventory resulting from the application.
 
246
    """
 
247
    format = self.format()
 
248
    control = self.make_bzrdir('tree', format=format._matchingbzrdir)
 
249
    repo = format.initialize(control)
 
250
    repo.lock_write()
 
251
    try:
 
252
        repo.start_write_group()
 
253
        try:
 
254
            rev = revision.Revision('basis', timestamp=0, timezone=None,
 
255
                message="", committer="foo@example.com")
 
256
            basis.revision_id = 'basis'
 
257
            create_texts_for_inv(repo, basis)
 
258
            repo.add_revision('basis', rev, basis)
 
259
            repo.commit_write_group()
 
260
        except:
 
261
            repo.abort_write_group()
 
262
            raise
 
263
    finally:
 
264
        repo.unlock()
 
265
    repo.lock_write()
 
266
    try:
 
267
        repo.start_write_group()
 
268
        try:
 
269
            inv_sha1 = repo.add_inventory_by_delta('basis', delta,
 
270
                'result', ['basis'])
 
271
        except:
 
272
            repo.abort_write_group()
 
273
            raise
 
274
        else:
 
275
            repo.commit_write_group()
 
276
    finally:
 
277
        repo.unlock()
 
278
    # Fresh lock, reads disk again.
 
279
    repo = repo.bzrdir.open_repository()
 
280
    repo.lock_read()
 
281
    self.addCleanup(repo.unlock)
 
282
    return repo.get_inventory('result')
 
283
 
 
284
 
 
285
class TestInventoryUpdates(TestCase):
 
286
 
 
287
    def test_creation_from_root_id(self):
 
288
        # iff a root id is passed to the constructor, a root directory is made
 
289
        inv = inventory.Inventory(root_id='tree-root')
 
290
        self.assertNotEqual(None, inv.root)
 
291
        self.assertEqual('tree-root', inv.root.file_id)
 
292
 
 
293
    def test_add_path_of_root(self):
 
294
        # if no root id is given at creation time, there is no root directory
 
295
        inv = inventory.Inventory(root_id=None)
 
296
        self.assertIs(None, inv.root)
 
297
        # add a root entry by adding its path
 
298
        ie = inv.add_path("", "directory", "my-root")
 
299
        ie.revision = 'test-rev'
 
300
        self.assertEqual("my-root", ie.file_id)
 
301
        self.assertIs(ie, inv.root)
 
302
 
 
303
    def test_add_path(self):
 
304
        inv = inventory.Inventory(root_id='tree_root')
 
305
        ie = inv.add_path('hello', 'file', 'hello-id')
 
306
        self.assertEqual('hello-id', ie.file_id)
 
307
        self.assertEqual('file', ie.kind)
 
308
 
 
309
    def test_copy(self):
 
310
        """Make sure copy() works and creates a deep copy."""
 
311
        inv = inventory.Inventory(root_id='some-tree-root')
 
312
        ie = inv.add_path('hello', 'file', 'hello-id')
 
313
        inv2 = inv.copy()
 
314
        inv.root.file_id = 'some-new-root'
 
315
        ie.name = 'file2'
 
316
        self.assertEqual('some-tree-root', inv2.root.file_id)
 
317
        self.assertEqual('hello', inv2['hello-id'].name)
 
318
 
 
319
    def test_copy_empty(self):
 
320
        """Make sure an empty inventory can be copied."""
 
321
        inv = inventory.Inventory(root_id=None)
 
322
        inv2 = inv.copy()
 
323
        self.assertIs(None, inv2.root)
 
324
 
 
325
    def test_copy_copies_root_revision(self):
 
326
        """Make sure the revision of the root gets copied."""
 
327
        inv = inventory.Inventory(root_id='someroot')
 
328
        inv.root.revision = 'therev'
 
329
        inv2 = inv.copy()
 
330
        self.assertEqual('someroot', inv2.root.file_id)
 
331
        self.assertEqual('therev', inv2.root.revision)
 
332
 
 
333
    def test_create_tree_reference(self):
 
334
        inv = inventory.Inventory('tree-root-123')
 
335
        inv.add(TreeReference('nested-id', 'nested', parent_id='tree-root-123',
 
336
                              revision='rev', reference_revision='rev2'))
 
337
 
 
338
    def test_error_encoding(self):
 
339
        inv = inventory.Inventory('tree-root')
 
340
        inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
 
341
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
 
342
            InventoryFile('b-id', u'\u1234', 'tree-root'))
 
343
        self.assertContainsRe(str(e), r'\\u1234')
 
344
 
 
345
    def test_add_recursive(self):
 
346
        parent = InventoryDirectory('src-id', 'src', 'tree-root')
 
347
        child = InventoryFile('hello-id', 'hello.c', 'src-id')
 
348
        parent.children[child.file_id] = child
 
349
        inv = inventory.Inventory('tree-root')
 
350
        inv.add(parent)
 
351
        self.assertEqual('src/hello.c', inv.id2path('hello-id'))
 
352
 
 
353
 
 
354
 
 
355
class TestDeltaApplication(TestCaseWithTransport):
 
356
 
 
357
    scenarios = delta_application_scenarios()
 
358
 
 
359
    def get_empty_inventory(self, reference_inv=None):
 
360
        """Get an empty inventory.
 
361
 
 
362
        Note that tests should not depend on the revision of the root for
 
363
        setting up test conditions, as it has to be flexible to accomodate non
 
364
        rich root repositories.
 
365
 
 
366
        :param reference_inv: If not None, get the revision for the root from
 
367
            this inventory. This is useful for dealing with older repositories
 
368
            that routinely discarded the root entry data. If None, the root's
 
369
            revision is set to 'basis'.
 
370
        """
 
371
        inv = inventory.Inventory()
 
372
        if reference_inv is not None:
 
373
            inv.root.revision = reference_inv.root.revision
 
374
        else:
 
375
            inv.root.revision = 'basis'
 
376
        return inv
 
377
 
 
378
    def make_file_ie(self, file_id='file-id', name='name', parent_id=None):
 
379
        ie_file = inventory.InventoryFile(file_id, name, parent_id)
 
380
        ie_file.revision = 'result'
 
381
        ie_file.text_size = 0
 
382
        ie_file.text_sha1 = ''
 
383
        return ie_file
 
384
 
 
385
    def test_empty_delta(self):
 
386
        inv = self.get_empty_inventory()
 
387
        delta = []
 
388
        inv = self.apply_delta(self, inv, delta)
 
389
        inv2 = self.get_empty_inventory(inv)
 
390
        self.assertEqual([], inv2._make_delta(inv))
 
391
 
 
392
    def test_None_file_id(self):
 
393
        inv = self.get_empty_inventory()
 
394
        dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
 
395
        dir1.revision = 'result'
 
396
        delta = [(None, u'dir1', None, dir1)]
 
397
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
398
            inv, delta)
 
399
 
 
400
    def test_unicode_file_id(self):
 
401
        inv = self.get_empty_inventory()
 
402
        dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
 
403
        dir1.revision = 'result'
 
404
        delta = [(None, u'dir1', dir1.file_id, dir1)]
 
405
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
406
            inv, delta)
 
407
 
 
408
    def test_repeated_file_id(self):
 
409
        inv = self.get_empty_inventory()
 
410
        file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
 
411
        file1.revision = 'result'
 
412
        file1.text_size = 0
 
413
        file1.text_sha1 = ""
 
414
        file2 = file1.copy()
 
415
        file2.name = 'path2'
 
416
        delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
 
417
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
418
            inv, delta)
 
419
 
 
420
    def test_repeated_new_path(self):
 
421
        inv = self.get_empty_inventory()
 
422
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
423
        file1.revision = 'result'
 
424
        file1.text_size = 0
 
425
        file1.text_sha1 = ""
 
426
        file2 = file1.copy()
 
427
        file2.file_id = 'id2'
 
428
        delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
 
429
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
430
            inv, delta)
 
431
 
 
432
    def test_repeated_old_path(self):
 
433
        inv = self.get_empty_inventory()
 
434
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
435
        file1.revision = 'result'
 
436
        file1.text_size = 0
 
437
        file1.text_sha1 = ""
 
438
        # We can't *create* a source inventory with the same path, but
 
439
        # a badly generated partial delta might claim the same source twice.
 
440
        # This would be buggy in two ways: the path is repeated in the delta,
 
441
        # And the path for one of the file ids doesn't match the source
 
442
        # location. Alternatively, we could have a repeated fileid, but that
 
443
        # is separately checked for.
 
444
        file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
 
445
        file2.revision = 'result'
 
446
        file2.text_size = 0
 
447
        file2.text_sha1 = ""
 
448
        inv.add(file1)
 
449
        inv.add(file2)
 
450
        delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
 
451
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
452
            inv, delta)
 
453
 
 
454
    def test_mismatched_id_entry_id(self):
 
455
        inv = self.get_empty_inventory()
 
456
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
457
        file1.revision = 'result'
 
458
        file1.text_size = 0
 
459
        file1.text_sha1 = ""
 
460
        delta = [(None, u'path', 'id', file1)]
 
461
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
462
            inv, delta)
 
463
 
 
464
    def test_mismatched_new_path_entry_None(self):
 
465
        inv = self.get_empty_inventory()
 
466
        delta = [(None, u'path', 'id', None)]
 
467
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
468
            inv, delta)
 
469
 
 
470
    def test_mismatched_new_path_None_entry(self):
 
471
        inv = self.get_empty_inventory()
 
472
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
473
        file1.revision = 'result'
 
474
        file1.text_size = 0
 
475
        file1.text_sha1 = ""
 
476
        delta = [(u"path", None, 'id1', file1)]
 
477
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
478
            inv, delta)
 
479
 
 
480
    def test_parent_is_not_directory(self):
 
481
        inv = self.get_empty_inventory()
 
482
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
483
        file1.revision = 'result'
 
484
        file1.text_size = 0
 
485
        file1.text_sha1 = ""
 
486
        file2 = inventory.InventoryFile('id2', 'path2', 'id1')
 
487
        file2.revision = 'result'
 
488
        file2.text_size = 0
 
489
        file2.text_sha1 = ""
 
490
        inv.add(file1)
 
491
        delta = [(None, u'path/path2', 'id2', file2)]
 
492
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
493
            inv, delta)
 
494
 
 
495
    def test_parent_is_missing(self):
 
496
        inv = self.get_empty_inventory()
 
497
        file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
 
498
        file2.revision = 'result'
 
499
        file2.text_size = 0
 
500
        file2.text_sha1 = ""
 
501
        delta = [(None, u'path/path2', 'id2', file2)]
 
502
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
503
            inv, delta)
 
504
 
 
505
    def test_new_parent_path_has_wrong_id(self):
 
506
        inv = self.get_empty_inventory()
 
507
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
508
        parent1.revision = 'result'
 
509
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
510
        parent2.revision = 'result'
 
511
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
512
        file1.revision = 'result'
 
513
        file1.text_size = 0
 
514
        file1.text_sha1 = ""
 
515
        inv.add(parent1)
 
516
        inv.add(parent2)
 
517
        # This delta claims that file1 is at dir/path, but actually its at
 
518
        # dir2/path if you follow the inventory parent structure.
 
519
        delta = [(None, u'dir/path', 'id', file1)]
 
520
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
521
            inv, delta)
 
522
 
 
523
    def test_old_parent_path_is_wrong(self):
 
524
        inv = self.get_empty_inventory()
 
525
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
526
        parent1.revision = 'result'
 
527
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
528
        parent2.revision = 'result'
 
529
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
530
        file1.revision = 'result'
 
531
        file1.text_size = 0
 
532
        file1.text_sha1 = ""
 
533
        inv.add(parent1)
 
534
        inv.add(parent2)
 
535
        inv.add(file1)
 
536
        # This delta claims that file1 was at dir/path, but actually it was at
 
537
        # dir2/path if you follow the inventory parent structure.
 
538
        delta = [(u'dir/path', None, 'id', None)]
 
539
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
540
            inv, delta)
 
541
 
 
542
    def test_old_parent_path_is_for_other_id(self):
 
543
        inv = self.get_empty_inventory()
 
544
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
545
        parent1.revision = 'result'
 
546
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
547
        parent2.revision = 'result'
 
548
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
549
        file1.revision = 'result'
 
550
        file1.text_size = 0
 
551
        file1.text_sha1 = ""
 
552
        file2 = inventory.InventoryFile('id2', 'path', 'p-1')
 
553
        file2.revision = 'result'
 
554
        file2.text_size = 0
 
555
        file2.text_sha1 = ""
 
556
        inv.add(parent1)
 
557
        inv.add(parent2)
 
558
        inv.add(file1)
 
559
        inv.add(file2)
 
560
        # This delta claims that file1 was at dir/path, but actually it was at
 
561
        # dir2/path if you follow the inventory parent structure. At dir/path
 
562
        # is another entry we should not delete.
 
563
        delta = [(u'dir/path', None, 'id', None)]
 
564
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
565
            inv, delta)
 
566
 
 
567
    def test_add_existing_id_new_path(self):
 
568
        inv = self.get_empty_inventory()
 
569
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
570
        parent1.revision = 'result'
 
571
        parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
 
572
        parent2.revision = 'result'
 
573
        inv.add(parent1)
 
574
        delta = [(None, u'dir2', 'p-1', parent2)]
 
575
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
576
            inv, delta)
 
577
 
 
578
    def test_add_new_id_existing_path(self):
 
579
        inv = self.get_empty_inventory()
 
580
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
581
        parent1.revision = 'result'
 
582
        parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
 
583
        parent2.revision = 'result'
 
584
        inv.add(parent1)
 
585
        delta = [(None, u'dir1', 'p-2', parent2)]
 
586
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
587
            inv, delta)
 
588
 
 
589
    def test_remove_dir_leaving_dangling_child(self):
 
590
        inv = self.get_empty_inventory()
 
591
        dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
592
        dir1.revision = 'result'
 
593
        dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
 
594
        dir2.revision = 'result'
 
595
        dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
 
596
        dir3.revision = 'result'
 
597
        inv.add(dir1)
 
598
        inv.add(dir2)
 
599
        inv.add(dir3)
 
600
        delta = [(u'dir1', None, 'p-1', None),
 
601
            (u'dir1/child2', None, 'p-3', None)]
 
602
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
603
            inv, delta)
 
604
 
 
605
    def test_add_file(self):
 
606
        inv = self.get_empty_inventory()
 
607
        file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
 
608
        file1.revision = 'result'
 
609
        file1.text_size = 0
 
610
        file1.text_sha1 = ''
 
611
        delta = [(None, u'path', 'file-id', file1)]
 
612
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
613
        self.assertEqual('file-id', res_inv['file-id'].file_id)
 
614
 
 
615
    def test_remove_file(self):
 
616
        inv = self.get_empty_inventory()
 
617
        file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
 
618
        file1.revision = 'result'
 
619
        file1.text_size = 0
 
620
        file1.text_sha1 = ''
 
621
        inv.add(file1)
 
622
        delta = [(u'path', None, 'file-id', None)]
 
623
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
624
        self.assertEqual(None, res_inv.path2id('path'))
 
625
        self.assertRaises(errors.NoSuchId, res_inv.id2path, 'file-id')
 
626
 
 
627
    def test_rename_file(self):
 
628
        inv = self.get_empty_inventory()
 
629
        file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
 
630
        inv.add(file1)
 
631
        file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
 
632
        delta = [(u'path', 'path2', 'file-id', file2)]
 
633
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
634
        self.assertEqual(None, res_inv.path2id('path'))
 
635
        self.assertEqual('file-id', res_inv.path2id('path2'))
 
636
 
 
637
    def test_replaced_at_new_path(self):
 
638
        inv = self.get_empty_inventory()
 
639
        file1 = self.make_file_ie(file_id='id1', parent_id=inv.root.file_id)
 
640
        inv.add(file1)
 
641
        file2 = self.make_file_ie(file_id='id2', parent_id=inv.root.file_id)
 
642
        delta = [(u'name', None, 'id1', None),
 
643
                 (None, u'name', 'id2', file2)]
 
644
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
645
        self.assertEqual('id2', res_inv.path2id('name'))
 
646
 
 
647
    def test_rename_dir(self):
 
648
        inv = self.get_empty_inventory()
 
649
        dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
 
650
        dir1.revision = 'basis'
 
651
        file1 = self.make_file_ie(parent_id='dir-id')
 
652
        inv.add(dir1)
 
653
        inv.add(file1)
 
654
        dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
 
655
        dir2.revision = 'result'
 
656
        delta = [('dir1', 'dir2', 'dir-id', dir2)]
 
657
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
658
        # The file should be accessible under the new path
 
659
        self.assertEqual('file-id', res_inv.path2id('dir2/name'))
 
660
 
 
661
    def test_renamed_dir_with_renamed_child(self):
 
662
        inv = self.get_empty_inventory()
 
663
        dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
 
664
        dir1.revision = 'basis'
 
665
        file1 = self.make_file_ie('file-id-1', 'name1', parent_id='dir-id')
 
666
        file2 = self.make_file_ie('file-id-2', 'name2', parent_id='dir-id')
 
667
        inv.add(dir1)
 
668
        inv.add(file1)
 
669
        inv.add(file2)
 
670
        dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
 
671
        dir2.revision = 'result'
 
672
        file2b = self.make_file_ie('file-id-2', 'name2', inv.root.file_id)
 
673
        delta = [('dir1', 'dir2', 'dir-id', dir2),
 
674
                 ('dir1/name2', 'name2', 'file-id-2', file2b)]
 
675
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
676
        # The file should be accessible under the new path
 
677
        self.assertEqual('file-id-1', res_inv.path2id('dir2/name1'))
 
678
        self.assertEqual(None, res_inv.path2id('dir2/name2'))
 
679
        self.assertEqual('file-id-2', res_inv.path2id('name2'))
 
680
 
 
681
    def test_is_root(self):
 
682
        """Ensure our root-checking code is accurate."""
 
683
        inv = inventory.Inventory('TREE_ROOT')
 
684
        self.assertTrue(inv.is_root('TREE_ROOT'))
 
685
        self.assertFalse(inv.is_root('booga'))
 
686
        inv.root.file_id = 'booga'
 
687
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
688
        self.assertTrue(inv.is_root('booga'))
 
689
        # works properly even if no root is set
 
690
        inv.root = None
 
691
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
692
        self.assertFalse(inv.is_root('booga'))
 
693
 
 
694
    def test_entries_for_empty_inventory(self):
 
695
        """Test that entries() will not fail for an empty inventory"""
 
696
        inv = Inventory(root_id=None)
 
697
        self.assertEqual([], inv.entries())
 
698
 
 
699
 
 
700
class TestInventoryEntry(TestCase):
 
701
 
 
702
    def test_file_kind_character(self):
 
703
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
704
        self.assertEqual(file.kind_character(), '')
 
705
 
 
706
    def test_dir_kind_character(self):
 
707
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
708
        self.assertEqual(dir.kind_character(), '/')
 
709
 
 
710
    def test_link_kind_character(self):
 
711
        dir = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
712
        self.assertEqual(dir.kind_character(), '')
 
713
 
 
714
    def test_dir_detect_changes(self):
 
715
        left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
716
        right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
717
        self.assertEqual((False, False), left.detect_changes(right))
 
718
        self.assertEqual((False, False), right.detect_changes(left))
 
719
 
 
720
    def test_file_detect_changes(self):
 
721
        left = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
722
        left.text_sha1 = 123
 
723
        right = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
724
        right.text_sha1 = 123
 
725
        self.assertEqual((False, False), left.detect_changes(right))
 
726
        self.assertEqual((False, False), right.detect_changes(left))
 
727
        left.executable = True
 
728
        self.assertEqual((False, True), left.detect_changes(right))
 
729
        self.assertEqual((False, True), right.detect_changes(left))
 
730
        right.text_sha1 = 321
 
731
        self.assertEqual((True, True), left.detect_changes(right))
 
732
        self.assertEqual((True, True), right.detect_changes(left))
 
733
 
 
734
    def test_symlink_detect_changes(self):
 
735
        left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
736
        left.symlink_target='foo'
 
737
        right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
738
        right.symlink_target='foo'
 
739
        self.assertEqual((False, False), left.detect_changes(right))
 
740
        self.assertEqual((False, False), right.detect_changes(left))
 
741
        left.symlink_target = 'different'
 
742
        self.assertEqual((True, False), left.detect_changes(right))
 
743
        self.assertEqual((True, False), right.detect_changes(left))
 
744
 
 
745
    def test_file_has_text(self):
 
746
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
747
        self.assertTrue(file.has_text())
 
748
 
 
749
    def test_directory_has_text(self):
 
750
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
751
        self.assertFalse(dir.has_text())
 
752
 
 
753
    def test_link_has_text(self):
 
754
        link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
755
        self.assertFalse(link.has_text())
 
756
 
 
757
    def test_make_entry(self):
 
758
        self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
 
759
            inventory.InventoryFile)
 
760
        self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
 
761
            inventory.InventoryLink)
 
762
        self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
 
763
            inventory.InventoryDirectory)
 
764
 
 
765
    def test_make_entry_non_normalized(self):
 
766
        orig_normalized_filename = osutils.normalized_filename
 
767
 
 
768
        try:
 
769
            osutils.normalized_filename = osutils._accessible_normalized_filename
 
770
            entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
 
771
            self.assertEqual(u'\xe5', entry.name)
 
772
            self.assertIsInstance(entry, inventory.InventoryFile)
 
773
 
 
774
            osutils.normalized_filename = osutils._inaccessible_normalized_filename
 
775
            self.assertRaises(errors.InvalidNormalization,
 
776
                    inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
 
777
        finally:
 
778
            osutils.normalized_filename = orig_normalized_filename
 
779
 
 
780
 
 
781
class TestDescribeChanges(TestCase):
 
782
 
 
783
    def test_describe_change(self):
 
784
        # we need to test the following change combinations:
 
785
        # rename
 
786
        # reparent
 
787
        # modify
 
788
        # gone
 
789
        # added
 
790
        # renamed/reparented and modified
 
791
        # change kind (perhaps can't be done yet?)
 
792
        # also, merged in combination with all of these?
 
793
        old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
 
794
        old_a.text_sha1 = '123132'
 
795
        old_a.text_size = 0
 
796
        new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
 
797
        new_a.text_sha1 = '123132'
 
798
        new_a.text_size = 0
 
799
 
 
800
        self.assertChangeDescription('unchanged', old_a, new_a)
 
801
 
 
802
        new_a.text_size = 10
 
803
        new_a.text_sha1 = 'abcabc'
 
804
        self.assertChangeDescription('modified', old_a, new_a)
 
805
 
 
806
        self.assertChangeDescription('added', None, new_a)
 
807
        self.assertChangeDescription('removed', old_a, None)
 
808
        # perhaps a bit questionable but seems like the most reasonable thing...
 
809
        self.assertChangeDescription('unchanged', None, None)
 
810
 
 
811
        # in this case it's both renamed and modified; show a rename and
 
812
        # modification:
 
813
        new_a.name = 'newfilename'
 
814
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
815
 
 
816
        # reparenting is 'renaming'
 
817
        new_a.name = old_a.name
 
818
        new_a.parent_id = 'somedir-id'
 
819
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
820
 
 
821
        # reset the content values so its not modified
 
822
        new_a.text_size = old_a.text_size
 
823
        new_a.text_sha1 = old_a.text_sha1
 
824
        new_a.name = old_a.name
 
825
 
 
826
        new_a.name = 'newfilename'
 
827
        self.assertChangeDescription('renamed', old_a, new_a)
 
828
 
 
829
        # reparenting is 'renaming'
 
830
        new_a.name = old_a.name
 
831
        new_a.parent_id = 'somedir-id'
 
832
        self.assertChangeDescription('renamed', old_a, new_a)
 
833
 
 
834
    def assertChangeDescription(self, expected_change, old_ie, new_ie):
 
835
        change = InventoryEntry.describe_change(old_ie, new_ie)
 
836
        self.assertEqual(expected_change, change)
 
837
 
 
838
 
 
839
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
 
840
 
 
841
    def get_chk_bytes(self):
 
842
        factory = groupcompress.make_pack_factory(True, True, 1)
 
843
        trans = self.get_transport('')
 
844
        return factory(trans)
 
845
 
 
846
    def read_bytes(self, chk_bytes, key):
 
847
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
 
848
        return stream.next().get_bytes_as("fulltext")
 
849
 
 
850
    def test_deserialise_gives_CHKInventory(self):
 
851
        inv = Inventory()
 
852
        inv.revision_id = "revid"
 
853
        inv.root.revision = "rootrev"
 
854
        chk_bytes = self.get_chk_bytes()
 
855
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
856
        bytes = ''.join(chk_inv.to_lines())
 
857
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
858
        self.assertEqual("revid", new_inv.revision_id)
 
859
        self.assertEqual("directory", new_inv.root.kind)
 
860
        self.assertEqual(inv.root.file_id, new_inv.root.file_id)
 
861
        self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
 
862
        self.assertEqual(inv.root.name, new_inv.root.name)
 
863
        self.assertEqual("rootrev", new_inv.root.revision)
 
864
        self.assertEqual('plain', new_inv._search_key_name)
 
865
 
 
866
    def test_deserialise_wrong_revid(self):
 
867
        inv = Inventory()
 
868
        inv.revision_id = "revid"
 
869
        inv.root.revision = "rootrev"
 
870
        chk_bytes = self.get_chk_bytes()
 
871
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
872
        bytes = ''.join(chk_inv.to_lines())
 
873
        self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
 
874
            bytes, ("revid2",))
 
875
 
 
876
    def test_captures_rev_root_byid(self):
 
877
        inv = Inventory()
 
878
        inv.revision_id = "foo"
 
879
        inv.root.revision = "bar"
 
880
        chk_bytes = self.get_chk_bytes()
 
881
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
882
        lines = chk_inv.to_lines()
 
883
        self.assertEqual([
 
884
            'chkinventory:\n',
 
885
            'revision_id: foo\n',
 
886
            'root_id: TREE_ROOT\n',
 
887
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
888
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
889
            ], lines)
 
890
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
891
        self.assertEqual('plain', chk_inv._search_key_name)
 
892
 
 
893
    def test_captures_parent_id_basename_index(self):
 
894
        inv = Inventory()
 
895
        inv.revision_id = "foo"
 
896
        inv.root.revision = "bar"
 
897
        chk_bytes = self.get_chk_bytes()
 
898
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
899
        lines = chk_inv.to_lines()
 
900
        self.assertEqual([
 
901
            'chkinventory:\n',
 
902
            'revision_id: foo\n',
 
903
            'root_id: TREE_ROOT\n',
 
904
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
905
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
906
            ], lines)
 
907
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
908
        self.assertEqual('plain', chk_inv._search_key_name)
 
909
 
 
910
    def test_captures_search_key_name(self):
 
911
        inv = Inventory()
 
912
        inv.revision_id = "foo"
 
913
        inv.root.revision = "bar"
 
914
        chk_bytes = self.get_chk_bytes()
 
915
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
916
                                              search_key_name='hash-16-way')
 
917
        lines = chk_inv.to_lines()
 
918
        self.assertEqual([
 
919
            'chkinventory:\n',
 
920
            'search_key_name: hash-16-way\n',
 
921
            'root_id: TREE_ROOT\n',
 
922
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
923
            'revision_id: foo\n',
 
924
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
925
            ], lines)
 
926
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
927
        self.assertEqual('hash-16-way', chk_inv._search_key_name)
 
928
 
 
929
    def test_directory_children_on_demand(self):
 
930
        inv = Inventory()
 
931
        inv.revision_id = "revid"
 
932
        inv.root.revision = "rootrev"
 
933
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
934
        inv["fileid"].revision = "filerev"
 
935
        inv["fileid"].executable = True
 
936
        inv["fileid"].text_sha1 = "ffff"
 
937
        inv["fileid"].text_size = 1
 
938
        chk_bytes = self.get_chk_bytes()
 
939
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
940
        bytes = ''.join(chk_inv.to_lines())
 
941
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
942
        root_entry = new_inv[inv.root.file_id]
 
943
        self.assertEqual(None, root_entry._children)
 
944
        self.assertEqual({'file'}, set(root_entry.children))
 
945
        file_direct = new_inv["fileid"]
 
946
        file_found = root_entry.children['file']
 
947
        self.assertEqual(file_direct.kind, file_found.kind)
 
948
        self.assertEqual(file_direct.file_id, file_found.file_id)
 
949
        self.assertEqual(file_direct.parent_id, file_found.parent_id)
 
950
        self.assertEqual(file_direct.name, file_found.name)
 
951
        self.assertEqual(file_direct.revision, file_found.revision)
 
952
        self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
 
953
        self.assertEqual(file_direct.text_size, file_found.text_size)
 
954
        self.assertEqual(file_direct.executable, file_found.executable)
 
955
 
 
956
    def test_from_inventory_maximum_size(self):
 
957
        # from_inventory supports the maximum_size parameter.
 
958
        inv = Inventory()
 
959
        inv.revision_id = "revid"
 
960
        inv.root.revision = "rootrev"
 
961
        chk_bytes = self.get_chk_bytes()
 
962
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
 
963
        chk_inv.id_to_entry._ensure_root()
 
964
        self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
 
965
        self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
 
966
        p_id_basename = chk_inv.parent_id_basename_to_file_id
 
967
        p_id_basename._ensure_root()
 
968
        self.assertEqual(120, p_id_basename._root_node.maximum_size)
 
969
        self.assertEqual(2, p_id_basename._root_node._key_width)
 
970
 
 
971
    def test___iter__(self):
 
972
        inv = Inventory()
 
973
        inv.revision_id = "revid"
 
974
        inv.root.revision = "rootrev"
 
975
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
976
        inv["fileid"].revision = "filerev"
 
977
        inv["fileid"].executable = True
 
978
        inv["fileid"].text_sha1 = "ffff"
 
979
        inv["fileid"].text_size = 1
 
980
        chk_bytes = self.get_chk_bytes()
 
981
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
982
        bytes = ''.join(chk_inv.to_lines())
 
983
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
984
        fileids = sorted(new_inv.__iter__())
 
985
        self.assertEqual([inv.root.file_id, "fileid"], fileids)
 
986
 
 
987
    def test__len__(self):
 
988
        inv = Inventory()
 
989
        inv.revision_id = "revid"
 
990
        inv.root.revision = "rootrev"
 
991
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
992
        inv["fileid"].revision = "filerev"
 
993
        inv["fileid"].executable = True
 
994
        inv["fileid"].text_sha1 = "ffff"
 
995
        inv["fileid"].text_size = 1
 
996
        chk_bytes = self.get_chk_bytes()
 
997
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
998
        self.assertEqual(2, len(chk_inv))
 
999
 
 
1000
    def test___getitem__(self):
 
1001
        inv = Inventory()
 
1002
        inv.revision_id = "revid"
 
1003
        inv.root.revision = "rootrev"
 
1004
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1005
        inv["fileid"].revision = "filerev"
 
1006
        inv["fileid"].executable = True
 
1007
        inv["fileid"].text_sha1 = "ffff"
 
1008
        inv["fileid"].text_size = 1
 
1009
        chk_bytes = self.get_chk_bytes()
 
1010
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1011
        bytes = ''.join(chk_inv.to_lines())
 
1012
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1013
        root_entry = new_inv[inv.root.file_id]
 
1014
        file_entry = new_inv["fileid"]
 
1015
        self.assertEqual("directory", root_entry.kind)
 
1016
        self.assertEqual(inv.root.file_id, root_entry.file_id)
 
1017
        self.assertEqual(inv.root.parent_id, root_entry.parent_id)
 
1018
        self.assertEqual(inv.root.name, root_entry.name)
 
1019
        self.assertEqual("rootrev", root_entry.revision)
 
1020
        self.assertEqual("file", file_entry.kind)
 
1021
        self.assertEqual("fileid", file_entry.file_id)
 
1022
        self.assertEqual(inv.root.file_id, file_entry.parent_id)
 
1023
        self.assertEqual("file", file_entry.name)
 
1024
        self.assertEqual("filerev", file_entry.revision)
 
1025
        self.assertEqual("ffff", file_entry.text_sha1)
 
1026
        self.assertEqual(1, file_entry.text_size)
 
1027
        self.assertEqual(True, file_entry.executable)
 
1028
        self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
 
1029
 
 
1030
    def test_has_id_true(self):
 
1031
        inv = Inventory()
 
1032
        inv.revision_id = "revid"
 
1033
        inv.root.revision = "rootrev"
 
1034
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1035
        inv["fileid"].revision = "filerev"
 
1036
        inv["fileid"].executable = True
 
1037
        inv["fileid"].text_sha1 = "ffff"
 
1038
        inv["fileid"].text_size = 1
 
1039
        chk_bytes = self.get_chk_bytes()
 
1040
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1041
        self.assertTrue(chk_inv.has_id('fileid'))
 
1042
        self.assertTrue(chk_inv.has_id(inv.root.file_id))
 
1043
 
 
1044
    def test_has_id_not(self):
 
1045
        inv = Inventory()
 
1046
        inv.revision_id = "revid"
 
1047
        inv.root.revision = "rootrev"
 
1048
        chk_bytes = self.get_chk_bytes()
 
1049
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1050
        self.assertFalse(chk_inv.has_id('fileid'))
 
1051
 
 
1052
    def test_id2path(self):
 
1053
        inv = Inventory()
 
1054
        inv.revision_id = "revid"
 
1055
        inv.root.revision = "rootrev"
 
1056
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
 
1057
        fileentry = InventoryFile("fileid", "file", "dirid")
 
1058
        inv.add(direntry)
 
1059
        inv.add(fileentry)
 
1060
        inv["fileid"].revision = "filerev"
 
1061
        inv["fileid"].executable = True
 
1062
        inv["fileid"].text_sha1 = "ffff"
 
1063
        inv["fileid"].text_size = 1
 
1064
        inv["dirid"].revision = "filerev"
 
1065
        chk_bytes = self.get_chk_bytes()
 
1066
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1067
        bytes = ''.join(chk_inv.to_lines())
 
1068
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1069
        self.assertEqual('', new_inv.id2path(inv.root.file_id))
 
1070
        self.assertEqual('dir', new_inv.id2path('dirid'))
 
1071
        self.assertEqual('dir/file', new_inv.id2path('fileid'))
 
1072
 
 
1073
    def test_path2id(self):
 
1074
        inv = Inventory()
 
1075
        inv.revision_id = "revid"
 
1076
        inv.root.revision = "rootrev"
 
1077
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
 
1078
        fileentry = InventoryFile("fileid", "file", "dirid")
 
1079
        inv.add(direntry)
 
1080
        inv.add(fileentry)
 
1081
        inv["fileid"].revision = "filerev"
 
1082
        inv["fileid"].executable = True
 
1083
        inv["fileid"].text_sha1 = "ffff"
 
1084
        inv["fileid"].text_size = 1
 
1085
        inv["dirid"].revision = "filerev"
 
1086
        chk_bytes = self.get_chk_bytes()
 
1087
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1088
        bytes = ''.join(chk_inv.to_lines())
 
1089
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1090
        self.assertEqual(inv.root.file_id, new_inv.path2id(''))
 
1091
        self.assertEqual('dirid', new_inv.path2id('dir'))
 
1092
        self.assertEqual('fileid', new_inv.path2id('dir/file'))
 
1093
 
 
1094
    def test_create_by_apply_delta_sets_root(self):
 
1095
        inv = Inventory()
 
1096
        inv.revision_id = "revid"
 
1097
        chk_bytes = self.get_chk_bytes()
 
1098
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1099
        inv.add_path("", "directory", "myrootid", None)
 
1100
        inv.revision_id = "expectedid"
 
1101
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1102
        delta = [("", None, base_inv.root.file_id, None),
 
1103
            (None, "",  "myrootid", inv.root)]
 
1104
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1105
        self.assertEqual(reference_inv.root, new_inv.root)
 
1106
 
 
1107
    def test_create_by_apply_delta_empty_add_child(self):
 
1108
        inv = Inventory()
 
1109
        inv.revision_id = "revid"
 
1110
        inv.root.revision = "rootrev"
 
1111
        chk_bytes = self.get_chk_bytes()
 
1112
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1113
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
 
1114
        a_entry.revision = "filerev"
 
1115
        a_entry.executable = True
 
1116
        a_entry.text_sha1 = "ffff"
 
1117
        a_entry.text_size = 1
 
1118
        inv.add(a_entry)
 
1119
        inv.revision_id = "expectedid"
 
1120
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1121
        delta = [(None, "A",  "A-id", a_entry)]
 
1122
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1123
        # new_inv should be the same as reference_inv.
 
1124
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1125
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1126
        reference_inv.id_to_entry._ensure_root()
 
1127
        new_inv.id_to_entry._ensure_root()
 
1128
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1129
            new_inv.id_to_entry._root_node._key)
 
1130
 
 
1131
    def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
 
1132
        inv = Inventory()
 
1133
        inv.revision_id = "revid"
 
1134
        inv.root.revision = "rootrev"
 
1135
        chk_bytes = self.get_chk_bytes()
 
1136
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1137
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
 
1138
        a_entry.revision = "filerev"
 
1139
        a_entry.executable = True
 
1140
        a_entry.text_sha1 = "ffff"
 
1141
        a_entry.text_size = 1
 
1142
        inv.add(a_entry)
 
1143
        inv.revision_id = "expectedid"
 
1144
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1145
        delta = [(None, "A",  "A-id", a_entry)]
 
1146
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1147
        reference_inv.id_to_entry._ensure_root()
 
1148
        reference_inv.parent_id_basename_to_file_id._ensure_root()
 
1149
        new_inv.id_to_entry._ensure_root()
 
1150
        new_inv.parent_id_basename_to_file_id._ensure_root()
 
1151
        # new_inv should be the same as reference_inv.
 
1152
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1153
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1154
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1155
            new_inv.id_to_entry._root_node._key)
 
1156
        self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
 
1157
            new_inv.parent_id_basename_to_file_id._root_node._key)
 
1158
 
 
1159
    def test_iter_changes(self):
 
1160
        # Low level bootstrapping smoke test; comprehensive generic tests via
 
1161
        # InterTree are coming.
 
1162
        inv = Inventory()
 
1163
        inv.revision_id = "revid"
 
1164
        inv.root.revision = "rootrev"
 
1165
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1166
        inv["fileid"].revision = "filerev"
 
1167
        inv["fileid"].executable = True
 
1168
        inv["fileid"].text_sha1 = "ffff"
 
1169
        inv["fileid"].text_size = 1
 
1170
        inv2 = Inventory()
 
1171
        inv2.revision_id = "revid2"
 
1172
        inv2.root.revision = "rootrev"
 
1173
        inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1174
        inv2["fileid"].revision = "filerev2"
 
1175
        inv2["fileid"].executable = False
 
1176
        inv2["fileid"].text_sha1 = "bbbb"
 
1177
        inv2["fileid"].text_size = 2
 
1178
        # get fresh objects.
 
1179
        chk_bytes = self.get_chk_bytes()
 
1180
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1181
        bytes = ''.join(chk_inv.to_lines())
 
1182
        inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1183
        chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
 
1184
        bytes = ''.join(chk_inv2.to_lines())
 
1185
        inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
 
1186
        self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
 
1187
            ('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
 
1188
            (False, True))],
 
1189
            list(inv_1.iter_changes(inv_2)))
 
1190
 
 
1191
    def test_parent_id_basename_to_file_id_index_enabled(self):
 
1192
        inv = Inventory()
 
1193
        inv.revision_id = "revid"
 
1194
        inv.root.revision = "rootrev"
 
1195
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1196
        inv["fileid"].revision = "filerev"
 
1197
        inv["fileid"].executable = True
 
1198
        inv["fileid"].text_sha1 = "ffff"
 
1199
        inv["fileid"].text_size = 1
 
1200
        # get fresh objects.
 
1201
        chk_bytes = self.get_chk_bytes()
 
1202
        tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1203
        bytes = ''.join(tmp_inv.to_lines())
 
1204
        chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1205
        self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
 
1206
        self.assertEqual(
 
1207
            {('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
 
1208
            dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
 
1209
 
 
1210
    def test_file_entry_to_bytes(self):
 
1211
        inv = CHKInventory(None)
 
1212
        ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
 
1213
        ie.executable = True
 
1214
        ie.revision = 'file-rev-id'
 
1215
        ie.text_sha1 = 'abcdefgh'
 
1216
        ie.text_size = 100
 
1217
        bytes = inv._entry_to_bytes(ie)
 
1218
        self.assertEqual('file: file-id\nparent-id\nfilename\n'
 
1219
                         'file-rev-id\nabcdefgh\n100\nY', bytes)
 
1220
        ie2 = inv._bytes_to_entry(bytes)
 
1221
        self.assertEqual(ie, ie2)
 
1222
        self.assertIsInstance(ie2.name, unicode)
 
1223
        self.assertEqual(('filename', 'file-id', 'file-rev-id'),
 
1224
                         inv._bytes_to_utf8name_key(bytes))
 
1225
 
 
1226
    def test_file2_entry_to_bytes(self):
 
1227
        inv = CHKInventory(None)
 
1228
        # \u30a9 == 'omega'
 
1229
        ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
 
1230
        ie.executable = False
 
1231
        ie.revision = 'file-rev-id'
 
1232
        ie.text_sha1 = '123456'
 
1233
        ie.text_size = 25
 
1234
        bytes = inv._entry_to_bytes(ie)
 
1235
        self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
 
1236
                         'file-rev-id\n123456\n25\nN', bytes)
 
1237
        ie2 = inv._bytes_to_entry(bytes)
 
1238
        self.assertEqual(ie, ie2)
 
1239
        self.assertIsInstance(ie2.name, unicode)
 
1240
        self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
 
1241
                         inv._bytes_to_utf8name_key(bytes))
 
1242
 
 
1243
    def test_dir_entry_to_bytes(self):
 
1244
        inv = CHKInventory(None)
 
1245
        ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
 
1246
        ie.revision = 'dir-rev-id'
 
1247
        bytes = inv._entry_to_bytes(ie)
 
1248
        self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
 
1249
        ie2 = inv._bytes_to_entry(bytes)
 
1250
        self.assertEqual(ie, ie2)
 
1251
        self.assertIsInstance(ie2.name, unicode)
 
1252
        self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
 
1253
                         inv._bytes_to_utf8name_key(bytes))
 
1254
 
 
1255
    def test_dir2_entry_to_bytes(self):
 
1256
        inv = CHKInventory(None)
 
1257
        ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
 
1258
                                          None)
 
1259
        ie.revision = 'dir-rev-id'
 
1260
        bytes = inv._entry_to_bytes(ie)
 
1261
        self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
 
1262
                         'dir-rev-id', bytes)
 
1263
        ie2 = inv._bytes_to_entry(bytes)
 
1264
        self.assertEqual(ie, ie2)
 
1265
        self.assertIsInstance(ie2.name, unicode)
 
1266
        self.assertIs(ie2.parent_id, None)
 
1267
        self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
 
1268
                         inv._bytes_to_utf8name_key(bytes))
 
1269
 
 
1270
    def test_symlink_entry_to_bytes(self):
 
1271
        inv = CHKInventory(None)
 
1272
        ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
 
1273
        ie.revision = 'link-rev-id'
 
1274
        ie.symlink_target = u'target/path'
 
1275
        bytes = inv._entry_to_bytes(ie)
 
1276
        self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
 
1277
                         'link-rev-id\ntarget/path', bytes)
 
1278
        ie2 = inv._bytes_to_entry(bytes)
 
1279
        self.assertEqual(ie, ie2)
 
1280
        self.assertIsInstance(ie2.name, unicode)
 
1281
        self.assertIsInstance(ie2.symlink_target, unicode)
 
1282
        self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
 
1283
                         inv._bytes_to_utf8name_key(bytes))
 
1284
 
 
1285
    def test_symlink2_entry_to_bytes(self):
 
1286
        inv = CHKInventory(None)
 
1287
        ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
 
1288
        ie.revision = 'link-rev-id'
 
1289
        ie.symlink_target = u'target/\u03a9path'
 
1290
        bytes = inv._entry_to_bytes(ie)
 
1291
        self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
 
1292
                         'link-rev-id\ntarget/\xce\xa9path', bytes)
 
1293
        ie2 = inv._bytes_to_entry(bytes)
 
1294
        self.assertEqual(ie, ie2)
 
1295
        self.assertIsInstance(ie2.name, unicode)
 
1296
        self.assertIsInstance(ie2.symlink_target, unicode)
 
1297
        self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
 
1298
                         inv._bytes_to_utf8name_key(bytes))
 
1299
 
 
1300
    def test_tree_reference_entry_to_bytes(self):
 
1301
        inv = CHKInventory(None)
 
1302
        ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
 
1303
                                     'parent-id')
 
1304
        ie.revision = 'tree-rev-id'
 
1305
        ie.reference_revision = 'ref-rev-id'
 
1306
        bytes = inv._entry_to_bytes(ie)
 
1307
        self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
 
1308
                         'tree-rev-id\nref-rev-id', bytes)
 
1309
        ie2 = inv._bytes_to_entry(bytes)
 
1310
        self.assertEqual(ie, ie2)
 
1311
        self.assertIsInstance(ie2.name, unicode)
 
1312
        self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
 
1313
                         inv._bytes_to_utf8name_key(bytes))
 
1314
 
 
1315
    def make_basic_utf8_inventory(self):
 
1316
        inv = Inventory()
 
1317
        inv.revision_id = "revid"
 
1318
        inv.root.revision = "rootrev"
 
1319
        root_id = inv.root.file_id
 
1320
        inv.add(InventoryFile("fileid", u'f\xefle', root_id))
 
1321
        inv["fileid"].revision = "filerev"
 
1322
        inv["fileid"].text_sha1 = "ffff"
 
1323
        inv["fileid"].text_size = 0
 
1324
        inv.add(InventoryDirectory("dirid", u'dir-\N{EURO SIGN}', root_id))
 
1325
        inv.add(InventoryFile("childid", u'ch\xefld', "dirid"))
 
1326
        inv["childid"].revision = "filerev"
 
1327
        inv["childid"].text_sha1 = "ffff"
 
1328
        inv["childid"].text_size = 0
 
1329
        chk_bytes = self.get_chk_bytes()
 
1330
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1331
        bytes = ''.join(chk_inv.to_lines())
 
1332
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1333
 
 
1334
    def test__preload_handles_utf8(self):
 
1335
        new_inv = self.make_basic_utf8_inventory()
 
1336
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
 
1337
        self.assertFalse(new_inv._fully_cached)
 
1338
        new_inv._preload_cache()
 
1339
        self.assertEqual(
 
1340
            sorted([new_inv.root_id, "fileid", "dirid", "childid"]),
 
1341
            sorted(new_inv._fileid_to_entry_cache.keys()))
 
1342
        ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
 
1343
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1344
                         sorted(ie_root._children.keys()))
 
1345
        ie_dir = new_inv._fileid_to_entry_cache['dirid']
 
1346
        self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
 
1347
 
 
1348
    def test__preload_populates_cache(self):
 
1349
        inv = Inventory()
 
1350
        inv.revision_id = "revid"
 
1351
        inv.root.revision = "rootrev"
 
1352
        root_id = inv.root.file_id
 
1353
        inv.add(InventoryFile("fileid", "file", root_id))
 
1354
        inv["fileid"].revision = "filerev"
 
1355
        inv["fileid"].executable = True
 
1356
        inv["fileid"].text_sha1 = "ffff"
 
1357
        inv["fileid"].text_size = 1
 
1358
        inv.add(InventoryDirectory("dirid", "dir", root_id))
 
1359
        inv.add(InventoryFile("childid", "child", "dirid"))
 
1360
        inv["childid"].revision = "filerev"
 
1361
        inv["childid"].executable = False
 
1362
        inv["childid"].text_sha1 = "dddd"
 
1363
        inv["childid"].text_size = 1
 
1364
        chk_bytes = self.get_chk_bytes()
 
1365
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1366
        bytes = ''.join(chk_inv.to_lines())
 
1367
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1368
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
 
1369
        self.assertFalse(new_inv._fully_cached)
 
1370
        new_inv._preload_cache()
 
1371
        self.assertEqual(
 
1372
            sorted([root_id, "fileid", "dirid", "childid"]),
 
1373
            sorted(new_inv._fileid_to_entry_cache.keys()))
 
1374
        self.assertTrue(new_inv._fully_cached)
 
1375
        ie_root = new_inv._fileid_to_entry_cache[root_id]
 
1376
        self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
 
1377
        ie_dir = new_inv._fileid_to_entry_cache['dirid']
 
1378
        self.assertEqual(['child'], sorted(ie_dir._children.keys()))
 
1379
 
 
1380
    def test__preload_handles_partially_evaluated_inventory(self):
 
1381
        new_inv = self.make_basic_utf8_inventory()
 
1382
        ie = new_inv[new_inv.root_id]
 
1383
        self.assertIs(None, ie._children)
 
1384
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1385
                         sorted(ie.children.keys()))
 
1386
        # Accessing .children loads _children
 
1387
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1388
                         sorted(ie._children.keys()))
 
1389
        new_inv._preload_cache()
 
1390
        # No change
 
1391
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1392
                         sorted(ie._children.keys()))
 
1393
        ie_dir = new_inv["dirid"]
 
1394
        self.assertEqual([u'ch\xefld'],
 
1395
                         sorted(ie_dir._children.keys()))
 
1396
 
 
1397
    def test_filter_change_in_renamed_subfolder(self):
 
1398
        inv = Inventory('tree-root')
 
1399
        src_ie = inv.add_path('src', 'directory', 'src-id')
 
1400
        inv.add_path('src/sub/', 'directory', 'sub-id')
 
1401
        a_ie = inv.add_path('src/sub/a', 'file', 'a-id')
 
1402
        a_ie.text_sha1 = osutils.sha_string('content\n')
 
1403
        a_ie.text_size = len('content\n')
 
1404
        chk_bytes = self.get_chk_bytes()
 
1405
        inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1406
        inv = inv.create_by_apply_delta([
 
1407
            ("src/sub/a", "src/sub/a", "a-id", a_ie),
 
1408
            ("src", "src2", "src-id", src_ie),
 
1409
            ], 'new-rev-2')
 
1410
        new_inv = inv.filter(['a-id', 'src-id'])
 
1411
        self.assertEqual([
 
1412
            ('', 'tree-root'),
 
1413
            ('src', 'src-id'),
 
1414
            ('src/sub', 'sub-id'),
 
1415
            ('src/sub/a', 'a-id'),
 
1416
            ], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
 
1417
 
 
1418
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
 
1419
 
 
1420
    def get_chk_bytes(self):
 
1421
        factory = groupcompress.make_pack_factory(True, True, 1)
 
1422
        trans = self.get_transport('')
 
1423
        return factory(trans)
 
1424
 
 
1425
    def make_dir(self, inv, name, parent_id):
 
1426
        inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
 
1427
 
 
1428
    def make_file(self, inv, name, parent_id, content='content\n'):
 
1429
        ie = inv.make_entry('file', name, parent_id, name + '-id')
 
1430
        ie.text_sha1 = osutils.sha_string(content)
 
1431
        ie.text_size = len(content)
 
1432
        inv.add(ie)
 
1433
 
 
1434
    def make_simple_inventory(self):
 
1435
        inv = Inventory('TREE_ROOT')
 
1436
        inv.revision_id = "revid"
 
1437
        inv.root.revision = "rootrev"
 
1438
        # /                 TREE_ROOT
 
1439
        # dir1/             dir1-id
 
1440
        #   sub-file1       sub-file1-id
 
1441
        #   sub-file2       sub-file2-id
 
1442
        #   sub-dir1/       sub-dir1-id
 
1443
        #     subsub-file1  subsub-file1-id
 
1444
        # dir2/             dir2-id
 
1445
        #   sub2-file1      sub2-file1-id
 
1446
        # top               top-id
 
1447
        self.make_dir(inv, 'dir1', 'TREE_ROOT')
 
1448
        self.make_dir(inv, 'dir2', 'TREE_ROOT')
 
1449
        self.make_dir(inv, 'sub-dir1', 'dir1-id')
 
1450
        self.make_file(inv, 'top', 'TREE_ROOT')
 
1451
        self.make_file(inv, 'sub-file1', 'dir1-id')
 
1452
        self.make_file(inv, 'sub-file2', 'dir1-id')
 
1453
        self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
 
1454
        self.make_file(inv, 'sub2-file1', 'dir2-id')
 
1455
        chk_bytes = self.get_chk_bytes()
 
1456
        #  use a small maximum_size to force internal paging structures
 
1457
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
1458
                        maximum_size=100,
 
1459
                        search_key_name='hash-255-way')
 
1460
        bytes = ''.join(chk_inv.to_lines())
 
1461
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1462
 
 
1463
    def assert_Getitems(self, expected_fileids, inv, file_ids):
 
1464
        self.assertEqual(sorted(expected_fileids),
 
1465
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
 
1466
 
 
1467
    def assertExpand(self, all_ids, inv, file_ids):
 
1468
        (val_all_ids,
 
1469
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
 
1470
        self.assertEqual(set(all_ids), val_all_ids)
 
1471
        entries = inv._getitems(val_all_ids)
 
1472
        expected_children = {}
 
1473
        for entry in entries:
 
1474
            s = expected_children.setdefault(entry.parent_id, [])
 
1475
            s.append(entry.file_id)
 
1476
        val_children = dict((k, sorted(v)) for k, v
 
1477
                            in val_children.items())
 
1478
        expected_children = dict((k, sorted(v)) for k, v
 
1479
                            in expected_children.items())
 
1480
        self.assertEqual(expected_children, val_children)
 
1481
 
 
1482
    def test_make_simple_inventory(self):
 
1483
        inv = self.make_simple_inventory()
 
1484
        layout = []
 
1485
        for path, entry in inv.iter_entries_by_dir():
 
1486
            layout.append((path, entry.file_id))
 
1487
        self.assertEqual([
 
1488
            ('', 'TREE_ROOT'),
 
1489
            ('dir1', 'dir1-id'),
 
1490
            ('dir2', 'dir2-id'),
 
1491
            ('top', 'top-id'),
 
1492
            ('dir1/sub-dir1', 'sub-dir1-id'),
 
1493
            ('dir1/sub-file1', 'sub-file1-id'),
 
1494
            ('dir1/sub-file2', 'sub-file2-id'),
 
1495
            ('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
 
1496
            ('dir2/sub2-file1', 'sub2-file1-id'),
 
1497
            ], layout)
 
1498
 
 
1499
    def test__getitems(self):
 
1500
        inv = self.make_simple_inventory()
 
1501
        # Reading from disk
 
1502
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1503
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1504
        self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
 
1505
        # From cache
 
1506
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1507
        # Mixed
 
1508
        self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
 
1509
                             ['dir1-id', 'sub-file2-id'])
 
1510
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1511
        self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
 
1512
 
 
1513
    def test_single_file(self):
 
1514
        inv = self.make_simple_inventory()
 
1515
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1516
 
 
1517
    def test_get_all_parents(self):
 
1518
        inv = self.make_simple_inventory()
 
1519
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1520
                           'subsub-file1-id',
 
1521
                          ], inv, ['subsub-file1-id'])
 
1522
 
 
1523
    def test_get_children(self):
 
1524
        inv = self.make_simple_inventory()
 
1525
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1526
                           'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
 
1527
                          ], inv, ['dir1-id'])
 
1528
 
 
1529
    def test_from_root(self):
 
1530
        inv = self.make_simple_inventory()
 
1531
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
 
1532
                           'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
 
1533
                           'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
 
1534
 
 
1535
    def test_top_level_file(self):
 
1536
        inv = self.make_simple_inventory()
 
1537
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1538
 
 
1539
    def test_subsub_file(self):
 
1540
        inv = self.make_simple_inventory()
 
1541
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1542
                           'subsub-file1-id'], inv, ['subsub-file1-id'])
 
1543
 
 
1544
    def test_sub_and_root(self):
 
1545
        inv = self.make_simple_inventory()
 
1546
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
 
1547
                           'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])
 
1548
 
 
1549
 
 
1550
class TestMutableInventoryFromTree(TestCaseWithTransport):
 
1551
 
 
1552
    def test_empty(self):
 
1553
        repository = self.make_repository('.')
 
1554
        tree = repository.revision_tree(revision.NULL_REVISION)
 
1555
        inv = mutable_inventory_from_tree(tree)
 
1556
        self.assertEqual(revision.NULL_REVISION, inv.revision_id)
 
1557
        self.assertEqual(0, len(inv))
 
1558
 
 
1559
    def test_some_files(self):
 
1560
        wt = self.make_branch_and_tree('.')
 
1561
        self.build_tree(['a'])
 
1562
        wt.add(['a'], ['thefileid'])
 
1563
        revid = wt.commit("commit")
 
1564
        tree = wt.branch.repository.revision_tree(revid)
 
1565
        inv = mutable_inventory_from_tree(tree)
 
1566
        self.assertEqual(revid, inv.revision_id)
 
1567
        self.assertEqual(2, len(inv))
 
1568
        self.assertEqual("a", inv['thefileid'].name)
 
1569
        # The inventory should be mutable and independent of
 
1570
        # the original tree
 
1571
        self.assertFalse(tree.root_inventory['thefileid'].executable)
 
1572
        inv['thefileid'].executable = True
 
1573
        self.assertFalse(tree.root_inventory['thefileid'].executable)