/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to brzlib/groupcompress.py

  • Committer: Jelmer Vernooij
  • Date: 2017-05-21 12:41:27 UTC
  • mto: This revision was merged to the branch mainline in revision 6623.
  • Revision ID: jelmer@jelmer.uk-20170521124127-iv8etg0vwymyai6y
s/bzr/brz/ in apport config.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Core compression logic for compressing streams of related files."""
18
18
 
 
19
from __future__ import absolute_import
 
20
 
19
21
import time
20
22
import zlib
21
23
 
22
 
from ..lazy_import import lazy_import
 
24
from brzlib.lazy_import import lazy_import
23
25
lazy_import(globals(), """
24
 
from breezy import (
 
26
from brzlib import (
25
27
    annotate,
26
28
    config,
27
29
    debug,
 
30
    errors,
 
31
    graph as _mod_graph,
28
32
    osutils,
 
33
    pack,
29
34
    static_tuple,
30
35
    trace,
31
36
    tsort,
32
37
    )
33
 
from breezy.bzr import (
34
 
    knit,
35
 
    pack,
36
 
    pack_repo,
37
 
    )
38
38
 
39
 
from breezy.i18n import gettext
 
39
from brzlib.repofmt import pack_repo
 
40
from brzlib.i18n import gettext
40
41
""")
41
42
 
42
 
from .. import (
43
 
    errors,
44
 
    )
45
 
from .btree_index import BTreeBuilder
46
 
from ..lru_cache import LRUSizeCache
47
 
from .versionedfile import (
 
43
from brzlib.btree_index import BTreeBuilder
 
44
from brzlib.lru_cache import LRUSizeCache
 
45
from brzlib.versionedfile import (
48
46
    _KeyRefs,
49
47
    adapter_registry,
50
48
    AbsentContentFactory,
57
55
# groupcompress blocks.
58
56
BATCH_SIZE = 2**16
59
57
 
60
 
# osutils.sha_string(b'')
61
 
_null_sha1 = b'da39a3ee5e6b4b0d3255bfef95601890afd80709'
62
 
 
 
58
# osutils.sha_string('')
 
59
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
63
60
 
64
61
def sort_gc_optimal(parent_map):
65
62
    """Sort and group the keys in parent_map into groupcompress order.
72
69
    # groupcompress ordering is approximately reverse topological,
73
70
    # properly grouped by file-id.
74
71
    per_prefix_map = {}
75
 
    for key, value in parent_map.items():
76
 
        if isinstance(key, bytes) or len(key) == 1:
77
 
            prefix = b''
 
72
    for key, value in parent_map.iteritems():
 
73
        if isinstance(key, str) or len(key) == 1:
 
74
            prefix = ''
78
75
        else:
79
76
            prefix = key[0]
80
77
        try:
88
85
    return present_keys
89
86
 
90
87
 
91
 
class DecompressCorruption(errors.BzrError):
92
 
 
93
 
    _fmt = "Corruption while decompressing repository file%(orig_error)s"
94
 
 
95
 
    def __init__(self, orig_error=None):
96
 
        if orig_error is not None:
97
 
            self.orig_error = ", %s" % (orig_error,)
98
 
        else:
99
 
            self.orig_error = ""
100
 
        errors.BzrError.__init__(self)
101
 
 
102
 
 
103
88
# The max zlib window size is 32kB, so if we set 'max_size' output of the
104
89
# decompressor to the requested bytes + 32kB, then we should guarantee
105
90
# num_bytes coming out.
106
 
_ZLIB_DECOMP_WINDOW = 32 * 1024
107
 
 
 
91
_ZLIB_DECOMP_WINDOW = 32*1024
108
92
 
109
93
class GroupCompressBlock(object):
110
94
    """An object which maintains the internal structure of the compressed data.
113
97
    """
114
98
 
115
99
    # Group Compress Block v1 Zlib
116
 
    GCB_HEADER = b'gcb1z\n'
 
100
    GCB_HEADER = 'gcb1z\n'
117
101
    # Group Compress Block v1 Lzma
118
 
    GCB_LZ_HEADER = b'gcb1l\n'
 
102
    GCB_LZ_HEADER = 'gcb1l\n'
119
103
    GCB_KNOWN_HEADERS = (GCB_HEADER, GCB_LZ_HEADER)
120
104
 
121
105
    def __init__(self):
152
136
        # Expand the content if required
153
137
        if self._content is None:
154
138
            if self._content_chunks is not None:
155
 
                self._content = b''.join(self._content_chunks)
 
139
                self._content = ''.join(self._content_chunks)
156
140
                self._content_chunks = None
157
141
        if self._content is None:
158
142
            # We join self._z_content_chunks here, because if we are
160
144
            # chunk
161
145
            if self._z_content_chunks is None:
162
146
                raise AssertionError('No content to decompress')
163
 
            z_content = b''.join(self._z_content_chunks)
164
 
            if z_content == b'':
165
 
                self._content = b''
 
147
            z_content = ''.join(self._z_content_chunks)
 
148
            if z_content == '':
 
149
                self._content = ''
166
150
            elif self._compressor_name == 'lzma':
167
151
                # We don't do partial lzma decomp yet
168
152
                import pylzma
212
196
            # The stream is finished
213
197
            self._z_content_decompressor = None
214
198
 
215
 
    def _parse_bytes(self, data, pos):
 
199
    def _parse_bytes(self, bytes, pos):
216
200
        """Read the various lengths from the header.
217
201
 
218
202
        This also populates the various 'compressed' buffers.
222
206
        # At present, we have 2 integers for the compressed and uncompressed
223
207
        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
224
208
        # checking too far, cap the search to 14 bytes.
225
 
        pos2 = data.index(b'\n', pos, pos + 14)
226
 
        self._z_content_length = int(data[pos:pos2])
227
 
        pos = pos2 + 1
228
 
        pos2 = data.index(b'\n', pos, pos + 14)
229
 
        self._content_length = int(data[pos:pos2])
230
 
        pos = pos2 + 1
231
 
        if len(data) != (pos + self._z_content_length):
 
209
        pos2 = bytes.index('\n', pos, pos + 14)
 
210
        self._z_content_length = int(bytes[pos:pos2])
 
211
        pos = pos2 + 1
 
212
        pos2 = bytes.index('\n', pos, pos + 14)
 
213
        self._content_length = int(bytes[pos:pos2])
 
214
        pos = pos2 + 1
 
215
        if len(bytes) != (pos + self._z_content_length):
232
216
            # XXX: Define some GCCorrupt error ?
233
217
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
234
 
                                 (len(data), pos, self._z_content_length))
235
 
        self._z_content_chunks = (data[pos:],)
 
218
                                 (len(bytes), pos, self._z_content_length))
 
219
        self._z_content_chunks = (bytes[pos:],)
236
220
 
237
221
    @property
238
222
    def _z_content(self):
241
225
        Meant only to be used by the test suite.
242
226
        """
243
227
        if self._z_content_chunks is not None:
244
 
            return b''.join(self._z_content_chunks)
 
228
            return ''.join(self._z_content_chunks)
245
229
        return None
246
230
 
247
231
    @classmethod
248
232
    def from_bytes(cls, bytes):
249
233
        out = cls()
250
 
        header = bytes[:6]
251
 
        if header not in cls.GCB_KNOWN_HEADERS:
 
234
        if bytes[:6] not in cls.GCB_KNOWN_HEADERS:
252
235
            raise ValueError('bytes did not start with any of %r'
253
236
                             % (cls.GCB_KNOWN_HEADERS,))
254
 
        if header == cls.GCB_HEADER:
 
237
        # XXX: why not testing the whole header ?
 
238
        if bytes[4] == 'z':
255
239
            out._compressor_name = 'zlib'
256
 
        elif header == cls.GCB_LZ_HEADER:
 
240
        elif bytes[4] == 'l':
257
241
            out._compressor_name = 'lzma'
258
242
        else:
259
 
            raise ValueError('unknown compressor: %r' % (header,))
 
243
            raise ValueError('unknown compressor: %r' % (bytes,))
260
244
        out._parse_bytes(bytes, 6)
261
245
        return out
262
246
 
268
252
        :return: The bytes for the content
269
253
        """
270
254
        if start == end == 0:
271
 
            return []
 
255
            return ''
272
256
        self._ensure_content(end)
273
257
        # The bytes are 'f' or 'd' for the type, then a variable-length
274
258
        # base128 integer for the content size, then the actual content
275
259
        # We know that the variable-length integer won't be longer than 5
276
260
        # bytes (it takes 5 bytes to encode 2^32)
277
 
        c = self._content[start:start + 1]
278
 
        if c == b'f':
 
261
        c = self._content[start]
 
262
        if c == 'f':
279
263
            type = 'fulltext'
280
264
        else:
281
 
            if c != b'd':
 
265
            if c != 'd':
282
266
                raise ValueError('Unknown content control code: %s'
283
267
                                 % (c,))
284
268
            type = 'delta'
285
269
        content_len, len_len = decode_base128_int(
286
 
            self._content[start + 1:start + 6])
 
270
                            self._content[start + 1:start + 6])
287
271
        content_start = start + 1 + len_len
288
272
        if end != content_start + content_len:
289
273
            raise ValueError('end != len according to field header'
290
 
                             ' %s != %s' % (end, content_start + content_len))
291
 
        if c == b'f':
292
 
            return [self._content[content_start:end]]
293
 
        # Must be type delta as checked above
294
 
        return [apply_delta_to_source(self._content, content_start, end)]
 
274
                ' %s != %s' % (end, content_start + content_len))
 
275
        if c == 'f':
 
276
            bytes = self._content[content_start:end]
 
277
        elif c == 'd':
 
278
            bytes = apply_delta_to_source(self._content, content_start, end)
 
279
        return bytes
295
280
 
296
281
    def set_chunked_content(self, content_chunks, length):
297
282
        """Set the content of this block to the given chunks."""
315
300
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
316
301
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
317
302
        # (measured peak is maybe 30MB over the above...)
318
 
        compressed_chunks = list(map(compressor.compress, chunks))
 
303
        compressed_chunks = map(compressor.compress, chunks)
319
304
        compressed_chunks.append(compressor.flush())
320
305
        # Ignore empty chunks
321
306
        self._z_content_chunks = [c for c in compressed_chunks if c]
334
319
        """Create the byte stream as a series of 'chunks'"""
335
320
        self._create_z_content()
336
321
        header = self.GCB_HEADER
337
 
        chunks = [b'%s%d\n%d\n'
 
322
        chunks = ['%s%d\n%d\n'
338
323
                  % (header, self._z_content_length, self._content_length),
339
 
                  ]
 
324
                 ]
340
325
        chunks.extend(self._z_content_chunks)
341
326
        total_len = sum(map(len, chunks))
342
327
        return total_len, chunks
344
329
    def to_bytes(self):
345
330
        """Encode the information into a byte stream."""
346
331
        total_len, chunks = self.to_chunks()
347
 
        return b''.join(chunks)
 
332
        return ''.join(chunks)
348
333
 
349
334
    def _dump(self, include_text=False):
350
335
        """Take this block, and spit out a human-readable structure.
360
345
        result = []
361
346
        pos = 0
362
347
        while pos < self._content_length:
363
 
            kind = self._content[pos:pos + 1]
 
348
            kind = self._content[pos]
364
349
            pos += 1
365
 
            if kind not in (b'f', b'd'):
 
350
            if kind not in ('f', 'd'):
366
351
                raise ValueError('invalid kind character: %r' % (kind,))
367
352
            content_len, len_len = decode_base128_int(
368
 
                self._content[pos:pos + 5])
 
353
                                self._content[pos:pos + 5])
369
354
            pos += len_len
370
355
            if content_len + pos > self._content_length:
371
356
                raise ValueError('invalid content_len %d for record @ pos %d'
372
357
                                 % (content_len, pos - len_len - 1))
373
 
            if kind == b'f':  # Fulltext
 
358
            if kind == 'f': # Fulltext
374
359
                if include_text:
375
 
                    text = self._content[pos:pos + content_len]
376
 
                    result.append((b'f', content_len, text))
 
360
                    text = self._content[pos:pos+content_len]
 
361
                    result.append(('f', content_len, text))
377
362
                else:
378
 
                    result.append((b'f', content_len))
379
 
            elif kind == b'd':  # Delta
380
 
                delta_content = self._content[pos:pos + content_len]
 
363
                    result.append(('f', content_len))
 
364
            elif kind == 'd': # Delta
 
365
                delta_content = self._content[pos:pos+content_len]
381
366
                delta_info = []
382
367
                # The first entry in a delta is the decompressed length
383
368
                decomp_len, delta_pos = decode_base128_int(delta_content)
384
 
                result.append((b'd', content_len, decomp_len, delta_info))
 
369
                result.append(('d', content_len, decomp_len, delta_info))
385
370
                measured_len = 0
386
371
                while delta_pos < content_len:
387
 
                    c = delta_content[delta_pos]
 
372
                    c = ord(delta_content[delta_pos])
388
373
                    delta_pos += 1
389
 
                    if c & 0x80:  # Copy
 
374
                    if c & 0x80: # Copy
390
375
                        (offset, length,
391
376
                         delta_pos) = decode_copy_instruction(delta_content, c,
392
377
                                                              delta_pos)
393
378
                        if include_text:
394
 
                            text = self._content[offset:offset + length]
395
 
                            delta_info.append((b'c', offset, length, text))
 
379
                            text = self._content[offset:offset+length]
 
380
                            delta_info.append(('c', offset, length, text))
396
381
                        else:
397
 
                            delta_info.append((b'c', offset, length))
 
382
                            delta_info.append(('c', offset, length))
398
383
                        measured_len += length
399
 
                    else:  # Insert
 
384
                    else: # Insert
400
385
                        if include_text:
401
 
                            txt = delta_content[delta_pos:delta_pos + c]
 
386
                            txt = delta_content[delta_pos:delta_pos+c]
402
387
                        else:
403
 
                            txt = b''
404
 
                        delta_info.append((b'i', c, txt))
 
388
                            txt = ''
 
389
                        delta_info.append(('i', c, txt))
405
390
                        measured_len += c
406
391
                        delta_pos += c
407
392
                if delta_pos != content_len:
433
418
        self.key = key
434
419
        self.parents = parents
435
420
        self.sha1 = None
436
 
        self.size = None
437
421
        # Note: This attribute coupled with Manager._factories creates a
438
422
        #       reference cycle. Perhaps we would rather use a weakref(), or
439
423
        #       find an appropriate time to release the ref. After the first
440
424
        #       get_bytes_as call? After Manager.get_record_stream() returns
441
425
        #       the object?
442
426
        self._manager = manager
443
 
        self._chunks = None
 
427
        self._bytes = None
444
428
        self.storage_kind = 'groupcompress-block'
445
429
        if not first:
446
430
            self.storage_kind = 'groupcompress-block-ref'
450
434
 
451
435
    def __repr__(self):
452
436
        return '%s(%s, first=%s)' % (self.__class__.__name__,
453
 
                                     self.key, self._first)
454
 
 
455
 
    def _extract_bytes(self):
456
 
        # Grab and cache the raw bytes for this entry
457
 
        # and break the ref-cycle with _manager since we don't need it
458
 
        # anymore
459
 
        try:
460
 
            self._manager._prepare_for_extract()
461
 
        except zlib.error as value:
462
 
            raise DecompressCorruption("zlib: " + str(value))
463
 
        block = self._manager._block
464
 
        self._chunks = block.extract(self.key, self._start, self._end)
465
 
        # There are code paths that first extract as fulltext, and then
466
 
        # extract as storage_kind (smart fetch). So we don't break the
467
 
        # refcycle here, but instead in manager.get_record_stream()
 
437
            self.key, self._first)
468
438
 
469
439
    def get_bytes_as(self, storage_kind):
470
440
        if storage_kind == self.storage_kind:
472
442
                # wire bytes, something...
473
443
                return self._manager._wire_bytes()
474
444
            else:
475
 
                return b''
476
 
        if storage_kind in ('fulltext', 'chunked', 'lines'):
477
 
            if self._chunks is None:
478
 
                self._extract_bytes()
 
445
                return ''
 
446
        if storage_kind in ('fulltext', 'chunked'):
 
447
            if self._bytes is None:
 
448
                # Grab and cache the raw bytes for this entry
 
449
                # and break the ref-cycle with _manager since we don't need it
 
450
                # anymore
 
451
                try:
 
452
                    self._manager._prepare_for_extract()
 
453
                except zlib.error as value:
 
454
                    raise errors.DecompressCorruption("zlib: " + str(value))
 
455
                block = self._manager._block
 
456
                self._bytes = block.extract(self.key, self._start, self._end)
 
457
                # There are code paths that first extract as fulltext, and then
 
458
                # extract as storage_kind (smart fetch). So we don't break the
 
459
                # refcycle here, but instead in manager.get_record_stream()
479
460
            if storage_kind == 'fulltext':
480
 
                return b''.join(self._chunks)
481
 
            elif storage_kind == 'chunked':
482
 
                return self._chunks
 
461
                return self._bytes
483
462
            else:
484
 
                return osutils.chunks_to_lines(self._chunks)
485
 
        raise errors.UnavailableRepresentation(self.key, storage_kind,
486
 
                                               self.storage_kind)
487
 
 
488
 
    def iter_bytes_as(self, storage_kind):
489
 
        if self._chunks is None:
490
 
            self._extract_bytes()
491
 
        if storage_kind == 'chunked':
492
 
            return iter(self._chunks)
493
 
        elif storage_kind == 'lines':
494
 
            return iter(osutils.chunks_to_lines(self._chunks))
 
463
                return [self._bytes]
495
464
        raise errors.UnavailableRepresentation(self.key, storage_kind,
496
465
                                               self.storage_kind)
497
466
 
499
468
class _LazyGroupContentManager(object):
500
469
    """This manages a group of _LazyGroupCompressFactory objects."""
501
470
 
502
 
    _max_cut_fraction = 0.75  # We allow a block to be trimmed to 75% of
503
 
    # current size, and still be considered
504
 
    # resuable
505
 
    _full_block_size = 4 * 1024 * 1024
506
 
    _full_mixed_block_size = 2 * 1024 * 1024
507
 
    _full_enough_block_size = 3 * 1024 * 1024  # size at which we won't repack
508
 
    _full_enough_mixed_block_size = 2 * 768 * 1024  # 1.5MB
 
471
    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
 
472
                             # current size, and still be considered
 
473
                             # resuable
 
474
    _full_block_size = 4*1024*1024
 
475
    _full_mixed_block_size = 2*1024*1024
 
476
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
 
477
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
509
478
 
510
479
    def __init__(self, block, get_compressor_settings=None):
511
480
        self._block = block
534
503
            first = False
535
504
        # Note that this creates a reference cycle....
536
505
        factory = _LazyGroupCompressFactory(key, parents, self,
537
 
                                            start, end, first=first)
 
506
            start, end, first=first)
538
507
        # max() works here, but as a function call, doing a compare seems to be
539
508
        # significantly faster, timeit says 250ms for max() and 100ms for the
540
509
        # comparison
574
543
        old_length = self._block._content_length
575
544
        end_point = 0
576
545
        for factory in self._factories:
577
 
            chunks = factory.get_bytes_as('chunked')
578
 
            chunks_len = factory.size
579
 
            if chunks_len is None:
580
 
                chunks_len = sum(map(len, chunks))
 
546
            bytes = factory.get_bytes_as('fulltext')
581
547
            (found_sha1, start_point, end_point,
582
 
             type) = compressor.compress(
583
 
                 factory.key, chunks, chunks_len, factory.sha1)
 
548
             type) = compressor.compress(factory.key, bytes, factory.sha1)
584
549
            # Now update this factory with the new offsets, etc
585
550
            factory.sha1 = found_sha1
586
551
            factory._start = start_point
724
689
        #   <length of gc block>\n
725
690
        #   <header bytes>
726
691
        #   <gc-block>
727
 
        lines = [b'groupcompress-block\n']
 
692
        lines = ['groupcompress-block\n']
728
693
        # The minimal info we need is the key, the start offset, and the
729
694
        # parents. The length and type are encoded in the record itself.
730
695
        # However, passing in the other bits makes it easier.  The list of
735
700
        # 1 line for end byte
736
701
        header_lines = []
737
702
        for factory in self._factories:
738
 
            key_bytes = b'\x00'.join(factory.key)
 
703
            key_bytes = '\x00'.join(factory.key)
739
704
            parents = factory.parents
740
705
            if parents is None:
741
 
                parent_bytes = b'None:'
 
706
                parent_bytes = 'None:'
742
707
            else:
743
 
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
744
 
            record_header = b'%s\n%s\n%d\n%d\n' % (
 
708
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
 
709
            record_header = '%s\n%s\n%d\n%d\n' % (
745
710
                key_bytes, parent_bytes, factory._start, factory._end)
746
711
            header_lines.append(record_header)
747
712
            # TODO: Can we break the refcycle at this point and set
748
713
            #       factory._manager = None?
749
 
        header_bytes = b''.join(header_lines)
 
714
        header_bytes = ''.join(header_lines)
750
715
        del header_lines
751
716
        header_bytes_len = len(header_bytes)
752
717
        z_header_bytes = zlib.compress(header_bytes)
753
718
        del header_bytes
754
719
        z_header_bytes_len = len(z_header_bytes)
755
720
        block_bytes_len, block_chunks = self._block.to_chunks()
756
 
        lines.append(b'%d\n%d\n%d\n' % (
757
 
            z_header_bytes_len, header_bytes_len, block_bytes_len))
 
721
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
 
722
                                       block_bytes_len))
758
723
        lines.append(z_header_bytes)
759
724
        lines.extend(block_chunks)
760
725
        del z_header_bytes, block_chunks
761
726
        # TODO: This is a point where we will double the memory consumption. To
762
727
        #       avoid this, we probably have to switch to a 'chunked' api
763
 
        return b''.join(lines)
 
728
        return ''.join(lines)
764
729
 
765
730
    @classmethod
766
731
    def from_bytes(cls, bytes):
768
733
        #       different way. At a minimum this creates 2 copies of the
769
734
        #       compressed content
770
735
        (storage_kind, z_header_len, header_len,
771
 
         block_len, rest) = bytes.split(b'\n', 4)
 
736
         block_len, rest) = bytes.split('\n', 4)
772
737
        del bytes
773
 
        if storage_kind != b'groupcompress-block':
 
738
        if storage_kind != 'groupcompress-block':
774
739
            raise ValueError('Unknown storage kind: %s' % (storage_kind,))
775
740
        z_header_len = int(z_header_len)
776
741
        if len(rest) < z_header_len:
788
753
        del rest
789
754
        # So now we have a valid GCB, we just need to parse the factories that
790
755
        # were sent to us
791
 
        header_lines = header.split(b'\n')
 
756
        header_lines = header.split('\n')
792
757
        del header
793
758
        last = header_lines.pop()
794
 
        if last != b'':
 
759
        if last != '':
795
760
            raise ValueError('header lines did not end with a trailing'
796
761
                             ' newline')
797
762
        if len(header_lines) % 4 != 0:
799
764
        block = GroupCompressBlock.from_bytes(block_bytes)
800
765
        del block_bytes
801
766
        result = cls(block)
802
 
        for start in range(0, len(header_lines), 4):
 
767
        for start in xrange(0, len(header_lines), 4):
803
768
            # intern()?
804
 
            key = tuple(header_lines[start].split(b'\x00'))
805
 
            parents_line = header_lines[start + 1]
806
 
            if parents_line == b'None:':
 
769
            key = tuple(header_lines[start].split('\x00'))
 
770
            parents_line = header_lines[start+1]
 
771
            if parents_line == 'None:':
807
772
                parents = None
808
773
            else:
809
 
                parents = tuple([tuple(segment.split(b'\x00'))
810
 
                                 for segment in parents_line.split(b'\t')
811
 
                                 if segment])
812
 
            start_offset = int(header_lines[start + 2])
813
 
            end_offset = int(header_lines[start + 3])
 
774
                parents = tuple([tuple(segment.split('\x00'))
 
775
                                 for segment in parents_line.split('\t')
 
776
                                  if segment])
 
777
            start_offset = int(header_lines[start+2])
 
778
            end_offset = int(header_lines[start+3])
814
779
            result.add_factory(key, parents, start_offset, end_offset)
815
780
        return result
816
781
 
831
796
        self.endpoint = 0
832
797
        self.input_bytes = 0
833
798
        self.labels_deltas = {}
834
 
        self._delta_index = None  # Set by the children
 
799
        self._delta_index = None # Set by the children
835
800
        self._block = GroupCompressBlock()
836
801
        if settings is None:
837
802
            self._settings = {}
838
803
        else:
839
804
            self._settings = settings
840
805
 
841
 
    def compress(self, key, chunks, length, expected_sha, nostore_sha=None,
842
 
                 soft=False):
 
806
    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
843
807
        """Compress lines with label key.
844
808
 
845
809
        :param key: A key tuple. It is stored in the output
846
810
            for identification of the text during decompression. If the last
847
 
            element is b'None' it is replaced with the sha1 of the text -
 
811
            element is 'None' it is replaced with the sha1 of the text -
848
812
            e.g. sha1:xxxxxxx.
849
 
        :param chunks: Chunks of bytes to be compressed
850
 
        :param length: Length of chunks
 
813
        :param bytes: The bytes to be compressed
851
814
        :param expected_sha: If non-None, the sha the lines are believed to
852
815
            have. During compression the sha is calculated; a mismatch will
853
816
            cause an error.
861
824
 
862
825
        :seealso VersionedFiles.add_lines:
863
826
        """
864
 
        if length == 0:  # empty, like a dir entry, etc
 
827
        if not bytes: # empty, like a dir entry, etc
865
828
            if nostore_sha == _null_sha1:
866
829
                raise errors.ExistingContent()
867
830
            return _null_sha1, 0, 0, 'fulltext'
869
832
        if expected_sha is not None:
870
833
            sha1 = expected_sha
871
834
        else:
872
 
            sha1 = osutils.sha_strings(chunks)
 
835
            sha1 = osutils.sha_string(bytes)
873
836
        if nostore_sha is not None:
874
837
            if sha1 == nostore_sha:
875
838
                raise errors.ExistingContent()
876
839
        if key[-1] is None:
877
 
            key = key[:-1] + (b'sha1:' + sha1,)
 
840
            key = key[:-1] + ('sha1:' + sha1,)
878
841
 
879
 
        start, end, type = self._compress(key, chunks, length, length / 2, soft)
 
842
        start, end, type = self._compress(key, bytes, len(bytes) / 2, soft)
880
843
        return sha1, start, end, type
881
844
 
882
 
    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
 
845
    def _compress(self, key, bytes, max_delta_size, soft=False):
883
846
        """Compress lines with label key.
884
847
 
885
848
        :param key: A key tuple. It is stored in the output for identification
886
849
            of the text during decompression.
887
850
 
888
 
        :param chunks: The chunks of bytes to be compressed
889
 
 
890
 
        :param input_len: The length of the chunks
 
851
        :param bytes: The bytes to be compressed
891
852
 
892
853
        :param max_delta_size: The size above which we issue a fulltext instead
893
854
            of a delta.
904
865
        """Extract a key previously added to the compressor.
905
866
 
906
867
        :param key: The key to extract.
907
 
        :return: An iterable over chunks and the sha1.
 
868
        :return: An iterable over bytes and the sha1.
908
869
        """
909
 
        (start_byte, start_chunk, end_byte,
910
 
         end_chunk) = self.labels_deltas[key]
 
870
        (start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
911
871
        delta_chunks = self.chunks[start_chunk:end_chunk]
912
 
        stored_bytes = b''.join(delta_chunks)
913
 
        kind = stored_bytes[:1]
914
 
        if kind == b'f':
 
872
        stored_bytes = ''.join(delta_chunks)
 
873
        if stored_bytes[0] == 'f':
915
874
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
916
875
            data_len = fulltext_len + 1 + offset
917
 
            if data_len != len(stored_bytes):
 
876
            if  data_len != len(stored_bytes):
918
877
                raise ValueError('Index claimed fulltext len, but stored bytes'
919
878
                                 ' claim %s != %s'
920
879
                                 % (len(stored_bytes), data_len))
921
 
            data = [stored_bytes[offset + 1:]]
 
880
            bytes = stored_bytes[offset + 1:]
922
881
        else:
923
 
            if kind != b'd':
924
 
                raise ValueError('Unknown content kind, bytes claim %s' % kind)
925
882
            # XXX: This is inefficient at best
926
 
            source = b''.join(self.chunks[:start_chunk])
 
883
            source = ''.join(self.chunks[:start_chunk])
 
884
            if stored_bytes[0] != 'd':
 
885
                raise ValueError('Unknown content kind, bytes claim %s'
 
886
                                 % (stored_bytes[0],))
927
887
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
928
888
            data_len = delta_len + 1 + offset
929
889
            if data_len != len(stored_bytes):
930
890
                raise ValueError('Index claimed delta len, but stored bytes'
931
891
                                 ' claim %s != %s'
932
892
                                 % (len(stored_bytes), data_len))
933
 
            data = [apply_delta(source, stored_bytes[offset + 1:])]
934
 
        data_sha1 = osutils.sha_strings(data)
935
 
        return data, data_sha1
 
893
            bytes = apply_delta(source, stored_bytes[offset + 1:])
 
894
        bytes_sha1 = osutils.sha_string(bytes)
 
895
        return bytes, bytes_sha1
936
896
 
937
897
    def flush(self):
938
898
        """Finish this group, creating a formatted stream.
972
932
        # The actual content is managed by LinesDeltaIndex
973
933
        self.chunks = self._delta_index.lines
974
934
 
975
 
    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
 
935
    def _compress(self, key, bytes, max_delta_size, soft=False):
976
936
        """see _CommonGroupCompressor._compress"""
977
 
        new_lines = osutils.chunks_to_lines(chunks)
 
937
        input_len = len(bytes)
 
938
        new_lines = osutils.split_lines(bytes)
978
939
        out_lines, index_lines = self._delta_index.make_delta(
979
940
            new_lines, bytes_length=input_len, soft=soft)
980
941
        delta_length = sum(map(len, out_lines))
981
942
        if delta_length > max_delta_size:
982
943
            # The delta is longer than the fulltext, insert a fulltext
983
944
            type = 'fulltext'
984
 
            out_lines = [b'f', encode_base128_int(input_len)]
 
945
            out_lines = ['f', encode_base128_int(input_len)]
985
946
            out_lines.extend(new_lines)
986
947
            index_lines = [False, False]
987
948
            index_lines.extend([True] * len(new_lines))
988
949
        else:
989
950
            # this is a worthy delta, output it
990
951
            type = 'delta'
991
 
            out_lines[0] = b'd'
 
952
            out_lines[0] = 'd'
992
953
            # Update the delta_length to include those two encoded integers
993
954
            out_lines[1] = encode_base128_int(delta_length)
994
955
        # Before insertion
1026
987
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
1027
988
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
1028
989
 
1029
 
    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
 
990
    def _compress(self, key, bytes, max_delta_size, soft=False):
1030
991
        """see _CommonGroupCompressor._compress"""
 
992
        input_len = len(bytes)
1031
993
        # By having action/label/sha1/len, we can parse the group if the index
1032
994
        # was ever destroyed, we have the key in 'label', we know the final
1033
995
        # bytes are valid from sha1, and we know where to find the end of this
1039
1001
        # new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
1040
1002
        if self._delta_index._source_offset != self.endpoint:
1041
1003
            raise AssertionError('_source_offset != endpoint'
1042
 
                                 ' somehow the DeltaIndex got out of sync with'
1043
 
                                 ' the output lines')
1044
 
        bytes = b''.join(chunks)
 
1004
                ' somehow the DeltaIndex got out of sync with'
 
1005
                ' the output lines')
1045
1006
        delta = self._delta_index.make_delta(bytes, max_delta_size)
1046
 
        if delta is None:
 
1007
        if (delta is None):
1047
1008
            type = 'fulltext'
1048
 
            enc_length = encode_base128_int(input_len)
 
1009
            enc_length = encode_base128_int(len(bytes))
1049
1010
            len_mini_header = 1 + len(enc_length)
1050
1011
            self._delta_index.add_source(bytes, len_mini_header)
1051
 
            new_chunks = [b'f', enc_length] + chunks
 
1012
            new_chunks = ['f', enc_length, bytes]
1052
1013
        else:
1053
1014
            type = 'delta'
1054
1015
            enc_length = encode_base128_int(len(delta))
1055
1016
            len_mini_header = 1 + len(enc_length)
1056
 
            new_chunks = [b'd', enc_length, delta]
 
1017
            new_chunks = ['d', enc_length, delta]
1057
1018
            self._delta_index.add_delta_source(delta, len_mini_header)
1058
1019
        # Before insertion
1059
1020
        start = self.endpoint
1066
1027
                                   self.endpoint, chunk_end)
1067
1028
        if not self._delta_index._source_offset == self.endpoint:
1068
1029
            raise AssertionError('the delta index is out of sync'
1069
 
                                 'with the output lines %s != %s'
1070
 
                                 % (self._delta_index._source_offset, self.endpoint))
 
1030
                'with the output lines %s != %s'
 
1031
                % (self._delta_index._source_offset, self.endpoint))
1071
1032
        return start, self.endpoint, type
1072
1033
 
1073
1034
    def _output_chunks(self, new_chunks):
1098
1059
        if graph:
1099
1060
            ref_length = 1
1100
1061
        graph_index = BTreeBuilder(reference_lists=ref_length,
1101
 
                                   key_elements=keylength)
 
1062
            key_elements=keylength)
1102
1063
        stream = transport.open_write_stream('newpack')
1103
1064
        writer = pack.ContainerWriter(stream.write)
1104
1065
        writer.begin()
1105
 
        index = _GCGraphIndex(graph_index, lambda: True, parents=parents,
1106
 
                              add_callback=graph_index.add_nodes,
1107
 
                              inconsistency_fatal=inconsistency_fatal)
 
1066
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
 
1067
            add_callback=graph_index.add_nodes,
 
1068
            inconsistency_fatal=inconsistency_fatal)
1108
1069
        access = pack_repo._DirectPackAccess({})
1109
1070
        access.set_writer(writer, graph_index, (transport, 'newpack'))
1110
1071
        result = GroupCompressVersionedFiles(index, access, delta)
1206
1167
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
1207
1168
                    # The next block from _get_blocks will be the block we
1208
1169
                    # need.
1209
 
                    block_read_memo, block = next(blocks)
 
1170
                    block_read_memo, block = blocks.next()
1210
1171
                    if block_read_memo != read_memo:
1211
1172
                        raise AssertionError(
1212
1173
                            "block_read_memo out of sync with read_memo"
1216
1177
                else:
1217
1178
                    block = self.batch_memos[read_memo]
1218
1179
                self.manager = _LazyGroupContentManager(block,
1219
 
                                                        get_compressor_settings=self._get_compressor_settings)
 
1180
                    get_compressor_settings=self._get_compressor_settings)
1220
1181
                self.last_read_memo = read_memo
1221
1182
            start, end = index_memo[3:5]
1222
1183
            self.manager.add_factory(key, parents, start, end)
1242
1203
    # gives 100% sampling of a 1MB file.
1243
1204
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
1244
1205
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
1245
 
                                    _DEFAULT_MAX_BYTES_TO_INDEX}
 
1206
                                     _DEFAULT_MAX_BYTES_TO_INDEX}
1246
1207
 
1247
1208
    def __init__(self, index, access, delta=True, _unadded_refs=None,
1248
1209
                 _group_cache=None):
1261
1222
            _unadded_refs = {}
1262
1223
        self._unadded_refs = _unadded_refs
1263
1224
        if _group_cache is None:
1264
 
            _group_cache = LRUSizeCache(max_size=50 * 1024 * 1024)
 
1225
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
1265
1226
        self._group_cache = _group_cache
1266
1227
        self._immediate_fallback_vfs = []
1267
1228
        self._max_bytes_to_index = None
1269
1230
    def without_fallbacks(self):
1270
1231
        """Return a clone of this object without any fallbacks configured."""
1271
1232
        return GroupCompressVersionedFiles(self._index, self._access,
1272
 
                                           self._delta, _unadded_refs=dict(
1273
 
                                               self._unadded_refs),
1274
 
                                           _group_cache=self._group_cache)
 
1233
            self._delta, _unadded_refs=dict(self._unadded_refs),
 
1234
            _group_cache=self._group_cache)
1275
1235
 
1276
1236
    def add_lines(self, key, parents, lines, parent_texts=None,
1277
 
                  left_matching_blocks=None, nostore_sha=None, random_id=False,
1278
 
                  check_content=True):
 
1237
        left_matching_blocks=None, nostore_sha=None, random_id=False,
 
1238
        check_content=True):
1279
1239
        """Add a text to the store.
1280
1240
 
1281
1241
        :param key: The key tuple of the text to add.
1310
1270
                 back to future add_lines calls in the parent_texts dictionary.
1311
1271
        """
1312
1272
        self._index._check_write_ok()
1313
 
        if check_content:
1314
 
            self._check_lines_not_unicode(lines)
1315
 
            self._check_lines_are_lines(lines)
1316
 
        return self.add_content(
1317
 
            ChunkedContentFactory(
1318
 
                key, parents, osutils.sha_strings(lines), lines, chunks_are_lines=True),
1319
 
            parent_texts, left_matching_blocks, nostore_sha, random_id)
1320
 
 
1321
 
    def add_content(self, factory, parent_texts=None,
1322
 
                    left_matching_blocks=None, nostore_sha=None,
1323
 
                    random_id=False):
1324
 
        """Add a text to the store.
1325
 
 
1326
 
        :param factory: A ContentFactory that can be used to retrieve the key,
1327
 
            parents and contents.
1328
 
        :param parent_texts: An optional dictionary containing the opaque
1329
 
            representations of some or all of the parents of version_id to
1330
 
            allow delta optimisations.  VERY IMPORTANT: the texts must be those
1331
 
            returned by add_lines or data corruption can be caused.
1332
 
        :param left_matching_blocks: a hint about which areas are common
1333
 
            between the text and its left-hand-parent.  The format is
1334
 
            the SequenceMatcher.get_matching_blocks format.
1335
 
        :param nostore_sha: Raise ExistingContent and do not add the lines to
1336
 
            the versioned file if the digest of the lines matches this.
1337
 
        :param random_id: If True a random id has been selected rather than
1338
 
            an id determined by some deterministic process such as a converter
1339
 
            from a foreign VCS. When True the backend may choose not to check
1340
 
            for uniqueness of the resulting key within the versioned file, so
1341
 
            this should only be done when the result is expected to be unique
1342
 
            anyway.
1343
 
        :return: The text sha1, the number of bytes in the text, and an opaque
1344
 
                 representation of the inserted version which can be provided
1345
 
                 back to future add_lines calls in the parent_texts dictionary.
1346
 
        """
 
1273
        self._check_add(key, lines, random_id, check_content)
 
1274
        if parents is None:
 
1275
            # The caller might pass None if there is no graph data, but kndx
 
1276
            # indexes can't directly store that, so we give them
 
1277
            # an empty tuple instead.
 
1278
            parents = ()
 
1279
        # double handling for now. Make it work until then.
 
1280
        length = sum(map(len, lines))
 
1281
        record = ChunkedContentFactory(key, parents, None, lines)
 
1282
        sha1 = list(self._insert_record_stream([record], random_id=random_id,
 
1283
                                               nostore_sha=nostore_sha))[0]
 
1284
        return sha1, length, None
 
1285
 
 
1286
    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
 
1287
        """See VersionedFiles._add_text()."""
1347
1288
        self._index._check_write_ok()
1348
 
        parents = factory.parents
1349
 
        self._check_add(factory.key, random_id)
 
1289
        self._check_add(key, None, random_id, check_content=False)
 
1290
        if text.__class__ is not str:
 
1291
            raise errors.BzrBadParameterUnicode("text")
1350
1292
        if parents is None:
1351
1293
            # The caller might pass None if there is no graph data, but kndx
1352
1294
            # indexes can't directly store that, so we give them
1353
1295
            # an empty tuple instead.
1354
1296
            parents = ()
1355
1297
        # double handling for now. Make it work until then.
1356
 
        sha1, length = list(self._insert_record_stream(
1357
 
            [factory], random_id=random_id, nostore_sha=nostore_sha))[0]
 
1298
        length = len(text)
 
1299
        record = FulltextContentFactory(key, parents, None, text)
 
1300
        sha1 = list(self._insert_record_stream([record], random_id=random_id,
 
1301
                                               nostore_sha=nostore_sha))[0]
1358
1302
        return sha1, length, None
1359
1303
 
1360
1304
    def add_fallback_versioned_files(self, a_versioned_files):
1377
1321
        if keys is None:
1378
1322
            keys = self.keys()
1379
1323
            for record in self.get_record_stream(keys, 'unordered', True):
1380
 
                for chunk in record.iter_bytes_as('chunked'):
1381
 
                    pass
 
1324
                record.get_bytes_as('fulltext')
1382
1325
        else:
1383
1326
            return self.get_record_stream(keys, 'unordered', True)
1384
1327
 
1388
1331
        self._index._graph_index.clear_cache()
1389
1332
        self._index._int_cache.clear()
1390
1333
 
1391
 
    def _check_add(self, key, random_id):
 
1334
    def _check_add(self, key, lines, random_id, check_content):
1392
1335
        """check that version_id and lines are safe to add."""
1393
1336
        version_id = key[-1]
1394
1337
        if version_id is not None:
1399
1342
        # probably check that the existing content is identical to what is
1400
1343
        # being inserted, and otherwise raise an exception.  This would make
1401
1344
        # the bundle code simpler.
 
1345
        if check_content:
 
1346
            self._check_lines_not_unicode(lines)
 
1347
            self._check_lines_are_lines(lines)
1402
1348
 
1403
1349
    def get_parent_map(self, keys):
1404
1350
        """Get a map of the graph parents of keys.
1463
1409
                yield read_memo, cached[read_memo]
1464
1410
            except KeyError:
1465
1411
                # Read the block, and cache it.
1466
 
                zdata = next(raw_records)
 
1412
                zdata = raw_records.next()
1467
1413
                block = GroupCompressBlock.from_bytes(zdata)
1468
1414
                self._group_cache[read_memo] = block
1469
1415
                cached[read_memo] = block
1497
1443
        if not keys:
1498
1444
            return
1499
1445
        if (not self._index.has_graph
1500
 
                and ordering in ('topological', 'groupcompress')):
 
1446
            and ordering in ('topological', 'groupcompress')):
1501
1447
            # Cannot topological order when no graph has been stored.
1502
1448
            # but we allow 'as-requested' or 'unordered'
1503
1449
            ordering = 'unordered'
1507
1453
            try:
1508
1454
                keys = set(remaining_keys)
1509
1455
                for content_factory in self._get_remaining_record_stream(keys,
1510
 
                                                                         orig_keys, ordering, include_delta_closure):
 
1456
                        orig_keys, ordering, include_delta_closure):
1511
1457
                    remaining_keys.discard(content_factory.key)
1512
1458
                    yield content_factory
1513
1459
                return
1514
 
            except errors.RetryWithNewPacks as e:
 
1460
            except errors.RetryWithNewPacks, e:
1515
1461
                self._access.reload_or_raise(e)
1516
1462
 
1517
1463
    def _find_from_fallback(self, missing):
1577
1523
                source = self
1578
1524
            elif key in key_to_source_map:
1579
1525
                source = key_to_source_map[key]
1580
 
            else:  # absent
 
1526
            else: # absent
1581
1527
                continue
1582
1528
            if source is not current_source:
1583
1529
                source_keys.append((source, []))
1591
1537
            # This is the group the bytes are stored in, followed by the
1592
1538
            # location in the group
1593
1539
            return locations[key][0]
 
1540
        present_keys = sorted(locations.iterkeys(), key=get_group)
1594
1541
        # We don't have an ordering for keys in the in-memory object, but
1595
1542
        # lets process the in-memory ones first.
1596
 
        present_keys = list(unadded_keys)
1597
 
        present_keys.extend(sorted(locations, key=get_group))
 
1543
        present_keys = list(unadded_keys) + present_keys
1598
1544
        # Now grab all of the ones from other sources
1599
1545
        source_keys = [(self, present_keys)]
1600
1546
        source_keys.extend(source_result)
1624
1570
            # start with one key, recurse to its oldest parent, then grab
1625
1571
            # everything in the same group, etc.
1626
1572
            parent_map = dict((key, details[2]) for key, details in
1627
 
                              locations.items())
 
1573
                locations.iteritems())
1628
1574
            for key in unadded_keys:
1629
1575
                parent_map[key] = self._unadded_refs[key]
1630
1576
            parent_map.update(fallback_parent_map)
1632
1578
                                                        key_to_source_map)
1633
1579
        elif ordering == 'as-requested':
1634
1580
            source_keys = self._get_as_requested_source_keys(orig_keys,
1635
 
                                                             locations, unadded_keys, key_to_source_map)
 
1581
                locations, unadded_keys, key_to_source_map)
1636
1582
        else:
1637
1583
            # We want to yield the keys in a semi-optimal (read-wise) ordering.
1638
1584
            # Otherwise we thrash the _group_cache and destroy performance
1639
1585
            source_keys = self._get_io_ordered_source_keys(locations,
1640
 
                                                           unadded_keys, source_result)
 
1586
                unadded_keys, source_result)
1641
1587
        for key in missing:
1642
1588
            yield AbsentContentFactory(key)
1643
1589
        # Batch up as many keys as we can until either:
1645
1591
        #  - we run out of keys, or
1646
1592
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
1647
1593
        batcher = _BatchingBlockFetcher(self, locations,
1648
 
                                        get_compressor_settings=self._get_compressor_settings)
 
1594
            get_compressor_settings=self._get_compressor_settings)
1649
1595
        for source, keys in source_keys:
1650
1596
            if source is self:
1651
1597
                for key in keys:
1654
1600
                        # self._compressor.
1655
1601
                        for factory in batcher.yield_factories(full_flush=True):
1656
1602
                            yield factory
1657
 
                        chunks, sha1 = self._compressor.extract(key)
 
1603
                        bytes, sha1 = self._compressor.extract(key)
1658
1604
                        parents = self._unadded_refs[key]
1659
 
                        yield ChunkedContentFactory(key, parents, sha1, chunks)
 
1605
                        yield FulltextContentFactory(key, parents, sha1, bytes)
1660
1606
                        continue
1661
1607
                    if batcher.add_key(key) > BATCH_SIZE:
1662
1608
                        # Ok, this batch is big enough.  Yield some results.
1675
1621
        """See VersionedFiles.get_sha1s()."""
1676
1622
        result = {}
1677
1623
        for record in self.get_record_stream(keys, 'unordered', True):
1678
 
            if record.sha1 is not None:
 
1624
            if record.sha1 != None:
1679
1625
                result[record.key] = record.sha1
1680
1626
            else:
1681
1627
                if record.storage_kind != 'absent':
1682
 
                    result[record.key] = osutils.sha_strings(
1683
 
                        record.iter_bytes_as('chunked'))
 
1628
                    result[record.key] = osutils.sha_string(
 
1629
                        record.get_bytes_as('fulltext'))
1684
1630
        return result
1685
1631
 
1686
1632
    def insert_record_stream(self, stream):
1694
1640
        # test_insert_record_stream_existing_keys fail for groupcompress and
1695
1641
        # groupcompress-nograph, this needs to be revisited while addressing
1696
1642
        # 'bzr branch' performance issues.
1697
 
        for _, _ in self._insert_record_stream(stream, random_id=False):
 
1643
        for _ in self._insert_record_stream(stream, random_id=False):
1698
1644
            pass
1699
1645
 
1700
1646
    def _get_compressor_settings(self):
1707
1653
            if val is not None:
1708
1654
                try:
1709
1655
                    val = int(val)
1710
 
                except ValueError as e:
 
1656
                except ValueError, e:
1711
1657
                    trace.warning('Value for '
1712
1658
                                  '"bzr.groupcompress.max_bytes_to_index"'
1713
1659
                                  ' %r is not an integer'
1734
1680
        :param reuse_blocks: If the source is streaming from
1735
1681
            groupcompress-blocks, just insert the blocks as-is, rather than
1736
1682
            expanding the texts and inserting again.
1737
 
        :return: An iterator over (sha1, length) of the inserted records.
 
1683
        :return: An iterator over the sha1 of the inserted records.
1738
1684
        :seealso insert_record_stream:
1739
1685
        :seealso add_lines:
1740
1686
        """
1741
1687
        adapters = {}
1742
 
 
1743
1688
        def get_adapter(adapter_key):
1744
1689
            try:
1745
1690
                return adapters[adapter_key]
1753
1698
        self._compressor = self._make_group_compressor()
1754
1699
        self._unadded_refs = {}
1755
1700
        keys_to_add = []
1756
 
 
1757
1701
        def flush():
1758
1702
            bytes_len, chunks = self._compressor.flush().to_chunks()
1759
1703
            self._compressor = self._make_group_compressor()
1760
1704
            # Note: At this point we still have 1 copy of the fulltext (in
1761
1705
            #       record and the var 'bytes'), and this generates 2 copies of
1762
1706
            #       the compressed text (one for bytes, one in chunks)
 
1707
            # TODO: Push 'chunks' down into the _access api, so that we don't
 
1708
            #       have to double compressed memory here
1763
1709
            # TODO: Figure out how to indicate that we would be happy to free
1764
1710
            #       the fulltext content at this point. Note that sometimes we
1765
1711
            #       will want it later (streaming CHK pages), but most of the
1766
1712
            #       time we won't (everything else)
1767
 
            index, start, length = self._access.add_raw_record(
1768
 
                None, bytes_len, chunks)
 
1713
            bytes = ''.join(chunks)
 
1714
            del chunks
 
1715
            index, start, length = self._access.add_raw_records(
 
1716
                [(None, len(bytes))], bytes)[0]
1769
1717
            nodes = []
1770
1718
            for key, reads, refs in keys_to_add:
1771
 
                nodes.append((key, b"%d %d %s" % (start, length, reads), refs))
 
1719
                nodes.append((key, "%d %d %s" % (start, length, reads), refs))
1772
1720
            self._index.add_records(nodes, random_id=random_id)
1773
1721
            self._unadded_refs = {}
1774
1722
            del keys_to_add[:]
1789
1737
            if random_id:
1790
1738
                if record.key in inserted_keys:
1791
1739
                    trace.note(gettext('Insert claimed random_id=True,'
1792
 
                                       ' but then inserted %r two times'), record.key)
 
1740
                               ' but then inserted %r two times'), record.key)
1793
1741
                    continue
1794
1742
                inserted_keys.add(record.key)
1795
1743
            if reuse_blocks:
1809
1757
                if record.storage_kind == 'groupcompress-block':
1810
1758
                    # Insert the raw block into the target repo
1811
1759
                    insert_manager = record._manager
1812
 
                    bytes_len, chunks = record._manager._block.to_chunks()
1813
 
                    _, start, length = self._access.add_raw_record(
1814
 
                        None, bytes_len, chunks)
 
1760
                    bytes = record._manager._block.to_bytes()
 
1761
                    _, start, length = self._access.add_raw_records(
 
1762
                        [(None, len(bytes))], bytes)[0]
 
1763
                    del bytes
1815
1764
                    block_start = start
1816
1765
                    block_length = length
1817
1766
                if record.storage_kind in ('groupcompress-block',
1820
1769
                        raise AssertionError('No insert_manager set')
1821
1770
                    if insert_manager is not record._manager:
1822
1771
                        raise AssertionError('insert_manager does not match'
1823
 
                                             ' the current record, we cannot be positive'
1824
 
                                             ' that the appropriate content was inserted.'
1825
 
                                             )
1826
 
                    value = b"%d %d %d %d" % (block_start, block_length,
1827
 
                                              record._start, record._end)
 
1772
                            ' the current record, we cannot be positive'
 
1773
                            ' that the appropriate content was inserted.'
 
1774
                            )
 
1775
                    value = "%d %d %d %d" % (block_start, block_length,
 
1776
                                             record._start, record._end)
1828
1777
                    nodes = [(record.key, value, (record.parents,))]
1829
1778
                    # TODO: Consider buffering up many nodes to be added, not
1830
1779
                    #       sure how much overhead this has, but we're seeing
1832
1781
                    self._index.add_records(nodes, random_id=random_id)
1833
1782
                    continue
1834
1783
            try:
1835
 
                chunks = record.get_bytes_as('chunked')
 
1784
                bytes = record.get_bytes_as('fulltext')
1836
1785
            except errors.UnavailableRepresentation:
1837
 
                adapter_key = record.storage_kind, 'chunked'
 
1786
                adapter_key = record.storage_kind, 'fulltext'
1838
1787
                adapter = get_adapter(adapter_key)
1839
 
                chunks = adapter.get_bytes(record, 'chunked')
1840
 
            chunks_len = record.size
1841
 
            if chunks_len is None:
1842
 
                chunks_len = sum(map(len, chunks))
 
1788
                bytes = adapter.get_bytes(record)
1843
1789
            if len(record.key) > 1:
1844
1790
                prefix = record.key[0]
1845
1791
                soft = (prefix == last_prefix)
1846
1792
            else:
1847
1793
                prefix = None
1848
1794
                soft = False
1849
 
            if max_fulltext_len < chunks_len:
1850
 
                max_fulltext_len = chunks_len
 
1795
            if max_fulltext_len < len(bytes):
 
1796
                max_fulltext_len = len(bytes)
1851
1797
                max_fulltext_prefix = prefix
1852
1798
            (found_sha1, start_point, end_point,
1853
 
             type) = self._compressor.compress(
1854
 
                 record.key, chunks, chunks_len, record.sha1, soft=soft,
1855
 
                 nostore_sha=nostore_sha)
1856
 
            # delta_ratio = float(chunks_len) / (end_point - start_point)
 
1799
             type) = self._compressor.compress(record.key,
 
1800
                                               bytes, record.sha1, soft=soft,
 
1801
                                               nostore_sha=nostore_sha)
 
1802
            # delta_ratio = float(len(bytes)) / (end_point - start_point)
1857
1803
            # Check if we want to continue to include that text
1858
1804
            if (prefix == max_fulltext_prefix
1859
 
                    and end_point < 2 * max_fulltext_len):
 
1805
                and end_point < 2 * max_fulltext_len):
1860
1806
                # As long as we are on the same file_id, we will fill at least
1861
1807
                # 2 * max_fulltext_len
1862
1808
                start_new_block = False
1863
 
            elif end_point > 4 * 1024 * 1024:
 
1809
            elif end_point > 4*1024*1024:
1864
1810
                start_new_block = True
1865
1811
            elif (prefix is not None and prefix != last_prefix
1866
 
                  and end_point > 2 * 1024 * 1024):
 
1812
                  and end_point > 2*1024*1024):
1867
1813
                start_new_block = True
1868
1814
            else:
1869
1815
                start_new_block = False
1871
1817
            if start_new_block:
1872
1818
                self._compressor.pop_last()
1873
1819
                flush()
1874
 
                max_fulltext_len = chunks_len
 
1820
                max_fulltext_len = len(bytes)
1875
1821
                (found_sha1, start_point, end_point,
1876
 
                 type) = self._compressor.compress(
1877
 
                     record.key, chunks, chunks_len, record.sha1)
 
1822
                 type) = self._compressor.compress(record.key, bytes,
 
1823
                                                   record.sha1)
1878
1824
            if record.key[-1] is None:
1879
 
                key = record.key[:-1] + (b'sha1:' + found_sha1,)
 
1825
                key = record.key[:-1] + ('sha1:' + found_sha1,)
1880
1826
            else:
1881
1827
                key = record.key
1882
1828
            self._unadded_refs[key] = record.parents
1883
 
            yield found_sha1, chunks_len
 
1829
            yield found_sha1
1884
1830
            as_st = static_tuple.StaticTuple.from_sequence
1885
1831
            if record.parents is not None:
1886
1832
                parents = as_st([as_st(p) for p in record.parents])
1887
1833
            else:
1888
1834
                parents = None
1889
1835
            refs = static_tuple.StaticTuple(parents)
1890
 
            keys_to_add.append(
1891
 
                (key, b'%d %d' % (start_point, end_point), refs))
 
1836
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
1892
1837
        if len(keys_to_add):
1893
1838
            flush()
1894
1839
        self._compressor = None
1920
1865
        # but we need to setup a list of records to visit.
1921
1866
        # we need key, position, length
1922
1867
        for key_idx, record in enumerate(self.get_record_stream(keys,
1923
 
                                                                'unordered', True)):
 
1868
            'unordered', True)):
1924
1869
            # XXX: todo - optimise to use less than full texts.
1925
1870
            key = record.key
1926
1871
            if pb is not None:
1927
1872
                pb.update('Walking content', key_idx, total)
1928
1873
            if record.storage_kind == 'absent':
1929
1874
                raise errors.RevisionNotPresent(key, self)
1930
 
            for line in record.iter_bytes_as('lines'):
 
1875
            lines = osutils.split_lines(record.get_bytes_as('fulltext'))
 
1876
            for line in lines:
1931
1877
                yield line, key
1932
1878
        if pb is not None:
1933
1879
            pb.update('Walking content', total, total)
1963
1909
 
1964
1910
    def __repr__(self):
1965
1911
        return '%s(%s, %s)' % (self.__class__.__name__,
1966
 
                               self.index_memo, self._parents)
 
1912
            self.index_memo, self._parents)
1967
1913
 
1968
1914
    @property
1969
1915
    def index_memo(self):
1979
1925
        if offset == 0:
1980
1926
            return self.index_memo
1981
1927
        elif offset == 1:
1982
 
            return self.compression_parent  # Always None
 
1928
            return self.compression_parent # Always None
1983
1929
        elif offset == 2:
1984
1930
            return self._parents
1985
1931
        elif offset == 3:
1986
1932
            return self.record_details
1987
1933
        else:
1988
1934
            raise IndexError('offset out of range')
1989
 
 
 
1935
            
1990
1936
    def __len__(self):
1991
1937
        return 4
1992
1938
 
1995
1941
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1996
1942
 
1997
1943
    def __init__(self, graph_index, is_locked, parents=True,
1998
 
                 add_callback=None, track_external_parent_refs=False,
1999
 
                 inconsistency_fatal=True, track_new_keys=False):
 
1944
        add_callback=None, track_external_parent_refs=False,
 
1945
        inconsistency_fatal=True, track_new_keys=False):
2000
1946
        """Construct a _GCGraphIndex on a graph_index.
2001
1947
 
2002
 
        :param graph_index: An implementation of breezy.index.GraphIndex.
 
1948
        :param graph_index: An implementation of brzlib.index.GraphIndex.
2003
1949
        :param is_locked: A callback, returns True if the index is locked and
2004
1950
            thus usable.
2005
1951
        :param parents: If True, record knits parents, if not do not record
2055
2001
                if refs:
2056
2002
                    for ref in refs:
2057
2003
                        if ref:
2058
 
                            raise knit.KnitCorrupt(self,
2059
 
                                                   "attempt to add node with parents "
2060
 
                                                   "in parentless index.")
 
2004
                            raise errors.KnitCorrupt(self,
 
2005
                                "attempt to add node with parents "
 
2006
                                "in parentless index.")
2061
2007
                    refs = ()
2062
2008
                    changed = True
2063
2009
            keys[key] = (value, refs)
2071
2017
                if node_refs != passed[1]:
2072
2018
                    details = '%s %s %s' % (key, (value, node_refs), passed)
2073
2019
                    if self._inconsistency_fatal:
2074
 
                        raise knit.KnitCorrupt(self, "inconsistent details"
2075
 
                                               " in add_records: %s" %
2076
 
                                               details)
 
2020
                        raise errors.KnitCorrupt(self, "inconsistent details"
 
2021
                                                 " in add_records: %s" %
 
2022
                                                 details)
2077
2023
                    else:
2078
2024
                        trace.warning("inconsistent details in skipped"
2079
2025
                                      " record: %s", details)
2082
2028
        if changed:
2083
2029
            result = []
2084
2030
            if self._parents:
2085
 
                for key, (value, node_refs) in keys.items():
 
2031
                for key, (value, node_refs) in keys.iteritems():
2086
2032
                    result.append((key, value, node_refs))
2087
2033
            else:
2088
 
                for key, (value, node_refs) in keys.items():
 
2034
                for key, (value, node_refs) in keys.iteritems():
2089
2035
                    result.append((key, value))
2090
2036
            records = result
2091
2037
        key_dependencies = self._key_dependencies
2203
2149
 
2204
2150
    def _node_to_position(self, node):
2205
2151
        """Convert an index value to position details."""
2206
 
        bits = node[2].split(b' ')
 
2152
        bits = node[2].split(' ')
2207
2153
        # It would be nice not to read the entire gzip.
2208
2154
        # start and stop are put into _int_cache because they are very common.
2209
2155
        # They define the 'group' that an entry is in, and many groups can have
2242
2188
            key_dependencies.add_references(node[1], node[3][0])
2243
2189
 
2244
2190
 
2245
 
from ._groupcompress_py import (
 
2191
from brzlib._groupcompress_py import (
2246
2192
    apply_delta,
2247
2193
    apply_delta_to_source,
2248
2194
    encode_base128_int,
2251
2197
    LinesDeltaIndex,
2252
2198
    )
2253
2199
try:
2254
 
    from ._groupcompress_pyx import (
 
2200
    from brzlib._groupcompress_pyx import (
2255
2201
        apply_delta,
2256
2202
        apply_delta_to_source,
2257
2203
        DeltaIndex,
2259
2205
        decode_base128_int,
2260
2206
        )
2261
2207
    GroupCompressor = PyrexGroupCompressor
2262
 
except ImportError as e:
 
2208
except ImportError, e:
2263
2209
    osutils.failed_to_load_extension(e)
2264
2210
    GroupCompressor = PythonGroupCompressor
 
2211