140
140
# item_keys_introduced_by should have a richer API than it does at the
141
141
# moment, so that it can feed the progress information back to this
143
if (self.from_repository._format.rich_root_data and
144
not self.to_repository._format.rich_root_data):
145
raise errors.IncompatibleRepositories(
146
self.from_repository, self.to_repository,
147
"different rich-root support")
143
148
self.pb = bzrlib.ui.ui_factory.nested_progress_bar()
150
source = self.from_repository._get_source(
151
self.to_repository._format)
152
stream = source.get_stream(search)
145
153
from_format = self.from_repository._format
146
stream = self.get_stream(search, pp)
147
154
resume_tokens, missing_keys = self.sink.insert_stream(
148
155
stream, from_format, [])
150
stream = self.get_stream_for_missing_keys(missing_keys)
157
stream = source.get_stream_for_missing_keys(missing_keys)
151
158
resume_tokens, missing_keys = self.sink.insert_stream(
152
159
stream, from_format, resume_tokens)
159
166
"second push failed to commit the fetch %r." % (
161
168
self.sink.finished()
169
self.count_copied = source.count_copied
163
171
if self.pb is not None:
164
172
self.pb.finished()
166
def get_stream(self, search, pp):
168
revs = search.get_keys()
169
graph = self.from_repository.get_graph()
170
revs = list(graph.iter_topo_order(revs))
171
data_to_fetch = self.from_repository.item_keys_introduced_by(
174
for knit_kind, file_id, revisions in data_to_fetch:
175
if knit_kind != phase:
177
# Make a new progress bar for this phase
180
self.pb = bzrlib.ui.ui_factory.nested_progress_bar()
181
if knit_kind == "file":
182
# Accumulate file texts
183
text_keys.extend([(file_id, revision) for revision in
185
elif knit_kind == "inventory":
186
# Now copy the file texts.
187
from_texts = self.from_repository.texts
188
yield ('texts', from_texts.get_record_stream(
189
text_keys, self.to_repository._format._fetch_order,
190
not self.to_repository._format._fetch_uses_deltas))
191
# Cause an error if a text occurs after we have done the
194
# Before we process the inventory we generate the root
195
# texts (if necessary) so that the inventories references
197
for _ in self._generate_root_texts(revs):
199
# NB: This currently reopens the inventory weave in source;
200
# using a single stream interface instead would avoid this.
201
self.pb.update("fetch inventory", 0, 1)
202
from_weave = self.from_repository.inventories
203
# we fetch only the referenced inventories because we do not
204
# know for unselected inventories whether all their required
205
# texts are present in the other repository - it could be
207
yield ('inventories', from_weave.get_record_stream(
208
[(rev_id,) for rev_id in revs],
209
self.inventory_fetch_order(),
210
not self.delta_on_metadata()))
211
elif knit_kind == "signatures":
212
# Nothing to do here; this will be taken care of when
213
# _fetch_revision_texts happens.
215
elif knit_kind == "revisions":
216
for _ in self._fetch_revision_texts(revs, self.pb):
219
raise AssertionError("Unknown knit kind %r" % knit_kind)
221
def get_stream_for_missing_keys(self, missing_keys):
222
# missing keys can only occur when we are byte copying and not
223
# translating (because translation means we don't send
224
# unreconstructable deltas ever).
226
keys['texts'] = set()
227
keys['revisions'] = set()
228
keys['inventories'] = set()
229
keys['signatures'] = set()
230
for key in missing_keys:
231
keys[key[0]].add(key[1:])
232
if len(keys['revisions']):
233
# If we allowed copying revisions at this point, we could end up
234
# copying a revision without copying its required texts: a
235
# violation of the requirements for repository integrity.
236
raise AssertionError(
237
'cannot copy revisions to fill in missing deltas %s' % (
239
for substream_kind, keys in keys.iteritems():
240
vf = getattr(self.from_repository, substream_kind)
241
# Ask for full texts always so that we don't need more round trips
243
stream = vf.get_record_stream(keys,
244
self.to_repository._format._fetch_order, True)
245
yield substream_kind, stream
247
174
def _revids_to_fetch(self):
248
175
"""Determines the exact revisions needed from self.from_repository to
249
176
install self._last_revision in self.to_repository.
264
191
except errors.NoSuchRevision, e:
265
192
raise InstallFailed([self._last_revision])
267
def _fetch_revision_texts(self, revs, pb):
268
# fetch signatures first and then the revision texts
269
# may need to be a InterRevisionStore call here.
270
from_sf = self.from_repository.signatures
271
# A missing signature is just skipped.
272
keys = [(rev_id,) for rev_id in revs]
273
signatures = filter_absent(from_sf.get_record_stream(
275
self.to_repository._format._fetch_order,
276
not self.to_repository._format._fetch_uses_deltas))
277
# If a revision has a delta, this is actually expanded inside the
278
# insert_record_stream code now, which is an alternate fix for
280
from_rf = self.from_repository.revisions
281
revisions = from_rf.get_record_stream(
283
self.to_repository._format._fetch_order,
284
not self.delta_on_metadata())
285
return [('signatures', signatures), ('revisions', revisions)]
287
def _generate_root_texts(self, revs):
288
"""This will be called by __fetch between fetching weave texts and
289
fetching the inventory weave.
291
Subclasses should override this if they need to generate root texts
292
after fetching weave texts.
296
def inventory_fetch_order(self):
    """Return the record ordering to request for the inventory stream.

    The value is read straight from the target repository format's
    ``_fetch_order`` attribute.
    """
    return self.to_repository._format._fetch_order
299
def delta_on_metadata(self):
    """Tell whether metadata records may be requested in delta form.

    The result is truthy only when the target format's
    ``_fetch_uses_deltas`` is truthy and the source and target formats'
    ``_serializer`` objects compare equal.  When ``_fetch_uses_deltas`` is
    falsy that value itself is returned (the ``and`` short-circuits).
    """
    src_serializer = self.from_repository._format._serializer
    target_serializer = self.to_repository._format._serializer
    return (self.to_repository._format._fetch_uses_deltas and
            src_serializer == target_serializer)
306
195
class Inter1and2Helper(object):
307
196
"""Helper for operations that convert data from model 1 and 2
390
279
rev_id_to_root_id.get(parent, root_id) == root_id)
391
280
yield FulltextContentFactory(key, parent_keys, None, '')
392
281
return [('texts', yield_roots())]
395
class Model1toKnit2Fetcher(RepoFetcher):
396
"""Fetch from a Model1 repository into a Knit2 repository
398
def __init__(self, to_repository, from_repository, last_revision=None,
             pb=None, find_ghosts=True):
    """Create the fetcher and its model-conversion helper.

    All arguments are forwarded unchanged to ``RepoFetcher.__init__``;
    ``from_repository`` is additionally used to build the
    ``Inter1and2Helper`` stored on ``self.helper``.
    """
    # NOTE(review): the helper is assigned before the base initialiser
    # runs; presumably the base __init__ can end up calling
    # _generate_root_texts, which needs self.helper -- confirm.
    self.helper = Inter1and2Helper(from_repository)
    RepoFetcher.__init__(self, to_repository, from_repository,
                         last_revision, pb, find_ghosts)
404
def _generate_root_texts(self, revs):
    """Return the root-text substreams for ``revs``.

    Delegates to ``self.helper.generate_root_texts`` and returns its
    result unchanged.
    """
    return self.helper.generate_root_texts(revs)
407
def inventory_fetch_order(self):
410
Knit1to2Fetcher = Model1toKnit2Fetcher