/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/fetch.py

Change the generic fetch code to give better progress indication.

Show diffs side-by-side

added added

removed removed

Lines of Context:
62
62
# - then go through all files; for each one get the weave,
63
63
#   and add in all file versions
64
64
 
 
65
def _pb_stream_adapter(pb, msg, num_keys, stream):
 
66
    def adapter():
 
67
        for idx, record in enumerate(stream):
 
68
            pb.update(msg, idx, num_keys)
 
69
            yield record
 
70
    return adapter
 
71
 
65
72
 
66
73
class RepoFetcher(object):
67
74
    """Pull revisions and texts from one repository to another.
180
187
                    # When the inventory keys start being reported, all text
181
188
                    # keys have already been issued - and we want the text keys
182
189
                    # inserted before inventory keys: copy the texts.
183
 
                    to_texts = self.to_repository.texts
184
 
                    from_texts = self.from_repository.texts
185
 
                    to_texts.insert_record_stream(from_texts.get_record_stream(
186
 
                        text_keys, self.to_repository._fetch_order,
187
 
                        not self.to_repository._fetch_uses_deltas))
 
190
                    self._fetch_text_texts(text_keys, pb=pb)
188
191
                    # Cause an error if a text occurs after we have done the
189
192
                    # copy.
190
193
                    text_keys = None
230
233
        except errors.NoSuchRevision, e:
231
234
            raise InstallFailed([self._last_revision])
232
235
 
 
236
    def _fetch_text_texts(self, text_keys, pb):
 
237
        to_texts = self.to_repository.texts
 
238
        from_texts = self.from_repository.texts
 
239
        text_stream = from_texts.get_record_stream(text_keys,
 
240
                            self.to_repository._fetch_order,
 
241
                            not self.to_repository._fetch_uses_deltas)
 
242
        adapter = _pb_stream_adapter(pb, 'fetch text', len(text_keys),
 
243
                                     text_stream)
 
244
        to_texts.insert_record_stream(adapter())
 
245
 
233
246
    def _fetch_inventory_weave(self, revs, pb):
234
 
        pb.update("fetch inventory", 0, 2)
235
 
        to_weave = self.to_repository.inventories
236
 
        # just merge, this is optimisable and it means we don't
237
 
        # copy unreferenced data such as not-needed inventories.
238
 
        pb.update("fetch inventory", 1, 3)
239
 
        from_weave = self.from_repository.inventories
240
 
        pb.update("fetch inventory", 2, 3)
241
247
        # we fetch only the referenced inventories because we do not
242
248
        # know for unselected inventories whether all their required
243
249
        # texts are present in the other repository - it could be
244
250
        # corrupt.
245
 
        child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
246
 
        try:
247
 
            if (self.from_repository._format.supports_chks and
248
 
                self.to_repository._format.supports_chks):
249
 
                self._fetch_chk_inventories(revs, child_pb)
250
 
            elif (self.from_repository._format.supports_chks or
251
 
                self.to_repository._format.supports_chks):
252
 
                # Hack to make not-chk->chk fetch: copy the inventories as
253
 
                # inventories.
254
 
                total = len(revs)
255
 
                for pos, inv in enumerate(
256
 
                    self.from_repository.iter_inventories(revs)):
257
 
                    child_pb.update("Copying inventories", pos, total)
258
 
                    self.to_repository.add_inventory(inv.revision_id, inv, [])
259
 
            else:
260
 
                to_weave.insert_record_stream(from_weave.get_record_stream(
261
 
                    [(rev_id,) for rev_id in revs],
262
 
                    self.to_repository._fetch_order,
263
 
                    not self.to_repository._fetch_uses_deltas))
264
 
        finally:
265
 
            child_pb.finished()
266
 
        pb.update("fetch inventory", 3, 3)
 
251
        if (self.from_repository._format.supports_chks and
 
252
            self.to_repository._format.supports_chks):
 
253
            self._fetch_chk_inventories(revs, pb)
 
254
        elif (self.from_repository._format.supports_chks or
 
255
              self.to_repository._format.supports_chks):
 
256
            # Hack to make not-chk->chk fetch: copy the inventories as
 
257
            # inventories.
 
258
            total = len(revs)
 
259
            for idx, inv in enumerate(
 
260
                self.from_repository.iter_inventories(revs)):
 
261
                pb.update("Copying inventories", idx, total)
 
262
                self.to_repository.add_inventory(inv.revision_id, inv, [])
 
263
        else:
 
264
            to_weave = self.to_repository.inventories
 
265
            from_weave = self.from_repository.inventories
 
266
            adapter = _pb_stream_adapter(pb, 'fetch inv', len(revs),
 
267
                from_weave.get_record_stream([(rev_id,) for rev_id in revs],
 
268
                            self.to_repository._fetch_order,
 
269
                            not self.to_repository._fetch_uses_deltas))
 
270
            to_weave.insert_record_stream(adapter())
267
271
 
268
272
    def _fetch_revision_texts(self, revs, pb):
269
273
        # fetch signatures first and then the revision texts
312
316
        uninteresting_chk_roots = set()
313
317
        interesting_chk_roots = set()
314
318
        def filter_inv_stream(inv_stream):
315
 
            for record in inv_stream:
 
319
            for idx, record in enumerate(inv_stream):
 
320
                child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
316
321
                bytes = record.get_bytes_as('fulltext')
317
322
                chk_inv = inventory.CHKInventory.deserialise(
318
323
                    self.from_repository.chk_bytes, bytes, record.key)
327
332
                    p_id_map = chk_inv.parent_id_basename_to_file_id
328
333
                    if p_id_map is not None:
329
334
                        interesting_chk_roots.add(p_id_map.key())
330
 
        self.to_repository.inventories.insert_record_stream(filter_inv_stream(inv_stream))
 
335
        pb.update('fetch inventory', 0, 2)
 
336
        child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
 
337
        try:
 
338
            self.to_repository.inventories.insert_record_stream(
 
339
                filter_inv_stream(inv_stream))
 
340
        finally:
 
341
            child_pb.finished()
331
342
        # Now that we have worked out all of the interesting root nodes, grab
332
343
        # all of the interesting pages and insert them
333
 
        interesting = chk_map.iter_interesting_nodes(
334
 
            self.from_repository.chk_bytes, interesting_chk_roots,
335
 
            uninteresting_chk_roots, pb=pb)
336
 
        def to_stream_adapter():
337
 
            """Adapt the iter_interesting_nodes result to a single stream.
 
344
        pb.update('fetch inventory', 1, 2)
 
345
        child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
 
346
        try:
 
347
            interesting = chk_map.iter_interesting_nodes(
 
348
                self.from_repository.chk_bytes, interesting_chk_roots,
 
349
                uninteresting_chk_roots, pb=child_pb)
 
350
            def to_stream_adapter():
 
351
                """Adapt the iter_interesting_nodes result to a single stream.
338
352
 
339
 
            iter_interesting_nodes returns records as it processes them, which
340
 
            can be in batches. But we only want a single stream to be inserted.
341
 
            """
342
 
            for record, items in interesting:
343
 
                for value in record.itervalues():
344
 
                    yield value
345
 
        # XXX: We could instead call get_record_stream(records.keys())
346
 
        #      ATM, this will always insert the records as fulltexts, and
347
 
        #      requires that you can hang on to records once you have gone
348
 
        #      on to the next one. Further, it causes the target to
349
 
        #      recompress the data. Testing shows it to be faster than
350
 
        #      requesting the records again, though.
351
 
        self.to_repository.chk_bytes.insert_record_stream(
352
 
            to_stream_adapter())
 
353
                iter_interesting_nodes returns records as it processes them, which
 
354
                can be in batches. But we only want a single stream to be inserted.
 
355
                """
 
356
                for record, items in interesting:
 
357
                    for value in record.itervalues():
 
358
                        yield value
 
359
            # XXX: We could instead call get_record_stream(records.keys())
 
360
            #      ATM, this will always insert the records as fulltexts, and
 
361
            #      requires that you can hang on to records once you have gone
 
362
            #      on to the next one. Further, it causes the target to
 
363
            #      recompress the data. Testing shows it to be faster than
 
364
            #      requesting the records again, though.
 
365
            self.to_repository.chk_bytes.insert_record_stream(
 
366
                to_stream_adapter())
 
367
        finally:
 
368
            child_pb.finished()
 
369
        pb.update('fetch inventory', 2, 2)
353
370
 
354
371
    def _generate_root_texts(self, revs):
355
372
        """This will be called by __fetch between fetching weave texts and