/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Richard Wilbur
  • Date: 2016-02-04 19:07:28 UTC
  • mto: This revision was merged to the branch mainline in revision 6618.
  • Revision ID: richard.wilbur@gmail.com-20160204190728-p0zvfii6zase0fw7
Update COPYING.txt from the original http://www.gnu.org/licenses/gpl-2.0.txt  (Only differences were in whitespace.)  Thanks to Petr Stodulka for pointing out the discrepancy.

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Core compression logic for compressing streams of related files."""
18
18
 
 
19
from __future__ import absolute_import
 
20
 
19
21
import time
20
22
import zlib
21
23
 
22
 
from ..lazy_import import lazy_import
 
24
from bzrlib.lazy_import import lazy_import
23
25
lazy_import(globals(), """
24
 
from breezy import (
 
26
from bzrlib import (
25
27
    annotate,
26
28
    config,
27
29
    debug,
 
30
    errors,
 
31
    graph as _mod_graph,
28
32
    osutils,
 
33
    pack,
29
34
    static_tuple,
30
35
    trace,
31
36
    tsort,
32
37
    )
33
 
from breezy.bzr import (
34
 
    knit,
35
 
    pack,
36
 
    pack_repo,
37
 
    )
38
38
 
39
 
from breezy.i18n import gettext
 
39
from bzrlib.repofmt import pack_repo
 
40
from bzrlib.i18n import gettext
40
41
""")
41
42
 
42
 
from .. import (
43
 
    errors,
44
 
    )
45
 
from .btree_index import BTreeBuilder
46
 
from ..lru_cache import LRUSizeCache
47
 
from .versionedfile import (
 
43
from bzrlib.btree_index import BTreeBuilder
 
44
from bzrlib.lru_cache import LRUSizeCache
 
45
from bzrlib.versionedfile import (
48
46
    _KeyRefs,
49
47
    adapter_registry,
50
48
    AbsentContentFactory,
51
49
    ChunkedContentFactory,
52
 
    ExistingContent,
53
50
    FulltextContentFactory,
54
51
    VersionedFilesWithFallbacks,
55
 
    UnavailableRepresentation,
56
52
    )
57
53
 
58
54
# Minimum number of uncompressed bytes to try fetch at once when retrieving
59
55
# groupcompress blocks.
60
56
BATCH_SIZE = 2**16
61
57
 
62
 
# osutils.sha_string(b'')
63
 
_null_sha1 = b'da39a3ee5e6b4b0d3255bfef95601890afd80709'
64
 
 
 
58
# osutils.sha_string('')
 
59
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
65
60
 
66
61
def sort_gc_optimal(parent_map):
67
62
    """Sort and group the keys in parent_map into groupcompress order.
74
69
    # groupcompress ordering is approximately reverse topological,
75
70
    # properly grouped by file-id.
76
71
    per_prefix_map = {}
77
 
    for key, value in parent_map.items():
78
 
        if isinstance(key, bytes) or len(key) == 1:
79
 
            prefix = b''
 
72
    for key, value in parent_map.iteritems():
 
73
        if isinstance(key, str) or len(key) == 1:
 
74
            prefix = ''
80
75
        else:
81
76
            prefix = key[0]
82
77
        try:
90
85
    return present_keys
91
86
 
92
87
 
93
 
class DecompressCorruption(errors.BzrError):
94
 
 
95
 
    _fmt = "Corruption while decompressing repository file%(orig_error)s"
96
 
 
97
 
    def __init__(self, orig_error=None):
98
 
        if orig_error is not None:
99
 
            self.orig_error = ", %s" % (orig_error,)
100
 
        else:
101
 
            self.orig_error = ""
102
 
        errors.BzrError.__init__(self)
103
 
 
104
 
 
105
88
# The max zlib window size is 32kB, so if we set 'max_size' output of the
106
89
# decompressor to the requested bytes + 32kB, then we should guarantee
107
90
# num_bytes coming out.
108
 
_ZLIB_DECOMP_WINDOW = 32 * 1024
109
 
 
 
91
_ZLIB_DECOMP_WINDOW = 32*1024
110
92
 
111
93
class GroupCompressBlock(object):
112
94
    """An object which maintains the internal structure of the compressed data.
115
97
    """
116
98
 
117
99
    # Group Compress Block v1 Zlib
118
 
    GCB_HEADER = b'gcb1z\n'
 
100
    GCB_HEADER = 'gcb1z\n'
119
101
    # Group Compress Block v1 Lzma
120
 
    GCB_LZ_HEADER = b'gcb1l\n'
 
102
    GCB_LZ_HEADER = 'gcb1l\n'
121
103
    GCB_KNOWN_HEADERS = (GCB_HEADER, GCB_LZ_HEADER)
122
104
 
123
105
    def __init__(self):
154
136
        # Expand the content if required
155
137
        if self._content is None:
156
138
            if self._content_chunks is not None:
157
 
                self._content = b''.join(self._content_chunks)
 
139
                self._content = ''.join(self._content_chunks)
158
140
                self._content_chunks = None
159
141
        if self._content is None:
160
142
            # We join self._z_content_chunks here, because if we are
162
144
            # chunk
163
145
            if self._z_content_chunks is None:
164
146
                raise AssertionError('No content to decompress')
165
 
            z_content = b''.join(self._z_content_chunks)
166
 
            if z_content == b'':
167
 
                self._content = b''
 
147
            z_content = ''.join(self._z_content_chunks)
 
148
            if z_content == '':
 
149
                self._content = ''
168
150
            elif self._compressor_name == 'lzma':
169
151
                # We don't do partial lzma decomp yet
170
152
                import pylzma
214
196
            # The stream is finished
215
197
            self._z_content_decompressor = None
216
198
 
217
 
    def _parse_bytes(self, data, pos):
 
199
    def _parse_bytes(self, bytes, pos):
218
200
        """Read the various lengths from the header.
219
201
 
220
202
        This also populates the various 'compressed' buffers.
224
206
        # At present, we have 2 integers for the compressed and uncompressed
225
207
        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
226
208
        # checking too far, cap the search to 14 bytes.
227
 
        pos2 = data.index(b'\n', pos, pos + 14)
228
 
        self._z_content_length = int(data[pos:pos2])
229
 
        pos = pos2 + 1
230
 
        pos2 = data.index(b'\n', pos, pos + 14)
231
 
        self._content_length = int(data[pos:pos2])
232
 
        pos = pos2 + 1
233
 
        if len(data) != (pos + self._z_content_length):
 
209
        pos2 = bytes.index('\n', pos, pos + 14)
 
210
        self._z_content_length = int(bytes[pos:pos2])
 
211
        pos = pos2 + 1
 
212
        pos2 = bytes.index('\n', pos, pos + 14)
 
213
        self._content_length = int(bytes[pos:pos2])
 
214
        pos = pos2 + 1
 
215
        if len(bytes) != (pos + self._z_content_length):
234
216
            # XXX: Define some GCCorrupt error ?
235
217
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
236
 
                                 (len(data), pos, self._z_content_length))
237
 
        self._z_content_chunks = (data[pos:],)
 
218
                                 (len(bytes), pos, self._z_content_length))
 
219
        self._z_content_chunks = (bytes[pos:],)
238
220
 
239
221
    @property
240
222
    def _z_content(self):
243
225
        Meant only to be used by the test suite.
244
226
        """
245
227
        if self._z_content_chunks is not None:
246
 
            return b''.join(self._z_content_chunks)
 
228
            return ''.join(self._z_content_chunks)
247
229
        return None
248
230
 
249
231
    @classmethod
250
232
    def from_bytes(cls, bytes):
251
233
        out = cls()
252
 
        header = bytes[:6]
253
 
        if header not in cls.GCB_KNOWN_HEADERS:
 
234
        if bytes[:6] not in cls.GCB_KNOWN_HEADERS:
254
235
            raise ValueError('bytes did not start with any of %r'
255
236
                             % (cls.GCB_KNOWN_HEADERS,))
256
 
        if header == cls.GCB_HEADER:
 
237
        # XXX: why not testing the whole header ?
 
238
        if bytes[4] == 'z':
257
239
            out._compressor_name = 'zlib'
258
 
        elif header == cls.GCB_LZ_HEADER:
 
240
        elif bytes[4] == 'l':
259
241
            out._compressor_name = 'lzma'
260
242
        else:
261
 
            raise ValueError('unknown compressor: %r' % (header,))
 
243
            raise ValueError('unknown compressor: %r' % (bytes,))
262
244
        out._parse_bytes(bytes, 6)
263
245
        return out
264
246
 
270
252
        :return: The bytes for the content
271
253
        """
272
254
        if start == end == 0:
273
 
            return []
 
255
            return ''
274
256
        self._ensure_content(end)
275
257
        # The bytes are 'f' or 'd' for the type, then a variable-length
276
258
        # base128 integer for the content size, then the actual content
277
259
        # We know that the variable-length integer won't be longer than 5
278
260
        # bytes (it takes 5 bytes to encode 2^32)
279
 
        c = self._content[start:start + 1]
280
 
        if c == b'f':
 
261
        c = self._content[start]
 
262
        if c == 'f':
281
263
            type = 'fulltext'
282
264
        else:
283
 
            if c != b'd':
 
265
            if c != 'd':
284
266
                raise ValueError('Unknown content control code: %s'
285
267
                                 % (c,))
286
268
            type = 'delta'
287
269
        content_len, len_len = decode_base128_int(
288
 
            self._content[start + 1:start + 6])
 
270
                            self._content[start + 1:start + 6])
289
271
        content_start = start + 1 + len_len
290
272
        if end != content_start + content_len:
291
273
            raise ValueError('end != len according to field header'
292
 
                             ' %s != %s' % (end, content_start + content_len))
293
 
        if c == b'f':
294
 
            return [self._content[content_start:end]]
295
 
        # Must be type delta as checked above
296
 
        return [apply_delta_to_source(self._content, content_start, end)]
 
274
                ' %s != %s' % (end, content_start + content_len))
 
275
        if c == 'f':
 
276
            bytes = self._content[content_start:end]
 
277
        elif c == 'd':
 
278
            bytes = apply_delta_to_source(self._content, content_start, end)
 
279
        return bytes
297
280
 
298
281
    def set_chunked_content(self, content_chunks, length):
299
282
        """Set the content of this block to the given chunks."""
317
300
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
318
301
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
319
302
        # (measured peak is maybe 30MB over the above...)
320
 
        compressed_chunks = list(map(compressor.compress, chunks))
 
303
        compressed_chunks = map(compressor.compress, chunks)
321
304
        compressed_chunks.append(compressor.flush())
322
305
        # Ignore empty chunks
323
306
        self._z_content_chunks = [c for c in compressed_chunks if c]
336
319
        """Create the byte stream as a series of 'chunks'"""
337
320
        self._create_z_content()
338
321
        header = self.GCB_HEADER
339
 
        chunks = [b'%s%d\n%d\n'
 
322
        chunks = ['%s%d\n%d\n'
340
323
                  % (header, self._z_content_length, self._content_length),
341
 
                  ]
 
324
                 ]
342
325
        chunks.extend(self._z_content_chunks)
343
326
        total_len = sum(map(len, chunks))
344
327
        return total_len, chunks
346
329
    def to_bytes(self):
347
330
        """Encode the information into a byte stream."""
348
331
        total_len, chunks = self.to_chunks()
349
 
        return b''.join(chunks)
 
332
        return ''.join(chunks)
350
333
 
351
334
    def _dump(self, include_text=False):
352
335
        """Take this block, and spit out a human-readable structure.
362
345
        result = []
363
346
        pos = 0
364
347
        while pos < self._content_length:
365
 
            kind = self._content[pos:pos + 1]
 
348
            kind = self._content[pos]
366
349
            pos += 1
367
 
            if kind not in (b'f', b'd'):
 
350
            if kind not in ('f', 'd'):
368
351
                raise ValueError('invalid kind character: %r' % (kind,))
369
352
            content_len, len_len = decode_base128_int(
370
 
                self._content[pos:pos + 5])
 
353
                                self._content[pos:pos + 5])
371
354
            pos += len_len
372
355
            if content_len + pos > self._content_length:
373
356
                raise ValueError('invalid content_len %d for record @ pos %d'
374
357
                                 % (content_len, pos - len_len - 1))
375
 
            if kind == b'f':  # Fulltext
 
358
            if kind == 'f': # Fulltext
376
359
                if include_text:
377
 
                    text = self._content[pos:pos + content_len]
378
 
                    result.append((b'f', content_len, text))
 
360
                    text = self._content[pos:pos+content_len]
 
361
                    result.append(('f', content_len, text))
379
362
                else:
380
 
                    result.append((b'f', content_len))
381
 
            elif kind == b'd':  # Delta
382
 
                delta_content = self._content[pos:pos + content_len]
 
363
                    result.append(('f', content_len))
 
364
            elif kind == 'd': # Delta
 
365
                delta_content = self._content[pos:pos+content_len]
383
366
                delta_info = []
384
367
                # The first entry in a delta is the decompressed length
385
368
                decomp_len, delta_pos = decode_base128_int(delta_content)
386
 
                result.append((b'd', content_len, decomp_len, delta_info))
 
369
                result.append(('d', content_len, decomp_len, delta_info))
387
370
                measured_len = 0
388
371
                while delta_pos < content_len:
389
 
                    c = delta_content[delta_pos]
 
372
                    c = ord(delta_content[delta_pos])
390
373
                    delta_pos += 1
391
 
                    if c & 0x80:  # Copy
 
374
                    if c & 0x80: # Copy
392
375
                        (offset, length,
393
376
                         delta_pos) = decode_copy_instruction(delta_content, c,
394
377
                                                              delta_pos)
395
378
                        if include_text:
396
 
                            text = self._content[offset:offset + length]
397
 
                            delta_info.append((b'c', offset, length, text))
 
379
                            text = self._content[offset:offset+length]
 
380
                            delta_info.append(('c', offset, length, text))
398
381
                        else:
399
 
                            delta_info.append((b'c', offset, length))
 
382
                            delta_info.append(('c', offset, length))
400
383
                        measured_len += length
401
 
                    else:  # Insert
 
384
                    else: # Insert
402
385
                        if include_text:
403
 
                            txt = delta_content[delta_pos:delta_pos + c]
 
386
                            txt = delta_content[delta_pos:delta_pos+c]
404
387
                        else:
405
 
                            txt = b''
406
 
                        delta_info.append((b'i', c, txt))
 
388
                            txt = ''
 
389
                        delta_info.append(('i', c, txt))
407
390
                        measured_len += c
408
391
                        delta_pos += c
409
392
                if delta_pos != content_len:
435
418
        self.key = key
436
419
        self.parents = parents
437
420
        self.sha1 = None
438
 
        self.size = None
439
421
        # Note: This attribute coupled with Manager._factories creates a
440
422
        #       reference cycle. Perhaps we would rather use a weakref(), or
441
423
        #       find an appropriate time to release the ref. After the first
442
424
        #       get_bytes_as call? After Manager.get_record_stream() returns
443
425
        #       the object?
444
426
        self._manager = manager
445
 
        self._chunks = None
 
427
        self._bytes = None
446
428
        self.storage_kind = 'groupcompress-block'
447
429
        if not first:
448
430
            self.storage_kind = 'groupcompress-block-ref'
452
434
 
453
435
    def __repr__(self):
454
436
        return '%s(%s, first=%s)' % (self.__class__.__name__,
455
 
                                     self.key, self._first)
456
 
 
457
 
    def _extract_bytes(self):
458
 
        # Grab and cache the raw bytes for this entry
459
 
        # and break the ref-cycle with _manager since we don't need it
460
 
        # anymore
461
 
        try:
462
 
            self._manager._prepare_for_extract()
463
 
        except zlib.error as value:
464
 
            raise DecompressCorruption("zlib: " + str(value))
465
 
        block = self._manager._block
466
 
        self._chunks = block.extract(self.key, self._start, self._end)
467
 
        # There are code paths that first extract as fulltext, and then
468
 
        # extract as storage_kind (smart fetch). So we don't break the
469
 
        # refcycle here, but instead in manager.get_record_stream()
 
437
            self.key, self._first)
470
438
 
471
439
    def get_bytes_as(self, storage_kind):
472
440
        if storage_kind == self.storage_kind:
474
442
                # wire bytes, something...
475
443
                return self._manager._wire_bytes()
476
444
            else:
477
 
                return b''
478
 
        if storage_kind in ('fulltext', 'chunked', 'lines'):
479
 
            if self._chunks is None:
480
 
                self._extract_bytes()
 
445
                return ''
 
446
        if storage_kind in ('fulltext', 'chunked'):
 
447
            if self._bytes is None:
 
448
                # Grab and cache the raw bytes for this entry
 
449
                # and break the ref-cycle with _manager since we don't need it
 
450
                # anymore
 
451
                try:
 
452
                    self._manager._prepare_for_extract()
 
453
                except zlib.error as value:
 
454
                    raise errors.DecompressCorruption("zlib: " + str(value))
 
455
                block = self._manager._block
 
456
                self._bytes = block.extract(self.key, self._start, self._end)
 
457
                # There are code paths that first extract as fulltext, and then
 
458
                # extract as storage_kind (smart fetch). So we don't break the
 
459
                # refcycle here, but instead in manager.get_record_stream()
481
460
            if storage_kind == 'fulltext':
482
 
                return b''.join(self._chunks)
483
 
            elif storage_kind == 'chunked':
484
 
                return self._chunks
 
461
                return self._bytes
485
462
            else:
486
 
                return osutils.chunks_to_lines(self._chunks)
487
 
        raise UnavailableRepresentation(self.key, storage_kind,
488
 
                                               self.storage_kind)
489
 
 
490
 
    def iter_bytes_as(self, storage_kind):
491
 
        if self._chunks is None:
492
 
            self._extract_bytes()
493
 
        if storage_kind == 'chunked':
494
 
            return iter(self._chunks)
495
 
        elif storage_kind == 'lines':
496
 
            return iter(osutils.chunks_to_lines(self._chunks))
497
 
        raise UnavailableRepresentation(self.key, storage_kind,
 
463
                return [self._bytes]
 
464
        raise errors.UnavailableRepresentation(self.key, storage_kind,
498
465
                                               self.storage_kind)
499
466
 
500
467
 
501
468
class _LazyGroupContentManager(object):
502
469
    """This manages a group of _LazyGroupCompressFactory objects."""
503
470
 
504
 
    _max_cut_fraction = 0.75  # We allow a block to be trimmed to 75% of
505
 
    # current size, and still be considered
506
 
    # resuable
507
 
    _full_block_size = 4 * 1024 * 1024
508
 
    _full_mixed_block_size = 2 * 1024 * 1024
509
 
    _full_enough_block_size = 3 * 1024 * 1024  # size at which we won't repack
510
 
    _full_enough_mixed_block_size = 2 * 768 * 1024  # 1.5MB
 
471
    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
 
472
                             # current size, and still be considered
 
473
                             # resuable
 
474
    _full_block_size = 4*1024*1024
 
475
    _full_mixed_block_size = 2*1024*1024
 
476
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
 
477
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
511
478
 
512
479
    def __init__(self, block, get_compressor_settings=None):
513
480
        self._block = block
536
503
            first = False
537
504
        # Note that this creates a reference cycle....
538
505
        factory = _LazyGroupCompressFactory(key, parents, self,
539
 
                                            start, end, first=first)
 
506
            start, end, first=first)
540
507
        # max() works here, but as a function call, doing a compare seems to be
541
508
        # significantly faster, timeit says 250ms for max() and 100ms for the
542
509
        # comparison
576
543
        old_length = self._block._content_length
577
544
        end_point = 0
578
545
        for factory in self._factories:
579
 
            chunks = factory.get_bytes_as('chunked')
580
 
            chunks_len = factory.size
581
 
            if chunks_len is None:
582
 
                chunks_len = sum(map(len, chunks))
 
546
            bytes = factory.get_bytes_as('fulltext')
583
547
            (found_sha1, start_point, end_point,
584
 
             type) = compressor.compress(
585
 
                 factory.key, chunks, chunks_len, factory.sha1)
 
548
             type) = compressor.compress(factory.key, bytes, factory.sha1)
586
549
            # Now update this factory with the new offsets, etc
587
550
            factory.sha1 = found_sha1
588
551
            factory._start = start_point
726
689
        #   <length of gc block>\n
727
690
        #   <header bytes>
728
691
        #   <gc-block>
729
 
        lines = [b'groupcompress-block\n']
 
692
        lines = ['groupcompress-block\n']
730
693
        # The minimal info we need is the key, the start offset, and the
731
694
        # parents. The length and type are encoded in the record itself.
732
695
        # However, passing in the other bits makes it easier.  The list of
737
700
        # 1 line for end byte
738
701
        header_lines = []
739
702
        for factory in self._factories:
740
 
            key_bytes = b'\x00'.join(factory.key)
 
703
            key_bytes = '\x00'.join(factory.key)
741
704
            parents = factory.parents
742
705
            if parents is None:
743
 
                parent_bytes = b'None:'
 
706
                parent_bytes = 'None:'
744
707
            else:
745
 
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
746
 
            record_header = b'%s\n%s\n%d\n%d\n' % (
 
708
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
 
709
            record_header = '%s\n%s\n%d\n%d\n' % (
747
710
                key_bytes, parent_bytes, factory._start, factory._end)
748
711
            header_lines.append(record_header)
749
712
            # TODO: Can we break the refcycle at this point and set
750
713
            #       factory._manager = None?
751
 
        header_bytes = b''.join(header_lines)
 
714
        header_bytes = ''.join(header_lines)
752
715
        del header_lines
753
716
        header_bytes_len = len(header_bytes)
754
717
        z_header_bytes = zlib.compress(header_bytes)
755
718
        del header_bytes
756
719
        z_header_bytes_len = len(z_header_bytes)
757
720
        block_bytes_len, block_chunks = self._block.to_chunks()
758
 
        lines.append(b'%d\n%d\n%d\n' % (
759
 
            z_header_bytes_len, header_bytes_len, block_bytes_len))
 
721
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
 
722
                                       block_bytes_len))
760
723
        lines.append(z_header_bytes)
761
724
        lines.extend(block_chunks)
762
725
        del z_header_bytes, block_chunks
763
726
        # TODO: This is a point where we will double the memory consumption. To
764
727
        #       avoid this, we probably have to switch to a 'chunked' api
765
 
        return b''.join(lines)
 
728
        return ''.join(lines)
766
729
 
767
730
    @classmethod
768
731
    def from_bytes(cls, bytes):
770
733
        #       different way. At a minimum this creates 2 copies of the
771
734
        #       compressed content
772
735
        (storage_kind, z_header_len, header_len,
773
 
         block_len, rest) = bytes.split(b'\n', 4)
 
736
         block_len, rest) = bytes.split('\n', 4)
774
737
        del bytes
775
 
        if storage_kind != b'groupcompress-block':
 
738
        if storage_kind != 'groupcompress-block':
776
739
            raise ValueError('Unknown storage kind: %s' % (storage_kind,))
777
740
        z_header_len = int(z_header_len)
778
741
        if len(rest) < z_header_len:
790
753
        del rest
791
754
        # So now we have a valid GCB, we just need to parse the factories that
792
755
        # were sent to us
793
 
        header_lines = header.split(b'\n')
 
756
        header_lines = header.split('\n')
794
757
        del header
795
758
        last = header_lines.pop()
796
 
        if last != b'':
 
759
        if last != '':
797
760
            raise ValueError('header lines did not end with a trailing'
798
761
                             ' newline')
799
762
        if len(header_lines) % 4 != 0:
801
764
        block = GroupCompressBlock.from_bytes(block_bytes)
802
765
        del block_bytes
803
766
        result = cls(block)
804
 
        for start in range(0, len(header_lines), 4):
 
767
        for start in xrange(0, len(header_lines), 4):
805
768
            # intern()?
806
 
            key = tuple(header_lines[start].split(b'\x00'))
807
 
            parents_line = header_lines[start + 1]
808
 
            if parents_line == b'None:':
 
769
            key = tuple(header_lines[start].split('\x00'))
 
770
            parents_line = header_lines[start+1]
 
771
            if parents_line == 'None:':
809
772
                parents = None
810
773
            else:
811
 
                parents = tuple([tuple(segment.split(b'\x00'))
812
 
                                 for segment in parents_line.split(b'\t')
813
 
                                 if segment])
814
 
            start_offset = int(header_lines[start + 2])
815
 
            end_offset = int(header_lines[start + 3])
 
774
                parents = tuple([tuple(segment.split('\x00'))
 
775
                                 for segment in parents_line.split('\t')
 
776
                                  if segment])
 
777
            start_offset = int(header_lines[start+2])
 
778
            end_offset = int(header_lines[start+3])
816
779
            result.add_factory(key, parents, start_offset, end_offset)
817
780
        return result
818
781
 
833
796
        self.endpoint = 0
834
797
        self.input_bytes = 0
835
798
        self.labels_deltas = {}
836
 
        self._delta_index = None  # Set by the children
 
799
        self._delta_index = None # Set by the children
837
800
        self._block = GroupCompressBlock()
838
801
        if settings is None:
839
802
            self._settings = {}
840
803
        else:
841
804
            self._settings = settings
842
805
 
843
 
    def compress(self, key, chunks, length, expected_sha, nostore_sha=None,
844
 
                 soft=False):
 
806
    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
845
807
        """Compress lines with label key.
846
808
 
847
809
        :param key: A key tuple. It is stored in the output
848
810
            for identification of the text during decompression. If the last
849
 
            element is b'None' it is replaced with the sha1 of the text -
 
811
            element is 'None' it is replaced with the sha1 of the text -
850
812
            e.g. sha1:xxxxxxx.
851
 
        :param chunks: Chunks of bytes to be compressed
852
 
        :param length: Length of chunks
 
813
        :param bytes: The bytes to be compressed
853
814
        :param expected_sha: If non-None, the sha the lines are believed to
854
815
            have. During compression the sha is calculated; a mismatch will
855
816
            cause an error.
863
824
 
864
825
        :seealso VersionedFiles.add_lines:
865
826
        """
866
 
        if length == 0:  # empty, like a dir entry, etc
 
827
        if not bytes: # empty, like a dir entry, etc
867
828
            if nostore_sha == _null_sha1:
868
 
                raise ExistingContent()
 
829
                raise errors.ExistingContent()
869
830
            return _null_sha1, 0, 0, 'fulltext'
870
831
        # we assume someone knew what they were doing when they passed it in
871
832
        if expected_sha is not None:
872
833
            sha1 = expected_sha
873
834
        else:
874
 
            sha1 = osutils.sha_strings(chunks)
 
835
            sha1 = osutils.sha_string(bytes)
875
836
        if nostore_sha is not None:
876
837
            if sha1 == nostore_sha:
877
 
                raise ExistingContent()
 
838
                raise errors.ExistingContent()
878
839
        if key[-1] is None:
879
 
            key = key[:-1] + (b'sha1:' + sha1,)
 
840
            key = key[:-1] + ('sha1:' + sha1,)
880
841
 
881
 
        start, end, type = self._compress(key, chunks, length, length / 2, soft)
 
842
        start, end, type = self._compress(key, bytes, len(bytes) / 2, soft)
882
843
        return sha1, start, end, type
883
844
 
884
 
    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
 
845
    def _compress(self, key, bytes, max_delta_size, soft=False):
885
846
        """Compress lines with label key.
886
847
 
887
848
        :param key: A key tuple. It is stored in the output for identification
888
849
            of the text during decompression.
889
850
 
890
 
        :param chunks: The chunks of bytes to be compressed
891
 
 
892
 
        :param input_len: The length of the chunks
 
851
        :param bytes: The bytes to be compressed
893
852
 
894
853
        :param max_delta_size: The size above which we issue a fulltext instead
895
854
            of a delta.
906
865
        """Extract a key previously added to the compressor.
907
866
 
908
867
        :param key: The key to extract.
909
 
        :return: An iterable over chunks and the sha1.
 
868
        :return: An iterable over bytes and the sha1.
910
869
        """
911
 
        (start_byte, start_chunk, end_byte,
912
 
         end_chunk) = self.labels_deltas[key]
 
870
        (start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
913
871
        delta_chunks = self.chunks[start_chunk:end_chunk]
914
 
        stored_bytes = b''.join(delta_chunks)
915
 
        kind = stored_bytes[:1]
916
 
        if kind == b'f':
 
872
        stored_bytes = ''.join(delta_chunks)
 
873
        if stored_bytes[0] == 'f':
917
874
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
918
875
            data_len = fulltext_len + 1 + offset
919
 
            if data_len != len(stored_bytes):
 
876
            if  data_len != len(stored_bytes):
920
877
                raise ValueError('Index claimed fulltext len, but stored bytes'
921
878
                                 ' claim %s != %s'
922
879
                                 % (len(stored_bytes), data_len))
923
 
            data = [stored_bytes[offset + 1:]]
 
880
            bytes = stored_bytes[offset + 1:]
924
881
        else:
925
 
            if kind != b'd':
926
 
                raise ValueError('Unknown content kind, bytes claim %s' % kind)
927
882
            # XXX: This is inefficient at best
928
 
            source = b''.join(self.chunks[:start_chunk])
 
883
            source = ''.join(self.chunks[:start_chunk])
 
884
            if stored_bytes[0] != 'd':
 
885
                raise ValueError('Unknown content kind, bytes claim %s'
 
886
                                 % (stored_bytes[0],))
929
887
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
930
888
            data_len = delta_len + 1 + offset
931
889
            if data_len != len(stored_bytes):
932
890
                raise ValueError('Index claimed delta len, but stored bytes'
933
891
                                 ' claim %s != %s'
934
892
                                 % (len(stored_bytes), data_len))
935
 
            data = [apply_delta(source, stored_bytes[offset + 1:])]
936
 
        data_sha1 = osutils.sha_strings(data)
937
 
        return data, data_sha1
 
893
            bytes = apply_delta(source, stored_bytes[offset + 1:])
 
894
        bytes_sha1 = osutils.sha_string(bytes)
 
895
        return bytes, bytes_sha1
938
896
 
939
897
    def flush(self):
940
898
        """Finish this group, creating a formatted stream.
974
932
        # The actual content is managed by LinesDeltaIndex
975
933
        self.chunks = self._delta_index.lines
976
934
 
977
 
    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
 
935
    def _compress(self, key, bytes, max_delta_size, soft=False):
978
936
        """see _CommonGroupCompressor._compress"""
979
 
        new_lines = osutils.chunks_to_lines(chunks)
 
937
        input_len = len(bytes)
 
938
        new_lines = osutils.split_lines(bytes)
980
939
        out_lines, index_lines = self._delta_index.make_delta(
981
940
            new_lines, bytes_length=input_len, soft=soft)
982
941
        delta_length = sum(map(len, out_lines))
983
942
        if delta_length > max_delta_size:
984
943
            # The delta is longer than the fulltext, insert a fulltext
985
944
            type = 'fulltext'
986
 
            out_lines = [b'f', encode_base128_int(input_len)]
 
945
            out_lines = ['f', encode_base128_int(input_len)]
987
946
            out_lines.extend(new_lines)
988
947
            index_lines = [False, False]
989
948
            index_lines.extend([True] * len(new_lines))
990
949
        else:
991
950
            # this is a worthy delta, output it
992
951
            type = 'delta'
993
 
            out_lines[0] = b'd'
 
952
            out_lines[0] = 'd'
994
953
            # Update the delta_length to include those two encoded integers
995
954
            out_lines[1] = encode_base128_int(delta_length)
996
955
        # Before insertion
1028
987
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
1029
988
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
1030
989
 
1031
 
    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
 
990
    def _compress(self, key, bytes, max_delta_size, soft=False):
1032
991
        """see _CommonGroupCompressor._compress"""
 
992
        input_len = len(bytes)
1033
993
        # By having action/label/sha1/len, we can parse the group if the index
1034
994
        # was ever destroyed, we have the key in 'label', we know the final
1035
995
        # bytes are valid from sha1, and we know where to find the end of this
1041
1001
        # new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
1042
1002
        if self._delta_index._source_offset != self.endpoint:
1043
1003
            raise AssertionError('_source_offset != endpoint'
1044
 
                                 ' somehow the DeltaIndex got out of sync with'
1045
 
                                 ' the output lines')
1046
 
        bytes = b''.join(chunks)
 
1004
                ' somehow the DeltaIndex got out of sync with'
 
1005
                ' the output lines')
1047
1006
        delta = self._delta_index.make_delta(bytes, max_delta_size)
1048
 
        if delta is None:
 
1007
        if (delta is None):
1049
1008
            type = 'fulltext'
1050
 
            enc_length = encode_base128_int(input_len)
 
1009
            enc_length = encode_base128_int(len(bytes))
1051
1010
            len_mini_header = 1 + len(enc_length)
1052
1011
            self._delta_index.add_source(bytes, len_mini_header)
1053
 
            new_chunks = [b'f', enc_length] + chunks
 
1012
            new_chunks = ['f', enc_length, bytes]
1054
1013
        else:
1055
1014
            type = 'delta'
1056
1015
            enc_length = encode_base128_int(len(delta))
1057
1016
            len_mini_header = 1 + len(enc_length)
1058
 
            new_chunks = [b'd', enc_length, delta]
 
1017
            new_chunks = ['d', enc_length, delta]
1059
1018
            self._delta_index.add_delta_source(delta, len_mini_header)
1060
1019
        # Before insertion
1061
1020
        start = self.endpoint
1068
1027
                                   self.endpoint, chunk_end)
1069
1028
        if not self._delta_index._source_offset == self.endpoint:
1070
1029
            raise AssertionError('the delta index is out of sync'
1071
 
                                 'with the output lines %s != %s'
1072
 
                                 % (self._delta_index._source_offset, self.endpoint))
 
1030
                'with the output lines %s != %s'
 
1031
                % (self._delta_index._source_offset, self.endpoint))
1073
1032
        return start, self.endpoint, type
1074
1033
 
1075
1034
    def _output_chunks(self, new_chunks):
1100
1059
        if graph:
1101
1060
            ref_length = 1
1102
1061
        graph_index = BTreeBuilder(reference_lists=ref_length,
1103
 
                                   key_elements=keylength)
 
1062
            key_elements=keylength)
1104
1063
        stream = transport.open_write_stream('newpack')
1105
1064
        writer = pack.ContainerWriter(stream.write)
1106
1065
        writer.begin()
1107
 
        index = _GCGraphIndex(graph_index, lambda: True, parents=parents,
1108
 
                              add_callback=graph_index.add_nodes,
1109
 
                              inconsistency_fatal=inconsistency_fatal)
 
1066
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
 
1067
            add_callback=graph_index.add_nodes,
 
1068
            inconsistency_fatal=inconsistency_fatal)
1110
1069
        access = pack_repo._DirectPackAccess({})
1111
1070
        access.set_writer(writer, graph_index, (transport, 'newpack'))
1112
1071
        result = GroupCompressVersionedFiles(index, access, delta)
1208
1167
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
1209
1168
                    # The next block from _get_blocks will be the block we
1210
1169
                    # need.
1211
 
                    block_read_memo, block = next(blocks)
 
1170
                    block_read_memo, block = blocks.next()
1212
1171
                    if block_read_memo != read_memo:
1213
1172
                        raise AssertionError(
1214
1173
                            "block_read_memo out of sync with read_memo"
1218
1177
                else:
1219
1178
                    block = self.batch_memos[read_memo]
1220
1179
                self.manager = _LazyGroupContentManager(block,
1221
 
                                                        get_compressor_settings=self._get_compressor_settings)
 
1180
                    get_compressor_settings=self._get_compressor_settings)
1222
1181
                self.last_read_memo = read_memo
1223
1182
            start, end = index_memo[3:5]
1224
1183
            self.manager.add_factory(key, parents, start, end)
1244
1203
    # gives 100% sampling of a 1MB file.
1245
1204
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
1246
1205
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
1247
 
                                    _DEFAULT_MAX_BYTES_TO_INDEX}
 
1206
                                     _DEFAULT_MAX_BYTES_TO_INDEX}
1248
1207
 
1249
1208
    def __init__(self, index, access, delta=True, _unadded_refs=None,
1250
1209
                 _group_cache=None):
1263
1222
            _unadded_refs = {}
1264
1223
        self._unadded_refs = _unadded_refs
1265
1224
        if _group_cache is None:
1266
 
            _group_cache = LRUSizeCache(max_size=50 * 1024 * 1024)
 
1225
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
1267
1226
        self._group_cache = _group_cache
1268
1227
        self._immediate_fallback_vfs = []
1269
1228
        self._max_bytes_to_index = None
1271
1230
    def without_fallbacks(self):
1272
1231
        """Return a clone of this object without any fallbacks configured."""
1273
1232
        return GroupCompressVersionedFiles(self._index, self._access,
1274
 
                                           self._delta, _unadded_refs=dict(
1275
 
                                               self._unadded_refs),
1276
 
                                           _group_cache=self._group_cache)
 
1233
            self._delta, _unadded_refs=dict(self._unadded_refs),
 
1234
            _group_cache=self._group_cache)
1277
1235
 
1278
1236
    def add_lines(self, key, parents, lines, parent_texts=None,
1279
 
                  left_matching_blocks=None, nostore_sha=None, random_id=False,
1280
 
                  check_content=True):
 
1237
        left_matching_blocks=None, nostore_sha=None, random_id=False,
 
1238
        check_content=True):
1281
1239
        """Add a text to the store.
1282
1240
 
1283
1241
        :param key: The key tuple of the text to add.
1312
1270
                 back to future add_lines calls in the parent_texts dictionary.
1313
1271
        """
1314
1272
        self._index._check_write_ok()
1315
 
        if check_content:
1316
 
            self._check_lines_not_unicode(lines)
1317
 
            self._check_lines_are_lines(lines)
1318
 
        return self.add_content(
1319
 
            ChunkedContentFactory(
1320
 
                key, parents, osutils.sha_strings(lines), lines, chunks_are_lines=True),
1321
 
            parent_texts, left_matching_blocks, nostore_sha, random_id)
1322
 
 
1323
 
    def add_content(self, factory, parent_texts=None,
1324
 
                    left_matching_blocks=None, nostore_sha=None,
1325
 
                    random_id=False):
1326
 
        """Add a text to the store.
1327
 
 
1328
 
        :param factory: A ContentFactory that can be used to retrieve the key,
1329
 
            parents and contents.
1330
 
        :param parent_texts: An optional dictionary containing the opaque
1331
 
            representations of some or all of the parents of version_id to
1332
 
            allow delta optimisations.  VERY IMPORTANT: the texts must be those
1333
 
            returned by add_lines or data corruption can be caused.
1334
 
        :param left_matching_blocks: a hint about which areas are common
1335
 
            between the text and its left-hand-parent.  The format is
1336
 
            the SequenceMatcher.get_matching_blocks format.
1337
 
        :param nostore_sha: Raise ExistingContent and do not add the lines to
1338
 
            the versioned file if the digest of the lines matches this.
1339
 
        :param random_id: If True a random id has been selected rather than
1340
 
            an id determined by some deterministic process such as a converter
1341
 
            from a foreign VCS. When True the backend may choose not to check
1342
 
            for uniqueness of the resulting key within the versioned file, so
1343
 
            this should only be done when the result is expected to be unique
1344
 
            anyway.
1345
 
        :return: The text sha1, the number of bytes in the text, and an opaque
1346
 
                 representation of the inserted version which can be provided
1347
 
                 back to future add_lines calls in the parent_texts dictionary.
1348
 
        """
 
1273
        self._check_add(key, lines, random_id, check_content)
 
1274
        if parents is None:
 
1275
            # The caller might pass None if there is no graph data, but kndx
 
1276
            # indexes can't directly store that, so we give them
 
1277
            # an empty tuple instead.
 
1278
            parents = ()
 
1279
        # double handling for now. Make it work until then.
 
1280
        length = sum(map(len, lines))
 
1281
        record = ChunkedContentFactory(key, parents, None, lines)
 
1282
        sha1 = list(self._insert_record_stream([record], random_id=random_id,
 
1283
                                               nostore_sha=nostore_sha))[0]
 
1284
        return sha1, length, None
 
1285
 
 
1286
    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
 
1287
        """See VersionedFiles._add_text()."""
1349
1288
        self._index._check_write_ok()
1350
 
        parents = factory.parents
1351
 
        self._check_add(factory.key, random_id)
 
1289
        self._check_add(key, None, random_id, check_content=False)
 
1290
        if text.__class__ is not str:
 
1291
            raise errors.BzrBadParameterUnicode("text")
1352
1292
        if parents is None:
1353
1293
            # The caller might pass None if there is no graph data, but kndx
1354
1294
            # indexes can't directly store that, so we give them
1355
1295
            # an empty tuple instead.
1356
1296
            parents = ()
1357
1297
        # double handling for now. Make it work until then.
1358
 
        sha1, length = list(self._insert_record_stream(
1359
 
            [factory], random_id=random_id, nostore_sha=nostore_sha))[0]
 
1298
        length = len(text)
 
1299
        record = FulltextContentFactory(key, parents, None, text)
 
1300
        sha1 = list(self._insert_record_stream([record], random_id=random_id,
 
1301
                                               nostore_sha=nostore_sha))[0]
1360
1302
        return sha1, length, None
1361
1303
 
1362
1304
    def add_fallback_versioned_files(self, a_versioned_files):
1379
1321
        if keys is None:
1380
1322
            keys = self.keys()
1381
1323
            for record in self.get_record_stream(keys, 'unordered', True):
1382
 
                for chunk in record.iter_bytes_as('chunked'):
1383
 
                    pass
 
1324
                record.get_bytes_as('fulltext')
1384
1325
        else:
1385
1326
            return self.get_record_stream(keys, 'unordered', True)
1386
1327
 
1390
1331
        self._index._graph_index.clear_cache()
1391
1332
        self._index._int_cache.clear()
1392
1333
 
1393
 
    def _check_add(self, key, random_id):
 
1334
    def _check_add(self, key, lines, random_id, check_content):
1394
1335
        """check that version_id and lines are safe to add."""
1395
1336
        version_id = key[-1]
1396
1337
        if version_id is not None:
1401
1342
        # probably check that the existing content is identical to what is
1402
1343
        # being inserted, and otherwise raise an exception.  This would make
1403
1344
        # the bundle code simpler.
 
1345
        if check_content:
 
1346
            self._check_lines_not_unicode(lines)
 
1347
            self._check_lines_are_lines(lines)
1404
1348
 
1405
1349
    def get_parent_map(self, keys):
1406
1350
        """Get a map of the graph parents of keys.
1465
1409
                yield read_memo, cached[read_memo]
1466
1410
            except KeyError:
1467
1411
                # Read the block, and cache it.
1468
 
                zdata = next(raw_records)
 
1412
                zdata = raw_records.next()
1469
1413
                block = GroupCompressBlock.from_bytes(zdata)
1470
1414
                self._group_cache[read_memo] = block
1471
1415
                cached[read_memo] = block
1499
1443
        if not keys:
1500
1444
            return
1501
1445
        if (not self._index.has_graph
1502
 
                and ordering in ('topological', 'groupcompress')):
 
1446
            and ordering in ('topological', 'groupcompress')):
1503
1447
            # Cannot topological order when no graph has been stored.
1504
1448
            # but we allow 'as-requested' or 'unordered'
1505
1449
            ordering = 'unordered'
1509
1453
            try:
1510
1454
                keys = set(remaining_keys)
1511
1455
                for content_factory in self._get_remaining_record_stream(keys,
1512
 
                                                                         orig_keys, ordering, include_delta_closure):
 
1456
                        orig_keys, ordering, include_delta_closure):
1513
1457
                    remaining_keys.discard(content_factory.key)
1514
1458
                    yield content_factory
1515
1459
                return
1516
 
            except errors.RetryWithNewPacks as e:
 
1460
            except errors.RetryWithNewPacks, e:
1517
1461
                self._access.reload_or_raise(e)
1518
1462
 
1519
1463
    def _find_from_fallback(self, missing):
1579
1523
                source = self
1580
1524
            elif key in key_to_source_map:
1581
1525
                source = key_to_source_map[key]
1582
 
            else:  # absent
 
1526
            else: # absent
1583
1527
                continue
1584
1528
            if source is not current_source:
1585
1529
                source_keys.append((source, []))
1593
1537
            # This is the group the bytes are stored in, followed by the
1594
1538
            # location in the group
1595
1539
            return locations[key][0]
 
1540
        present_keys = sorted(locations.iterkeys(), key=get_group)
1596
1541
        # We don't have an ordering for keys in the in-memory object, but
1597
1542
        # lets process the in-memory ones first.
1598
 
        present_keys = list(unadded_keys)
1599
 
        present_keys.extend(sorted(locations, key=get_group))
 
1543
        present_keys = list(unadded_keys) + present_keys
1600
1544
        # Now grab all of the ones from other sources
1601
1545
        source_keys = [(self, present_keys)]
1602
1546
        source_keys.extend(source_result)
1626
1570
            # start with one key, recurse to its oldest parent, then grab
1627
1571
            # everything in the same group, etc.
1628
1572
            parent_map = dict((key, details[2]) for key, details in
1629
 
                              locations.items())
 
1573
                locations.iteritems())
1630
1574
            for key in unadded_keys:
1631
1575
                parent_map[key] = self._unadded_refs[key]
1632
1576
            parent_map.update(fallback_parent_map)
1634
1578
                                                        key_to_source_map)
1635
1579
        elif ordering == 'as-requested':
1636
1580
            source_keys = self._get_as_requested_source_keys(orig_keys,
1637
 
                                                             locations, unadded_keys, key_to_source_map)
 
1581
                locations, unadded_keys, key_to_source_map)
1638
1582
        else:
1639
1583
            # We want to yield the keys in a semi-optimal (read-wise) ordering.
1640
1584
            # Otherwise we thrash the _group_cache and destroy performance
1641
1585
            source_keys = self._get_io_ordered_source_keys(locations,
1642
 
                                                           unadded_keys, source_result)
 
1586
                unadded_keys, source_result)
1643
1587
        for key in missing:
1644
1588
            yield AbsentContentFactory(key)
1645
1589
        # Batch up as many keys as we can until either:
1647
1591
        #  - we run out of keys, or
1648
1592
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
1649
1593
        batcher = _BatchingBlockFetcher(self, locations,
1650
 
                                        get_compressor_settings=self._get_compressor_settings)
 
1594
            get_compressor_settings=self._get_compressor_settings)
1651
1595
        for source, keys in source_keys:
1652
1596
            if source is self:
1653
1597
                for key in keys:
1656
1600
                        # self._compressor.
1657
1601
                        for factory in batcher.yield_factories(full_flush=True):
1658
1602
                            yield factory
1659
 
                        chunks, sha1 = self._compressor.extract(key)
 
1603
                        bytes, sha1 = self._compressor.extract(key)
1660
1604
                        parents = self._unadded_refs[key]
1661
 
                        yield ChunkedContentFactory(key, parents, sha1, chunks)
 
1605
                        yield FulltextContentFactory(key, parents, sha1, bytes)
1662
1606
                        continue
1663
1607
                    if batcher.add_key(key) > BATCH_SIZE:
1664
1608
                        # Ok, this batch is big enough.  Yield some results.
1677
1621
        """See VersionedFiles.get_sha1s()."""
1678
1622
        result = {}
1679
1623
        for record in self.get_record_stream(keys, 'unordered', True):
1680
 
            if record.sha1 is not None:
 
1624
            if record.sha1 != None:
1681
1625
                result[record.key] = record.sha1
1682
1626
            else:
1683
1627
                if record.storage_kind != 'absent':
1684
 
                    result[record.key] = osutils.sha_strings(
1685
 
                        record.iter_bytes_as('chunked'))
 
1628
                    result[record.key] = osutils.sha_string(
 
1629
                        record.get_bytes_as('fulltext'))
1686
1630
        return result
1687
1631
 
1688
1632
    def insert_record_stream(self, stream):
1696
1640
        # test_insert_record_stream_existing_keys fail for groupcompress and
1697
1641
        # groupcompress-nograph, this needs to be revisited while addressing
1698
1642
        # 'bzr branch' performance issues.
1699
 
        for _, _ in self._insert_record_stream(stream, random_id=False):
 
1643
        for _ in self._insert_record_stream(stream, random_id=False):
1700
1644
            pass
1701
1645
 
1702
1646
    def _get_compressor_settings(self):
1709
1653
            if val is not None:
1710
1654
                try:
1711
1655
                    val = int(val)
1712
 
                except ValueError as e:
 
1656
                except ValueError, e:
1713
1657
                    trace.warning('Value for '
1714
1658
                                  '"bzr.groupcompress.max_bytes_to_index"'
1715
1659
                                  ' %r is not an integer'
1736
1680
        :param reuse_blocks: If the source is streaming from
1737
1681
            groupcompress-blocks, just insert the blocks as-is, rather than
1738
1682
            expanding the texts and inserting again.
1739
 
        :return: An iterator over (sha1, length) of the inserted records.
 
1683
        :return: An iterator over the sha1 of the inserted records.
1740
1684
        :seealso insert_record_stream:
1741
1685
        :seealso add_lines:
1742
1686
        """
1743
1687
        adapters = {}
1744
 
 
1745
1688
        def get_adapter(adapter_key):
1746
1689
            try:
1747
1690
                return adapters[adapter_key]
1755
1698
        self._compressor = self._make_group_compressor()
1756
1699
        self._unadded_refs = {}
1757
1700
        keys_to_add = []
1758
 
 
1759
1701
        def flush():
1760
1702
            bytes_len, chunks = self._compressor.flush().to_chunks()
1761
1703
            self._compressor = self._make_group_compressor()
1762
1704
            # Note: At this point we still have 1 copy of the fulltext (in
1763
1705
            #       record and the var 'bytes'), and this generates 2 copies of
1764
1706
            #       the compressed text (one for bytes, one in chunks)
 
1707
            # TODO: Push 'chunks' down into the _access api, so that we don't
 
1708
            #       have to double compressed memory here
1765
1709
            # TODO: Figure out how to indicate that we would be happy to free
1766
1710
            #       the fulltext content at this point. Note that sometimes we
1767
1711
            #       will want it later (streaming CHK pages), but most of the
1768
1712
            #       time we won't (everything else)
1769
 
            index, start, length = self._access.add_raw_record(
1770
 
                None, bytes_len, chunks)
 
1713
            bytes = ''.join(chunks)
 
1714
            del chunks
 
1715
            index, start, length = self._access.add_raw_records(
 
1716
                [(None, len(bytes))], bytes)[0]
1771
1717
            nodes = []
1772
1718
            for key, reads, refs in keys_to_add:
1773
 
                nodes.append((key, b"%d %d %s" % (start, length, reads), refs))
 
1719
                nodes.append((key, "%d %d %s" % (start, length, reads), refs))
1774
1720
            self._index.add_records(nodes, random_id=random_id)
1775
1721
            self._unadded_refs = {}
1776
1722
            del keys_to_add[:]
1791
1737
            if random_id:
1792
1738
                if record.key in inserted_keys:
1793
1739
                    trace.note(gettext('Insert claimed random_id=True,'
1794
 
                                       ' but then inserted %r two times'), record.key)
 
1740
                               ' but then inserted %r two times'), record.key)
1795
1741
                    continue
1796
1742
                inserted_keys.add(record.key)
1797
1743
            if reuse_blocks:
1811
1757
                if record.storage_kind == 'groupcompress-block':
1812
1758
                    # Insert the raw block into the target repo
1813
1759
                    insert_manager = record._manager
1814
 
                    bytes_len, chunks = record._manager._block.to_chunks()
1815
 
                    _, start, length = self._access.add_raw_record(
1816
 
                        None, bytes_len, chunks)
 
1760
                    bytes = record._manager._block.to_bytes()
 
1761
                    _, start, length = self._access.add_raw_records(
 
1762
                        [(None, len(bytes))], bytes)[0]
 
1763
                    del bytes
1817
1764
                    block_start = start
1818
1765
                    block_length = length
1819
1766
                if record.storage_kind in ('groupcompress-block',
1822
1769
                        raise AssertionError('No insert_manager set')
1823
1770
                    if insert_manager is not record._manager:
1824
1771
                        raise AssertionError('insert_manager does not match'
1825
 
                                             ' the current record, we cannot be positive'
1826
 
                                             ' that the appropriate content was inserted.'
1827
 
                                             )
1828
 
                    value = b"%d %d %d %d" % (block_start, block_length,
1829
 
                                              record._start, record._end)
 
1772
                            ' the current record, we cannot be positive'
 
1773
                            ' that the appropriate content was inserted.'
 
1774
                            )
 
1775
                    value = "%d %d %d %d" % (block_start, block_length,
 
1776
                                             record._start, record._end)
1830
1777
                    nodes = [(record.key, value, (record.parents,))]
1831
1778
                    # TODO: Consider buffering up many nodes to be added, not
1832
1779
                    #       sure how much overhead this has, but we're seeing
1834
1781
                    self._index.add_records(nodes, random_id=random_id)
1835
1782
                    continue
1836
1783
            try:
1837
 
                chunks = record.get_bytes_as('chunked')
1838
 
            except UnavailableRepresentation:
1839
 
                adapter_key = record.storage_kind, 'chunked'
 
1784
                bytes = record.get_bytes_as('fulltext')
 
1785
            except errors.UnavailableRepresentation:
 
1786
                adapter_key = record.storage_kind, 'fulltext'
1840
1787
                adapter = get_adapter(adapter_key)
1841
 
                chunks = adapter.get_bytes(record, 'chunked')
1842
 
            chunks_len = record.size
1843
 
            if chunks_len is None:
1844
 
                chunks_len = sum(map(len, chunks))
 
1788
                bytes = adapter.get_bytes(record)
1845
1789
            if len(record.key) > 1:
1846
1790
                prefix = record.key[0]
1847
1791
                soft = (prefix == last_prefix)
1848
1792
            else:
1849
1793
                prefix = None
1850
1794
                soft = False
1851
 
            if max_fulltext_len < chunks_len:
1852
 
                max_fulltext_len = chunks_len
 
1795
            if max_fulltext_len < len(bytes):
 
1796
                max_fulltext_len = len(bytes)
1853
1797
                max_fulltext_prefix = prefix
1854
1798
            (found_sha1, start_point, end_point,
1855
 
             type) = self._compressor.compress(
1856
 
                 record.key, chunks, chunks_len, record.sha1, soft=soft,
1857
 
                 nostore_sha=nostore_sha)
1858
 
            # delta_ratio = float(chunks_len) / (end_point - start_point)
 
1799
             type) = self._compressor.compress(record.key,
 
1800
                                               bytes, record.sha1, soft=soft,
 
1801
                                               nostore_sha=nostore_sha)
 
1802
            # delta_ratio = float(len(bytes)) / (end_point - start_point)
1859
1803
            # Check if we want to continue to include that text
1860
1804
            if (prefix == max_fulltext_prefix
1861
 
                    and end_point < 2 * max_fulltext_len):
 
1805
                and end_point < 2 * max_fulltext_len):
1862
1806
                # As long as we are on the same file_id, we will fill at least
1863
1807
                # 2 * max_fulltext_len
1864
1808
                start_new_block = False
1865
 
            elif end_point > 4 * 1024 * 1024:
 
1809
            elif end_point > 4*1024*1024:
1866
1810
                start_new_block = True
1867
1811
            elif (prefix is not None and prefix != last_prefix
1868
 
                  and end_point > 2 * 1024 * 1024):
 
1812
                  and end_point > 2*1024*1024):
1869
1813
                start_new_block = True
1870
1814
            else:
1871
1815
                start_new_block = False
1873
1817
            if start_new_block:
1874
1818
                self._compressor.pop_last()
1875
1819
                flush()
1876
 
                max_fulltext_len = chunks_len
 
1820
                max_fulltext_len = len(bytes)
1877
1821
                (found_sha1, start_point, end_point,
1878
 
                 type) = self._compressor.compress(
1879
 
                     record.key, chunks, chunks_len, record.sha1)
 
1822
                 type) = self._compressor.compress(record.key, bytes,
 
1823
                                                   record.sha1)
1880
1824
            if record.key[-1] is None:
1881
 
                key = record.key[:-1] + (b'sha1:' + found_sha1,)
 
1825
                key = record.key[:-1] + ('sha1:' + found_sha1,)
1882
1826
            else:
1883
1827
                key = record.key
1884
1828
            self._unadded_refs[key] = record.parents
1885
 
            yield found_sha1, chunks_len
 
1829
            yield found_sha1
1886
1830
            as_st = static_tuple.StaticTuple.from_sequence
1887
1831
            if record.parents is not None:
1888
1832
                parents = as_st([as_st(p) for p in record.parents])
1889
1833
            else:
1890
1834
                parents = None
1891
1835
            refs = static_tuple.StaticTuple(parents)
1892
 
            keys_to_add.append(
1893
 
                (key, b'%d %d' % (start_point, end_point), refs))
 
1836
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
1894
1837
        if len(keys_to_add):
1895
1838
            flush()
1896
1839
        self._compressor = None
1922
1865
        # but we need to setup a list of records to visit.
1923
1866
        # we need key, position, length
1924
1867
        for key_idx, record in enumerate(self.get_record_stream(keys,
1925
 
                                                                'unordered', True)):
 
1868
            'unordered', True)):
1926
1869
            # XXX: todo - optimise to use less than full texts.
1927
1870
            key = record.key
1928
1871
            if pb is not None:
1929
1872
                pb.update('Walking content', key_idx, total)
1930
1873
            if record.storage_kind == 'absent':
1931
1874
                raise errors.RevisionNotPresent(key, self)
1932
 
            for line in record.iter_bytes_as('lines'):
 
1875
            lines = osutils.split_lines(record.get_bytes_as('fulltext'))
 
1876
            for line in lines:
1933
1877
                yield line, key
1934
1878
        if pb is not None:
1935
1879
            pb.update('Walking content', total, total)
1965
1909
 
1966
1910
    def __repr__(self):
1967
1911
        return '%s(%s, %s)' % (self.__class__.__name__,
1968
 
                               self.index_memo, self._parents)
 
1912
            self.index_memo, self._parents)
1969
1913
 
1970
1914
    @property
1971
1915
    def index_memo(self):
1981
1925
        if offset == 0:
1982
1926
            return self.index_memo
1983
1927
        elif offset == 1:
1984
 
            return self.compression_parent  # Always None
 
1928
            return self.compression_parent # Always None
1985
1929
        elif offset == 2:
1986
1930
            return self._parents
1987
1931
        elif offset == 3:
1988
1932
            return self.record_details
1989
1933
        else:
1990
1934
            raise IndexError('offset out of range')
1991
 
 
 
1935
            
1992
1936
    def __len__(self):
1993
1937
        return 4
1994
1938
 
1997
1941
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1998
1942
 
1999
1943
    def __init__(self, graph_index, is_locked, parents=True,
2000
 
                 add_callback=None, track_external_parent_refs=False,
2001
 
                 inconsistency_fatal=True, track_new_keys=False):
 
1944
        add_callback=None, track_external_parent_refs=False,
 
1945
        inconsistency_fatal=True, track_new_keys=False):
2002
1946
        """Construct a _GCGraphIndex on a graph_index.
2003
1947
 
2004
 
        :param graph_index: An implementation of breezy.index.GraphIndex.
 
1948
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
2005
1949
        :param is_locked: A callback, returns True if the index is locked and
2006
1950
            thus usable.
2007
1951
        :param parents: If True, record knits parents, if not do not record
2057
2001
                if refs:
2058
2002
                    for ref in refs:
2059
2003
                        if ref:
2060
 
                            raise knit.KnitCorrupt(self,
2061
 
                                                   "attempt to add node with parents "
2062
 
                                                   "in parentless index.")
 
2004
                            raise errors.KnitCorrupt(self,
 
2005
                                "attempt to add node with parents "
 
2006
                                "in parentless index.")
2063
2007
                    refs = ()
2064
2008
                    changed = True
2065
2009
            keys[key] = (value, refs)
2073
2017
                if node_refs != passed[1]:
2074
2018
                    details = '%s %s %s' % (key, (value, node_refs), passed)
2075
2019
                    if self._inconsistency_fatal:
2076
 
                        raise knit.KnitCorrupt(self, "inconsistent details"
2077
 
                                               " in add_records: %s" %
2078
 
                                               details)
 
2020
                        raise errors.KnitCorrupt(self, "inconsistent details"
 
2021
                                                 " in add_records: %s" %
 
2022
                                                 details)
2079
2023
                    else:
2080
2024
                        trace.warning("inconsistent details in skipped"
2081
2025
                                      " record: %s", details)
2084
2028
        if changed:
2085
2029
            result = []
2086
2030
            if self._parents:
2087
 
                for key, (value, node_refs) in keys.items():
 
2031
                for key, (value, node_refs) in keys.iteritems():
2088
2032
                    result.append((key, value, node_refs))
2089
2033
            else:
2090
 
                for key, (value, node_refs) in keys.items():
 
2034
                for key, (value, node_refs) in keys.iteritems():
2091
2035
                    result.append((key, value))
2092
2036
            records = result
2093
2037
        key_dependencies = self._key_dependencies
2205
2149
 
2206
2150
    def _node_to_position(self, node):
2207
2151
        """Convert an index value to position details."""
2208
 
        bits = node[2].split(b' ')
 
2152
        bits = node[2].split(' ')
2209
2153
        # It would be nice not to read the entire gzip.
2210
2154
        # start and stop are put into _int_cache because they are very common.
2211
2155
        # They define the 'group' that an entry is in, and many groups can have
2244
2188
            key_dependencies.add_references(node[1], node[3][0])
2245
2189
 
2246
2190
 
2247
 
from ._groupcompress_py import (
 
2191
from bzrlib._groupcompress_py import (
2248
2192
    apply_delta,
2249
2193
    apply_delta_to_source,
2250
2194
    encode_base128_int,
2253
2197
    LinesDeltaIndex,
2254
2198
    )
2255
2199
try:
2256
 
    from ._groupcompress_pyx import (
 
2200
    from bzrlib._groupcompress_pyx import (
2257
2201
        apply_delta,
2258
2202
        apply_delta_to_source,
2259
2203
        DeltaIndex,
2261
2205
        decode_base128_int,
2262
2206
        )
2263
2207
    GroupCompressor = PyrexGroupCompressor
2264
 
except ImportError as e:
 
2208
except ImportError, e:
2265
2209
    osutils.failed_to_load_extension(e)
2266
2210
    GroupCompressor = PythonGroupCompressor
 
2211