15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
29
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
30
InventoryDirectory, InventoryEntry, TreeReference)
31
from bzrlib.tests import (
31
from ..bzr.inventory import (
39
mutable_inventory_from_tree,
33
43
TestCaseWithTransport,
36
split_suite_by_condition,
38
from bzrlib.tests.per_workingtree import workingtree_formats
41
def load_tests(standard_tests, module, loader):
42
"""Parameterise some inventory tests."""
43
to_adapt, result = split_suite_by_condition(standard_tests,
44
condition_isinstance(TestDeltaApplication))
45
from .scenarios import load_tests_apply_scenarios
48
load_tests = load_tests_apply_scenarios
51
def delta_application_scenarios():
46
53
('Inventory', {'apply_delta':apply_inventory_Inventory}),
107
119
# Fresh object, reads disk again.
108
tree = tree.bzrdir.open_workingtree()
120
tree = tree.controldir.open_workingtree()
109
121
tree.lock_write()
111
123
tree.apply_inventory_delta(delta)
114
126
# reload tree - ensure we get what was written.
115
tree = tree.bzrdir.open_workingtree()
127
tree = tree.controldir.open_workingtree()
117
129
self.addCleanup(tree.unlock)
118
# One could add 'tree._validate' here but that would cause 'early' failues
119
# as far as higher level code is concerned. Possibly adding an
120
# expect_fail parameter to this function and if that is False then do a
122
return tree.inventory
125
def apply_inventory_WT_basis(self, basis, delta):
130
if not invalid_delta:
132
return tree.root_inventory
135
def _create_repo_revisions(repo, basis, delta, invalid_delta):
136
repo.start_write_group()
138
rev = revision.Revision('basis', timestamp=0, timezone=None,
139
message="", committer="foo@example.com")
140
basis.revision_id = 'basis'
141
create_texts_for_inv(repo, basis)
142
repo.add_revision('basis', rev, basis)
144
# We don't want to apply the delta to the basis, because we expect
145
# the delta is invalid.
147
result_inv.revision_id = 'result'
148
target_entries = None
150
result_inv = basis.create_by_apply_delta(delta, 'result')
151
create_texts_for_inv(repo, result_inv)
152
target_entries = list(result_inv.iter_entries_by_dir())
153
rev = revision.Revision('result', timestamp=0, timezone=None,
154
message="", committer="foo@example.com")
155
repo.add_revision('result', rev, result_inv)
156
repo.commit_write_group()
158
repo.abort_write_group()
160
return target_entries
163
def _get_basis_entries(tree):
164
basis_tree = tree.basis_tree()
165
basis_tree.lock_read()
166
basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
168
return basis_tree_entries
171
def _populate_different_tree(tree, basis, delta):
172
"""Put all entries into tree, but at a unique location."""
175
tree.add(['unique-dir'], ['unique-dir-id'], ['directory'])
176
for path, ie in basis.iter_entries_by_dir():
177
if ie.file_id in added_ids:
179
# We want a unique path for each of these, we use the file-id
180
tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
181
added_ids.add(ie.file_id)
182
for old_path, new_path, file_id, ie in delta:
183
if file_id in added_ids:
185
tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
188
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
126
189
"""Apply delta to basis and return the result.
128
191
This sets the parent and then calls update_basis_by_delta.
130
193
allow safety checks made by the WT to succeed, and finally ensures that all
131
194
items in the delta with a new path are present in the WT before calling
132
195
update_basis_by_delta.
134
197
:param basis: An inventory to be used as the basis.
135
198
:param delta: The inventory delta to apply:
136
199
:return: An inventory resulting from the application.
138
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
201
control = test.make_controldir('tree', format=test.format._matchingbzrdir)
139
202
control.create_repository()
140
203
control.create_branch()
141
tree = self.format.initialize(control)
204
tree = test.format.initialize(control)
142
205
tree.lock_write()
144
repo = tree.branch.repository
145
repo.start_write_group()
147
rev = revision.Revision('basis', timestamp=0, timezone=None,
148
message="", committer="foo@example.com")
149
basis.revision_id = 'basis'
150
create_texts_for_inv(tree.branch.repository, basis)
151
repo.add_revision('basis', rev, basis)
152
# Add a revision for the result, with the basis content -
153
# update_basis_by_delta doesn't check that the delta results in
154
# result, and we want inconsistent deltas to get called on the
155
# tree, or else the code isn't actually checked.
156
rev = revision.Revision('result', timestamp=0, timezone=None,
157
message="", committer="foo@example.com")
158
basis.revision_id = 'result'
159
repo.add_revision('result', rev, basis)
160
repo.commit_write_group()
162
repo.abort_write_group()
207
target_entries = _create_repo_revisions(tree.branch.repository, basis,
208
delta, invalid_delta)
164
209
# Set the basis state as the trees current state
165
210
tree._write_inventory(basis)
166
211
# This reads basis from the repo and puts it into the tree's local
167
212
# cache, if it has one.
168
213
tree.set_parent_ids(['basis'])
171
for old, new, id, entry in delta:
172
if None in (new, entry):
174
paths[new] = (entry.file_id, entry.kind)
175
parents.add(osutils.dirname(new))
176
parents = osutils.minimum_path_selection(parents)
178
# Put place holders in the tree to permit adding the other entries.
179
for pos, parent in enumerate(parents):
180
if not tree.path2id(parent):
181
# add a synthetic directory in the tree so we can can put the
182
# tree0 entries in place for dirstate.
183
tree.add([parent], ["id%d" % pos], ["directory"])
185
# Many deltas may cause this mini-apply to fail, but we want to see what
186
# the delta application code says, not the prep that we do to deal with
187
# limitations of dirstate's update_basis code.
188
for path, (file_id, kind) in sorted(paths.items()):
190
tree.add([path], [file_id], [kind])
191
except (KeyboardInterrupt, SystemExit):
197
216
# Fresh lock, reads disk again.
198
217
tree.lock_write()
200
219
tree.update_basis_by_delta('result', delta)
220
if not invalid_delta:
203
224
# reload tree - ensure we get what was written.
204
tree = tree.bzrdir.open_workingtree()
225
tree = tree.controldir.open_workingtree()
205
226
basis_tree = tree.basis_tree()
206
227
basis_tree.lock_read()
207
self.addCleanup(basis_tree.unlock)
208
# Note, that if the tree does not have a local cache, the trick above of
209
# setting the result as the basis, will come back to bite us. That said,
210
# all the implementations in bzr do have a local cache.
211
return basis_tree.inventory
214
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
228
test.addCleanup(basis_tree.unlock)
229
basis_inv = basis_tree.root_inventory
231
basis_entries = list(basis_inv.iter_entries_by_dir())
232
test.assertEqual(target_entries, basis_entries)
236
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
215
238
"""Apply delta to basis and return the result.
217
240
This inserts basis as a whole inventory and then uses
574
602
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
578
class TestInventory(TestCase):
605
def test_add_file(self):
606
inv = self.get_empty_inventory()
607
file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
608
file1.revision = 'result'
611
delta = [(None, u'path', 'file-id', file1)]
612
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
613
self.assertEqual('file-id', res_inv['file-id'].file_id)
615
def test_remove_file(self):
616
inv = self.get_empty_inventory()
617
file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
618
file1.revision = 'result'
622
delta = [(u'path', None, 'file-id', None)]
623
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
624
self.assertEqual(None, res_inv.path2id('path'))
625
self.assertRaises(errors.NoSuchId, res_inv.id2path, 'file-id')
627
def test_rename_file(self):
628
inv = self.get_empty_inventory()
629
file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
631
file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
632
delta = [(u'path', 'path2', 'file-id', file2)]
633
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
634
self.assertEqual(None, res_inv.path2id('path'))
635
self.assertEqual('file-id', res_inv.path2id('path2'))
637
def test_replaced_at_new_path(self):
638
inv = self.get_empty_inventory()
639
file1 = self.make_file_ie(file_id='id1', parent_id=inv.root.file_id)
641
file2 = self.make_file_ie(file_id='id2', parent_id=inv.root.file_id)
642
delta = [(u'name', None, 'id1', None),
643
(None, u'name', 'id2', file2)]
644
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
645
self.assertEqual('id2', res_inv.path2id('name'))
647
def test_rename_dir(self):
648
inv = self.get_empty_inventory()
649
dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
650
dir1.revision = 'basis'
651
file1 = self.make_file_ie(parent_id='dir-id')
654
dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
655
dir2.revision = 'result'
656
delta = [('dir1', 'dir2', 'dir-id', dir2)]
657
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
658
# The file should be accessible under the new path
659
self.assertEqual('file-id', res_inv.path2id('dir2/name'))
661
def test_renamed_dir_with_renamed_child(self):
662
inv = self.get_empty_inventory()
663
dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
664
dir1.revision = 'basis'
665
file1 = self.make_file_ie('file-id-1', 'name1', parent_id='dir-id')
666
file2 = self.make_file_ie('file-id-2', 'name2', parent_id='dir-id')
670
dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
671
dir2.revision = 'result'
672
file2b = self.make_file_ie('file-id-2', 'name2', inv.root.file_id)
673
delta = [('dir1', 'dir2', 'dir-id', dir2),
674
('dir1/name2', 'name2', 'file-id-2', file2b)]
675
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
676
# The file should be accessible under the new path
677
self.assertEqual('file-id-1', res_inv.path2id('dir2/name1'))
678
self.assertEqual(None, res_inv.path2id('dir2/name2'))
679
self.assertEqual('file-id-2', res_inv.path2id('name2'))
580
681
def test_is_root(self):
581
682
"""Ensure our root-checking code is accurate."""
1215
1312
self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1216
1313
inv._bytes_to_utf8name_key(bytes))
1315
def make_basic_utf8_inventory(self):
1317
inv.revision_id = "revid"
1318
inv.root.revision = "rootrev"
1319
root_id = inv.root.file_id
1320
inv.add(InventoryFile("fileid", u'f\xefle', root_id))
1321
inv["fileid"].revision = "filerev"
1322
inv["fileid"].text_sha1 = "ffff"
1323
inv["fileid"].text_size = 0
1324
inv.add(InventoryDirectory("dirid", u'dir-\N{EURO SIGN}', root_id))
1325
inv.add(InventoryFile("childid", u'ch\xefld', "dirid"))
1326
inv["childid"].revision = "filerev"
1327
inv["childid"].text_sha1 = "ffff"
1328
inv["childid"].text_size = 0
1329
chk_bytes = self.get_chk_bytes()
1330
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1331
bytes = ''.join(chk_inv.to_lines())
1332
return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1334
def test__preload_handles_utf8(self):
1335
new_inv = self.make_basic_utf8_inventory()
1336
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1337
self.assertFalse(new_inv._fully_cached)
1338
new_inv._preload_cache()
1340
sorted([new_inv.root_id, "fileid", "dirid", "childid"]),
1341
sorted(new_inv._fileid_to_entry_cache.keys()))
1342
ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
1343
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1344
sorted(ie_root._children.keys()))
1345
ie_dir = new_inv._fileid_to_entry_cache['dirid']
1346
self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
1348
def test__preload_populates_cache(self):
1350
inv.revision_id = "revid"
1351
inv.root.revision = "rootrev"
1352
root_id = inv.root.file_id
1353
inv.add(InventoryFile("fileid", "file", root_id))
1354
inv["fileid"].revision = "filerev"
1355
inv["fileid"].executable = True
1356
inv["fileid"].text_sha1 = "ffff"
1357
inv["fileid"].text_size = 1
1358
inv.add(InventoryDirectory("dirid", "dir", root_id))
1359
inv.add(InventoryFile("childid", "child", "dirid"))
1360
inv["childid"].revision = "filerev"
1361
inv["childid"].executable = False
1362
inv["childid"].text_sha1 = "dddd"
1363
inv["childid"].text_size = 1
1364
chk_bytes = self.get_chk_bytes()
1365
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1366
bytes = ''.join(chk_inv.to_lines())
1367
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1368
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1369
self.assertFalse(new_inv._fully_cached)
1370
new_inv._preload_cache()
1372
sorted([root_id, "fileid", "dirid", "childid"]),
1373
sorted(new_inv._fileid_to_entry_cache.keys()))
1374
self.assertTrue(new_inv._fully_cached)
1375
ie_root = new_inv._fileid_to_entry_cache[root_id]
1376
self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
1377
ie_dir = new_inv._fileid_to_entry_cache['dirid']
1378
self.assertEqual(['child'], sorted(ie_dir._children.keys()))
1380
def test__preload_handles_partially_evaluated_inventory(self):
1381
new_inv = self.make_basic_utf8_inventory()
1382
ie = new_inv[new_inv.root_id]
1383
self.assertIs(None, ie._children)
1384
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1385
sorted(ie.children.keys()))
1386
# Accessing .children loads _children
1387
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1388
sorted(ie._children.keys()))
1389
new_inv._preload_cache()
1391
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1392
sorted(ie._children.keys()))
1393
ie_dir = new_inv["dirid"]
1394
self.assertEqual([u'ch\xefld'],
1395
sorted(ie_dir._children.keys()))
1397
def test_filter_change_in_renamed_subfolder(self):
1398
inv = Inventory('tree-root')
1399
src_ie = inv.add_path('src', 'directory', 'src-id')
1400
inv.add_path('src/sub/', 'directory', 'sub-id')
1401
a_ie = inv.add_path('src/sub/a', 'file', 'a-id')
1402
a_ie.text_sha1 = osutils.sha_string('content\n')
1403
a_ie.text_size = len('content\n')
1404
chk_bytes = self.get_chk_bytes()
1405
inv = CHKInventory.from_inventory(chk_bytes, inv)
1406
inv = inv.create_by_apply_delta([
1407
("src/sub/a", "src/sub/a", "a-id", a_ie),
1408
("src", "src2", "src-id", src_ie),
1410
new_inv = inv.filter(['a-id', 'src-id'])
1414
('src/sub', 'sub-id'),
1415
('src/sub/a', 'a-id'),
1416
], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
1219
1418
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1346
1545
inv = self.make_simple_inventory()
1347
1546
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
1348
1547
'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])
1550
class TestMutableInventoryFromTree(TestCaseWithTransport):
1552
def test_empty(self):
1553
repository = self.make_repository('.')
1554
tree = repository.revision_tree(revision.NULL_REVISION)
1555
inv = mutable_inventory_from_tree(tree)
1556
self.assertEqual(revision.NULL_REVISION, inv.revision_id)
1557
self.assertEqual(0, len(inv))
1559
def test_some_files(self):
1560
wt = self.make_branch_and_tree('.')
1561
self.build_tree(['a'])
1562
wt.add(['a'], ['thefileid'])
1563
revid = wt.commit("commit")
1564
tree = wt.branch.repository.revision_tree(revid)
1565
inv = mutable_inventory_from_tree(tree)
1566
self.assertEqual(revid, inv.revision_id)
1567
self.assertEqual(2, len(inv))
1568
self.assertEqual("a", inv['thefileid'].name)
1569
# The inventory should be mutable and independent of
1571
self.assertFalse(tree.root_inventory['thefileid'].executable)
1572
inv['thefileid'].executable = True
1573
self.assertFalse(tree.root_inventory['thefileid'].executable)