82
82
from bzrlib import (
85
from bzrlib.trace import mutter
86
85
from bzrlib.errors import (WeaveError, WeaveFormatError, WeaveParentMismatch,
87
86
RevisionAlreadyPresent,
88
87
RevisionNotPresent,
88
UnavailableRepresentation,
89
89
WeaveRevisionAlreadyPresent,
90
90
WeaveRevisionNotPresent,
92
92
import bzrlib.errors as errors
93
from bzrlib.osutils import sha_strings
93
from bzrlib.osutils import dirname, sha_strings, split_lines
94
94
import bzrlib.patiencediff
95
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
95
from bzrlib.revision import NULL_REVISION
96
from bzrlib.symbol_versioning import *
97
from bzrlib.trace import mutter
98
from bzrlib.tsort import topo_sort
99
from bzrlib.versionedfile import (
100
AbsentContentFactory,
96
105
from bzrlib.weavefile import _read_weave_v5, write_weave_v5
108
class WeaveContentFactory(ContentFactory):
109
"""Content factory for streaming from weaves.
111
:seealso ContentFactory:
114
def __init__(self, version, weave):
115
"""Create a WeaveContentFactory for version from weave."""
116
ContentFactory.__init__(self)
117
self.sha1 = weave.get_sha1s([version])[version]
118
self.key = (version,)
119
parents = weave.get_parent_map([version])[version]
120
self.parents = tuple((parent,) for parent in parents)
121
self.storage_kind = 'fulltext'
124
def get_bytes_as(self, storage_kind):
125
if storage_kind == 'fulltext':
126
return self._weave.get_text(self.key[-1])
128
raise UnavailableRepresentation(self.key, storage_kind, 'fulltext')
99
131
class Weave(VersionedFile):
100
132
"""weave - versioned text file storage.
188
220
__slots__ = ['_weave', '_parents', '_sha1s', '_names', '_name_map',
189
'_weave_name', '_matcher']
221
'_weave_name', '_matcher', '_allow_reserved']
191
def __init__(self, weave_name=None, access_mode='w', matcher=None):
223
def __init__(self, weave_name=None, access_mode='w', matcher=None,
224
get_scope=None, allow_reserved=False):
227
:param get_scope: A callable that returns an opaque object to be used
228
for detecting when this weave goes out of scope (should stop
229
answering requests or allowing mutation).
192
231
super(Weave, self).__init__(access_mode)
194
233
self._parents = []
200
239
self._matcher = bzrlib.patiencediff.PatienceSequenceMatcher
202
241
self._matcher = matcher
242
if get_scope is None:
243
get_scope = lambda:None
244
self._get_scope = get_scope
245
self._scope = get_scope()
246
self._access_mode = access_mode
247
self._allow_reserved = allow_reserved
204
249
def __repr__(self):
205
250
return "Weave(%r)" % self._weave_name
252
def _check_write_ok(self):
253
"""Is the versioned file marked as 'finished' ? Raise if it is."""
254
if self._get_scope() != self._scope:
255
raise errors.OutSideTransaction()
256
if self._access_mode != 'w':
257
raise errors.ReadOnlyObjectDirtiedError(self)
208
260
"""Return a deep copy of self.
249
302
__contains__ = has_version
304
def get_record_stream(self, versions, ordering, include_delta_closure):
305
"""Get a stream of records for versions.
307
:param versions: The versions to include. Each version is a tuple
309
:param ordering: Either 'unordered' or 'topological'. A topologically
310
sorted stream has compression parents strictly before their
312
:param include_delta_closure: If True then the closure across any
313
compression parents will be included (in the opaque data).
314
:return: An iterator of ContentFactory objects, each of which is only
315
valid until the iterator is advanced.
317
versions = [version[-1] for version in versions]
318
if ordering == 'topological':
319
parents = self.get_parent_map(versions)
320
new_versions = topo_sort(parents)
321
new_versions.extend(set(versions).difference(set(parents)))
322
versions = new_versions
323
for version in versions:
325
yield WeaveContentFactory(version, self)
327
yield AbsentContentFactory((version,))
251
329
def get_parent_map(self, version_ids):
252
330
"""See VersionedFile.get_parent_map."""
254
332
for version_id in version_ids:
256
result[version_id] = tuple(
257
map(self._idx_to_name, self._parents[self._lookup(version_id)]))
258
except RevisionNotPresent:
333
if version_id == NULL_REVISION:
338
map(self._idx_to_name,
339
self._parents[self._lookup(version_id)]))
340
except RevisionNotPresent:
342
result[version_id] = parents
262
345
def get_parents_with_ghosts(self, version_id):
263
346
raise NotImplementedError(self.get_parents_with_ghosts)
348
def insert_record_stream(self, stream):
349
"""Insert a record stream into this versioned file.
351
:param stream: A stream of records to insert.
353
:seealso VersionedFile.get_record_stream:
356
for record in stream:
357
# Raise an error when a record is missing.
358
if record.storage_kind == 'absent':
359
raise RevisionNotPresent([record.key[0]], self)
360
# adapt to non-tuple interface
361
parents = [parent[0] for parent in record.parents]
362
if record.storage_kind == 'fulltext':
363
self.add_lines(record.key[0], parents,
364
split_lines(record.get_bytes_as('fulltext')))
366
adapter_key = record.storage_kind, 'fulltext'
368
adapter = adapters[adapter_key]
370
adapter_factory = adapter_registry.get(adapter_key)
371
adapter = adapter_factory(self)
372
adapters[adapter_key] = adapter
373
lines = split_lines(adapter.get_bytes(
374
record, record.get_bytes_as(record.storage_kind)))
376
self.add_lines(record.key[0], parents, lines)
377
except RevisionAlreadyPresent:
265
380
def _check_repeated_add(self, name, parents, text, sha1):
266
381
"""Check that a duplicated add is OK.
457
560
return len(other_parents.difference(my_parents)) == 0
459
def annotate_iter(self, version_id):
460
"""Yield list of (version-id, line) pairs for the specified version.
562
def annotate(self, version_id):
563
"""Return a list of (version-id, line) tuples for version_id.
462
565
The index indicates when the line originated in the weave."""
463
566
incls = [self._lookup(version_id)]
464
for origin, lineno, text in self._extract(incls):
465
yield self._idx_to_name(origin), text
567
return [(self._idx_to_name(origin), text) for origin, lineno, text in
568
self._extract(incls)]
467
570
def iter_lines_added_or_present_in_versions(self, version_ids=None,
673
768
expected_sha1, measured_sha1))
676
def get_sha1(self, version_id):
677
"""See VersionedFile.get_sha1()."""
678
return self._sha1s[self._lookup(version_id)]
680
771
def get_sha1s(self, version_ids):
681
772
"""See VersionedFile.get_sha1s()."""
682
return [self._sha1s[self._lookup(v)] for v in version_ids]
774
for v in version_ids:
775
result[v] = self._sha1s[self._lookup(v)]
684
778
def num_versions(self):
685
779
"""How many versions are in this weave?"""
686
780
l = len(self._parents)
687
assert l == len(self._sha1s)
690
783
__len__ = num_versions
716
809
for p in self._parents[i]:
717
810
new_inc.update(inclusions[self._idx_to_name(p)])
719
assert set(new_inc) == set(self.get_ancestry(name)), \
720
'failed %s != %s' % (set(new_inc), set(self.get_ancestry(name)))
812
if set(new_inc) != set(self.get_ancestry(name)):
813
raise AssertionError(
815
% (set(new_inc), set(self.get_ancestry(name))))
721
816
inclusions[name] = new_inc
723
818
nlines = len(self._weave)
753
848
# no lines outside of insertion blocks, that deletions are
754
849
# properly paired, etc.
756
def _join(self, other, pb, msg, version_ids, ignore_missing):
757
"""Worker routine for join()."""
758
if not other.versions():
759
return # nothing to update, easy
762
# versions is never none, InterWeave checks this.
765
# two loops so that we do not change ourselves before verifying it
767
# work through in index order to make sure we get all dependencies
770
# get the selected versions only that are in other.versions.
771
version_ids = set(other.versions()).intersection(set(version_ids))
772
# pull in the referenced graph.
773
version_ids = other.get_ancestry(version_ids)
774
pending_parents = other.get_parent_map(version_ids)
775
pending_graph = pending_parents.items()
776
if len(pending_graph) != len(version_ids):
777
raise RevisionNotPresent(
778
set(version_ids) - set(pending_parents.keys()), self)
779
for name in tsort.topo_sort(pending_graph):
780
other_idx = other._name_map[name]
781
# returns True if we have it, False if we need it.
782
if not self._check_version_consistent(other, other_idx, name):
783
names_to_join.append((other_idx, name))
791
for other_idx, name in names_to_join:
792
# TODO: If all the parents of the other version are already
793
# present then we can avoid some work by just taking the delta
794
# and adjusting the offsets.
795
new_parents = self._imported_parents(other, other_idx)
796
sha1 = other._sha1s[other_idx]
801
pb.update(msg, merged, len(names_to_join))
803
lines = other.get_lines(other_idx)
804
self._add(name, lines, new_parents, sha1)
806
mutter("merged = %d, processed = %d, file_id=%s; deltat=%d"%(
807
merged, processed, self._weave_name, time.time()-time0))
809
851
def _imported_parents(self, other, other_idx):
810
852
"""Return list of parents in self corresponding to indexes in other."""
869
911
WEAVE_SUFFIX = '.weave'
871
def __init__(self, name, transport, filemode=None, create=False, access_mode='w'):
913
def __init__(self, name, transport, filemode=None, create=False, access_mode='w', get_scope=None):
872
914
"""Create a WeaveFile.
874
916
:param create: If not True, only open an existing knit.
876
super(WeaveFile, self).__init__(name, access_mode)
918
super(WeaveFile, self).__init__(name, access_mode, get_scope=get_scope,
919
allow_reserved=False)
877
920
self._transport = transport
878
921
self._filemode = filemode
897
def _clone_text(self, new_version_id, old_version_id, parents):
898
"""See VersionedFile.clone_text."""
899
super(WeaveFile, self)._clone_text(new_version_id, old_version_id, parents)
902
940
def copy_to(self, name, transport):
903
941
"""See VersionedFile.copy_to()."""
904
942
# as we are all in memory always, just serialise to the new place.
914
952
write_weave_v5(self, sio)
916
self._transport.put_file(self._weave_name + WeaveFile.WEAVE_SUFFIX,
954
bytes = sio.getvalue()
955
path = self._weave_name + WeaveFile.WEAVE_SUFFIX
957
self._transport.put_bytes(path, bytes, self._filemode)
958
except errors.NoSuchFile:
959
self._transport.mkdir(dirname(path))
960
self._transport.put_bytes(path, bytes, self._filemode)
921
963
def get_suffixes():
922
964
"""See VersionedFile.get_suffixes()."""
923
965
return [WeaveFile.WEAVE_SUFFIX]
967
def insert_record_stream(self, stream):
968
super(WeaveFile, self).insert_record_stream(stream)
971
@deprecated_method(one_five)
925
972
def join(self, other, pb=None, msg=None, version_ids=None,
926
973
ignore_missing=False):
927
974
"""Join other into self and save."""
1191
1238
if __name__ == '__main__':
1193
1240
sys.exit(main(sys.argv))
1196
class InterWeave(InterVersionedFile):
1197
"""Optimised code paths for weave to weave operations."""
1199
_matching_file_from_factory = staticmethod(WeaveFile)
1200
_matching_file_to_factory = staticmethod(WeaveFile)
1203
def is_compatible(source, target):
1204
"""Be compatible with weaves."""
1206
return (isinstance(source, Weave) and
1207
isinstance(target, Weave))
1208
except AttributeError:
1211
def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
1212
"""See InterVersionedFile.join."""
1213
version_ids = self._get_source_version_ids(version_ids, ignore_missing)
1214
if self.target.versions() == [] and version_ids is None:
1215
self.target._copy_weave_content(self.source)
1217
self.target._join(self.source, pb, msg, version_ids, ignore_missing)
1220
InterVersionedFile.register_optimiser(InterWeave)