/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_inv.py

  • Committer: Jelmer Vernooij
  • Date: 2017-07-30 18:38:48 UTC
  • mfrom: (6740.1.1 breezy-conf-1)
  • Revision ID: jelmer@jelmer.uk-20170730183848-195b9ch7sclkxmqs
Merge lp:~jelmer/brz/breezy-conf

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2012, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from .. import (
 
19
    errors,
 
20
    osutils,
 
21
    repository,
 
22
    revision,
 
23
    tests,
 
24
    workingtree,
 
25
    )
 
26
from ..bzr import (
 
27
    chk_map,
 
28
    groupcompress,
 
29
    inventory,
 
30
    )
 
31
from ..bzr.inventory import (
 
32
    CHKInventory,
 
33
    Inventory,
 
34
    ROOT_ID,
 
35
    InventoryFile,
 
36
    InventoryDirectory,
 
37
    InventoryEntry,
 
38
    TreeReference,
 
39
    mutable_inventory_from_tree,
 
40
    )
 
41
from . import (
 
42
    TestCase,
 
43
    TestCaseWithTransport,
 
44
    )
 
45
from .scenarios import load_tests_apply_scenarios
 
46
 
 
47
 
 
48
load_tests = load_tests_apply_scenarios
 
49
 
 
50
 
 
51
def delta_application_scenarios():
 
52
    scenarios = [
 
53
        ('Inventory', {'apply_delta':apply_inventory_Inventory}),
 
54
        ]
 
55
    # Working tree basis delta application
 
56
    # Repository add_inv_by_delta.
 
57
    # Reduce form of the per_repository test logic - that logic needs to be
 
58
    # be able to get /just/ repositories whereas these tests are fine with
 
59
    # just creating trees.
 
60
    formats = set()
 
61
    for _, format in repository.format_registry.iteritems():
 
62
        if format.supports_full_versioned_files:
 
63
            scenarios.append((str(format.__name__), {
 
64
                'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
 
65
                'format':format}))
 
66
    for format in workingtree.format_registry._get_all():
 
67
        repo_fmt = format._matchingbzrdir.repository_format
 
68
        if not repo_fmt.supports_full_versioned_files:
 
69
            continue
 
70
        scenarios.append(
 
71
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
 
72
            'apply_delta':apply_inventory_WT_basis,
 
73
            'format':format}))
 
74
        scenarios.append(
 
75
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
 
76
            'apply_delta':apply_inventory_WT,
 
77
            'format':format}))
 
78
    return scenarios
 
79
 
 
80
 
 
81
def create_texts_for_inv(repo, inv):
 
82
    for path, ie in inv.iter_entries():
 
83
        if ie.text_size:
 
84
            lines = ['a' * ie.text_size]
 
85
        else:
 
86
            lines = []
 
87
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
 
88
 
 
89
 
 
90
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
 
91
    """Apply delta to basis and return the result.
 
92
 
 
93
    :param basis: An inventory to be used as the basis.
 
94
    :param delta: The inventory delta to apply:
 
95
    :return: An inventory resulting from the application.
 
96
    """
 
97
    basis.apply_delta(delta)
 
98
    return basis
 
99
 
 
100
 
 
101
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
 
102
    """Apply delta to basis and return the result.
 
103
 
 
104
    This sets the tree state to be basis, and then calls apply_inventory_delta.
 
105
 
 
106
    :param basis: An inventory to be used as the basis.
 
107
    :param delta: The inventory delta to apply:
 
108
    :return: An inventory resulting from the application.
 
109
    """
 
110
    control = self.make_controldir('tree', format=self.format._matchingbzrdir)
 
111
    control.create_repository()
 
112
    control.create_branch()
 
113
    tree = self.format.initialize(control)
 
114
    tree.lock_write()
 
115
    try:
 
116
        tree._write_inventory(basis)
 
117
    finally:
 
118
        tree.unlock()
 
119
    # Fresh object, reads disk again.
 
120
    tree = tree.controldir.open_workingtree()
 
121
    tree.lock_write()
 
122
    try:
 
123
        tree.apply_inventory_delta(delta)
 
124
    finally:
 
125
        tree.unlock()
 
126
    # reload tree - ensure we get what was written.
 
127
    tree = tree.controldir.open_workingtree()
 
128
    tree.lock_read()
 
129
    self.addCleanup(tree.unlock)
 
130
    if not invalid_delta:
 
131
        tree._validate()
 
132
    return tree.root_inventory
 
133
 
 
134
 
 
135
def _create_repo_revisions(repo, basis, delta, invalid_delta):
 
136
    repo.start_write_group()
 
137
    try:
 
138
        rev = revision.Revision('basis', timestamp=0, timezone=None,
 
139
            message="", committer="foo@example.com")
 
140
        basis.revision_id = 'basis'
 
141
        create_texts_for_inv(repo, basis)
 
142
        repo.add_revision('basis', rev, basis)
 
143
        if invalid_delta:
 
144
            # We don't want to apply the delta to the basis, because we expect
 
145
            # the delta is invalid.
 
146
            result_inv = basis
 
147
            result_inv.revision_id = 'result'
 
148
            target_entries = None
 
149
        else:
 
150
            result_inv = basis.create_by_apply_delta(delta, 'result')
 
151
            create_texts_for_inv(repo, result_inv)
 
152
            target_entries = list(result_inv.iter_entries_by_dir())
 
153
        rev = revision.Revision('result', timestamp=0, timezone=None,
 
154
            message="", committer="foo@example.com")
 
155
        repo.add_revision('result', rev, result_inv)
 
156
        repo.commit_write_group()
 
157
    except:
 
158
        repo.abort_write_group()
 
159
        raise
 
160
    return target_entries
 
161
 
 
162
 
 
163
def _get_basis_entries(tree):
 
164
    basis_tree = tree.basis_tree()
 
165
    basis_tree.lock_read()
 
166
    basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
 
167
    basis_tree.unlock()
 
168
    return basis_tree_entries
 
169
 
 
170
 
 
171
def _populate_different_tree(tree, basis, delta):
 
172
    """Put all entries into tree, but at a unique location."""
 
173
    added_ids = set()
 
174
    added_paths = set()
 
175
    tree.add(['unique-dir'], ['unique-dir-id'], ['directory'])
 
176
    for path, ie in basis.iter_entries_by_dir():
 
177
        if ie.file_id in added_ids:
 
178
            continue
 
179
        # We want a unique path for each of these, we use the file-id
 
180
        tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
 
181
        added_ids.add(ie.file_id)
 
182
    for old_path, new_path, file_id, ie in delta:
 
183
        if file_id in added_ids:
 
184
            continue
 
185
        tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
 
186
 
 
187
 
 
188
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
 
189
    """Apply delta to basis and return the result.
 
190
 
 
191
    This sets the parent and then calls update_basis_by_delta.
 
192
    It also puts the basis in the repository under both 'basis' and 'result' to
 
193
    allow safety checks made by the WT to succeed, and finally ensures that all
 
194
    items in the delta with a new path are present in the WT before calling
 
195
    update_basis_by_delta.
 
196
 
 
197
    :param basis: An inventory to be used as the basis.
 
198
    :param delta: The inventory delta to apply:
 
199
    :return: An inventory resulting from the application.
 
200
    """
 
201
    control = test.make_controldir('tree', format=test.format._matchingbzrdir)
 
202
    control.create_repository()
 
203
    control.create_branch()
 
204
    tree = test.format.initialize(control)
 
205
    tree.lock_write()
 
206
    try:
 
207
        target_entries = _create_repo_revisions(tree.branch.repository, basis,
 
208
                                                delta, invalid_delta)
 
209
        # Set the basis state as the trees current state
 
210
        tree._write_inventory(basis)
 
211
        # This reads basis from the repo and puts it into the tree's local
 
212
        # cache, if it has one.
 
213
        tree.set_parent_ids(['basis'])
 
214
    finally:
 
215
        tree.unlock()
 
216
    # Fresh lock, reads disk again.
 
217
    tree.lock_write()
 
218
    try:
 
219
        tree.update_basis_by_delta('result', delta)
 
220
        if not invalid_delta:
 
221
            tree._validate()
 
222
    finally:
 
223
        tree.unlock()
 
224
    # reload tree - ensure we get what was written.
 
225
    tree = tree.controldir.open_workingtree()
 
226
    basis_tree = tree.basis_tree()
 
227
    basis_tree.lock_read()
 
228
    test.addCleanup(basis_tree.unlock)
 
229
    basis_inv = basis_tree.root_inventory
 
230
    if target_entries:
 
231
        basis_entries = list(basis_inv.iter_entries_by_dir())
 
232
        test.assertEqual(target_entries, basis_entries)
 
233
    return basis_inv
 
234
 
 
235
 
 
236
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
 
237
                                                      invalid_delta=True):
 
238
    """Apply delta to basis and return the result.
 
239
    
 
240
    This inserts basis as a whole inventory and then uses
 
241
    add_inventory_by_delta to add delta.
 
242
 
 
243
    :param basis: An inventory to be used as the basis.
 
244
    :param delta: The inventory delta to apply:
 
245
    :return: An inventory resulting from the application.
 
246
    """
 
247
    format = self.format()
 
248
    control = self.make_controldir('tree', format=format._matchingbzrdir)
 
249
    repo = format.initialize(control)
 
250
    repo.lock_write()
 
251
    try:
 
252
        repo.start_write_group()
 
253
        try:
 
254
            rev = revision.Revision('basis', timestamp=0, timezone=None,
 
255
                message="", committer="foo@example.com")
 
256
            basis.revision_id = 'basis'
 
257
            create_texts_for_inv(repo, basis)
 
258
            repo.add_revision('basis', rev, basis)
 
259
            repo.commit_write_group()
 
260
        except:
 
261
            repo.abort_write_group()
 
262
            raise
 
263
    finally:
 
264
        repo.unlock()
 
265
    repo.lock_write()
 
266
    try:
 
267
        repo.start_write_group()
 
268
        try:
 
269
            inv_sha1 = repo.add_inventory_by_delta('basis', delta,
 
270
                'result', ['basis'])
 
271
        except:
 
272
            repo.abort_write_group()
 
273
            raise
 
274
        else:
 
275
            repo.commit_write_group()
 
276
    finally:
 
277
        repo.unlock()
 
278
    # Fresh lock, reads disk again.
 
279
    repo = repo.controldir.open_repository()
 
280
    repo.lock_read()
 
281
    self.addCleanup(repo.unlock)
 
282
    return repo.get_inventory('result')
 
283
 
 
284
 
 
285
class TestInventoryUpdates(TestCase):
 
286
 
 
287
    def test_creation_from_root_id(self):
 
288
        # iff a root id is passed to the constructor, a root directory is made
 
289
        inv = inventory.Inventory(root_id=b'tree-root')
 
290
        self.assertNotEqual(None, inv.root)
 
291
        self.assertEqual(b'tree-root', inv.root.file_id)
 
292
 
 
293
    def test_add_path_of_root(self):
 
294
        # if no root id is given at creation time, there is no root directory
 
295
        inv = inventory.Inventory(root_id=None)
 
296
        self.assertIs(None, inv.root)
 
297
        # add a root entry by adding its path
 
298
        ie = inv.add_path(u"", "directory", b"my-root")
 
299
        ie.revision = b'test-rev'
 
300
        self.assertEqual(b"my-root", ie.file_id)
 
301
        self.assertIs(ie, inv.root)
 
302
 
 
303
    def test_add_path(self):
 
304
        inv = inventory.Inventory(root_id=b'tree_root')
 
305
        ie = inv.add_path(u'hello', 'file', b'hello-id')
 
306
        self.assertEqual(b'hello-id', ie.file_id)
 
307
        self.assertEqual('file', ie.kind)
 
308
 
 
309
    def test_copy(self):
 
310
        """Make sure copy() works and creates a deep copy."""
 
311
        inv = inventory.Inventory(root_id=b'some-tree-root')
 
312
        ie = inv.add_path(u'hello', 'file', b'hello-id')
 
313
        inv2 = inv.copy()
 
314
        inv.root.file_id = b'some-new-root'
 
315
        ie.name = u'file2'
 
316
        self.assertEqual(b'some-tree-root', inv2.root.file_id)
 
317
        self.assertEqual(u'hello', inv2[b'hello-id'].name)
 
318
 
 
319
    def test_copy_empty(self):
 
320
        """Make sure an empty inventory can be copied."""
 
321
        inv = inventory.Inventory(root_id=None)
 
322
        inv2 = inv.copy()
 
323
        self.assertIs(None, inv2.root)
 
324
 
 
325
    def test_copy_copies_root_revision(self):
 
326
        """Make sure the revision of the root gets copied."""
 
327
        inv = inventory.Inventory(root_id=b'someroot')
 
328
        inv.root.revision = b'therev'
 
329
        inv2 = inv.copy()
 
330
        self.assertEqual(b'someroot', inv2.root.file_id)
 
331
        self.assertEqual(b'therev', inv2.root.revision)
 
332
 
 
333
    def test_create_tree_reference(self):
 
334
        inv = inventory.Inventory(b'tree-root-123')
 
335
        inv.add(TreeReference(
 
336
            b'nested-id', 'nested', parent_id=b'tree-root-123',
 
337
            revision=b'rev', reference_revision=b'rev2'))
 
338
 
 
339
    def test_error_encoding(self):
 
340
        inv = inventory.Inventory('tree-root')
 
341
        inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
 
342
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
 
343
            InventoryFile('b-id', u'\u1234', 'tree-root'))
 
344
        self.assertContainsRe(str(e), r'\\u1234')
 
345
 
 
346
    def test_add_recursive(self):
 
347
        parent = InventoryDirectory('src-id', 'src', 'tree-root')
 
348
        child = InventoryFile('hello-id', 'hello.c', 'src-id')
 
349
        parent.children[child.file_id] = child
 
350
        inv = inventory.Inventory('tree-root')
 
351
        inv.add(parent)
 
352
        self.assertEqual('src/hello.c', inv.id2path('hello-id'))
 
353
 
 
354
 
 
355
 
 
356
class TestDeltaApplication(TestCaseWithTransport):
 
357
 
 
358
    scenarios = delta_application_scenarios()
 
359
 
 
360
    def get_empty_inventory(self, reference_inv=None):
 
361
        """Get an empty inventory.
 
362
 
 
363
        Note that tests should not depend on the revision of the root for
 
364
        setting up test conditions, as it has to be flexible to accomodate non
 
365
        rich root repositories.
 
366
 
 
367
        :param reference_inv: If not None, get the revision for the root from
 
368
            this inventory. This is useful for dealing with older repositories
 
369
            that routinely discarded the root entry data. If None, the root's
 
370
            revision is set to 'basis'.
 
371
        """
 
372
        inv = inventory.Inventory()
 
373
        if reference_inv is not None:
 
374
            inv.root.revision = reference_inv.root.revision
 
375
        else:
 
376
            inv.root.revision = 'basis'
 
377
        return inv
 
378
 
 
379
    def make_file_ie(self, file_id='file-id', name='name', parent_id=None):
 
380
        ie_file = inventory.InventoryFile(file_id, name, parent_id)
 
381
        ie_file.revision = 'result'
 
382
        ie_file.text_size = 0
 
383
        ie_file.text_sha1 = ''
 
384
        return ie_file
 
385
 
 
386
    def test_empty_delta(self):
 
387
        inv = self.get_empty_inventory()
 
388
        delta = []
 
389
        inv = self.apply_delta(self, inv, delta)
 
390
        inv2 = self.get_empty_inventory(inv)
 
391
        self.assertEqual([], inv2._make_delta(inv))
 
392
 
 
393
    def test_None_file_id(self):
 
394
        inv = self.get_empty_inventory()
 
395
        dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
 
396
        dir1.revision = 'result'
 
397
        delta = [(None, u'dir1', None, dir1)]
 
398
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
399
            inv, delta)
 
400
 
 
401
    def test_unicode_file_id(self):
 
402
        inv = self.get_empty_inventory()
 
403
        dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
 
404
        dir1.revision = 'result'
 
405
        delta = [(None, u'dir1', dir1.file_id, dir1)]
 
406
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
407
            inv, delta)
 
408
 
 
409
    def test_repeated_file_id(self):
 
410
        inv = self.get_empty_inventory()
 
411
        file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
 
412
        file1.revision = 'result'
 
413
        file1.text_size = 0
 
414
        file1.text_sha1 = ""
 
415
        file2 = file1.copy()
 
416
        file2.name = 'path2'
 
417
        delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
 
418
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
419
            inv, delta)
 
420
 
 
421
    def test_repeated_new_path(self):
 
422
        inv = self.get_empty_inventory()
 
423
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
424
        file1.revision = 'result'
 
425
        file1.text_size = 0
 
426
        file1.text_sha1 = ""
 
427
        file2 = file1.copy()
 
428
        file2.file_id = 'id2'
 
429
        delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
 
430
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
431
            inv, delta)
 
432
 
 
433
    def test_repeated_old_path(self):
 
434
        inv = self.get_empty_inventory()
 
435
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
436
        file1.revision = 'result'
 
437
        file1.text_size = 0
 
438
        file1.text_sha1 = ""
 
439
        # We can't *create* a source inventory with the same path, but
 
440
        # a badly generated partial delta might claim the same source twice.
 
441
        # This would be buggy in two ways: the path is repeated in the delta,
 
442
        # And the path for one of the file ids doesn't match the source
 
443
        # location. Alternatively, we could have a repeated fileid, but that
 
444
        # is separately checked for.
 
445
        file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
 
446
        file2.revision = 'result'
 
447
        file2.text_size = 0
 
448
        file2.text_sha1 = ""
 
449
        inv.add(file1)
 
450
        inv.add(file2)
 
451
        delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
 
452
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
453
            inv, delta)
 
454
 
 
455
    def test_mismatched_id_entry_id(self):
 
456
        inv = self.get_empty_inventory()
 
457
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
458
        file1.revision = 'result'
 
459
        file1.text_size = 0
 
460
        file1.text_sha1 = ""
 
461
        delta = [(None, u'path', 'id', file1)]
 
462
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
463
            inv, delta)
 
464
 
 
465
    def test_mismatched_new_path_entry_None(self):
 
466
        inv = self.get_empty_inventory()
 
467
        delta = [(None, u'path', 'id', None)]
 
468
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
469
            inv, delta)
 
470
 
 
471
    def test_mismatched_new_path_None_entry(self):
 
472
        inv = self.get_empty_inventory()
 
473
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
474
        file1.revision = 'result'
 
475
        file1.text_size = 0
 
476
        file1.text_sha1 = ""
 
477
        delta = [(u"path", None, 'id1', file1)]
 
478
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
479
            inv, delta)
 
480
 
 
481
    def test_parent_is_not_directory(self):
 
482
        inv = self.get_empty_inventory()
 
483
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
484
        file1.revision = 'result'
 
485
        file1.text_size = 0
 
486
        file1.text_sha1 = ""
 
487
        file2 = inventory.InventoryFile('id2', 'path2', 'id1')
 
488
        file2.revision = 'result'
 
489
        file2.text_size = 0
 
490
        file2.text_sha1 = ""
 
491
        inv.add(file1)
 
492
        delta = [(None, u'path/path2', 'id2', file2)]
 
493
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
494
            inv, delta)
 
495
 
 
496
    def test_parent_is_missing(self):
 
497
        inv = self.get_empty_inventory()
 
498
        file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
 
499
        file2.revision = 'result'
 
500
        file2.text_size = 0
 
501
        file2.text_sha1 = ""
 
502
        delta = [(None, u'path/path2', 'id2', file2)]
 
503
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
504
            inv, delta)
 
505
 
 
506
    def test_new_parent_path_has_wrong_id(self):
 
507
        inv = self.get_empty_inventory()
 
508
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
509
        parent1.revision = 'result'
 
510
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
511
        parent2.revision = 'result'
 
512
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
513
        file1.revision = 'result'
 
514
        file1.text_size = 0
 
515
        file1.text_sha1 = ""
 
516
        inv.add(parent1)
 
517
        inv.add(parent2)
 
518
        # This delta claims that file1 is at dir/path, but actually its at
 
519
        # dir2/path if you follow the inventory parent structure.
 
520
        delta = [(None, u'dir/path', 'id', file1)]
 
521
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
522
            inv, delta)
 
523
 
 
524
    def test_old_parent_path_is_wrong(self):
 
525
        inv = self.get_empty_inventory()
 
526
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
527
        parent1.revision = 'result'
 
528
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
529
        parent2.revision = 'result'
 
530
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
531
        file1.revision = 'result'
 
532
        file1.text_size = 0
 
533
        file1.text_sha1 = ""
 
534
        inv.add(parent1)
 
535
        inv.add(parent2)
 
536
        inv.add(file1)
 
537
        # This delta claims that file1 was at dir/path, but actually it was at
 
538
        # dir2/path if you follow the inventory parent structure.
 
539
        delta = [(u'dir/path', None, 'id', None)]
 
540
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
541
            inv, delta)
 
542
 
 
543
    def test_old_parent_path_is_for_other_id(self):
 
544
        inv = self.get_empty_inventory()
 
545
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
546
        parent1.revision = 'result'
 
547
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
548
        parent2.revision = 'result'
 
549
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
550
        file1.revision = 'result'
 
551
        file1.text_size = 0
 
552
        file1.text_sha1 = ""
 
553
        file2 = inventory.InventoryFile('id2', 'path', 'p-1')
 
554
        file2.revision = 'result'
 
555
        file2.text_size = 0
 
556
        file2.text_sha1 = ""
 
557
        inv.add(parent1)
 
558
        inv.add(parent2)
 
559
        inv.add(file1)
 
560
        inv.add(file2)
 
561
        # This delta claims that file1 was at dir/path, but actually it was at
 
562
        # dir2/path if you follow the inventory parent structure. At dir/path
 
563
        # is another entry we should not delete.
 
564
        delta = [(u'dir/path', None, 'id', None)]
 
565
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
566
            inv, delta)
 
567
 
 
568
    def test_add_existing_id_new_path(self):
 
569
        inv = self.get_empty_inventory()
 
570
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
571
        parent1.revision = 'result'
 
572
        parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
 
573
        parent2.revision = 'result'
 
574
        inv.add(parent1)
 
575
        delta = [(None, u'dir2', 'p-1', parent2)]
 
576
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
577
            inv, delta)
 
578
 
 
579
    def test_add_new_id_existing_path(self):
 
580
        inv = self.get_empty_inventory()
 
581
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
582
        parent1.revision = 'result'
 
583
        parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
 
584
        parent2.revision = 'result'
 
585
        inv.add(parent1)
 
586
        delta = [(None, u'dir1', 'p-2', parent2)]
 
587
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
588
            inv, delta)
 
589
 
 
590
    def test_remove_dir_leaving_dangling_child(self):
 
591
        inv = self.get_empty_inventory()
 
592
        dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
593
        dir1.revision = 'result'
 
594
        dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
 
595
        dir2.revision = 'result'
 
596
        dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
 
597
        dir3.revision = 'result'
 
598
        inv.add(dir1)
 
599
        inv.add(dir2)
 
600
        inv.add(dir3)
 
601
        delta = [(u'dir1', None, 'p-1', None),
 
602
            (u'dir1/child2', None, 'p-3', None)]
 
603
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
604
            inv, delta)
 
605
 
 
606
    def test_add_file(self):
 
607
        inv = self.get_empty_inventory()
 
608
        file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
 
609
        file1.revision = 'result'
 
610
        file1.text_size = 0
 
611
        file1.text_sha1 = ''
 
612
        delta = [(None, u'path', 'file-id', file1)]
 
613
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
614
        self.assertEqual('file-id', res_inv['file-id'].file_id)
 
615
 
 
616
    def test_remove_file(self):
 
617
        inv = self.get_empty_inventory()
 
618
        file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
 
619
        file1.revision = 'result'
 
620
        file1.text_size = 0
 
621
        file1.text_sha1 = ''
 
622
        inv.add(file1)
 
623
        delta = [(u'path', None, 'file-id', None)]
 
624
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
625
        self.assertEqual(None, res_inv.path2id('path'))
 
626
        self.assertRaises(errors.NoSuchId, res_inv.id2path, 'file-id')
 
627
 
 
628
    def test_rename_file(self):
 
629
        inv = self.get_empty_inventory()
 
630
        file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
 
631
        inv.add(file1)
 
632
        file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
 
633
        delta = [(u'path', 'path2', 'file-id', file2)]
 
634
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
635
        self.assertEqual(None, res_inv.path2id('path'))
 
636
        self.assertEqual('file-id', res_inv.path2id('path2'))
 
637
 
 
638
    def test_replaced_at_new_path(self):
 
639
        inv = self.get_empty_inventory()
 
640
        file1 = self.make_file_ie(file_id='id1', parent_id=inv.root.file_id)
 
641
        inv.add(file1)
 
642
        file2 = self.make_file_ie(file_id='id2', parent_id=inv.root.file_id)
 
643
        delta = [(u'name', None, 'id1', None),
 
644
                 (None, u'name', 'id2', file2)]
 
645
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
646
        self.assertEqual('id2', res_inv.path2id('name'))
 
647
 
 
648
    def test_rename_dir(self):
 
649
        inv = self.get_empty_inventory()
 
650
        dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
 
651
        dir1.revision = 'basis'
 
652
        file1 = self.make_file_ie(parent_id='dir-id')
 
653
        inv.add(dir1)
 
654
        inv.add(file1)
 
655
        dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
 
656
        dir2.revision = 'result'
 
657
        delta = [('dir1', 'dir2', 'dir-id', dir2)]
 
658
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
659
        # The file should be accessible under the new path
 
660
        self.assertEqual('file-id', res_inv.path2id('dir2/name'))
 
661
 
 
662
    def test_renamed_dir_with_renamed_child(self):
 
663
        inv = self.get_empty_inventory()
 
664
        dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
 
665
        dir1.revision = 'basis'
 
666
        file1 = self.make_file_ie('file-id-1', 'name1', parent_id='dir-id')
 
667
        file2 = self.make_file_ie('file-id-2', 'name2', parent_id='dir-id')
 
668
        inv.add(dir1)
 
669
        inv.add(file1)
 
670
        inv.add(file2)
 
671
        dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
 
672
        dir2.revision = 'result'
 
673
        file2b = self.make_file_ie('file-id-2', 'name2', inv.root.file_id)
 
674
        delta = [('dir1', 'dir2', 'dir-id', dir2),
 
675
                 ('dir1/name2', 'name2', 'file-id-2', file2b)]
 
676
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
677
        # The file should be accessible under the new path
 
678
        self.assertEqual('file-id-1', res_inv.path2id('dir2/name1'))
 
679
        self.assertEqual(None, res_inv.path2id('dir2/name2'))
 
680
        self.assertEqual('file-id-2', res_inv.path2id('name2'))
 
681
 
 
682
    def test_is_root(self):
 
683
        """Ensure our root-checking code is accurate."""
 
684
        inv = inventory.Inventory('TREE_ROOT')
 
685
        self.assertTrue(inv.is_root('TREE_ROOT'))
 
686
        self.assertFalse(inv.is_root('booga'))
 
687
        inv.root.file_id = 'booga'
 
688
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
689
        self.assertTrue(inv.is_root('booga'))
 
690
        # works properly even if no root is set
 
691
        inv.root = None
 
692
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
693
        self.assertFalse(inv.is_root('booga'))
 
694
 
 
695
    def test_entries_for_empty_inventory(self):
 
696
        """Test that entries() will not fail for an empty inventory"""
 
697
        inv = Inventory(root_id=None)
 
698
        self.assertEqual([], inv.entries())
 
699
 
 
700
 
 
701
class TestInventoryEntry(TestCase):
 
702
 
 
703
    def test_file_kind_character(self):
 
704
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
705
        self.assertEqual(file.kind_character(), '')
 
706
 
 
707
    def test_dir_kind_character(self):
 
708
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
709
        self.assertEqual(dir.kind_character(), '/')
 
710
 
 
711
    def test_link_kind_character(self):
 
712
        dir = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
713
        self.assertEqual(dir.kind_character(), '')
 
714
 
 
715
    def test_dir_detect_changes(self):
 
716
        left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
717
        right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
718
        self.assertEqual((False, False), left.detect_changes(right))
 
719
        self.assertEqual((False, False), right.detect_changes(left))
 
720
 
 
721
    def test_file_detect_changes(self):
 
722
        left = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
723
        left.text_sha1 = 123
 
724
        right = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
725
        right.text_sha1 = 123
 
726
        self.assertEqual((False, False), left.detect_changes(right))
 
727
        self.assertEqual((False, False), right.detect_changes(left))
 
728
        left.executable = True
 
729
        self.assertEqual((False, True), left.detect_changes(right))
 
730
        self.assertEqual((False, True), right.detect_changes(left))
 
731
        right.text_sha1 = 321
 
732
        self.assertEqual((True, True), left.detect_changes(right))
 
733
        self.assertEqual((True, True), right.detect_changes(left))
 
734
 
 
735
    def test_symlink_detect_changes(self):
 
736
        left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
737
        left.symlink_target='foo'
 
738
        right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
739
        right.symlink_target='foo'
 
740
        self.assertEqual((False, False), left.detect_changes(right))
 
741
        self.assertEqual((False, False), right.detect_changes(left))
 
742
        left.symlink_target = 'different'
 
743
        self.assertEqual((True, False), left.detect_changes(right))
 
744
        self.assertEqual((True, False), right.detect_changes(left))
 
745
 
 
746
    def test_file_has_text(self):
 
747
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
748
        self.assertTrue(file.has_text())
 
749
 
 
750
    def test_directory_has_text(self):
 
751
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
752
        self.assertFalse(dir.has_text())
 
753
 
 
754
    def test_link_has_text(self):
 
755
        link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
756
        self.assertFalse(link.has_text())
 
757
 
 
758
    def test_make_entry(self):
 
759
        self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
 
760
            inventory.InventoryFile)
 
761
        self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
 
762
            inventory.InventoryLink)
 
763
        self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
 
764
            inventory.InventoryDirectory)
 
765
 
 
766
    def test_make_entry_non_normalized(self):
 
767
        orig_normalized_filename = osutils.normalized_filename
 
768
 
 
769
        try:
 
770
            osutils.normalized_filename = osutils._accessible_normalized_filename
 
771
            entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
 
772
            self.assertEqual(u'\xe5', entry.name)
 
773
            self.assertIsInstance(entry, inventory.InventoryFile)
 
774
 
 
775
            osutils.normalized_filename = osutils._inaccessible_normalized_filename
 
776
            self.assertRaises(errors.InvalidNormalization,
 
777
                    inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
 
778
        finally:
 
779
            osutils.normalized_filename = orig_normalized_filename
 
780
 
 
781
 
 
782
class TestDescribeChanges(TestCase):
 
783
 
 
784
    def test_describe_change(self):
 
785
        # we need to test the following change combinations:
 
786
        # rename
 
787
        # reparent
 
788
        # modify
 
789
        # gone
 
790
        # added
 
791
        # renamed/reparented and modified
 
792
        # change kind (perhaps can't be done yet?)
 
793
        # also, merged in combination with all of these?
 
794
        old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
 
795
        old_a.text_sha1 = '123132'
 
796
        old_a.text_size = 0
 
797
        new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
 
798
        new_a.text_sha1 = '123132'
 
799
        new_a.text_size = 0
 
800
 
 
801
        self.assertChangeDescription('unchanged', old_a, new_a)
 
802
 
 
803
        new_a.text_size = 10
 
804
        new_a.text_sha1 = 'abcabc'
 
805
        self.assertChangeDescription('modified', old_a, new_a)
 
806
 
 
807
        self.assertChangeDescription('added', None, new_a)
 
808
        self.assertChangeDescription('removed', old_a, None)
 
809
        # perhaps a bit questionable but seems like the most reasonable thing...
 
810
        self.assertChangeDescription('unchanged', None, None)
 
811
 
 
812
        # in this case it's both renamed and modified; show a rename and
 
813
        # modification:
 
814
        new_a.name = 'newfilename'
 
815
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
816
 
 
817
        # reparenting is 'renaming'
 
818
        new_a.name = old_a.name
 
819
        new_a.parent_id = 'somedir-id'
 
820
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
821
 
 
822
        # reset the content values so its not modified
 
823
        new_a.text_size = old_a.text_size
 
824
        new_a.text_sha1 = old_a.text_sha1
 
825
        new_a.name = old_a.name
 
826
 
 
827
        new_a.name = 'newfilename'
 
828
        self.assertChangeDescription('renamed', old_a, new_a)
 
829
 
 
830
        # reparenting is 'renaming'
 
831
        new_a.name = old_a.name
 
832
        new_a.parent_id = 'somedir-id'
 
833
        self.assertChangeDescription('renamed', old_a, new_a)
 
834
 
 
835
    def assertChangeDescription(self, expected_change, old_ie, new_ie):
 
836
        change = InventoryEntry.describe_change(old_ie, new_ie)
 
837
        self.assertEqual(expected_change, change)
 
838
 
 
839
 
 
840
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
 
841
 
 
842
    def get_chk_bytes(self):
 
843
        factory = groupcompress.make_pack_factory(True, True, 1)
 
844
        trans = self.get_transport('')
 
845
        return factory(trans)
 
846
 
 
847
    def read_bytes(self, chk_bytes, key):
 
848
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
 
849
        return stream.next().get_bytes_as("fulltext")
 
850
 
 
851
    def test_deserialise_gives_CHKInventory(self):
 
852
        inv = Inventory()
 
853
        inv.revision_id = "revid"
 
854
        inv.root.revision = "rootrev"
 
855
        chk_bytes = self.get_chk_bytes()
 
856
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
857
        bytes = ''.join(chk_inv.to_lines())
 
858
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
859
        self.assertEqual("revid", new_inv.revision_id)
 
860
        self.assertEqual("directory", new_inv.root.kind)
 
861
        self.assertEqual(inv.root.file_id, new_inv.root.file_id)
 
862
        self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
 
863
        self.assertEqual(inv.root.name, new_inv.root.name)
 
864
        self.assertEqual("rootrev", new_inv.root.revision)
 
865
        self.assertEqual('plain', new_inv._search_key_name)
 
866
 
 
867
    def test_deserialise_wrong_revid(self):
 
868
        inv = Inventory()
 
869
        inv.revision_id = "revid"
 
870
        inv.root.revision = "rootrev"
 
871
        chk_bytes = self.get_chk_bytes()
 
872
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
873
        bytes = ''.join(chk_inv.to_lines())
 
874
        self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
 
875
            bytes, ("revid2",))
 
876
 
 
877
    def test_captures_rev_root_byid(self):
 
878
        inv = Inventory()
 
879
        inv.revision_id = "foo"
 
880
        inv.root.revision = "bar"
 
881
        chk_bytes = self.get_chk_bytes()
 
882
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
883
        lines = chk_inv.to_lines()
 
884
        self.assertEqual([
 
885
            'chkinventory:\n',
 
886
            'revision_id: foo\n',
 
887
            'root_id: TREE_ROOT\n',
 
888
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
889
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
890
            ], lines)
 
891
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
892
        self.assertEqual('plain', chk_inv._search_key_name)
 
893
 
 
894
    def test_captures_parent_id_basename_index(self):
 
895
        inv = Inventory()
 
896
        inv.revision_id = "foo"
 
897
        inv.root.revision = "bar"
 
898
        chk_bytes = self.get_chk_bytes()
 
899
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
900
        lines = chk_inv.to_lines()
 
901
        self.assertEqual([
 
902
            'chkinventory:\n',
 
903
            'revision_id: foo\n',
 
904
            'root_id: TREE_ROOT\n',
 
905
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
906
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
907
            ], lines)
 
908
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
909
        self.assertEqual('plain', chk_inv._search_key_name)
 
910
 
 
911
    def test_captures_search_key_name(self):
 
912
        inv = Inventory()
 
913
        inv.revision_id = "foo"
 
914
        inv.root.revision = "bar"
 
915
        chk_bytes = self.get_chk_bytes()
 
916
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
917
                                              search_key_name='hash-16-way')
 
918
        lines = chk_inv.to_lines()
 
919
        self.assertEqual([
 
920
            'chkinventory:\n',
 
921
            'search_key_name: hash-16-way\n',
 
922
            'root_id: TREE_ROOT\n',
 
923
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
924
            'revision_id: foo\n',
 
925
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
926
            ], lines)
 
927
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
928
        self.assertEqual('hash-16-way', chk_inv._search_key_name)
 
929
 
 
930
    def test_directory_children_on_demand(self):
 
931
        inv = Inventory()
 
932
        inv.revision_id = "revid"
 
933
        inv.root.revision = "rootrev"
 
934
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
935
        inv["fileid"].revision = "filerev"
 
936
        inv["fileid"].executable = True
 
937
        inv["fileid"].text_sha1 = "ffff"
 
938
        inv["fileid"].text_size = 1
 
939
        chk_bytes = self.get_chk_bytes()
 
940
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
941
        bytes = ''.join(chk_inv.to_lines())
 
942
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
943
        root_entry = new_inv[inv.root.file_id]
 
944
        self.assertEqual(None, root_entry._children)
 
945
        self.assertEqual({'file'}, set(root_entry.children))
 
946
        file_direct = new_inv["fileid"]
 
947
        file_found = root_entry.children['file']
 
948
        self.assertEqual(file_direct.kind, file_found.kind)
 
949
        self.assertEqual(file_direct.file_id, file_found.file_id)
 
950
        self.assertEqual(file_direct.parent_id, file_found.parent_id)
 
951
        self.assertEqual(file_direct.name, file_found.name)
 
952
        self.assertEqual(file_direct.revision, file_found.revision)
 
953
        self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
 
954
        self.assertEqual(file_direct.text_size, file_found.text_size)
 
955
        self.assertEqual(file_direct.executable, file_found.executable)
 
956
 
 
957
    def test_from_inventory_maximum_size(self):
 
958
        # from_inventory supports the maximum_size parameter.
 
959
        inv = Inventory()
 
960
        inv.revision_id = "revid"
 
961
        inv.root.revision = "rootrev"
 
962
        chk_bytes = self.get_chk_bytes()
 
963
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
 
964
        chk_inv.id_to_entry._ensure_root()
 
965
        self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
 
966
        self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
 
967
        p_id_basename = chk_inv.parent_id_basename_to_file_id
 
968
        p_id_basename._ensure_root()
 
969
        self.assertEqual(120, p_id_basename._root_node.maximum_size)
 
970
        self.assertEqual(2, p_id_basename._root_node._key_width)
 
971
 
 
972
    def test___iter__(self):
 
973
        inv = Inventory()
 
974
        inv.revision_id = "revid"
 
975
        inv.root.revision = "rootrev"
 
976
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
977
        inv["fileid"].revision = "filerev"
 
978
        inv["fileid"].executable = True
 
979
        inv["fileid"].text_sha1 = "ffff"
 
980
        inv["fileid"].text_size = 1
 
981
        chk_bytes = self.get_chk_bytes()
 
982
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
983
        bytes = ''.join(chk_inv.to_lines())
 
984
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
985
        fileids = sorted(new_inv.__iter__())
 
986
        self.assertEqual([inv.root.file_id, "fileid"], fileids)
 
987
 
 
988
    def test__len__(self):
 
989
        inv = Inventory()
 
990
        inv.revision_id = "revid"
 
991
        inv.root.revision = "rootrev"
 
992
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
993
        inv["fileid"].revision = "filerev"
 
994
        inv["fileid"].executable = True
 
995
        inv["fileid"].text_sha1 = "ffff"
 
996
        inv["fileid"].text_size = 1
 
997
        chk_bytes = self.get_chk_bytes()
 
998
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
999
        self.assertEqual(2, len(chk_inv))
 
1000
 
 
1001
    def test___getitem__(self):
 
1002
        inv = Inventory()
 
1003
        inv.revision_id = b"revid"
 
1004
        inv.root.revision = b"rootrev"
 
1005
        inv.add(InventoryFile(b"fileid", u"file", inv.root.file_id))
 
1006
        inv[b"fileid"].revision = b"filerev"
 
1007
        inv[b"fileid"].executable = True
 
1008
        inv[b"fileid"].text_sha1 = b"ffff"
 
1009
        inv[b"fileid"].text_size = 1
 
1010
        chk_bytes = self.get_chk_bytes()
 
1011
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1012
        data = b''.join(chk_inv.to_lines())
 
1013
        new_inv = CHKInventory.deserialise(chk_bytes, data, (b"revid",))
 
1014
        root_entry = new_inv[inv.root.file_id]
 
1015
        file_entry = new_inv[b"fileid"]
 
1016
        self.assertEqual("directory", root_entry.kind)
 
1017
        self.assertEqual(inv.root.file_id, root_entry.file_id)
 
1018
        self.assertEqual(inv.root.parent_id, root_entry.parent_id)
 
1019
        self.assertEqual(inv.root.name, root_entry.name)
 
1020
        self.assertEqual(b"rootrev", root_entry.revision)
 
1021
        self.assertEqual("file", file_entry.kind)
 
1022
        self.assertEqual(b"fileid", file_entry.file_id)
 
1023
        self.assertEqual(inv.root.file_id, file_entry.parent_id)
 
1024
        self.assertEqual(u"file", file_entry.name)
 
1025
        self.assertEqual(b"filerev", file_entry.revision)
 
1026
        self.assertEqual(b"ffff", file_entry.text_sha1)
 
1027
        self.assertEqual(1, file_entry.text_size)
 
1028
        self.assertEqual(True, file_entry.executable)
 
1029
        self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
 
1030
 
 
1031
    def test_has_id_true(self):
 
1032
        inv = Inventory()
 
1033
        inv.revision_id = "revid"
 
1034
        inv.root.revision = "rootrev"
 
1035
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1036
        inv["fileid"].revision = "filerev"
 
1037
        inv["fileid"].executable = True
 
1038
        inv["fileid"].text_sha1 = "ffff"
 
1039
        inv["fileid"].text_size = 1
 
1040
        chk_bytes = self.get_chk_bytes()
 
1041
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1042
        self.assertTrue(chk_inv.has_id('fileid'))
 
1043
        self.assertTrue(chk_inv.has_id(inv.root.file_id))
 
1044
 
 
1045
    def test_has_id_not(self):
 
1046
        inv = Inventory()
 
1047
        inv.revision_id = "revid"
 
1048
        inv.root.revision = "rootrev"
 
1049
        chk_bytes = self.get_chk_bytes()
 
1050
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1051
        self.assertFalse(chk_inv.has_id('fileid'))
 
1052
 
 
1053
    def test_id2path(self):
 
1054
        inv = Inventory()
 
1055
        inv.revision_id = "revid"
 
1056
        inv.root.revision = "rootrev"
 
1057
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
 
1058
        fileentry = InventoryFile("fileid", "file", "dirid")
 
1059
        inv.add(direntry)
 
1060
        inv.add(fileentry)
 
1061
        inv["fileid"].revision = "filerev"
 
1062
        inv["fileid"].executable = True
 
1063
        inv["fileid"].text_sha1 = "ffff"
 
1064
        inv["fileid"].text_size = 1
 
1065
        inv["dirid"].revision = "filerev"
 
1066
        chk_bytes = self.get_chk_bytes()
 
1067
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1068
        bytes = ''.join(chk_inv.to_lines())
 
1069
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1070
        self.assertEqual('', new_inv.id2path(inv.root.file_id))
 
1071
        self.assertEqual('dir', new_inv.id2path('dirid'))
 
1072
        self.assertEqual('dir/file', new_inv.id2path('fileid'))
 
1073
 
 
1074
    def test_path2id(self):
 
1075
        inv = Inventory()
 
1076
        inv.revision_id = "revid"
 
1077
        inv.root.revision = "rootrev"
 
1078
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
 
1079
        fileentry = InventoryFile("fileid", "file", "dirid")
 
1080
        inv.add(direntry)
 
1081
        inv.add(fileentry)
 
1082
        inv["fileid"].revision = "filerev"
 
1083
        inv["fileid"].executable = True
 
1084
        inv["fileid"].text_sha1 = "ffff"
 
1085
        inv["fileid"].text_size = 1
 
1086
        inv["dirid"].revision = "filerev"
 
1087
        chk_bytes = self.get_chk_bytes()
 
1088
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1089
        bytes = ''.join(chk_inv.to_lines())
 
1090
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1091
        self.assertEqual(inv.root.file_id, new_inv.path2id(''))
 
1092
        self.assertEqual('dirid', new_inv.path2id('dir'))
 
1093
        self.assertEqual('fileid', new_inv.path2id('dir/file'))
 
1094
 
 
1095
    def test_create_by_apply_delta_sets_root(self):
 
1096
        inv = Inventory()
 
1097
        inv.revision_id = "revid"
 
1098
        chk_bytes = self.get_chk_bytes()
 
1099
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1100
        inv.add_path("", "directory", "myrootid", None)
 
1101
        inv.revision_id = "expectedid"
 
1102
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1103
        delta = [("", None, base_inv.root.file_id, None),
 
1104
            (None, "",  "myrootid", inv.root)]
 
1105
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1106
        self.assertEqual(reference_inv.root, new_inv.root)
 
1107
 
 
1108
    def test_create_by_apply_delta_empty_add_child(self):
 
1109
        inv = Inventory()
 
1110
        inv.revision_id = "revid"
 
1111
        inv.root.revision = "rootrev"
 
1112
        chk_bytes = self.get_chk_bytes()
 
1113
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1114
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
 
1115
        a_entry.revision = "filerev"
 
1116
        a_entry.executable = True
 
1117
        a_entry.text_sha1 = "ffff"
 
1118
        a_entry.text_size = 1
 
1119
        inv.add(a_entry)
 
1120
        inv.revision_id = "expectedid"
 
1121
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1122
        delta = [(None, "A",  "A-id", a_entry)]
 
1123
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1124
        # new_inv should be the same as reference_inv.
 
1125
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1126
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1127
        reference_inv.id_to_entry._ensure_root()
 
1128
        new_inv.id_to_entry._ensure_root()
 
1129
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1130
            new_inv.id_to_entry._root_node._key)
 
1131
 
 
1132
    def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
 
1133
        inv = Inventory()
 
1134
        inv.revision_id = "revid"
 
1135
        inv.root.revision = "rootrev"
 
1136
        chk_bytes = self.get_chk_bytes()
 
1137
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1138
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
 
1139
        a_entry.revision = "filerev"
 
1140
        a_entry.executable = True
 
1141
        a_entry.text_sha1 = "ffff"
 
1142
        a_entry.text_size = 1
 
1143
        inv.add(a_entry)
 
1144
        inv.revision_id = "expectedid"
 
1145
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1146
        delta = [(None, "A",  "A-id", a_entry)]
 
1147
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1148
        reference_inv.id_to_entry._ensure_root()
 
1149
        reference_inv.parent_id_basename_to_file_id._ensure_root()
 
1150
        new_inv.id_to_entry._ensure_root()
 
1151
        new_inv.parent_id_basename_to_file_id._ensure_root()
 
1152
        # new_inv should be the same as reference_inv.
 
1153
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1154
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1155
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1156
            new_inv.id_to_entry._root_node._key)
 
1157
        self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
 
1158
            new_inv.parent_id_basename_to_file_id._root_node._key)
 
1159
 
 
1160
    def test_iter_changes(self):
 
1161
        # Low level bootstrapping smoke test; comprehensive generic tests via
 
1162
        # InterTree are coming.
 
1163
        inv = Inventory()
 
1164
        inv.revision_id = "revid"
 
1165
        inv.root.revision = "rootrev"
 
1166
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1167
        inv["fileid"].revision = "filerev"
 
1168
        inv["fileid"].executable = True
 
1169
        inv["fileid"].text_sha1 = "ffff"
 
1170
        inv["fileid"].text_size = 1
 
1171
        inv2 = Inventory()
 
1172
        inv2.revision_id = "revid2"
 
1173
        inv2.root.revision = "rootrev"
 
1174
        inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1175
        inv2["fileid"].revision = "filerev2"
 
1176
        inv2["fileid"].executable = False
 
1177
        inv2["fileid"].text_sha1 = "bbbb"
 
1178
        inv2["fileid"].text_size = 2
 
1179
        # get fresh objects.
 
1180
        chk_bytes = self.get_chk_bytes()
 
1181
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1182
        bytes = ''.join(chk_inv.to_lines())
 
1183
        inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1184
        chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
 
1185
        bytes = ''.join(chk_inv2.to_lines())
 
1186
        inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
 
1187
        self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
 
1188
            ('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
 
1189
            (False, True))],
 
1190
            list(inv_1.iter_changes(inv_2)))
 
1191
 
 
1192
    def test_parent_id_basename_to_file_id_index_enabled(self):
 
1193
        inv = Inventory()
 
1194
        inv.revision_id = "revid"
 
1195
        inv.root.revision = "rootrev"
 
1196
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1197
        inv["fileid"].revision = "filerev"
 
1198
        inv["fileid"].executable = True
 
1199
        inv["fileid"].text_sha1 = "ffff"
 
1200
        inv["fileid"].text_size = 1
 
1201
        # get fresh objects.
 
1202
        chk_bytes = self.get_chk_bytes()
 
1203
        tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1204
        bytes = ''.join(tmp_inv.to_lines())
 
1205
        chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1206
        self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
 
1207
        self.assertEqual(
 
1208
            {('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
 
1209
            dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
 
1210
 
 
1211
    def test_file_entry_to_bytes(self):
 
1212
        inv = CHKInventory(None)
 
1213
        ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
 
1214
        ie.executable = True
 
1215
        ie.revision = 'file-rev-id'
 
1216
        ie.text_sha1 = 'abcdefgh'
 
1217
        ie.text_size = 100
 
1218
        bytes = inv._entry_to_bytes(ie)
 
1219
        self.assertEqual('file: file-id\nparent-id\nfilename\n'
 
1220
                         'file-rev-id\nabcdefgh\n100\nY', bytes)
 
1221
        ie2 = inv._bytes_to_entry(bytes)
 
1222
        self.assertEqual(ie, ie2)
 
1223
        self.assertIsInstance(ie2.name, unicode)
 
1224
        self.assertEqual(('filename', 'file-id', 'file-rev-id'),
 
1225
                         inv._bytes_to_utf8name_key(bytes))
 
1226
 
 
1227
    def test_file2_entry_to_bytes(self):
 
1228
        inv = CHKInventory(None)
 
1229
        # \u30a9 == 'omega'
 
1230
        ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
 
1231
        ie.executable = False
 
1232
        ie.revision = 'file-rev-id'
 
1233
        ie.text_sha1 = '123456'
 
1234
        ie.text_size = 25
 
1235
        bytes = inv._entry_to_bytes(ie)
 
1236
        self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
 
1237
                         'file-rev-id\n123456\n25\nN', bytes)
 
1238
        ie2 = inv._bytes_to_entry(bytes)
 
1239
        self.assertEqual(ie, ie2)
 
1240
        self.assertIsInstance(ie2.name, unicode)
 
1241
        self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
 
1242
                         inv._bytes_to_utf8name_key(bytes))
 
1243
 
 
1244
    def test_dir_entry_to_bytes(self):
 
1245
        inv = CHKInventory(None)
 
1246
        ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
 
1247
        ie.revision = 'dir-rev-id'
 
1248
        bytes = inv._entry_to_bytes(ie)
 
1249
        self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
 
1250
        ie2 = inv._bytes_to_entry(bytes)
 
1251
        self.assertEqual(ie, ie2)
 
1252
        self.assertIsInstance(ie2.name, unicode)
 
1253
        self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
 
1254
                         inv._bytes_to_utf8name_key(bytes))
 
1255
 
 
1256
    def test_dir2_entry_to_bytes(self):
 
1257
        inv = CHKInventory(None)
 
1258
        ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
 
1259
                                          None)
 
1260
        ie.revision = 'dir-rev-id'
 
1261
        bytes = inv._entry_to_bytes(ie)
 
1262
        self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
 
1263
                         'dir-rev-id', bytes)
 
1264
        ie2 = inv._bytes_to_entry(bytes)
 
1265
        self.assertEqual(ie, ie2)
 
1266
        self.assertIsInstance(ie2.name, unicode)
 
1267
        self.assertIs(ie2.parent_id, None)
 
1268
        self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
 
1269
                         inv._bytes_to_utf8name_key(bytes))
 
1270
 
 
1271
    def test_symlink_entry_to_bytes(self):
 
1272
        inv = CHKInventory(None)
 
1273
        ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
 
1274
        ie.revision = 'link-rev-id'
 
1275
        ie.symlink_target = u'target/path'
 
1276
        bytes = inv._entry_to_bytes(ie)
 
1277
        self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
 
1278
                         'link-rev-id\ntarget/path', bytes)
 
1279
        ie2 = inv._bytes_to_entry(bytes)
 
1280
        self.assertEqual(ie, ie2)
 
1281
        self.assertIsInstance(ie2.name, unicode)
 
1282
        self.assertIsInstance(ie2.symlink_target, unicode)
 
1283
        self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
 
1284
                         inv._bytes_to_utf8name_key(bytes))
 
1285
 
 
1286
    def test_symlink2_entry_to_bytes(self):
 
1287
        inv = CHKInventory(None)
 
1288
        ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
 
1289
        ie.revision = 'link-rev-id'
 
1290
        ie.symlink_target = u'target/\u03a9path'
 
1291
        bytes = inv._entry_to_bytes(ie)
 
1292
        self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
 
1293
                         'link-rev-id\ntarget/\xce\xa9path', bytes)
 
1294
        ie2 = inv._bytes_to_entry(bytes)
 
1295
        self.assertEqual(ie, ie2)
 
1296
        self.assertIsInstance(ie2.name, unicode)
 
1297
        self.assertIsInstance(ie2.symlink_target, unicode)
 
1298
        self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
 
1299
                         inv._bytes_to_utf8name_key(bytes))
 
1300
 
 
1301
    def test_tree_reference_entry_to_bytes(self):
 
1302
        inv = CHKInventory(None)
 
1303
        ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
 
1304
                                     'parent-id')
 
1305
        ie.revision = 'tree-rev-id'
 
1306
        ie.reference_revision = 'ref-rev-id'
 
1307
        bytes = inv._entry_to_bytes(ie)
 
1308
        self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
 
1309
                         'tree-rev-id\nref-rev-id', bytes)
 
1310
        ie2 = inv._bytes_to_entry(bytes)
 
1311
        self.assertEqual(ie, ie2)
 
1312
        self.assertIsInstance(ie2.name, unicode)
 
1313
        self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
 
1314
                         inv._bytes_to_utf8name_key(bytes))
 
1315
 
 
1316
    def make_basic_utf8_inventory(self):
 
1317
        inv = Inventory()
 
1318
        inv.revision_id = "revid"
 
1319
        inv.root.revision = "rootrev"
 
1320
        root_id = inv.root.file_id
 
1321
        inv.add(InventoryFile("fileid", u'f\xefle', root_id))
 
1322
        inv["fileid"].revision = "filerev"
 
1323
        inv["fileid"].text_sha1 = "ffff"
 
1324
        inv["fileid"].text_size = 0
 
1325
        inv.add(InventoryDirectory("dirid", u'dir-\N{EURO SIGN}', root_id))
 
1326
        inv.add(InventoryFile("childid", u'ch\xefld', "dirid"))
 
1327
        inv["childid"].revision = "filerev"
 
1328
        inv["childid"].text_sha1 = "ffff"
 
1329
        inv["childid"].text_size = 0
 
1330
        chk_bytes = self.get_chk_bytes()
 
1331
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1332
        bytes = ''.join(chk_inv.to_lines())
 
1333
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1334
 
 
1335
    def test__preload_handles_utf8(self):
 
1336
        new_inv = self.make_basic_utf8_inventory()
 
1337
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
 
1338
        self.assertFalse(new_inv._fully_cached)
 
1339
        new_inv._preload_cache()
 
1340
        self.assertEqual(
 
1341
            sorted([new_inv.root_id, "fileid", "dirid", "childid"]),
 
1342
            sorted(new_inv._fileid_to_entry_cache.keys()))
 
1343
        ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
 
1344
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1345
                         sorted(ie_root._children.keys()))
 
1346
        ie_dir = new_inv._fileid_to_entry_cache['dirid']
 
1347
        self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
 
1348
 
 
1349
    def test__preload_populates_cache(self):
 
1350
        inv = Inventory()
 
1351
        inv.revision_id = "revid"
 
1352
        inv.root.revision = "rootrev"
 
1353
        root_id = inv.root.file_id
 
1354
        inv.add(InventoryFile("fileid", "file", root_id))
 
1355
        inv["fileid"].revision = "filerev"
 
1356
        inv["fileid"].executable = True
 
1357
        inv["fileid"].text_sha1 = "ffff"
 
1358
        inv["fileid"].text_size = 1
 
1359
        inv.add(InventoryDirectory("dirid", "dir", root_id))
 
1360
        inv.add(InventoryFile("childid", "child", "dirid"))
 
1361
        inv["childid"].revision = "filerev"
 
1362
        inv["childid"].executable = False
 
1363
        inv["childid"].text_sha1 = "dddd"
 
1364
        inv["childid"].text_size = 1
 
1365
        chk_bytes = self.get_chk_bytes()
 
1366
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1367
        bytes = ''.join(chk_inv.to_lines())
 
1368
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1369
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
 
1370
        self.assertFalse(new_inv._fully_cached)
 
1371
        new_inv._preload_cache()
 
1372
        self.assertEqual(
 
1373
            sorted([root_id, "fileid", "dirid", "childid"]),
 
1374
            sorted(new_inv._fileid_to_entry_cache.keys()))
 
1375
        self.assertTrue(new_inv._fully_cached)
 
1376
        ie_root = new_inv._fileid_to_entry_cache[root_id]
 
1377
        self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
 
1378
        ie_dir = new_inv._fileid_to_entry_cache['dirid']
 
1379
        self.assertEqual(['child'], sorted(ie_dir._children.keys()))
 
1380
 
 
1381
    def test__preload_handles_partially_evaluated_inventory(self):
 
1382
        new_inv = self.make_basic_utf8_inventory()
 
1383
        ie = new_inv[new_inv.root_id]
 
1384
        self.assertIs(None, ie._children)
 
1385
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1386
                         sorted(ie.children.keys()))
 
1387
        # Accessing .children loads _children
 
1388
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1389
                         sorted(ie._children.keys()))
 
1390
        new_inv._preload_cache()
 
1391
        # No change
 
1392
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1393
                         sorted(ie._children.keys()))
 
1394
        ie_dir = new_inv["dirid"]
 
1395
        self.assertEqual([u'ch\xefld'],
 
1396
                         sorted(ie_dir._children.keys()))
 
1397
 
 
1398
    def test_filter_change_in_renamed_subfolder(self):
 
1399
        inv = Inventory('tree-root')
 
1400
        src_ie = inv.add_path('src', 'directory', 'src-id')
 
1401
        inv.add_path('src/sub/', 'directory', 'sub-id')
 
1402
        a_ie = inv.add_path('src/sub/a', 'file', 'a-id')
 
1403
        a_ie.text_sha1 = osutils.sha_string('content\n')
 
1404
        a_ie.text_size = len('content\n')
 
1405
        chk_bytes = self.get_chk_bytes()
 
1406
        inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1407
        inv = inv.create_by_apply_delta([
 
1408
            ("src/sub/a", "src/sub/a", "a-id", a_ie),
 
1409
            ("src", "src2", "src-id", src_ie),
 
1410
            ], 'new-rev-2')
 
1411
        new_inv = inv.filter(['a-id', 'src-id'])
 
1412
        self.assertEqual([
 
1413
            ('', 'tree-root'),
 
1414
            ('src', 'src-id'),
 
1415
            ('src/sub', 'sub-id'),
 
1416
            ('src/sub/a', 'a-id'),
 
1417
            ], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
 
1418
 
 
1419
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
 
1420
 
 
1421
    def get_chk_bytes(self):
 
1422
        factory = groupcompress.make_pack_factory(True, True, 1)
 
1423
        trans = self.get_transport('')
 
1424
        return factory(trans)
 
1425
 
 
1426
    def make_dir(self, inv, name, parent_id):
 
1427
        inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
 
1428
 
 
1429
    def make_file(self, inv, name, parent_id, content='content\n'):
 
1430
        ie = inv.make_entry('file', name, parent_id, name + '-id')
 
1431
        ie.text_sha1 = osutils.sha_string(content)
 
1432
        ie.text_size = len(content)
 
1433
        inv.add(ie)
 
1434
 
 
1435
    def make_simple_inventory(self):
 
1436
        inv = Inventory('TREE_ROOT')
 
1437
        inv.revision_id = "revid"
 
1438
        inv.root.revision = "rootrev"
 
1439
        # /                 TREE_ROOT
 
1440
        # dir1/             dir1-id
 
1441
        #   sub-file1       sub-file1-id
 
1442
        #   sub-file2       sub-file2-id
 
1443
        #   sub-dir1/       sub-dir1-id
 
1444
        #     subsub-file1  subsub-file1-id
 
1445
        # dir2/             dir2-id
 
1446
        #   sub2-file1      sub2-file1-id
 
1447
        # top               top-id
 
1448
        self.make_dir(inv, 'dir1', 'TREE_ROOT')
 
1449
        self.make_dir(inv, 'dir2', 'TREE_ROOT')
 
1450
        self.make_dir(inv, 'sub-dir1', 'dir1-id')
 
1451
        self.make_file(inv, 'top', 'TREE_ROOT')
 
1452
        self.make_file(inv, 'sub-file1', 'dir1-id')
 
1453
        self.make_file(inv, 'sub-file2', 'dir1-id')
 
1454
        self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
 
1455
        self.make_file(inv, 'sub2-file1', 'dir2-id')
 
1456
        chk_bytes = self.get_chk_bytes()
 
1457
        #  use a small maximum_size to force internal paging structures
 
1458
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
1459
                        maximum_size=100,
 
1460
                        search_key_name='hash-255-way')
 
1461
        bytes = ''.join(chk_inv.to_lines())
 
1462
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1463
 
 
1464
    def assert_Getitems(self, expected_fileids, inv, file_ids):
 
1465
        self.assertEqual(sorted(expected_fileids),
 
1466
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
 
1467
 
 
1468
    def assertExpand(self, all_ids, inv, file_ids):
 
1469
        (val_all_ids,
 
1470
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
 
1471
        self.assertEqual(set(all_ids), val_all_ids)
 
1472
        entries = inv._getitems(val_all_ids)
 
1473
        expected_children = {}
 
1474
        for entry in entries:
 
1475
            s = expected_children.setdefault(entry.parent_id, [])
 
1476
            s.append(entry.file_id)
 
1477
        val_children = dict((k, sorted(v)) for k, v
 
1478
                            in val_children.items())
 
1479
        expected_children = dict((k, sorted(v)) for k, v
 
1480
                            in expected_children.items())
 
1481
        self.assertEqual(expected_children, val_children)
 
1482
 
 
1483
    def test_make_simple_inventory(self):
 
1484
        inv = self.make_simple_inventory()
 
1485
        layout = []
 
1486
        for path, entry in inv.iter_entries_by_dir():
 
1487
            layout.append((path, entry.file_id))
 
1488
        self.assertEqual([
 
1489
            ('', 'TREE_ROOT'),
 
1490
            ('dir1', 'dir1-id'),
 
1491
            ('dir2', 'dir2-id'),
 
1492
            ('top', 'top-id'),
 
1493
            ('dir1/sub-dir1', 'sub-dir1-id'),
 
1494
            ('dir1/sub-file1', 'sub-file1-id'),
 
1495
            ('dir1/sub-file2', 'sub-file2-id'),
 
1496
            ('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
 
1497
            ('dir2/sub2-file1', 'sub2-file1-id'),
 
1498
            ], layout)
 
1499
 
 
1500
    def test__getitems(self):
 
1501
        inv = self.make_simple_inventory()
 
1502
        # Reading from disk
 
1503
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1504
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1505
        self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
 
1506
        # From cache
 
1507
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1508
        # Mixed
 
1509
        self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
 
1510
                             ['dir1-id', 'sub-file2-id'])
 
1511
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1512
        self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
 
1513
 
 
1514
    def test_single_file(self):
 
1515
        inv = self.make_simple_inventory()
 
1516
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1517
 
 
1518
    def test_get_all_parents(self):
 
1519
        inv = self.make_simple_inventory()
 
1520
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1521
                           'subsub-file1-id',
 
1522
                          ], inv, ['subsub-file1-id'])
 
1523
 
 
1524
    def test_get_children(self):
 
1525
        inv = self.make_simple_inventory()
 
1526
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1527
                           'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
 
1528
                          ], inv, ['dir1-id'])
 
1529
 
 
1530
    def test_from_root(self):
 
1531
        inv = self.make_simple_inventory()
 
1532
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
 
1533
                           'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
 
1534
                           'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
 
1535
 
 
1536
    def test_top_level_file(self):
 
1537
        inv = self.make_simple_inventory()
 
1538
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1539
 
 
1540
    def test_subsub_file(self):
 
1541
        inv = self.make_simple_inventory()
 
1542
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1543
                           'subsub-file1-id'], inv, ['subsub-file1-id'])
 
1544
 
 
1545
    def test_sub_and_root(self):
 
1546
        inv = self.make_simple_inventory()
 
1547
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
 
1548
                           'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])
 
1549
 
 
1550
 
 
1551
class TestMutableInventoryFromTree(TestCaseWithTransport):
 
1552
 
 
1553
    def test_empty(self):
 
1554
        repository = self.make_repository('.')
 
1555
        tree = repository.revision_tree(revision.NULL_REVISION)
 
1556
        inv = mutable_inventory_from_tree(tree)
 
1557
        self.assertEqual(revision.NULL_REVISION, inv.revision_id)
 
1558
        self.assertEqual(0, len(inv))
 
1559
 
 
1560
    def test_some_files(self):
 
1561
        wt = self.make_branch_and_tree('.')
 
1562
        self.build_tree(['a'])
 
1563
        wt.add(['a'], ['thefileid'])
 
1564
        revid = wt.commit("commit")
 
1565
        tree = wt.branch.repository.revision_tree(revid)
 
1566
        inv = mutable_inventory_from_tree(tree)
 
1567
        self.assertEqual(revid, inv.revision_id)
 
1568
        self.assertEqual(2, len(inv))
 
1569
        self.assertEqual("a", inv['thefileid'].name)
 
1570
        # The inventory should be mutable and independent of
 
1571
        # the original tree
 
1572
        self.assertFalse(tree.root_inventory['thefileid'].executable)
 
1573
        inv['thefileid'].executable = True
 
1574
        self.assertFalse(tree.root_inventory['thefileid'].executable)