         self._content = None

     def __len__(self):
-        return self._content_length + self._header_length

-    def _parse_header(self):
-        """Parse the header part of the block."""
-        assert self._z_header is not None
-        if self._z_header == '':
-            self._z_header = None
-        if self._compressor_name == 'lzma':
-            header = pylzma.decompress(self._z_header)
-        else:
-            assert self._compressor_name == 'zlib'
-            header = zlib.decompress(self._z_header)
-        self._z_header = None # We have consumed the header
-        lines = header.split('\n')
-            if not line: # End of record
-                self.add_entry(**info_dict)
-            key, value = line.split(':', 1)
-                value = tuple(map(intern, value.split('\x00')))
-            elif key in ('start', 'length'):
-                value = intern(value)
-            info_dict[key] = value
+        # This is the maximum number of bytes this object will reference if
+        # everything is decompressed. However, if we decompress less than
+        # everything... (this would cause some problems for LRUSizeCache)
+        return self._content_length + self._z_content_length

     def _ensure_content(self, num_bytes=None):
         """Make sure that content has been expanded enough.

             # The stream is finished
             self._z_content_decompressor = None

-    def _parse_bytes(self, bytes):
+    def _parse_bytes(self, bytes, pos):
         """Read the various lengths from the header.

         This also populates the various 'compressed' buffers.
         :return: The position in bytes just after the last newline
         """
-        # At present, there are 4 lengths to be read, we have 2 integers for
-        # the length of the compressed and uncompressed header, and 2 integers
-        # for the compressed and uncompressed content
-        # 14 bytes can represent > 1TB, so to avoid checking too far, cap the
-        # search to 14 bytes.
-        pos = bytes.index('\n', 6, 20)
-        self._z_header_length = int(bytes[6:pos])
-        pos2 = bytes.index('\n', pos, pos + 14)
-        self._header_length = int(bytes[pos:pos2])
-        end_of_z_lengths = pos2
-        # Older versions don't have the content lengths, if we want to preserve
-        # backwards compatibility, we could try/except over these, and allow
-        pos = bytes.index('\n', pos2, pos2 + 14)
-        self._z_content_length = int(bytes[pos2:pos])
-        pos2 = bytes.index('\n', pos, pos + 14)
-        self._content_length = int(bytes[pos:pos2])
-        assert len(bytes) == (pos + self._z_header_length +
-                              self._z_content_length)
-        pos2 = pos + self._z_header_length
-        self._z_header = bytes[pos:pos2]
-        self._z_content = bytes[pos2:]
-        assert len(self._z_content) == self._z_content_length
-        # This is the older form, which did not encode its content length
-        pos = end_of_z_lengths + 1
-        pos2 = pos + self._z_header_length
-        self._z_header = bytes[pos:pos2]
-        self._z_content = bytes[pos2:]
-        self._z_content_length = len(self._z_content)
+        # At present, we have 2 integers for the compressed and uncompressed
+        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
+        # checking too far, cap the search to 14 bytes.
+        pos2 = bytes.index('\n', pos, pos + 14)
+        self._z_content_length = int(bytes[pos:pos2])
+        pos2 = bytes.index('\n', pos, pos + 14)
+        self._content_length = int(bytes[pos:pos2])
+        assert len(bytes) == (pos + self._z_content_length)
+        self._z_content = bytes[pos:]
+        assert len(self._z_content) == self._z_content_length
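For orientation, the block layout that the new `_parse_bytes`/`to_bytes` pair implies is: a one-line magic header, two ASCII base-10 lengths each terminated by `'\n'` (compressed length, then uncompressed length), and finally the zlib-compressed content. Below is a minimal, self-contained sketch of that framing rather than the class's own code; the `MAGIC` value is an illustrative stand-in, not necessarily the real `GCB_HEADER` constant defined elsewhere in the module.

import zlib

MAGIC = 'gcb1z\n'  # illustrative stand-in for the real GCB_HEADER constant

def pack_block(content):
    # "<magic><len(z_content)>\n<len(content)>\n<z_content>"
    z_content = zlib.compress(content)
    return MAGIC + '%d\n%d\n' % (len(z_content), len(content)) + z_content

def unpack_block(data):
    # Mirrors the parsing above: read the two newline-terminated lengths,
    # then treat the rest of the string as the compressed content.
    assert data.startswith(MAGIC)
    pos = len(MAGIC)
    pos2 = data.index('\n', pos, pos + 14)   # 14 base10 digits is > 1TB
    z_content_length = int(data[pos:pos2])
    pos = pos2 + 1
    pos2 = data.index('\n', pos, pos + 14)
    content_length = int(data[pos:pos2])
    z_content = data[pos2 + 1:]
    assert len(z_content) == z_content_length
    content = zlib.decompress(z_content)
    assert len(content) == content_length
    return content

assert unpack_block(pack_block('hello groupcompress\n' * 100)) == 'hello groupcompress\n' * 100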
     def from_bytes(cls, bytes):

             if end != content_start + content_len:
                 raise ValueError('end != len according to field header'
                     ' %s != %s' % (end, content_start + content_len))
-            content = self._content[content_start:end]
+            bytes = self._content[content_start:end]
-            bytes = _groupcompress_pyx.apply_delta(self._content, content)
+            bytes = apply_delta_to_source(self._content, content_start, end)

-    def add_entry(self, key, type, sha1, start, length):
-        """Add new meta info about an entry.
-
-        :param key: The key for the new content
-        :param type: Whether this is a delta or fulltext entry (external?)
-        :param sha1: sha1sum of the fulltext of this entry
-        :param start: where the encoded bytes start
-        :param length: total number of bytes in the encoded form
-        """
-        entry = GroupCompressBlockEntry(key, type, sha1, start, length)
-        if key in self._entries:
-            raise ValueError('Duplicate key found: %s' % (key,))
-        self._entries[key] = entry

     def set_content(self, content):
         """Set the content of this block."""
         self._content_length = len(content)
         self._content = content
         self._z_content = None
-        self._z_header_length = None

     def to_bytes(self):
         """Encode the information into a byte stream."""
         compress = zlib.compress
             compress = pylzma.compress
-        for key in sorted(self._entries):
-            entry = self._entries[key]
-                ) % ('\x00'.join(entry.key),
-        bytes = ''.join(chunks)
-        info_len = len(bytes)
-        z_header_bytes = compress(bytes)
-        z_header_len = len(z_header_bytes)
-        # TODO: we may want to have the header compressed in the same chain
-        #       as the data, or we may not, evaluate it
-        #       having them compressed together is probably a win for
-        #       revisions and the 'inv' portion of chk inventories. As the
-        #       label in the header is duplicated in the text.
-        #       For chk pages and real bytes, I would guess this is not
-        if self._z_content is not None:
-            content_len = self._content_length
-            z_content_len = self._z_content_length
-            z_content_bytes = self._z_content
+        if self._z_content is None:
             assert self._content is not None
-            content_len = self._content_length
-            z_content_bytes = compress(self._content)
-            self._z_content = z_content_bytes
-            z_content_len = len(z_content_bytes)
-            self._z_content_length = z_content_len
+            self._z_content = compress(self._content)
+            self._z_content_length = len(self._z_content)
             header = self.GCB_LZ_HEADER
             header = self.GCB_HEADER
         chunks = [header,
-                  '%d\n%d\n%d\n%d\n' % (z_header_len, info_len,
-                                        z_content_len, content_len)
+                  '%d\n%d\n' % (self._z_content_length, self._content_length),
-        chunks.append(z_header_bytes)
-        chunks.append(z_content_bytes)
         return ''.join(chunks)
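A sketch of the intended round trip through the block API shown above; set_content(), to_bytes(), from_bytes() and _ensure_content() all appear in this file, while the content string itself is invented for illustration.

block = GroupCompressBlock()
block.set_content('some uncompressed group content\n')
raw = block.to_bytes()             # header line, lengths, then compressed content
copy = GroupCompressBlock.from_bytes(raw)
copy._ensure_content()             # force _z_content to be decompressed
assert copy._content == 'some uncompressed group content\n'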
         return manager.get_record_stream()

-class GroupCompressor(object):
-    """Produce a serialised group of compressed texts.
-
-    It contains code very similar to SequenceMatcher because of having a similar
-    task. However some key differences apply:
-    - there is no junk, we want a minimal edit not a human readable diff.
-    - we don't filter very common lines (because we don't know where a good
-      range will start, and after the first text we want to be emitting minimal
-    - we chain the left side, not the right side
-    - we incrementally update the adjacency matrix as new lines are provided.
-    - we look for matches in all of the left side, so the routine which does
-      the analogous task of find_longest_match does not need to filter on the
+class _CommonGroupCompressor(object):

     def __init__(self):
         """Create a GroupCompressor."""
-        # Consider seeding the lines with some sort of GC Start flag, or
-        # putting it as part of the output stream, rather than in the
         self.endpoint = 0
         self.input_bytes = 0
         self.labels_deltas = {}
-        self._delta_index = _groupcompress_pyx.DeltaIndex()
+        self._delta_index = None # Set by the children
         self._block = GroupCompressBlock()

     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):

             raise errors.ExistingContent()
         if key[-1] is None:
             key = key[:-1] + ('sha1:' + sha1,)
+        return self._compress(key, bytes, sha1, len(bytes) / 2, soft)

+    def _compress(self, key, bytes, sha1, max_delta_size, soft=False):
+        """Compress lines with label key.
+
+        :param key: A key tuple. It is stored in the output for identification
+            of the text during decompression.
+        :param bytes: The bytes to be compressed
+        :param sha1: The sha1 for 'bytes'.
+        :param max_delta_size: The size above which we issue a fulltext instead
+        :param soft: Do a 'soft' compression. This means that we require larger
+            ranges to match to be considered for a copy command.
+        :return: The sha1 of lines, the start and end offsets in the delta, the
+            type ('fulltext' or 'delta') and the number of bytes accumulated in
+            the group output so far.
+        """
+        raise NotImplementedError(self._compress)
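A hypothetical caller, to make the return tuple documented above concrete; the key and text are invented, and PyrexGroupCompressor stands in for whichever concrete subclass is actually in use.

compressor = PyrexGroupCompressor()
text = 'the quick brown fox jumps over the lazy dog\n' * 50
sha1, start, end, kind, length = compressor.compress(
    ('file-id', 'rev-id'), text, None, soft=False)
# The first text in a group has nothing to delta against, so kind should be
# 'fulltext'; later texts become 'delta' records whenever their delta is no
# larger than max_delta_size (len(bytes) / 2, as passed above).
text_again, text_sha1 = compressor.extract(('file-id', 'rev-id'))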
+    def extract(self, key):
+        """Extract a key previously added to the compressor.
+
+        :param key: The key to extract.
+        :return: An iterable over bytes and the sha1.
+        """
+        (start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
+        delta_chunks = self.chunks[start_chunk:end_chunk]
+        stored_bytes = ''.join(delta_chunks)
+        if stored_bytes[0] == 'f':
+            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
+            data_len = fulltext_len + 1 + offset
+            if data_len != len(stored_bytes):
+                raise ValueError('Index claimed fulltext len, but stored bytes'
+                    % (len(stored_bytes), data_len))
+            bytes = stored_bytes[offset + 1:]
+        else:
+            # XXX: This is inefficient at best
+            source = ''.join(self.chunks[:start_chunk])
+            if stored_bytes[0] != 'd':
+                raise ValueError('Unknown content kind, bytes claim %s'
+                    % (stored_bytes[0],))
+            delta_len, offset = decode_base128_int(stored_bytes[1:10])
+            data_len = delta_len + 1 + offset
+            if data_len != len(stored_bytes):
+                raise ValueError('Index claimed delta len, but stored bytes'
+                    % (len(stored_bytes), data_len))
+            bytes = apply_delta(source, stored_bytes[offset + 1:])
+        bytes_sha1 = osutils.sha_string(bytes)
+        return bytes, bytes_sha1
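extract() leans on decode_base128_int() to read the length that follows the one-byte 'f'/'d' kind marker. That helper is defined elsewhere in this module; a self-contained sketch of the 7-bits-per-byte varint encoding it implies (low seven bits per byte, least significant group first, high bit set on every byte except the last) would look like this:

def encode_base128_int_sketch(val):
    # Emit the low 7 bits per byte; the high bit marks "another byte follows".
    out = []
    while val >= 0x80:
        out.append(chr((val | 0x80) & 0xFF))
        val >>= 7
    out.append(chr(val))
    return ''.join(out)

def decode_base128_int_sketch(data):
    # Returns (value, bytes_consumed), which is how the callers above use
    # decode_base128_int(stored_bytes[1:10]).
    val = 0
    shift = 0
    offset = 0
    while True:
        byte = ord(data[offset])
        offset += 1
        val |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return val, offset

# 300 needs two bytes: 0xAC (low 7 bits, continuation bit set), then 0x02.
assert decode_base128_int_sketch(encode_base128_int_sketch(300)) == (300, 2)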
+        """Finish this group, creating a formatted stream.
+
+        After calling this, the compressor should no longer be used
+        """
+        content = ''.join(self.chunks)
+        self._delta_index = None
+        self._block.set_content(content)

+        """Call this if you want to 'revoke' the last compression.
+
+        After this, the data structures will be rolled back, but you cannot do
+        """
+        self._delta_index = None
+        del self.chunks[self._last[0]:]
+        self.endpoint = self._last[1]

+        """Return the overall compression ratio."""
+        return float(self.input_bytes) / float(self.endpoint)
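A quick illustration of what the ratio reports, with invented numbers:

# After feeding 10 MB of raw text that packed down to 2 MB of group output,
# input_bytes == 10 * 1024 * 1024 and endpoint == 2 * 1024 * 1024, so the
# ratio comes out as 5.0 -- higher means better compression.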
+class PythonGroupCompressor(_CommonGroupCompressor):

+        """Create a GroupCompressor.
+
+        :param delta: If False, do not compress records.
+        """
+        super(PythonGroupCompressor, self).__init__()
+        self._delta_index = LinesDeltaIndex([])
+        # The actual content is managed by LinesDeltaIndex
+        self.chunks = self._delta_index.lines

+    def _compress(self, key, bytes, sha1, max_delta_size, soft=False):
+        """see _CommonGroupCompressor._compress"""
+        bytes_length = len(bytes)
+        new_lines = osutils.split_lines(bytes)
+        out_lines, index_lines = self._delta_index.make_delta(new_lines,
+            bytes_length=bytes_length, soft=soft)
+        delta_length = sum(map(len, out_lines))
+        if delta_length > max_delta_size:
+            # The delta is longer than the fulltext, insert a fulltext
+            out_lines = ['f', encode_base128_int(bytes_length)]
+            out_lines.extend(new_lines)
+            index_lines = [False, False]
+            index_lines.extend([True] * len(new_lines))
+            out_length = len(out_lines[1]) + bytes_length + 1
+        else:
+            # this is a worthy delta, output it
+            # Update the delta_length to include those two encoded integers
+            out_lines[1] = encode_base128_int(delta_length)
+            out_length = len(out_lines[3]) + 1 + delta_length
+        start = self.endpoint # Before insertion
+        chunk_start = len(self._delta_index.lines)
+        self._delta_index.extend_lines(out_lines, index_lines)
+        self.endpoint = self._delta_index.endpoint
+        self.input_bytes += bytes_length
+        chunk_end = len(self._delta_index.lines)
+        self.labels_deltas[key] = (start, chunk_start,
+                                   self.endpoint, chunk_end)
+        return sha1, start, self.endpoint, type, out_length
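The branch above picks between the two record shapes used inside a group: a fulltext record is 'f' + base128(len(text)) + the raw lines, and a delta record is 'd' + base128(len(delta)) + the delta bytes. A small illustration of the fulltext case, reusing the sketch encoder from the extract() example above (encode_base128_int itself lives elsewhere in the module):

text = 'line one\nline two\n'
record = 'f' + encode_base128_int_sketch(len(text)) + text
# out_length as computed above: 1 kind byte + the encoded length + the text.
assert len(record) == 1 + len(encode_base128_int_sketch(len(text))) + len(text)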
+class PyrexGroupCompressor(_CommonGroupCompressor):
+    """Produce a serialised group of compressed texts.
+
+    It contains code very similar to SequenceMatcher because of having a similar
+    task. However some key differences apply:
+    - there is no junk, we want a minimal edit not a human readable diff.
+    - we don't filter very common lines (because we don't know where a good
+      range will start, and after the first text we want to be emitting minimal
+    - we chain the left side, not the right side
+    - we incrementally update the adjacency matrix as new lines are provided.
+    - we look for matches in all of the left side, so the routine which does
+      the analogous task of find_longest_match does not need to filter on the

+        super(PyrexGroupCompressor, self).__init__()
+        self._delta_index = DeltaIndex()

+    def _compress(self, key, bytes, sha1, max_delta_size, soft=False):
+        """see _CommonGroupCompressor._compress"""
         input_len = len(bytes)
         # By having action/label/sha1/len, we can parse the group if the index
         # was ever destroyed, we have the key in 'label', we know the final

             len_mini_header = 1 + len(enc_length)
             length = len(delta) + len_mini_header
             new_chunks = ['d', enc_length, delta]
-            self._delta_index._source_offset += length
-            self._delta_index.add_delta_source(delta, len_mini_header)
-            self._block.add_entry(key, type=type, sha1=sha1,
-                                  start=self.endpoint, length=length)
+            self._delta_index.add_delta_source(delta, len_mini_header)
         start = self.endpoint
-        delta_start = (self.endpoint, len(self.lines))
-        self.output_chunks(new_chunks)
+        chunk_start = len(self.chunks)
+        # Now output these bytes
+        self._output_chunks(new_chunks)
         self.input_bytes += input_len
-        delta_end = (self.endpoint, len(self.lines))
-        self.labels_deltas[key] = (delta_start, delta_end)
+        chunk_end = len(self.chunks)
+        self.labels_deltas[key] = (start, chunk_start,
+                                   self.endpoint, chunk_end)
         if not self._delta_index._source_offset == self.endpoint:
             raise AssertionError('the delta index is out of sync'
                 ' with the output lines %s != %s'
                 % (self._delta_index._source_offset, self.endpoint))
         return sha1, start, self.endpoint, type, length
-    def extract(self, key):
-        """Extract a key previously added to the compressor.
-
-        :param key: The key to extract.
-        :return: An iterable over bytes and the sha1.
-        """
-        delta_details = self.labels_deltas[key]
-        delta_chunks = self.lines[delta_details[0][1]:delta_details[1][1]]
-        stored_bytes = ''.join(delta_chunks)
-        # TODO: Fix this, we shouldn't really be peeking here
-        entry = self._block._entries[key]
-        if entry.type == 'fulltext':
-            if stored_bytes[0] != 'f':
-                raise ValueError('Index claimed fulltext, but stored bytes'
-                    ' indicate %s' % (stored_bytes[0],))
-            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
-            if fulltext_len + 1 + offset != len(stored_bytes):
-                raise ValueError('Index claimed fulltext len, but stored bytes'
-                    % (len(stored_bytes),
-                       fulltext_len + 1 + offset))
-            bytes = stored_bytes[offset + 1:]
-        else:
-            if entry.type != 'delta':
-                raise ValueError('Unknown entry type: %s' % (entry.type,))
-            # XXX: This is inefficient at best
-            source = ''.join(self.lines)
-            if stored_bytes[0] != 'd':
-                raise ValueError('Entry type claims delta, bytes claim %s'
-                    % (stored_bytes[0],))
-            delta_len, offset = decode_base128_int(stored_bytes[1:10])
-            if delta_len + 1 + offset != len(stored_bytes):
-                raise ValueError('Index claimed delta len, but stored bytes'
-                    % (len(stored_bytes),
-                       delta_len + 1 + offset))
-            bytes = _groupcompress_pyx.apply_delta(source,
-                stored_bytes[offset + 1:])
-        bytes_sha1 = sha_string(bytes)
-        if entry.sha1 != bytes_sha1:
-            raise ValueError('Recorded sha1 != measured %s != %s'
-                % (entry.sha1, bytes_sha1))
-        return bytes, entry.sha1

-        """Finish this group, creating a formatted stream."""
-        content = ''.join(self.lines)
-        self._block.set_content(content)

-    def output_chunks(self, new_chunks):
+    def _output_chunks(self, new_chunks):
         """Output some chunks.

         :param new_chunks: The chunks to output.
         """
-        self._last = (len(self.lines), self.endpoint)
+        self._last = (len(self.chunks), self.endpoint)
         endpoint = self.endpoint
-        self.lines.extend(new_chunks)
+        self.chunks.extend(new_chunks)
         endpoint += sum(map(len, new_chunks))
         self.endpoint = endpoint

-        """Call this if you want to 'revoke' the last compression.
-
-        After this, the data structures will be rolled back, but you cannot do
-        """
-        self._delta_index = None
-        del self.lines[self._last[0]:]
-        self.endpoint = self._last[1]

-        """Return the overall compression ratio."""
-        return float(self.input_bytes) / float(self.endpoint)

 def make_pack_factory(graph, delta, keylength):
     """Create a factory for creating a pack based groupcompress.