/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_inv.py

Late bind to PatienceSequenceMatcher to allow plugin to override.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
18
 
from bzrlib import (
19
 
    chk_map,
20
 
    groupcompress,
21
 
    bzrdir,
22
 
    errors,
23
 
    inventory,
24
 
    osutils,
25
 
    repository,
26
 
    revision,
27
 
    tests,
28
 
    )
29
 
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
30
 
    InventoryDirectory, InventoryEntry, TreeReference)
31
 
from bzrlib.tests import (
32
 
    TestCase,
33
 
    TestCaseWithTransport,
34
 
    condition_isinstance,
35
 
    multiply_tests,
36
 
    split_suite_by_condition,
37
 
    )
38
 
from bzrlib.tests.per_workingtree import workingtree_formats
39
 
 
40
 
 
41
 
def load_tests(standard_tests, module, loader):
42
 
    """Parameterise some inventory tests."""
43
 
    to_adapt, result = split_suite_by_condition(standard_tests,
44
 
        condition_isinstance(TestDeltaApplication))
45
 
    scenarios = [
46
 
        ('Inventory', {'apply_delta':apply_inventory_Inventory}),
47
 
        ]
48
 
    # Working tree basis delta application
49
 
    # Repository add_inv_by_delta.
50
 
    # Reduce form of the per_repository test logic - that logic needs to be
51
 
    # be able to get /just/ repositories whereas these tests are fine with
52
 
    # just creating trees.
53
 
    formats = set()
54
 
    for _, format in repository.format_registry.iteritems():
55
 
        scenarios.append((str(format.__name__), {
56
 
            'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
57
 
            'format':format}))
58
 
    for format in workingtree_formats():
59
 
        scenarios.append(
60
 
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
61
 
            'apply_delta':apply_inventory_WT_basis,
62
 
            'format':format}))
63
 
        scenarios.append(
64
 
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
65
 
            'apply_delta':apply_inventory_WT,
66
 
            'format':format}))
67
 
    return multiply_tests(to_adapt, scenarios, result)
68
 
 
69
 
 
70
 
def create_texts_for_inv(repo, inv):
71
 
    for path, ie in inv.iter_entries():
72
 
        if ie.text_size:
73
 
            lines = ['a' * ie.text_size]
74
 
        else:
75
 
            lines = []
76
 
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
77
 
    
78
 
def apply_inventory_Inventory(self, basis, delta):
79
 
    """Apply delta to basis and return the result.
80
 
    
81
 
    :param basis: An inventory to be used as the basis.
82
 
    :param delta: The inventory delta to apply:
83
 
    :return: An inventory resulting from the application.
84
 
    """
85
 
    basis.apply_delta(delta)
86
 
    return basis
87
 
 
88
 
 
89
 
def apply_inventory_WT(self, basis, delta):
90
 
    """Apply delta to basis and return the result.
91
 
 
92
 
    This sets the tree state to be basis, and then calls apply_inventory_delta.
93
 
    
94
 
    :param basis: An inventory to be used as the basis.
95
 
    :param delta: The inventory delta to apply:
96
 
    :return: An inventory resulting from the application.
97
 
    """
98
 
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
99
 
    control.create_repository()
100
 
    control.create_branch()
101
 
    tree = self.format.initialize(control)
102
 
    tree.lock_write()
103
 
    try:
104
 
        tree._write_inventory(basis)
105
 
    finally:
106
 
        tree.unlock()
107
 
    # Fresh object, reads disk again.
108
 
    tree = tree.bzrdir.open_workingtree()
109
 
    tree.lock_write()
110
 
    try:
111
 
        tree.apply_inventory_delta(delta)
112
 
    finally:
113
 
        tree.unlock()
114
 
    # reload tree - ensure we get what was written.
115
 
    tree = tree.bzrdir.open_workingtree()
116
 
    tree.lock_read()
117
 
    self.addCleanup(tree.unlock)
118
 
    # One could add 'tree._validate' here but that would cause 'early' failues 
119
 
    # as far as higher level code is concerned. Possibly adding an
120
 
    # expect_fail parameter to this function and if that is False then do a
121
 
    # validate call.
122
 
    return tree.inventory
123
 
 
124
 
 
125
 
def apply_inventory_WT_basis(self, basis, delta):
126
 
    """Apply delta to basis and return the result.
127
 
 
128
 
    This sets the parent and then calls update_basis_by_delta.
129
 
    It also puts the basis in the repository under both 'basis' and 'result' to
130
 
    allow safety checks made by the WT to succeed, and finally ensures that all
131
 
    items in the delta with a new path are present in the WT before calling
132
 
    update_basis_by_delta.
133
 
    
134
 
    :param basis: An inventory to be used as the basis.
135
 
    :param delta: The inventory delta to apply:
136
 
    :return: An inventory resulting from the application.
137
 
    """
138
 
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
139
 
    control.create_repository()
140
 
    control.create_branch()
141
 
    tree = self.format.initialize(control)
142
 
    tree.lock_write()
143
 
    try:
144
 
        repo = tree.branch.repository
145
 
        repo.start_write_group()
146
 
        try:
147
 
            rev = revision.Revision('basis', timestamp=0, timezone=None,
148
 
                message="", committer="foo@example.com")
149
 
            basis.revision_id = 'basis'
150
 
            create_texts_for_inv(tree.branch.repository, basis)
151
 
            repo.add_revision('basis', rev, basis)
152
 
            # Add a revision for the result, with the basis content - 
153
 
            # update_basis_by_delta doesn't check that the delta results in
154
 
            # result, and we want inconsistent deltas to get called on the
155
 
            # tree, or else the code isn't actually checked.
156
 
            rev = revision.Revision('result', timestamp=0, timezone=None,
157
 
                message="", committer="foo@example.com")
158
 
            basis.revision_id = 'result'
159
 
            repo.add_revision('result', rev, basis)
160
 
            repo.commit_write_group()
161
 
        except:
162
 
            repo.abort_write_group()
163
 
            raise
164
 
        # Set the basis state as the trees current state
165
 
        tree._write_inventory(basis)
166
 
        # This reads basis from the repo and puts it into the tree's local
167
 
        # cache, if it has one.
168
 
        tree.set_parent_ids(['basis'])
169
 
        paths = {}
170
 
        parents = set()
171
 
        for old, new, id, entry in delta:
172
 
            if None in (new, entry):
173
 
                continue
174
 
            paths[new] = (entry.file_id, entry.kind)
175
 
            parents.add(osutils.dirname(new))
176
 
        parents = osutils.minimum_path_selection(parents)
177
 
        parents.discard('')
178
 
        # Put place holders in the tree to permit adding the other entries.
179
 
        for pos, parent in enumerate(parents):
180
 
            if not tree.path2id(parent):
181
 
                # add a synthetic directory in the tree so we can can put the
182
 
                # tree0 entries in place for dirstate.
183
 
                tree.add([parent], ["id%d" % pos], ["directory"])
184
 
        if paths:
185
 
            # Many deltas may cause this mini-apply to fail, but we want to see what
186
 
            # the delta application code says, not the prep that we do to deal with 
187
 
            # limitations of dirstate's update_basis code.
188
 
            for path, (file_id, kind) in sorted(paths.items()):
189
 
                try:
190
 
                    tree.add([path], [file_id], [kind])
191
 
                except (KeyboardInterrupt, SystemExit):
192
 
                    raise
193
 
                except:
194
 
                    pass
195
 
    finally:
196
 
        tree.unlock()
197
 
    # Fresh lock, reads disk again.
198
 
    tree.lock_write()
199
 
    try:
200
 
        tree.update_basis_by_delta('result', delta)
201
 
    finally:
202
 
        tree.unlock()
203
 
    # reload tree - ensure we get what was written.
204
 
    tree = tree.bzrdir.open_workingtree()
205
 
    basis_tree = tree.basis_tree()
206
 
    basis_tree.lock_read()
207
 
    self.addCleanup(basis_tree.unlock)
208
 
    # Note, that if the tree does not have a local cache, the trick above of
209
 
    # setting the result as the basis, will come back to bite us. That said,
210
 
    # all the implementations in bzr do have a local cache.
211
 
    return basis_tree.inventory
212
 
 
213
 
 
214
 
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
215
 
    """Apply delta to basis and return the result.
216
 
    
217
 
    This inserts basis as a whole inventory and then uses
218
 
    add_inventory_by_delta to add delta.
219
 
 
220
 
    :param basis: An inventory to be used as the basis.
221
 
    :param delta: The inventory delta to apply:
222
 
    :return: An inventory resulting from the application.
223
 
    """
224
 
    format = self.format()
225
 
    control = self.make_bzrdir('tree', format=format._matchingbzrdir)
226
 
    repo = format.initialize(control)
227
 
    repo.lock_write()
228
 
    try:
229
 
        repo.start_write_group()
230
 
        try:
231
 
            rev = revision.Revision('basis', timestamp=0, timezone=None,
232
 
                message="", committer="foo@example.com")
233
 
            basis.revision_id = 'basis'
234
 
            create_texts_for_inv(repo, basis)
235
 
            repo.add_revision('basis', rev, basis)
236
 
            repo.commit_write_group()
237
 
        except:
238
 
            repo.abort_write_group()
239
 
            raise
240
 
    finally:
241
 
        repo.unlock()
242
 
    repo.lock_write()
243
 
    try:
244
 
        repo.start_write_group()
245
 
        try:
246
 
            inv_sha1 = repo.add_inventory_by_delta('basis', delta,
247
 
                'result', ['basis'])
248
 
        except:
249
 
            repo.abort_write_group()
250
 
            raise
251
 
        else:
252
 
            repo.commit_write_group()
253
 
    finally:
254
 
        repo.unlock()
255
 
    # Fresh lock, reads disk again.
256
 
    repo = repo.bzrdir.open_repository()
257
 
    repo.lock_read()
258
 
    self.addCleanup(repo.unlock)
259
 
    return repo.get_inventory('result')
260
 
 
261
 
 
262
 
class TestInventoryUpdates(TestCase):
263
 
 
264
 
    def test_creation_from_root_id(self):
265
 
        # iff a root id is passed to the constructor, a root directory is made
266
 
        inv = inventory.Inventory(root_id='tree-root')
267
 
        self.assertNotEqual(None, inv.root)
268
 
        self.assertEqual('tree-root', inv.root.file_id)
269
 
 
270
 
    def test_add_path_of_root(self):
271
 
        # if no root id is given at creation time, there is no root directory
272
 
        inv = inventory.Inventory(root_id=None)
273
 
        self.assertIs(None, inv.root)
274
 
        # add a root entry by adding its path
275
 
        ie = inv.add_path("", "directory", "my-root")
276
 
        ie.revision = 'test-rev'
277
 
        self.assertEqual("my-root", ie.file_id)
278
 
        self.assertIs(ie, inv.root)
279
 
 
280
 
    def test_add_path(self):
281
 
        inv = inventory.Inventory(root_id='tree_root')
282
 
        ie = inv.add_path('hello', 'file', 'hello-id')
283
 
        self.assertEqual('hello-id', ie.file_id)
284
 
        self.assertEqual('file', ie.kind)
285
 
 
286
 
    def test_copy(self):
287
 
        """Make sure copy() works and creates a deep copy."""
288
 
        inv = inventory.Inventory(root_id='some-tree-root')
289
 
        ie = inv.add_path('hello', 'file', 'hello-id')
290
 
        inv2 = inv.copy()
291
 
        inv.root.file_id = 'some-new-root'
292
 
        ie.name = 'file2'
293
 
        self.assertEqual('some-tree-root', inv2.root.file_id)
294
 
        self.assertEqual('hello', inv2['hello-id'].name)
295
 
 
296
 
    def test_copy_empty(self):
297
 
        """Make sure an empty inventory can be copied."""
298
 
        inv = inventory.Inventory(root_id=None)
299
 
        inv2 = inv.copy()
300
 
        self.assertIs(None, inv2.root)
301
 
 
302
 
    def test_copy_copies_root_revision(self):
303
 
        """Make sure the revision of the root gets copied."""
304
 
        inv = inventory.Inventory(root_id='someroot')
305
 
        inv.root.revision = 'therev'
306
 
        inv2 = inv.copy()
307
 
        self.assertEquals('someroot', inv2.root.file_id)
308
 
        self.assertEquals('therev', inv2.root.revision)
309
 
 
310
 
    def test_create_tree_reference(self):
311
 
        inv = inventory.Inventory('tree-root-123')
312
 
        inv.add(TreeReference('nested-id', 'nested', parent_id='tree-root-123',
313
 
                              revision='rev', reference_revision='rev2'))
314
 
 
315
 
    def test_error_encoding(self):
316
 
        inv = inventory.Inventory('tree-root')
317
 
        inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
318
 
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
319
 
            InventoryFile('b-id', u'\u1234', 'tree-root'))
320
 
        self.assertContainsRe(str(e), r'\\u1234')
321
 
 
322
 
    def test_add_recursive(self):
323
 
        parent = InventoryDirectory('src-id', 'src', 'tree-root')
324
 
        child = InventoryFile('hello-id', 'hello.c', 'src-id')
325
 
        parent.children[child.file_id] = child
326
 
        inv = inventory.Inventory('tree-root')
327
 
        inv.add(parent)
328
 
        self.assertEqual('src/hello.c', inv.id2path('hello-id'))
329
 
 
330
 
 
331
 
 
332
 
class TestDeltaApplication(TestCaseWithTransport):
333
 
 
334
 
    def get_empty_inventory(self, reference_inv=None):
335
 
        """Get an empty inventory.
336
 
 
337
 
        Note that tests should not depend on the revision of the root for
338
 
        setting up test conditions, as it has to be flexible to accomodate non
339
 
        rich root repositories.
340
 
 
341
 
        :param reference_inv: If not None, get the revision for the root from
342
 
            this inventory. This is useful for dealing with older repositories
343
 
            that routinely discarded the root entry data. If None, the root's
344
 
            revision is set to 'basis'.
345
 
        """
346
 
        inv = inventory.Inventory()
347
 
        if reference_inv is not None:
348
 
            inv.root.revision = reference_inv.root.revision
349
 
        else:
350
 
            inv.root.revision = 'basis'
351
 
        return inv
352
 
 
353
 
    def test_empty_delta(self):
354
 
        inv = self.get_empty_inventory()
355
 
        delta = []
356
 
        inv = self.apply_delta(self, inv, delta)
357
 
        inv2 = self.get_empty_inventory(inv)
358
 
        self.assertEqual([], inv2._make_delta(inv))
359
 
 
360
 
    def test_None_file_id(self):
361
 
        inv = self.get_empty_inventory()
362
 
        dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
363
 
        dir1.revision = 'result'
364
 
        delta = [(None, u'dir1', None, dir1)]
365
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
366
 
            inv, delta)
367
 
 
368
 
    def test_unicode_file_id(self):
369
 
        inv = self.get_empty_inventory()
370
 
        dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
371
 
        dir1.revision = 'result'
372
 
        delta = [(None, u'dir1', dir1.file_id, dir1)]
373
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
374
 
            inv, delta)
375
 
 
376
 
    def test_repeated_file_id(self):
377
 
        inv = self.get_empty_inventory()
378
 
        file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
379
 
        file1.revision = 'result'
380
 
        file1.text_size = 0
381
 
        file1.text_sha1 = ""
382
 
        file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
383
 
        file2.revision = 'result'
384
 
        file2.text_size = 0
385
 
        file2.text_sha1 = ""
386
 
        delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
387
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
388
 
            inv, delta)
389
 
 
390
 
    def test_repeated_new_path(self):
391
 
        inv = self.get_empty_inventory()
392
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
393
 
        file1.revision = 'result'
394
 
        file1.text_size = 0
395
 
        file1.text_sha1 = ""
396
 
        file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
397
 
        file2.revision = 'result'
398
 
        file2.text_size = 0
399
 
        file2.text_sha1 = ""
400
 
        delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
401
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
402
 
            inv, delta)
403
 
 
404
 
    def test_repeated_old_path(self):
405
 
        inv = self.get_empty_inventory()
406
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
407
 
        file1.revision = 'result'
408
 
        file1.text_size = 0
409
 
        file1.text_sha1 = ""
410
 
        # We can't *create* a source inventory with the same path, but
411
 
        # a badly generated partial delta might claim the same source twice.
412
 
        # This would be buggy in two ways: the path is repeated in the delta,
413
 
        # And the path for one of the file ids doesn't match the source
414
 
        # location. Alternatively, we could have a repeated fileid, but that
415
 
        # is separately checked for.
416
 
        file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
417
 
        file2.revision = 'result'
418
 
        file2.text_size = 0
419
 
        file2.text_sha1 = ""
420
 
        inv.add(file1)
421
 
        inv.add(file2)
422
 
        delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
423
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
424
 
            inv, delta)
425
 
 
426
 
    def test_mismatched_id_entry_id(self):
427
 
        inv = self.get_empty_inventory()
428
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
429
 
        file1.revision = 'result'
430
 
        file1.text_size = 0
431
 
        file1.text_sha1 = ""
432
 
        delta = [(None, u'path', 'id', file1)]
433
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
434
 
            inv, delta)
435
 
 
436
 
    def test_mismatched_new_path_entry_None(self):
437
 
        inv = self.get_empty_inventory()
438
 
        delta = [(None, u'path', 'id', None)]
439
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
440
 
            inv, delta)
441
 
 
442
 
    def test_mismatched_new_path_None_entry(self):
443
 
        inv = self.get_empty_inventory()
444
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
445
 
        file1.revision = 'result'
446
 
        file1.text_size = 0
447
 
        file1.text_sha1 = ""
448
 
        delta = [(u"path", None, 'id1', file1)]
449
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
450
 
            inv, delta)
451
 
 
452
 
    def test_parent_is_not_directory(self):
453
 
        inv = self.get_empty_inventory()
454
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
455
 
        file1.revision = 'result'
456
 
        file1.text_size = 0
457
 
        file1.text_sha1 = ""
458
 
        file2 = inventory.InventoryFile('id2', 'path2', 'id1')
459
 
        file2.revision = 'result'
460
 
        file2.text_size = 0
461
 
        file2.text_sha1 = ""
462
 
        inv.add(file1)
463
 
        delta = [(None, u'path/path2', 'id2', file2)]
464
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
465
 
            inv, delta)
466
 
 
467
 
    def test_parent_is_missing(self):
468
 
        inv = self.get_empty_inventory()
469
 
        file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
470
 
        file2.revision = 'result'
471
 
        file2.text_size = 0
472
 
        file2.text_sha1 = ""
473
 
        delta = [(None, u'path/path2', 'id2', file2)]
474
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
475
 
            inv, delta)
476
 
 
477
 
    def test_new_parent_path_has_wrong_id(self):
478
 
        inv = self.get_empty_inventory()
479
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
480
 
        parent1.revision = 'result'
481
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
482
 
        parent2.revision = 'result'
483
 
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
484
 
        file1.revision = 'result'
485
 
        file1.text_size = 0
486
 
        file1.text_sha1 = ""
487
 
        inv.add(parent1)
488
 
        inv.add(parent2)
489
 
        # This delta claims that file1 is at dir/path, but actually its at
490
 
        # dir2/path if you follow the inventory parent structure.
491
 
        delta = [(None, u'dir/path', 'id', file1)]
492
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
493
 
            inv, delta)
494
 
 
495
 
    def test_old_parent_path_is_wrong(self):
496
 
        inv = self.get_empty_inventory()
497
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
498
 
        parent1.revision = 'result'
499
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
500
 
        parent2.revision = 'result'
501
 
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
502
 
        file1.revision = 'result'
503
 
        file1.text_size = 0
504
 
        file1.text_sha1 = ""
505
 
        inv.add(parent1)
506
 
        inv.add(parent2)
507
 
        inv.add(file1)
508
 
        # This delta claims that file1 was at dir/path, but actually it was at
509
 
        # dir2/path if you follow the inventory parent structure.
510
 
        delta = [(u'dir/path', None, 'id', None)]
511
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
512
 
            inv, delta)
513
 
 
514
 
    def test_old_parent_path_is_for_other_id(self):
515
 
        inv = self.get_empty_inventory()
516
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
517
 
        parent1.revision = 'result'
518
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
519
 
        parent2.revision = 'result'
520
 
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
521
 
        file1.revision = 'result'
522
 
        file1.text_size = 0
523
 
        file1.text_sha1 = ""
524
 
        file2 = inventory.InventoryFile('id2', 'path', 'p-1')
525
 
        file2.revision = 'result'
526
 
        file2.text_size = 0
527
 
        file2.text_sha1 = ""
528
 
        inv.add(parent1)
529
 
        inv.add(parent2)
530
 
        inv.add(file1)
531
 
        inv.add(file2)
532
 
        # This delta claims that file1 was at dir/path, but actually it was at
533
 
        # dir2/path if you follow the inventory parent structure. At dir/path
534
 
        # is another entry we should not delete.
535
 
        delta = [(u'dir/path', None, 'id', None)]
536
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
537
 
            inv, delta)
538
 
 
539
 
    def test_add_existing_id_new_path(self):
540
 
        inv = self.get_empty_inventory()
541
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
542
 
        parent1.revision = 'result'
543
 
        parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
544
 
        parent2.revision = 'result'
545
 
        inv.add(parent1)
546
 
        delta = [(None, u'dir2', 'p-1', parent2)]
547
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
548
 
            inv, delta)
549
 
 
550
 
    def test_add_new_id_existing_path(self):
551
 
        inv = self.get_empty_inventory()
552
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
553
 
        parent1.revision = 'result'
554
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
555
 
        parent2.revision = 'result'
556
 
        inv.add(parent1)
557
 
        delta = [(None, u'dir1', 'p-2', parent2)]
558
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
559
 
            inv, delta)
560
 
 
561
 
    def test_remove_dir_leaving_dangling_child(self):
562
 
        inv = self.get_empty_inventory()
563
 
        dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
564
 
        dir1.revision = 'result'
565
 
        dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
566
 
        dir2.revision = 'result'
567
 
        dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
568
 
        dir3.revision = 'result'
569
 
        inv.add(dir1)
570
 
        inv.add(dir2)
571
 
        inv.add(dir3)
572
 
        delta = [(u'dir1', None, 'p-1', None),
573
 
            (u'dir1/child2', None, 'p-3', None)]
574
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
575
 
            inv, delta)
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
from cStringIO import StringIO
 
18
import os
 
19
 
 
20
from bzrlib.branch import Branch
 
21
import bzrlib.errors as errors
 
22
from bzrlib.diff import internal_diff
 
23
from bzrlib.inventory import (Inventory, ROOT_ID, InventoryFile,
 
24
    InventoryDirectory, InventoryEntry)
 
25
import bzrlib.inventory as inventory
 
26
from bzrlib.osutils import has_symlinks, rename, pathjoin
 
27
from bzrlib.tests import TestCase, TestCaseWithTransport
 
28
from bzrlib.transform import TreeTransform
 
29
from bzrlib.uncommit import uncommit
576
30
 
577
31
 
578
32
class TestInventory(TestCase):
579
33
 
580
 
    def test_is_root(self):
581
 
        """Ensure our root-checking code is accurate."""
582
 
        inv = inventory.Inventory('TREE_ROOT')
583
 
        self.assertTrue(inv.is_root('TREE_ROOT'))
584
 
        self.assertFalse(inv.is_root('booga'))
585
 
        inv.root.file_id = 'booga'
586
 
        self.assertFalse(inv.is_root('TREE_ROOT'))
587
 
        self.assertTrue(inv.is_root('booga'))
588
 
        # works properly even if no root is set
589
 
        inv.root = None
590
 
        self.assertFalse(inv.is_root('TREE_ROOT'))
591
 
        self.assertFalse(inv.is_root('booga'))
 
34
    def test_is_within(self):
 
35
        from bzrlib.osutils import is_inside_any
 
36
 
 
37
        SRC_FOO_C = pathjoin('src', 'foo.c')
 
38
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
 
39
                         (['src'], SRC_FOO_C),
 
40
                         (['src'], 'src'),
 
41
                         ]:
 
42
            self.assert_(is_inside_any(dirs, fn))
 
43
            
 
44
        for dirs, fn in [(['src'], 'srccontrol'),
 
45
                         (['src'], 'srccontrol/foo')]:
 
46
            self.assertFalse(is_inside_any(dirs, fn))
 
47
            
 
48
    def test_ids(self):
 
49
        """Test detection of files within selected directories."""
 
50
        inv = Inventory()
 
51
        
 
52
        for args in [('src', 'directory', 'src-id'), 
 
53
                     ('doc', 'directory', 'doc-id'), 
 
54
                     ('src/hello.c', 'file'),
 
55
                     ('src/bye.c', 'file', 'bye-id'),
 
56
                     ('Makefile', 'file')]:
 
57
            inv.add_path(*args)
 
58
            
 
59
        self.assertEqual(inv.path2id('src'), 'src-id')
 
60
        self.assertEqual(inv.path2id('src/bye.c'), 'bye-id')
 
61
        
 
62
        self.assert_('src-id' in inv)
 
63
 
 
64
    def test_version(self):
 
65
        """Inventory remembers the text's version."""
 
66
        inv = Inventory()
 
67
        ie = inv.add_path('foo.txt', 'file')
 
68
        ## XXX
592
69
 
593
70
 
594
71
class TestInventoryEntry(TestCase):
664
141
        self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
665
142
            inventory.InventoryDirectory)
666
143
 
667
 
    def test_make_entry_non_normalized(self):
668
 
        orig_normalized_filename = osutils.normalized_filename
669
 
 
670
 
        try:
671
 
            osutils.normalized_filename = osutils._accessible_normalized_filename
672
 
            entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
673
 
            self.assertEqual(u'\xe5', entry.name)
674
 
            self.assertIsInstance(entry, inventory.InventoryFile)
675
 
 
676
 
            osutils.normalized_filename = osutils._inaccessible_normalized_filename
677
 
            self.assertRaises(errors.InvalidNormalization,
678
 
                    inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
679
 
        finally:
680
 
            osutils.normalized_filename = orig_normalized_filename
 
144
class TestEntryDiffing(TestCaseWithTransport):
 
145
 
 
146
    def setUp(self):
 
147
        super(TestEntryDiffing, self).setUp()
 
148
        self.wt = self.make_branch_and_tree('.')
 
149
        self.branch = self.wt.branch
 
150
        print >> open('file', 'wb'), 'foo'
 
151
        print >> open('binfile', 'wb'), 'foo'
 
152
        self.wt.add(['file'], ['fileid'])
 
153
        self.wt.add(['binfile'], ['binfileid'])
 
154
        if has_symlinks():
 
155
            os.symlink('target1', 'symlink')
 
156
            self.wt.add(['symlink'], ['linkid'])
 
157
        self.wt.commit('message_1', rev_id = '1')
 
158
        print >> open('file', 'wb'), 'bar'
 
159
        print >> open('binfile', 'wb'), 'x' * 1023 + '\x00'
 
160
        if has_symlinks():
 
161
            os.unlink('symlink')
 
162
            os.symlink('target2', 'symlink')
 
163
        self.tree_1 = self.branch.repository.revision_tree('1')
 
164
        self.inv_1 = self.branch.repository.get_inventory('1')
 
165
        self.file_1 = self.inv_1['fileid']
 
166
        self.file_1b = self.inv_1['binfileid']
 
167
        self.tree_2 = self.wt
 
168
        self.inv_2 = self.tree_2.read_working_inventory()
 
169
        self.file_2 = self.inv_2['fileid']
 
170
        self.file_2b = self.inv_2['binfileid']
 
171
        if has_symlinks():
 
172
            self.link_1 = self.inv_1['linkid']
 
173
            self.link_2 = self.inv_2['linkid']
 
174
 
 
175
    def test_file_diff_deleted(self):
 
176
        output = StringIO()
 
177
        self.file_1.diff(internal_diff, 
 
178
                          "old_label", self.tree_1,
 
179
                          "/dev/null", None, None,
 
180
                          output)
 
181
        self.assertEqual(output.getvalue(), "--- old_label\t\n"
 
182
                                            "+++ /dev/null\t\n"
 
183
                                            "@@ -1,1 +0,0 @@\n"
 
184
                                            "-foo\n"
 
185
                                            "\n")
 
186
 
 
187
    def test_file_diff_added(self):
 
188
        output = StringIO()
 
189
        self.file_1.diff(internal_diff, 
 
190
                          "new_label", self.tree_1,
 
191
                          "/dev/null", None, None,
 
192
                          output, reverse=True)
 
193
        self.assertEqual(output.getvalue(), "--- /dev/null\t\n"
 
194
                                            "+++ new_label\t\n"
 
195
                                            "@@ -0,0 +1,1 @@\n"
 
196
                                            "+foo\n"
 
197
                                            "\n")
 
198
 
 
199
    def test_file_diff_changed(self):
 
200
        output = StringIO()
 
201
        self.file_1.diff(internal_diff, 
 
202
                          "/dev/null", self.tree_1, 
 
203
                          "new_label", self.file_2, self.tree_2,
 
204
                          output)
 
205
        self.assertEqual(output.getvalue(), "--- /dev/null\t\n"
 
206
                                            "+++ new_label\t\n"
 
207
                                            "@@ -1,1 +1,1 @@\n"
 
208
                                            "-foo\n"
 
209
                                            "+bar\n"
 
210
                                            "\n")
 
211
        
 
212
    def test_file_diff_binary(self):
 
213
        output = StringIO()
 
214
        self.file_1.diff(internal_diff, 
 
215
                          "/dev/null", self.tree_1, 
 
216
                          "new_label", self.file_2b, self.tree_2,
 
217
                          output)
 
218
        self.assertEqual(output.getvalue(), 
 
219
                         "Binary files /dev/null and new_label differ\n")
 
220
    def test_link_diff_deleted(self):
 
221
        if not has_symlinks():
 
222
            return
 
223
        output = StringIO()
 
224
        self.link_1.diff(internal_diff, 
 
225
                          "old_label", self.tree_1,
 
226
                          "/dev/null", None, None,
 
227
                          output)
 
228
        self.assertEqual(output.getvalue(),
 
229
                         "=== target was 'target1'\n")
 
230
 
 
231
    def test_link_diff_added(self):
 
232
        if not has_symlinks():
 
233
            return
 
234
        output = StringIO()
 
235
        self.link_1.diff(internal_diff, 
 
236
                          "new_label", self.tree_1,
 
237
                          "/dev/null", None, None,
 
238
                          output, reverse=True)
 
239
        self.assertEqual(output.getvalue(),
 
240
                         "=== target is 'target1'\n")
 
241
 
 
242
    def test_link_diff_changed(self):
 
243
        if not has_symlinks():
 
244
            return
 
245
        output = StringIO()
 
246
        self.link_1.diff(internal_diff, 
 
247
                          "/dev/null", self.tree_1, 
 
248
                          "new_label", self.link_2, self.tree_2,
 
249
                          output)
 
250
        self.assertEqual(output.getvalue(),
 
251
                         "=== target changed 'target1' => 'target2'\n")
 
252
 
 
253
 
 
254
class TestSnapshot(TestCaseWithTransport):
 
255
 
 
256
    def setUp(self):
 
257
        # for full testing we'll need a branch
 
258
        # with a subdir to test parent changes.
 
259
        # and a file, link and dir under that.
 
260
        # but right now I only need one attribute
 
261
        # to change, and then test merge patterns
 
262
        # with fake parent entries.
 
263
        super(TestSnapshot, self).setUp()
 
264
        self.wt = self.make_branch_and_tree('.')
 
265
        self.branch = self.wt.branch
 
266
        self.build_tree(['subdir/', 'subdir/file'], line_endings='binary')
 
267
        self.wt.add(['subdir', 'subdir/file'],
 
268
                                       ['dirid', 'fileid'])
 
269
        if has_symlinks():
 
270
            pass
 
271
        self.wt.commit('message_1', rev_id = '1')
 
272
        self.tree_1 = self.branch.repository.revision_tree('1')
 
273
        self.inv_1 = self.branch.repository.get_inventory('1')
 
274
        self.file_1 = self.inv_1['fileid']
 
275
        self.file_active = self.wt.inventory['fileid']
 
276
 
 
277
    def test_snapshot_new_revision(self):
 
278
        # This tests that a simple commit with no parents makes a new
 
279
        # revision value in the inventory entry
 
280
        self.file_active.snapshot('2', 'subdir/file', {}, self.wt, 
 
281
                                  self.branch.repository.weave_store,
 
282
                                  self.branch.get_transaction())
 
283
        # expected outcome - file_1 has a revision id of '2', and we can get
 
284
        # its text of 'file contents' out of the weave.
 
285
        self.assertEqual(self.file_1.revision, '1')
 
286
        self.assertEqual(self.file_active.revision, '2')
 
287
        # this should be a separate test probably, but lets check it once..
 
288
        lines = self.branch.repository.weave_store.get_weave(
 
289
            'fileid', 
 
290
            self.branch.get_transaction()).get_lines('2')
 
291
        self.assertEqual(lines, ['contents of subdir/file\n'])
 
292
 
 
293
    def test_snapshot_unchanged(self):
 
294
        #This tests that a simple commit does not make a new entry for
 
295
        # an unchanged inventory entry
 
296
        self.file_active.snapshot('2', 'subdir/file', {'1':self.file_1},
 
297
                                  self.wt, 
 
298
                                  self.branch.repository.weave_store,
 
299
                                  self.branch.get_transaction())
 
300
        self.assertEqual(self.file_1.revision, '1')
 
301
        self.assertEqual(self.file_active.revision, '1')
 
302
        vf = self.branch.repository.weave_store.get_weave(
 
303
            'fileid', 
 
304
            self.branch.repository.get_transaction())
 
305
        self.assertRaises(errors.RevisionNotPresent,
 
306
                          vf.get_lines,
 
307
                          '2')
 
308
 
 
309
    def test_snapshot_merge_identical_different_revid(self):
 
310
        # This tests that a commit with two identical parents, one of which has
 
311
        # a different revision id, results in a new revision id in the entry.
 
312
        # 1->other, commit a merge of other against 1, results in 2.
 
313
        other_ie = inventory.InventoryFile('fileid', 'newname', self.file_1.parent_id)
 
314
        other_ie = inventory.InventoryFile('fileid', 'file', self.file_1.parent_id)
 
315
        other_ie.revision = '1'
 
316
        other_ie.text_sha1 = self.file_1.text_sha1
 
317
        other_ie.text_size = self.file_1.text_size
 
318
        self.assertEqual(self.file_1, other_ie)
 
319
        other_ie.revision = 'other'
 
320
        self.assertNotEqual(self.file_1, other_ie)
 
321
        versionfile = self.branch.repository.weave_store.get_weave(
 
322
            'fileid', self.branch.repository.get_transaction())
 
323
        versionfile.clone_text('other', '1', ['1'])
 
324
        self.file_active.snapshot('2', 'subdir/file', 
 
325
                                  {'1':self.file_1, 'other':other_ie},
 
326
                                  self.wt, 
 
327
                                  self.branch.repository.weave_store,
 
328
                                  self.branch.get_transaction())
 
329
        self.assertEqual(self.file_active.revision, '2')
 
330
 
 
331
    def test_snapshot_changed(self):
 
332
        # This tests that a commit with one different parent results in a new
 
333
        # revision id in the entry.
 
334
        self.file_active.name='newname'
 
335
        rename('subdir/file', 'subdir/newname')
 
336
        self.file_active.snapshot('2', 'subdir/newname', {'1':self.file_1}, 
 
337
                                  self.wt,
 
338
                                  self.branch.repository.weave_store,
 
339
                                  self.branch.get_transaction())
 
340
        # expected outcome - file_1 has a revision id of '2'
 
341
        self.assertEqual(self.file_active.revision, '2')
 
342
 
 
343
 
 
344
class TestPreviousHeads(TestCaseWithTransport):
 
345
 
 
346
    def setUp(self):
 
347
        # we want several inventories, that respectively
 
348
        # give use the following scenarios:
 
349
        # A) fileid not in any inventory (A),
 
350
        # B) fileid present in one inventory (B) and (A,B)
 
351
        # C) fileid present in two inventories, and they
 
352
        #   are not mutual descendents (B, C)
 
353
        # D) fileid present in two inventories and one is
 
354
        #   a descendent of the other. (B, D)
 
355
        super(TestPreviousHeads, self).setUp()
 
356
        self.wt = self.make_branch_and_tree('.')
 
357
        self.branch = self.wt.branch
 
358
        self.build_tree(['file'])
 
359
        self.wt.commit('new branch', allow_pointless=True, rev_id='A')
 
360
        self.inv_A = self.branch.repository.get_inventory('A')
 
361
        self.wt.add(['file'], ['fileid'])
 
362
        self.wt.commit('add file', rev_id='B')
 
363
        self.inv_B = self.branch.repository.get_inventory('B')
 
364
        uncommit(self.branch, tree=self.wt)
 
365
        self.assertEqual(self.branch.revision_history(), ['A'])
 
366
        self.wt.commit('another add of file', rev_id='C')
 
367
        self.inv_C = self.branch.repository.get_inventory('C')
 
368
        self.wt.add_pending_merge('B')
 
369
        self.wt.commit('merge in B', rev_id='D')
 
370
        self.inv_D = self.branch.repository.get_inventory('D')
 
371
        self.file_active = self.wt.inventory['fileid']
 
372
        self.weave = self.branch.repository.weave_store.get_weave('fileid',
 
373
            self.branch.repository.get_transaction())
 
374
        
 
375
    def get_previous_heads(self, inventories):
 
376
        return self.file_active.find_previous_heads(
 
377
            inventories, 
 
378
            self.branch.repository.weave_store,
 
379
            self.branch.repository.get_transaction())
 
380
        
 
381
    def test_fileid_in_no_inventory(self):
 
382
        self.assertEqual({}, self.get_previous_heads([self.inv_A]))
 
383
 
 
384
    def test_fileid_in_one_inventory(self):
 
385
        self.assertEqual({'B':self.inv_B['fileid']},
 
386
                         self.get_previous_heads([self.inv_B]))
 
387
        self.assertEqual({'B':self.inv_B['fileid']},
 
388
                         self.get_previous_heads([self.inv_A, self.inv_B]))
 
389
        self.assertEqual({'B':self.inv_B['fileid']},
 
390
                         self.get_previous_heads([self.inv_B, self.inv_A]))
 
391
 
 
392
    def test_fileid_in_two_inventories_gives_both_entries(self):
 
393
        self.assertEqual({'B':self.inv_B['fileid'],
 
394
                          'C':self.inv_C['fileid']},
 
395
                          self.get_previous_heads([self.inv_B, self.inv_C]))
 
396
        self.assertEqual({'B':self.inv_B['fileid'],
 
397
                          'C':self.inv_C['fileid']},
 
398
                          self.get_previous_heads([self.inv_C, self.inv_B]))
 
399
 
 
400
    def test_fileid_in_two_inventories_already_merged_gives_head(self):
 
401
        self.assertEqual({'D':self.inv_D['fileid']},
 
402
                         self.get_previous_heads([self.inv_B, self.inv_D]))
 
403
        self.assertEqual({'D':self.inv_D['fileid']},
 
404
                         self.get_previous_heads([self.inv_D, self.inv_B]))
 
405
 
 
406
    # TODO: test two inventories with the same file revision 
681
407
 
682
408
 
683
409
class TestDescribeChanges(TestCase):
710
436
        # perhaps a bit questionable but seems like the most reasonable thing...
711
437
        self.assertChangeDescription('unchanged', None, None)
712
438
 
713
 
        # in this case it's both renamed and modified; show a rename and
 
439
        # in this case it's both renamed and modified; show a rename and 
714
440
        # modification:
715
441
        new_a.name = 'newfilename'
716
442
        self.assertChangeDescription('modified and renamed', old_a, new_a)
738
464
        self.assertEqual(expected_change, change)
739
465
 
740
466
 
741
 
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
742
 
 
743
 
    def get_chk_bytes(self):
744
 
        factory = groupcompress.make_pack_factory(True, True, 1)
745
 
        trans = self.get_transport('')
746
 
        return factory(trans)
747
 
 
748
 
    def read_bytes(self, chk_bytes, key):
749
 
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
750
 
        return stream.next().get_bytes_as("fulltext")
751
 
 
752
 
    def test_deserialise_gives_CHKInventory(self):
753
 
        inv = Inventory()
754
 
        inv.revision_id = "revid"
755
 
        inv.root.revision = "rootrev"
756
 
        chk_bytes = self.get_chk_bytes()
757
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
758
 
        bytes = ''.join(chk_inv.to_lines())
759
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
760
 
        self.assertEqual("revid", new_inv.revision_id)
761
 
        self.assertEqual("directory", new_inv.root.kind)
762
 
        self.assertEqual(inv.root.file_id, new_inv.root.file_id)
763
 
        self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
764
 
        self.assertEqual(inv.root.name, new_inv.root.name)
765
 
        self.assertEqual("rootrev", new_inv.root.revision)
766
 
        self.assertEqual('plain', new_inv._search_key_name)
767
 
 
768
 
    def test_deserialise_wrong_revid(self):
769
 
        inv = Inventory()
770
 
        inv.revision_id = "revid"
771
 
        inv.root.revision = "rootrev"
772
 
        chk_bytes = self.get_chk_bytes()
773
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
774
 
        bytes = ''.join(chk_inv.to_lines())
775
 
        self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
776
 
            bytes, ("revid2",))
777
 
 
778
 
    def test_captures_rev_root_byid(self):
779
 
        inv = Inventory()
780
 
        inv.revision_id = "foo"
781
 
        inv.root.revision = "bar"
782
 
        chk_bytes = self.get_chk_bytes()
783
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
784
 
        lines = chk_inv.to_lines()
785
 
        self.assertEqual([
786
 
            'chkinventory:\n',
787
 
            'revision_id: foo\n',
788
 
            'root_id: TREE_ROOT\n',
789
 
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
790
 
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
791
 
            ], lines)
792
 
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
793
 
        self.assertEqual('plain', chk_inv._search_key_name)
794
 
 
795
 
    def test_captures_parent_id_basename_index(self):
796
 
        inv = Inventory()
797
 
        inv.revision_id = "foo"
798
 
        inv.root.revision = "bar"
799
 
        chk_bytes = self.get_chk_bytes()
800
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
801
 
        lines = chk_inv.to_lines()
802
 
        self.assertEqual([
803
 
            'chkinventory:\n',
804
 
            'revision_id: foo\n',
805
 
            'root_id: TREE_ROOT\n',
806
 
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
807
 
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
808
 
            ], lines)
809
 
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
810
 
        self.assertEqual('plain', chk_inv._search_key_name)
811
 
 
812
 
    def test_captures_search_key_name(self):
813
 
        inv = Inventory()
814
 
        inv.revision_id = "foo"
815
 
        inv.root.revision = "bar"
816
 
        chk_bytes = self.get_chk_bytes()
817
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
818
 
                                              search_key_name='hash-16-way')
819
 
        lines = chk_inv.to_lines()
820
 
        self.assertEqual([
821
 
            'chkinventory:\n',
822
 
            'search_key_name: hash-16-way\n',
823
 
            'root_id: TREE_ROOT\n',
824
 
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
825
 
            'revision_id: foo\n',
826
 
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
827
 
            ], lines)
828
 
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
829
 
        self.assertEqual('hash-16-way', chk_inv._search_key_name)
830
 
 
831
 
    def test_directory_children_on_demand(self):
832
 
        inv = Inventory()
833
 
        inv.revision_id = "revid"
834
 
        inv.root.revision = "rootrev"
835
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
836
 
        inv["fileid"].revision = "filerev"
837
 
        inv["fileid"].executable = True
838
 
        inv["fileid"].text_sha1 = "ffff"
839
 
        inv["fileid"].text_size = 1
840
 
        chk_bytes = self.get_chk_bytes()
841
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
842
 
        bytes = ''.join(chk_inv.to_lines())
843
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
844
 
        root_entry = new_inv[inv.root.file_id]
845
 
        self.assertEqual(None, root_entry._children)
846
 
        self.assertEqual(['file'], root_entry.children.keys())
847
 
        file_direct = new_inv["fileid"]
848
 
        file_found = root_entry.children['file']
849
 
        self.assertEqual(file_direct.kind, file_found.kind)
850
 
        self.assertEqual(file_direct.file_id, file_found.file_id)
851
 
        self.assertEqual(file_direct.parent_id, file_found.parent_id)
852
 
        self.assertEqual(file_direct.name, file_found.name)
853
 
        self.assertEqual(file_direct.revision, file_found.revision)
854
 
        self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
855
 
        self.assertEqual(file_direct.text_size, file_found.text_size)
856
 
        self.assertEqual(file_direct.executable, file_found.executable)
857
 
 
858
 
    def test_from_inventory_maximum_size(self):
859
 
        # from_inventory supports the maximum_size parameter.
860
 
        inv = Inventory()
861
 
        inv.revision_id = "revid"
862
 
        inv.root.revision = "rootrev"
863
 
        chk_bytes = self.get_chk_bytes()
864
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
865
 
        chk_inv.id_to_entry._ensure_root()
866
 
        self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
867
 
        self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
868
 
        p_id_basename = chk_inv.parent_id_basename_to_file_id
869
 
        p_id_basename._ensure_root()
870
 
        self.assertEqual(120, p_id_basename._root_node.maximum_size)
871
 
        self.assertEqual(2, p_id_basename._root_node._key_width)
872
 
 
873
 
    def test___iter__(self):
874
 
        inv = Inventory()
875
 
        inv.revision_id = "revid"
876
 
        inv.root.revision = "rootrev"
877
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
878
 
        inv["fileid"].revision = "filerev"
879
 
        inv["fileid"].executable = True
880
 
        inv["fileid"].text_sha1 = "ffff"
881
 
        inv["fileid"].text_size = 1
882
 
        chk_bytes = self.get_chk_bytes()
883
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
884
 
        bytes = ''.join(chk_inv.to_lines())
885
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
886
 
        fileids = list(new_inv.__iter__())
887
 
        fileids.sort()
888
 
        self.assertEqual([inv.root.file_id, "fileid"], fileids)
889
 
 
890
 
    def test__len__(self):
891
 
        inv = Inventory()
892
 
        inv.revision_id = "revid"
893
 
        inv.root.revision = "rootrev"
894
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
895
 
        inv["fileid"].revision = "filerev"
896
 
        inv["fileid"].executable = True
897
 
        inv["fileid"].text_sha1 = "ffff"
898
 
        inv["fileid"].text_size = 1
899
 
        chk_bytes = self.get_chk_bytes()
900
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
901
 
        self.assertEqual(2, len(chk_inv))
902
 
 
903
 
    def test___getitem__(self):
904
 
        inv = Inventory()
905
 
        inv.revision_id = "revid"
906
 
        inv.root.revision = "rootrev"
907
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
908
 
        inv["fileid"].revision = "filerev"
909
 
        inv["fileid"].executable = True
910
 
        inv["fileid"].text_sha1 = "ffff"
911
 
        inv["fileid"].text_size = 1
912
 
        chk_bytes = self.get_chk_bytes()
913
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
914
 
        bytes = ''.join(chk_inv.to_lines())
915
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
916
 
        root_entry = new_inv[inv.root.file_id]
917
 
        file_entry = new_inv["fileid"]
918
 
        self.assertEqual("directory", root_entry.kind)
919
 
        self.assertEqual(inv.root.file_id, root_entry.file_id)
920
 
        self.assertEqual(inv.root.parent_id, root_entry.parent_id)
921
 
        self.assertEqual(inv.root.name, root_entry.name)
922
 
        self.assertEqual("rootrev", root_entry.revision)
923
 
        self.assertEqual("file", file_entry.kind)
924
 
        self.assertEqual("fileid", file_entry.file_id)
925
 
        self.assertEqual(inv.root.file_id, file_entry.parent_id)
926
 
        self.assertEqual("file", file_entry.name)
927
 
        self.assertEqual("filerev", file_entry.revision)
928
 
        self.assertEqual("ffff", file_entry.text_sha1)
929
 
        self.assertEqual(1, file_entry.text_size)
930
 
        self.assertEqual(True, file_entry.executable)
931
 
        self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
932
 
 
933
 
    def test_has_id_true(self):
934
 
        inv = Inventory()
935
 
        inv.revision_id = "revid"
936
 
        inv.root.revision = "rootrev"
937
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
938
 
        inv["fileid"].revision = "filerev"
939
 
        inv["fileid"].executable = True
940
 
        inv["fileid"].text_sha1 = "ffff"
941
 
        inv["fileid"].text_size = 1
942
 
        chk_bytes = self.get_chk_bytes()
943
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
944
 
        self.assertTrue(chk_inv.has_id('fileid'))
945
 
        self.assertTrue(chk_inv.has_id(inv.root.file_id))
946
 
 
947
 
    def test_has_id_not(self):
948
 
        inv = Inventory()
949
 
        inv.revision_id = "revid"
950
 
        inv.root.revision = "rootrev"
951
 
        chk_bytes = self.get_chk_bytes()
952
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
953
 
        self.assertFalse(chk_inv.has_id('fileid'))
954
 
 
955
 
    def test_id2path(self):
956
 
        inv = Inventory()
957
 
        inv.revision_id = "revid"
958
 
        inv.root.revision = "rootrev"
959
 
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
960
 
        fileentry = InventoryFile("fileid", "file", "dirid")
961
 
        inv.add(direntry)
962
 
        inv.add(fileentry)
963
 
        inv["fileid"].revision = "filerev"
964
 
        inv["fileid"].executable = True
965
 
        inv["fileid"].text_sha1 = "ffff"
966
 
        inv["fileid"].text_size = 1
967
 
        inv["dirid"].revision = "filerev"
968
 
        chk_bytes = self.get_chk_bytes()
969
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
970
 
        bytes = ''.join(chk_inv.to_lines())
971
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
972
 
        self.assertEqual('', new_inv.id2path(inv.root.file_id))
973
 
        self.assertEqual('dir', new_inv.id2path('dirid'))
974
 
        self.assertEqual('dir/file', new_inv.id2path('fileid'))
975
 
 
976
 
    def test_path2id(self):
977
 
        inv = Inventory()
978
 
        inv.revision_id = "revid"
979
 
        inv.root.revision = "rootrev"
980
 
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
981
 
        fileentry = InventoryFile("fileid", "file", "dirid")
982
 
        inv.add(direntry)
983
 
        inv.add(fileentry)
984
 
        inv["fileid"].revision = "filerev"
985
 
        inv["fileid"].executable = True
986
 
        inv["fileid"].text_sha1 = "ffff"
987
 
        inv["fileid"].text_size = 1
988
 
        inv["dirid"].revision = "filerev"
989
 
        chk_bytes = self.get_chk_bytes()
990
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
991
 
        bytes = ''.join(chk_inv.to_lines())
992
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
993
 
        self.assertEqual(inv.root.file_id, new_inv.path2id(''))
994
 
        self.assertEqual('dirid', new_inv.path2id('dir'))
995
 
        self.assertEqual('fileid', new_inv.path2id('dir/file'))
996
 
 
997
 
    def test_create_by_apply_delta_sets_root(self):
998
 
        inv = Inventory()
999
 
        inv.revision_id = "revid"
1000
 
        chk_bytes = self.get_chk_bytes()
1001
 
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1002
 
        inv.add_path("", "directory", "myrootid", None)
1003
 
        inv.revision_id = "expectedid"
1004
 
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1005
 
        delta = [("", None, base_inv.root.file_id, None),
1006
 
            (None, "",  "myrootid", inv.root)]
1007
 
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1008
 
        self.assertEquals(reference_inv.root, new_inv.root)
1009
 
 
1010
 
    def test_create_by_apply_delta_empty_add_child(self):
1011
 
        inv = Inventory()
1012
 
        inv.revision_id = "revid"
1013
 
        inv.root.revision = "rootrev"
1014
 
        chk_bytes = self.get_chk_bytes()
1015
 
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1016
 
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1017
 
        a_entry.revision = "filerev"
1018
 
        a_entry.executable = True
1019
 
        a_entry.text_sha1 = "ffff"
1020
 
        a_entry.text_size = 1
1021
 
        inv.add(a_entry)
1022
 
        inv.revision_id = "expectedid"
1023
 
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1024
 
        delta = [(None, "A",  "A-id", a_entry)]
1025
 
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1026
 
        # new_inv should be the same as reference_inv.
1027
 
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1028
 
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
1029
 
        reference_inv.id_to_entry._ensure_root()
1030
 
        new_inv.id_to_entry._ensure_root()
1031
 
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
1032
 
            new_inv.id_to_entry._root_node._key)
1033
 
 
1034
 
    def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
1035
 
        inv = Inventory()
1036
 
        inv.revision_id = "revid"
1037
 
        inv.root.revision = "rootrev"
1038
 
        chk_bytes = self.get_chk_bytes()
1039
 
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1040
 
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1041
 
        a_entry.revision = "filerev"
1042
 
        a_entry.executable = True
1043
 
        a_entry.text_sha1 = "ffff"
1044
 
        a_entry.text_size = 1
1045
 
        inv.add(a_entry)
1046
 
        inv.revision_id = "expectedid"
1047
 
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1048
 
        delta = [(None, "A",  "A-id", a_entry)]
1049
 
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1050
 
        reference_inv.id_to_entry._ensure_root()
1051
 
        reference_inv.parent_id_basename_to_file_id._ensure_root()
1052
 
        new_inv.id_to_entry._ensure_root()
1053
 
        new_inv.parent_id_basename_to_file_id._ensure_root()
1054
 
        # new_inv should be the same as reference_inv.
1055
 
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1056
 
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
1057
 
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
1058
 
            new_inv.id_to_entry._root_node._key)
1059
 
        self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
1060
 
            new_inv.parent_id_basename_to_file_id._root_node._key)
1061
 
 
1062
 
    def test_iter_changes(self):
1063
 
        # Low level bootstrapping smoke test; comprehensive generic tests via
1064
 
        # InterTree are coming.
1065
 
        inv = Inventory()
1066
 
        inv.revision_id = "revid"
1067
 
        inv.root.revision = "rootrev"
1068
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1069
 
        inv["fileid"].revision = "filerev"
1070
 
        inv["fileid"].executable = True
1071
 
        inv["fileid"].text_sha1 = "ffff"
1072
 
        inv["fileid"].text_size = 1
1073
 
        inv2 = Inventory()
1074
 
        inv2.revision_id = "revid2"
1075
 
        inv2.root.revision = "rootrev"
1076
 
        inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
1077
 
        inv2["fileid"].revision = "filerev2"
1078
 
        inv2["fileid"].executable = False
1079
 
        inv2["fileid"].text_sha1 = "bbbb"
1080
 
        inv2["fileid"].text_size = 2
1081
 
        # get fresh objects.
1082
 
        chk_bytes = self.get_chk_bytes()
1083
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1084
 
        bytes = ''.join(chk_inv.to_lines())
1085
 
        inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1086
 
        chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
1087
 
        bytes = ''.join(chk_inv2.to_lines())
1088
 
        inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
1089
 
        self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
1090
 
            ('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
1091
 
            (False, True))],
1092
 
            list(inv_1.iter_changes(inv_2)))
1093
 
 
1094
 
    def test_parent_id_basename_to_file_id_index_enabled(self):
1095
 
        inv = Inventory()
1096
 
        inv.revision_id = "revid"
1097
 
        inv.root.revision = "rootrev"
1098
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1099
 
        inv["fileid"].revision = "filerev"
1100
 
        inv["fileid"].executable = True
1101
 
        inv["fileid"].text_sha1 = "ffff"
1102
 
        inv["fileid"].text_size = 1
1103
 
        # get fresh objects.
1104
 
        chk_bytes = self.get_chk_bytes()
1105
 
        tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
1106
 
        bytes = ''.join(tmp_inv.to_lines())
1107
 
        chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1108
 
        self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
1109
 
        self.assertEqual(
1110
 
            {('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
1111
 
            dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1112
 
 
1113
 
    def test_file_entry_to_bytes(self):
1114
 
        inv = CHKInventory(None)
1115
 
        ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
1116
 
        ie.executable = True
1117
 
        ie.revision = 'file-rev-id'
1118
 
        ie.text_sha1 = 'abcdefgh'
1119
 
        ie.text_size = 100
1120
 
        bytes = inv._entry_to_bytes(ie)
1121
 
        self.assertEqual('file: file-id\nparent-id\nfilename\n'
1122
 
                         'file-rev-id\nabcdefgh\n100\nY', bytes)
1123
 
        ie2 = inv._bytes_to_entry(bytes)
1124
 
        self.assertEqual(ie, ie2)
1125
 
        self.assertIsInstance(ie2.name, unicode)
1126
 
        self.assertEqual(('filename', 'file-id', 'file-rev-id'),
1127
 
                         inv._bytes_to_utf8name_key(bytes))
1128
 
 
1129
 
    def test_file2_entry_to_bytes(self):
1130
 
        inv = CHKInventory(None)
1131
 
        # \u30a9 == 'omega'
1132
 
        ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
1133
 
        ie.executable = False
1134
 
        ie.revision = 'file-rev-id'
1135
 
        ie.text_sha1 = '123456'
1136
 
        ie.text_size = 25
1137
 
        bytes = inv._entry_to_bytes(ie)
1138
 
        self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
1139
 
                         'file-rev-id\n123456\n25\nN', bytes)
1140
 
        ie2 = inv._bytes_to_entry(bytes)
1141
 
        self.assertEqual(ie, ie2)
1142
 
        self.assertIsInstance(ie2.name, unicode)
1143
 
        self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
1144
 
                         inv._bytes_to_utf8name_key(bytes))
1145
 
 
1146
 
    def test_dir_entry_to_bytes(self):
1147
 
        inv = CHKInventory(None)
1148
 
        ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
1149
 
        ie.revision = 'dir-rev-id'
1150
 
        bytes = inv._entry_to_bytes(ie)
1151
 
        self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
1152
 
        ie2 = inv._bytes_to_entry(bytes)
1153
 
        self.assertEqual(ie, ie2)
1154
 
        self.assertIsInstance(ie2.name, unicode)
1155
 
        self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
1156
 
                         inv._bytes_to_utf8name_key(bytes))
1157
 
 
1158
 
    def test_dir2_entry_to_bytes(self):
1159
 
        inv = CHKInventory(None)
1160
 
        ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
1161
 
                                          None)
1162
 
        ie.revision = 'dir-rev-id'
1163
 
        bytes = inv._entry_to_bytes(ie)
1164
 
        self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
1165
 
                         'dir-rev-id', bytes)
1166
 
        ie2 = inv._bytes_to_entry(bytes)
1167
 
        self.assertEqual(ie, ie2)
1168
 
        self.assertIsInstance(ie2.name, unicode)
1169
 
        self.assertIs(ie2.parent_id, None)
1170
 
        self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
1171
 
                         inv._bytes_to_utf8name_key(bytes))
1172
 
 
1173
 
    def test_symlink_entry_to_bytes(self):
1174
 
        inv = CHKInventory(None)
1175
 
        ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
1176
 
        ie.revision = 'link-rev-id'
1177
 
        ie.symlink_target = u'target/path'
1178
 
        bytes = inv._entry_to_bytes(ie)
1179
 
        self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
1180
 
                         'link-rev-id\ntarget/path', bytes)
1181
 
        ie2 = inv._bytes_to_entry(bytes)
1182
 
        self.assertEqual(ie, ie2)
1183
 
        self.assertIsInstance(ie2.name, unicode)
1184
 
        self.assertIsInstance(ie2.symlink_target, unicode)
1185
 
        self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
1186
 
                         inv._bytes_to_utf8name_key(bytes))
1187
 
 
1188
 
    def test_symlink2_entry_to_bytes(self):
1189
 
        inv = CHKInventory(None)
1190
 
        ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
1191
 
        ie.revision = 'link-rev-id'
1192
 
        ie.symlink_target = u'target/\u03a9path'
1193
 
        bytes = inv._entry_to_bytes(ie)
1194
 
        self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
1195
 
                         'link-rev-id\ntarget/\xce\xa9path', bytes)
1196
 
        ie2 = inv._bytes_to_entry(bytes)
1197
 
        self.assertEqual(ie, ie2)
1198
 
        self.assertIsInstance(ie2.name, unicode)
1199
 
        self.assertIsInstance(ie2.symlink_target, unicode)
1200
 
        self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
1201
 
                         inv._bytes_to_utf8name_key(bytes))
1202
 
 
1203
 
    def test_tree_reference_entry_to_bytes(self):
1204
 
        inv = CHKInventory(None)
1205
 
        ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
1206
 
                                     'parent-id')
1207
 
        ie.revision = 'tree-rev-id'
1208
 
        ie.reference_revision = 'ref-rev-id'
1209
 
        bytes = inv._entry_to_bytes(ie)
1210
 
        self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
1211
 
                         'tree-rev-id\nref-rev-id', bytes)
1212
 
        ie2 = inv._bytes_to_entry(bytes)
1213
 
        self.assertEqual(ie, ie2)
1214
 
        self.assertIsInstance(ie2.name, unicode)
1215
 
        self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1216
 
                         inv._bytes_to_utf8name_key(bytes))
1217
 
 
1218
 
 
1219
 
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1220
 
 
1221
 
    def get_chk_bytes(self):
1222
 
        factory = groupcompress.make_pack_factory(True, True, 1)
1223
 
        trans = self.get_transport('')
1224
 
        return factory(trans)
1225
 
 
1226
 
    def make_dir(self, inv, name, parent_id):
1227
 
        inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
1228
 
 
1229
 
    def make_file(self, inv, name, parent_id, content='content\n'):
1230
 
        ie = inv.make_entry('file', name, parent_id, name + '-id')
1231
 
        ie.text_sha1 = osutils.sha_string(content)
1232
 
        ie.text_size = len(content)
1233
 
        inv.add(ie)
1234
 
 
1235
 
    def make_simple_inventory(self):
1236
 
        inv = Inventory('TREE_ROOT')
1237
 
        inv.revision_id = "revid"
1238
 
        inv.root.revision = "rootrev"
1239
 
        # /                 TREE_ROOT
1240
 
        # dir1/             dir1-id
1241
 
        #   sub-file1       sub-file1-id
1242
 
        #   sub-file2       sub-file2-id
1243
 
        #   sub-dir1/       sub-dir1-id
1244
 
        #     subsub-file1  subsub-file1-id
1245
 
        # dir2/             dir2-id
1246
 
        #   sub2-file1      sub2-file1-id
1247
 
        # top               top-id
1248
 
        self.make_dir(inv, 'dir1', 'TREE_ROOT')
1249
 
        self.make_dir(inv, 'dir2', 'TREE_ROOT')
1250
 
        self.make_dir(inv, 'sub-dir1', 'dir1-id')
1251
 
        self.make_file(inv, 'top', 'TREE_ROOT')
1252
 
        self.make_file(inv, 'sub-file1', 'dir1-id')
1253
 
        self.make_file(inv, 'sub-file2', 'dir1-id')
1254
 
        self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
1255
 
        self.make_file(inv, 'sub2-file1', 'dir2-id')
1256
 
        chk_bytes = self.get_chk_bytes()
1257
 
        #  use a small maximum_size to force internal paging structures
1258
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
1259
 
                        maximum_size=100,
1260
 
                        search_key_name='hash-255-way')
1261
 
        bytes = ''.join(chk_inv.to_lines())
1262
 
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1263
 
 
1264
 
    def assert_Getitems(self, expected_fileids, inv, file_ids):
1265
 
        self.assertEqual(sorted(expected_fileids),
1266
 
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
1267
 
 
1268
 
    def assertExpand(self, all_ids, inv, file_ids):
1269
 
        (val_all_ids,
1270
 
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
1271
 
        self.assertEqual(set(all_ids), val_all_ids)
1272
 
        entries = inv._getitems(val_all_ids)
1273
 
        expected_children = {}
1274
 
        for entry in entries:
1275
 
            s = expected_children.setdefault(entry.parent_id, [])
1276
 
            s.append(entry.file_id)
1277
 
        val_children = dict((k, sorted(v)) for k, v
1278
 
                            in val_children.iteritems())
1279
 
        expected_children = dict((k, sorted(v)) for k, v
1280
 
                            in expected_children.iteritems())
1281
 
        self.assertEqual(expected_children, val_children)
1282
 
 
1283
 
    def test_make_simple_inventory(self):
1284
 
        inv = self.make_simple_inventory()
1285
 
        layout = []
1286
 
        for path, entry in inv.iter_entries_by_dir():
1287
 
            layout.append((path, entry.file_id))
1288
 
        self.assertEqual([
1289
 
            ('', 'TREE_ROOT'),
1290
 
            ('dir1', 'dir1-id'),
1291
 
            ('dir2', 'dir2-id'),
1292
 
            ('top', 'top-id'),
1293
 
            ('dir1/sub-dir1', 'sub-dir1-id'),
1294
 
            ('dir1/sub-file1', 'sub-file1-id'),
1295
 
            ('dir1/sub-file2', 'sub-file2-id'),
1296
 
            ('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
1297
 
            ('dir2/sub2-file1', 'sub2-file1-id'),
1298
 
            ], layout)
1299
 
 
1300
 
    def test__getitems(self):
1301
 
        inv = self.make_simple_inventory()
1302
 
        # Reading from disk
1303
 
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1304
 
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1305
 
        self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
1306
 
        # From cache
1307
 
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1308
 
        # Mixed
1309
 
        self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
1310
 
                             ['dir1-id', 'sub-file2-id'])
1311
 
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1312
 
        self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
1313
 
 
1314
 
    def test_single_file(self):
1315
 
        inv = self.make_simple_inventory()
1316
 
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1317
 
 
1318
 
    def test_get_all_parents(self):
1319
 
        inv = self.make_simple_inventory()
1320
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1321
 
                           'subsub-file1-id',
1322
 
                          ], inv, ['subsub-file1-id'])
1323
 
 
1324
 
    def test_get_children(self):
1325
 
        inv = self.make_simple_inventory()
1326
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1327
 
                           'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
1328
 
                          ], inv, ['dir1-id'])
1329
 
 
1330
 
    def test_from_root(self):
1331
 
        inv = self.make_simple_inventory()
1332
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
1333
 
                           'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
1334
 
                           'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
1335
 
 
1336
 
    def test_top_level_file(self):
1337
 
        inv = self.make_simple_inventory()
1338
 
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1339
 
 
1340
 
    def test_subsub_file(self):
1341
 
        inv = self.make_simple_inventory()
1342
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1343
 
                           'subsub-file1-id'], inv, ['subsub-file1-id'])
1344
 
 
1345
 
    def test_sub_and_root(self):
1346
 
        inv = self.make_simple_inventory()
1347
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
1348
 
                           'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])
 
467
class TestExecutable(TestCaseWithTransport):
 
468
 
 
469
    def test_stays_executable(self):
 
470
        a_id = "a-20051208024829-849e76f7968d7a86"
 
471
        b_id = "b-20051208024829-849e76f7968d7a86"
 
472
        wt = self.make_branch_and_tree('b1')
 
473
        b = wt.branch
 
474
        tt = TreeTransform(wt)
 
475
        tt.new_file('a', tt.root, 'a test\n', a_id, True)
 
476
        tt.new_file('b', tt.root, 'b test\n', b_id, False)
 
477
        tt.apply()
 
478
 
 
479
        self.failUnless(wt.is_executable(a_id), "'a' lost the execute bit")
 
480
 
 
481
        # reopen the tree and ensure it stuck.
 
482
        wt = wt.bzrdir.open_workingtree()
 
483
        self.assertEqual(['a', 'b'], [cn for cn,ie in wt.inventory.iter_entries()])
 
484
 
 
485
        self.failUnless(wt.is_executable(a_id), "'a' lost the execute bit")
 
486
        self.failIf(wt.is_executable(b_id), "'b' gained an execute bit")
 
487
 
 
488
        wt.commit('adding a,b', rev_id='r1')
 
489
 
 
490
        rev_tree = b.repository.revision_tree('r1')
 
491
        self.failUnless(rev_tree.is_executable(a_id), "'a' lost the execute bit")
 
492
        self.failIf(rev_tree.is_executable(b_id), "'b' gained an execute bit")
 
493
 
 
494
        self.failUnless(rev_tree.inventory[a_id].executable)
 
495
        self.failIf(rev_tree.inventory[b_id].executable)
 
496
 
 
497
        # Make sure the entries are gone
 
498
        os.remove('b1/a')
 
499
        os.remove('b1/b')
 
500
        self.failIf(wt.has_id(a_id))
 
501
        self.failIf(wt.has_filename('a'))
 
502
        self.failIf(wt.has_id(b_id))
 
503
        self.failIf(wt.has_filename('b'))
 
504
 
 
505
        # Make sure that revert is able to bring them back,
 
506
        # and sets 'a' back to being executable
 
507
 
 
508
        wt.revert(['a', 'b'], rev_tree, backups=False)
 
509
        self.assertEqual(['a', 'b'], [cn for cn,ie in wt.inventory.iter_entries()])
 
510
 
 
511
        self.failUnless(wt.is_executable(a_id), "'a' lost the execute bit")
 
512
        self.failIf(wt.is_executable(b_id), "'b' gained an execute bit")
 
513
 
 
514
        # Now remove them again, and make sure that after a
 
515
        # commit, they are still marked correctly
 
516
        os.remove('b1/a')
 
517
        os.remove('b1/b')
 
518
        wt.commit('removed', rev_id='r2')
 
519
 
 
520
        self.assertEqual([], [cn for cn,ie in wt.inventory.iter_entries()])
 
521
        self.failIf(wt.has_id(a_id))
 
522
        self.failIf(wt.has_filename('a'))
 
523
        self.failIf(wt.has_id(b_id))
 
524
        self.failIf(wt.has_filename('b'))
 
525
 
 
526
        # Now revert back to the previous commit
 
527
        wt.revert([], rev_tree, backups=False)
 
528
        self.assertEqual(['a', 'b'], [cn for cn,ie in wt.inventory.iter_entries()])
 
529
 
 
530
        self.failUnless(wt.is_executable(a_id), "'a' lost the execute bit")
 
531
        self.failIf(wt.is_executable(b_id), "'b' gained an execute bit")
 
532
 
 
533
        # Now make sure that 'bzr branch' also preserves the
 
534
        # executable bit
 
535
        # TODO: Maybe this should be a blackbox test
 
536
        d2 = b.bzrdir.clone('b2', revision_id='r1')
 
537
        t2 = d2.open_workingtree()
 
538
        b2 = t2.branch
 
539
        self.assertEquals('r1', b2.last_revision())
 
540
 
 
541
        self.assertEqual(['a', 'b'], [cn for cn,ie in t2.inventory.iter_entries()])
 
542
        self.failUnless(t2.is_executable(a_id), "'a' lost the execute bit")
 
543
        self.failIf(t2.is_executable(b_id), "'b' gained an execute bit")
 
544
 
 
545
        # Make sure pull will delete the files
 
546
        t2.pull(b)
 
547
        self.assertEquals('r2', b2.last_revision())
 
548
        self.assertEqual([], [cn for cn,ie in t2.inventory.iter_entries()])
 
549
 
 
550
        # Now commit the changes on the first branch
 
551
        # so that the second branch can pull the changes
 
552
        # and make sure that the executable bit has been copied
 
553
        wt.commit('resurrected', rev_id='r3')
 
554
 
 
555
        t2.pull(b)
 
556
        self.assertEquals('r3', b2.last_revision())
 
557
        self.assertEqual(['a', 'b'], [cn for cn,ie in t2.inventory.iter_entries()])
 
558
 
 
559
        self.failUnless(t2.is_executable(a_id), "'a' lost the execute bit")
 
560
        self.failIf(t2.is_executable(b_id), "'b' gained an execute bit")
 
561
 
 
562
 
 
563
class TestRevert(TestCaseWithTransport):
 
564
 
 
565
    def test_dangling_id(self):
 
566
        wt = self.make_branch_and_tree('b1')
 
567
        self.assertEqual(len(wt.inventory), 1)
 
568
        open('b1/a', 'wb').write('a test\n')
 
569
        wt.add('a')
 
570
        self.assertEqual(len(wt.inventory), 2)
 
571
        os.unlink('b1/a')
 
572
        wt.revert([])
 
573
        self.assertEqual(len(wt.inventory), 1)