/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

Merge jam python groupcompress implementation

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
from itertools import izip
20
20
from cStringIO import StringIO
21
 
import struct
22
21
import time
23
22
import zlib
24
23
try:
39
38
    )
40
39
from bzrlib.graph import Graph
41
40
from bzrlib.knit import _DirectPackAccess
42
 
from bzrlib.osutils import (
43
 
    contains_whitespace,
44
 
    sha_string,
45
 
    split_lines,
46
 
    )
47
41
from bzrlib.btree_index import BTreeBuilder
48
42
from bzrlib.lru_cache import LRUSizeCache
49
43
from bzrlib.tsort import topo_sort
56
50
    )
57
51
 
58
52
_USE_LZMA = False and (pylzma is not None)
59
 
_NO_LABELS = True
60
 
_FAST = False
61
53
 
62
54
# osutils.sha_string('')
63
55
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
64
56
 
65
57
 
66
 
def encode_base128_int(val):
67
 
    """Convert an integer into a 7-bit lsb encoding."""
68
 
    bytes = []
69
 
    count = 0
70
 
    while val >= 0x80:
71
 
        bytes.append(chr((val | 0x80) & 0xFF))
72
 
        val >>= 7
73
 
    bytes.append(chr(val))
74
 
    return ''.join(bytes)
75
 
 
76
 
 
77
 
def decode_base128_int(bytes):
78
 
    """Decode an integer from a 7-bit lsb encoding."""
79
 
    offset = 0
80
 
    val = 0
81
 
    shift = 0
82
 
    bval = ord(bytes[offset])
83
 
    while bval >= 0x80:
84
 
        val |= (bval & 0x7F) << shift
85
 
        shift += 7
86
 
        offset += 1
87
 
        bval = ord(bytes[offset])
88
 
    val |= bval << shift
89
 
    offset += 1
90
 
    return val, offset
91
 
 
92
 
 
93
58
def sort_gc_optimal(parent_map):
94
59
    """Sort and group the keys in parent_map into groupcompress order.
95
60
 
96
 
    groupcompress is defined (currently) as reverse-topological order, grouped by
97
 
    the key prefix.
 
61
    groupcompress is defined (currently) as reverse-topological order, grouped
 
62
    by the key prefix.
98
63
 
99
64
    :return: A sorted-list of keys
100
65
    """
118
83
    return present_keys
119
84
 
120
85
 
121
 
class GroupCompressBlockEntry(object):
122
 
    """Track the information about a single object inside a GC group.
123
 
 
124
 
    This is generally just the dumb data structure.
125
 
    """
126
 
 
127
 
    def __init__(self, key, type, sha1, start, length):
128
 
        self.key = key
129
 
        self.type = type # delta, fulltext, external?
130
 
        self.sha1 = sha1 # Sha1 of content
131
 
        self.start = start # Byte offset to start of data
132
 
        self.length = length # Length of content
133
 
 
134
 
    def __repr__(self):
135
 
        return '%s(%s, %s, %s, %s, %s)' % (
136
 
            self.__class__.__name__,
137
 
            self.key, self.type, self.sha1, self.start, self.length
138
 
            )
139
 
 
140
 
    @property
141
 
    def end(self):
142
 
        return self.start + self.length
143
 
 
144
86
# The max zlib window size is 32kB, so if we set 'max_size' output of the
145
87
# decompressor to the requested bytes + 32kB, then we should guarantee
146
88
# num_bytes coming out.
158
100
 
159
101
    def __init__(self):
160
102
        # map by key? or just order in file?
161
 
        self._entries = {}
162
103
        self._compressor_name = None
163
 
        self._z_header_length = None
164
 
        self._header_length = None
165
 
        self._z_header = None
166
104
        self._z_content = None
167
105
        self._z_content_decompressor = None
168
106
        self._z_content_length = None
170
108
        self._content = None
171
109
 
172
110
    def __len__(self):
173
 
        return self._content_length + self._header_length
174
 
 
175
 
    def _parse_header(self):
176
 
        """Parse the header part of the block."""
177
 
        assert self._z_header is not None
178
 
        if self._z_header == '':
179
 
            # Nothing to process
180
 
            self._z_header = None
181
 
            return
182
 
        if self._compressor_name == 'lzma':
183
 
            header = pylzma.decompress(self._z_header)
184
 
        else:
185
 
            assert self._compressor_name == 'zlib'
186
 
            header = zlib.decompress(self._z_header)
187
 
        self._z_header = None # We have consumed the header
188
 
        lines = header.split('\n')
189
 
        del header
190
 
        info_dict = {}
191
 
        for line in lines:
192
 
            if not line: #End of record
193
 
                if not info_dict:
194
 
                    break
195
 
                self.add_entry(**info_dict)
196
 
                info_dict = {}
197
 
                continue
198
 
            key, value = line.split(':', 1)
199
 
            if key == 'key':
200
 
                value = tuple(map(intern, value.split('\x00')))
201
 
            elif key in ('start', 'length'):
202
 
                value = int(value)
203
 
            elif key == 'type':
204
 
                value = intern(value)
205
 
            info_dict[key] = value
 
111
        # This is the maximum number of bytes this object will reference if
 
112
        # everything is decompressed. However, if we decompress less than
 
113
        # everything... (this would cause some problems for LRUSizeCache)
 
114
        return self._content_length + self._z_content_length
206
115
 
207
116
    def _ensure_content(self, num_bytes=None):
208
117
        """Make sure that content has been expanded enough.
277
186
                # The stream is finished
278
187
                self._z_content_decompressor = None
279
188
 
280
 
    def _parse_bytes(self, bytes):
 
189
    def _parse_bytes(self, bytes, pos):
281
190
        """Read the various lengths from the header.
282
191
 
283
192
        This also populates the various 'compressed' buffers.
284
193
 
285
194
        :return: The position in bytes just after the last newline
286
195
        """
287
 
        # At present, there are 4 lengths to be read, we have 2 integers for
288
 
        # the length of the compressed and uncompressed header, and 2 integers
289
 
        # for the compressed and uncompressed content
290
 
        # 14 bytes can represent > 1TB, so to avoid checking too far, cap the
291
 
        # search to 14 bytes.
292
 
        pos = bytes.index('\n', 6, 20)
293
 
        self._z_header_length = int(bytes[6:pos])
294
 
        pos += 1
295
 
        pos2 = bytes.index('\n', pos, pos + 14)
296
 
        self._header_length = int(bytes[pos:pos2])
297
 
        end_of_z_lengths = pos2
298
 
        pos2 += 1
299
 
        # Older versions don't have the content lengths, if we want to preserve
300
 
        # backwards compatibility, we could try/except over these, and allow
301
 
        # them to be skipped
302
 
        try:
303
 
            pos = bytes.index('\n', pos2, pos2 + 14)
304
 
            self._z_content_length = int(bytes[pos2:pos])
305
 
            pos += 1
306
 
            pos2 = bytes.index('\n', pos, pos + 14)
307
 
            self._content_length = int(bytes[pos:pos2])
308
 
            pos = pos2 + 1
309
 
            assert len(bytes) == (pos + self._z_header_length +
310
 
                                  self._z_content_length)
311
 
            pos2 = pos + self._z_header_length
312
 
            self._z_header = bytes[pos:pos2]
313
 
            self._z_content = bytes[pos2:]
314
 
            assert len(self._z_content) == self._z_content_length
315
 
        except ValueError:
316
 
            # This is the older form, which did not encode its content length
317
 
            pos = end_of_z_lengths + 1
318
 
            pos2 = pos + self._z_header_length
319
 
            self._z_header = bytes[pos:pos2]
320
 
            self._z_content = bytes[pos2:]
321
 
            self._z_content_length = len(self._z_content)
 
196
        # At present, we have 2 integers for the compressed and uncompressed
 
197
        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
 
198
        # checking too far, cap the search to 14 bytes.
 
199
        pos2 = bytes.index('\n', pos, pos + 14)
 
200
        self._z_content_length = int(bytes[pos:pos2])
 
201
        pos = pos2 + 1
 
202
        pos2 = bytes.index('\n', pos, pos + 14)
 
203
        self._content_length = int(bytes[pos:pos2])
 
204
        pos = pos2 + 1
 
205
        assert len(bytes) == (pos + self._z_content_length)
 
206
        self._z_content = bytes[pos:]
 
207
        assert len(self._z_content) == self._z_content_length
322
208
 
323
209
    @classmethod
324
210
    def from_bytes(cls, bytes):
331
217
            out._compressor_name = 'lzma'
332
218
        else:
333
219
            raise ValueError('unknown compressor: %r' % (bytes,))
334
 
        out._parse_bytes(bytes)
335
 
        if not _NO_LABELS:
336
 
            out._parse_header()
 
220
        out._parse_bytes(bytes, 6)
337
221
        return out
338
222
 
339
223
    def extract(self, key, start, end, sha1=None):
364
248
        if end != content_start + content_len:
365
249
            raise ValueError('end != len according to field header'
366
250
                ' %s != %s' % (end, content_start + content_len))
367
 
        content = self._content[content_start:end]
368
251
        if c == 'f':
369
 
            bytes = content
 
252
            bytes = self._content[content_start:end]
370
253
        elif c == 'd':
371
 
            bytes = _groupcompress_pyx.apply_delta(self._content, content)
 
254
            bytes = apply_delta_to_source(self._content, content_start, end)
372
255
        return bytes
373
256
 
374
 
    def add_entry(self, key, type, sha1, start, length):
375
 
        """Add new meta info about an entry.
376
 
 
377
 
        :param key: The key for the new content
378
 
        :param type: Whether this is a delta or fulltext entry (external?)
379
 
        :param sha1: sha1sum of the fulltext of this entry
380
 
        :param start: where the encoded bytes start
381
 
        :param length: total number of bytes in the encoded form
382
 
        :return: The entry?
383
 
        """
384
 
        entry = GroupCompressBlockEntry(key, type, sha1, start, length)
385
 
        if key in self._entries:
386
 
            raise ValueError('Duplicate key found: %s' % (key,))
387
 
        self._entries[key] = entry
388
 
        return entry
389
 
 
390
257
    def set_content(self, content):
391
258
        """Set the content of this block."""
392
259
        self._content_length = len(content)
393
260
        self._content = content
394
261
        self._z_content = None
395
 
        self._z_header_length = None
396
262
 
397
263
    def to_bytes(self):
398
264
        """Encode the information into a byte stream."""
399
265
        compress = zlib.compress
400
266
        if _USE_LZMA:
401
267
            compress = pylzma.compress
402
 
        chunks = []
403
 
        for key in sorted(self._entries):
404
 
            entry = self._entries[key]
405
 
            chunk = ('key:%s\n'
406
 
                     'sha1:%s\n'
407
 
                     'type:%s\n'
408
 
                     'start:%s\n'
409
 
                     'length:%s\n'
410
 
                     '\n'
411
 
                     ) % ('\x00'.join(entry.key),
412
 
                          entry.sha1,
413
 
                          entry.type,
414
 
                          entry.start,
415
 
                          entry.length,
416
 
                          )
417
 
            chunks.append(chunk)
418
 
        bytes = ''.join(chunks)
419
 
        info_len = len(bytes)
420
 
        z_header_bytes = compress(bytes)
421
 
        del bytes, chunks
422
 
        z_header_len = len(z_header_bytes)
423
 
        # TODO: we may want to have the header compressed in the same chain
424
 
        #       as the data, or we may not, evaulate it
425
 
        #       having them compressed together is probably a win for
426
 
        #       revisions and the 'inv' portion of chk inventories. As the
427
 
        #       label in the header is duplicated in the text.
428
 
        #       For chk pages and real bytes, I would guess this is not
429
 
        #       true.
430
 
        if _NO_LABELS:
431
 
            z_header_bytes = ''
432
 
            z_header_len = 0
433
 
            info_len = 0
434
 
        if self._z_content is not None:
435
 
            content_len = self._content_length
436
 
            z_content_len = self._z_content_length
437
 
            z_content_bytes = self._z_content
438
 
        else:
 
268
        if self._z_content is None:
439
269
            assert self._content is not None
440
 
            content_len = self._content_length
441
 
            z_content_bytes = compress(self._content)
442
 
            self._z_content = z_content_bytes
443
 
            z_content_len = len(z_content_bytes)
444
 
            self._z_content_length = z_content_len
 
270
            self._z_content = compress(self._content)
 
271
            self._z_content_length = len(self._z_content)
445
272
        if _USE_LZMA:
446
273
            header = self.GCB_LZ_HEADER
447
274
        else:
448
275
            header = self.GCB_HEADER
449
276
        chunks = [header,
450
 
                  '%d\n%d\n%d\n%d\n' % (z_header_len, info_len,
451
 
                                        z_content_len, content_len)
 
277
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
 
278
                  self._z_content,
452
279
                 ]
453
 
        chunks.append(z_header_bytes)
454
 
        chunks.append(z_content_bytes)
455
280
        return ''.join(chunks)
456
281
 
457
282
 
740
565
    return manager.get_record_stream()
741
566
 
742
567
 
743
 
class GroupCompressor(object):
744
 
    """Produce a serialised group of compressed texts.
745
 
 
746
 
    It contains code very similar to SequenceMatcher because of having a similar
747
 
    task. However some key differences apply:
748
 
     - there is no junk, we want a minimal edit not a human readable diff.
749
 
     - we don't filter very common lines (because we don't know where a good
750
 
       range will start, and after the first text we want to be emitting minmal
751
 
       edits only.
752
 
     - we chain the left side, not the right side
753
 
     - we incrementally update the adjacency matrix as new lines are provided.
754
 
     - we look for matches in all of the left side, so the routine which does
755
 
       the analagous task of find_longest_match does not need to filter on the
756
 
       left side.
757
 
    """
 
568
class _CommonGroupCompressor(object):
758
569
 
759
570
    def __init__(self):
760
571
        """Create a GroupCompressor."""
761
 
        # Consider seeding the lines with some sort of GC Start flag, or
762
 
        # putting it as part of the output stream, rather than in the
763
 
        # compressed bytes.
764
 
        self.lines = []
 
572
        self.chunks = []
 
573
        self._last = None
765
574
        self.endpoint = 0
766
575
        self.input_bytes = 0
767
 
        self.num_keys = 0
768
576
        self.labels_deltas = {}
769
 
        self._last = None
770
 
        self._delta_index = _groupcompress_pyx.DeltaIndex()
 
577
        self._delta_index = None # Set by the children
771
578
        self._block = GroupCompressBlock()
772
579
 
773
580
    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
785
592
            ExistingContent rather than adding the text.
786
593
        :param soft: Do a 'soft' compression. This means that we require larger
787
594
            ranges to match to be considered for a copy command.
788
 
        :return: The sha1 of lines, and the number of bytes accumulated in
 
595
 
 
596
        :return: The sha1 of lines, the start and end offsets in the delta, the
 
597
            type ('fulltext' or 'delta') and the number of bytes accumulated in
789
598
            the group output so far.
 
599
 
790
600
        :seealso VersionedFiles.add_lines:
791
601
        """
792
602
        if not bytes: # empty, like a dir entry, etc
793
603
            if nostore_sha == _null_sha1:
794
604
                raise errors.ExistingContent()
795
 
            self._block.add_entry(key, type='empty',
796
 
                                  sha1=None, start=0,
797
 
                                  length=0)
798
605
            return _null_sha1, 0, 0, 'fulltext', 0
799
606
        # we assume someone knew what they were doing when they passed it in
800
607
        if expected_sha is not None:
806
613
                raise errors.ExistingContent()
807
614
        if key[-1] is None:
808
615
            key = key[:-1] + ('sha1:' + sha1,)
 
616
 
 
617
        return self._compress(key, bytes, sha1, len(bytes) / 2, soft)
 
618
 
 
619
    def _compress(self, key, bytes, sha1, max_delta_size, soft=False):
 
620
        """Compress lines with label key.
 
621
 
 
622
        :param key: A key tuple. It is stored in the output for identification
 
623
            of the text during decompression.
 
624
 
 
625
        :param bytes: The bytes to be compressed
 
626
 
 
627
        :param sha1: The sha1 for 'bytes'.
 
628
 
 
629
        :param max_delta_size: The size above which we issue a fulltext instead
 
630
            of a delta.
 
631
 
 
632
        :param soft: Do a 'soft' compression. This means that we require larger
 
633
            ranges to match to be considered for a copy command.
 
634
 
 
635
        :return: The sha1 of lines, the start and end offsets in the delta, the
 
636
            type ('fulltext' or 'delta') and the number of bytes accumulated in
 
637
            the group output so far.
 
638
        """
 
639
        raise NotImplementedError(self._compress)
 
640
 
 
641
    def extract(self, key):
 
642
        """Extract a key previously added to the compressor.
 
643
 
 
644
        :param key: The key to extract.
 
645
        :return: An iterable over bytes and the sha1.
 
646
        """
 
647
        (start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
 
648
        delta_chunks = self.chunks[start_chunk:end_chunk]
 
649
        stored_bytes = ''.join(delta_chunks)
 
650
        if stored_bytes[0] == 'f':
 
651
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
 
652
            data_len = fulltext_len + 1 + offset
 
653
            if  data_len != len(stored_bytes):
 
654
                raise ValueError('Index claimed fulltext len, but stored bytes'
 
655
                                 ' claim %s != %s'
 
656
                                 % (len(stored_bytes), data_len))
 
657
            bytes = stored_bytes[offset + 1:]
 
658
        else:
 
659
            # XXX: This is inefficient at best
 
660
            source = ''.join(self.chunks[:start_chunk])
 
661
            if stored_bytes[0] != 'd':
 
662
                raise ValueError('Unknown content kind, bytes claim %s'
 
663
                                 % (stored_bytes[0],))
 
664
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
 
665
            data_len = delta_len + 1 + offset
 
666
            if data_len != len(stored_bytes):
 
667
                raise ValueError('Index claimed delta len, but stored bytes'
 
668
                                 ' claim %s != %s'
 
669
                                 % (len(stored_bytes), data_len))
 
670
            bytes = apply_delta(source, stored_bytes[offset + 1:])
 
671
        bytes_sha1 = osutils.sha_string(bytes)
 
672
        return bytes, bytes_sha1
 
673
 
 
674
    def flush(self):
 
675
        """Finish this group, creating a formatted stream.
 
676
 
 
677
        After calling this, the compressor should no longer be used
 
678
        """
 
679
        content = ''.join(self.chunks)
 
680
        self.chunks = None
 
681
        self._delta_index = None
 
682
        self._block.set_content(content)
 
683
        return self._block
 
684
 
 
685
    def pop_last(self):
 
686
        """Call this if you want to 'revoke' the last compression.
 
687
 
 
688
        After this, the data structures will be rolled back, but you cannot do
 
689
        more compression.
 
690
        """
 
691
        self._delta_index = None
 
692
        del self.chunks[self._last[0]:]
 
693
        self.endpoint = self._last[1]
 
694
        self._last = None
 
695
 
 
696
    def ratio(self):
 
697
        """Return the overall compression ratio."""
 
698
        return float(self.input_bytes) / float(self.endpoint)
 
699
 
 
700
 
 
701
class PythonGroupCompressor(_CommonGroupCompressor):
 
702
 
 
703
    def __init__(self):
 
704
        """Create a GroupCompressor.
 
705
 
 
706
        :param delta: If False, do not compress records.
 
707
        """
 
708
        super(PythonGroupCompressor, self).__init__()
 
709
        self._delta_index = LinesDeltaIndex([])
 
710
        # The actual content is managed by LinesDeltaIndex
 
711
        self.chunks = self._delta_index.lines
 
712
 
 
713
    def _compress(self, key, bytes, sha1, max_delta_size, soft=False):
 
714
        """see _CommonGroupCompressor._compress"""
 
715
        bytes_length = len(bytes)
 
716
        new_lines = osutils.split_lines(bytes)
 
717
        out_lines, index_lines = self._delta_index.make_delta(new_lines,
 
718
            bytes_length=bytes_length, soft=soft)
 
719
        delta_length = sum(map(len, out_lines))
 
720
        if delta_length > max_delta_size:
 
721
            # The delta is longer than the fulltext, insert a fulltext
 
722
            type = 'fulltext'
 
723
            out_lines = ['f', encode_base128_int(bytes_length)]
 
724
            out_lines.extend(new_lines)
 
725
            index_lines = [False, False]
 
726
            index_lines.extend([True] * len(new_lines))
 
727
            out_length = len(out_lines[1]) + bytes_length + 1
 
728
        else:
 
729
            # this is a worthy delta, output it
 
730
            type = 'delta'
 
731
            out_lines[0] = 'd'
 
732
            # Update the delta_length to include those two encoded integers
 
733
            out_lines[1] = encode_base128_int(delta_length)
 
734
            out_length = len(out_lines[3]) + 1 + delta_length
 
735
        start = self.endpoint # Before insertion
 
736
        chunk_start = len(self._delta_index.lines)
 
737
        self._delta_index.extend_lines(out_lines, index_lines)
 
738
        self.endpoint = self._delta_index.endpoint
 
739
        self.input_bytes += bytes_length
 
740
        chunk_end = len(self._delta_index.lines)
 
741
        self.labels_deltas[key] = (start, chunk_start,
 
742
                                   self.endpoint, chunk_end)
 
743
        return sha1, start, self.endpoint, type, out_length
 
744
 
 
745
 
 
746
class PyrexGroupCompressor(_CommonGroupCompressor):
 
747
    """Produce a serialised group of compressed texts.
 
748
 
 
749
    It contains code very similar to SequenceMatcher because of having a similar
 
750
    task. However some key differences apply:
 
751
     - there is no junk, we want a minimal edit not a human readable diff.
 
752
     - we don't filter very common lines (because we don't know where a good
 
753
       range will start, and after the first text we want to be emitting minmal
 
754
       edits only.
 
755
     - we chain the left side, not the right side
 
756
     - we incrementally update the adjacency matrix as new lines are provided.
 
757
     - we look for matches in all of the left side, so the routine which does
 
758
       the analagous task of find_longest_match does not need to filter on the
 
759
       left side.
 
760
    """
 
761
 
 
762
    def __init__(self):
 
763
        super(PyrexGroupCompressor, self).__init__()
 
764
        self._delta_index = DeltaIndex()
 
765
 
 
766
    def _compress(self, key, bytes, sha1, max_delta_size, soft=False):
 
767
        """see _CommonGroupCompressor._compress"""
809
768
        input_len = len(bytes)
810
769
        # By having action/label/sha1/len, we can parse the group if the index
811
770
        # was ever destroyed, we have the key in 'label', we know the final
820
779
            raise AssertionError('_source_offset != endpoint'
821
780
                ' somehow the DeltaIndex got out of sync with'
822
781
                ' the output lines')
823
 
        max_delta_size = len(bytes) / 2
824
782
        delta = self._delta_index.make_delta(bytes, max_delta_size)
825
783
        if (delta is None):
826
784
            type = 'fulltext'
835
793
            len_mini_header = 1 + len(enc_length)
836
794
            length = len(delta) + len_mini_header
837
795
            new_chunks = ['d', enc_length, delta]
838
 
            if _FAST:
839
 
                self._delta_index._source_offset += length
840
 
            else:
841
 
                self._delta_index.add_delta_source(delta, len_mini_header)
842
 
        self._block.add_entry(key, type=type, sha1=sha1,
843
 
                              start=self.endpoint, length=length)
 
796
            self._delta_index.add_delta_source(delta, len_mini_header)
 
797
        # Before insertion
844
798
        start = self.endpoint
845
 
        delta_start = (self.endpoint, len(self.lines))
846
 
        self.num_keys += 1
847
 
        self.output_chunks(new_chunks)
 
799
        chunk_start = len(self.chunks)
 
800
        # Now output these bytes
 
801
        self._output_chunks(new_chunks)
848
802
        self.input_bytes += input_len
849
 
        delta_end = (self.endpoint, len(self.lines))
850
 
        self.labels_deltas[key] = (delta_start, delta_end)
 
803
        chunk_end = len(self.chunks)
 
804
        self.labels_deltas[key] = (start, chunk_start,
 
805
                                   self.endpoint, chunk_end)
851
806
        if not self._delta_index._source_offset == self.endpoint:
852
807
            raise AssertionError('the delta index is out of sync'
853
808
                'with the output lines %s != %s'
854
809
                % (self._delta_index._source_offset, self.endpoint))
855
810
        return sha1, start, self.endpoint, type, length
856
811
 
857
 
    def extract(self, key):
858
 
        """Extract a key previously added to the compressor.
859
 
 
860
 
        :param key: The key to extract.
861
 
        :return: An iterable over bytes and the sha1.
862
 
        """
863
 
        delta_details = self.labels_deltas[key]
864
 
        delta_chunks = self.lines[delta_details[0][1]:delta_details[1][1]]
865
 
        stored_bytes = ''.join(delta_chunks)
866
 
        # TODO: Fix this, we shouldn't really be peeking here
867
 
        entry = self._block._entries[key]
868
 
        if entry.type == 'fulltext':
869
 
            if stored_bytes[0] != 'f':
870
 
                raise ValueError('Index claimed fulltext, but stored bytes'
871
 
                                 ' indicate %s' % (stored_bytes[0],))
872
 
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
873
 
            if fulltext_len + 1 + offset != len(stored_bytes):
874
 
                raise ValueError('Index claimed fulltext len, but stored bytes'
875
 
                                 ' claim %s != %s'
876
 
                                 % (len(stored_bytes),
877
 
                                    fulltext_len + 1 + offset))
878
 
            bytes = stored_bytes[offset + 1:]
879
 
        else:
880
 
            if entry.type != 'delta':
881
 
                raise ValueError('Unknown entry type: %s' % (entry.type,))
882
 
            # XXX: This is inefficient at best
883
 
            source = ''.join(self.lines)
884
 
            if stored_bytes[0] != 'd':
885
 
                raise ValueError('Entry type claims delta, bytes claim %s'
886
 
                                 % (stored_bytes[0],))
887
 
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
888
 
            if delta_len + 1 + offset != len(stored_bytes):
889
 
                raise ValueError('Index claimed delta len, but stored bytes'
890
 
                                 ' claim %s != %s'
891
 
                                 % (len(stored_bytes),
892
 
                                    delta_len + 1 + offset))
893
 
            bytes = _groupcompress_pyx.apply_delta(source,
894
 
                                                   stored_bytes[offset + 1:])
895
 
        bytes_sha1 = sha_string(bytes)
896
 
        if entry.sha1 != bytes_sha1:
897
 
            raise ValueError('Recorded sha1 != measured %s != %s'
898
 
                             % (entry.sha1, bytes_sha1))
899
 
        return bytes, entry.sha1
900
 
 
901
 
    def flush(self):
902
 
        """Finish this group, creating a formatted stream."""
903
 
        content = ''.join(self.lines)
904
 
        self.lines = None
905
 
        self._block.set_content(content)
906
 
        return self._block
907
 
 
908
 
    def output_chunks(self, new_chunks):
 
812
    def _output_chunks(self, new_chunks):
909
813
        """Output some chunks.
910
814
 
911
815
        :param new_chunks: The chunks to output.
912
816
        """
913
 
        self._last = (len(self.lines), self.endpoint)
 
817
        self._last = (len(self.chunks), self.endpoint)
914
818
        endpoint = self.endpoint
915
 
        self.lines.extend(new_chunks)
 
819
        self.chunks.extend(new_chunks)
916
820
        endpoint += sum(map(len, new_chunks))
917
821
        self.endpoint = endpoint
918
822
 
919
 
    def pop_last(self):
920
 
        """Call this if you want to 'revoke' the last compression.
921
 
 
922
 
        After this, the data structures will be rolled back, but you cannot do
923
 
        more compression.
924
 
        """
925
 
        self._delta_index = None
926
 
        del self.lines[self._last[0]:]
927
 
        self.endpoint = self._last[1]
928
 
        self._last = None
929
 
 
930
 
    def ratio(self):
931
 
        """Return the overall compression ratio."""
932
 
        return float(self.input_bytes) / float(self.endpoint)
933
 
 
934
823
 
935
824
def make_pack_factory(graph, delta, keylength):
936
825
    """Create a factory for creating a pack based groupcompress.
1082
971
        """check that version_id and lines are safe to add."""
1083
972
        version_id = key[-1]
1084
973
        if version_id is not None:
1085
 
            if contains_whitespace(version_id):
 
974
            if osutils.contains_whitespace(version_id):
1086
975
                raise errors.InvalidRevisionId(version_id, self)
1087
976
        self.check_not_reserved_id(version_id)
1088
977
        # TODO: If random_id==False and the key is already present, we should
1374
1263
                result[record.key] = record.sha1
1375
1264
            else:
1376
1265
                if record.storage_kind != 'absent':
1377
 
                    result[record.key] = sha_string(record.get_bytes_as(
1378
 
                        'fulltext'))
 
1266
                    result[record.key] = osutils.sha_string(
 
1267
                        record.get_bytes_as('fulltext'))
1379
1268
        return result
1380
1269
 
1381
1270
    def insert_record_stream(self, stream):
1470
1359
                    value = "%d %d %d %d" % (block_start, block_length,
1471
1360
                                             record._start, record._end)
1472
1361
                    nodes = [(record.key, value, (record.parents,))]
 
1362
                    # TODO: Consider buffering up many nodes to be added, not
 
1363
                    #       sure how much overhead this has, but we're seeing
 
1364
                    #       ~23s / 120s in add_records calls
1473
1365
                    self._index.add_records(nodes, random_id=random_id)
1474
1366
                    continue
1475
1367
            try:
1505
1397
                start_new_block = True
1506
1398
            else:
1507
1399
                start_new_block = False
1508
 
            # if type == 'fulltext':
1509
 
            #     # If this is the first text, we don't do anything
1510
 
            #     if self._compressor.num_keys > 1:
1511
 
            #         if prefix is not None and prefix != last_prefix:
1512
 
            #             # We just inserted a fulltext for a different prefix
1513
 
            #             # (aka file-id).
1514
 
            #             if end_point > 512 * 1024:
1515
 
            #                 start_new_block = True
1516
 
            #             # TODO: Consider packing several small texts together
1517
 
            #             #       maybe only flush if end_point > some threshold
1518
 
            #             # if end_point > 512 * 1024 or len(bytes) <
1519
 
            #             #     start_new_block = true
1520
 
            #         else:
1521
 
            #             # We just added a fulltext, part of the same file-id
1522
 
            #             if (end_point > 2*1024*1024
1523
 
            #                 and end_point > 5*max_fulltext_len):
1524
 
            #                 start_new_block = True
1525
 
            #     last_fulltext_len = len(bytes)
1526
 
            # else:
1527
 
            #     delta_ratio = float(len(bytes)) / length
1528
 
            #     if delta_ratio < 3: # Not much compression
1529
 
            #         if end_point > 1*1024*1024:
1530
 
            #             start_new_block = True
1531
 
            #     elif delta_ratio < 10: # 10:1 compression
1532
 
            #         if end_point > 4*1024*1024:
1533
 
            #             start_new_block = True
1534
1400
            last_prefix = prefix
1535
1401
            if start_new_block:
1536
1402
                self._compressor.pop_last()
1587
1453
            pb.update('Walking content', key_idx, total)
1588
1454
            if record.storage_kind == 'absent':
1589
1455
                raise errors.RevisionNotPresent(key, self)
1590
 
            lines = split_lines(record.get_bytes_as('fulltext'))
 
1456
            lines = osutils.split_lines(record.get_bytes_as('fulltext'))
1591
1457
            for line in lines:
1592
1458
                yield line, key
1593
1459
        pb.update('Walking content', total, total)
1780
1646
        return node[0], start, stop, basis_end, delta_end
1781
1647
 
1782
1648
 
 
1649
from bzrlib._groupcompress_py import (
 
1650
    apply_delta,
 
1651
    apply_delta_to_source,
 
1652
    encode_base128_int,
 
1653
    decode_base128_int,
 
1654
    LinesDeltaIndex,
 
1655
    )
1783
1656
try:
1784
 
    from bzrlib import _groupcompress_pyx
 
1657
    from bzrlib._groupcompress_pyx import (
 
1658
        apply_delta,
 
1659
        apply_delta_to_source,
 
1660
        DeltaIndex,
 
1661
        encode_base128_int,
 
1662
        decode_base128_int,
 
1663
        )
 
1664
    GroupCompressor = PyrexGroupCompressor
1785
1665
except ImportError:
1786
 
    pass
 
1666
    GroupCompressor = PythonGroupCompressor
 
1667