162
162
from_format = self.from_repository._format
163
163
stream = self.get_stream(search, pp)
164
self.sink.insert_stream(stream, from_format)
164
missing_keys = self.sink.insert_stream(stream, from_format)
166
stream = self.get_stream_for_missing_keys(missing_keys)
167
missing_keys = self.sink.insert_stream(stream, from_format)
169
raise AssertionError(
170
"second push failed to complete a fetch %r." % (
165
172
self.sink.finished()
167
174
if self.pb is not None:
168
175
self.pb.finished()
170
177
def get_stream(self, search, pp):
172
179
revs = search.get_keys()
224
231
raise AssertionError("Unknown knit kind %r" % knit_kind)
225
232
self.count_copied += len(revs)
234
def get_stream_for_missing_keys(self, missing_keys):
235
# missing keys can only occur when we are byte copying and not
236
# translating (because translation means we don't send
237
# unreconstructable deltas ever).
239
keys['texts'] = set()
240
keys['revisions'] = set()
241
keys['inventories'] = set()
242
keys['signatures'] = set()
243
for key in missing_keys:
244
keys[key[0]].add(key[1:])
245
if len(keys['revisions']):
246
# If we allowed copying revisions at this point, we could end up
247
# copying a revision without copying its required texts: a
248
# violation of the requirements for repository integrity.
249
raise AssertionError(
250
'cannot copy revisions to fill in missing deltas %s' % (
252
for substream_kind, keys in keys.iteritems():
253
vf = getattr(self.from_repository, substream_kind)
254
# Ask for full texts always so that we don't need more round trips
256
stream = vf.get_record_stream(keys,
257
self.to_repository._fetch_order, True)
258
yield substream_kind, stream
227
260
def _revids_to_fetch(self):
228
261
"""Determines the exact revisions needed from self.from_repository to
229
262
install self._last_revision in self.to_repository.