164
164
# item_keys_introduced_by should have a richer API than it does at the
165
165
# moment, so that it can feed the progress information back to this
167
self.pb = bzrlib.ui.ui_factory.nested_progress_bar()
169
from_format = self.from_repository._format
170
stream = self.get_stream(search, pp)
171
missing_keys = self.sink.insert_stream(stream, from_format)
173
stream = self.get_stream_for_missing_keys(missing_keys)
174
missing_keys = self.sink.insert_stream(stream, from_format)
176
raise AssertionError(
177
"second push failed to complete a fetch %r." % (
181
if self.pb is not None:
184
def get_stream(self, search, pp):
168
pb = bzrlib.ui.ui_factory.nested_progress_bar()
170
from_repo = self.from_repository
171
revs = search.get_keys()
172
revs = list(from_repo.get_graph().iter_topo_order(revs))
173
data_to_fetch = from_repo.item_keys_introduced_by(revs, pb)
175
for knit_kind, file_id, revisions in data_to_fetch:
176
if knit_kind != phase:
178
# Make a new progress bar for this phase
181
pb = bzrlib.ui.ui_factory.nested_progress_bar()
182
if knit_kind == "file":
183
# Accumulate file texts
184
text_keys.extend([(file_id, revision) for revision in
186
elif knit_kind == "inventory":
187
# When the inventory keys start being reported, all text
188
# keys have already been issued - and we want the text keys
189
# inserted before inventory keys: copy the texts.
190
self._fetch_text_texts(text_keys, pb=pb)
191
# Cause an error if a text occurs after we have done the
194
# Before we process the inventory we generate the root
195
# texts (if necessary) so that the inventories references
197
self._generate_root_texts(revisions)
198
# NB: This currently reopens the inventory weave in source;
199
# using a single stream interface instead would avoid this.
200
self._fetch_inventory_weave(revisions, pb)
201
elif knit_kind == "signatures":
202
# Nothing to do here; this will be taken care of when
203
# _fetch_revision_texts happens.
205
elif knit_kind == "revisions":
206
self._fetch_revision_texts(revisions, pb)
207
self.count_copied += len(revisions)
209
raise AssertionError("Unknown knit kind %r" % knit_kind)
210
if self.to_repository._fetch_reconcile:
211
self.to_repository.reconcile()
186
revs = search.get_keys()
187
graph = self.from_repository.get_graph()
188
revs = list(graph.iter_topo_order(revs))
189
data_to_fetch = self.from_repository.item_keys_introduced_by(
192
for knit_kind, file_id, revisions in data_to_fetch:
193
if knit_kind != phase:
195
# Make a new progress bar for this phase
198
self.pb = bzrlib.ui.ui_factory.nested_progress_bar()
199
if knit_kind == "file":
200
# Accumulate file texts
201
text_keys.extend([(file_id, revision) for revision in
203
elif knit_kind == "inventory":
204
# Now copy the file texts.
205
to_texts = self.to_repository.texts
206
from_texts = self.from_repository.texts
207
yield ('texts', from_texts.get_record_stream(
208
text_keys, self.to_repository._fetch_order,
209
not self.to_repository._fetch_uses_deltas))
210
# Cause an error if a text occurs after we have done the
213
# Before we process the inventory we generate the root
214
# texts (if necessary) so that the inventories references
216
for _ in self._generate_root_texts(revs):
218
# NB: This currently reopens the inventory weave in source;
219
# using a single stream interface instead would avoid this.
220
self.pb.update("fetch inventory", 0, 1)
221
# we fetch only the referenced inventories because we do not
222
# know for unselected inventories whether all their required
223
# texts are present in the other repository - it could be
225
for info in self._get_inventory_stream(revs):
227
elif knit_kind == "signatures":
228
# Nothing to do here; this will be taken care of when
229
# _fetch_revision_texts happens.
231
elif knit_kind == "revisions":
232
for _ in self._fetch_revision_texts(revs, self.pb):
235
raise AssertionError("Unknown knit kind %r" % knit_kind)
236
self.count_copied += len(revs)
238
def get_stream_for_missing_keys(self, missing_keys):
239
# missing keys can only occur when we are byte copying and not
240
# translating (because translation means we don't send
241
# unreconstructable deltas ever).
243
keys['texts'] = set()
244
keys['revisions'] = set()
245
keys['inventories'] = set()
246
keys['signatures'] = set()
247
for key in missing_keys:
248
keys[key[0]].add(key[1:])
249
if len(keys['revisions']):
250
# If we allowed copying revisions at this point, we could end up
251
# copying a revision without copying its required texts: a
252
# violation of the requirements for repository integrity.
253
raise AssertionError(
254
'cannot copy revisions to fill in missing deltas %s' % (
256
for substream_kind, keys in keys.iteritems():
257
vf = getattr(self.from_repository, substream_kind)
258
# Ask for full texts always so that we don't need more round trips
260
stream = vf.get_record_stream(keys,
261
self.to_repository._fetch_order, True)
262
yield substream_kind, stream
216
264
def _revids_to_fetch(self):
217
265
"""Determines the exact revisions needed from self.from_repository to
218
266
install self._last_revision in self.to_repository.
233
281
except errors.NoSuchRevision, e:
234
282
raise InstallFailed([self._last_revision])
236
def _fetch_text_texts(self, text_keys, pb):
237
to_texts = self.to_repository.texts
238
from_texts = self.from_repository.texts
239
text_stream = from_texts.get_record_stream(text_keys,
240
self.to_repository._fetch_order,
241
not self.to_repository._fetch_uses_deltas)
242
adapter = _pb_stream_adapter(pb, 'fetch text', len(text_keys),
244
to_texts.insert_record_stream(adapter())
246
def _fetch_inventory_weave(self, revs, pb):
247
# we fetch only the referenced inventories because we do not
248
# know for unselected inventories whether all their required
249
# texts are present in the other repository - it could be
284
def _get_inventory_stream(self, revision_ids):
251
285
if (self.from_repository._format.supports_chks and
252
self.to_repository._format.supports_chks):
253
self._fetch_chk_inventories(revs, pb)
254
elif (self.from_repository._format.supports_chks or
255
self.to_repository._format.supports_chks):
256
# Hack to make not-chk->chk fetch: copy the inventories as
259
for idx, inv in enumerate(
260
self.from_repository.iter_inventories(revs)):
261
pb.update("Copying inventories", idx, total)
262
self.to_repository.add_inventory(inv.revision_id, inv, [])
286
self.to_repository._format.supports_chks
287
and (self.from_repository._format._serializer
288
== self.to_repository._format._serializer)):
289
# Both sides support chks, and they use the same serializer, so it
290
# is safe to transmit the chk pages and inventory pages across
292
return self._get_chk_inventory_stream(revision_ids)
293
elif (not self.from_repository._format.supports_chks):
294
# Source repository doesn't support chks. So we can transmit the
295
# inventories 'as-is' and either they are just accepted on the
296
# target, or the Sink will properly convert it.
297
return self._get_simple_inventory_stream(revision_ids)
264
to_weave = self.to_repository.inventories
265
from_weave = self.from_repository.inventories
266
adapter = _pb_stream_adapter(pb, 'fetch inv', len(revs),
267
from_weave.get_record_stream([(rev_id,) for rev_id in revs],
268
self.to_repository._fetch_order,
269
not self.to_repository._fetch_uses_deltas))
270
to_weave.insert_record_stream(adapter())
272
def _fetch_revision_texts(self, revs, pb):
    """Copy signatures and then revision texts for revs into the target.

    :param revs: Revision ids to copy. The collection is walked here to
        build the signature keys and is then handed on unchanged to
        self._fetch_just_revision_texts.
    :param pb: Not used by this implementation.
    """
    # Signatures go first, then the revision texts.
    # This may need to become an InterRevisionStore call.
    target_signatures = self.to_repository.signatures
    source_signatures = self.from_repository.signatures
    signature_records = source_signatures.get_record_stream(
        [(revision_id,) for revision_id in revs],
        self.to_repository._fetch_order,
        not self.to_repository._fetch_uses_deltas)
    # filter_absent wraps the stream so that a revision without a
    # signature is simply skipped.
    target_signatures.insert_record_stream(filter_absent(signature_records))
    self._fetch_just_revision_texts(revs)
284
def _fetch_just_revision_texts(self, version_ids):
    """Insert the revision records for version_ids into the target.

    :param version_ids: Revision ids to copy; each is wrapped into a
        1-tuple key before the source record stream is requested.
    """
    target_revisions = self.to_repository.revisions
    source_revisions = self.from_repository.revisions
    # The stream is requested in the target's _fetch_order, and the third
    # argument is the negation of the target's _fetch_uses_deltas flag.
    # If a revision has a delta, it is expanded inside the
    # insert_record_stream code itself.
    revision_records = source_revisions.get_record_stream(
        [(revision_id,) for revision_id in version_ids],
        self.to_repository._fetch_order,
        not self.to_repository._fetch_uses_deltas)
    target_revisions.insert_record_stream(revision_records)
295
def _fetch_chk_inventories(self, revs, pb):
299
# XXX: Hack to make not-chk->chk fetch: copy the inventories as
300
# inventories. Note that this should probably be done somehow
301
# as part of bzrlib.repository.StreamSink. Except JAM couldn't
302
# figure out how a non-chk repository could possibly handle
303
# deserializing an inventory stream from a chk repo, as it
304
# doesn't have a way to understand individual pages.
305
return self._get_convertable_inventory_stream(revision_ids)
307
def _get_simple_inventory_stream(self, revision_ids):
    """Yield one 'inventories' substream read directly from the source.

    :param revision_ids: Revision ids whose inventories are wanted; each
        is wrapped into a 1-tuple key.
    :return: A generator producing a single ('inventories', stream) pair.
        The stream is requested with the ordering returned by
        self.inventory_fetch_order() and with the negation of
        self.delta_on_metadata() as its third argument.
    """
    fetch_records = self.from_repository.inventories.get_record_stream
    inventory_keys = [(revision_id,) for revision_id in revision_ids]
    ordering = self.inventory_fetch_order()
    third_argument = not self.delta_on_metadata()
    yield ('inventories',
           fetch_records(inventory_keys, ordering, third_argument))
314
def _get_chk_inventory_stream(self, revision_ids):
296
315
"""Fetch the inventory texts, along with the associated chk maps."""
297
316
from bzrlib import inventory, chk_map
298
317
# We want an inventory outside of the search set, so that we can filter
299
318
# out uninteresting chk pages. For now we use
300
319
# _find_revision_outside_set, but if we had a Search with cut_revs, we
301
320
# could use that instead.
302
start_rev_id = self.from_repository._find_revision_outside_set(revs)
321
start_rev_id = self.from_repository._find_revision_outside_set(
303
323
start_rev_key = (start_rev_id,)
304
inv_keys_to_fetch = [(rev_id,) for rev_id in revs]
324
inv_keys_to_fetch = [(rev_id,) for rev_id in revision_ids]
305
325
if start_rev_id != NULL_REVISION:
306
326
inv_keys_to_fetch.append((start_rev_id,))
307
327
# Any repo that supports chk_bytes must also support out-of-order
332
352
p_id_map = chk_inv.parent_id_basename_to_file_id
333
353
if p_id_map is not None:
334
354
interesting_chk_roots.add(p_id_map.key())
335
pb.update('fetch inventory', 0, 2)
336
child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
338
self.to_repository.inventories.insert_record_stream(
339
filter_inv_stream(inv_stream))
355
### pb.update('fetch inventory', 0, 2)
356
yield ('inventories', filter_inv_stream(inv_stream))
342
357
# Now that we have worked out all of the interesting root nodes, grab
343
358
# all of the interesting pages and insert them
344
pb.update('fetch inventory', 1, 2)
345
child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
347
interesting = chk_map.iter_interesting_nodes(
348
self.from_repository.chk_bytes, interesting_chk_roots,
349
uninteresting_chk_roots, pb=child_pb)
350
def to_stream_adapter():
351
"""Adapt the iter_interesting_nodes result to a single stream.
353
iter_interesting_nodes returns records as it processes them, which
354
can be in batches. But we only want a single stream to be inserted.
356
for record, items in interesting:
357
for value in record.itervalues():
359
# XXX: We could instead call get_record_stream(records.keys())
360
# ATM, this will always insert the records as fulltexts, and
361
# requires that you can hang on to records once you have gone
362
# on to the next one. Further, it causes the target to
363
# recompress the data. Testing shows it to be faster than
364
# requesting the records again, though.
365
self.to_repository.chk_bytes.insert_record_stream(
369
pb.update('fetch inventory', 2, 2)
359
### pb.update('fetch inventory', 1, 2)
360
interesting = chk_map.iter_interesting_nodes(
361
self.from_repository.chk_bytes, interesting_chk_roots,
362
uninteresting_chk_roots)
363
def to_stream_adapter():
364
"""Adapt the iter_interesting_nodes result to a single stream.
366
iter_interesting_nodes returns records as it processes them, which
367
can be in batches. But we only want a single stream to be inserted.
369
for record, items in interesting:
370
for value in record.itervalues():
372
# XXX: We could instead call get_record_stream(records.keys())
373
# ATM, this will always insert the records as fulltexts, and
374
# requires that you can hang on to records once you have gone
375
# on to the next one. Further, it causes the target to
376
# recompress the data. Testing shows it to be faster than
377
# requesting the records again, though.
378
yield ('chk_bytes', to_stream_adapter())
379
### pb.update('fetch inventory', 2, 2)
381
def _get_convertable_inventory_stream(self, revision_ids):
    """Yield one 'inventories' substream of fulltext inventory records.

    :param revision_ids: Revision ids passed straight through to
        self._stream_invs_as_fulltexts.
    :return: A generator producing a single ('inventories', records) pair.
    """
    # XXX: One of source or target is using chks, and they don't have
    #      compatible serializations. The StreamSink code expects to be
    #      able to convert on the target, so the bytes put on the wire
    #      must be in a form that can be converted.
    fulltext_records = self._stream_invs_as_fulltexts(revision_ids)
    yield ('inventories', fulltext_records)
388
def _stream_invs_as_fulltexts(self, revision_ids):
    """Yield every requested inventory as a single fulltext record.

    Each inventory is read from the source repository, serialised to one
    string with the source format's serializer, and wrapped in a
    FulltextContentFactory keyed by (revision_id,).

    :param revision_ids: Revision ids to stream. The collection is walked
        twice (once to build the keys, once by iter_inventories), so it
        must be re-iterable.
    :return: A generator of FulltextContentFactory records; the parents of
        a key that is absent from the parent map are reported as ().
    """
    from_serializer = self.from_repository._format._serializer
    revision_keys = [(rev_id,) for rev_id in revision_ids]
    # The parent graph is looked up on the 'inventories' versioned files,
    # the same attribute the other stream builders in this file read
    # from; the original asked a non-plural 'inventory' attribute.
    parent_map = self.from_repository.inventories.get_parent_map(
        revision_keys)
    for inv in self.from_repository.iter_inventories(revision_ids):
        # XXX: This is a bit hackish, but it works. Basically,
        #      CHKSerializer 'accidentally' supports
        #      read/write_inventory_to_string, even though that is never
        #      the format that is stored on disk. It *does* give us a
        #      single string representation for an inventory, so live
        #      with it.
        #      This would be far better if we had a 'serialized inventory
        #      delta' form. Then we could use 'inventory._make_delta', and
        #      transmit that. This would both be faster to generate, and
        #      result in fewer bytes-on-the-wire.
        as_bytes = from_serializer.write_inventory_to_string(inv)
        key = (inv.revision_id,)
        parent_keys = parent_map.get(key, ())
        yield FulltextContentFactory(key, parent_keys, None, as_bytes)
408
def _fetch_revision_texts(self, revs, pb):
409
# fetch signatures first and then the revision texts
410
# may need to be an InterRevisionStore call here.
411
from_sf = self.from_repository.signatures
412
# A missing signature is just skipped.
413
keys = [(rev_id,) for rev_id in revs]
414
signatures = filter_absent(from_sf.get_record_stream(
416
self.to_repository._fetch_order,
417
not self.to_repository._fetch_uses_deltas))
418
# If a revision has a delta, this is actually expanded inside the
419
# insert_record_stream code now, which is an alternate fix for
421
from_rf = self.from_repository.revisions
422
revisions = from_rf.get_record_stream(
424
self.to_repository._fetch_order,
425
not self.delta_on_metadata())
426
return [('signatures', signatures), ('revisions', revisions)]
371
428
def _generate_root_texts(self, revs):
372
429
"""This will be called by __fetch between fetching weave texts and
467
530
if parent != NULL_REVISION and
468
531
rev_id_to_root_id.get(parent, root_id) == root_id)
469
532
yield FulltextContentFactory(key, parent_keys, None, '')
470
to_texts.insert_record_stream(yield_roots())
472
def regenerate_inventory(self, revs):
473
"""Generate a new inventory versionedfile in target, convertin data.
475
The inventory is retrieved from the source, (deserializing it), and
476
stored in the target (reserializing it in a different format).
477
:param revs: The revisions to include
479
for tree in self.iter_rev_trees(revs):
480
parents = tree.get_parent_ids()
481
self.target.add_inventory(tree.get_revision_id(), tree.inventory,
484
def fetch_revisions(self, revision_ids):
    """Add each requested revision from self.source to self.target.

    :param revision_ids: Revision ids handed to self.source.get_revisions
        in a single call.
    """
    # TODO: should this batch the ids up rather than requesting
    #       (potentially) 10,000 revisions in one call?
    fetched = self.source.get_revisions(revision_ids)
    for rev in fetched:
        self.target.add_revision(rev.revision_id, rev)
533
return [('texts', yield_roots())]
491
536
class Model1toKnit2Fetcher(RepoFetcher):
494
539
def __init__(self, to_repository, from_repository, last_revision=None,
495
540
pb=None, find_ghosts=True):
496
self.helper = Inter1and2Helper(from_repository, to_repository)
497
RepoFetcher.__init__(self, to_repository, from_repository,
498
last_revision, pb, find_ghosts)
500
def _generate_root_texts(self, revs):
501
self.helper.generate_root_texts(revs)
503
def _fetch_inventory_weave(self, revs, pb):
504
self.helper.regenerate_inventory(revs)
506
def _fetch_revision_texts(self, revs, pb):
507
"""Fetch revision object texts"""
511
pb.update('copying revisions', count, total)
513
sig_text = self.from_repository.get_signature_text(rev)
514
self.to_repository.add_signature_text(rev, sig_text)
515
except errors.NoSuchRevision:
518
self._copy_revision(rev)
521
def _copy_revision(self, rev):
522
self.helper.fetch_revisions([rev])
525
class Knit1to2Fetcher(RepoFetcher):
526
"""Fetch from a Knit1 repository into a Knit2 repository"""
528
def __init__(self, to_repository, from_repository, last_revision=None,
529
pb=None, find_ghosts=True):
530
self.helper = Inter1and2Helper(from_repository, to_repository)
531
RepoFetcher.__init__(self, to_repository, from_repository,
532
last_revision, pb, find_ghosts)
534
def _generate_root_texts(self, revs):
535
self.helper.generate_root_texts(revs)
537
def _fetch_inventory_weave(self, revs, pb):
538
self.helper.regenerate_inventory(revs)
540
def _fetch_just_revision_texts(self, version_ids):
541
self.helper.fetch_revisions(version_ids)
541
self.helper = Inter1and2Helper(from_repository)
542
RepoFetcher.__init__(self, to_repository, from_repository,
543
last_revision, pb, find_ghosts)
545
def _generate_root_texts(self, revs):
546
return self.helper.generate_root_texts(revs)
548
def inventory_fetch_order(self):
551
Knit1to2Fetcher = Model1toKnit2Fetcher