/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_inv.py

  • Committer: Jonathan Lange
  • Date: 2009-12-09 09:20:42 UTC
  • mfrom: (4881 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4907.
  • Revision ID: jml@canonical.com-20091209092042-s2zgqcf8f39yzxpj
Merge trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
 
from bzrlib import errors, chk_map, inventory, osutils
 
18
from bzrlib import (
 
19
    chk_map,
 
20
    groupcompress,
 
21
    bzrdir,
 
22
    errors,
 
23
    inventory,
 
24
    osutils,
 
25
    repository,
 
26
    revision,
 
27
    tests,
 
28
    )
19
29
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
20
30
    InventoryDirectory, InventoryEntry, TreeReference)
21
 
from bzrlib.tests import TestCase, TestCaseWithTransport
 
31
from bzrlib.tests import (
 
32
    TestCase,
 
33
    TestCaseWithTransport,
 
34
    condition_isinstance,
 
35
    multiply_tests,
 
36
    split_suite_by_condition,
 
37
    )
 
38
from bzrlib.tests.per_workingtree import workingtree_formats
 
39
 
 
40
 
 
41
def load_tests(standard_tests, module, loader):
 
42
    """Parameterise some inventory tests."""
 
43
    to_adapt, result = split_suite_by_condition(standard_tests,
 
44
        condition_isinstance(TestDeltaApplication))
 
45
    scenarios = [
 
46
        ('Inventory', {'apply_delta':apply_inventory_Inventory}),
 
47
        ]
 
48
    # Working tree basis delta application
 
49
    # Repository add_inv_by_delta.
 
50
    # Reduce form of the per_repository test logic - that logic needs to be
 
51
    # be able to get /just/ repositories whereas these tests are fine with
 
52
    # just creating trees.
 
53
    formats = set()
 
54
    for _, format in repository.format_registry.iteritems():
 
55
        scenarios.append((str(format.__name__), {
 
56
            'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
 
57
            'format':format}))
 
58
    for format in workingtree_formats():
 
59
        scenarios.append(
 
60
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
 
61
            'apply_delta':apply_inventory_WT_basis,
 
62
            'format':format}))
 
63
        scenarios.append(
 
64
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
 
65
            'apply_delta':apply_inventory_WT,
 
66
            'format':format}))
 
67
    return multiply_tests(to_adapt, scenarios, result)
 
68
 
 
69
 
 
70
def create_texts_for_inv(repo, inv):
 
71
    for path, ie in inv.iter_entries():
 
72
        if ie.text_size:
 
73
            lines = ['a' * ie.text_size]
 
74
        else:
 
75
            lines = []
 
76
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
 
77
    
 
78
def apply_inventory_Inventory(self, basis, delta):
 
79
    """Apply delta to basis and return the result.
 
80
    
 
81
    :param basis: An inventory to be used as the basis.
 
82
    :param delta: The inventory delta to apply:
 
83
    :return: An inventory resulting from the application.
 
84
    """
 
85
    basis.apply_delta(delta)
 
86
    return basis
 
87
 
 
88
 
 
89
def apply_inventory_WT(self, basis, delta):
 
90
    """Apply delta to basis and return the result.
 
91
 
 
92
    This sets the tree state to be basis, and then calls apply_inventory_delta.
 
93
    
 
94
    :param basis: An inventory to be used as the basis.
 
95
    :param delta: The inventory delta to apply:
 
96
    :return: An inventory resulting from the application.
 
97
    """
 
98
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
 
99
    control.create_repository()
 
100
    control.create_branch()
 
101
    tree = self.format.initialize(control)
 
102
    tree.lock_write()
 
103
    try:
 
104
        tree._write_inventory(basis)
 
105
    finally:
 
106
        tree.unlock()
 
107
    # Fresh object, reads disk again.
 
108
    tree = tree.bzrdir.open_workingtree()
 
109
    tree.lock_write()
 
110
    try:
 
111
        tree.apply_inventory_delta(delta)
 
112
    finally:
 
113
        tree.unlock()
 
114
    # reload tree - ensure we get what was written.
 
115
    tree = tree.bzrdir.open_workingtree()
 
116
    tree.lock_read()
 
117
    self.addCleanup(tree.unlock)
 
118
    # One could add 'tree._validate' here but that would cause 'early' failues 
 
119
    # as far as higher level code is concerned. Possibly adding an
 
120
    # expect_fail parameter to this function and if that is False then do a
 
121
    # validate call.
 
122
    return tree.inventory
 
123
 
 
124
 
 
125
def apply_inventory_WT_basis(self, basis, delta):
 
126
    """Apply delta to basis and return the result.
 
127
 
 
128
    This sets the parent and then calls update_basis_by_delta.
 
129
    It also puts the basis in the repository under both 'basis' and 'result' to
 
130
    allow safety checks made by the WT to succeed, and finally ensures that all
 
131
    items in the delta with a new path are present in the WT before calling
 
132
    update_basis_by_delta.
 
133
    
 
134
    :param basis: An inventory to be used as the basis.
 
135
    :param delta: The inventory delta to apply:
 
136
    :return: An inventory resulting from the application.
 
137
    """
 
138
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
 
139
    control.create_repository()
 
140
    control.create_branch()
 
141
    tree = self.format.initialize(control)
 
142
    tree.lock_write()
 
143
    try:
 
144
        repo = tree.branch.repository
 
145
        repo.start_write_group()
 
146
        try:
 
147
            rev = revision.Revision('basis', timestamp=0, timezone=None,
 
148
                message="", committer="foo@example.com")
 
149
            basis.revision_id = 'basis'
 
150
            create_texts_for_inv(tree.branch.repository, basis)
 
151
            repo.add_revision('basis', rev, basis)
 
152
            # Add a revision for the result, with the basis content - 
 
153
            # update_basis_by_delta doesn't check that the delta results in
 
154
            # result, and we want inconsistent deltas to get called on the
 
155
            # tree, or else the code isn't actually checked.
 
156
            rev = revision.Revision('result', timestamp=0, timezone=None,
 
157
                message="", committer="foo@example.com")
 
158
            basis.revision_id = 'result'
 
159
            repo.add_revision('result', rev, basis)
 
160
            repo.commit_write_group()
 
161
        except:
 
162
            repo.abort_write_group()
 
163
            raise
 
164
        # Set the basis state as the trees current state
 
165
        tree._write_inventory(basis)
 
166
        # This reads basis from the repo and puts it into the tree's local
 
167
        # cache, if it has one.
 
168
        tree.set_parent_ids(['basis'])
 
169
        paths = {}
 
170
        parents = set()
 
171
        for old, new, id, entry in delta:
 
172
            if None in (new, entry):
 
173
                continue
 
174
            paths[new] = (entry.file_id, entry.kind)
 
175
            parents.add(osutils.dirname(new))
 
176
        parents = osutils.minimum_path_selection(parents)
 
177
        parents.discard('')
 
178
        # Put place holders in the tree to permit adding the other entries.
 
179
        for pos, parent in enumerate(parents):
 
180
            if not tree.path2id(parent):
 
181
                # add a synthetic directory in the tree so we can can put the
 
182
                # tree0 entries in place for dirstate.
 
183
                tree.add([parent], ["id%d" % pos], ["directory"])
 
184
        if paths:
 
185
            # Many deltas may cause this mini-apply to fail, but we want to see what
 
186
            # the delta application code says, not the prep that we do to deal with 
 
187
            # limitations of dirstate's update_basis code.
 
188
            for path, (file_id, kind) in sorted(paths.items()):
 
189
                try:
 
190
                    tree.add([path], [file_id], [kind])
 
191
                except (KeyboardInterrupt, SystemExit):
 
192
                    raise
 
193
                except:
 
194
                    pass
 
195
    finally:
 
196
        tree.unlock()
 
197
    # Fresh lock, reads disk again.
 
198
    tree.lock_write()
 
199
    try:
 
200
        tree.update_basis_by_delta('result', delta)
 
201
    finally:
 
202
        tree.unlock()
 
203
    # reload tree - ensure we get what was written.
 
204
    tree = tree.bzrdir.open_workingtree()
 
205
    basis_tree = tree.basis_tree()
 
206
    basis_tree.lock_read()
 
207
    self.addCleanup(basis_tree.unlock)
 
208
    # Note, that if the tree does not have a local cache, the trick above of
 
209
    # setting the result as the basis, will come back to bite us. That said,
 
210
    # all the implementations in bzr do have a local cache.
 
211
    return basis_tree.inventory
 
212
 
 
213
 
 
214
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
 
215
    """Apply delta to basis and return the result.
 
216
    
 
217
    This inserts basis as a whole inventory and then uses
 
218
    add_inventory_by_delta to add delta.
 
219
 
 
220
    :param basis: An inventory to be used as the basis.
 
221
    :param delta: The inventory delta to apply:
 
222
    :return: An inventory resulting from the application.
 
223
    """
 
224
    format = self.format()
 
225
    control = self.make_bzrdir('tree', format=format._matchingbzrdir)
 
226
    repo = format.initialize(control)
 
227
    repo.lock_write()
 
228
    try:
 
229
        repo.start_write_group()
 
230
        try:
 
231
            rev = revision.Revision('basis', timestamp=0, timezone=None,
 
232
                message="", committer="foo@example.com")
 
233
            basis.revision_id = 'basis'
 
234
            create_texts_for_inv(repo, basis)
 
235
            repo.add_revision('basis', rev, basis)
 
236
            repo.commit_write_group()
 
237
        except:
 
238
            repo.abort_write_group()
 
239
            raise
 
240
    finally:
 
241
        repo.unlock()
 
242
    repo.lock_write()
 
243
    try:
 
244
        repo.start_write_group()
 
245
        try:
 
246
            inv_sha1 = repo.add_inventory_by_delta('basis', delta,
 
247
                'result', ['basis'])
 
248
        except:
 
249
            repo.abort_write_group()
 
250
            raise
 
251
        else:
 
252
            repo.commit_write_group()
 
253
    finally:
 
254
        repo.unlock()
 
255
    # Fresh lock, reads disk again.
 
256
    repo = repo.bzrdir.open_repository()
 
257
    repo.lock_read()
 
258
    self.addCleanup(repo.unlock)
 
259
    return repo.get_inventory('result')
 
260
 
 
261
 
 
262
class TestInventoryUpdates(TestCase):
 
263
 
 
264
    def test_creation_from_root_id(self):
 
265
        # iff a root id is passed to the constructor, a root directory is made
 
266
        inv = inventory.Inventory(root_id='tree-root')
 
267
        self.assertNotEqual(None, inv.root)
 
268
        self.assertEqual('tree-root', inv.root.file_id)
 
269
 
 
270
    def test_add_path_of_root(self):
 
271
        # if no root id is given at creation time, there is no root directory
 
272
        inv = inventory.Inventory(root_id=None)
 
273
        self.assertIs(None, inv.root)
 
274
        # add a root entry by adding its path
 
275
        ie = inv.add_path("", "directory", "my-root")
 
276
        ie.revision = 'test-rev'
 
277
        self.assertEqual("my-root", ie.file_id)
 
278
        self.assertIs(ie, inv.root)
 
279
 
 
280
    def test_add_path(self):
 
281
        inv = inventory.Inventory(root_id='tree_root')
 
282
        ie = inv.add_path('hello', 'file', 'hello-id')
 
283
        self.assertEqual('hello-id', ie.file_id)
 
284
        self.assertEqual('file', ie.kind)
 
285
 
 
286
    def test_copy(self):
 
287
        """Make sure copy() works and creates a deep copy."""
 
288
        inv = inventory.Inventory(root_id='some-tree-root')
 
289
        ie = inv.add_path('hello', 'file', 'hello-id')
 
290
        inv2 = inv.copy()
 
291
        inv.root.file_id = 'some-new-root'
 
292
        ie.name = 'file2'
 
293
        self.assertEqual('some-tree-root', inv2.root.file_id)
 
294
        self.assertEqual('hello', inv2['hello-id'].name)
 
295
 
 
296
    def test_copy_empty(self):
 
297
        """Make sure an empty inventory can be copied."""
 
298
        inv = inventory.Inventory(root_id=None)
 
299
        inv2 = inv.copy()
 
300
        self.assertIs(None, inv2.root)
 
301
 
 
302
    def test_copy_copies_root_revision(self):
 
303
        """Make sure the revision of the root gets copied."""
 
304
        inv = inventory.Inventory(root_id='someroot')
 
305
        inv.root.revision = 'therev'
 
306
        inv2 = inv.copy()
 
307
        self.assertEquals('someroot', inv2.root.file_id)
 
308
        self.assertEquals('therev', inv2.root.revision)
 
309
 
 
310
    def test_create_tree_reference(self):
 
311
        inv = inventory.Inventory('tree-root-123')
 
312
        inv.add(TreeReference('nested-id', 'nested', parent_id='tree-root-123',
 
313
                              revision='rev', reference_revision='rev2'))
 
314
 
 
315
    def test_error_encoding(self):
 
316
        inv = inventory.Inventory('tree-root')
 
317
        inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
 
318
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
 
319
            InventoryFile('b-id', u'\u1234', 'tree-root'))
 
320
        self.assertContainsRe(str(e), r'\\u1234')
 
321
 
 
322
    def test_add_recursive(self):
 
323
        parent = InventoryDirectory('src-id', 'src', 'tree-root')
 
324
        child = InventoryFile('hello-id', 'hello.c', 'src-id')
 
325
        parent.children[child.file_id] = child
 
326
        inv = inventory.Inventory('tree-root')
 
327
        inv.add(parent)
 
328
        self.assertEqual('src/hello.c', inv.id2path('hello-id'))
 
329
 
 
330
 
 
331
 
 
332
class TestDeltaApplication(TestCaseWithTransport):
 
333
 
 
334
    def get_empty_inventory(self, reference_inv=None):
 
335
        """Get an empty inventory.
 
336
 
 
337
        Note that tests should not depend on the revision of the root for
 
338
        setting up test conditions, as it has to be flexible to accomodate non
 
339
        rich root repositories.
 
340
 
 
341
        :param reference_inv: If not None, get the revision for the root from
 
342
            this inventory. This is useful for dealing with older repositories
 
343
            that routinely discarded the root entry data. If None, the root's
 
344
            revision is set to 'basis'.
 
345
        """
 
346
        inv = inventory.Inventory()
 
347
        if reference_inv is not None:
 
348
            inv.root.revision = reference_inv.root.revision
 
349
        else:
 
350
            inv.root.revision = 'basis'
 
351
        return inv
 
352
 
 
353
    def test_empty_delta(self):
 
354
        inv = self.get_empty_inventory()
 
355
        delta = []
 
356
        inv = self.apply_delta(self, inv, delta)
 
357
        inv2 = self.get_empty_inventory(inv)
 
358
        self.assertEqual([], inv2._make_delta(inv))
 
359
 
 
360
    def test_None_file_id(self):
 
361
        inv = self.get_empty_inventory()
 
362
        dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
 
363
        dir1.revision = 'result'
 
364
        delta = [(None, u'dir1', None, dir1)]
 
365
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
366
            inv, delta)
 
367
 
 
368
    def test_unicode_file_id(self):
 
369
        inv = self.get_empty_inventory()
 
370
        dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
 
371
        dir1.revision = 'result'
 
372
        delta = [(None, u'dir1', dir1.file_id, dir1)]
 
373
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
374
            inv, delta)
 
375
 
 
376
    def test_repeated_file_id(self):
 
377
        inv = self.get_empty_inventory()
 
378
        file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
 
379
        file1.revision = 'result'
 
380
        file1.text_size = 0
 
381
        file1.text_sha1 = ""
 
382
        file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
 
383
        file2.revision = 'result'
 
384
        file2.text_size = 0
 
385
        file2.text_sha1 = ""
 
386
        delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
 
387
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
388
            inv, delta)
 
389
 
 
390
    def test_repeated_new_path(self):
 
391
        inv = self.get_empty_inventory()
 
392
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
393
        file1.revision = 'result'
 
394
        file1.text_size = 0
 
395
        file1.text_sha1 = ""
 
396
        file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
 
397
        file2.revision = 'result'
 
398
        file2.text_size = 0
 
399
        file2.text_sha1 = ""
 
400
        delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
 
401
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
402
            inv, delta)
 
403
 
 
404
    def test_repeated_old_path(self):
 
405
        inv = self.get_empty_inventory()
 
406
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
407
        file1.revision = 'result'
 
408
        file1.text_size = 0
 
409
        file1.text_sha1 = ""
 
410
        # We can't *create* a source inventory with the same path, but
 
411
        # a badly generated partial delta might claim the same source twice.
 
412
        # This would be buggy in two ways: the path is repeated in the delta,
 
413
        # And the path for one of the file ids doesn't match the source
 
414
        # location. Alternatively, we could have a repeated fileid, but that
 
415
        # is separately checked for.
 
416
        file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
 
417
        file2.revision = 'result'
 
418
        file2.text_size = 0
 
419
        file2.text_sha1 = ""
 
420
        inv.add(file1)
 
421
        inv.add(file2)
 
422
        delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
 
423
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
424
            inv, delta)
 
425
 
 
426
    def test_mismatched_id_entry_id(self):
 
427
        inv = self.get_empty_inventory()
 
428
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
429
        file1.revision = 'result'
 
430
        file1.text_size = 0
 
431
        file1.text_sha1 = ""
 
432
        delta = [(None, u'path', 'id', file1)]
 
433
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
434
            inv, delta)
 
435
 
 
436
    def test_mismatched_new_path_entry_None(self):
 
437
        inv = self.get_empty_inventory()
 
438
        delta = [(None, u'path', 'id', None)]
 
439
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
440
            inv, delta)
 
441
 
 
442
    def test_mismatched_new_path_None_entry(self):
 
443
        inv = self.get_empty_inventory()
 
444
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
445
        file1.revision = 'result'
 
446
        file1.text_size = 0
 
447
        file1.text_sha1 = ""
 
448
        delta = [(u"path", None, 'id1', file1)]
 
449
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
450
            inv, delta)
 
451
 
 
452
    def test_parent_is_not_directory(self):
 
453
        inv = self.get_empty_inventory()
 
454
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
455
        file1.revision = 'result'
 
456
        file1.text_size = 0
 
457
        file1.text_sha1 = ""
 
458
        file2 = inventory.InventoryFile('id2', 'path2', 'id1')
 
459
        file2.revision = 'result'
 
460
        file2.text_size = 0
 
461
        file2.text_sha1 = ""
 
462
        inv.add(file1)
 
463
        delta = [(None, u'path/path2', 'id2', file2)]
 
464
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
465
            inv, delta)
 
466
 
 
467
    def test_parent_is_missing(self):
 
468
        inv = self.get_empty_inventory()
 
469
        file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
 
470
        file2.revision = 'result'
 
471
        file2.text_size = 0
 
472
        file2.text_sha1 = ""
 
473
        delta = [(None, u'path/path2', 'id2', file2)]
 
474
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
475
            inv, delta)
 
476
 
 
477
    def test_new_parent_path_has_wrong_id(self):
 
478
        inv = self.get_empty_inventory()
 
479
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
480
        parent1.revision = 'result'
 
481
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
482
        parent2.revision = 'result'
 
483
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
484
        file1.revision = 'result'
 
485
        file1.text_size = 0
 
486
        file1.text_sha1 = ""
 
487
        inv.add(parent1)
 
488
        inv.add(parent2)
 
489
        # This delta claims that file1 is at dir/path, but actually its at
 
490
        # dir2/path if you follow the inventory parent structure.
 
491
        delta = [(None, u'dir/path', 'id', file1)]
 
492
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
493
            inv, delta)
 
494
 
 
495
    def test_old_parent_path_is_wrong(self):
 
496
        inv = self.get_empty_inventory()
 
497
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
498
        parent1.revision = 'result'
 
499
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
500
        parent2.revision = 'result'
 
501
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
502
        file1.revision = 'result'
 
503
        file1.text_size = 0
 
504
        file1.text_sha1 = ""
 
505
        inv.add(parent1)
 
506
        inv.add(parent2)
 
507
        inv.add(file1)
 
508
        # This delta claims that file1 was at dir/path, but actually it was at
 
509
        # dir2/path if you follow the inventory parent structure.
 
510
        delta = [(u'dir/path', None, 'id', None)]
 
511
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
512
            inv, delta)
 
513
 
 
514
    def test_old_parent_path_is_for_other_id(self):
 
515
        inv = self.get_empty_inventory()
 
516
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
517
        parent1.revision = 'result'
 
518
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
519
        parent2.revision = 'result'
 
520
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
521
        file1.revision = 'result'
 
522
        file1.text_size = 0
 
523
        file1.text_sha1 = ""
 
524
        file2 = inventory.InventoryFile('id2', 'path', 'p-1')
 
525
        file2.revision = 'result'
 
526
        file2.text_size = 0
 
527
        file2.text_sha1 = ""
 
528
        inv.add(parent1)
 
529
        inv.add(parent2)
 
530
        inv.add(file1)
 
531
        inv.add(file2)
 
532
        # This delta claims that file1 was at dir/path, but actually it was at
 
533
        # dir2/path if you follow the inventory parent structure. At dir/path
 
534
        # is another entry we should not delete.
 
535
        delta = [(u'dir/path', None, 'id', None)]
 
536
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
537
            inv, delta)
 
538
 
 
539
    def test_add_existing_id_new_path(self):
 
540
        inv = self.get_empty_inventory()
 
541
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
542
        parent1.revision = 'result'
 
543
        parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
 
544
        parent2.revision = 'result'
 
545
        inv.add(parent1)
 
546
        delta = [(None, u'dir2', 'p-1', parent2)]
 
547
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
548
            inv, delta)
 
549
 
 
550
    def test_add_new_id_existing_path(self):
 
551
        inv = self.get_empty_inventory()
 
552
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
553
        parent1.revision = 'result'
 
554
        parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
 
555
        parent2.revision = 'result'
 
556
        inv.add(parent1)
 
557
        delta = [(None, u'dir1', 'p-2', parent2)]
 
558
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
559
            inv, delta)
 
560
 
 
561
    def test_remove_dir_leaving_dangling_child(self):
 
562
        inv = self.get_empty_inventory()
 
563
        dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
564
        dir1.revision = 'result'
 
565
        dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
 
566
        dir2.revision = 'result'
 
567
        dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
 
568
        dir3.revision = 'result'
 
569
        inv.add(dir1)
 
570
        inv.add(dir2)
 
571
        inv.add(dir3)
 
572
        delta = [(u'dir1', None, 'p-1', None),
 
573
            (u'dir1/child2', None, 'p-3', None)]
 
574
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
575
            inv, delta)
 
576
 
 
577
 
 
578
class TestInventory(TestCase):
 
579
 
 
580
    def test_is_root(self):
 
581
        """Ensure our root-checking code is accurate."""
 
582
        inv = inventory.Inventory('TREE_ROOT')
 
583
        self.assertTrue(inv.is_root('TREE_ROOT'))
 
584
        self.assertFalse(inv.is_root('booga'))
 
585
        inv.root.file_id = 'booga'
 
586
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
587
        self.assertTrue(inv.is_root('booga'))
 
588
        # works properly even if no root is set
 
589
        inv.root = None
 
590
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
591
        self.assertFalse(inv.is_root('booga'))
22
592
 
23
593
 
24
594
class TestInventoryEntry(TestCase):
168
738
        self.assertEqual(expected_change, change)
169
739
 
170
740
 
171
 
class TestCHKInventory(TestCaseWithTransport):
 
741
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
172
742
 
173
743
    def get_chk_bytes(self):
174
 
        # The easiest way to get a CHK store is a development6 repository and
175
 
        # then work with the chk_bytes attribute directly.
176
 
        repo = self.make_repository(".", format="development6-rich-root")
177
 
        repo.lock_write()
178
 
        self.addCleanup(repo.unlock)
179
 
        repo.start_write_group()
180
 
        self.addCleanup(repo.abort_write_group)
181
 
        return repo.chk_bytes
 
744
        factory = groupcompress.make_pack_factory(True, True, 1)
 
745
        trans = self.get_transport('')
 
746
        return factory(trans)
182
747
 
183
748
    def read_bytes(self, chk_bytes, key):
184
749
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
437
1002
        inv.add_path("", "directory", "myrootid", None)
438
1003
        inv.revision_id = "expectedid"
439
1004
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
440
 
        delta = [(None, "",  "myrootid", inv.root)]
 
1005
        delta = [("", None, base_inv.root.file_id, None),
 
1006
            (None, "",  "myrootid", inv.root)]
441
1007
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
442
1008
        self.assertEquals(reference_inv.root, new_inv.root)
443
1009
 
648
1214
        self.assertIsInstance(ie2.name, unicode)
649
1215
        self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
650
1216
                         inv._bytes_to_utf8name_key(bytes))
 
1217
 
 
1218
 
 
1219
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
 
1220
 
 
1221
    def get_chk_bytes(self):
 
1222
        factory = groupcompress.make_pack_factory(True, True, 1)
 
1223
        trans = self.get_transport('')
 
1224
        return factory(trans)
 
1225
 
 
1226
    def make_dir(self, inv, name, parent_id):
 
1227
        inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
 
1228
 
 
1229
    def make_file(self, inv, name, parent_id, content='content\n'):
 
1230
        ie = inv.make_entry('file', name, parent_id, name + '-id')
 
1231
        ie.text_sha1 = osutils.sha_string(content)
 
1232
        ie.text_size = len(content)
 
1233
        inv.add(ie)
 
1234
 
 
1235
    def make_simple_inventory(self):
 
1236
        inv = Inventory('TREE_ROOT')
 
1237
        inv.revision_id = "revid"
 
1238
        inv.root.revision = "rootrev"
 
1239
        # /                 TREE_ROOT
 
1240
        # dir1/             dir1-id
 
1241
        #   sub-file1       sub-file1-id
 
1242
        #   sub-file2       sub-file2-id
 
1243
        #   sub-dir1/       sub-dir1-id
 
1244
        #     subsub-file1  subsub-file1-id
 
1245
        # dir2/             dir2-id
 
1246
        #   sub2-file1      sub2-file1-id
 
1247
        # top               top-id
 
1248
        self.make_dir(inv, 'dir1', 'TREE_ROOT')
 
1249
        self.make_dir(inv, 'dir2', 'TREE_ROOT')
 
1250
        self.make_dir(inv, 'sub-dir1', 'dir1-id')
 
1251
        self.make_file(inv, 'top', 'TREE_ROOT')
 
1252
        self.make_file(inv, 'sub-file1', 'dir1-id')
 
1253
        self.make_file(inv, 'sub-file2', 'dir1-id')
 
1254
        self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
 
1255
        self.make_file(inv, 'sub2-file1', 'dir2-id')
 
1256
        chk_bytes = self.get_chk_bytes()
 
1257
        #  use a small maximum_size to force internal paging structures
 
1258
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
1259
                        maximum_size=100,
 
1260
                        search_key_name='hash-255-way')
 
1261
        bytes = ''.join(chk_inv.to_lines())
 
1262
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1263
 
 
1264
    def assert_Getitems(self, expected_fileids, inv, file_ids):
 
1265
        self.assertEqual(sorted(expected_fileids),
 
1266
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
 
1267
 
 
1268
    def assertExpand(self, all_ids, inv, file_ids):
 
1269
        (val_all_ids,
 
1270
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
 
1271
        self.assertEqual(set(all_ids), val_all_ids)
 
1272
        entries = inv._getitems(val_all_ids)
 
1273
        expected_children = {}
 
1274
        for entry in entries:
 
1275
            s = expected_children.setdefault(entry.parent_id, [])
 
1276
            s.append(entry.file_id)
 
1277
        val_children = dict((k, sorted(v)) for k, v
 
1278
                            in val_children.iteritems())
 
1279
        expected_children = dict((k, sorted(v)) for k, v
 
1280
                            in expected_children.iteritems())
 
1281
        self.assertEqual(expected_children, val_children)
 
1282
 
 
1283
    def test_make_simple_inventory(self):
 
1284
        inv = self.make_simple_inventory()
 
1285
        layout = []
 
1286
        for path, entry in inv.iter_entries_by_dir():
 
1287
            layout.append((path, entry.file_id))
 
1288
        self.assertEqual([
 
1289
            ('', 'TREE_ROOT'),
 
1290
            ('dir1', 'dir1-id'),
 
1291
            ('dir2', 'dir2-id'),
 
1292
            ('top', 'top-id'),
 
1293
            ('dir1/sub-dir1', 'sub-dir1-id'),
 
1294
            ('dir1/sub-file1', 'sub-file1-id'),
 
1295
            ('dir1/sub-file2', 'sub-file2-id'),
 
1296
            ('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
 
1297
            ('dir2/sub2-file1', 'sub2-file1-id'),
 
1298
            ], layout)
 
1299
 
 
1300
    def test__getitems(self):
 
1301
        inv = self.make_simple_inventory()
 
1302
        # Reading from disk
 
1303
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1304
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1305
        self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
 
1306
        # From cache
 
1307
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1308
        # Mixed
 
1309
        self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
 
1310
                             ['dir1-id', 'sub-file2-id'])
 
1311
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1312
        self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
 
1313
 
 
1314
    def test_single_file(self):
 
1315
        inv = self.make_simple_inventory()
 
1316
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1317
 
 
1318
    def test_get_all_parents(self):
 
1319
        inv = self.make_simple_inventory()
 
1320
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1321
                           'subsub-file1-id',
 
1322
                          ], inv, ['subsub-file1-id'])
 
1323
 
 
1324
    def test_get_children(self):
 
1325
        inv = self.make_simple_inventory()
 
1326
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1327
                           'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
 
1328
                          ], inv, ['dir1-id'])
 
1329
 
 
1330
    def test_from_root(self):
 
1331
        inv = self.make_simple_inventory()
 
1332
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
 
1333
                           'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
 
1334
                           'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
 
1335
 
 
1336
    def test_top_level_file(self):
 
1337
        inv = self.make_simple_inventory()
 
1338
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1339
 
 
1340
    def test_subsub_file(self):
 
1341
        inv = self.make_simple_inventory()
 
1342
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1343
                           'subsub-file1-id'], inv, ['subsub-file1-id'])
 
1344
 
 
1345
    def test_sub_and_root(self):
 
1346
        inv = self.make_simple_inventory()
 
1347
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
 
1348
                           'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])