/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to repofmt.py

  • Committer: John Arbash Meinel
  • Date: 2009-02-26 22:41:52 UTC
  • mto: (0.20.21 trunk)
  • mto: This revision was merged to the branch mainline in revision 4280.
  • Revision ID: john@arbash-meinel.com-20090226224152-z4jiazt0gp1vsylk
Try a different method of streaming the chk pages.
In this method, we work out what chk pages are referenced by what inventory
pages. And then fetch them based on breadth-first references.
This should mean that pages that will compress well together are
sent together, rather than in arbitrary ordering.
Note that we might do slightly better still by using a list (recording keys
in first-encounter order) rather than sets everywhere.
(We would still keep a set alongside it, to avoid adding a key to the list more than once.)

Then again, 'unordered' may reorder it anyway, so it may not matter.
We should also consider using multiple chk streams, because it
will likely result in better compression, by forcing breaks in the
gc groups.

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
    debug,
25
25
    errors,
26
26
    knit,
 
27
    inventory,
27
28
    pack,
28
29
    repository,
29
30
    ui,
64
65
##    RepositoryFormatPackDevelopment5Hash127b,
65
66
    RepositoryFormatPackDevelopment5Hash255,
66
67
    )
 
68
    from bzrlib import chk_map
67
69
    chk_support = True
68
70
except ImportError:
69
71
    chk_support = False
240
242
        self.repo.signatures._index._add_callback = self.signature_index.add_callback
241
243
        self.repo.texts._index._add_callback = self.text_index.add_callback
242
244
 
 
245
    def _get_filtered_inv_stream(self, source_vf, keys):
 
246
        """Filter the texts of inventories, to find the chk pages."""
 
247
        id_roots = set()
 
248
        p_id_roots = set()
 
249
        def _filter_inv_stream(stream):
 
250
            for idx, record in enumerate(stream):
 
251
                ### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
 
252
                bytes = record.get_bytes_as('fulltext')
 
253
                chk_inv = inventory.CHKInventory.deserialise(None, bytes, record.key)
 
254
                id_roots.add(chk_inv.id_to_entry.key())
 
255
                p_id_map = chk_inv.parent_id_basename_to_file_id
 
256
                if p_id_map is not None:
 
257
                    p_id_roots.add(p_id_map.key())
 
258
                yield record
 
259
        stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
 
260
        return _filter_inv_stream(stream), id_roots, p_id_roots
 
261
 
 
262
    def _get_chk_stream(self, source_vf, keys, id_roots, p_id_roots):
 
263
        # We want to stream the keys from 'id_roots', and things they
 
264
        # reference, and then stream things from p_id_roots and things they
 
265
        # reference, and then any remaining keys that we didn't get to.
 
266
 
 
267
        # Note: We probably actually want multiple streams here, to help the
 
268
        #       client understand that the different levels won't compress well
 
269
        #       against eachother
 
270
        remaining_keys = set(keys)
 
271
        def _get_referenced_stream(root_keys):
 
272
            cur_keys = root_keys
 
273
            while cur_keys:
 
274
                remaining_keys.difference_update(cur_keys)
 
275
                next_keys = set()
 
276
                stream = source_vf.get_record_stream(cur_keys, 'unordered',
 
277
                                                     True)
 
278
                for record in stream:
 
279
                    bytes = record.get_bytes_as('fulltext')
 
280
                    # We don't care about search_key_func for this code,
 
281
                    # because we only care about external references.
 
282
                    node = chk_map._deserialise(bytes, record.key,
 
283
                                                search_key_func=None)
 
284
                    next_keys.update(node.refs())
 
285
                    yield record
 
286
                cur_keys = next_keys.intersection(remaining_keys)
 
287
        for record in _get_referenced_stream(id_roots):
 
288
            yield record
 
289
        for record in _get_referenced_stream(p_id_roots):
 
290
            yield record
 
291
        if remaining_keys:
 
292
            trace.note('There were %d keys in the chk index, which'
 
293
                       ' were not referenced from inventories',
 
294
                       len(remaining_keys))
 
295
            stream = source_vf.get_record_stream(remaining_keys, 'unordered',
 
296
                                                 True)
 
297
            for record in stream:
 
298
                yield record
 
299
 
243
300
    def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
244
301
                                 reload_func=None):
245
302
        """Execute a series of pack operations.
275
332
            #       issue is that pages that are similar are not transmitted
276
333
            #       together. Perhaps get_record_stream('gc-optimal') should be
277
334
            #       taught about how to group chk pages?
 
335
            has_chk = False
278
336
            if getattr(self, 'chk_index', None) is not None:
 
337
                has_chk = True
279
338
                to_copy.insert(2, ('chk_index', 'chk_bytes'))
280
339
 
281
340
            # Shouldn't we start_write_group around this?
310
369
                                      is_locked=self.repo.is_locked),
311
370
                        access=target_access,
312
371
                        delta=source_vf._delta)
313
 
                    stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
 
372
                    stream = None
 
373
                    if has_chk:
 
374
                        if vf_name == 'inventories':
 
375
                            stream, id_roots, p_id_roots = self._get_filtered_inv_stream(
 
376
                                source_vf, keys)
 
377
                        elif vf_name == 'chk_bytes':
 
378
                            stream = self._get_chk_stream(source_vf, keys,
 
379
                                                          id_roots, p_id_roots)
 
380
                    if stream is None:
 
381
                        stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
314
382
                    target_vf.insert_record_stream(stream)
315
383
                new_pack._check_references() # shouldn't be needed
316
384
            except: