584
589
:returns: format_signature, list of (version, options, length, parents),
587
if not isinstance(required_versions, set):
588
required_versions = set(required_versions)
589
# we don't care about inclusions, the caller cares.
590
# but we need to setup a list of records to visit.
592
required_version_set = frozenset(required_versions)
594
# list of revisions that can just be sent without waiting for their
597
# map from revision to the children based on it
599
# first, read all relevant index data, enough to sort into the right
591
601
for version_id in required_versions:
592
if not self.has_version(version_id):
593
raise RevisionNotPresent(version_id, self.filename)
594
# Pick the desired versions out of the index in oldest-to-newest order
596
for version_id in self.versions():
597
if version_id in required_versions:
598
version_list.append(version_id)
600
# create the list of version information for the result
601
copy_queue_records = []
603
result_version_list = []
604
for version_id in version_list:
605
602
options = self._index.get_options(version_id)
606
603
parents = self._index.get_parents_with_ghosts(version_id)
607
604
index_memo = self._index.get_position(version_id)
605
version_index[version_id] = (index_memo, options, parents)
606
if ('line-delta' in options
607
and parents[0] in required_version_set):
608
# must wait until the parent has been sent
609
deferred.setdefault(parents[0], []). \
612
# either a fulltext, or a delta whose parent the client did
613
# not ask for and presumably already has
614
ready_to_send.append(version_id)
615
# build a list of results to return, plus instructions for data to
617
copy_queue_records = []
618
temp_version_list = []
620
# XXX: pushing and popping lists may be a bit inefficient
621
version_id = ready_to_send.pop(0)
622
(index_memo, options, parents) = version_index[version_id]
608
623
copy_queue_records.append((version_id, index_memo))
609
624
none, data_pos, data_size = index_memo
610
copy_set.add(version_id)
611
# version, options, length, parents
612
result_version_list.append((version_id, options, data_size,
625
temp_version_list.append((version_id, options, data_size,
615
# Read the compressed record data.
617
# From here down to the return should really be logic in the returned
618
# callable -- in a class that adapts read_records_iter_raw to read
627
if version_id in deferred:
628
# now we can send all the children of this revision - we could
629
# put them in anywhere, but we hope that sending them soon
630
# after the fulltext will give good locality in the receiver
631
ready_to_send[:0] = deferred.pop(version_id)
632
assert len(deferred) == 0, \
633
"Still have compressed child versions waiting to be sent"
634
# XXX: The stream format is such that we cannot stream it - we have to
635
# know the length of all the data a-priori.
637
result_version_list = []
621
638
for (version_id, raw_data), \
622
639
(version_id2, options, _, parents) in \
623
640
izip(self._data.read_records_iter_raw(copy_queue_records),
624
result_version_list):
625
assert version_id == version_id2, 'logic error, inconsistent results'
642
assert version_id == version_id2, \
643
'logic error, inconsistent results'
626
644
raw_datum.append(raw_data)
645
result_version_list.append(
646
(version_id, options, len(raw_data), parents))
647
# provide a callback to get data incrementally.
627
648
pseudo_file = StringIO(''.join(raw_datum))
628
649
def read(length):
629
650
if length is None:
749
770
# line-delta is no use unless we have its parent.
750
771
# Fetching from a broken repository with this problem
751
772
# shouldn't break the target repository.
774
# See https://bugs.launchpad.net/bzr/+bug/164443
752
775
if not self._index.has_version(parents[0]):
753
776
raise KnitCorrupt(
755
'line-delta from stream references '
756
'missing parent %s' % parents[0])
778
'line-delta from stream '
781
'missing parent %s\n'
782
'Try running "bzr check" '
783
'on the source repository, and "bzr reconcile" '
785
(version_id, parents[0]))
757
786
self._add_raw_records(
758
787
[(version_id, options, parents, length)],
759
788
reader_callable(length))
2224
2253
except AttributeError:
2256
def _copy_texts(self, pb, msg, version_ids, ignore_missing=False):
    """Copy texts to the target by extracting and adding them one by one.

    See join() for the parameter definitions.

    :param pb: Accepted for signature compatibility with join(); it is not
        used, because a fresh nested progress bar is always created below.
    :param msg: Accepted for signature compatibility with join(); not used
        by this method.
    :param version_ids: Passed to self._get_source_version_ids() to pick
        the versions to copy.
    :param ignore_missing: Passed through to
        self._get_source_version_ids().
    :return: The number of versions that were added to the target (those
        selected versions not already present in it). join() returns this
        value directly.
    """
    version_ids = self._get_source_version_ids(version_ids, ignore_missing)
    graph = self.source.get_graph(version_ids)
    # Parents must be added before their children, so walk the graph in
    # topological order.
    order = topo_sort(graph.items())

    def size_of_content(content):
        return sum(len(line) for line in content.text())

    # Cache at most 10MB of parent texts
    parent_cache = lru_cache.LRUSizeCache(max_size=10*1024*1024,
                                          compute_size=size_of_content)
    # TODO: jam 20071116 It would be nice to have a streaming interface to
    #       get multiple texts from a source. The source could be smarter
    #       about how it handled intermediate stages.
    #       get_line_list() or make_mpdiffs() seem like a possibility, but
    #       at the moment they extract all full texts into memory, which
    #       causes us to store more than our 3x fulltext goal.
    #       Repository.iter_files_bytes() may be another possibility
    to_process = [version for version in order
                  if version not in self.target]
    total = len(to_process)
    pb = ui.ui_factory.nested_progress_bar()
    try:
        for index, version in enumerate(to_process):
            pb.update('Converting versioned data', index, total)
            _sha1, _num_bytes, parent_text = self.target.add_lines(
                version,
                self.source.get_parents(version),
                self.source.get_lines(version),
                parent_texts=parent_cache)
            # Remember the content just added so that children of this
            # version can reuse it via parent_texts.
            parent_cache[version] = parent_text
    finally:
        # Always release the nested progress bar, even if add_lines fails.
        pb.finished()
    return total
2227
2293
def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
2228
2294
"""See InterVersionedFile.join."""
2229
2295
assert isinstance(self.source, KnitVersionedFile)
2236
2302
elif self.source.factory.annotated:
2237
2303
converter = self._anno_to_plain_converter
2239
# We're converting from a plain to an annotated knit. This requires
2240
# building the annotations from scratch. The generic join code
2241
# handles this implicitly so we delegate to it.
2242
return super(InterKnit, self).join(pb, msg, version_ids,
2305
# We're converting from a plain to an annotated knit. Copy them
2306
# across by full texts.
2307
return self._copy_texts(pb, msg, version_ids, ignore_missing)
2245
2309
version_ids = self._get_source_version_ids(version_ids, ignore_missing)
2246
2310
if not version_ids: