1
# Copyright (C) 2008-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Core compression logic for compressing streams of related files."""
22
from ..lazy_import import lazy_import
23
lazy_import(globals(), """
33
from breezy.bzr import (
39
from breezy.i18n import gettext
45
from .btree_index import BTreeBuilder
46
from ..lru_cache import LRUSizeCache
47
from .versionedfile import (
51
ChunkedContentFactory,
53
FulltextContentFactory,
54
VersionedFilesWithFallbacks,
55
UnavailableRepresentation,
58
# Minimum number of uncompressed bytes to try fetch at once when retrieving
59
# groupcompress blocks.
62
# osutils.sha_string(b'')
63
_null_sha1 = b'da39a3ee5e6b4b0d3255bfef95601890afd80709'
66
def sort_gc_optimal(parent_map):
    """Sort and group the keys in parent_map into groupcompress order.

    groupcompress is defined (currently) as reverse-topological order, grouped
    by the key prefix.

    :param parent_map: A dict mapping key -> parent keys.
    :return: A sorted-list of keys
    """
    # groupcompress ordering is approximately reverse topological,
    # properly grouped by file-id.
    per_prefix_map = {}
    present_keys = []
    for key, value in parent_map.items():
        # Single-element keys (or plain bytes keys) have no file-id prefix;
        # group them under the empty prefix.
        if isinstance(key, bytes) or len(key) == 1:
            prefix = b''
        else:
            prefix = key[0]
        try:
            per_prefix_map[prefix][key] = value
        except KeyError:
            per_prefix_map[prefix] = {key: value}

    for prefix in sorted(per_prefix_map):
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
    return present_keys
93
class DecompressCorruption(errors.BzrError):
    """Raised when the data inside a groupcompress block fails to decompress."""

    _fmt = "Corruption while decompressing repository file%(orig_error)s"

    def __init__(self, orig_error=None):
        """Create the error.

        :param orig_error: The underlying decompression error (e.g. a
            zlib.error), or None if not known.
        """
        if orig_error is not None:
            self.orig_error = ", %s" % (orig_error,)
        else:
            self.orig_error = ''
        errors.BzrError.__init__(self)
105
# The max zlib window size is 32kB, so if we set 'max_size' output of the
106
# decompressor to the requested bytes + 32kB, then we should guarantee
107
# num_bytes coming out.
108
_ZLIB_DECOMP_WINDOW = 32 * 1024
111
class GroupCompressBlock(object):
    """An object which maintains the internal structure of the compressed data.

    This tracks the meta info (start of text, length, type, etc.)
    """

    # Group Compress Block v1 Zlib
    GCB_HEADER = b'gcb1z\n'
    # Group Compress Block v1 Lzma
    GCB_LZ_HEADER = b'gcb1l\n'
    GCB_KNOWN_HEADERS = (GCB_HEADER, GCB_LZ_HEADER)

    def __init__(self):
        # map by key? or just order in file?
        self._compressor_name = None
        self._z_content_chunks = None
        self._z_content_decompressor = None
        self._z_content_length = None
        self._content_length = None
        self._content = None
        self._content_chunks = None

    def __len__(self):
        # This is the maximum number of bytes this object will reference if
        # everything is decompressed. However, if we decompress less than
        # everything... (this would cause some problems for LRUSizeCache)
        return self._content_length + self._z_content_length

    def _ensure_content(self, num_bytes=None):
        """Make sure that content has been expanded enough.

        :param num_bytes: Ensure that we have extracted at least num_bytes of
            content. If None, consume everything
        """
        if self._content_length is None:
            raise AssertionError('self._content_length should never be None')
        if num_bytes is None:
            num_bytes = self._content_length
        elif (self._content_length is not None
              and num_bytes > self._content_length):
            raise AssertionError(
                'requested num_bytes (%d) > content length (%d)'
                % (num_bytes, self._content_length))
        # Expand the content if required
        if self._content is None:
            if self._content_chunks is not None:
                self._content = b''.join(self._content_chunks)
                self._content_chunks = None
        if self._content is None:
            # We join self._z_content_chunks here, because if we are
            # decompressing, then it is *very* likely that we have a single
            # large string to pass to zlib.
            if self._z_content_chunks is None:
                raise AssertionError('No content to decompress')
            z_content = b''.join(self._z_content_chunks)
            if z_content == b'':
                self._content = b''
            elif self._compressor_name == 'lzma':
                # We don't do partial lzma decomp yet
                # NOTE(review): relies on a pylzma import from the (not
                # visible) module import block -- confirm it is in scope.
                self._content = pylzma.decompress(z_content)
            elif self._compressor_name == 'zlib':
                # Start a zlib decompressor
                if num_bytes * 4 > self._content_length * 3:
                    # If we are requesting more that 3/4ths of the content,
                    # just extract the whole thing in a single pass
                    num_bytes = self._content_length
                    self._content = zlib.decompress(z_content)
                else:
                    self._z_content_decompressor = zlib.decompressobj()
                    # Seed the decompressor with the uncompressed bytes, so
                    # that the rest of the code is simplified
                    self._content = self._z_content_decompressor.decompress(
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                    if not self._z_content_decompressor.unconsumed_tail:
                        self._z_content_decompressor = None
            else:
                raise AssertionError('Unknown compressor: %r'
                                     % self._compressor_name)
        # Any bytes remaining to be decompressed will be in the decompressor's
        # 'unconsumed_tail'.

        # Do we have enough bytes already?
        if len(self._content) >= num_bytes:
            return
        # If we got this far, and don't have a decompressor, something is wrong
        if self._z_content_decompressor is None:
            raise AssertionError(
                'No decompressor to decompress %d bytes' % num_bytes)
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
        if not remaining_decomp:
            raise AssertionError('Nothing left to decompress')
        needed_bytes = num_bytes - len(self._content)
        # We always set max_size to 32kB over the minimum needed, so that
        # zlib will give us as much as we really want.
        # TODO: If this isn't good enough, we could make a loop here,
        #       that keeps expanding the request until we get enough
        self._content += self._z_content_decompressor.decompress(
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
        if len(self._content) < num_bytes:
            raise AssertionError('%d bytes wanted, only %d available'
                                 % (num_bytes, len(self._content)))
        if not self._z_content_decompressor.unconsumed_tail:
            # The stream is finished
            self._z_content_decompressor = None

    def _parse_bytes(self, data, pos):
        """Read the various lengths from the header.

        This also populates the various 'compressed' buffers.

        :return: The position in bytes just after the last newline
        """
        # At present, we have 2 integers for the compressed and uncompressed
        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
        # checking too far, cap the search to 14 bytes.
        pos2 = data.index(b'\n', pos, pos + 14)
        self._z_content_length = int(data[pos:pos2])
        pos = pos2 + 1
        pos2 = data.index(b'\n', pos, pos + 14)
        self._content_length = int(data[pos:pos2])
        pos = pos2 + 1
        if len(data) != (pos + self._z_content_length):
            # XXX: Define some GCCorrupt error ?
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                                 (len(data), pos, self._z_content_length))
        self._z_content_chunks = (data[pos:],)

    @property
    def _z_content(self):
        """Return z_content_chunks as a simple string.

        Meant only to be used by the test suite.
        """
        if self._z_content_chunks is not None:
            return b''.join(self._z_content_chunks)
        return None

    @classmethod
    def from_bytes(cls, bytes):
        """Parse a serialised block back into a GroupCompressBlock."""
        out = cls()
        header = bytes[0:6]
        if header not in cls.GCB_KNOWN_HEADERS:
            raise ValueError('bytes did not start with any of %r'
                             % (cls.GCB_KNOWN_HEADERS,))
        if header == cls.GCB_HEADER:
            out._compressor_name = 'zlib'
        elif header == cls.GCB_LZ_HEADER:
            out._compressor_name = 'lzma'
        else:
            raise ValueError('unknown compressor: %r' % (header,))
        out._parse_bytes(bytes, 6)
        return out

    def extract(self, key, start, end, sha1=None):
        """Extract the text for a specific key.

        :param key: The label used for this content
        :param sha1: TODO (should we validate only when sha1 is supplied?)
        :return: The bytes for the content
        """
        if start == end == 0:
            return []
        self._ensure_content(end)
        # The bytes are 'f' or 'd' for the type, then a variable-length
        # base128 integer for the content size, then the actual content
        # We know that the variable-length integer won't be longer than 5
        # bytes (it takes 5 bytes to encode 2^32)
        c = self._content[start:start + 1]
        if c != b'f':
            if c != b'd':
                raise ValueError('Unknown content control code: %s'
                                 % (c,))
        content_len, len_len = decode_base128_int(
            self._content[start + 1:start + 6])
        content_start = start + 1 + len_len
        if end != content_start + content_len:
            raise ValueError('end != len according to field header'
                             ' %s != %s' % (end, content_start + content_len))
        if c == b'f':
            return [self._content[content_start:end]]
        # Must be type delta as checked above
        return [apply_delta_to_source(self._content, content_start, end)]

    def set_chunked_content(self, content_chunks, length):
        """Set the content of this block to the given chunks."""
        # If we have lots of short lines, it is may be more efficient to join
        # the content ahead of time. If the content is <10MiB, we don't really
        # care about the extra memory consumption, so we can just pack it and
        # be done. However, timing showed 18s => 17.9s for repacking 1k revs of
        # mysql, which is below the noise margin
        self._content_length = length
        self._content_chunks = content_chunks
        self._content = None
        self._z_content_chunks = None

    def set_content(self, content):
        """Set the content of this block."""
        self._content_length = len(content)
        self._content = content
        self._z_content_chunks = None

    def _create_z_content_from_chunks(self, chunks):
        # Compress the uncompressed chunks into self._z_content_chunks.
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
        # (measured peak is maybe 30MB over the above...)
        compressed_chunks = list(map(compressor.compress, chunks))
        compressed_chunks.append(compressor.flush())
        # Ignore empty chunks
        self._z_content_chunks = [c for c in compressed_chunks if c]
        self._z_content_length = sum(map(len, self._z_content_chunks))

    def _create_z_content(self):
        # Ensure the compressed representation exists.
        if self._z_content_chunks is not None:
            return
        if self._content_chunks is not None:
            chunks = self._content_chunks
        else:
            chunks = (self._content,)
        self._create_z_content_from_chunks(chunks)

    def to_chunks(self):
        """Create the byte stream as a series of 'chunks'"""
        self._create_z_content()
        header = self.GCB_HEADER
        chunks = [b'%s%d\n%d\n'
                  % (header, self._z_content_length, self._content_length),
                  ]
        chunks.extend(self._z_content_chunks)
        total_len = sum(map(len, chunks))
        return total_len, chunks

    def to_bytes(self):
        """Encode the information into a byte stream."""
        total_len, chunks = self.to_chunks()
        return b''.join(chunks)

    def _dump(self, include_text=False):
        """Take this block, and spit out a human-readable structure.

        :param include_text: Inserts also include text bits, chose whether you
            want this displayed in the dump or not.
        :return: A dump of the given block. The layout is something like:
            [('f', length), ('d', delta_length, text_length, [delta_info])]
            delta_info := [('i', num_bytes, text), ('c', offset, num_bytes),
            ...]
        """
        self._ensure_content()
        result = []
        pos = 0
        while pos < self._content_length:
            kind = self._content[pos:pos + 1]
            pos += 1
            if kind not in (b'f', b'd'):
                raise ValueError('invalid kind character: %r' % (kind,))
            content_len, len_len = decode_base128_int(
                self._content[pos:pos + 5])
            pos += len_len
            if content_len + pos > self._content_length:
                raise ValueError('invalid content_len %d for record @ pos %d'
                                 % (content_len, pos - len_len - 1))
            if kind == b'f':  # Fulltext
                if include_text:
                    text = self._content[pos:pos + content_len]
                    result.append((b'f', content_len, text))
                else:
                    result.append((b'f', content_len))
            elif kind == b'd':  # Delta
                delta_content = self._content[pos:pos + content_len]
                delta_info = []
                # The first entry in a delta is the decompressed length
                decomp_len, delta_pos = decode_base128_int(delta_content)
                result.append((b'd', content_len, decomp_len, delta_info))
                measured_len = 0
                while delta_pos < content_len:
                    c = delta_content[delta_pos]
                    delta_pos += 1
                    if c & 0x80:  # Copy
                        (offset, length,
                         delta_pos) = decode_copy_instruction(delta_content, c,
                                                              delta_pos)
                        if include_text:
                            text = self._content[offset:offset + length]
                            delta_info.append((b'c', offset, length, text))
                        else:
                            delta_info.append((b'c', offset, length))
                        measured_len += length
                    else:  # Insert
                        txt = delta_content[delta_pos:delta_pos + c]
                        if include_text:
                            delta_info.append((b'i', c, txt))
                        else:
                            delta_info.append((b'i', c, None))
                        measured_len += c
                        delta_pos += c
                if delta_pos != content_len:
                    raise ValueError('Delta consumed a bad number of bytes:'
                                     ' %d != %d' % (delta_pos, content_len))
                if measured_len != decomp_len:
                    raise ValueError('Delta claimed fulltext was %d bytes, but'
                                     ' extraction resulted in %d bytes'
                                     % (decomp_len, measured_len))
            pos += content_len
        return result
420
class _LazyGroupCompressFactory(object):
421
"""Yield content from a GroupCompressBlock on demand."""
423
def __init__(self, key, parents, manager, start, end, first):
424
"""Create a _LazyGroupCompressFactory
426
:param key: The key of just this record
427
:param parents: The parents of this key (possibly None)
428
:param gc_block: A GroupCompressBlock object
429
:param start: Offset of the first byte for this record in the
431
:param end: Offset of the byte just after the end of this record
432
(ie, bytes = content[start:end])
433
:param first: Is this the first Factory for the given block?
436
self.parents = parents
439
# Note: This attribute coupled with Manager._factories creates a
440
# reference cycle. Perhaps we would rather use a weakref(), or
441
# find an appropriate time to release the ref. After the first
442
# get_bytes_as call? After Manager.get_record_stream() returns
444
self._manager = manager
446
self.storage_kind = 'groupcompress-block'
448
self.storage_kind = 'groupcompress-block-ref'
454
return '%s(%s, first=%s)' % (self.__class__.__name__,
455
self.key, self._first)
457
def _extract_bytes(self):
458
# Grab and cache the raw bytes for this entry
459
# and break the ref-cycle with _manager since we don't need it
462
self._manager._prepare_for_extract()
463
except zlib.error as value:
464
raise DecompressCorruption("zlib: " + str(value))
465
block = self._manager._block
466
self._chunks = block.extract(self.key, self._start, self._end)
467
# There are code paths that first extract as fulltext, and then
468
# extract as storage_kind (smart fetch). So we don't break the
469
# refcycle here, but instead in manager.get_record_stream()
471
def get_bytes_as(self, storage_kind):
472
if storage_kind == self.storage_kind:
474
# wire bytes, something...
475
return self._manager._wire_bytes()
478
if storage_kind in ('fulltext', 'chunked', 'lines'):
479
if self._chunks is None:
480
self._extract_bytes()
481
if storage_kind == 'fulltext':
482
return b''.join(self._chunks)
483
elif storage_kind == 'chunked':
486
return osutils.chunks_to_lines(self._chunks)
487
raise UnavailableRepresentation(self.key, storage_kind,
490
def iter_bytes_as(self, storage_kind):
491
if self._chunks is None:
492
self._extract_bytes()
493
if storage_kind == 'chunked':
494
return iter(self._chunks)
495
elif storage_kind == 'lines':
496
return iter(osutils.chunks_to_lines(self._chunks))
497
raise UnavailableRepresentation(self.key, storage_kind,
501
class _LazyGroupContentManager(object):
    """This manages a group of _LazyGroupCompressFactory objects."""

    _max_cut_fraction = 0.75  # We allow a block to be trimmed to 75% of
                              # current size, and still be considered
                              # resizable.
    _full_block_size = 4 * 1024 * 1024
    _full_mixed_block_size = 2 * 1024 * 1024
    _full_enough_block_size = 3 * 1024 * 1024  # size at which we won't repack
    _full_enough_mixed_block_size = 2 * 768 * 1024  # 1.5MB

    def __init__(self, block, get_compressor_settings=None):
        """Create a manager wrapping the given GroupCompressBlock.

        :param block: The GroupCompressBlock holding the raw content.
        :param get_compressor_settings: Optional callable returning the
            settings to use when rebuilding the block.
        """
        self._block = block
        # We need to preserve the ordering
        self._factories = []
        self._last_byte = 0
        self._get_settings = get_compressor_settings
        self._compressor_settings = None

    def _get_compressor_settings(self):
        # Resolve and cache the compressor settings.
        if self._compressor_settings is not None:
            return self._compressor_settings
        settings = None
        if self._get_settings is not None:
            settings = self._get_settings()
        if settings is None:
            vf = GroupCompressVersionedFiles
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
        self._compressor_settings = settings
        return self._compressor_settings

    def add_factory(self, key, parents, start, end):
        """Add a record (by offsets into the block) to this manager."""
        if not self._factories:
            first = True
        else:
            first = False
        # Note that this creates a reference cycle....
        factory = _LazyGroupCompressFactory(key, parents, self,
                                            start, end, first=first)
        # max() works here, but as a function call, doing a compare seems to be
        # significantly faster, timeit says 250ms for max() and 100ms for the
        # comparison
        if end > self._last_byte:
            self._last_byte = end
        self._factories.append(factory)

    def get_record_stream(self):
        """Get a record for all keys added so far."""
        for factory in self._factories:
            yield factory
            # Break the ref-cycle
            factory._bytes = None
            factory._manager = None
        # TODO: Consider setting self._factories = None after the above loop,
        #       as it will break the reference cycle

    def _trim_block(self, last_byte):
        """Create a new GroupCompressBlock, with just some of the content."""
        # None of the factories need to be adjusted, because the content is
        # located in an identical place. Just that some of the unreferenced
        # trailing bytes are stripped
        trace.mutter('stripping trailing bytes from groupcompress block'
                     ' %d => %d', self._block._content_length, last_byte)
        new_block = GroupCompressBlock()
        self._block._ensure_content(last_byte)
        new_block.set_content(self._block._content[:last_byte])
        self._block = new_block

    def _make_group_compressor(self):
        return GroupCompressor(self._get_compressor_settings())

    def _rebuild_block(self):
        """Create a new GroupCompressBlock with only the referenced texts."""
        compressor = self._make_group_compressor()
        tstart = time.time()
        old_length = self._block._content_length
        self._block._ensure_content()
        for factory in self._factories:
            chunks = factory.get_bytes_as('chunked')
            chunks_len = factory.size
            if chunks_len is None:
                chunks_len = sum(map(len, chunks))
            (found_sha1, start_point, end_point,
             type) = compressor.compress(
                factory.key, chunks, chunks_len, factory.sha1)
            # Now update this factory with the new offsets, etc
            factory.sha1 = found_sha1
            factory._start = start_point
            factory._end = end_point
        self._last_byte = end_point
        new_block = compressor.flush()
        # TODO: Should we check that new_block really *is* smaller than the old
        #       block? It seems hard to come up with a method that it would
        #       expand, since we do full compression again. Perhaps based on a
        #       request that ends up poorly ordered?
        # TODO: If the content would have expanded, then we would want to
        #       handle a case where we need to split the block.
        #       Now that we have a user-tweakable option
        #       (max_bytes_to_index), it is possible that one person set it
        #       to a very low value, causing poor compression.
        delta = time.time() - tstart
        self._block = new_block
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
                     ' %d bytes => %d bytes', delta, old_length,
                     self._block._content_length)

    def _prepare_for_extract(self):
        """A _LazyGroupCompressFactory is about to extract to fulltext."""
        # We expect that if one child is going to fulltext, all will be. This
        # helps prevent all of them from extracting a small amount at a time.
        # Which in itself isn't terribly expensive, but resizing 2MB 32kB at a
        # time (self._block._content) is a little expensive.
        self._block._ensure_content(self._last_byte)

    def _check_rebuild_action(self):
        """Check to see if our block should be repacked."""
        total_bytes_used = 0
        last_byte_used = 0
        for factory in self._factories:
            total_bytes_used += factory._end - factory._start
            if last_byte_used < factory._end:
                last_byte_used = factory._end
        # If we are using more than half of the bytes from the block, we have
        # nothing else to check
        if total_bytes_used * 2 >= self._block._content_length:
            return None, last_byte_used, total_bytes_used
        # We are using less than 50% of the content. Is the content we are
        # using at the beginning of the block? If so, we can just trim the
        # tail, rather than rebuilding from scratch.
        if total_bytes_used * 2 > last_byte_used:
            return 'trim', last_byte_used, total_bytes_used

        # We are using a small amount of the data, and it isn't just packed
        # nicely at the front, so rebuild the content.
        # Note: This would be *nicer* as a strip-data-from-group, rather than
        #       building it up again from scratch
        #       It might be reasonable to consider the fulltext sizes for
        #       different bits when deciding this, too. As you may have a small
        #       fulltext, and a trivial delta, and you are just trading around
        #       for another fulltext. If we do a simple 'prune' you may end up
        #       expanding many deltas into fulltexts, as well.
        #       If we build a cheap enough 'strip', then we could try a strip,
        #       if that expands the content, we then rebuild.
        return 'rebuild', last_byte_used, total_bytes_used

    def check_is_well_utilized(self):
        """Is the current block considered 'well utilized'?

        This heuristic asks if the current block considers itself to be a fully
        developed group, rather than just a loose collection of data.
        """
        if len(self._factories) == 1:
            # A block of length 1 could be improved by combining with other
            # groups - don't look deeper. Even larger than max size groups
            # could compress well with adjacent versions of the same thing.
            return False
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        block_size = self._block._content_length
        if total_bytes_used < block_size * self._max_cut_fraction:
            # This block wants to trim itself small enough that we want to
            # consider it under-utilized.
            return False
        # TODO: This code is meant to be the twin of _insert_record_stream's
        #       'start_new_block' logic. It would probably be better to factor
        #       out that logic into a shared location, so that it stays
        #       together in sync
        # We currently assume a block is properly utilized whenever it is >75%
        # of the size of a 'full' block. In normal operation, a block is
        # considered full when it hits 4MB of same-file content. So any block
        # >3MB is 'full enough'.
        # The only time this isn't true is when a given block has large-object
        # content. (a single file >4MB, etc.)
        # Under these circumstances, we allow a block to grow to
        # 2 x largest_content.  Which means that if a given block had a large
        # object, it may actually be under-utilized. However, given that this
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
        # content blobs on-the-fly. Note that because we return False for all
        # 1-item blobs, we will repack them; we may wish to reevaluate our
        # treatment of large object blobs in the future.
        if block_size >= self._full_enough_block_size:
            return True
        # If a block is <3MB, it still may be considered 'full' if it contains
        # mixed content. The current rule is 2MB of mixed content is considered
        # full. So check to see if this block contains mixed content, and
        # set the threshold appropriately.
        common_prefix = None
        for factory in self._factories:
            prefix = factory.key[:-1]
            if common_prefix is None:
                common_prefix = prefix
            elif prefix != common_prefix:
                # Mixed content, check the size appropriately
                if block_size >= self._full_enough_mixed_block_size:
                    return True
                break
        # The content failed both the mixed check and the single-content check
        # so obviously it is not fully utilized
        # TODO: there is one other constraint that isn't being checked
        #       namely, that the entries in the block are in the appropriate
        #       order. For example, you could insert the entries in exactly
        #       reverse groupcompress order, and we would think that is ok.
        #       (all the right objects are in one group, and it is fully
        #       utilized, etc.) For now, we assume that case is rare,
        #       especially since we should always fetch in 'groupcompress'
        #       order.
        return False

    def _check_rebuild_block(self):
        # Trim or rebuild the block if the referenced content warrants it.
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        if action is None:
            return
        if action == 'trim':
            self._trim_block(last_byte_used)
        elif action == 'rebuild':
            self._rebuild_block()
        else:
            raise ValueError('unknown rebuild action: %r' % (action,))

    def _wire_bytes(self):
        """Return a byte stream suitable for transmitting over the wire."""
        self._check_rebuild_block()
        # The outer block starts with:
        #   'groupcompress-block\n'
        #   <length of compressed key info>\n
        #   <length of uncompressed info>\n
        #   <length of gc block>\n
        #   <compressed key info>
        #   <compressed gc block>
        lines = [b'groupcompress-block\n']
        # The minimal info we need is the key, the start offset, and the
        # parents. The length and type are encoded in the record itself.
        # However, passing in the other bits makes it easier.  The list of
        # keys, and the start offset, the length
        # 1 line key
        # 1 line with parents, '' for ()
        # 1 line for start offset
        # 1 line for end byte
        header_lines = []
        for factory in self._factories:
            key_bytes = b'\x00'.join(factory.key)
            parents = factory.parents
            if parents is None:
                parent_bytes = b'None:'
            else:
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
            record_header = b'%s\n%s\n%d\n%d\n' % (
                key_bytes, parent_bytes, factory._start, factory._end)
            header_lines.append(record_header)
            # TODO: Can we break the refcycle at this point and set
            #       factory._manager = None?
        header_bytes = b''.join(header_lines)
        del header_lines
        header_bytes_len = len(header_bytes)
        z_header_bytes = zlib.compress(header_bytes)
        del header_bytes
        z_header_bytes_len = len(z_header_bytes)
        block_bytes_len, block_chunks = self._block.to_chunks()
        lines.append(b'%d\n%d\n%d\n' % (
            z_header_bytes_len, header_bytes_len, block_bytes_len))
        lines.append(z_header_bytes)
        lines.extend(block_chunks)
        del z_header_bytes, block_chunks
        # TODO: This is a point where we will double the memory consumption. To
        #       avoid this, we probably have to switch to a 'chunked' api
        return b''.join(lines)

    @classmethod
    def from_bytes(cls, bytes):
        """Parse wire bytes (as written by _wire_bytes) back into a manager."""
        # TODO: This does extra string copying, probably better to do it a
        #       different way. At a minimum this creates 2 copies of the
        #       compressed content
        (storage_kind, z_header_len, header_len,
         block_len, rest) = bytes.split(b'\n', 4)
        del bytes
        if storage_kind != b'groupcompress-block':
            raise ValueError('Unknown storage kind: %s' % (storage_kind,))
        z_header_len = int(z_header_len)
        if len(rest) < z_header_len:
            raise ValueError('Compressed header len shorter than all bytes')
        z_header = rest[:z_header_len]
        header_len = int(header_len)
        header = zlib.decompress(z_header)
        if len(header) != header_len:
            raise ValueError('invalid length for decompressed bytes')
        del z_header
        block_len = int(block_len)
        if len(rest) != z_header_len + block_len:
            raise ValueError('Invalid length for block')
        block_bytes = rest[z_header_len:]
        del rest
        # So now we have a valid GCB, we just need to parse the factories that
        # were sent to us
        header_lines = header.split(b'\n')
        del header
        last = header_lines.pop()
        if last != b'':
            raise ValueError('header lines did not end with a trailing'
                             ' newline')
        if len(header_lines) % 4 != 0:
            raise ValueError('The header was not an even multiple of 4 lines')
        block = GroupCompressBlock.from_bytes(block_bytes)
        del block_bytes
        result = cls(block)
        for start in range(0, len(header_lines), 4):
            # intern()?
            key = tuple(header_lines[start].split(b'\x00'))
            parents_line = header_lines[start + 1]
            if parents_line == b'None:':
                parents = None
            else:
                parents = tuple([tuple(segment.split(b'\x00'))
                                 for segment in parents_line.split(b'\t')
                                 if segment])
            start_offset = int(header_lines[start + 2])
            end_offset = int(header_lines[start + 3])
            result.add_factory(key, parents, start_offset, end_offset)
        return result
820
def network_block_to_records(storage_kind, bytes, line_end):
    """Convert wire bytes for a groupcompress block into content records.

    :param storage_kind: Must be 'groupcompress-block'.
    :param bytes: The wire bytes (as produced by
        _LazyGroupContentManager._wire_bytes).
    :param line_end: Unused; part of the network-record callable signature.
    :return: An iterator of record factories for the keys in the block.
    """
    if storage_kind != 'groupcompress-block':
        raise ValueError('Unknown storage kind: %s' % (storage_kind,))
    manager = _LazyGroupContentManager.from_bytes(bytes)
    return manager.get_record_stream()
827
class _CommonGroupCompressor(object):
829
def __init__(self, settings=None):
830
"""Create a GroupCompressor."""
835
self.labels_deltas = {}
836
self._delta_index = None # Set by the children
837
self._block = GroupCompressBlock()
841
self._settings = settings
843
def compress(self, key, chunks, length, expected_sha, nostore_sha=None,
845
"""Compress lines with label key.
847
:param key: A key tuple. It is stored in the output
848
for identification of the text during decompression. If the last
849
element is b'None' it is replaced with the sha1 of the text -
851
:param chunks: Chunks of bytes to be compressed
852
:param length: Length of chunks
853
:param expected_sha: If non-None, the sha the lines are believed to
854
have. During compression the sha is calculated; a mismatch will
856
:param nostore_sha: If the computed sha1 sum matches, we will raise
857
ExistingContent rather than adding the text.
858
:param soft: Do a 'soft' compression. This means that we require larger
859
ranges to match to be considered for a copy command.
861
:return: The sha1 of lines, the start and end offsets in the delta, and
862
the type ('fulltext' or 'delta').
864
:seealso VersionedFiles.add_lines:
866
if length == 0: # empty, like a dir entry, etc
867
if nostore_sha == _null_sha1:
868
raise ExistingContent()
869
return _null_sha1, 0, 0, 'fulltext'
870
# we assume someone knew what they were doing when they passed it in
871
if expected_sha is not None:
874
sha1 = osutils.sha_strings(chunks)
875
if nostore_sha is not None:
876
if sha1 == nostore_sha:
877
raise ExistingContent()
879
key = key[:-1] + (b'sha1:' + sha1,)
881
start, end, type = self._compress(key, chunks, length, length / 2, soft)
882
return sha1, start, end, type
884
def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
885
"""Compress lines with label key.
887
:param key: A key tuple. It is stored in the output for identification
888
of the text during decompression.
890
:param chunks: The chunks of bytes to be compressed
892
:param input_len: The length of the chunks
894
:param max_delta_size: The size above which we issue a fulltext instead
897
:param soft: Do a 'soft' compression. This means that we require larger
898
ranges to match to be considered for a copy command.
900
:return: The sha1 of lines, the start and end offsets in the delta, and
901
the type ('fulltext' or 'delta').
903
raise NotImplementedError(self._compress)
905
def extract(self, key):
906
"""Extract a key previously added to the compressor.
908
:param key: The key to extract.
909
:return: An iterable over chunks and the sha1.
911
(start_byte, start_chunk, end_byte,
912
end_chunk) = self.labels_deltas[key]
913
delta_chunks = self.chunks[start_chunk:end_chunk]
914
stored_bytes = b''.join(delta_chunks)
915
kind = stored_bytes[:1]
917
fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
918
data_len = fulltext_len + 1 + offset
919
if data_len != len(stored_bytes):
920
raise ValueError('Index claimed fulltext len, but stored bytes'
922
% (len(stored_bytes), data_len))
923
data = [stored_bytes[offset + 1:]]
926
raise ValueError('Unknown content kind, bytes claim %s' % kind)
927
# XXX: This is inefficient at best
928
source = b''.join(self.chunks[:start_chunk])
929
delta_len, offset = decode_base128_int(stored_bytes[1:10])
930
data_len = delta_len + 1 + offset
931
if data_len != len(stored_bytes):
932
raise ValueError('Index claimed delta len, but stored bytes'
934
% (len(stored_bytes), data_len))
935
data = [apply_delta(source, stored_bytes[offset + 1:])]
936
data_sha1 = osutils.sha_strings(data)
937
return data, data_sha1
940
"""Finish this group, creating a formatted stream.
942
After calling this, the compressor should no longer be used
944
self._block.set_chunked_content(self.chunks, self.endpoint)
946
self._delta_index = None
950
"""Call this if you want to 'revoke' the last compression.
952
After this, the data structures will be rolled back, but you cannot do
955
self._delta_index = None
956
del self.chunks[self._last[0]:]
957
self.endpoint = self._last[1]
961
"""Return the overall compression ratio."""
962
return float(self.input_bytes) / float(self.endpoint)
965
class PythonGroupCompressor(_CommonGroupCompressor):

    def __init__(self, settings=None):
        """Create a GroupCompressor.

        Used only if the pyrex version is not available.
        """
        super(PythonGroupCompressor, self).__init__(settings)
        self._delta_index = LinesDeltaIndex([])
        # The actual content is managed by LinesDeltaIndex
        self.chunks = self._delta_index.lines

    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""
        new_lines = osutils.chunks_to_lines(chunks)
        out_lines, index_lines = self._delta_index.make_delta(
            new_lines, bytes_length=input_len, soft=soft)
        delta_length = sum(map(len, out_lines))
        if delta_length > max_delta_size:
            # The delta is longer than the fulltext, insert a fulltext
            type = 'fulltext'
            out_lines = [b'f', encode_base128_int(input_len)]
            out_lines.extend(new_lines)
            index_lines = [False, False]
            index_lines.extend([True] * len(new_lines))
        else:
            # this is a worthy delta, output it
            type = 'delta'
            out_lines[0] = b'd'
            # Update the delta_length to include those two encoded integers
            out_lines[1] = encode_base128_int(delta_length)
        # Before insertion
        start = self.endpoint
        chunk_start = len(self.chunks)
        self._last = (chunk_start, self.endpoint)
        self._delta_index.extend_lines(out_lines, index_lines)
        self.endpoint = self._delta_index.endpoint
        self.input_bytes += input_len
        chunk_end = len(self.chunks)
        self.labels_deltas[key] = (start, chunk_start,
                                   self.endpoint, chunk_end)
        return start, self.endpoint, type
class PyrexGroupCompressor(_CommonGroupCompressor):
    """Produce a serialised group of compressed texts.

    It contains code very similar to SequenceMatcher because of having a similar
    task. However some key differences apply:

    * there is no junk, we want a minimal edit not a human readable diff.
    * we don't filter very common lines (because we don't know where a good
      range will start, and after the first text we want to be emitting minmal
      edits only.
    * we chain the left side, not the right side
    * we incrementally update the adjacency matrix as new lines are provided.
    * we look for matches in all of the left side, so the routine which does
      the analagous task of find_longest_match does not need to filter on the
      left side.
    """

    def __init__(self, settings=None):
        super(PyrexGroupCompressor, self).__init__(settings)
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)

    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""
        # By having action/label/sha1/len, we can parse the group if the index
        # was ever destroyed, we have the key in 'label', we know the final
        # bytes are valid from sha1, and we know where to find the end of this
        # record because of 'len'. (the delta record itself will store the
        # total length for the expanded record)
        # 'len: %d\n' costs approximately 1% increase in total data
        # Having the labels at all costs us 9-10% increase, 38% increase for
        # inventory pages, and 5.8% increase for text pages
        # new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
        if self._delta_index._source_offset != self.endpoint:
            raise AssertionError('_source_offset != endpoint'
                                 ' somehow the DeltaIndex got out of sync with'
                                 ' the output lines')
        bytes = b''.join(chunks)
        delta = self._delta_index.make_delta(bytes, max_delta_size)
        if delta is None:
            # No useful delta was found; store a fulltext record.
            type = 'fulltext'
            enc_length = encode_base128_int(input_len)
            len_mini_header = 1 + len(enc_length)
            self._delta_index.add_source(bytes, len_mini_header)
            new_chunks = [b'f', enc_length] + chunks
        else:
            type = 'delta'
            enc_length = encode_base128_int(len(delta))
            len_mini_header = 1 + len(enc_length)
            new_chunks = [b'd', enc_length, delta]
            self._delta_index.add_delta_source(delta, len_mini_header)
        # Before insertion
        start = self.endpoint
        chunk_start = len(self.chunks)
        # Now output these bytes
        self._output_chunks(new_chunks)
        self.input_bytes += input_len
        chunk_end = len(self.chunks)
        self.labels_deltas[key] = (start, chunk_start,
                                   self.endpoint, chunk_end)
        if not self._delta_index._source_offset == self.endpoint:
            raise AssertionError('the delta index is out of sync'
                                 'with the output lines %s != %s'
                                 % (self._delta_index._source_offset, self.endpoint))
        return start, self.endpoint, type

    def _output_chunks(self, new_chunks):
        """Output some chunks.

        :param new_chunks: The chunks to output.
        """
        # Remember the checkpoint so pop_last() can revoke this record.
        self._last = (len(self.chunks), self.endpoint)
        endpoint = self.endpoint
        self.chunks.extend(new_chunks)
        endpoint += sum(map(len, new_chunks))
        self.endpoint = endpoint
def make_pack_factory(graph, delta, keylength, inconsistency_fatal=True):
    """Create a factory for creating a pack based groupcompress.

    This is only functional enough to run interface tests, it doesn't try to
    provide a full pack environment.

    :param graph: Store a graph.
    :param delta: Delta compress contents.
    :param keylength: How long should keys be.
    """
    def factory(transport):
        parents = graph
        ref_length = 0
        if graph:
            ref_length = 1
        graph_index = BTreeBuilder(reference_lists=ref_length,
                                   key_elements=keylength)
        stream = transport.open_write_stream('newpack')
        writer = pack.ContainerWriter(stream.write)
        writer.begin()
        index = _GCGraphIndex(graph_index, lambda: True, parents=parents,
                              add_callback=graph_index.add_nodes,
                              inconsistency_fatal=inconsistency_fatal)
        access = pack_repo._DirectPackAccess({})
        access.set_writer(writer, graph_index, (transport, 'newpack'))
        result = GroupCompressVersionedFiles(index, access, delta)
        result.stream = stream
        result.writer = writer
        return result
    return factory
def cleanup_pack_group(versioned_files):
    """Finish the pack writer and close the underlying stream.

    :param versioned_files: The object returned by a make_pack_factory
        factory; must carry the ``writer`` and ``stream`` attributes that
        the factory attached.
    """
    versioned_files.writer.end()
    versioned_files.stream.close()
class _BatchingBlockFetcher(object):
1125
"""Fetch group compress blocks in batches.
1127
:ivar total_bytes: int of expected number of bytes needed to fetch the
1128
currently pending batch.
1131
def __init__(self, gcvf, locations, get_compressor_settings=None):
1133
self.locations = locations
1135
self.batch_memos = {}
1136
self.memos_to_get = []
1137
self.total_bytes = 0
1138
self.last_read_memo = None
1140
self._get_compressor_settings = get_compressor_settings
1142
def add_key(self, key):
1143
"""Add another to key to fetch.
1145
:return: The estimated number of bytes needed to fetch the batch so
1148
self.keys.append(key)
1149
index_memo, _, _, _ = self.locations[key]
1150
read_memo = index_memo[0:3]
1151
# Three possibilities for this read_memo:
1152
# - it's already part of this batch; or
1153
# - it's not yet part of this batch, but is already cached; or
1154
# - it's not yet part of this batch and will need to be fetched.
1155
if read_memo in self.batch_memos:
1156
# This read memo is already in this batch.
1157
return self.total_bytes
1159
cached_block = self.gcvf._group_cache[read_memo]
1161
# This read memo is new to this batch, and the data isn't cached
1163
self.batch_memos[read_memo] = None
1164
self.memos_to_get.append(read_memo)
1165
byte_length = read_memo[2]
1166
self.total_bytes += byte_length
1168
# This read memo is new to this batch, but cached.
1169
# Keep a reference to the cached block in batch_memos because it's
1170
# certain that we'll use it when this batch is processed, but
1171
# there's a risk that it would fall out of _group_cache between now
1173
self.batch_memos[read_memo] = cached_block
1174
return self.total_bytes
1176
def _flush_manager(self):
1177
if self.manager is not None:
1178
for factory in self.manager.get_record_stream():
1181
self.last_read_memo = None
1183
def yield_factories(self, full_flush=False):
1184
"""Yield factories for keys added since the last yield. They will be
1185
returned in the order they were added via add_key.
1187
:param full_flush: by default, some results may not be returned in case
1188
they can be part of the next batch. If full_flush is True, then
1189
all results are returned.
1191
if self.manager is None and not self.keys:
1193
# Fetch all memos in this batch.
1194
blocks = self.gcvf._get_blocks(self.memos_to_get)
1195
# Turn blocks into factories and yield them.
1196
memos_to_get_stack = list(self.memos_to_get)
1197
memos_to_get_stack.reverse()
1198
for key in self.keys:
1199
index_memo, _, parents, _ = self.locations[key]
1200
read_memo = index_memo[:3]
1201
if self.last_read_memo != read_memo:
1202
# We are starting a new block. If we have a
1203
# manager, we have found everything that fits for
1204
# now, so yield records
1205
for factory in self._flush_manager():
1207
# Now start a new manager.
1208
if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
1209
# The next block from _get_blocks will be the block we
1211
block_read_memo, block = next(blocks)
1212
if block_read_memo != read_memo:
1213
raise AssertionError(
1214
"block_read_memo out of sync with read_memo"
1215
"(%r != %r)" % (block_read_memo, read_memo))
1216
self.batch_memos[read_memo] = block
1217
memos_to_get_stack.pop()
1219
block = self.batch_memos[read_memo]
1220
self.manager = _LazyGroupContentManager(block,
1221
get_compressor_settings=self._get_compressor_settings)
1222
self.last_read_memo = read_memo
1223
start, end = index_memo[3:5]
1224
self.manager.add_factory(key, parents, start, end)
1226
for factory in self._flush_manager():
1229
self.batch_memos.clear()
1230
del self.memos_to_get[:]
1231
self.total_bytes = 0
1234
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
    """A group-compress based VersionedFiles implementation."""

    # This controls how the GroupCompress DeltaIndex works. Basically, we
    # compute hash pointers into the source blocks (so hash(text) => text).
    # However each of these references costs some memory in trade against a
    # more accurate match result. For very large files, they either are
    # pre-compressed and change in bulk whenever they change, or change in just
    # local blocks. Either way, 'improved resolution' is not very helpful,
    # versus running out of memory trying to track everything. The default max
    # gives 100% sampling of a 1MB file.
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
                                    _DEFAULT_MAX_BYTES_TO_INDEX}

    def __init__(self, index, access, delta=True, _unadded_refs=None,
                 _group_cache=None):
        """Create a GroupCompressVersionedFiles object.

        :param index: The index object storing access and graph data.
        :param access: The access object storing raw data.
        :param delta: Whether to delta compress or just entropy compress.
        :param _unadded_refs: private parameter, don't use.
        :param _group_cache: private parameter, don't use.
        """
        self._index = index
        self._access = access
        self._delta = delta
        if _unadded_refs is None:
            _unadded_refs = {}
        self._unadded_refs = _unadded_refs
        if _group_cache is None:
            _group_cache = LRUSizeCache(max_size=50 * 1024 * 1024)
        self._group_cache = _group_cache
        self._immediate_fallback_vfs = []
        self._max_bytes_to_index = None

    def without_fallbacks(self):
        """Return a clone of this object without any fallbacks configured."""
        return GroupCompressVersionedFiles(self._index, self._access,
                                           self._delta, _unadded_refs=dict(
                                               self._unadded_refs),
                                           _group_cache=self._group_cache)

    def add_lines(self, key, parents, lines, parent_texts=None,
                  left_matching_blocks=None, nostore_sha=None, random_id=False,
                  check_content=True):
        """Add a text to the store.

        :param key: The key tuple of the text to add.
        :param parents: The parents key tuples of the text to add.
        :param lines: A list of lines. Each line must be a bytestring. And all
            of them except the last must be terminated with \\n and contain no
            other \\n's. The last line may either contain no \\n's or a single
            terminating \\n. If the lines list does meet this constraint the
            add routine may error or may succeed - but you will be unable to
            read the data back accurately. (Checking the lines have been split
            correctly is expensive and extremely unlikely to catch bugs so it
            is not done at runtime unless check_content is True.)
        :param parent_texts: An optional dictionary containing the opaque
            representations of some or all of the parents of version_id to
            allow delta optimisations. VERY IMPORTANT: the texts must be those
            returned by add_lines or data corruption can be caused.
        :param left_matching_blocks: a hint about which areas are common
            between the text and its left-hand-parent. The format is
            the SequenceMatcher.get_matching_blocks format.
        :param nostore_sha: Raise ExistingContent and do not add the lines to
            the versioned file if the digest of the lines matches this.
        :param random_id: If True a random id has been selected rather than
            an id determined by some deterministic process such as a converter
            from a foreign VCS. When True the backend may choose not to check
            for uniqueness of the resulting key within the versioned file, so
            this should only be done when the result is expected to be unique
            anyway.
        :param check_content: If True, the lines supplied are verified to be
            bytestrings that are correctly formed lines.
        :return: The text sha1, the number of bytes in the text, and an opaque
            representation of the inserted version which can be provided
            back to future add_lines calls in the parent_texts dictionary.
        """
        self._index._check_write_ok()
        if check_content:
            self._check_lines_not_unicode(lines)
            self._check_lines_are_lines(lines)
        return self.add_content(
            ChunkedContentFactory(
                key, parents, osutils.sha_strings(lines), lines, chunks_are_lines=True),
            parent_texts, left_matching_blocks, nostore_sha, random_id)

    def add_content(self, factory, parent_texts=None,
                    left_matching_blocks=None, nostore_sha=None,
                    random_id=False):
        """Add a text to the store.

        :param factory: A ContentFactory that can be used to retrieve the key,
            parents and contents.
        :param parent_texts: An optional dictionary containing the opaque
            representations of some or all of the parents of version_id to
            allow delta optimisations. VERY IMPORTANT: the texts must be those
            returned by add_lines or data corruption can be caused.
        :param left_matching_blocks: a hint about which areas are common
            between the text and its left-hand-parent. The format is
            the SequenceMatcher.get_matching_blocks format.
        :param nostore_sha: Raise ExistingContent and do not add the lines to
            the versioned file if the digest of the lines matches this.
        :param random_id: If True a random id has been selected rather than
            an id determined by some deterministic process such as a converter
            from a foreign VCS. When True the backend may choose not to check
            for uniqueness of the resulting key within the versioned file, so
            this should only be done when the result is expected to be unique
            anyway.
        :return: The text sha1, the number of bytes in the text, and an opaque
            representation of the inserted version which can be provided
            back to future add_lines calls in the parent_texts dictionary.
        """
        self._index._check_write_ok()
        parents = factory.parents
        self._check_add(factory.key, random_id)
        if parents is None:
            # The caller might pass None if there is no graph data, but kndx
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
            parents = ()
        # double handling for now. Make it work until then.
        sha1, length = list(self._insert_record_stream(
            [factory], random_id=random_id, nostore_sha=nostore_sha))[0]
        return sha1, length, None

    def add_fallback_versioned_files(self, a_versioned_files):
        """Add a source of texts for texts not present in this knit.

        :param a_versioned_files: A VersionedFiles object.
        """
        self._immediate_fallback_vfs.append(a_versioned_files)

    def annotate(self, key):
        """See VersionedFiles.annotate."""
        ann = annotate.Annotator(self)
        return ann.annotate_flat(key)

    def get_annotator(self):
        return annotate.Annotator(self)

    def check(self, progress_bar=None, keys=None):
        """See VersionedFiles.check()."""
        if keys is None:
            keys = self.keys()
            for record in self.get_record_stream(keys, 'unordered', True):
                # Force extraction so corrupt records raise here.
                for chunk in record.iter_bytes_as('chunked'):
                    pass
        else:
            return self.get_record_stream(keys, 'unordered', True)

    def clear_cache(self):
        """See VersionedFiles.clear_cache()"""
        self._group_cache.clear()
        self._index._graph_index.clear_cache()
        self._index._int_cache.clear()

    def _check_add(self, key, random_id):
        """check that version_id and lines are safe to add."""
        version_id = key[-1]
        if version_id is not None:
            if osutils.contains_whitespace(version_id):
                raise errors.InvalidRevisionId(version_id, self)
            self.check_not_reserved_id(version_id)
        # TODO: If random_id==False and the key is already present, we should
        #       probably check that the existing content is identical to what is
        #       being inserted, and otherwise raise an exception. This would make
        #       the bundle code simpler.

    def get_parent_map(self, keys):
        """Get a map of the graph parents of keys.

        :param keys: The keys to look up parents for.
        :return: A mapping from keys to parents. Absent keys are absent from
            the mapping.
        """
        return self._get_parent_map_with_sources(keys)[0]

    def _get_parent_map_with_sources(self, keys):
        """Get a map of the parents of keys.

        :param keys: The keys to look up parents for.
        :return: A tuple. The first element is a mapping from keys to parents.
            Absent keys are absent from the mapping. The second element is a
            list with the locations each key was found in. The first element
            is the in-this-knit parents, the second the first fallback source,
            and so on.
        """
        result = {}
        sources = [self._index] + self._immediate_fallback_vfs
        source_results = []
        missing = set(keys)
        for source in sources:
            if not missing:
                break
            new_result = source.get_parent_map(missing)
            source_results.append(new_result)
            result.update(new_result)
            missing.difference_update(set(new_result))
        return result, source_results

    def _get_blocks(self, read_memos):
        """Get GroupCompressBlocks for the given read_memos.

        :returns: a series of (read_memo, block) pairs, in the order they were
            originally passed.
        """
        cached = {}
        for read_memo in read_memos:
            try:
                block = self._group_cache[read_memo]
            except KeyError:
                pass
            else:
                cached[read_memo] = block
        not_cached = []
        not_cached_seen = set()
        for read_memo in read_memos:
            if read_memo in cached:
                # Don't fetch what we already have
                continue
            if read_memo in not_cached_seen:
                # Don't try to fetch the same data twice
                continue
            not_cached.append(read_memo)
            not_cached_seen.add(read_memo)
        raw_records = self._access.get_raw_records(not_cached)
        for read_memo in read_memos:
            try:
                yield read_memo, cached[read_memo]
            except KeyError:
                # Read the block, and cache it.
                zdata = next(raw_records)
                block = GroupCompressBlock.from_bytes(zdata)
                self._group_cache[read_memo] = block
                cached[read_memo] = block
                yield read_memo, block

    def get_missing_compression_parent_keys(self):
        """Return the keys of missing compression parents.

        Missing compression parents occur when a record stream was missing
        basis texts, or a index was scanned that had missing basis texts.
        """
        # GroupCompress cannot currently reference texts that are not in the
        # group, so this is valid for now
        return frozenset()

    def get_record_stream(self, keys, ordering, include_delta_closure):
        """Get a stream of records for keys.

        :param keys: The keys to include.
        :param ordering: Either 'unordered' or 'topological'. A topologically
            sorted stream has compression parents strictly before their
            children.
        :param include_delta_closure: If True then the closure across any
            compression parents will be included (in the opaque data).
        :return: An iterator of ContentFactory objects, each of which is only
            valid until the iterator is advanced.
        """
        # keys might be a generator
        orig_keys = list(keys)
        keys = set(orig_keys)
        if not keys:
            return
        if (not self._index.has_graph
                and ordering in ('topological', 'groupcompress')):
            # Cannot topological order when no graph has been stored.
            # but we allow 'as-requested' or 'unordered'
            ordering = 'unordered'

        remaining_keys = keys
        while True:
            try:
                keys = set(remaining_keys)
                for content_factory in self._get_remaining_record_stream(keys,
                        orig_keys, ordering, include_delta_closure):
                    remaining_keys.discard(content_factory.key)
                    yield content_factory
                return
            except errors.RetryWithNewPacks as e:
                self._access.reload_or_raise(e)

    def _find_from_fallback(self, missing):
        """Find whatever keys you can from the fallbacks.

        :param missing: A set of missing keys. This set will be mutated as keys
            are found from a fallback_vfs
        :return: (parent_map, key_to_source_map, source_results)
            parent_map  the overall key => parent_keys
            key_to_source_map   a dict from {key: source}
            source_results      a list of (source: keys)
        """
        parent_map = {}
        key_to_source_map = {}
        source_results = []
        for source in self._immediate_fallback_vfs:
            if not missing:
                break
            source_parents = source.get_parent_map(missing)
            parent_map.update(source_parents)
            source_parents = list(source_parents)
            source_results.append((source, source_parents))
            key_to_source_map.update((key, source) for key in source_parents)
            missing.difference_update(source_parents)
        return parent_map, key_to_source_map, source_results

    def _get_ordered_source_keys(self, ordering, parent_map, key_to_source_map):
        """Get the (source, [keys]) list.

        The returned objects should be in the order defined by 'ordering',
        which can weave between different sources.

        :param ordering: Must be one of 'topological' or 'groupcompress'
        :return: List of [(source, [keys])] tuples, such that all keys are in
            the defined order, regardless of source.
        """
        if ordering == 'topological':
            present_keys = tsort.topo_sort(parent_map)
        else:
            # ordering == 'groupcompress'
            # XXX: This only optimizes for the target ordering. We may need
            #      to balance that with the time it takes to extract
            #      ordering, by somehow grouping based on
            #      locations[key][0:3]
            present_keys = sort_gc_optimal(parent_map)
        # Now group by source:
        source_keys = []
        current_source = None
        for key in present_keys:
            source = key_to_source_map.get(key, self)
            if source is not current_source:
                source_keys.append((source, []))
                current_source = source
            source_keys[-1][1].append(key)
        return source_keys

    def _get_as_requested_source_keys(self, orig_keys, locations, unadded_keys,
                                      key_to_source_map):
        source_keys = []
        current_source = None
        for key in orig_keys:
            if key in locations or key in unadded_keys:
                source = self
            elif key in key_to_source_map:
                source = key_to_source_map[key]
            else:  # absent
                continue
            if source is not current_source:
                source_keys.append((source, []))
                current_source = source
            source_keys[-1][1].append(key)
        return source_keys

    def _get_io_ordered_source_keys(self, locations, unadded_keys,
                                    source_result):
        def get_group(key):
            # This is the group the bytes are stored in, followed by the
            # location in the group
            return locations[key][0]
        # We don't have an ordering for keys in the in-memory object, but
        # lets process the in-memory ones first.
        present_keys = list(unadded_keys)
        present_keys.extend(sorted(locations, key=get_group))
        # Now grab all of the ones from other sources
        source_keys = [(self, present_keys)]
        source_keys.extend(source_result)
        return source_keys

    def _get_remaining_record_stream(self, keys, orig_keys, ordering,
                                     include_delta_closure):
        """Get a stream of records for keys.

        :param keys: The keys to include.
        :param ordering: one of 'unordered', 'topological', 'groupcompress' or
            'as-requested'
        :param include_delta_closure: If True then the closure across any
            compression parents will be included (in the opaque data).
        :return: An iterator of ContentFactory objects, each of which is only
            valid until the iterator is advanced.
        """
        # Cheap: iterate
        locations = self._index.get_build_details(keys)
        unadded_keys = set(self._unadded_refs).intersection(keys)
        missing = keys.difference(locations)
        missing.difference_update(unadded_keys)
        (fallback_parent_map, key_to_source_map,
         source_result) = self._find_from_fallback(missing)
        if ordering in ('topological', 'groupcompress'):
            # would be better to not globally sort initially but instead
            # start with one key, recurse to its oldest parent, then grab
            # everything in the same group, etc.
            parent_map = dict((key, details[2]) for key, details in
                              locations.items())
            for key in unadded_keys:
                parent_map[key] = self._unadded_refs[key]
            parent_map.update(fallback_parent_map)
            source_keys = self._get_ordered_source_keys(ordering, parent_map,
                                                        key_to_source_map)
        elif ordering == 'as-requested':
            source_keys = self._get_as_requested_source_keys(orig_keys,
                locations, unadded_keys, key_to_source_map)
        else:
            # We want to yield the keys in a semi-optimal (read-wise) ordering.
            # Otherwise we thrash the _group_cache and destroy performance
            source_keys = self._get_io_ordered_source_keys(locations,
                unadded_keys, source_result)
        for key in missing:
            yield AbsentContentFactory(key)
        # Batch up as many keys as we can until either:
        #  - we encounter an unadded ref, or
        #  - we run out of keys, or
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
        batcher = _BatchingBlockFetcher(self, locations,
            get_compressor_settings=self._get_compressor_settings)
        for source, keys in source_keys:
            if source is self:
                for key in keys:
                    if key in self._unadded_refs:
                        # Flush batch, then yield unadded ref from
                        # self._compressor.
                        for factory in batcher.yield_factories(full_flush=True):
                            yield factory
                        chunks, sha1 = self._compressor.extract(key)
                        parents = self._unadded_refs[key]
                        yield ChunkedContentFactory(key, parents, sha1, chunks)
                        continue
                    if batcher.add_key(key) > BATCH_SIZE:
                        # Ok, this batch is big enough. Yield some results.
                        for factory in batcher.yield_factories():
                            yield factory
            else:
                for factory in batcher.yield_factories(full_flush=True):
                    yield factory
                for record in source.get_record_stream(keys, ordering,
                                                       include_delta_closure):
                    yield record
        for factory in batcher.yield_factories(full_flush=True):
            yield factory

    def get_sha1s(self, keys):
        """See VersionedFiles.get_sha1s()."""
        result = {}
        for record in self.get_record_stream(keys, 'unordered', True):
            if record.sha1 is not None:
                result[record.key] = record.sha1
            else:
                if record.storage_kind != 'absent':
                    result[record.key] = osutils.sha_strings(
                        record.iter_bytes_as('chunked'))
        return result

    def insert_record_stream(self, stream):
        """Insert a record stream into this container.

        :param stream: A stream of records to insert.
        :return: None
        :seealso VersionedFiles.get_record_stream:
        """
        # XXX: Setting random_id=True makes
        # test_insert_record_stream_existing_keys fail for groupcompress and
        # groupcompress-nograph, this needs to be revisited while addressing
        # 'bzr branch' performance issues.
        for _, _ in self._insert_record_stream(stream, random_id=False):
            pass

    def _get_compressor_settings(self):
        if self._max_bytes_to_index is None:
            # TODO: VersionedFiles don't know about their containing
            #       repository, so they don't have much of an idea about their
            #       location. So for now, this is only a global option.
            c = config.GlobalConfig()
            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
            if val is not None:
                try:
                    val = int(val)
                except ValueError as e:
                    trace.warning('Value for '
                                  '"bzr.groupcompress.max_bytes_to_index"'
                                  ' %r is not an integer'
                                  % (val,))
                    val = None
            if val is None:
                val = self._DEFAULT_MAX_BYTES_TO_INDEX
            self._max_bytes_to_index = val
        return {'max_bytes_to_index': self._max_bytes_to_index}

    def _make_group_compressor(self):
        return GroupCompressor(self._get_compressor_settings())

    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
                              reuse_blocks=True):
        """Internal core to insert a record stream into this container.

        This helper function has a different interface than insert_record_stream
        to allow add_lines to be minimal, but still return the needed data.

        :param stream: A stream of records to insert.
        :param nostore_sha: If the sha1 of a given text matches nostore_sha,
            raise ExistingContent, rather than committing the new text.
        :param reuse_blocks: If the source is streaming from
            groupcompress-blocks, just insert the blocks as-is, rather than
            expanding the texts and inserting again.
        :return: An iterator over (sha1, length) of the inserted records.
        :seealso insert_record_stream:
        :seealso add_lines:
        """
        adapters = {}

        def get_adapter(adapter_key):
            try:
                return adapters[adapter_key]
            except KeyError:
                adapter_factory = adapter_registry.get(adapter_key)
                adapter = adapter_factory(self)
                adapters[adapter_key] = adapter
                return adapter
        # This will go up to fulltexts for gc to gc fetching, which isn't
        # ideal...
        self._compressor = self._make_group_compressor()
        self._unadded_refs = {}
        keys_to_add = []

        def flush():
            bytes_len, chunks = self._compressor.flush().to_chunks()
            self._compressor = self._make_group_compressor()
            # Note: At this point we still have 1 copy of the fulltext (in
            #       record and the var 'bytes'), and this generates 2 copies of
            #       the compressed text (one for bytes, one in chunks)
            # TODO: Figure out how to indicate that we would be happy to free
            #       the fulltext content at this point. Note that sometimes we
            #       will want it later (streaming CHK pages), but most of the
            #       time we won't (everything else)
            index, start, length = self._access.add_raw_record(
                None, bytes_len, chunks)
            nodes = []
            for key, reads, refs in keys_to_add:
                nodes.append((key, b"%d %d %s" % (start, length, reads), refs))
            self._index.add_records(nodes, random_id=random_id)
            self._unadded_refs = {}
            del keys_to_add[:]

        last_prefix = None
        max_fulltext_len = 0
        max_fulltext_prefix = None
        insert_manager = None
        block_start = None
        block_length = None
        # XXX: TODO: remove this, it is just for safety checking for now
        inserted_keys = set()
        reuse_this_block = reuse_blocks
        for record in stream:
            # Raise an error when a record is missing.
            if record.storage_kind == 'absent':
                raise errors.RevisionNotPresent(record.key, self)
            if random_id:
                if record.key in inserted_keys:
                    trace.note(gettext('Insert claimed random_id=True,'
                                       ' but then inserted %r two times'), record.key)
                    continue
                inserted_keys.add(record.key)
            if reuse_blocks:
                # If the reuse_blocks flag is set, check to see if we can just
                # copy a groupcompress block as-is.
                # We only check on the first record (groupcompress-block) not
                # on all of the (groupcompress-block-ref) entries.
                # The reuse_this_block flag is then kept for as long as
                # we process the block-ref records that follow it.
                if record.storage_kind == 'groupcompress-block':
                    # Check to see if we really want to re-use this block
                    insert_manager = record._manager
                    reuse_this_block = insert_manager.check_is_well_utilized()
            else:
                reuse_this_block = False
            if reuse_this_block:
                # We still want to reuse this block
                if record.storage_kind == 'groupcompress-block':
                    # Insert the raw block into the target repo
                    insert_manager = record._manager
                    bytes_len, chunks = record._manager._block.to_chunks()
                    _, start, length = self._access.add_raw_record(
                        None, bytes_len, chunks)
                    block_start = start
                    block_length = length
                if record.storage_kind in ('groupcompress-block',
                                           'groupcompress-block-ref'):
                    if insert_manager is None:
                        raise AssertionError('No insert_manager set')
                    if insert_manager is not record._manager:
                        raise AssertionError('insert_manager does not match'
                                             ' the current record, we cannot be positive'
                                             ' that the appropriate content was inserted.'
                                             )
                    value = b"%d %d %d %d" % (block_start, block_length,
                                              record._start, record._end)
                    nodes = [(record.key, value, (record.parents,))]
                    # TODO: Consider buffering up many nodes to be added, not
                    #       sure how much overhead this has, but we're seeing
                    #       ~23s / 120s in add_records calls
                    self._index.add_records(nodes, random_id=random_id)
                    continue
            try:
                chunks = record.get_bytes_as('chunked')
            except UnavailableRepresentation:
                adapter_key = record.storage_kind, 'chunked'
                adapter = get_adapter(adapter_key)
                chunks = adapter.get_bytes(record, 'chunked')
            chunks_len = record.size
            if chunks_len is None:
                chunks_len = sum(map(len, chunks))
            if len(record.key) > 1:
                prefix = record.key[0]
                soft = (prefix == last_prefix)
            else:
                prefix = None
                soft = False
            if max_fulltext_len < chunks_len:
                max_fulltext_len = chunks_len
                max_fulltext_prefix = prefix
            (found_sha1, start_point, end_point,
             type) = self._compressor.compress(
                 record.key, chunks, chunks_len, record.sha1, soft=soft,
                 nostore_sha=nostore_sha)
            # delta_ratio = float(chunks_len) / (end_point - start_point)
            # Check if we want to continue to include that text
            if (prefix == max_fulltext_prefix
                    and end_point < 2 * max_fulltext_len):
                # As long as we are on the same file_id, we will fill at least
                # 2 * max_fulltext_len
                start_new_block = False
            elif end_point > 4 * 1024 * 1024:
                start_new_block = True
            elif (prefix is not None and prefix != last_prefix
                  and end_point > 2 * 1024 * 1024):
                start_new_block = True
            else:
                start_new_block = False
            last_prefix = prefix
            if start_new_block:
                self._compressor.pop_last()
                flush()
                max_fulltext_len = chunks_len
                (found_sha1, start_point, end_point,
                 type) = self._compressor.compress(
                     record.key, chunks, chunks_len, record.sha1)
            if record.key[-1] is None:
                key = record.key[:-1] + (b'sha1:' + found_sha1,)
            else:
                key = record.key
            self._unadded_refs[key] = record.parents
            yield found_sha1, chunks_len
            as_st = static_tuple.StaticTuple.from_sequence
            if record.parents is not None:
                parents = as_st([as_st(p) for p in record.parents])
            else:
                parents = None
            refs = static_tuple.StaticTuple(parents)
            keys_to_add.append(
                (key, b'%d %d' % (start_point, end_point), refs))
        if len(keys_to_add):
            flush()
        self._compressor = None

    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
        """Iterate over the lines in the versioned files from keys.

        This may return lines from other keys. Each item the returned
        iterator yields is a tuple of a line and a text version that that line
        is present in (not introduced in).

        Ordering of results is in whatever order is most suitable for the
        underlying storage format.

        If a progress bar is supplied, it may be used to indicate progress.
        The caller is responsible for cleaning up progress bars (because this
        is an iterator).

        NOTES:
         * Lines are normalised by the underlying store: they will all have \\n
           terminators.
         * Lines are returned in arbitrary order.

        :return: An iterator over (line, key).
        """
        keys = set(keys)
        total = len(keys)
        # we don't care about inclusions, the caller cares.
        # but we need to setup a list of records to visit.
        # we need key, position, length
        for key_idx, record in enumerate(self.get_record_stream(keys,
                                                                'unordered', True)):
            # XXX: todo - optimise to use less than full texts.
            key = record.key
            if pb is not None:
                pb.update('Walking content', key_idx, total)
            if record.storage_kind == 'absent':
                raise errors.RevisionNotPresent(key, self)
            for line in record.iter_bytes_as('lines'):
                yield line, key
        if pb is not None:
            pb.update('Walking content', total, total)

    def keys(self):
        """See VersionedFiles.keys."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "keys scales with size of history")
        sources = [self._index] + self._immediate_fallback_vfs
        result = set()
        for source in sources:
            result.update(source.keys())
        return result
class _GCBuildDetails(object):
1949
"""A blob of data about the build details.
1951
This stores the minimal data, which then allows compatibility with the old
1952
api, without taking as much memory.
1955
__slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
1956
'_delta_end', '_parents')
1959
compression_parent = None
1961
def __init__(self, parents, position_info):
1962
self._parents = parents
1963
(self._index, self._group_start, self._group_end, self._basis_end,
1964
self._delta_end) = position_info
1967
return '%s(%s, %s)' % (self.__class__.__name__,
1968
self.index_memo, self._parents)
1971
def index_memo(self):
1972
return (self._index, self._group_start, self._group_end,
1973
self._basis_end, self._delta_end)
1976
def record_details(self):
1977
return static_tuple.StaticTuple(self.method, None)
1979
def __getitem__(self, offset):
1980
"""Compatibility thunk to act like a tuple."""
1982
return self.index_memo
1984
return self.compression_parent # Always None
1986
return self._parents
1988
return self.record_details
1990
raise IndexError('offset out of range')
1996
class _GCGraphIndex(object):
1997
"""Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1999
def __init__(self, graph_index, is_locked, parents=True,
2000
add_callback=None, track_external_parent_refs=False,
2001
inconsistency_fatal=True, track_new_keys=False):
2002
"""Construct a _GCGraphIndex on a graph_index.
2004
:param graph_index: An implementation of breezy.index.GraphIndex.
2005
:param is_locked: A callback, returns True if the index is locked and
2007
:param parents: If True, record knits parents, if not do not record
2009
:param add_callback: If not None, allow additions to the index and call
2010
this callback with a list of added GraphIndex nodes:
2011
[(node, value, node_refs), ...]
2012
:param track_external_parent_refs: As keys are added, keep track of the
2013
keys they reference, so that we can query get_missing_parents(),
2015
:param inconsistency_fatal: When asked to add records that are already
2016
present, and the details are inconsistent with the existing
2017
record, raise an exception instead of warning (and skipping the
2020
self._add_callback = add_callback
2021
self._graph_index = graph_index
2022
self._parents = parents
2023
self.has_graph = parents
2024
self._is_locked = is_locked
2025
self._inconsistency_fatal = inconsistency_fatal
2026
# GroupCompress records tend to have the same 'group' start + offset
2027
# repeated over and over, this creates a surplus of ints
2028
self._int_cache = {}
2029
if track_external_parent_refs:
2030
self._key_dependencies = _KeyRefs(
2031
track_new_keys=track_new_keys)
2033
self._key_dependencies = None
2035
def add_records(self, records, random_id=False):
2036
"""Add multiple records to the index.
2038
This function does not insert data into the Immutable GraphIndex
2039
backing the KnitGraphIndex, instead it prepares data for insertion by
2040
the caller and checks that it is safe to insert then calls
2041
self._add_callback with the prepared GraphIndex nodes.
2043
:param records: a list of tuples:
2044
(key, options, access_memo, parents).
2045
:param random_id: If True the ids being added were randomly generated
2046
and no check for existence will be performed.
2048
if not self._add_callback:
2049
raise errors.ReadOnlyError(self)
2050
# we hope there are no repositories with inconsistent parentage
2055
for (key, value, refs) in records:
2056
if not self._parents:
2060
raise knit.KnitCorrupt(self,
2061
"attempt to add node with parents "
2062
"in parentless index.")
2065
keys[key] = (value, refs)
2068
present_nodes = self._get_entries(keys)
2069
for (index, key, value, node_refs) in present_nodes:
2070
# Sometimes these are passed as a list rather than a tuple
2071
node_refs = static_tuple.as_tuples(node_refs)
2072
passed = static_tuple.as_tuples(keys[key])
2073
if node_refs != passed[1]:
2074
details = '%s %s %s' % (key, (value, node_refs), passed)
2075
if self._inconsistency_fatal:
2076
raise knit.KnitCorrupt(self, "inconsistent details"
2077
" in add_records: %s" %
2080
trace.warning("inconsistent details in skipped"
2081
" record: %s", details)
2087
for key, (value, node_refs) in keys.items():
2088
result.append((key, value, node_refs))
2090
for key, (value, node_refs) in keys.items():
2091
result.append((key, value))
2093
key_dependencies = self._key_dependencies
2094
if key_dependencies is not None:
2096
for key, value, refs in records:
2098
key_dependencies.add_references(key, parents)
2100
for key, value, refs in records:
2101
new_keys.add_key(key)
2102
self._add_callback(records)
2104
def _check_read(self):
2105
"""Raise an exception if reads are not permitted."""
2106
if not self._is_locked():
2107
raise errors.ObjectNotLocked(self)
2109
def _check_write_ok(self):
2110
"""Raise an exception if writes are not permitted."""
2111
if not self._is_locked():
2112
raise errors.ObjectNotLocked(self)
2114
def _get_entries(self, keys, check_present=False):
2115
"""Get the entries for keys.
2117
Note: Callers are responsible for checking that the index is locked
2118
before calling this method.
2120
:param keys: An iterable of index key tuples.
2125
for node in self._graph_index.iter_entries(keys):
2127
found_keys.add(node[1])
2129
# adapt parentless index to the rest of the code.
2130
for node in self._graph_index.iter_entries(keys):
2131
yield node[0], node[1], node[2], ()
2132
found_keys.add(node[1])
2134
missing_keys = keys.difference(found_keys)
2136
raise errors.RevisionNotPresent(missing_keys.pop(), self)
2138
def find_ancestry(self, keys):
2139
"""See CombinedGraphIndex.find_ancestry"""
2140
return self._graph_index.find_ancestry(keys, 0)
2142
def get_parent_map(self, keys):
2143
"""Get a map of the parents of keys.
2145
:param keys: The keys to look up parents for.
2146
:return: A mapping from keys to parents. Absent keys are absent from
2150
nodes = self._get_entries(keys)
2154
result[node[1]] = node[3][0]
2157
result[node[1]] = None
2160
def get_missing_parents(self):
2161
"""Return the keys of missing parents."""
2162
# Copied from _KnitGraphIndex.get_missing_parents
2163
# We may have false positives, so filter those out.
2164
self._key_dependencies.satisfy_refs_for_keys(
2165
self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
2166
return frozenset(self._key_dependencies.get_unsatisfied_refs())
2168
def get_build_details(self, keys):
2169
"""Get the various build details for keys.
2171
Ghosts are omitted from the result.
2173
:param keys: An iterable of keys.
2174
:return: A dict of key:
2175
(index_memo, compression_parent, parents, record_details).
2177
* index_memo: opaque structure to pass to read_records to extract
2179
* compression_parent: Content that this record is built upon, may
2181
* parents: Logical parents of this node
2182
* record_details: extra information about the content which needs
2183
to be passed to Factory.parse_record
2187
entries = self._get_entries(keys)
2188
for entry in entries:
2190
if not self._parents:
2193
parents = entry[3][0]
2194
details = _GCBuildDetails(parents, self._node_to_position(entry))
2195
result[key] = details
2199
"""Get all the keys in the collection.
2201
The keys are not ordered.
2204
return [node[1] for node in self._graph_index.iter_all_entries()]
2206
def _node_to_position(self, node):
2207
"""Convert an index value to position details."""
2208
bits = node[2].split(b' ')
2209
# It would be nice not to read the entire gzip.
2210
# start and stop are put into _int_cache because they are very common.
2211
# They define the 'group' that an entry is in, and many groups can have
2212
# thousands of objects.
2213
# Branching Launchpad, for example, saves ~600k integers, at 12 bytes
2214
# each, or about 7MB. Note that it might be even more when you consider
2215
# how PyInt is allocated in separate slabs. And you can't return a slab
2216
# to the OS if even 1 int on it is in use. Note though that Python uses
2217
# a LIFO when re-using PyInt slots, which might cause more
2219
start = int(bits[0])
2220
start = self._int_cache.setdefault(start, start)
2222
stop = self._int_cache.setdefault(stop, stop)
2223
basis_end = int(bits[2])
2224
delta_end = int(bits[3])
2225
# We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
2227
return (node[0], start, stop, basis_end, delta_end)
2229
def scan_unvalidated_index(self, graph_index):
2230
"""Inform this _GCGraphIndex that there is an unvalidated index.
2232
This allows this _GCGraphIndex to keep track of any missing
2233
compression parents we may want to have filled in to make those
2234
indices valid. It also allows _GCGraphIndex to track any new keys.
2236
:param graph_index: A GraphIndex
2238
key_dependencies = self._key_dependencies
2239
if key_dependencies is None:
2241
for node in graph_index.iter_all_entries():
2242
# Add parent refs from graph_index (and discard parent refs
2243
# that the graph_index has).
2244
key_dependencies.add_references(node[1], node[3][0])
2247
from ._groupcompress_py import (
2249
apply_delta_to_source,
2252
decode_copy_instruction,
2256
from ._groupcompress_pyx import (
2258
apply_delta_to_source,
2263
GroupCompressor = PyrexGroupCompressor
2264
except ImportError as e:
2265
osutils.failed_to_load_extension(e)
2266
GroupCompressor = PythonGroupCompressor