107
119
# Fresh object, reads disk again.
108
tree = tree.bzrdir.open_workingtree()
120
tree = tree.controldir.open_workingtree()
109
121
tree.lock_write()
111
123
tree.apply_inventory_delta(delta)
114
126
# reload tree - ensure we get what was written.
115
tree = tree.bzrdir.open_workingtree()
127
tree = tree.controldir.open_workingtree()
117
129
self.addCleanup(tree.unlock)
118
# One could add 'tree._validate' here but that would cause 'early' failures
119
# as far as higher level code is concerned. Possibly adding an
120
# expect_fail parameter to this function and if that is False then do a
122
return tree.inventory
125
def apply_inventory_WT_basis(self, basis, delta):
130
if not invalid_delta:
132
return tree.root_inventory
135
def _create_repo_revisions(repo, basis, delta, invalid_delta):
136
repo.start_write_group()
138
rev = revision.Revision('basis', timestamp=0, timezone=None,
139
message="", committer="foo@example.com")
140
basis.revision_id = 'basis'
141
create_texts_for_inv(repo, basis)
142
repo.add_revision('basis', rev, basis)
144
# We don't want to apply the delta to the basis, because we expect
145
# the delta is invalid.
147
result_inv.revision_id = b'result'
148
target_entries = None
150
result_inv = basis.create_by_apply_delta(delta, 'result')
151
create_texts_for_inv(repo, result_inv)
152
target_entries = list(result_inv.iter_entries_by_dir())
153
rev = revision.Revision('result', timestamp=0, timezone=None,
154
message="", committer="foo@example.com")
155
repo.add_revision('result', rev, result_inv)
156
repo.commit_write_group()
158
repo.abort_write_group()
160
return target_entries
163
def _get_basis_entries(tree):
164
basis_tree = tree.basis_tree()
165
basis_tree.lock_read()
166
basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
168
return basis_tree_entries
171
def _populate_different_tree(tree, basis, delta):
172
"""Put all entries into tree, but at a unique location."""
175
tree.add(['unique-dir'], [b'unique-dir-id'], ['directory'])
176
for path, ie in basis.iter_entries_by_dir():
177
if ie.file_id in added_ids:
179
# We want a unique path for each of these, we use the file-id
180
tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
181
added_ids.add(ie.file_id)
182
for old_path, new_path, file_id, ie in delta:
183
if file_id in added_ids:
185
tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
188
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
126
189
"""Apply delta to basis and return the result.
128
191
This sets the parent and then calls update_basis_by_delta.
130
193
allow safety checks made by the WT to succeed, and finally ensures that all
131
194
items in the delta with a new path are present in the WT before calling
132
195
update_basis_by_delta.
134
197
:param basis: An inventory to be used as the basis.
135
198
:param delta: The inventory delta to apply:
136
199
:return: An inventory resulting from the application.
138
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
201
control = test.make_controldir('tree', format=test.format._matchingcontroldir)
139
202
control.create_repository()
140
203
control.create_branch()
141
tree = self.format.initialize(control)
204
tree = test.format.initialize(control)
142
205
tree.lock_write()
144
repo = tree.branch.repository
145
repo.start_write_group()
147
rev = revision.Revision('basis', timestamp=0, timezone=None,
148
message="", committer="foo@example.com")
149
basis.revision_id = 'basis'
150
create_texts_for_inv(tree.branch.repository, basis)
151
repo.add_revision('basis', rev, basis)
152
# Add a revision for the result, with the basis content -
153
# update_basis_by_delta doesn't check that the delta results in
154
# result, and we want inconsistent deltas to get called on the
155
# tree, or else the code isn't actually checked.
156
rev = revision.Revision('result', timestamp=0, timezone=None,
157
message="", committer="foo@example.com")
158
basis.revision_id = 'result'
159
repo.add_revision('result', rev, basis)
160
repo.commit_write_group()
162
repo.abort_write_group()
207
target_entries = _create_repo_revisions(tree.branch.repository, basis,
208
delta, invalid_delta)
164
209
# Set the basis state as the tree's current state
165
210
tree._write_inventory(basis)
166
211
# This reads basis from the repo and puts it into the tree's local
167
212
# cache, if it has one.
168
213
tree.set_parent_ids(['basis'])
171
for old, new, id, entry in delta:
172
if None in (new, entry):
174
paths[new] = (entry.file_id, entry.kind)
175
parents.add(osutils.dirname(new))
176
parents = osutils.minimum_path_selection(parents)
178
# Put place holders in the tree to permit adding the other entries.
179
for pos, parent in enumerate(parents):
180
if not tree.path2id(parent):
181
# add a synthetic directory in the tree so we can put the
182
# tree0 entries in place for dirstate.
183
tree.add([parent], ["id%d" % pos], ["directory"])
185
# Many deltas may cause this mini-apply to fail, but we want to see what
186
# the delta application code says, not the prep that we do to deal with
187
# limitations of dirstate's update_basis code.
188
for path, (file_id, kind) in sorted(paths.items()):
190
tree.add([path], [file_id], [kind])
191
except (KeyboardInterrupt, SystemExit):
197
216
# Fresh lock, reads disk again.
198
217
tree.lock_write()
200
219
tree.update_basis_by_delta('result', delta)
220
if not invalid_delta:
203
224
# reload tree - ensure we get what was written.
204
tree = tree.bzrdir.open_workingtree()
225
tree = tree.controldir.open_workingtree()
205
226
basis_tree = tree.basis_tree()
206
227
basis_tree.lock_read()
207
self.addCleanup(basis_tree.unlock)
208
# Note, that if the tree does not have a local cache, the trick above of
209
# setting the result as the basis, will come back to bite us. That said,
210
# all the implementations in bzr do have a local cache.
211
return basis_tree.inventory
214
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
228
test.addCleanup(basis_tree.unlock)
229
basis_inv = basis_tree.root_inventory
231
basis_entries = list(basis_inv.iter_entries_by_dir())
232
test.assertEqual(target_entries, basis_entries)
236
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
215
238
"""Apply delta to basis and return the result.
217
240
This inserts basis as a whole inventory and then uses
264
287
def test_creation_from_root_id(self):
265
288
# iff a root id is passed to the constructor, a root directory is made
266
inv = inventory.Inventory(root_id='tree-root')
289
inv = inventory.Inventory(root_id=b'tree-root')
267
290
self.assertNotEqual(None, inv.root)
268
self.assertEqual('tree-root', inv.root.file_id)
291
self.assertEqual(b'tree-root', inv.root.file_id)
270
293
def test_add_path_of_root(self):
271
294
# if no root id is given at creation time, there is no root directory
272
295
inv = inventory.Inventory(root_id=None)
273
296
self.assertIs(None, inv.root)
274
297
# add a root entry by adding its path
275
ie = inv.add_path("", "directory", "my-root")
276
ie.revision = 'test-rev'
277
self.assertEqual("my-root", ie.file_id)
298
ie = inv.add_path(u"", "directory", b"my-root")
299
ie.revision = b'test-rev'
300
self.assertEqual(b"my-root", ie.file_id)
278
301
self.assertIs(ie, inv.root)
280
303
def test_add_path(self):
281
inv = inventory.Inventory(root_id='tree_root')
282
ie = inv.add_path('hello', 'file', 'hello-id')
283
self.assertEqual('hello-id', ie.file_id)
304
inv = inventory.Inventory(root_id=b'tree_root')
305
ie = inv.add_path(u'hello', 'file', b'hello-id')
306
self.assertEqual(b'hello-id', ie.file_id)
284
307
self.assertEqual('file', ie.kind)
286
309
def test_copy(self):
287
310
"""Make sure copy() works and creates a deep copy."""
288
inv = inventory.Inventory(root_id='some-tree-root')
289
ie = inv.add_path('hello', 'file', 'hello-id')
311
inv = inventory.Inventory(root_id=b'some-tree-root')
312
ie = inv.add_path(u'hello', 'file', b'hello-id')
290
313
inv2 = inv.copy()
291
inv.root.file_id = 'some-new-root'
293
self.assertEqual('some-tree-root', inv2.root.file_id)
294
self.assertEqual('hello', inv2['hello-id'].name)
314
inv.root.file_id = b'some-new-root'
316
self.assertEqual(b'some-tree-root', inv2.root.file_id)
317
self.assertEqual(u'hello', inv2.get_entry(b'hello-id').name)
296
319
def test_copy_empty(self):
297
320
"""Make sure an empty inventory can be copied."""
302
325
def test_copy_copies_root_revision(self):
303
326
"""Make sure the revision of the root gets copied."""
304
inv = inventory.Inventory(root_id='someroot')
305
inv.root.revision = 'therev'
327
inv = inventory.Inventory(root_id=b'someroot')
328
inv.root.revision = b'therev'
306
329
inv2 = inv.copy()
307
self.assertEquals('someroot', inv2.root.file_id)
308
self.assertEquals('therev', inv2.root.revision)
330
self.assertEqual(b'someroot', inv2.root.file_id)
331
self.assertEqual(b'therev', inv2.root.revision)
310
333
def test_create_tree_reference(self):
311
inv = inventory.Inventory('tree-root-123')
312
inv.add(TreeReference('nested-id', 'nested', parent_id='tree-root-123',
313
revision='rev', reference_revision='rev2'))
334
inv = inventory.Inventory(b'tree-root-123')
335
inv.add(TreeReference(
336
b'nested-id', 'nested', parent_id=b'tree-root-123',
337
revision=b'rev', reference_revision=b'rev2'))
315
339
def test_error_encoding(self):
316
inv = inventory.Inventory('tree-root')
317
inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
340
inv = inventory.Inventory(b'tree-root')
341
inv.add(InventoryFile(b'a-id', u'\u1234', b'tree-root'))
318
342
e = self.assertRaises(errors.InconsistentDelta, inv.add,
319
InventoryFile('b-id', u'\u1234', 'tree-root'))
343
InventoryFile(b'b-id', u'\u1234', b'tree-root'))
320
344
self.assertContainsRe(str(e), r'\\u1234')
322
346
def test_add_recursive(self):
323
parent = InventoryDirectory('src-id', 'src', 'tree-root')
324
child = InventoryFile('hello-id', 'hello.c', 'src-id')
347
parent = InventoryDirectory(b'src-id', 'src', b'tree-root')
348
child = InventoryFile(b'hello-id', 'hello.c', b'src-id')
325
349
parent.children[child.file_id] = child
326
inv = inventory.Inventory('tree-root')
350
inv = inventory.Inventory(b'tree-root')
328
self.assertEqual('src/hello.c', inv.id2path('hello-id'))
352
self.assertEqual('src/hello.c', inv.id2path(b'hello-id'))
332
356
class TestDeltaApplication(TestCaseWithTransport):
358
scenarios = delta_application_scenarios()
334
360
def get_empty_inventory(self, reference_inv=None):
335
361
"""Get an empty inventory.
574
603
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
578
class TestInventory(TestCase):
606
def test_add_file(self):
607
inv = self.get_empty_inventory()
608
file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
609
file1.revision = b'result'
612
delta = [(None, u'path', b'file-id', file1)]
613
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
614
self.assertEqual(b'file-id', res_inv.get_entry(b'file-id').file_id)
616
def test_remove_file(self):
617
inv = self.get_empty_inventory()
618
file1 = inventory.InventoryFile(b'file-id', 'path', inv.root.file_id)
619
file1.revision = b'result'
623
delta = [(u'path', None, b'file-id', None)]
624
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
625
self.assertEqual(None, res_inv.path2id('path'))
626
self.assertRaises(errors.NoSuchId, res_inv.id2path, b'file-id')
628
def test_rename_file(self):
629
inv = self.get_empty_inventory()
630
file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
632
file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
633
delta = [(u'path', 'path2', b'file-id', file2)]
634
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
635
self.assertEqual(None, res_inv.path2id('path'))
636
self.assertEqual(b'file-id', res_inv.path2id('path2'))
638
def test_replaced_at_new_path(self):
639
inv = self.get_empty_inventory()
640
file1 = self.make_file_ie(file_id=b'id1', parent_id=inv.root.file_id)
642
file2 = self.make_file_ie(file_id=b'id2', parent_id=inv.root.file_id)
643
delta = [(u'name', None, b'id1', None),
644
(None, u'name', b'id2', file2)]
645
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
646
self.assertEqual(b'id2', res_inv.path2id('name'))
648
def test_rename_dir(self):
649
inv = self.get_empty_inventory()
650
dir1 = inventory.InventoryDirectory(b'dir-id', 'dir1', inv.root.file_id)
651
dir1.revision = b'basis'
652
file1 = self.make_file_ie(parent_id=b'dir-id')
655
dir2 = inventory.InventoryDirectory(b'dir-id', 'dir2', inv.root.file_id)
656
dir2.revision = b'result'
657
delta = [('dir1', 'dir2', b'dir-id', dir2)]
658
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
659
# The file should be accessible under the new path
660
self.assertEqual(b'file-id', res_inv.path2id('dir2/name'))
662
def test_renamed_dir_with_renamed_child(self):
663
inv = self.get_empty_inventory()
664
dir1 = inventory.InventoryDirectory(b'dir-id', 'dir1', inv.root.file_id)
665
dir1.revision = b'basis'
666
file1 = self.make_file_ie(b'file-id-1', 'name1', parent_id=b'dir-id')
667
file2 = self.make_file_ie(b'file-id-2', 'name2', parent_id=b'dir-id')
671
dir2 = inventory.InventoryDirectory(b'dir-id', 'dir2', inv.root.file_id)
672
dir2.revision = b'result'
673
file2b = self.make_file_ie(b'file-id-2', 'name2', inv.root.file_id)
674
delta = [('dir1', 'dir2', b'dir-id', dir2),
675
('dir1/name2', 'name2', b'file-id-2', file2b)]
676
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
677
# The file should be accessible under the new path
678
self.assertEqual(b'file-id-1', res_inv.path2id('dir2/name1'))
679
self.assertEqual(None, res_inv.path2id('dir2/name2'))
680
self.assertEqual(b'file-id-2', res_inv.path2id('name2'))
580
682
def test_is_root(self):
581
683
"""Ensure our root-checking code is accurate."""
892
998
inv.revision_id = "revid"
893
999
inv.root.revision = "rootrev"
894
1000
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
895
inv["fileid"].revision = "filerev"
896
inv["fileid"].executable = True
897
inv["fileid"].text_sha1 = "ffff"
898
inv["fileid"].text_size = 1
1001
inv.get_entry("fileid").revision = "filerev"
1002
inv.get_entry("fileid").executable = True
1003
inv.get_entry("fileid").text_sha1 = "ffff"
1004
inv.get_entry("fileid").text_size = 1
899
1005
chk_bytes = self.get_chk_bytes()
900
1006
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
901
1007
self.assertEqual(2, len(chk_inv))
903
def test___getitem__(self):
1009
def test_get_entry(self):
904
1010
inv = Inventory()
905
inv.revision_id = "revid"
906
inv.root.revision = "rootrev"
907
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
908
inv["fileid"].revision = "filerev"
909
inv["fileid"].executable = True
910
inv["fileid"].text_sha1 = "ffff"
911
inv["fileid"].text_size = 1
1011
inv.revision_id = b"revid"
1012
inv.root.revision = b"rootrev"
1013
inv.add(InventoryFile(b"fileid", u"file", inv.root.file_id))
1014
inv.get_entry(b"fileid").revision = b"filerev"
1015
inv.get_entry(b"fileid").executable = True
1016
inv.get_entry(b"fileid").text_sha1 = b"ffff"
1017
inv.get_entry(b"fileid").text_size = 1
912
1018
chk_bytes = self.get_chk_bytes()
913
1019
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
914
bytes = ''.join(chk_inv.to_lines())
915
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
916
root_entry = new_inv[inv.root.file_id]
917
file_entry = new_inv["fileid"]
1020
data = b''.join(chk_inv.to_lines())
1021
new_inv = CHKInventory.deserialise(chk_bytes, data, (b"revid",))
1022
root_entry = new_inv.get_entry(inv.root.file_id)
1023
file_entry = new_inv.get_entry(b"fileid")
918
1024
self.assertEqual("directory", root_entry.kind)
919
1025
self.assertEqual(inv.root.file_id, root_entry.file_id)
920
1026
self.assertEqual(inv.root.parent_id, root_entry.parent_id)
921
1027
self.assertEqual(inv.root.name, root_entry.name)
922
self.assertEqual("rootrev", root_entry.revision)
1028
self.assertEqual(b"rootrev", root_entry.revision)
923
1029
self.assertEqual("file", file_entry.kind)
924
self.assertEqual("fileid", file_entry.file_id)
1030
self.assertEqual(b"fileid", file_entry.file_id)
925
1031
self.assertEqual(inv.root.file_id, file_entry.parent_id)
926
self.assertEqual("file", file_entry.name)
927
self.assertEqual("filerev", file_entry.revision)
928
self.assertEqual("ffff", file_entry.text_sha1)
1032
self.assertEqual(u"file", file_entry.name)
1033
self.assertEqual(b"filerev", file_entry.revision)
1034
self.assertEqual(b"ffff", file_entry.text_sha1)
929
1035
self.assertEqual(1, file_entry.text_size)
930
1036
self.assertEqual(True, file_entry.executable)
931
self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
1037
self.assertRaises(errors.NoSuchId, new_inv.get_entry, 'missing')
933
1039
def test_has_id_true(self):
934
1040
inv = Inventory()
935
inv.revision_id = "revid"
936
inv.root.revision = "rootrev"
937
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
938
inv["fileid"].revision = "filerev"
939
inv["fileid"].executable = True
940
inv["fileid"].text_sha1 = "ffff"
941
inv["fileid"].text_size = 1
1041
inv.revision_id = b"revid"
1042
inv.root.revision = b"rootrev"
1043
inv.add(InventoryFile(b"fileid", "file", inv.root.file_id))
1044
inv.get_entry(b"fileid").revision = b"filerev"
1045
inv.get_entry(b"fileid").executable = True
1046
inv.get_entry(b"fileid").text_sha1 = "ffff"
1047
inv.get_entry(b"fileid").text_size = 1
942
1048
chk_bytes = self.get_chk_bytes()
943
1049
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
944
self.assertTrue(chk_inv.has_id('fileid'))
1050
self.assertTrue(chk_inv.has_id(b'fileid'))
945
1051
self.assertTrue(chk_inv.has_id(inv.root.file_id))
947
1053
def test_has_id_not(self):
948
1054
inv = Inventory()
949
inv.revision_id = "revid"
950
inv.root.revision = "rootrev"
1055
inv.revision_id = b"revid"
1056
inv.root.revision = b"rootrev"
951
1057
chk_bytes = self.get_chk_bytes()
952
1058
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
953
self.assertFalse(chk_inv.has_id('fileid'))
1059
self.assertFalse(chk_inv.has_id(b'fileid'))
955
1061
def test_id2path(self):
956
1062
inv = Inventory()
957
inv.revision_id = "revid"
958
inv.root.revision = "rootrev"
959
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
1063
inv.revision_id = b"revid"
1064
inv.root.revision = b"rootrev"
1065
direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
960
1066
fileentry = InventoryFile("fileid", "file", "dirid")
961
1067
inv.add(direntry)
962
1068
inv.add(fileentry)
963
inv["fileid"].revision = "filerev"
964
inv["fileid"].executable = True
965
inv["fileid"].text_sha1 = "ffff"
966
inv["fileid"].text_size = 1
967
inv["dirid"].revision = "filerev"
1069
inv.get_entry(b"fileid").revision = b"filerev"
1070
inv.get_entry(b"fileid").executable = True
1071
inv.get_entry(b"fileid").text_sha1 = "ffff"
1072
inv.get_entry(b"fileid").text_size = 1
1073
inv.get_entry(b"dirid").revision = b"filerev"
968
1074
chk_bytes = self.get_chk_bytes()
969
1075
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
970
1076
bytes = ''.join(chk_inv.to_lines())
971
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1077
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
972
1078
self.assertEqual('', new_inv.id2path(inv.root.file_id))
973
self.assertEqual('dir', new_inv.id2path('dirid'))
974
self.assertEqual('dir/file', new_inv.id2path('fileid'))
1079
self.assertEqual('dir', new_inv.id2path(b'dirid'))
1080
self.assertEqual('dir/file', new_inv.id2path(b'fileid'))
976
1082
def test_path2id(self):
977
1083
inv = Inventory()
978
inv.revision_id = "revid"
979
inv.root.revision = "rootrev"
980
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
981
fileentry = InventoryFile("fileid", "file", "dirid")
1084
inv.revision_id = b"revid"
1085
inv.root.revision = b"rootrev"
1086
direntry = InventoryDirectory(b"dirid", "dir", inv.root.file_id)
1087
fileentry = InventoryFile(b"fileid", "file", b"dirid")
982
1088
inv.add(direntry)
983
1089
inv.add(fileentry)
984
inv["fileid"].revision = "filerev"
985
inv["fileid"].executable = True
986
inv["fileid"].text_sha1 = "ffff"
987
inv["fileid"].text_size = 1
988
inv["dirid"].revision = "filerev"
1090
inv.get_entry(b"fileid").revision = b"filerev"
1091
inv.get_entry(b"fileid").executable = True
1092
inv.get_entry(b"fileid").text_sha1 = "ffff"
1093
inv.get_entry(b"fileid").text_size = 1
1094
inv.get_entry(b"dirid").revision = b"filerev"
989
1095
chk_bytes = self.get_chk_bytes()
990
1096
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
991
1097
bytes = ''.join(chk_inv.to_lines())
992
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1098
new_inv = CHKInventory.deserialise(chk_bytes, bytes, (b"revid",))
993
1099
self.assertEqual(inv.root.file_id, new_inv.path2id(''))
994
self.assertEqual('dirid', new_inv.path2id('dir'))
995
self.assertEqual('fileid', new_inv.path2id('dir/file'))
1100
self.assertEqual(b'dirid', new_inv.path2id('dir'))
1101
self.assertEqual(b'fileid', new_inv.path2id('dir/file'))
997
1103
def test_create_by_apply_delta_sets_root(self):
998
1104
inv = Inventory()
1113
1219
def test_file_entry_to_bytes(self):
1114
1220
inv = CHKInventory(None)
1115
ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
1221
ie = inventory.InventoryFile(b'file-id', 'filename', 'parent-id')
1116
1222
ie.executable = True
1117
ie.revision = 'file-rev-id'
1223
ie.revision = b'file-rev-id'
1118
1224
ie.text_sha1 = 'abcdefgh'
1119
1225
ie.text_size = 100
1120
1226
bytes = inv._entry_to_bytes(ie)
1121
self.assertEqual('file: file-id\nparent-id\nfilename\n'
1122
'file-rev-id\nabcdefgh\n100\nY', bytes)
1227
self.assertEqual(b'file: file-id\nparent-id\nfilename\n'
1228
b'file-rev-id\nabcdefgh\n100\nY', bytes)
1123
1229
ie2 = inv._bytes_to_entry(bytes)
1124
1230
self.assertEqual(ie, ie2)
1125
1231
self.assertIsInstance(ie2.name, unicode)
1126
self.assertEqual(('filename', 'file-id', 'file-rev-id'),
1232
self.assertEqual(('filename', b'file-id', b'file-rev-id'),
1127
1233
inv._bytes_to_utf8name_key(bytes))
1129
1235
def test_file2_entry_to_bytes(self):
1130
1236
inv = CHKInventory(None)
1131
1237
# \u03a9 == 'omega'
1132
ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
1238
ie = inventory.InventoryFile(b'file-id', u'\u03a9name', b'parent-id')
1133
1239
ie.executable = False
1134
ie.revision = 'file-rev-id'
1240
ie.revision = b'file-rev-id'
1135
1241
ie.text_sha1 = '123456'
1136
1242
ie.text_size = 25
1137
1243
bytes = inv._entry_to_bytes(ie)
1138
self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
1139
'file-rev-id\n123456\n25\nN', bytes)
1244
self.assertEqual(b'file: file-id\nparent-id\n\xce\xa9name\n'
1245
b'file-rev-id\n123456\n25\nN', bytes)
1140
1246
ie2 = inv._bytes_to_entry(bytes)
1141
1247
self.assertEqual(ie, ie2)
1142
1248
self.assertIsInstance(ie2.name, unicode)
1143
self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
1249
self.assertEqual((b'\xce\xa9name', b'file-id', b'file-rev-id'),
1144
1250
inv._bytes_to_utf8name_key(bytes))
1146
1252
def test_dir_entry_to_bytes(self):
1147
1253
inv = CHKInventory(None)
1148
ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
1149
ie.revision = 'dir-rev-id'
1254
ie = inventory.InventoryDirectory(b'dir-id', 'dirname', b'parent-id')
1255
ie.revision = b'dir-rev-id'
1150
1256
bytes = inv._entry_to_bytes(ie)
1151
self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
1257
self.assertEqual(b'dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
1152
1258
ie2 = inv._bytes_to_entry(bytes)
1153
1259
self.assertEqual(ie, ie2)
1154
1260
self.assertIsInstance(ie2.name, unicode)
1155
self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
1261
self.assertEqual(('dirname', b'dir-id', b'dir-rev-id'),
1156
1262
inv._bytes_to_utf8name_key(bytes))
1158
1264
def test_dir2_entry_to_bytes(self):
1159
1265
inv = CHKInventory(None)
1160
ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
1266
ie = inventory.InventoryDirectory(b'dir-id', u'dir\u03a9name',
1162
ie.revision = 'dir-rev-id'
1268
ie.revision = b'dir-rev-id'
1163
1269
bytes = inv._entry_to_bytes(ie)
1164
self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
1165
'dir-rev-id', bytes)
1270
self.assertEqual(b'dir: dir-id\n\ndir\xce\xa9name\n'
1271
b'dir-rev-id', bytes)
1166
1272
ie2 = inv._bytes_to_entry(bytes)
1167
1273
self.assertEqual(ie, ie2)
1168
1274
self.assertIsInstance(ie2.name, unicode)
1169
1275
self.assertIs(ie2.parent_id, None)
1170
self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
1276
self.assertEqual(('dir\xce\xa9name', b'dir-id', b'dir-rev-id'),
1171
1277
inv._bytes_to_utf8name_key(bytes))
1173
1279
def test_symlink_entry_to_bytes(self):
1215
1321
self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1216
1322
inv._bytes_to_utf8name_key(bytes))
1324
def make_basic_utf8_inventory(self):
1326
inv.revision_id = "revid"
1327
inv.root.revision = "rootrev"
1328
root_id = inv.root.file_id
1329
inv.add(InventoryFile("fileid", u'f\xefle', root_id))
1330
inv.get_entry("fileid").revision = "filerev"
1331
inv.get_entry("fileid").text_sha1 = "ffff"
1332
inv.get_entry("fileid").text_size = 0
1333
inv.add(InventoryDirectory("dirid", u'dir-\N{EURO SIGN}', root_id))
1334
inv.add(InventoryFile("childid", u'ch\xefld', "dirid"))
1335
inv.get_entry("childid").revision = "filerev"
1336
inv.get_entry("childid").text_sha1 = "ffff"
1337
inv.get_entry("childid").text_size = 0
1338
chk_bytes = self.get_chk_bytes()
1339
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1340
bytes = ''.join(chk_inv.to_lines())
1341
return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1343
def test__preload_handles_utf8(self):
1344
new_inv = self.make_basic_utf8_inventory()
1345
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1346
self.assertFalse(new_inv._fully_cached)
1347
new_inv._preload_cache()
1349
sorted([new_inv.root_id, "fileid", "dirid", "childid"]),
1350
sorted(new_inv._fileid_to_entry_cache.keys()))
1351
ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
1352
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1353
sorted(ie_root._children.keys()))
1354
ie_dir = new_inv._fileid_to_entry_cache['dirid']
1355
self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
1357
def test__preload_populates_cache(self):
1359
inv.revision_id = "revid"
1360
inv.root.revision = "rootrev"
1361
root_id = inv.root.file_id
1362
inv.add(InventoryFile("fileid", "file", root_id))
1363
inv.get_entry("fileid").revision = "filerev"
1364
inv.get_entry("fileid").executable = True
1365
inv.get_entry("fileid").text_sha1 = "ffff"
1366
inv.get_entry("fileid").text_size = 1
1367
inv.add(InventoryDirectory("dirid", "dir", root_id))
1368
inv.add(InventoryFile("childid", "child", "dirid"))
1369
inv.get_entry("childid").revision = "filerev"
1370
inv.get_entry("childid").executable = False
1371
inv.get_entry("childid").text_sha1 = "dddd"
1372
inv.get_entry("childid").text_size = 1
1373
chk_bytes = self.get_chk_bytes()
1374
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1375
bytes = ''.join(chk_inv.to_lines())
1376
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1377
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1378
self.assertFalse(new_inv._fully_cached)
1379
new_inv._preload_cache()
1381
sorted([root_id, "fileid", "dirid", "childid"]),
1382
sorted(new_inv._fileid_to_entry_cache.keys()))
1383
self.assertTrue(new_inv._fully_cached)
1384
ie_root = new_inv._fileid_to_entry_cache[root_id]
1385
self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
1386
ie_dir = new_inv._fileid_to_entry_cache['dirid']
1387
self.assertEqual(['child'], sorted(ie_dir._children.keys()))
1389
def test__preload_handles_partially_evaluated_inventory(self):
1390
new_inv = self.make_basic_utf8_inventory()
1391
ie = new_inv.get_entry(new_inv.root_id)
1392
self.assertIs(None, ie._children)
1393
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1394
sorted(ie.children.keys()))
1395
# Accessing .children loads _children
1396
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1397
sorted(ie._children.keys()))
1398
new_inv._preload_cache()
1400
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1401
sorted(ie._children.keys()))
1402
ie_dir = new_inv.get_entry("dirid")
1403
self.assertEqual([u'ch\xefld'],
1404
sorted(ie_dir._children.keys()))
1406
def test_filter_change_in_renamed_subfolder(self):
1407
inv = Inventory('tree-root')
1408
src_ie = inv.add_path('src', 'directory', 'src-id')
1409
inv.add_path('src/sub/', 'directory', 'sub-id')
1410
a_ie = inv.add_path('src/sub/a', 'file', 'a-id')
1411
a_ie.text_sha1 = osutils.sha_string('content\n')
1412
a_ie.text_size = len('content\n')
1413
chk_bytes = self.get_chk_bytes()
1414
inv = CHKInventory.from_inventory(chk_bytes, inv)
1415
inv = inv.create_by_apply_delta([
1416
("src/sub/a", "src/sub/a", "a-id", a_ie),
1417
("src", "src2", "src-id", src_ie),
1419
new_inv = inv.filter(['a-id', 'src-id'])
1423
('src/sub', 'sub-id'),
1424
('src/sub/a', 'a-id'),
1425
], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
1219
1427
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):