/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/bzr/groupcompress.py

  • Committer: Jelmer Vernooij
  • Date: 2019-06-03 21:45:21 UTC
  • mto: This revision was merged to the branch mainline in revision 7315.
  • Revision ID: jelmer@jelmer.uk-20190603214521-34fa5tp86ncfnn4h
Fix test; .git is now properly recognized as a control dir name.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Core compression logic for compressing streams of related files."""
18
18
 
 
19
from __future__ import absolute_import
 
20
 
19
21
import time
20
22
import zlib
21
 
try:
22
 
    import pylzma
23
 
except ImportError:
24
 
    pylzma = None
25
23
 
26
 
from bzrlib import (
 
24
from ..lazy_import import lazy_import
 
25
lazy_import(globals(), """
 
26
from breezy import (
27
27
    annotate,
 
28
    config,
28
29
    debug,
29
 
    errors,
30
 
    graph as _mod_graph,
31
 
    knit,
32
30
    osutils,
33
 
    pack,
34
31
    static_tuple,
35
32
    trace,
36
 
    )
37
 
from bzrlib.btree_index import BTreeBuilder
38
 
from bzrlib.lru_cache import LRUSizeCache
39
 
from bzrlib.tsort import topo_sort
40
 
from bzrlib.versionedfile import (
 
33
    tsort,
 
34
    )
 
35
from breezy.bzr import (
 
36
    knit,
 
37
    pack,
 
38
    pack_repo,
 
39
    )
 
40
 
 
41
from breezy.i18n import gettext
 
42
""")
 
43
 
 
44
from .. import (
 
45
    errors,
 
46
    )
 
47
from .btree_index import BTreeBuilder
 
48
from ..lru_cache import LRUSizeCache
 
49
from ..sixish import (
 
50
    indexbytes,
 
51
    map,
 
52
    range,
 
53
    viewitems,
 
54
    )
 
55
from .versionedfile import (
 
56
    _KeyRefs,
41
57
    adapter_registry,
42
58
    AbsentContentFactory,
43
59
    ChunkedContentFactory,
44
60
    FulltextContentFactory,
45
 
    VersionedFiles,
 
61
    VersionedFilesWithFallbacks,
46
62
    )
47
63
 
48
64
# Minimum number of uncompressed bytes to try fetch at once when retrieving
49
65
# groupcompress blocks.
50
66
BATCH_SIZE = 2**16
51
67
 
52
 
_USE_LZMA = False and (pylzma is not None)
 
68
# osutils.sha_string(b'')
 
69
_null_sha1 = b'da39a3ee5e6b4b0d3255bfef95601890afd80709'
53
70
 
54
 
# osutils.sha_string('')
55
 
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
56
71
 
57
72
def sort_gc_optimal(parent_map):
58
73
    """Sort and group the keys in parent_map into groupcompress order.
65
80
    # groupcompress ordering is approximately reverse topological,
66
81
    # properly grouped by file-id.
67
82
    per_prefix_map = {}
68
 
    for key, value in parent_map.iteritems():
69
 
        if isinstance(key, str) or len(key) == 1:
70
 
            prefix = ''
 
83
    for key, value in viewitems(parent_map):
 
84
        if isinstance(key, bytes) or len(key) == 1:
 
85
            prefix = b''
71
86
        else:
72
87
            prefix = key[0]
73
88
        try:
77
92
 
78
93
    present_keys = []
79
94
    for prefix in sorted(per_prefix_map):
80
 
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
 
95
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
81
96
    return present_keys
82
97
 
83
98
 
 
99
class DecompressCorruption(errors.BzrError):
 
100
 
 
101
    _fmt = "Corruption while decompressing repository file%(orig_error)s"
 
102
 
 
103
    def __init__(self, orig_error=None):
 
104
        if orig_error is not None:
 
105
            self.orig_error = ", %s" % (orig_error,)
 
106
        else:
 
107
            self.orig_error = ""
 
108
        errors.BzrError.__init__(self)
 
109
 
 
110
 
84
111
# The max zlib window size is 32kB, so if we set 'max_size' output of the
85
112
# decompressor to the requested bytes + 32kB, then we should guarantee
86
113
# num_bytes coming out.
87
 
_ZLIB_DECOMP_WINDOW = 32*1024
 
114
_ZLIB_DECOMP_WINDOW = 32 * 1024
 
115
 
88
116
 
89
117
class GroupCompressBlock(object):
90
118
    """An object which maintains the internal structure of the compressed data.
93
121
    """
94
122
 
95
123
    # Group Compress Block v1 Zlib
96
 
    GCB_HEADER = 'gcb1z\n'
 
124
    GCB_HEADER = b'gcb1z\n'
97
125
    # Group Compress Block v1 Lzma
98
 
    GCB_LZ_HEADER = 'gcb1l\n'
 
126
    GCB_LZ_HEADER = b'gcb1l\n'
99
127
    GCB_KNOWN_HEADERS = (GCB_HEADER, GCB_LZ_HEADER)
100
128
 
101
129
    def __init__(self):
102
130
        # map by key? or just order in file?
103
131
        self._compressor_name = None
104
 
        self._z_content = None
 
132
        self._z_content_chunks = None
105
133
        self._z_content_decompressor = None
106
134
        self._z_content_length = None
107
135
        self._content_length = None
132
160
        # Expand the content if required
133
161
        if self._content is None:
134
162
            if self._content_chunks is not None:
135
 
                self._content = ''.join(self._content_chunks)
 
163
                self._content = b''.join(self._content_chunks)
136
164
                self._content_chunks = None
137
165
        if self._content is None:
138
 
            if self._z_content is None:
 
166
            # We join self._z_content_chunks here, because if we are
 
167
            # decompressing, then it is *very* likely that we have a single
 
168
            # chunk
 
169
            if self._z_content_chunks is None:
139
170
                raise AssertionError('No content to decompress')
140
 
            if self._z_content == '':
141
 
                self._content = ''
 
171
            z_content = b''.join(self._z_content_chunks)
 
172
            if z_content == b'':
 
173
                self._content = b''
142
174
            elif self._compressor_name == 'lzma':
143
175
                # We don't do partial lzma decomp yet
144
 
                self._content = pylzma.decompress(self._z_content)
 
176
                import pylzma
 
177
                self._content = pylzma.decompress(z_content)
145
178
            elif self._compressor_name == 'zlib':
146
179
                # Start a zlib decompressor
147
180
                if num_bytes * 4 > self._content_length * 3:
148
181
                    # If we are requesting more that 3/4ths of the content,
149
182
                    # just extract the whole thing in a single pass
150
183
                    num_bytes = self._content_length
151
 
                    self._content = zlib.decompress(self._z_content)
 
184
                    self._content = zlib.decompress(z_content)
152
185
                else:
153
186
                    self._z_content_decompressor = zlib.decompressobj()
154
187
                    # Seed the decompressor with the uncompressed bytes, so
155
188
                    # that the rest of the code is simplified
156
189
                    self._content = self._z_content_decompressor.decompress(
157
 
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
190
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
158
191
                    if not self._z_content_decompressor.unconsumed_tail:
159
192
                        self._z_content_decompressor = None
160
193
            else:
187
220
            # The stream is finished
188
221
            self._z_content_decompressor = None
189
222
 
190
 
    def _parse_bytes(self, bytes, pos):
 
223
    def _parse_bytes(self, data, pos):
191
224
        """Read the various lengths from the header.
192
225
 
193
226
        This also populates the various 'compressed' buffers.
197
230
        # At present, we have 2 integers for the compressed and uncompressed
198
231
        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
199
232
        # checking too far, cap the search to 14 bytes.
200
 
        pos2 = bytes.index('\n', pos, pos + 14)
201
 
        self._z_content_length = int(bytes[pos:pos2])
202
 
        pos = pos2 + 1
203
 
        pos2 = bytes.index('\n', pos, pos + 14)
204
 
        self._content_length = int(bytes[pos:pos2])
205
 
        pos = pos2 + 1
206
 
        if len(bytes) != (pos + self._z_content_length):
 
233
        pos2 = data.index(b'\n', pos, pos + 14)
 
234
        self._z_content_length = int(data[pos:pos2])
 
235
        pos = pos2 + 1
 
236
        pos2 = data.index(b'\n', pos, pos + 14)
 
237
        self._content_length = int(data[pos:pos2])
 
238
        pos = pos2 + 1
 
239
        if len(data) != (pos + self._z_content_length):
207
240
            # XXX: Define some GCCorrupt error ?
208
241
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
209
 
                                 (len(bytes), pos, self._z_content_length))
210
 
        self._z_content = bytes[pos:]
 
242
                                 (len(data), pos, self._z_content_length))
 
243
        self._z_content_chunks = (data[pos:],)
 
244
 
 
245
    @property
 
246
    def _z_content(self):
 
247
        """Return z_content_chunks as a simple string.
 
248
 
 
249
        Meant only to be used by the test suite.
 
250
        """
 
251
        if self._z_content_chunks is not None:
 
252
            return b''.join(self._z_content_chunks)
 
253
        return None
211
254
 
212
255
    @classmethod
213
256
    def from_bytes(cls, bytes):
214
257
        out = cls()
215
 
        if bytes[:6] not in cls.GCB_KNOWN_HEADERS:
 
258
        header = bytes[:6]
 
259
        if header not in cls.GCB_KNOWN_HEADERS:
216
260
            raise ValueError('bytes did not start with any of %r'
217
261
                             % (cls.GCB_KNOWN_HEADERS,))
218
 
        # XXX: why not testing the whole header ?
219
 
        if bytes[4] == 'z':
 
262
        if header == cls.GCB_HEADER:
220
263
            out._compressor_name = 'zlib'
221
 
        elif bytes[4] == 'l':
 
264
        elif header == cls.GCB_LZ_HEADER:
222
265
            out._compressor_name = 'lzma'
223
266
        else:
224
 
            raise ValueError('unknown compressor: %r' % (bytes,))
 
267
            raise ValueError('unknown compressor: %r' % (header,))
225
268
        out._parse_bytes(bytes, 6)
226
269
        return out
227
270
 
233
276
        :return: The bytes for the content
234
277
        """
235
278
        if start == end == 0:
236
 
            return ''
 
279
            return b''
237
280
        self._ensure_content(end)
238
281
        # The bytes are 'f' or 'd' for the type, then a variable-length
239
282
        # base128 integer for the content size, then the actual content
240
283
        # We know that the variable-length integer won't be longer than 5
241
284
        # bytes (it takes 5 bytes to encode 2^32)
242
 
        c = self._content[start]
243
 
        if c == 'f':
 
285
        c = self._content[start:start + 1]
 
286
        if c == b'f':
244
287
            type = 'fulltext'
245
288
        else:
246
 
            if c != 'd':
 
289
            if c != b'd':
247
290
                raise ValueError('Unknown content control code: %s'
248
291
                                 % (c,))
249
292
            type = 'delta'
250
293
        content_len, len_len = decode_base128_int(
251
 
                            self._content[start + 1:start + 6])
 
294
            self._content[start + 1:start + 6])
252
295
        content_start = start + 1 + len_len
253
296
        if end != content_start + content_len:
254
297
            raise ValueError('end != len according to field header'
255
 
                ' %s != %s' % (end, content_start + content_len))
256
 
        if c == 'f':
257
 
            bytes = self._content[content_start:end]
258
 
        elif c == 'd':
259
 
            bytes = apply_delta_to_source(self._content, content_start, end)
260
 
        return bytes
 
298
                             ' %s != %s' % (end, content_start + content_len))
 
299
        if c == b'f':
 
300
            return self._content[content_start:end]
 
301
        # Must be type delta as checked above
 
302
        return apply_delta_to_source(self._content, content_start, end)
261
303
 
262
304
    def set_chunked_content(self, content_chunks, length):
263
305
        """Set the content of this block to the given chunks."""
269
311
        self._content_length = length
270
312
        self._content_chunks = content_chunks
271
313
        self._content = None
272
 
        self._z_content = None
 
314
        self._z_content_chunks = None
273
315
 
274
316
    def set_content(self, content):
275
317
        """Set the content of this block."""
276
318
        self._content_length = len(content)
277
319
        self._content = content
278
 
        self._z_content = None
279
 
 
280
 
    def _create_z_content_using_lzma(self):
281
 
        if self._content_chunks is not None:
282
 
            self._content = ''.join(self._content_chunks)
283
 
            self._content_chunks = None
284
 
        if self._content is None:
285
 
            raise AssertionError('Nothing to compress')
286
 
        self._z_content = pylzma.compress(self._content)
287
 
        self._z_content_length = len(self._z_content)
288
 
 
289
 
    def _create_z_content_from_chunks(self):
 
320
        self._z_content_chunks = None
 
321
 
 
322
    def _create_z_content_from_chunks(self, chunks):
290
323
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
291
 
        compressed_chunks = map(compressor.compress, self._content_chunks)
 
324
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
 
325
        # (measured peak is maybe 30MB over the above...)
 
326
        compressed_chunks = list(map(compressor.compress, chunks))
292
327
        compressed_chunks.append(compressor.flush())
293
 
        self._z_content = ''.join(compressed_chunks)
294
 
        self._z_content_length = len(self._z_content)
 
328
        # Ignore empty chunks
 
329
        self._z_content_chunks = [c for c in compressed_chunks if c]
 
330
        self._z_content_length = sum(map(len, self._z_content_chunks))
295
331
 
296
332
    def _create_z_content(self):
297
 
        if self._z_content is not None:
298
 
            return
299
 
        if _USE_LZMA:
300
 
            self._create_z_content_using_lzma()
 
333
        if self._z_content_chunks is not None:
301
334
            return
302
335
        if self._content_chunks is not None:
303
 
            self._create_z_content_from_chunks()
304
 
            return
305
 
        self._z_content = zlib.compress(self._content)
306
 
        self._z_content_length = len(self._z_content)
 
336
            chunks = self._content_chunks
 
337
        else:
 
338
            chunks = (self._content,)
 
339
        self._create_z_content_from_chunks(chunks)
 
340
 
 
341
    def to_chunks(self):
 
342
        """Create the byte stream as a series of 'chunks'"""
 
343
        self._create_z_content()
 
344
        header = self.GCB_HEADER
 
345
        chunks = [b'%s%d\n%d\n'
 
346
                  % (header, self._z_content_length, self._content_length),
 
347
                  ]
 
348
        chunks.extend(self._z_content_chunks)
 
349
        total_len = sum(map(len, chunks))
 
350
        return total_len, chunks
307
351
 
308
352
    def to_bytes(self):
309
353
        """Encode the information into a byte stream."""
310
 
        self._create_z_content()
311
 
        if _USE_LZMA:
312
 
            header = self.GCB_LZ_HEADER
313
 
        else:
314
 
            header = self.GCB_HEADER
315
 
        chunks = [header,
316
 
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
317
 
                  self._z_content,
318
 
                 ]
319
 
        return ''.join(chunks)
 
354
        total_len, chunks = self.to_chunks()
 
355
        return b''.join(chunks)
320
356
 
321
357
    def _dump(self, include_text=False):
322
358
        """Take this block, and spit out a human-readable structure.
332
368
        result = []
333
369
        pos = 0
334
370
        while pos < self._content_length:
335
 
            kind = self._content[pos]
 
371
            kind = self._content[pos:pos + 1]
336
372
            pos += 1
337
 
            if kind not in ('f', 'd'):
 
373
            if kind not in (b'f', b'd'):
338
374
                raise ValueError('invalid kind character: %r' % (kind,))
339
375
            content_len, len_len = decode_base128_int(
340
 
                                self._content[pos:pos + 5])
 
376
                self._content[pos:pos + 5])
341
377
            pos += len_len
342
378
            if content_len + pos > self._content_length:
343
379
                raise ValueError('invalid content_len %d for record @ pos %d'
344
380
                                 % (content_len, pos - len_len - 1))
345
 
            if kind == 'f': # Fulltext
 
381
            if kind == b'f':  # Fulltext
346
382
                if include_text:
347
 
                    text = self._content[pos:pos+content_len]
348
 
                    result.append(('f', content_len, text))
 
383
                    text = self._content[pos:pos + content_len]
 
384
                    result.append((b'f', content_len, text))
349
385
                else:
350
 
                    result.append(('f', content_len))
351
 
            elif kind == 'd': # Delta
352
 
                delta_content = self._content[pos:pos+content_len]
 
386
                    result.append((b'f', content_len))
 
387
            elif kind == b'd':  # Delta
 
388
                delta_content = self._content[pos:pos + content_len]
353
389
                delta_info = []
354
390
                # The first entry in a delta is the decompressed length
355
391
                decomp_len, delta_pos = decode_base128_int(delta_content)
356
 
                result.append(('d', content_len, decomp_len, delta_info))
 
392
                result.append((b'd', content_len, decomp_len, delta_info))
357
393
                measured_len = 0
358
394
                while delta_pos < content_len:
359
 
                    c = ord(delta_content[delta_pos])
 
395
                    c = indexbytes(delta_content, delta_pos)
360
396
                    delta_pos += 1
361
 
                    if c & 0x80: # Copy
 
397
                    if c & 0x80:  # Copy
362
398
                        (offset, length,
363
399
                         delta_pos) = decode_copy_instruction(delta_content, c,
364
400
                                                              delta_pos)
365
401
                        if include_text:
366
 
                            text = self._content[offset:offset+length]
367
 
                            delta_info.append(('c', offset, length, text))
 
402
                            text = self._content[offset:offset + length]
 
403
                            delta_info.append((b'c', offset, length, text))
368
404
                        else:
369
 
                            delta_info.append(('c', offset, length))
 
405
                            delta_info.append((b'c', offset, length))
370
406
                        measured_len += length
371
 
                    else: # Insert
 
407
                    else:  # Insert
372
408
                        if include_text:
373
 
                            txt = delta_content[delta_pos:delta_pos+c]
 
409
                            txt = delta_content[delta_pos:delta_pos + c]
374
410
                        else:
375
 
                            txt = ''
376
 
                        delta_info.append(('i', c, txt))
 
411
                            txt = b''
 
412
                        delta_info.append((b'i', c, txt))
377
413
                        measured_len += c
378
414
                        delta_pos += c
379
415
                if delta_pos != content_len:
421
457
 
422
458
    def __repr__(self):
423
459
        return '%s(%s, first=%s)' % (self.__class__.__name__,
424
 
            self.key, self._first)
 
460
                                     self.key, self._first)
425
461
 
426
462
    def get_bytes_as(self, storage_kind):
427
463
        if storage_kind == self.storage_kind:
429
465
                # wire bytes, something...
430
466
                return self._manager._wire_bytes()
431
467
            else:
432
 
                return ''
 
468
                return b''
433
469
        if storage_kind in ('fulltext', 'chunked'):
434
470
            if self._bytes is None:
435
471
                # Grab and cache the raw bytes for this entry
436
472
                # and break the ref-cycle with _manager since we don't need it
437
473
                # anymore
438
 
                self._manager._prepare_for_extract()
 
474
                try:
 
475
                    self._manager._prepare_for_extract()
 
476
                except zlib.error as value:
 
477
                    raise DecompressCorruption("zlib: " + str(value))
439
478
                block = self._manager._block
440
479
                self._bytes = block.extract(self.key, self._start, self._end)
441
480
                # There are code paths that first extract as fulltext, and then
452
491
class _LazyGroupContentManager(object):
453
492
    """This manages a group of _LazyGroupCompressFactory objects."""
454
493
 
455
 
    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
456
 
                             # current size, and still be considered
457
 
                             # resuable
458
 
    _full_block_size = 4*1024*1024
459
 
    _full_mixed_block_size = 2*1024*1024
460
 
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
461
 
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
 
494
    _max_cut_fraction = 0.75  # We allow a block to be trimmed to 75% of
 
495
    # current size, and still be considered
 
496
    # resuable
 
497
    _full_block_size = 4 * 1024 * 1024
 
498
    _full_mixed_block_size = 2 * 1024 * 1024
 
499
    _full_enough_block_size = 3 * 1024 * 1024  # size at which we won't repack
 
500
    _full_enough_mixed_block_size = 2 * 768 * 1024  # 1.5MB
462
501
 
463
 
    def __init__(self, block):
 
502
    def __init__(self, block, get_compressor_settings=None):
464
503
        self._block = block
465
504
        # We need to preserve the ordering
466
505
        self._factories = []
467
506
        self._last_byte = 0
 
507
        self._get_settings = get_compressor_settings
 
508
        self._compressor_settings = None
 
509
 
 
510
    def _get_compressor_settings(self):
 
511
        if self._compressor_settings is not None:
 
512
            return self._compressor_settings
 
513
        settings = None
 
514
        if self._get_settings is not None:
 
515
            settings = self._get_settings()
 
516
        if settings is None:
 
517
            vf = GroupCompressVersionedFiles
 
518
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
 
519
        self._compressor_settings = settings
 
520
        return self._compressor_settings
468
521
 
469
522
    def add_factory(self, key, parents, start, end):
470
523
        if not self._factories:
473
526
            first = False
474
527
        # Note that this creates a reference cycle....
475
528
        factory = _LazyGroupCompressFactory(key, parents, self,
476
 
            start, end, first=first)
 
529
                                            start, end, first=first)
477
530
        # max() works here, but as a function call, doing a compare seems to be
478
531
        # significantly faster, timeit says 250ms for max() and 100ms for the
479
532
        # comparison
503
556
        new_block.set_content(self._block._content[:last_byte])
504
557
        self._block = new_block
505
558
 
 
559
    def _make_group_compressor(self):
 
560
        return GroupCompressor(self._get_compressor_settings())
 
561
 
506
562
    def _rebuild_block(self):
507
563
        """Create a new GroupCompressBlock with only the referenced texts."""
508
 
        compressor = GroupCompressor()
 
564
        compressor = self._make_group_compressor()
509
565
        tstart = time.time()
510
566
        old_length = self._block._content_length
511
567
        end_point = 0
523
579
        #       block? It seems hard to come up with a method that it would
524
580
        #       expand, since we do full compression again. Perhaps based on a
525
581
        #       request that ends up poorly ordered?
 
582
        # TODO: If the content would have expanded, then we would want to
 
583
        #       handle a case where we need to split the block.
 
584
        #       Now that we have a user-tweakable option
 
585
        #       (max_bytes_to_index), it is possible that one person set it
 
586
        #       to a very low value, causing poor compression.
526
587
        delta = time.time() - tstart
527
588
        self._block = new_block
528
589
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
651
712
        #   <length of gc block>\n
652
713
        #   <header bytes>
653
714
        #   <gc-block>
654
 
        lines = ['groupcompress-block\n']
 
715
        lines = [b'groupcompress-block\n']
655
716
        # The minimal info we need is the key, the start offset, and the
656
717
        # parents. The length and type are encoded in the record itself.
657
718
        # However, passing in the other bits makes it easier.  The list of
662
723
        # 1 line for end byte
663
724
        header_lines = []
664
725
        for factory in self._factories:
665
 
            key_bytes = '\x00'.join(factory.key)
 
726
            key_bytes = b'\x00'.join(factory.key)
666
727
            parents = factory.parents
667
728
            if parents is None:
668
 
                parent_bytes = 'None:'
 
729
                parent_bytes = b'None:'
669
730
            else:
670
 
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
671
 
            record_header = '%s\n%s\n%d\n%d\n' % (
 
731
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
 
732
            record_header = b'%s\n%s\n%d\n%d\n' % (
672
733
                key_bytes, parent_bytes, factory._start, factory._end)
673
734
            header_lines.append(record_header)
674
735
            # TODO: Can we break the refcycle at this point and set
675
736
            #       factory._manager = None?
676
 
        header_bytes = ''.join(header_lines)
 
737
        header_bytes = b''.join(header_lines)
677
738
        del header_lines
678
739
        header_bytes_len = len(header_bytes)
679
740
        z_header_bytes = zlib.compress(header_bytes)
680
741
        del header_bytes
681
742
        z_header_bytes_len = len(z_header_bytes)
682
 
        block_bytes = self._block.to_bytes()
683
 
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
684
 
                                       len(block_bytes)))
 
743
        block_bytes_len, block_chunks = self._block.to_chunks()
 
744
        lines.append(b'%d\n%d\n%d\n' % (
 
745
            z_header_bytes_len, header_bytes_len, block_bytes_len))
685
746
        lines.append(z_header_bytes)
686
 
        lines.append(block_bytes)
687
 
        del z_header_bytes, block_bytes
688
 
        return ''.join(lines)
 
747
        lines.extend(block_chunks)
 
748
        del z_header_bytes, block_chunks
 
749
        # TODO: This is a point where we will double the memory consumption. To
 
750
        #       avoid this, we probably have to switch to a 'chunked' api
 
751
        return b''.join(lines)
689
752
 
690
753
    @classmethod
691
754
    def from_bytes(cls, bytes):
692
755
        # TODO: This does extra string copying, probably better to do it a
693
 
        #       different way
 
756
        #       different way. At a minimum this creates 2 copies of the
 
757
        #       compressed content
694
758
        (storage_kind, z_header_len, header_len,
695
 
         block_len, rest) = bytes.split('\n', 4)
 
759
         block_len, rest) = bytes.split(b'\n', 4)
696
760
        del bytes
697
 
        if storage_kind != 'groupcompress-block':
 
761
        if storage_kind != b'groupcompress-block':
698
762
            raise ValueError('Unknown storage kind: %s' % (storage_kind,))
699
763
        z_header_len = int(z_header_len)
700
764
        if len(rest) < z_header_len:
712
776
        del rest
713
777
        # So now we have a valid GCB, we just need to parse the factories that
714
778
        # were sent to us
715
 
        header_lines = header.split('\n')
 
779
        header_lines = header.split(b'\n')
716
780
        del header
717
781
        last = header_lines.pop()
718
 
        if last != '':
 
782
        if last != b'':
719
783
            raise ValueError('header lines did not end with a trailing'
720
784
                             ' newline')
721
785
        if len(header_lines) % 4 != 0:
723
787
        block = GroupCompressBlock.from_bytes(block_bytes)
724
788
        del block_bytes
725
789
        result = cls(block)
726
 
        for start in xrange(0, len(header_lines), 4):
 
790
        for start in range(0, len(header_lines), 4):
727
791
            # intern()?
728
 
            key = tuple(header_lines[start].split('\x00'))
729
 
            parents_line = header_lines[start+1]
730
 
            if parents_line == 'None:':
 
792
            key = tuple(header_lines[start].split(b'\x00'))
 
793
            parents_line = header_lines[start + 1]
 
794
            if parents_line == b'None:':
731
795
                parents = None
732
796
            else:
733
 
                parents = tuple([tuple(segment.split('\x00'))
734
 
                                 for segment in parents_line.split('\t')
735
 
                                  if segment])
736
 
            start_offset = int(header_lines[start+2])
737
 
            end_offset = int(header_lines[start+3])
 
797
                parents = tuple([tuple(segment.split(b'\x00'))
 
798
                                 for segment in parents_line.split(b'\t')
 
799
                                 if segment])
 
800
            start_offset = int(header_lines[start + 2])
 
801
            end_offset = int(header_lines[start + 3])
738
802
            result.add_factory(key, parents, start_offset, end_offset)
739
803
        return result
740
804
 
748
812
 
749
813
class _CommonGroupCompressor(object):
750
814
 
751
 
    def __init__(self):
 
815
    def __init__(self, settings=None):
752
816
        """Create a GroupCompressor."""
753
817
        self.chunks = []
754
818
        self._last = None
755
819
        self.endpoint = 0
756
820
        self.input_bytes = 0
757
821
        self.labels_deltas = {}
758
 
        self._delta_index = None # Set by the children
 
822
        self._delta_index = None  # Set by the children
759
823
        self._block = GroupCompressBlock()
 
824
        if settings is None:
 
825
            self._settings = {}
 
826
        else:
 
827
            self._settings = settings
760
828
 
761
829
    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
762
830
        """Compress lines with label key.
763
831
 
764
832
        :param key: A key tuple. It is stored in the output
765
833
            for identification of the text during decompression. If the last
766
 
            element is 'None' it is replaced with the sha1 of the text -
 
834
            element is b'None' it is replaced with the sha1 of the text -
767
835
            e.g. sha1:xxxxxxx.
768
836
        :param bytes: The bytes to be compressed
769
837
        :param expected_sha: If non-None, the sha the lines are believed to
779
847
 
780
848
        :seealso VersionedFiles.add_lines:
781
849
        """
782
 
        if not bytes: # empty, like a dir entry, etc
 
850
        if not bytes:  # empty, like a dir entry, etc
783
851
            if nostore_sha == _null_sha1:
784
852
                raise errors.ExistingContent()
785
853
            return _null_sha1, 0, 0, 'fulltext'
792
860
            if sha1 == nostore_sha:
793
861
                raise errors.ExistingContent()
794
862
        if key[-1] is None:
795
 
            key = key[:-1] + ('sha1:' + sha1,)
 
863
            key = key[:-1] + (b'sha1:' + sha1,)
796
864
 
797
865
        start, end, type = self._compress(key, bytes, len(bytes) / 2, soft)
798
866
        return sha1, start, end, type
822
890
        :param key: The key to extract.
823
891
        :return: An iterable over bytes and the sha1.
824
892
        """
825
 
        (start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
 
893
        (start_byte, start_chunk, end_byte,
 
894
         end_chunk) = self.labels_deltas[key]
826
895
        delta_chunks = self.chunks[start_chunk:end_chunk]
827
 
        stored_bytes = ''.join(delta_chunks)
828
 
        if stored_bytes[0] == 'f':
 
896
        stored_bytes = b''.join(delta_chunks)
 
897
        kind = stored_bytes[:1]
 
898
        if kind == b'f':
829
899
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
830
900
            data_len = fulltext_len + 1 + offset
831
 
            if  data_len != len(stored_bytes):
 
901
            if data_len != len(stored_bytes):
832
902
                raise ValueError('Index claimed fulltext len, but stored bytes'
833
903
                                 ' claim %s != %s'
834
904
                                 % (len(stored_bytes), data_len))
835
 
            bytes = stored_bytes[offset + 1:]
 
905
            data = stored_bytes[offset + 1:]
836
906
        else:
 
907
            if kind != b'd':
 
908
                raise ValueError('Unknown content kind, bytes claim %s' % kind)
837
909
            # XXX: This is inefficient at best
838
 
            source = ''.join(self.chunks[:start_chunk])
839
 
            if stored_bytes[0] != 'd':
840
 
                raise ValueError('Unknown content kind, bytes claim %s'
841
 
                                 % (stored_bytes[0],))
 
910
            source = b''.join(self.chunks[:start_chunk])
842
911
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
843
912
            data_len = delta_len + 1 + offset
844
913
            if data_len != len(stored_bytes):
845
914
                raise ValueError('Index claimed delta len, but stored bytes'
846
915
                                 ' claim %s != %s'
847
916
                                 % (len(stored_bytes), data_len))
848
 
            bytes = apply_delta(source, stored_bytes[offset + 1:])
849
 
        bytes_sha1 = osutils.sha_string(bytes)
850
 
        return bytes, bytes_sha1
 
917
            data = apply_delta(source, stored_bytes[offset + 1:])
 
918
        data_sha1 = osutils.sha_string(data)
 
919
        return data, data_sha1
851
920
 
852
921
    def flush(self):
853
922
        """Finish this group, creating a formatted stream.
854
923
 
855
924
        After calling this, the compressor should no longer be used
856
925
        """
857
 
        # TODO: this causes us to 'bloat' to 2x the size of content in the
858
 
        #       group. This has an impact for 'commit' of large objects.
859
 
        #       One possibility is to use self._content_chunks, and be lazy and
860
 
        #       only fill out self._content as a full string when we actually
861
 
        #       need it. That would at least drop the peak memory consumption
862
 
        #       for 'commit' down to ~1x the size of the largest file, at a
863
 
        #       cost of increased complexity within this code. 2x is still <<
864
 
        #       3x the size of the largest file, so we are doing ok.
865
926
        self._block.set_chunked_content(self.chunks, self.endpoint)
866
927
        self.chunks = None
867
928
        self._delta_index = None
885
946
 
886
947
class PythonGroupCompressor(_CommonGroupCompressor):
887
948
 
888
 
    def __init__(self):
 
949
    def __init__(self, settings=None):
889
950
        """Create a GroupCompressor.
890
951
 
891
952
        Used only if the pyrex version is not available.
892
953
        """
893
 
        super(PythonGroupCompressor, self).__init__()
 
954
        super(PythonGroupCompressor, self).__init__(settings)
894
955
        self._delta_index = LinesDeltaIndex([])
895
956
        # The actual content is managed by LinesDeltaIndex
896
957
        self.chunks = self._delta_index.lines
905
966
        if delta_length > max_delta_size:
906
967
            # The delta is longer than the fulltext, insert a fulltext
907
968
            type = 'fulltext'
908
 
            out_lines = ['f', encode_base128_int(input_len)]
 
969
            out_lines = [b'f', encode_base128_int(input_len)]
909
970
            out_lines.extend(new_lines)
910
971
            index_lines = [False, False]
911
972
            index_lines.extend([True] * len(new_lines))
912
973
        else:
913
974
            # this is a worthy delta, output it
914
975
            type = 'delta'
915
 
            out_lines[0] = 'd'
 
976
            out_lines[0] = b'd'
916
977
            # Update the delta_length to include those two encoded integers
917
978
            out_lines[1] = encode_base128_int(delta_length)
918
979
        # Before insertion
933
994
 
934
995
    It contains code very similar to SequenceMatcher because of having a similar
935
996
    task. However some key differences apply:
936
 
     - there is no junk, we want a minimal edit not a human readable diff.
937
 
     - we don't filter very common lines (because we don't know where a good
938
 
       range will start, and after the first text we want to be emitting minmal
939
 
       edits only.
940
 
     - we chain the left side, not the right side
941
 
     - we incrementally update the adjacency matrix as new lines are provided.
942
 
     - we look for matches in all of the left side, so the routine which does
943
 
       the analagous task of find_longest_match does not need to filter on the
944
 
       left side.
 
997
 
 
998
    * there is no junk, we want a minimal edit not a human readable diff.
 
999
    * we don't filter very common lines (because we don't know where a good
 
1000
      range will start, and after the first text we want to be emitting minmal
 
1001
      edits only.
 
1002
    * we chain the left side, not the right side
 
1003
    * we incrementally update the adjacency matrix as new lines are provided.
 
1004
    * we look for matches in all of the left side, so the routine which does
 
1005
      the analagous task of find_longest_match does not need to filter on the
 
1006
      left side.
945
1007
    """
946
1008
 
947
 
    def __init__(self):
948
 
        super(PyrexGroupCompressor, self).__init__()
949
 
        self._delta_index = DeltaIndex()
 
1009
    def __init__(self, settings=None):
 
1010
        super(PyrexGroupCompressor, self).__init__(settings)
 
1011
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
 
1012
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
950
1013
 
951
1014
    def _compress(self, key, bytes, max_delta_size, soft=False):
952
1015
        """see _CommonGroupCompressor._compress"""
962
1025
        # new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
963
1026
        if self._delta_index._source_offset != self.endpoint:
964
1027
            raise AssertionError('_source_offset != endpoint'
965
 
                ' somehow the DeltaIndex got out of sync with'
966
 
                ' the output lines')
 
1028
                                 ' somehow the DeltaIndex got out of sync with'
 
1029
                                 ' the output lines')
967
1030
        delta = self._delta_index.make_delta(bytes, max_delta_size)
968
1031
        if (delta is None):
969
1032
            type = 'fulltext'
970
1033
            enc_length = encode_base128_int(len(bytes))
971
1034
            len_mini_header = 1 + len(enc_length)
972
1035
            self._delta_index.add_source(bytes, len_mini_header)
973
 
            new_chunks = ['f', enc_length, bytes]
 
1036
            new_chunks = [b'f', enc_length, bytes]
974
1037
        else:
975
1038
            type = 'delta'
976
1039
            enc_length = encode_base128_int(len(delta))
977
1040
            len_mini_header = 1 + len(enc_length)
978
 
            new_chunks = ['d', enc_length, delta]
 
1041
            new_chunks = [b'd', enc_length, delta]
979
1042
            self._delta_index.add_delta_source(delta, len_mini_header)
980
1043
        # Before insertion
981
1044
        start = self.endpoint
988
1051
                                   self.endpoint, chunk_end)
989
1052
        if not self._delta_index._source_offset == self.endpoint:
990
1053
            raise AssertionError('the delta index is out of sync'
991
 
                'with the output lines %s != %s'
992
 
                % (self._delta_index._source_offset, self.endpoint))
 
1054
                                 'with the output lines %s != %s'
 
1055
                                 % (self._delta_index._source_offset, self.endpoint))
993
1056
        return start, self.endpoint, type
994
1057
 
995
1058
    def _output_chunks(self, new_chunks):
1020
1083
        if graph:
1021
1084
            ref_length = 1
1022
1085
        graph_index = BTreeBuilder(reference_lists=ref_length,
1023
 
            key_elements=keylength)
 
1086
                                   key_elements=keylength)
1024
1087
        stream = transport.open_write_stream('newpack')
1025
1088
        writer = pack.ContainerWriter(stream.write)
1026
1089
        writer.begin()
1027
 
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
1028
 
            add_callback=graph_index.add_nodes,
1029
 
            inconsistency_fatal=inconsistency_fatal)
1030
 
        access = knit._DirectPackAccess({})
 
1090
        index = _GCGraphIndex(graph_index, lambda: True, parents=parents,
 
1091
                              add_callback=graph_index.add_nodes,
 
1092
                              inconsistency_fatal=inconsistency_fatal)
 
1093
        access = pack_repo._DirectPackAccess({})
1031
1094
        access.set_writer(writer, graph_index, (transport, 'newpack'))
1032
1095
        result = GroupCompressVersionedFiles(index, access, delta)
1033
1096
        result.stream = stream
1043
1106
 
1044
1107
class _BatchingBlockFetcher(object):
1045
1108
    """Fetch group compress blocks in batches.
1046
 
    
 
1109
 
1047
1110
    :ivar total_bytes: int of expected number of bytes needed to fetch the
1048
1111
        currently pending batch.
1049
1112
    """
1050
1113
 
1051
 
    def __init__(self, gcvf, locations):
 
1114
    def __init__(self, gcvf, locations, get_compressor_settings=None):
1052
1115
        self.gcvf = gcvf
1053
1116
        self.locations = locations
1054
1117
        self.keys = []
1057
1120
        self.total_bytes = 0
1058
1121
        self.last_read_memo = None
1059
1122
        self.manager = None
 
1123
        self._get_compressor_settings = get_compressor_settings
1060
1124
 
1061
1125
    def add_key(self, key):
1062
1126
        """Add another to key to fetch.
1063
 
        
 
1127
 
1064
1128
        :return: The estimated number of bytes needed to fetch the batch so
1065
1129
            far.
1066
1130
        """
1091
1155
            # and then.
1092
1156
            self.batch_memos[read_memo] = cached_block
1093
1157
        return self.total_bytes
1094
 
        
 
1158
 
1095
1159
    def _flush_manager(self):
1096
1160
        if self.manager is not None:
1097
1161
            for factory in self.manager.get_record_stream():
1102
1166
    def yield_factories(self, full_flush=False):
1103
1167
        """Yield factories for keys added since the last yield.  They will be
1104
1168
        returned in the order they were added via add_key.
1105
 
        
 
1169
 
1106
1170
        :param full_flush: by default, some results may not be returned in case
1107
1171
            they can be part of the next batch.  If full_flush is True, then
1108
1172
            all results are returned.
1127
1191
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
1128
1192
                    # The next block from _get_blocks will be the block we
1129
1193
                    # need.
1130
 
                    block_read_memo, block = blocks.next()
 
1194
                    block_read_memo, block = next(blocks)
1131
1195
                    if block_read_memo != read_memo:
1132
1196
                        raise AssertionError(
1133
1197
                            "block_read_memo out of sync with read_memo"
1136
1200
                    memos_to_get_stack.pop()
1137
1201
                else:
1138
1202
                    block = self.batch_memos[read_memo]
1139
 
                self.manager = _LazyGroupContentManager(block)
 
1203
                self.manager = _LazyGroupContentManager(block,
 
1204
                                                        get_compressor_settings=self._get_compressor_settings)
1140
1205
                self.last_read_memo = read_memo
1141
1206
            start, end = index_memo[3:5]
1142
1207
            self.manager.add_factory(key, parents, start, end)
1149
1214
        self.total_bytes = 0
1150
1215
 
1151
1216
 
1152
 
class GroupCompressVersionedFiles(VersionedFiles):
 
1217
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
1153
1218
    """A group-compress based VersionedFiles implementation."""
1154
1219
 
1155
 
    def __init__(self, index, access, delta=True, _unadded_refs=None):
 
1220
    # This controls how the GroupCompress DeltaIndex works. Basically, we
 
1221
    # compute hash pointers into the source blocks (so hash(text) => text).
 
1222
    # However each of these references costs some memory in trade against a
 
1223
    # more accurate match result. For very large files, they either are
 
1224
    # pre-compressed and change in bulk whenever they change, or change in just
 
1225
    # local blocks. Either way, 'improved resolution' is not very helpful,
 
1226
    # versus running out of memory trying to track everything. The default max
 
1227
    # gives 100% sampling of a 1MB file.
 
1228
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
 
1229
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
 
1230
                                    _DEFAULT_MAX_BYTES_TO_INDEX}
 
1231
 
 
1232
    def __init__(self, index, access, delta=True, _unadded_refs=None,
 
1233
                 _group_cache=None):
1156
1234
        """Create a GroupCompressVersionedFiles object.
1157
1235
 
1158
1236
        :param index: The index object storing access and graph data.
1159
1237
        :param access: The access object storing raw data.
1160
1238
        :param delta: Whether to delta compress or just entropy compress.
1161
1239
        :param _unadded_refs: private parameter, don't use.
 
1240
        :param _group_cache: private parameter, don't use.
1162
1241
        """
1163
1242
        self._index = index
1164
1243
        self._access = access
1166
1245
        if _unadded_refs is None:
1167
1246
            _unadded_refs = {}
1168
1247
        self._unadded_refs = _unadded_refs
1169
 
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1170
 
        self._fallback_vfs = []
 
1248
        if _group_cache is None:
 
1249
            _group_cache = LRUSizeCache(max_size=50 * 1024 * 1024)
 
1250
        self._group_cache = _group_cache
 
1251
        self._immediate_fallback_vfs = []
 
1252
        self._max_bytes_to_index = None
1171
1253
 
1172
1254
    def without_fallbacks(self):
1173
1255
        """Return a clone of this object without any fallbacks configured."""
1174
1256
        return GroupCompressVersionedFiles(self._index, self._access,
1175
 
            self._delta, _unadded_refs=dict(self._unadded_refs))
 
1257
                                           self._delta, _unadded_refs=dict(
 
1258
                                               self._unadded_refs),
 
1259
                                           _group_cache=self._group_cache)
1176
1260
 
1177
1261
    def add_lines(self, key, parents, lines, parent_texts=None,
1178
 
        left_matching_blocks=None, nostore_sha=None, random_id=False,
1179
 
        check_content=True):
 
1262
                  left_matching_blocks=None, nostore_sha=None, random_id=False,
 
1263
                  check_content=True):
1180
1264
        """Add a text to the store.
1181
1265
 
1182
1266
        :param key: The key tuple of the text to add.
1183
1267
        :param parents: The parents key tuples of the text to add.
1184
1268
        :param lines: A list of lines. Each line must be a bytestring. And all
1185
 
            of them except the last must be terminated with \n and contain no
1186
 
            other \n's. The last line may either contain no \n's or a single
1187
 
            terminating \n. If the lines list does meet this constraint the add
1188
 
            routine may error or may succeed - but you will be unable to read
1189
 
            the data back accurately. (Checking the lines have been split
 
1269
            of them except the last must be terminated with \\n and contain no
 
1270
            other \\n's. The last line may either contain no \\n's or a single
 
1271
            terminating \\n. If the lines list does meet this constraint the
 
1272
            add routine may error or may succeed - but you will be unable to
 
1273
            read the data back accurately. (Checking the lines have been split
1190
1274
            correctly is expensive and extremely unlikely to catch bugs so it
1191
1275
            is not done at runtime unless check_content is True.)
1192
1276
        :param parent_texts: An optional dictionary containing the opaque
1224
1308
                                               nostore_sha=nostore_sha))[0]
1225
1309
        return sha1, length, None
1226
1310
 
1227
 
    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
1228
 
        """See VersionedFiles._add_text()."""
1229
 
        self._index._check_write_ok()
1230
 
        self._check_add(key, None, random_id, check_content=False)
1231
 
        if text.__class__ is not str:
1232
 
            raise errors.BzrBadParameterUnicode("text")
1233
 
        if parents is None:
1234
 
            # The caller might pass None if there is no graph data, but kndx
1235
 
            # indexes can't directly store that, so we give them
1236
 
            # an empty tuple instead.
1237
 
            parents = ()
1238
 
        # double handling for now. Make it work until then.
1239
 
        length = len(text)
1240
 
        record = FulltextContentFactory(key, parents, None, text)
1241
 
        sha1 = list(self._insert_record_stream([record], random_id=random_id,
1242
 
                                               nostore_sha=nostore_sha))[0]
1243
 
        return sha1, length, None
1244
 
 
1245
1311
    def add_fallback_versioned_files(self, a_versioned_files):
1246
1312
        """Add a source of texts for texts not present in this knit.
1247
1313
 
1248
1314
        :param a_versioned_files: A VersionedFiles object.
1249
1315
        """
1250
 
        self._fallback_vfs.append(a_versioned_files)
 
1316
        self._immediate_fallback_vfs.append(a_versioned_files)
1251
1317
 
1252
1318
    def annotate(self, key):
1253
1319
        """See VersionedFiles.annotate."""
1287
1353
            self._check_lines_not_unicode(lines)
1288
1354
            self._check_lines_are_lines(lines)
1289
1355
 
1290
 
    def get_known_graph_ancestry(self, keys):
1291
 
        """Get a KnownGraph instance with the ancestry of keys."""
1292
 
        # Note that this is identical to
1293
 
        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1294
 
        # ancestry.
1295
 
        parent_map, missing_keys = self._index.find_ancestry(keys)
1296
 
        for fallback in self._fallback_vfs:
1297
 
            if not missing_keys:
1298
 
                break
1299
 
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1300
 
                                                missing_keys)
1301
 
            parent_map.update(f_parent_map)
1302
 
            missing_keys = f_missing_keys
1303
 
        kg = _mod_graph.KnownGraph(parent_map)
1304
 
        return kg
1305
 
 
1306
1356
    def get_parent_map(self, keys):
1307
1357
        """Get a map of the graph parents of keys.
1308
1358
 
1323
1373
            and so on.
1324
1374
        """
1325
1375
        result = {}
1326
 
        sources = [self._index] + self._fallback_vfs
 
1376
        sources = [self._index] + self._immediate_fallback_vfs
1327
1377
        source_results = []
1328
1378
        missing = set(keys)
1329
1379
        for source in sources:
1366
1416
                yield read_memo, cached[read_memo]
1367
1417
            except KeyError:
1368
1418
                # Read the block, and cache it.
1369
 
                zdata = raw_records.next()
 
1419
                zdata = next(raw_records)
1370
1420
                block = GroupCompressBlock.from_bytes(zdata)
1371
1421
                self._group_cache[read_memo] = block
1372
1422
                cached[read_memo] = block
1400
1450
        if not keys:
1401
1451
            return
1402
1452
        if (not self._index.has_graph
1403
 
            and ordering in ('topological', 'groupcompress')):
 
1453
                and ordering in ('topological', 'groupcompress')):
1404
1454
            # Cannot topological order when no graph has been stored.
1405
1455
            # but we allow 'as-requested' or 'unordered'
1406
1456
            ordering = 'unordered'
1410
1460
            try:
1411
1461
                keys = set(remaining_keys)
1412
1462
                for content_factory in self._get_remaining_record_stream(keys,
1413
 
                        orig_keys, ordering, include_delta_closure):
 
1463
                                                                         orig_keys, ordering, include_delta_closure):
1414
1464
                    remaining_keys.discard(content_factory.key)
1415
1465
                    yield content_factory
1416
1466
                return
1417
 
            except errors.RetryWithNewPacks, e:
 
1467
            except errors.RetryWithNewPacks as e:
1418
1468
                self._access.reload_or_raise(e)
1419
1469
 
1420
1470
    def _find_from_fallback(self, missing):
1430
1480
        parent_map = {}
1431
1481
        key_to_source_map = {}
1432
1482
        source_results = []
1433
 
        for source in self._fallback_vfs:
 
1483
        for source in self._immediate_fallback_vfs:
1434
1484
            if not missing:
1435
1485
                break
1436
1486
            source_parents = source.get_parent_map(missing)
1446
1496
 
1447
1497
        The returned objects should be in the order defined by 'ordering',
1448
1498
        which can weave between different sources.
 
1499
 
1449
1500
        :param ordering: Must be one of 'topological' or 'groupcompress'
1450
1501
        :return: List of [(source, [keys])] tuples, such that all keys are in
1451
1502
            the defined order, regardless of source.
1452
1503
        """
1453
1504
        if ordering == 'topological':
1454
 
            present_keys = topo_sort(parent_map)
 
1505
            present_keys = tsort.topo_sort(parent_map)
1455
1506
        else:
1456
1507
            # ordering == 'groupcompress'
1457
1508
            # XXX: This only optimizes for the target ordering. We may need
1479
1530
                source = self
1480
1531
            elif key in key_to_source_map:
1481
1532
                source = key_to_source_map[key]
1482
 
            else: # absent
 
1533
            else:  # absent
1483
1534
                continue
1484
1535
            if source is not current_source:
1485
1536
                source_keys.append((source, []))
1493
1544
            # This is the group the bytes are stored in, followed by the
1494
1545
            # location in the group
1495
1546
            return locations[key][0]
1496
 
        present_keys = sorted(locations.iterkeys(), key=get_group)
1497
1547
        # We don't have an ordering for keys in the in-memory object, but
1498
1548
        # lets process the in-memory ones first.
1499
 
        present_keys = list(unadded_keys) + present_keys
 
1549
        present_keys = list(unadded_keys)
 
1550
        present_keys.extend(sorted(locations, key=get_group))
1500
1551
        # Now grab all of the ones from other sources
1501
1552
        source_keys = [(self, present_keys)]
1502
1553
        source_keys.extend(source_result)
1526
1577
            # start with one key, recurse to its oldest parent, then grab
1527
1578
            # everything in the same group, etc.
1528
1579
            parent_map = dict((key, details[2]) for key, details in
1529
 
                locations.iteritems())
 
1580
                              viewitems(locations))
1530
1581
            for key in unadded_keys:
1531
1582
                parent_map[key] = self._unadded_refs[key]
1532
1583
            parent_map.update(fallback_parent_map)
1534
1585
                                                        key_to_source_map)
1535
1586
        elif ordering == 'as-requested':
1536
1587
            source_keys = self._get_as_requested_source_keys(orig_keys,
1537
 
                locations, unadded_keys, key_to_source_map)
 
1588
                                                             locations, unadded_keys, key_to_source_map)
1538
1589
        else:
1539
1590
            # We want to yield the keys in a semi-optimal (read-wise) ordering.
1540
1591
            # Otherwise we thrash the _group_cache and destroy performance
1541
1592
            source_keys = self._get_io_ordered_source_keys(locations,
1542
 
                unadded_keys, source_result)
 
1593
                                                           unadded_keys, source_result)
1543
1594
        for key in missing:
1544
1595
            yield AbsentContentFactory(key)
1545
1596
        # Batch up as many keys as we can until either:
1546
1597
        #  - we encounter an unadded ref, or
1547
1598
        #  - we run out of keys, or
1548
1599
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
1549
 
        batcher = _BatchingBlockFetcher(self, locations)
 
1600
        batcher = _BatchingBlockFetcher(self, locations,
 
1601
                                        get_compressor_settings=self._get_compressor_settings)
1550
1602
        for source, keys in source_keys:
1551
1603
            if source is self:
1552
1604
                for key in keys:
1576
1628
        """See VersionedFiles.get_sha1s()."""
1577
1629
        result = {}
1578
1630
        for record in self.get_record_stream(keys, 'unordered', True):
1579
 
            if record.sha1 != None:
 
1631
            if record.sha1 is not None:
1580
1632
                result[record.key] = record.sha1
1581
1633
            else:
1582
1634
                if record.storage_kind != 'absent':
1598
1650
        for _ in self._insert_record_stream(stream, random_id=False):
1599
1651
            pass
1600
1652
 
 
1653
    def _get_compressor_settings(self):
 
1654
        if self._max_bytes_to_index is None:
 
1655
            # TODO: VersionedFiles don't know about their containing
 
1656
            #       repository, so they don't have much of an idea about their
 
1657
            #       location. So for now, this is only a global option.
 
1658
            c = config.GlobalConfig()
 
1659
            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
 
1660
            if val is not None:
 
1661
                try:
 
1662
                    val = int(val)
 
1663
                except ValueError as e:
 
1664
                    trace.warning('Value for '
 
1665
                                  '"bzr.groupcompress.max_bytes_to_index"'
 
1666
                                  ' %r is not an integer'
 
1667
                                  % (val,))
 
1668
                    val = None
 
1669
            if val is None:
 
1670
                val = self._DEFAULT_MAX_BYTES_TO_INDEX
 
1671
            self._max_bytes_to_index = val
 
1672
        return {'max_bytes_to_index': self._max_bytes_to_index}
 
1673
 
 
1674
    def _make_group_compressor(self):
 
1675
        return GroupCompressor(self._get_compressor_settings())
 
1676
 
1601
1677
    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1602
1678
                              reuse_blocks=True):
1603
1679
        """Internal core to insert a record stream into this container.
1616
1692
        :seealso add_lines:
1617
1693
        """
1618
1694
        adapters = {}
 
1695
 
1619
1696
        def get_adapter(adapter_key):
1620
1697
            try:
1621
1698
                return adapters[adapter_key]
1626
1703
                return adapter
1627
1704
        # This will go up to fulltexts for gc to gc fetching, which isn't
1628
1705
        # ideal.
1629
 
        self._compressor = GroupCompressor()
 
1706
        self._compressor = self._make_group_compressor()
1630
1707
        self._unadded_refs = {}
1631
1708
        keys_to_add = []
 
1709
 
1632
1710
        def flush():
1633
 
            bytes = self._compressor.flush().to_bytes()
1634
 
            self._compressor = GroupCompressor()
 
1711
            bytes_len, chunks = self._compressor.flush().to_chunks()
 
1712
            self._compressor = self._make_group_compressor()
 
1713
            # Note: At this point we still have 1 copy of the fulltext (in
 
1714
            #       record and the var 'bytes'), and this generates 2 copies of
 
1715
            #       the compressed text (one for bytes, one in chunks)
 
1716
            # TODO: Push 'chunks' down into the _access api, so that we don't
 
1717
            #       have to double compressed memory here
 
1718
            # TODO: Figure out how to indicate that we would be happy to free
 
1719
            #       the fulltext content at this point. Note that sometimes we
 
1720
            #       will want it later (streaming CHK pages), but most of the
 
1721
            #       time we won't (everything else)
 
1722
            data = b''.join(chunks)
 
1723
            del chunks
1635
1724
            index, start, length = self._access.add_raw_records(
1636
 
                [(None, len(bytes))], bytes)[0]
 
1725
                [(None, len(data))], data)[0]
1637
1726
            nodes = []
1638
1727
            for key, reads, refs in keys_to_add:
1639
 
                nodes.append((key, "%d %d %s" % (start, length, reads), refs))
 
1728
                nodes.append((key, b"%d %d %s" % (start, length, reads), refs))
1640
1729
            self._index.add_records(nodes, random_id=random_id)
1641
1730
            self._unadded_refs = {}
1642
1731
            del keys_to_add[:]
1656
1745
                raise errors.RevisionNotPresent(record.key, self)
1657
1746
            if random_id:
1658
1747
                if record.key in inserted_keys:
1659
 
                    trace.note('Insert claimed random_id=True,'
1660
 
                               ' but then inserted %r two times', record.key)
 
1748
                    trace.note(gettext('Insert claimed random_id=True,'
 
1749
                                       ' but then inserted %r two times'), record.key)
1661
1750
                    continue
1662
1751
                inserted_keys.add(record.key)
1663
1752
            if reuse_blocks:
1689
1778
                        raise AssertionError('No insert_manager set')
1690
1779
                    if insert_manager is not record._manager:
1691
1780
                        raise AssertionError('insert_manager does not match'
1692
 
                            ' the current record, we cannot be positive'
1693
 
                            ' that the appropriate content was inserted.'
1694
 
                            )
1695
 
                    value = "%d %d %d %d" % (block_start, block_length,
1696
 
                                             record._start, record._end)
 
1781
                                             ' the current record, we cannot be positive'
 
1782
                                             ' that the appropriate content was inserted.'
 
1783
                                             )
 
1784
                    value = b"%d %d %d %d" % (block_start, block_length,
 
1785
                                              record._start, record._end)
1697
1786
                    nodes = [(record.key, value, (record.parents,))]
1698
1787
                    # TODO: Consider buffering up many nodes to be added, not
1699
1788
                    #       sure how much overhead this has, but we're seeing
1722
1811
            # delta_ratio = float(len(bytes)) / (end_point - start_point)
1723
1812
            # Check if we want to continue to include that text
1724
1813
            if (prefix == max_fulltext_prefix
1725
 
                and end_point < 2 * max_fulltext_len):
 
1814
                    and end_point < 2 * max_fulltext_len):
1726
1815
                # As long as we are on the same file_id, we will fill at least
1727
1816
                # 2 * max_fulltext_len
1728
1817
                start_new_block = False
1729
 
            elif end_point > 4*1024*1024:
 
1818
            elif end_point > 4 * 1024 * 1024:
1730
1819
                start_new_block = True
1731
1820
            elif (prefix is not None and prefix != last_prefix
1732
 
                  and end_point > 2*1024*1024):
 
1821
                  and end_point > 2 * 1024 * 1024):
1733
1822
                start_new_block = True
1734
1823
            else:
1735
1824
                start_new_block = False
1742
1831
                 type) = self._compressor.compress(record.key, bytes,
1743
1832
                                                   record.sha1)
1744
1833
            if record.key[-1] is None:
1745
 
                key = record.key[:-1] + ('sha1:' + found_sha1,)
 
1834
                key = record.key[:-1] + (b'sha1:' + found_sha1,)
1746
1835
            else:
1747
1836
                key = record.key
1748
1837
            self._unadded_refs[key] = record.parents
1753
1842
            else:
1754
1843
                parents = None
1755
1844
            refs = static_tuple.StaticTuple(parents)
1756
 
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
 
1845
            keys_to_add.append(
 
1846
                (key, b'%d %d' % (start_point, end_point), refs))
1757
1847
        if len(keys_to_add):
1758
1848
            flush()
1759
1849
        self._compressor = None
1785
1875
        # but we need to setup a list of records to visit.
1786
1876
        # we need key, position, length
1787
1877
        for key_idx, record in enumerate(self.get_record_stream(keys,
1788
 
            'unordered', True)):
 
1878
                                                                'unordered', True)):
1789
1879
            # XXX: todo - optimise to use less than full texts.
1790
1880
            key = record.key
1791
1881
            if pb is not None:
1802
1892
        """See VersionedFiles.keys."""
1803
1893
        if 'evil' in debug.debug_flags:
1804
1894
            trace.mutter_callsite(2, "keys scales with size of history")
1805
 
        sources = [self._index] + self._fallback_vfs
 
1895
        sources = [self._index] + self._immediate_fallback_vfs
1806
1896
        result = set()
1807
1897
        for source in sources:
1808
1898
            result.update(source.keys())
1809
1899
        return result
1810
1900
 
1811
1901
 
 
1902
class _GCBuildDetails(object):
 
1903
    """A blob of data about the build details.
 
1904
 
 
1905
    This stores the minimal data, which then allows compatibility with the old
 
1906
    api, without taking as much memory.
 
1907
    """
 
1908
 
 
1909
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
 
1910
                 '_delta_end', '_parents')
 
1911
 
 
1912
    method = 'group'
 
1913
    compression_parent = None
 
1914
 
 
1915
    def __init__(self, parents, position_info):
 
1916
        self._parents = parents
 
1917
        (self._index, self._group_start, self._group_end, self._basis_end,
 
1918
         self._delta_end) = position_info
 
1919
 
 
1920
    def __repr__(self):
 
1921
        return '%s(%s, %s)' % (self.__class__.__name__,
 
1922
                               self.index_memo, self._parents)
 
1923
 
 
1924
    @property
 
1925
    def index_memo(self):
 
1926
        return (self._index, self._group_start, self._group_end,
 
1927
                self._basis_end, self._delta_end)
 
1928
 
 
1929
    @property
 
1930
    def record_details(self):
 
1931
        return static_tuple.StaticTuple(self.method, None)
 
1932
 
 
1933
    def __getitem__(self, offset):
 
1934
        """Compatibility thunk to act like a tuple."""
 
1935
        if offset == 0:
 
1936
            return self.index_memo
 
1937
        elif offset == 1:
 
1938
            return self.compression_parent  # Always None
 
1939
        elif offset == 2:
 
1940
            return self._parents
 
1941
        elif offset == 3:
 
1942
            return self.record_details
 
1943
        else:
 
1944
            raise IndexError('offset out of range')
 
1945
 
 
1946
    def __len__(self):
 
1947
        return 4
 
1948
 
 
1949
 
1812
1950
class _GCGraphIndex(object):
1813
1951
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1814
1952
 
1815
1953
    def __init__(self, graph_index, is_locked, parents=True,
1816
 
        add_callback=None, track_external_parent_refs=False,
1817
 
        inconsistency_fatal=True, track_new_keys=False):
 
1954
                 add_callback=None, track_external_parent_refs=False,
 
1955
                 inconsistency_fatal=True, track_new_keys=False):
1818
1956
        """Construct a _GCGraphIndex on a graph_index.
1819
1957
 
1820
 
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
 
1958
        :param graph_index: An implementation of breezy.index.GraphIndex.
1821
1959
        :param is_locked: A callback, returns True if the index is locked and
1822
1960
            thus usable.
1823
1961
        :param parents: If True, record knits parents, if not do not record
1843
1981
        # repeated over and over, this creates a surplus of ints
1844
1982
        self._int_cache = {}
1845
1983
        if track_external_parent_refs:
1846
 
            self._key_dependencies = knit._KeyRefs(
 
1984
            self._key_dependencies = _KeyRefs(
1847
1985
                track_new_keys=track_new_keys)
1848
1986
        else:
1849
1987
            self._key_dependencies = None
1873
2011
                if refs:
1874
2012
                    for ref in refs:
1875
2013
                        if ref:
1876
 
                            raise errors.KnitCorrupt(self,
1877
 
                                "attempt to add node with parents "
1878
 
                                "in parentless index.")
 
2014
                            raise knit.KnitCorrupt(self,
 
2015
                                                   "attempt to add node with parents "
 
2016
                                                   "in parentless index.")
1879
2017
                    refs = ()
1880
2018
                    changed = True
1881
2019
            keys[key] = (value, refs)
1889
2027
                if node_refs != passed[1]:
1890
2028
                    details = '%s %s %s' % (key, (value, node_refs), passed)
1891
2029
                    if self._inconsistency_fatal:
1892
 
                        raise errors.KnitCorrupt(self, "inconsistent details"
1893
 
                                                 " in add_records: %s" %
1894
 
                                                 details)
 
2030
                        raise knit.KnitCorrupt(self, "inconsistent details"
 
2031
                                               " in add_records: %s" %
 
2032
                                               details)
1895
2033
                    else:
1896
2034
                        trace.warning("inconsistent details in skipped"
1897
2035
                                      " record: %s", details)
1900
2038
        if changed:
1901
2039
            result = []
1902
2040
            if self._parents:
1903
 
                for key, (value, node_refs) in keys.iteritems():
 
2041
                for key, (value, node_refs) in viewitems(keys):
1904
2042
                    result.append((key, value, node_refs))
1905
2043
            else:
1906
 
                for key, (value, node_refs) in keys.iteritems():
 
2044
                for key, (value, node_refs) in viewitems(keys):
1907
2045
                    result.append((key, value))
1908
2046
            records = result
1909
2047
        key_dependencies = self._key_dependencies
1989
2127
        :param keys: An iterable of keys.
1990
2128
        :return: A dict of key:
1991
2129
            (index_memo, compression_parent, parents, record_details).
1992
 
            index_memo
1993
 
                opaque structure to pass to read_records to extract the raw
1994
 
                data
1995
 
            compression_parent
1996
 
                Content that this record is built upon, may be None
1997
 
            parents
1998
 
                Logical parents of this node
1999
 
            record_details
2000
 
                extra information about the content which needs to be passed to
2001
 
                Factory.parse_record
 
2130
 
 
2131
            * index_memo: opaque structure to pass to read_records to extract
 
2132
              the raw data
 
2133
            * compression_parent: Content that this record is built upon, may
 
2134
              be None
 
2135
            * parents: Logical parents of this node
 
2136
            * record_details: extra information about the content which needs
 
2137
              to be passed to Factory.parse_record
2002
2138
        """
2003
2139
        self._check_read()
2004
2140
        result = {}
2009
2145
                parents = None
2010
2146
            else:
2011
2147
                parents = entry[3][0]
2012
 
            method = 'group'
2013
 
            result[key] = (self._node_to_position(entry),
2014
 
                                  None, parents, (method, None))
 
2148
            details = _GCBuildDetails(parents, self._node_to_position(entry))
 
2149
            result[key] = details
2015
2150
        return result
2016
2151
 
2017
2152
    def keys(self):
2024
2159
 
2025
2160
    def _node_to_position(self, node):
2026
2161
        """Convert an index value to position details."""
2027
 
        bits = node[2].split(' ')
 
2162
        bits = node[2].split(b' ')
2028
2163
        # It would be nice not to read the entire gzip.
2029
2164
        # start and stop are put into _int_cache because they are very common.
2030
2165
        # They define the 'group' that an entry is in, and many groups can have
2033
2168
        # each, or about 7MB. Note that it might be even more when you consider
2034
2169
        # how PyInt is allocated in separate slabs. And you can't return a slab
2035
2170
        # to the OS if even 1 int on it is in use. Note though that Python uses
2036
 
        # a LIFO when re-using PyInt slots, which probably causes more
 
2171
        # a LIFO when re-using PyInt slots, which might cause more
2037
2172
        # fragmentation.
2038
2173
        start = int(bits[0])
2039
2174
        start = self._int_cache.setdefault(start, start)
2063
2198
            key_dependencies.add_references(node[1], node[3][0])
2064
2199
 
2065
2200
 
2066
 
from bzrlib._groupcompress_py import (
 
2201
from ._groupcompress_py import (
2067
2202
    apply_delta,
2068
2203
    apply_delta_to_source,
2069
2204
    encode_base128_int,
2072
2207
    LinesDeltaIndex,
2073
2208
    )
2074
2209
try:
2075
 
    from bzrlib._groupcompress_pyx import (
 
2210
    from ._groupcompress_pyx import (
2076
2211
        apply_delta,
2077
2212
        apply_delta_to_source,
2078
2213
        DeltaIndex,
2080
2215
        decode_base128_int,
2081
2216
        )
2082
2217
    GroupCompressor = PyrexGroupCompressor
2083
 
except ImportError, e:
 
2218
except ImportError as e:
2084
2219
    osutils.failed_to_load_extension(e)
2085
2220
    GroupCompressor = PythonGroupCompressor
2086