/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Robert Collins
  • Date: 2010-05-11 08:36:16 UTC
  • mto: This revision was merged to the branch mainline in revision 5223.
  • Revision ID: robertc@robertcollins.net-20100511083616-b8fjb19zomwupid0
Make all lock methods return Result objects, rather than lock_read returning self, as per John's review.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
 
1
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Core compression logic for compressing streams of related files."""
18
18
 
19
 
from __future__ import absolute_import
20
 
 
21
19
import time
22
20
import zlib
 
21
try:
 
22
    import pylzma
 
23
except ImportError:
 
24
    pylzma = None
23
25
 
24
 
from ..lazy_import import lazy_import
25
 
lazy_import(globals(), """
26
 
from breezy import (
 
26
from bzrlib import (
27
27
    annotate,
28
 
    config,
29
28
    debug,
 
29
    errors,
 
30
    graph as _mod_graph,
 
31
    knit,
30
32
    osutils,
 
33
    pack,
31
34
    static_tuple,
32
35
    trace,
33
 
    tsort,
34
 
    )
35
 
from breezy.bzr import (
36
 
    knit,
37
 
    pack,
38
 
    pack_repo,
39
 
    )
40
 
 
41
 
from breezy.i18n import gettext
42
 
""")
43
 
 
44
 
from .. import (
45
 
    errors,
46
 
    )
47
 
from .btree_index import BTreeBuilder
48
 
from ..lru_cache import LRUSizeCache
49
 
from ..sixish import (
50
 
    indexbytes,
51
 
    map,
52
 
    range,
53
 
    viewitems,
54
 
    )
55
 
from .versionedfile import (
56
 
    _KeyRefs,
 
36
    )
 
37
from bzrlib.btree_index import BTreeBuilder
 
38
from bzrlib.lru_cache import LRUSizeCache
 
39
from bzrlib.tsort import topo_sort
 
40
from bzrlib.versionedfile import (
57
41
    adapter_registry,
58
42
    AbsentContentFactory,
59
43
    ChunkedContentFactory,
60
 
    ExistingContent,
61
44
    FulltextContentFactory,
62
 
    VersionedFilesWithFallbacks,
63
 
    UnavailableRepresentation,
 
45
    VersionedFiles,
64
46
    )
65
47
 
66
48
# Minimum number of uncompressed bytes to try fetch at once when retrieving
67
49
# groupcompress blocks.
68
50
BATCH_SIZE = 2**16
69
51
 
70
 
# osutils.sha_string(b'')
71
 
_null_sha1 = b'da39a3ee5e6b4b0d3255bfef95601890afd80709'
 
52
_USE_LZMA = False and (pylzma is not None)
72
53
 
 
54
# osutils.sha_string('')
 
55
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
73
56
 
74
57
def sort_gc_optimal(parent_map):
75
58
    """Sort and group the keys in parent_map into groupcompress order.
82
65
    # groupcompress ordering is approximately reverse topological,
83
66
    # properly grouped by file-id.
84
67
    per_prefix_map = {}
85
 
    for key, value in viewitems(parent_map):
86
 
        if isinstance(key, bytes) or len(key) == 1:
87
 
            prefix = b''
 
68
    for key, value in parent_map.iteritems():
 
69
        if isinstance(key, str) or len(key) == 1:
 
70
            prefix = ''
88
71
        else:
89
72
            prefix = key[0]
90
73
        try:
94
77
 
95
78
    present_keys = []
96
79
    for prefix in sorted(per_prefix_map):
97
 
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
 
80
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
98
81
    return present_keys
99
82
 
100
83
 
101
 
class DecompressCorruption(errors.BzrError):
102
 
 
103
 
    _fmt = "Corruption while decompressing repository file%(orig_error)s"
104
 
 
105
 
    def __init__(self, orig_error=None):
106
 
        if orig_error is not None:
107
 
            self.orig_error = ", %s" % (orig_error,)
108
 
        else:
109
 
            self.orig_error = ""
110
 
        errors.BzrError.__init__(self)
111
 
 
112
 
 
113
84
# The max zlib window size is 32kB, so if we set 'max_size' output of the
114
85
# decompressor to the requested bytes + 32kB, then we should guarantee
115
86
# num_bytes coming out.
116
 
_ZLIB_DECOMP_WINDOW = 32 * 1024
117
 
 
 
87
_ZLIB_DECOMP_WINDOW = 32*1024
118
88
 
119
89
class GroupCompressBlock(object):
120
90
    """An object which maintains the internal structure of the compressed data.
123
93
    """
124
94
 
125
95
    # Group Compress Block v1 Zlib
126
 
    GCB_HEADER = b'gcb1z\n'
 
96
    GCB_HEADER = 'gcb1z\n'
127
97
    # Group Compress Block v1 Lzma
128
 
    GCB_LZ_HEADER = b'gcb1l\n'
 
98
    GCB_LZ_HEADER = 'gcb1l\n'
129
99
    GCB_KNOWN_HEADERS = (GCB_HEADER, GCB_LZ_HEADER)
130
100
 
131
101
    def __init__(self):
132
102
        # map by key? or just order in file?
133
103
        self._compressor_name = None
134
 
        self._z_content_chunks = None
 
104
        self._z_content = None
135
105
        self._z_content_decompressor = None
136
106
        self._z_content_length = None
137
107
        self._content_length = None
162
132
        # Expand the content if required
163
133
        if self._content is None:
164
134
            if self._content_chunks is not None:
165
 
                self._content = b''.join(self._content_chunks)
 
135
                self._content = ''.join(self._content_chunks)
166
136
                self._content_chunks = None
167
137
        if self._content is None:
168
 
            # We join self._z_content_chunks here, because if we are
169
 
            # decompressing, then it is *very* likely that we have a single
170
 
            # chunk
171
 
            if self._z_content_chunks is None:
 
138
            if self._z_content is None:
172
139
                raise AssertionError('No content to decompress')
173
 
            z_content = b''.join(self._z_content_chunks)
174
 
            if z_content == b'':
175
 
                self._content = b''
 
140
            if self._z_content == '':
 
141
                self._content = ''
176
142
            elif self._compressor_name == 'lzma':
177
143
                # We don't do partial lzma decomp yet
178
 
                import pylzma
179
 
                self._content = pylzma.decompress(z_content)
 
144
                self._content = pylzma.decompress(self._z_content)
180
145
            elif self._compressor_name == 'zlib':
181
146
                # Start a zlib decompressor
182
147
                if num_bytes * 4 > self._content_length * 3:
183
148
                    # If we are requesting more that 3/4ths of the content,
184
149
                    # just extract the whole thing in a single pass
185
150
                    num_bytes = self._content_length
186
 
                    self._content = zlib.decompress(z_content)
 
151
                    self._content = zlib.decompress(self._z_content)
187
152
                else:
188
153
                    self._z_content_decompressor = zlib.decompressobj()
189
154
                    # Seed the decompressor with the uncompressed bytes, so
190
155
                    # that the rest of the code is simplified
191
156
                    self._content = self._z_content_decompressor.decompress(
192
 
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
157
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
193
158
                    if not self._z_content_decompressor.unconsumed_tail:
194
159
                        self._z_content_decompressor = None
195
160
            else:
222
187
            # The stream is finished
223
188
            self._z_content_decompressor = None
224
189
 
225
 
    def _parse_bytes(self, data, pos):
 
190
    def _parse_bytes(self, bytes, pos):
226
191
        """Read the various lengths from the header.
227
192
 
228
193
        This also populates the various 'compressed' buffers.
232
197
        # At present, we have 2 integers for the compressed and uncompressed
233
198
        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
234
199
        # checking too far, cap the search to 14 bytes.
235
 
        pos2 = data.index(b'\n', pos, pos + 14)
236
 
        self._z_content_length = int(data[pos:pos2])
237
 
        pos = pos2 + 1
238
 
        pos2 = data.index(b'\n', pos, pos + 14)
239
 
        self._content_length = int(data[pos:pos2])
240
 
        pos = pos2 + 1
241
 
        if len(data) != (pos + self._z_content_length):
 
200
        pos2 = bytes.index('\n', pos, pos + 14)
 
201
        self._z_content_length = int(bytes[pos:pos2])
 
202
        pos = pos2 + 1
 
203
        pos2 = bytes.index('\n', pos, pos + 14)
 
204
        self._content_length = int(bytes[pos:pos2])
 
205
        pos = pos2 + 1
 
206
        if len(bytes) != (pos + self._z_content_length):
242
207
            # XXX: Define some GCCorrupt error ?
243
208
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
244
 
                                 (len(data), pos, self._z_content_length))
245
 
        self._z_content_chunks = (data[pos:],)
246
 
 
247
 
    @property
248
 
    def _z_content(self):
249
 
        """Return z_content_chunks as a simple string.
250
 
 
251
 
        Meant only to be used by the test suite.
252
 
        """
253
 
        if self._z_content_chunks is not None:
254
 
            return b''.join(self._z_content_chunks)
255
 
        return None
 
209
                                 (len(bytes), pos, self._z_content_length))
 
210
        self._z_content = bytes[pos:]
256
211
 
257
212
    @classmethod
258
213
    def from_bytes(cls, bytes):
259
214
        out = cls()
260
 
        header = bytes[:6]
261
 
        if header not in cls.GCB_KNOWN_HEADERS:
 
215
        if bytes[:6] not in cls.GCB_KNOWN_HEADERS:
262
216
            raise ValueError('bytes did not start with any of %r'
263
217
                             % (cls.GCB_KNOWN_HEADERS,))
264
 
        if header == cls.GCB_HEADER:
 
218
        # XXX: why not testing the whole header ?
 
219
        if bytes[4] == 'z':
265
220
            out._compressor_name = 'zlib'
266
 
        elif header == cls.GCB_LZ_HEADER:
 
221
        elif bytes[4] == 'l':
267
222
            out._compressor_name = 'lzma'
268
223
        else:
269
 
            raise ValueError('unknown compressor: %r' % (header,))
 
224
            raise ValueError('unknown compressor: %r' % (bytes,))
270
225
        out._parse_bytes(bytes, 6)
271
226
        return out
272
227
 
278
233
        :return: The bytes for the content
279
234
        """
280
235
        if start == end == 0:
281
 
            return []
 
236
            return ''
282
237
        self._ensure_content(end)
283
238
        # The bytes are 'f' or 'd' for the type, then a variable-length
284
239
        # base128 integer for the content size, then the actual content
285
240
        # We know that the variable-length integer won't be longer than 5
286
241
        # bytes (it takes 5 bytes to encode 2^32)
287
 
        c = self._content[start:start + 1]
288
 
        if c == b'f':
 
242
        c = self._content[start]
 
243
        if c == 'f':
289
244
            type = 'fulltext'
290
245
        else:
291
 
            if c != b'd':
 
246
            if c != 'd':
292
247
                raise ValueError('Unknown content control code: %s'
293
248
                                 % (c,))
294
249
            type = 'delta'
295
250
        content_len, len_len = decode_base128_int(
296
 
            self._content[start + 1:start + 6])
 
251
                            self._content[start + 1:start + 6])
297
252
        content_start = start + 1 + len_len
298
253
        if end != content_start + content_len:
299
254
            raise ValueError('end != len according to field header'
300
 
                             ' %s != %s' % (end, content_start + content_len))
301
 
        if c == b'f':
302
 
            return [self._content[content_start:end]]
303
 
        # Must be type delta as checked above
304
 
        return [apply_delta_to_source(self._content, content_start, end)]
 
255
                ' %s != %s' % (end, content_start + content_len))
 
256
        if c == 'f':
 
257
            bytes = self._content[content_start:end]
 
258
        elif c == 'd':
 
259
            bytes = apply_delta_to_source(self._content, content_start, end)
 
260
        return bytes
305
261
 
306
262
    def set_chunked_content(self, content_chunks, length):
307
263
        """Set the content of this block to the given chunks."""
313
269
        self._content_length = length
314
270
        self._content_chunks = content_chunks
315
271
        self._content = None
316
 
        self._z_content_chunks = None
 
272
        self._z_content = None
317
273
 
318
274
    def set_content(self, content):
319
275
        """Set the content of this block."""
320
276
        self._content_length = len(content)
321
277
        self._content = content
322
 
        self._z_content_chunks = None
323
 
 
324
 
    def _create_z_content_from_chunks(self, chunks):
 
278
        self._z_content = None
 
279
 
 
280
    def _create_z_content_using_lzma(self):
 
281
        if self._content_chunks is not None:
 
282
            self._content = ''.join(self._content_chunks)
 
283
            self._content_chunks = None
 
284
        if self._content is None:
 
285
            raise AssertionError('Nothing to compress')
 
286
        self._z_content = pylzma.compress(self._content)
 
287
        self._z_content_length = len(self._z_content)
 
288
 
 
289
    def _create_z_content_from_chunks(self):
325
290
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
326
 
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
327
 
        # (measured peak is maybe 30MB over the above...)
328
 
        compressed_chunks = list(map(compressor.compress, chunks))
 
291
        compressed_chunks = map(compressor.compress, self._content_chunks)
329
292
        compressed_chunks.append(compressor.flush())
330
 
        # Ignore empty chunks
331
 
        self._z_content_chunks = [c for c in compressed_chunks if c]
332
 
        self._z_content_length = sum(map(len, self._z_content_chunks))
 
293
        self._z_content = ''.join(compressed_chunks)
 
294
        self._z_content_length = len(self._z_content)
333
295
 
334
296
    def _create_z_content(self):
335
 
        if self._z_content_chunks is not None:
 
297
        if self._z_content is not None:
 
298
            return
 
299
        if _USE_LZMA:
 
300
            self._create_z_content_using_lzma()
336
301
            return
337
302
        if self._content_chunks is not None:
338
 
            chunks = self._content_chunks
339
 
        else:
340
 
            chunks = (self._content,)
341
 
        self._create_z_content_from_chunks(chunks)
342
 
 
343
 
    def to_chunks(self):
344
 
        """Create the byte stream as a series of 'chunks'"""
345
 
        self._create_z_content()
346
 
        header = self.GCB_HEADER
347
 
        chunks = [b'%s%d\n%d\n'
348
 
                  % (header, self._z_content_length, self._content_length),
349
 
                  ]
350
 
        chunks.extend(self._z_content_chunks)
351
 
        total_len = sum(map(len, chunks))
352
 
        return total_len, chunks
 
303
            self._create_z_content_from_chunks()
 
304
            return
 
305
        self._z_content = zlib.compress(self._content)
 
306
        self._z_content_length = len(self._z_content)
353
307
 
354
308
    def to_bytes(self):
355
309
        """Encode the information into a byte stream."""
356
 
        total_len, chunks = self.to_chunks()
357
 
        return b''.join(chunks)
 
310
        self._create_z_content()
 
311
        if _USE_LZMA:
 
312
            header = self.GCB_LZ_HEADER
 
313
        else:
 
314
            header = self.GCB_HEADER
 
315
        chunks = [header,
 
316
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
 
317
                  self._z_content,
 
318
                 ]
 
319
        return ''.join(chunks)
358
320
 
359
321
    def _dump(self, include_text=False):
360
322
        """Take this block, and spit out a human-readable structure.
370
332
        result = []
371
333
        pos = 0
372
334
        while pos < self._content_length:
373
 
            kind = self._content[pos:pos + 1]
 
335
            kind = self._content[pos]
374
336
            pos += 1
375
 
            if kind not in (b'f', b'd'):
 
337
            if kind not in ('f', 'd'):
376
338
                raise ValueError('invalid kind character: %r' % (kind,))
377
339
            content_len, len_len = decode_base128_int(
378
 
                self._content[pos:pos + 5])
 
340
                                self._content[pos:pos + 5])
379
341
            pos += len_len
380
342
            if content_len + pos > self._content_length:
381
343
                raise ValueError('invalid content_len %d for record @ pos %d'
382
344
                                 % (content_len, pos - len_len - 1))
383
 
            if kind == b'f':  # Fulltext
 
345
            if kind == 'f': # Fulltext
384
346
                if include_text:
385
 
                    text = self._content[pos:pos + content_len]
386
 
                    result.append((b'f', content_len, text))
 
347
                    text = self._content[pos:pos+content_len]
 
348
                    result.append(('f', content_len, text))
387
349
                else:
388
 
                    result.append((b'f', content_len))
389
 
            elif kind == b'd':  # Delta
390
 
                delta_content = self._content[pos:pos + content_len]
 
350
                    result.append(('f', content_len))
 
351
            elif kind == 'd': # Delta
 
352
                delta_content = self._content[pos:pos+content_len]
391
353
                delta_info = []
392
354
                # The first entry in a delta is the decompressed length
393
355
                decomp_len, delta_pos = decode_base128_int(delta_content)
394
 
                result.append((b'd', content_len, decomp_len, delta_info))
 
356
                result.append(('d', content_len, decomp_len, delta_info))
395
357
                measured_len = 0
396
358
                while delta_pos < content_len:
397
 
                    c = indexbytes(delta_content, delta_pos)
 
359
                    c = ord(delta_content[delta_pos])
398
360
                    delta_pos += 1
399
 
                    if c & 0x80:  # Copy
 
361
                    if c & 0x80: # Copy
400
362
                        (offset, length,
401
363
                         delta_pos) = decode_copy_instruction(delta_content, c,
402
364
                                                              delta_pos)
403
365
                        if include_text:
404
 
                            text = self._content[offset:offset + length]
405
 
                            delta_info.append((b'c', offset, length, text))
 
366
                            text = self._content[offset:offset+length]
 
367
                            delta_info.append(('c', offset, length, text))
406
368
                        else:
407
 
                            delta_info.append((b'c', offset, length))
 
369
                            delta_info.append(('c', offset, length))
408
370
                        measured_len += length
409
 
                    else:  # Insert
 
371
                    else: # Insert
410
372
                        if include_text:
411
 
                            txt = delta_content[delta_pos:delta_pos + c]
 
373
                            txt = delta_content[delta_pos:delta_pos+c]
412
374
                        else:
413
 
                            txt = b''
414
 
                        delta_info.append((b'i', c, txt))
 
375
                            txt = ''
 
376
                        delta_info.append(('i', c, txt))
415
377
                        measured_len += c
416
378
                        delta_pos += c
417
379
                if delta_pos != content_len:
443
405
        self.key = key
444
406
        self.parents = parents
445
407
        self.sha1 = None
446
 
        self.size = None
447
408
        # Note: This attribute coupled with Manager._factories creates a
448
409
        #       reference cycle. Perhaps we would rather use a weakref(), or
449
410
        #       find an appropriate time to release the ref. After the first
450
411
        #       get_bytes_as call? After Manager.get_record_stream() returns
451
412
        #       the object?
452
413
        self._manager = manager
453
 
        self._chunks = None
 
414
        self._bytes = None
454
415
        self.storage_kind = 'groupcompress-block'
455
416
        if not first:
456
417
            self.storage_kind = 'groupcompress-block-ref'
460
421
 
461
422
    def __repr__(self):
462
423
        return '%s(%s, first=%s)' % (self.__class__.__name__,
463
 
                                     self.key, self._first)
464
 
 
465
 
    def _extract_bytes(self):
466
 
        # Grab and cache the raw bytes for this entry
467
 
        # and break the ref-cycle with _manager since we don't need it
468
 
        # anymore
469
 
        try:
470
 
            self._manager._prepare_for_extract()
471
 
        except zlib.error as value:
472
 
            raise DecompressCorruption("zlib: " + str(value))
473
 
        block = self._manager._block
474
 
        self._chunks = block.extract(self.key, self._start, self._end)
475
 
        # There are code paths that first extract as fulltext, and then
476
 
        # extract as storage_kind (smart fetch). So we don't break the
477
 
        # refcycle here, but instead in manager.get_record_stream()
 
424
            self.key, self._first)
478
425
 
479
426
    def get_bytes_as(self, storage_kind):
480
427
        if storage_kind == self.storage_kind:
482
429
                # wire bytes, something...
483
430
                return self._manager._wire_bytes()
484
431
            else:
485
 
                return b''
486
 
        if storage_kind in ('fulltext', 'chunked', 'lines'):
487
 
            if self._chunks is None:
488
 
                self._extract_bytes()
 
432
                return ''
 
433
        if storage_kind in ('fulltext', 'chunked'):
 
434
            if self._bytes is None:
 
435
                # Grab and cache the raw bytes for this entry
 
436
                # and break the ref-cycle with _manager since we don't need it
 
437
                # anymore
 
438
                self._manager._prepare_for_extract()
 
439
                block = self._manager._block
 
440
                self._bytes = block.extract(self.key, self._start, self._end)
 
441
                # There are code paths that first extract as fulltext, and then
 
442
                # extract as storage_kind (smart fetch). So we don't break the
 
443
                # refcycle here, but instead in manager.get_record_stream()
489
444
            if storage_kind == 'fulltext':
490
 
                return b''.join(self._chunks)
491
 
            elif storage_kind == 'chunked':
492
 
                return self._chunks
 
445
                return self._bytes
493
446
            else:
494
 
                return osutils.chunks_to_lines(self._chunks)
495
 
        raise UnavailableRepresentation(self.key, storage_kind,
496
 
                                               self.storage_kind)
497
 
 
498
 
    def iter_bytes_as(self, storage_kind):
499
 
        if self._chunks is None:
500
 
            self._extract_bytes()
501
 
        if storage_kind == 'chunked':
502
 
            return iter(self._chunks)
503
 
        elif storage_kind == 'lines':
504
 
            return iter(osutils.chunks_to_lines(self._chunks))
505
 
        raise UnavailableRepresentation(self.key, storage_kind,
 
447
                return [self._bytes]
 
448
        raise errors.UnavailableRepresentation(self.key, storage_kind,
506
449
                                               self.storage_kind)
507
450
 
508
451
 
509
452
class _LazyGroupContentManager(object):
510
453
    """This manages a group of _LazyGroupCompressFactory objects."""
511
454
 
512
 
    _max_cut_fraction = 0.75  # We allow a block to be trimmed to 75% of
513
 
    # current size, and still be considered
514
 
    # resuable
515
 
    _full_block_size = 4 * 1024 * 1024
516
 
    _full_mixed_block_size = 2 * 1024 * 1024
517
 
    _full_enough_block_size = 3 * 1024 * 1024  # size at which we won't repack
518
 
    _full_enough_mixed_block_size = 2 * 768 * 1024  # 1.5MB
 
455
    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
 
456
                             # current size, and still be considered
 
457
                             # resuable
 
458
    _full_block_size = 4*1024*1024
 
459
    _full_mixed_block_size = 2*1024*1024
 
460
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
 
461
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB
519
462
 
520
 
    def __init__(self, block, get_compressor_settings=None):
 
463
    def __init__(self, block):
521
464
        self._block = block
522
465
        # We need to preserve the ordering
523
466
        self._factories = []
524
467
        self._last_byte = 0
525
 
        self._get_settings = get_compressor_settings
526
 
        self._compressor_settings = None
527
 
 
528
 
    def _get_compressor_settings(self):
529
 
        if self._compressor_settings is not None:
530
 
            return self._compressor_settings
531
 
        settings = None
532
 
        if self._get_settings is not None:
533
 
            settings = self._get_settings()
534
 
        if settings is None:
535
 
            vf = GroupCompressVersionedFiles
536
 
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
537
 
        self._compressor_settings = settings
538
 
        return self._compressor_settings
539
468
 
540
469
    def add_factory(self, key, parents, start, end):
541
470
        if not self._factories:
544
473
            first = False
545
474
        # Note that this creates a reference cycle....
546
475
        factory = _LazyGroupCompressFactory(key, parents, self,
547
 
                                            start, end, first=first)
 
476
            start, end, first=first)
548
477
        # max() works here, but as a function call, doing a compare seems to be
549
478
        # significantly faster, timeit says 250ms for max() and 100ms for the
550
479
        # comparison
574
503
        new_block.set_content(self._block._content[:last_byte])
575
504
        self._block = new_block
576
505
 
577
 
    def _make_group_compressor(self):
578
 
        return GroupCompressor(self._get_compressor_settings())
579
 
 
580
506
    def _rebuild_block(self):
581
507
        """Create a new GroupCompressBlock with only the referenced texts."""
582
 
        compressor = self._make_group_compressor()
 
508
        compressor = GroupCompressor()
583
509
        tstart = time.time()
584
510
        old_length = self._block._content_length
585
511
        end_point = 0
586
512
        for factory in self._factories:
587
 
            chunks = factory.get_bytes_as('chunked')
588
 
            chunks_len = factory.size
589
 
            if chunks_len is None:
590
 
                chunks_len = sum(map(len, chunks))
 
513
            bytes = factory.get_bytes_as('fulltext')
591
514
            (found_sha1, start_point, end_point,
592
 
             type) = compressor.compress(
593
 
                 factory.key, chunks, chunks_len, factory.sha1)
 
515
             type) = compressor.compress(factory.key, bytes, factory.sha1)
594
516
            # Now update this factory with the new offsets, etc
595
517
            factory.sha1 = found_sha1
596
518
            factory._start = start_point
601
523
        #       block? It seems hard to come up with a method that it would
602
524
        #       expand, since we do full compression again. Perhaps based on a
603
525
        #       request that ends up poorly ordered?
604
 
        # TODO: If the content would have expanded, then we would want to
605
 
        #       handle a case where we need to split the block.
606
 
        #       Now that we have a user-tweakable option
607
 
        #       (max_bytes_to_index), it is possible that one person set it
608
 
        #       to a very low value, causing poor compression.
609
526
        delta = time.time() - tstart
610
527
        self._block = new_block
611
528
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
734
651
        #   <length of gc block>\n
735
652
        #   <header bytes>
736
653
        #   <gc-block>
737
 
        lines = [b'groupcompress-block\n']
 
654
        lines = ['groupcompress-block\n']
738
655
        # The minimal info we need is the key, the start offset, and the
739
656
        # parents. The length and type are encoded in the record itself.
740
657
        # However, passing in the other bits makes it easier.  The list of
745
662
        # 1 line for end byte
746
663
        header_lines = []
747
664
        for factory in self._factories:
748
 
            key_bytes = b'\x00'.join(factory.key)
 
665
            key_bytes = '\x00'.join(factory.key)
749
666
            parents = factory.parents
750
667
            if parents is None:
751
 
                parent_bytes = b'None:'
 
668
                parent_bytes = 'None:'
752
669
            else:
753
 
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
754
 
            record_header = b'%s\n%s\n%d\n%d\n' % (
 
670
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
 
671
            record_header = '%s\n%s\n%d\n%d\n' % (
755
672
                key_bytes, parent_bytes, factory._start, factory._end)
756
673
            header_lines.append(record_header)
757
674
            # TODO: Can we break the refcycle at this point and set
758
675
            #       factory._manager = None?
759
 
        header_bytes = b''.join(header_lines)
 
676
        header_bytes = ''.join(header_lines)
760
677
        del header_lines
761
678
        header_bytes_len = len(header_bytes)
762
679
        z_header_bytes = zlib.compress(header_bytes)
763
680
        del header_bytes
764
681
        z_header_bytes_len = len(z_header_bytes)
765
 
        block_bytes_len, block_chunks = self._block.to_chunks()
766
 
        lines.append(b'%d\n%d\n%d\n' % (
767
 
            z_header_bytes_len, header_bytes_len, block_bytes_len))
 
682
        block_bytes = self._block.to_bytes()
 
683
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
 
684
                                       len(block_bytes)))
768
685
        lines.append(z_header_bytes)
769
 
        lines.extend(block_chunks)
770
 
        del z_header_bytes, block_chunks
771
 
        # TODO: This is a point where we will double the memory consumption. To
772
 
        #       avoid this, we probably have to switch to a 'chunked' api
773
 
        return b''.join(lines)
 
686
        lines.append(block_bytes)
 
687
        del z_header_bytes, block_bytes
 
688
        return ''.join(lines)
774
689
 
775
690
    @classmethod
776
691
    def from_bytes(cls, bytes):
777
692
        # TODO: This does extra string copying, probably better to do it a
778
 
        #       different way. At a minimum this creates 2 copies of the
779
 
        #       compressed content
 
693
        #       different way
780
694
        (storage_kind, z_header_len, header_len,
781
 
         block_len, rest) = bytes.split(b'\n', 4)
 
695
         block_len, rest) = bytes.split('\n', 4)
782
696
        del bytes
783
 
        if storage_kind != b'groupcompress-block':
 
697
        if storage_kind != 'groupcompress-block':
784
698
            raise ValueError('Unknown storage kind: %s' % (storage_kind,))
785
699
        z_header_len = int(z_header_len)
786
700
        if len(rest) < z_header_len:
798
712
        del rest
799
713
        # So now we have a valid GCB, we just need to parse the factories that
800
714
        # were sent to us
801
 
        header_lines = header.split(b'\n')
 
715
        header_lines = header.split('\n')
802
716
        del header
803
717
        last = header_lines.pop()
804
 
        if last != b'':
 
718
        if last != '':
805
719
            raise ValueError('header lines did not end with a trailing'
806
720
                             ' newline')
807
721
        if len(header_lines) % 4 != 0:
809
723
        block = GroupCompressBlock.from_bytes(block_bytes)
810
724
        del block_bytes
811
725
        result = cls(block)
812
 
        for start in range(0, len(header_lines), 4):
 
726
        for start in xrange(0, len(header_lines), 4):
813
727
            # intern()?
814
 
            key = tuple(header_lines[start].split(b'\x00'))
815
 
            parents_line = header_lines[start + 1]
816
 
            if parents_line == b'None:':
 
728
            key = tuple(header_lines[start].split('\x00'))
 
729
            parents_line = header_lines[start+1]
 
730
            if parents_line == 'None:':
817
731
                parents = None
818
732
            else:
819
 
                parents = tuple([tuple(segment.split(b'\x00'))
820
 
                                 for segment in parents_line.split(b'\t')
821
 
                                 if segment])
822
 
            start_offset = int(header_lines[start + 2])
823
 
            end_offset = int(header_lines[start + 3])
 
733
                parents = tuple([tuple(segment.split('\x00'))
 
734
                                 for segment in parents_line.split('\t')
 
735
                                  if segment])
 
736
            start_offset = int(header_lines[start+2])
 
737
            end_offset = int(header_lines[start+3])
824
738
            result.add_factory(key, parents, start_offset, end_offset)
825
739
        return result
826
740
 
834
748
 
835
749
class _CommonGroupCompressor(object):
836
750
 
837
 
    def __init__(self, settings=None):
 
751
    def __init__(self):
838
752
        """Create a GroupCompressor."""
839
753
        self.chunks = []
840
754
        self._last = None
841
755
        self.endpoint = 0
842
756
        self.input_bytes = 0
843
757
        self.labels_deltas = {}
844
 
        self._delta_index = None  # Set by the children
 
758
        self._delta_index = None # Set by the children
845
759
        self._block = GroupCompressBlock()
846
 
        if settings is None:
847
 
            self._settings = {}
848
 
        else:
849
 
            self._settings = settings
850
760
 
851
 
    def compress(self, key, chunks, length, expected_sha, nostore_sha=None,
852
 
                 soft=False):
 
761
    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
853
762
        """Compress lines with label key.
854
763
 
855
764
        :param key: A key tuple. It is stored in the output
856
765
            for identification of the text during decompression. If the last
857
 
            element is b'None' it is replaced with the sha1 of the text -
 
766
            element is 'None' it is replaced with the sha1 of the text -
858
767
            e.g. sha1:xxxxxxx.
859
 
        :param chunks: Chunks of bytes to be compressed
860
 
        :param length: Length of chunks
 
768
        :param bytes: The bytes to be compressed
861
769
        :param expected_sha: If non-None, the sha the lines are believed to
862
770
            have. During compression the sha is calculated; a mismatch will
863
771
            cause an error.
871
779
 
872
780
        :seealso VersionedFiles.add_lines:
873
781
        """
874
 
        if length == 0:  # empty, like a dir entry, etc
 
782
        if not bytes: # empty, like a dir entry, etc
875
783
            if nostore_sha == _null_sha1:
876
 
                raise ExistingContent()
 
784
                raise errors.ExistingContent()
877
785
            return _null_sha1, 0, 0, 'fulltext'
878
786
        # we assume someone knew what they were doing when they passed it in
879
787
        if expected_sha is not None:
880
788
            sha1 = expected_sha
881
789
        else:
882
 
            sha1 = osutils.sha_strings(chunks)
 
790
            sha1 = osutils.sha_string(bytes)
883
791
        if nostore_sha is not None:
884
792
            if sha1 == nostore_sha:
885
 
                raise ExistingContent()
 
793
                raise errors.ExistingContent()
886
794
        if key[-1] is None:
887
 
            key = key[:-1] + (b'sha1:' + sha1,)
 
795
            key = key[:-1] + ('sha1:' + sha1,)
888
796
 
889
 
        start, end, type = self._compress(key, chunks, length, length / 2, soft)
 
797
        start, end, type = self._compress(key, bytes, len(bytes) / 2, soft)
890
798
        return sha1, start, end, type
891
799
 
892
 
    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
 
800
    def _compress(self, key, bytes, max_delta_size, soft=False):
893
801
        """Compress lines with label key.
894
802
 
895
803
        :param key: A key tuple. It is stored in the output for identification
896
804
            of the text during decompression.
897
805
 
898
 
        :param chunks: The chunks of bytes to be compressed
899
 
 
900
 
        :param input_len: The length of the chunks
 
806
        :param bytes: The bytes to be compressed
901
807
 
902
808
        :param max_delta_size: The size above which we issue a fulltext instead
903
809
            of a delta.
914
820
        """Extract a key previously added to the compressor.
915
821
 
916
822
        :param key: The key to extract.
917
 
        :return: An iterable over chunks and the sha1.
 
823
        :return: An iterable over bytes and the sha1.
918
824
        """
919
 
        (start_byte, start_chunk, end_byte,
920
 
         end_chunk) = self.labels_deltas[key]
 
825
        (start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
921
826
        delta_chunks = self.chunks[start_chunk:end_chunk]
922
 
        stored_bytes = b''.join(delta_chunks)
923
 
        kind = stored_bytes[:1]
924
 
        if kind == b'f':
 
827
        stored_bytes = ''.join(delta_chunks)
 
828
        if stored_bytes[0] == 'f':
925
829
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
926
830
            data_len = fulltext_len + 1 + offset
927
 
            if data_len != len(stored_bytes):
 
831
            if  data_len != len(stored_bytes):
928
832
                raise ValueError('Index claimed fulltext len, but stored bytes'
929
833
                                 ' claim %s != %s'
930
834
                                 % (len(stored_bytes), data_len))
931
 
            data = [stored_bytes[offset + 1:]]
 
835
            bytes = stored_bytes[offset + 1:]
932
836
        else:
933
 
            if kind != b'd':
934
 
                raise ValueError('Unknown content kind, bytes claim %s' % kind)
935
837
            # XXX: This is inefficient at best
936
 
            source = b''.join(self.chunks[:start_chunk])
 
838
            source = ''.join(self.chunks[:start_chunk])
 
839
            if stored_bytes[0] != 'd':
 
840
                raise ValueError('Unknown content kind, bytes claim %s'
 
841
                                 % (stored_bytes[0],))
937
842
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
938
843
            data_len = delta_len + 1 + offset
939
844
            if data_len != len(stored_bytes):
940
845
                raise ValueError('Index claimed delta len, but stored bytes'
941
846
                                 ' claim %s != %s'
942
847
                                 % (len(stored_bytes), data_len))
943
 
            data = [apply_delta(source, stored_bytes[offset + 1:])]
944
 
        data_sha1 = osutils.sha_strings(data)
945
 
        return data, data_sha1
 
848
            bytes = apply_delta(source, stored_bytes[offset + 1:])
 
849
        bytes_sha1 = osutils.sha_string(bytes)
 
850
        return bytes, bytes_sha1
946
851
 
947
852
    def flush(self):
948
853
        """Finish this group, creating a formatted stream.
949
854
 
950
855
        After calling this, the compressor should no longer be used
951
856
        """
 
857
        # TODO: this causes us to 'bloat' to 2x the size of content in the
 
858
        #       group. This has an impact for 'commit' of large objects.
 
859
        #       One possibility is to use self._content_chunks, and be lazy and
 
860
        #       only fill out self._content as a full string when we actually
 
861
        #       need it. That would at least drop the peak memory consumption
 
862
        #       for 'commit' down to ~1x the size of the largest file, at a
 
863
        #       cost of increased complexity within this code. 2x is still <<
 
864
        #       3x the size of the largest file, so we are doing ok.
952
865
        self._block.set_chunked_content(self.chunks, self.endpoint)
953
866
        self.chunks = None
954
867
        self._delta_index = None
972
885
 
973
886
class PythonGroupCompressor(_CommonGroupCompressor):
974
887
 
975
 
    def __init__(self, settings=None):
 
888
    def __init__(self):
976
889
        """Create a GroupCompressor.
977
890
 
978
891
        Used only if the pyrex version is not available.
979
892
        """
980
 
        super(PythonGroupCompressor, self).__init__(settings)
 
893
        super(PythonGroupCompressor, self).__init__()
981
894
        self._delta_index = LinesDeltaIndex([])
982
895
        # The actual content is managed by LinesDeltaIndex
983
896
        self.chunks = self._delta_index.lines
984
897
 
985
 
    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
 
898
    def _compress(self, key, bytes, max_delta_size, soft=False):
986
899
        """see _CommonGroupCompressor._compress"""
987
 
        new_lines = osutils.chunks_to_lines(chunks)
 
900
        input_len = len(bytes)
 
901
        new_lines = osutils.split_lines(bytes)
988
902
        out_lines, index_lines = self._delta_index.make_delta(
989
903
            new_lines, bytes_length=input_len, soft=soft)
990
904
        delta_length = sum(map(len, out_lines))
991
905
        if delta_length > max_delta_size:
992
906
            # The delta is longer than the fulltext, insert a fulltext
993
907
            type = 'fulltext'
994
 
            out_lines = [b'f', encode_base128_int(input_len)]
 
908
            out_lines = ['f', encode_base128_int(input_len)]
995
909
            out_lines.extend(new_lines)
996
910
            index_lines = [False, False]
997
911
            index_lines.extend([True] * len(new_lines))
998
912
        else:
999
913
            # this is a worthy delta, output it
1000
914
            type = 'delta'
1001
 
            out_lines[0] = b'd'
 
915
            out_lines[0] = 'd'
1002
916
            # Update the delta_length to include those two encoded integers
1003
917
            out_lines[1] = encode_base128_int(delta_length)
1004
918
        # Before insertion
1019
933
 
1020
934
    It contains code very similar to SequenceMatcher because of having a similar
1021
935
    task. However some key differences apply:
1022
 
 
1023
 
    * there is no junk, we want a minimal edit not a human readable diff.
1024
 
    * we don't filter very common lines (because we don't know where a good
1025
 
      range will start, and after the first text we want to be emitting minmal
1026
 
      edits only.
1027
 
    * we chain the left side, not the right side
1028
 
    * we incrementally update the adjacency matrix as new lines are provided.
1029
 
    * we look for matches in all of the left side, so the routine which does
1030
 
      the analagous task of find_longest_match does not need to filter on the
1031
 
      left side.
 
936
     - there is no junk, we want a minimal edit not a human readable diff.
 
937
     - we don't filter very common lines (because we don't know where a good
 
938
       range will start, and after the first text we want to be emitting minmal
 
939
       edits only.
 
940
     - we chain the left side, not the right side
 
941
     - we incrementally update the adjacency matrix as new lines are provided.
 
942
     - we look for matches in all of the left side, so the routine which does
 
943
       the analagous task of find_longest_match does not need to filter on the
 
944
       left side.
1032
945
    """
1033
946
 
1034
 
    def __init__(self, settings=None):
1035
 
        super(PyrexGroupCompressor, self).__init__(settings)
1036
 
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
1037
 
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
 
947
    def __init__(self):
 
948
        super(PyrexGroupCompressor, self).__init__()
 
949
        self._delta_index = DeltaIndex()
1038
950
 
1039
 
    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
 
951
    def _compress(self, key, bytes, max_delta_size, soft=False):
1040
952
        """see _CommonGroupCompressor._compress"""
 
953
        input_len = len(bytes)
1041
954
        # By having action/label/sha1/len, we can parse the group if the index
1042
955
        # was ever destroyed, we have the key in 'label', we know the final
1043
956
        # bytes are valid from sha1, and we know where to find the end of this
1049
962
        # new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
1050
963
        if self._delta_index._source_offset != self.endpoint:
1051
964
            raise AssertionError('_source_offset != endpoint'
1052
 
                                 ' somehow the DeltaIndex got out of sync with'
1053
 
                                 ' the output lines')
1054
 
        bytes = b''.join(chunks)
 
965
                ' somehow the DeltaIndex got out of sync with'
 
966
                ' the output lines')
1055
967
        delta = self._delta_index.make_delta(bytes, max_delta_size)
1056
 
        if delta is None:
 
968
        if (delta is None):
1057
969
            type = 'fulltext'
1058
 
            enc_length = encode_base128_int(input_len)
 
970
            enc_length = encode_base128_int(len(bytes))
1059
971
            len_mini_header = 1 + len(enc_length)
1060
972
            self._delta_index.add_source(bytes, len_mini_header)
1061
 
            new_chunks = [b'f', enc_length] + chunks
 
973
            new_chunks = ['f', enc_length, bytes]
1062
974
        else:
1063
975
            type = 'delta'
1064
976
            enc_length = encode_base128_int(len(delta))
1065
977
            len_mini_header = 1 + len(enc_length)
1066
 
            new_chunks = [b'd', enc_length, delta]
 
978
            new_chunks = ['d', enc_length, delta]
1067
979
            self._delta_index.add_delta_source(delta, len_mini_header)
1068
980
        # Before insertion
1069
981
        start = self.endpoint
1076
988
                                   self.endpoint, chunk_end)
1077
989
        if not self._delta_index._source_offset == self.endpoint:
1078
990
            raise AssertionError('the delta index is out of sync'
1079
 
                                 'with the output lines %s != %s'
1080
 
                                 % (self._delta_index._source_offset, self.endpoint))
 
991
                'with the output lines %s != %s'
 
992
                % (self._delta_index._source_offset, self.endpoint))
1081
993
        return start, self.endpoint, type
1082
994
 
1083
995
    def _output_chunks(self, new_chunks):
1108
1020
        if graph:
1109
1021
            ref_length = 1
1110
1022
        graph_index = BTreeBuilder(reference_lists=ref_length,
1111
 
                                   key_elements=keylength)
 
1023
            key_elements=keylength)
1112
1024
        stream = transport.open_write_stream('newpack')
1113
1025
        writer = pack.ContainerWriter(stream.write)
1114
1026
        writer.begin()
1115
 
        index = _GCGraphIndex(graph_index, lambda: True, parents=parents,
1116
 
                              add_callback=graph_index.add_nodes,
1117
 
                              inconsistency_fatal=inconsistency_fatal)
1118
 
        access = pack_repo._DirectPackAccess({})
 
1027
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
 
1028
            add_callback=graph_index.add_nodes,
 
1029
            inconsistency_fatal=inconsistency_fatal)
 
1030
        access = knit._DirectPackAccess({})
1119
1031
        access.set_writer(writer, graph_index, (transport, 'newpack'))
1120
1032
        result = GroupCompressVersionedFiles(index, access, delta)
1121
1033
        result.stream = stream
1131
1043
 
1132
1044
class _BatchingBlockFetcher(object):
1133
1045
    """Fetch group compress blocks in batches.
1134
 
 
 
1046
    
1135
1047
    :ivar total_bytes: int of expected number of bytes needed to fetch the
1136
1048
        currently pending batch.
1137
1049
    """
1138
1050
 
1139
 
    def __init__(self, gcvf, locations, get_compressor_settings=None):
 
1051
    def __init__(self, gcvf, locations):
1140
1052
        self.gcvf = gcvf
1141
1053
        self.locations = locations
1142
1054
        self.keys = []
1145
1057
        self.total_bytes = 0
1146
1058
        self.last_read_memo = None
1147
1059
        self.manager = None
1148
 
        self._get_compressor_settings = get_compressor_settings
1149
1060
 
1150
1061
    def add_key(self, key):
1151
1062
        """Add another to key to fetch.
1152
 
 
 
1063
        
1153
1064
        :return: The estimated number of bytes needed to fetch the batch so
1154
1065
            far.
1155
1066
        """
1180
1091
            # and then.
1181
1092
            self.batch_memos[read_memo] = cached_block
1182
1093
        return self.total_bytes
1183
 
 
 
1094
        
1184
1095
    def _flush_manager(self):
1185
1096
        if self.manager is not None:
1186
1097
            for factory in self.manager.get_record_stream():
1191
1102
    def yield_factories(self, full_flush=False):
1192
1103
        """Yield factories for keys added since the last yield.  They will be
1193
1104
        returned in the order they were added via add_key.
1194
 
 
 
1105
        
1195
1106
        :param full_flush: by default, some results may not be returned in case
1196
1107
            they can be part of the next batch.  If full_flush is True, then
1197
1108
            all results are returned.
1216
1127
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
1217
1128
                    # The next block from _get_blocks will be the block we
1218
1129
                    # need.
1219
 
                    block_read_memo, block = next(blocks)
 
1130
                    block_read_memo, block = blocks.next()
1220
1131
                    if block_read_memo != read_memo:
1221
1132
                        raise AssertionError(
1222
1133
                            "block_read_memo out of sync with read_memo"
1225
1136
                    memos_to_get_stack.pop()
1226
1137
                else:
1227
1138
                    block = self.batch_memos[read_memo]
1228
 
                self.manager = _LazyGroupContentManager(block,
1229
 
                                                        get_compressor_settings=self._get_compressor_settings)
 
1139
                self.manager = _LazyGroupContentManager(block)
1230
1140
                self.last_read_memo = read_memo
1231
1141
            start, end = index_memo[3:5]
1232
1142
            self.manager.add_factory(key, parents, start, end)
1239
1149
        self.total_bytes = 0
1240
1150
 
1241
1151
 
1242
 
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
 
1152
class GroupCompressVersionedFiles(VersionedFiles):
1243
1153
    """A group-compress based VersionedFiles implementation."""
1244
1154
 
1245
 
    # This controls how the GroupCompress DeltaIndex works. Basically, we
1246
 
    # compute hash pointers into the source blocks (so hash(text) => text).
1247
 
    # However each of these references costs some memory in trade against a
1248
 
    # more accurate match result. For very large files, they either are
1249
 
    # pre-compressed and change in bulk whenever they change, or change in just
1250
 
    # local blocks. Either way, 'improved resolution' is not very helpful,
1251
 
    # versus running out of memory trying to track everything. The default max
1252
 
    # gives 100% sampling of a 1MB file.
1253
 
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
1254
 
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
1255
 
                                    _DEFAULT_MAX_BYTES_TO_INDEX}
1256
 
 
1257
 
    def __init__(self, index, access, delta=True, _unadded_refs=None,
1258
 
                 _group_cache=None):
 
1155
    def __init__(self, index, access, delta=True, _unadded_refs=None):
1259
1156
        """Create a GroupCompressVersionedFiles object.
1260
1157
 
1261
1158
        :param index: The index object storing access and graph data.
1262
1159
        :param access: The access object storing raw data.
1263
1160
        :param delta: Whether to delta compress or just entropy compress.
1264
1161
        :param _unadded_refs: private parameter, don't use.
1265
 
        :param _group_cache: private parameter, don't use.
1266
1162
        """
1267
1163
        self._index = index
1268
1164
        self._access = access
1270
1166
        if _unadded_refs is None:
1271
1167
            _unadded_refs = {}
1272
1168
        self._unadded_refs = _unadded_refs
1273
 
        if _group_cache is None:
1274
 
            _group_cache = LRUSizeCache(max_size=50 * 1024 * 1024)
1275
 
        self._group_cache = _group_cache
1276
 
        self._immediate_fallback_vfs = []
1277
 
        self._max_bytes_to_index = None
 
1169
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
 
1170
        self._fallback_vfs = []
1278
1171
 
1279
1172
    def without_fallbacks(self):
1280
1173
        """Return a clone of this object without any fallbacks configured."""
1281
1174
        return GroupCompressVersionedFiles(self._index, self._access,
1282
 
                                           self._delta, _unadded_refs=dict(
1283
 
                                               self._unadded_refs),
1284
 
                                           _group_cache=self._group_cache)
 
1175
            self._delta, _unadded_refs=dict(self._unadded_refs))
1285
1176
 
1286
1177
    def add_lines(self, key, parents, lines, parent_texts=None,
1287
 
                  left_matching_blocks=None, nostore_sha=None, random_id=False,
1288
 
                  check_content=True):
 
1178
        left_matching_blocks=None, nostore_sha=None, random_id=False,
 
1179
        check_content=True):
1289
1180
        """Add a text to the store.
1290
1181
 
1291
1182
        :param key: The key tuple of the text to add.
1292
1183
        :param parents: The parents key tuples of the text to add.
1293
1184
        :param lines: A list of lines. Each line must be a bytestring. And all
1294
 
            of them except the last must be terminated with \\n and contain no
1295
 
            other \\n's. The last line may either contain no \\n's or a single
1296
 
            terminating \\n. If the lines list does meet this constraint the
1297
 
            add routine may error or may succeed - but you will be unable to
1298
 
            read the data back accurately. (Checking the lines have been split
 
1185
            of them except the last must be terminated with \n and contain no
 
1186
            other \n's. The last line may either contain no \n's or a single
 
1187
            terminating \n. If the lines list does meet this constraint the add
 
1188
            routine may error or may succeed - but you will be unable to read
 
1189
            the data back accurately. (Checking the lines have been split
1299
1190
            correctly is expensive and extremely unlikely to catch bugs so it
1300
1191
            is not done at runtime unless check_content is True.)
1301
1192
        :param parent_texts: An optional dictionary containing the opaque
1320
1211
                 back to future add_lines calls in the parent_texts dictionary.
1321
1212
        """
1322
1213
        self._index._check_write_ok()
1323
 
        if check_content:
1324
 
            self._check_lines_not_unicode(lines)
1325
 
            self._check_lines_are_lines(lines)
1326
 
        return self.add_content(
1327
 
            ChunkedContentFactory(
1328
 
                key, parents, osutils.sha_strings(lines), lines, chunks_are_lines=True),
1329
 
            parent_texts, left_matching_blocks, nostore_sha, random_id)
1330
 
 
1331
 
    def add_content(self, factory, parent_texts=None,
1332
 
                    left_matching_blocks=None, nostore_sha=None,
1333
 
                    random_id=False):
1334
 
        """Add a text to the store.
1335
 
 
1336
 
        :param factory: A ContentFactory that can be used to retrieve the key,
1337
 
            parents and contents.
1338
 
        :param parent_texts: An optional dictionary containing the opaque
1339
 
            representations of some or all of the parents of version_id to
1340
 
            allow delta optimisations.  VERY IMPORTANT: the texts must be those
1341
 
            returned by add_lines or data corruption can be caused.
1342
 
        :param left_matching_blocks: a hint about which areas are common
1343
 
            between the text and its left-hand-parent.  The format is
1344
 
            the SequenceMatcher.get_matching_blocks format.
1345
 
        :param nostore_sha: Raise ExistingContent and do not add the lines to
1346
 
            the versioned file if the digest of the lines matches this.
1347
 
        :param random_id: If True a random id has been selected rather than
1348
 
            an id determined by some deterministic process such as a converter
1349
 
            from a foreign VCS. When True the backend may choose not to check
1350
 
            for uniqueness of the resulting key within the versioned file, so
1351
 
            this should only be done when the result is expected to be unique
1352
 
            anyway.
1353
 
        :return: The text sha1, the number of bytes in the text, and an opaque
1354
 
                 representation of the inserted version which can be provided
1355
 
                 back to future add_lines calls in the parent_texts dictionary.
1356
 
        """
 
1214
        self._check_add(key, lines, random_id, check_content)
 
1215
        if parents is None:
 
1216
            # The caller might pass None if there is no graph data, but kndx
 
1217
            # indexes can't directly store that, so we give them
 
1218
            # an empty tuple instead.
 
1219
            parents = ()
 
1220
        # double handling for now. Make it work until then.
 
1221
        length = sum(map(len, lines))
 
1222
        record = ChunkedContentFactory(key, parents, None, lines)
 
1223
        sha1 = list(self._insert_record_stream([record], random_id=random_id,
 
1224
                                               nostore_sha=nostore_sha))[0]
 
1225
        return sha1, length, None
 
1226
 
 
1227
    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
 
1228
        """See VersionedFiles._add_text()."""
1357
1229
        self._index._check_write_ok()
1358
 
        parents = factory.parents
1359
 
        self._check_add(factory.key, random_id)
 
1230
        self._check_add(key, None, random_id, check_content=False)
 
1231
        if text.__class__ is not str:
 
1232
            raise errors.BzrBadParameterUnicode("text")
1360
1233
        if parents is None:
1361
1234
            # The caller might pass None if there is no graph data, but kndx
1362
1235
            # indexes can't directly store that, so we give them
1363
1236
            # an empty tuple instead.
1364
1237
            parents = ()
1365
1238
        # double handling for now. Make it work until then.
1366
 
        sha1, length = list(self._insert_record_stream(
1367
 
            [factory], random_id=random_id, nostore_sha=nostore_sha))[0]
 
1239
        length = len(text)
 
1240
        record = FulltextContentFactory(key, parents, None, text)
 
1241
        sha1 = list(self._insert_record_stream([record], random_id=random_id,
 
1242
                                               nostore_sha=nostore_sha))[0]
1368
1243
        return sha1, length, None
1369
1244
 
1370
1245
    def add_fallback_versioned_files(self, a_versioned_files):
1372
1247
 
1373
1248
        :param a_versioned_files: A VersionedFiles object.
1374
1249
        """
1375
 
        self._immediate_fallback_vfs.append(a_versioned_files)
 
1250
        self._fallback_vfs.append(a_versioned_files)
1376
1251
 
1377
1252
    def annotate(self, key):
1378
1253
        """See VersionedFiles.annotate."""
1387
1262
        if keys is None:
1388
1263
            keys = self.keys()
1389
1264
            for record in self.get_record_stream(keys, 'unordered', True):
1390
 
                for chunk in record.iter_bytes_as('chunked'):
1391
 
                    pass
 
1265
                record.get_bytes_as('fulltext')
1392
1266
        else:
1393
1267
            return self.get_record_stream(keys, 'unordered', True)
1394
1268
 
1398
1272
        self._index._graph_index.clear_cache()
1399
1273
        self._index._int_cache.clear()
1400
1274
 
1401
 
    def _check_add(self, key, random_id):
 
1275
    def _check_add(self, key, lines, random_id, check_content):
1402
1276
        """check that version_id and lines are safe to add."""
1403
1277
        version_id = key[-1]
1404
1278
        if version_id is not None:
1409
1283
        # probably check that the existing content is identical to what is
1410
1284
        # being inserted, and otherwise raise an exception.  This would make
1411
1285
        # the bundle code simpler.
 
1286
        if check_content:
 
1287
            self._check_lines_not_unicode(lines)
 
1288
            self._check_lines_are_lines(lines)
 
1289
 
 
1290
    def get_known_graph_ancestry(self, keys):
 
1291
        """Get a KnownGraph instance with the ancestry of keys."""
 
1292
        # Note that this is identical to
 
1293
        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
 
1294
        # ancestry.
 
1295
        parent_map, missing_keys = self._index.find_ancestry(keys)
 
1296
        for fallback in self._fallback_vfs:
 
1297
            if not missing_keys:
 
1298
                break
 
1299
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
 
1300
                                                missing_keys)
 
1301
            parent_map.update(f_parent_map)
 
1302
            missing_keys = f_missing_keys
 
1303
        kg = _mod_graph.KnownGraph(parent_map)
 
1304
        return kg
1412
1305
 
1413
1306
    def get_parent_map(self, keys):
1414
1307
        """Get a map of the graph parents of keys.
1430
1323
            and so on.
1431
1324
        """
1432
1325
        result = {}
1433
 
        sources = [self._index] + self._immediate_fallback_vfs
 
1326
        sources = [self._index] + self._fallback_vfs
1434
1327
        source_results = []
1435
1328
        missing = set(keys)
1436
1329
        for source in sources:
1473
1366
                yield read_memo, cached[read_memo]
1474
1367
            except KeyError:
1475
1368
                # Read the block, and cache it.
1476
 
                zdata = next(raw_records)
 
1369
                zdata = raw_records.next()
1477
1370
                block = GroupCompressBlock.from_bytes(zdata)
1478
1371
                self._group_cache[read_memo] = block
1479
1372
                cached[read_memo] = block
1507
1400
        if not keys:
1508
1401
            return
1509
1402
        if (not self._index.has_graph
1510
 
                and ordering in ('topological', 'groupcompress')):
 
1403
            and ordering in ('topological', 'groupcompress')):
1511
1404
            # Cannot topological order when no graph has been stored.
1512
1405
            # but we allow 'as-requested' or 'unordered'
1513
1406
            ordering = 'unordered'
1517
1410
            try:
1518
1411
                keys = set(remaining_keys)
1519
1412
                for content_factory in self._get_remaining_record_stream(keys,
1520
 
                                                                         orig_keys, ordering, include_delta_closure):
 
1413
                        orig_keys, ordering, include_delta_closure):
1521
1414
                    remaining_keys.discard(content_factory.key)
1522
1415
                    yield content_factory
1523
1416
                return
1524
 
            except errors.RetryWithNewPacks as e:
 
1417
            except errors.RetryWithNewPacks, e:
1525
1418
                self._access.reload_or_raise(e)
1526
1419
 
1527
1420
    def _find_from_fallback(self, missing):
1537
1430
        parent_map = {}
1538
1431
        key_to_source_map = {}
1539
1432
        source_results = []
1540
 
        for source in self._immediate_fallback_vfs:
 
1433
        for source in self._fallback_vfs:
1541
1434
            if not missing:
1542
1435
                break
1543
1436
            source_parents = source.get_parent_map(missing)
1553
1446
 
1554
1447
        The returned objects should be in the order defined by 'ordering',
1555
1448
        which can weave between different sources.
1556
 
 
1557
1449
        :param ordering: Must be one of 'topological' or 'groupcompress'
1558
1450
        :return: List of [(source, [keys])] tuples, such that all keys are in
1559
1451
            the defined order, regardless of source.
1560
1452
        """
1561
1453
        if ordering == 'topological':
1562
 
            present_keys = tsort.topo_sort(parent_map)
 
1454
            present_keys = topo_sort(parent_map)
1563
1455
        else:
1564
1456
            # ordering == 'groupcompress'
1565
1457
            # XXX: This only optimizes for the target ordering. We may need
1587
1479
                source = self
1588
1480
            elif key in key_to_source_map:
1589
1481
                source = key_to_source_map[key]
1590
 
            else:  # absent
 
1482
            else: # absent
1591
1483
                continue
1592
1484
            if source is not current_source:
1593
1485
                source_keys.append((source, []))
1601
1493
            # This is the group the bytes are stored in, followed by the
1602
1494
            # location in the group
1603
1495
            return locations[key][0]
 
1496
        present_keys = sorted(locations.iterkeys(), key=get_group)
1604
1497
        # We don't have an ordering for keys in the in-memory object, but
1605
1498
        # lets process the in-memory ones first.
1606
 
        present_keys = list(unadded_keys)
1607
 
        present_keys.extend(sorted(locations, key=get_group))
 
1499
        present_keys = list(unadded_keys) + present_keys
1608
1500
        # Now grab all of the ones from other sources
1609
1501
        source_keys = [(self, present_keys)]
1610
1502
        source_keys.extend(source_result)
1634
1526
            # start with one key, recurse to its oldest parent, then grab
1635
1527
            # everything in the same group, etc.
1636
1528
            parent_map = dict((key, details[2]) for key, details in
1637
 
                              viewitems(locations))
 
1529
                locations.iteritems())
1638
1530
            for key in unadded_keys:
1639
1531
                parent_map[key] = self._unadded_refs[key]
1640
1532
            parent_map.update(fallback_parent_map)
1642
1534
                                                        key_to_source_map)
1643
1535
        elif ordering == 'as-requested':
1644
1536
            source_keys = self._get_as_requested_source_keys(orig_keys,
1645
 
                                                             locations, unadded_keys, key_to_source_map)
 
1537
                locations, unadded_keys, key_to_source_map)
1646
1538
        else:
1647
1539
            # We want to yield the keys in a semi-optimal (read-wise) ordering.
1648
1540
            # Otherwise we thrash the _group_cache and destroy performance
1649
1541
            source_keys = self._get_io_ordered_source_keys(locations,
1650
 
                                                           unadded_keys, source_result)
 
1542
                unadded_keys, source_result)
1651
1543
        for key in missing:
1652
1544
            yield AbsentContentFactory(key)
1653
1545
        # Batch up as many keys as we can until either:
1654
1546
        #  - we encounter an unadded ref, or
1655
1547
        #  - we run out of keys, or
1656
1548
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
1657
 
        batcher = _BatchingBlockFetcher(self, locations,
1658
 
                                        get_compressor_settings=self._get_compressor_settings)
 
1549
        batcher = _BatchingBlockFetcher(self, locations)
1659
1550
        for source, keys in source_keys:
1660
1551
            if source is self:
1661
1552
                for key in keys:
1664
1555
                        # self._compressor.
1665
1556
                        for factory in batcher.yield_factories(full_flush=True):
1666
1557
                            yield factory
1667
 
                        chunks, sha1 = self._compressor.extract(key)
 
1558
                        bytes, sha1 = self._compressor.extract(key)
1668
1559
                        parents = self._unadded_refs[key]
1669
 
                        yield ChunkedContentFactory(key, parents, sha1, chunks)
 
1560
                        yield FulltextContentFactory(key, parents, sha1, bytes)
1670
1561
                        continue
1671
1562
                    if batcher.add_key(key) > BATCH_SIZE:
1672
1563
                        # Ok, this batch is big enough.  Yield some results.
1685
1576
        """See VersionedFiles.get_sha1s()."""
1686
1577
        result = {}
1687
1578
        for record in self.get_record_stream(keys, 'unordered', True):
1688
 
            if record.sha1 is not None:
 
1579
            if record.sha1 != None:
1689
1580
                result[record.key] = record.sha1
1690
1581
            else:
1691
1582
                if record.storage_kind != 'absent':
1692
 
                    result[record.key] = osutils.sha_strings(
1693
 
                        record.iter_bytes_as('chunked'))
 
1583
                    result[record.key] = osutils.sha_string(
 
1584
                        record.get_bytes_as('fulltext'))
1694
1585
        return result
1695
1586
 
1696
1587
    def insert_record_stream(self, stream):
1704
1595
        # test_insert_record_stream_existing_keys fail for groupcompress and
1705
1596
        # groupcompress-nograph, this needs to be revisited while addressing
1706
1597
        # 'bzr branch' performance issues.
1707
 
        for _, _ in self._insert_record_stream(stream, random_id=False):
 
1598
        for _ in self._insert_record_stream(stream, random_id=False):
1708
1599
            pass
1709
1600
 
1710
 
    def _get_compressor_settings(self):
1711
 
        if self._max_bytes_to_index is None:
1712
 
            # TODO: VersionedFiles don't know about their containing
1713
 
            #       repository, so they don't have much of an idea about their
1714
 
            #       location. So for now, this is only a global option.
1715
 
            c = config.GlobalConfig()
1716
 
            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
1717
 
            if val is not None:
1718
 
                try:
1719
 
                    val = int(val)
1720
 
                except ValueError as e:
1721
 
                    trace.warning('Value for '
1722
 
                                  '"bzr.groupcompress.max_bytes_to_index"'
1723
 
                                  ' %r is not an integer'
1724
 
                                  % (val,))
1725
 
                    val = None
1726
 
            if val is None:
1727
 
                val = self._DEFAULT_MAX_BYTES_TO_INDEX
1728
 
            self._max_bytes_to_index = val
1729
 
        return {'max_bytes_to_index': self._max_bytes_to_index}
1730
 
 
1731
 
    def _make_group_compressor(self):
1732
 
        return GroupCompressor(self._get_compressor_settings())
1733
 
 
1734
1601
    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1735
1602
                              reuse_blocks=True):
1736
1603
        """Internal core to insert a record stream into this container.
1744
1611
        :param reuse_blocks: If the source is streaming from
1745
1612
            groupcompress-blocks, just insert the blocks as-is, rather than
1746
1613
            expanding the texts and inserting again.
1747
 
        :return: An iterator over (sha1, length) of the inserted records.
 
1614
        :return: An iterator over the sha1 of the inserted records.
1748
1615
        :seealso insert_record_stream:
1749
1616
        :seealso add_lines:
1750
1617
        """
1751
1618
        adapters = {}
1752
 
 
1753
1619
        def get_adapter(adapter_key):
1754
1620
            try:
1755
1621
                return adapters[adapter_key]
1760
1626
                return adapter
1761
1627
        # This will go up to fulltexts for gc to gc fetching, which isn't
1762
1628
        # ideal.
1763
 
        self._compressor = self._make_group_compressor()
 
1629
        self._compressor = GroupCompressor()
1764
1630
        self._unadded_refs = {}
1765
1631
        keys_to_add = []
1766
 
 
1767
1632
        def flush():
1768
 
            bytes_len, chunks = self._compressor.flush().to_chunks()
1769
 
            self._compressor = self._make_group_compressor()
1770
 
            # Note: At this point we still have 1 copy of the fulltext (in
1771
 
            #       record and the var 'bytes'), and this generates 2 copies of
1772
 
            #       the compressed text (one for bytes, one in chunks)
1773
 
            # TODO: Figure out how to indicate that we would be happy to free
1774
 
            #       the fulltext content at this point. Note that sometimes we
1775
 
            #       will want it later (streaming CHK pages), but most of the
1776
 
            #       time we won't (everything else)
1777
 
            index, start, length = self._access.add_raw_record(
1778
 
                None, bytes_len, chunks)
 
1633
            bytes = self._compressor.flush().to_bytes()
 
1634
            self._compressor = GroupCompressor()
 
1635
            index, start, length = self._access.add_raw_records(
 
1636
                [(None, len(bytes))], bytes)[0]
1779
1637
            nodes = []
1780
1638
            for key, reads, refs in keys_to_add:
1781
 
                nodes.append((key, b"%d %d %s" % (start, length, reads), refs))
 
1639
                nodes.append((key, "%d %d %s" % (start, length, reads), refs))
1782
1640
            self._index.add_records(nodes, random_id=random_id)
1783
1641
            self._unadded_refs = {}
1784
1642
            del keys_to_add[:]
1798
1656
                raise errors.RevisionNotPresent(record.key, self)
1799
1657
            if random_id:
1800
1658
                if record.key in inserted_keys:
1801
 
                    trace.note(gettext('Insert claimed random_id=True,'
1802
 
                                       ' but then inserted %r two times'), record.key)
 
1659
                    trace.note('Insert claimed random_id=True,'
 
1660
                               ' but then inserted %r two times', record.key)
1803
1661
                    continue
1804
1662
                inserted_keys.add(record.key)
1805
1663
            if reuse_blocks:
1819
1677
                if record.storage_kind == 'groupcompress-block':
1820
1678
                    # Insert the raw block into the target repo
1821
1679
                    insert_manager = record._manager
1822
 
                    bytes_len, chunks = record._manager._block.to_chunks()
1823
 
                    _, start, length = self._access.add_raw_record(
1824
 
                        None, bytes_len, chunks)
 
1680
                    bytes = record._manager._block.to_bytes()
 
1681
                    _, start, length = self._access.add_raw_records(
 
1682
                        [(None, len(bytes))], bytes)[0]
 
1683
                    del bytes
1825
1684
                    block_start = start
1826
1685
                    block_length = length
1827
1686
                if record.storage_kind in ('groupcompress-block',
1830
1689
                        raise AssertionError('No insert_manager set')
1831
1690
                    if insert_manager is not record._manager:
1832
1691
                        raise AssertionError('insert_manager does not match'
1833
 
                                             ' the current record, we cannot be positive'
1834
 
                                             ' that the appropriate content was inserted.'
1835
 
                                             )
1836
 
                    value = b"%d %d %d %d" % (block_start, block_length,
1837
 
                                              record._start, record._end)
 
1692
                            ' the current record, we cannot be positive'
 
1693
                            ' that the appropriate content was inserted.'
 
1694
                            )
 
1695
                    value = "%d %d %d %d" % (block_start, block_length,
 
1696
                                             record._start, record._end)
1838
1697
                    nodes = [(record.key, value, (record.parents,))]
1839
1698
                    # TODO: Consider buffering up many nodes to be added, not
1840
1699
                    #       sure how much overhead this has, but we're seeing
1842
1701
                    self._index.add_records(nodes, random_id=random_id)
1843
1702
                    continue
1844
1703
            try:
1845
 
                chunks = record.get_bytes_as('chunked')
1846
 
            except UnavailableRepresentation:
1847
 
                adapter_key = record.storage_kind, 'chunked'
 
1704
                bytes = record.get_bytes_as('fulltext')
 
1705
            except errors.UnavailableRepresentation:
 
1706
                adapter_key = record.storage_kind, 'fulltext'
1848
1707
                adapter = get_adapter(adapter_key)
1849
 
                chunks = adapter.get_bytes(record, 'chunked')
1850
 
            chunks_len = record.size
1851
 
            if chunks_len is None:
1852
 
                chunks_len = sum(map(len, chunks))
 
1708
                bytes = adapter.get_bytes(record)
1853
1709
            if len(record.key) > 1:
1854
1710
                prefix = record.key[0]
1855
1711
                soft = (prefix == last_prefix)
1856
1712
            else:
1857
1713
                prefix = None
1858
1714
                soft = False
1859
 
            if max_fulltext_len < chunks_len:
1860
 
                max_fulltext_len = chunks_len
 
1715
            if max_fulltext_len < len(bytes):
 
1716
                max_fulltext_len = len(bytes)
1861
1717
                max_fulltext_prefix = prefix
1862
1718
            (found_sha1, start_point, end_point,
1863
 
             type) = self._compressor.compress(
1864
 
                 record.key, chunks, chunks_len, record.sha1, soft=soft,
1865
 
                 nostore_sha=nostore_sha)
1866
 
            # delta_ratio = float(chunks_len) / (end_point - start_point)
 
1719
             type) = self._compressor.compress(record.key,
 
1720
                                               bytes, record.sha1, soft=soft,
 
1721
                                               nostore_sha=nostore_sha)
 
1722
            # delta_ratio = float(len(bytes)) / (end_point - start_point)
1867
1723
            # Check if we want to continue to include that text
1868
1724
            if (prefix == max_fulltext_prefix
1869
 
                    and end_point < 2 * max_fulltext_len):
 
1725
                and end_point < 2 * max_fulltext_len):
1870
1726
                # As long as we are on the same file_id, we will fill at least
1871
1727
                # 2 * max_fulltext_len
1872
1728
                start_new_block = False
1873
 
            elif end_point > 4 * 1024 * 1024:
 
1729
            elif end_point > 4*1024*1024:
1874
1730
                start_new_block = True
1875
1731
            elif (prefix is not None and prefix != last_prefix
1876
 
                  and end_point > 2 * 1024 * 1024):
 
1732
                  and end_point > 2*1024*1024):
1877
1733
                start_new_block = True
1878
1734
            else:
1879
1735
                start_new_block = False
1881
1737
            if start_new_block:
1882
1738
                self._compressor.pop_last()
1883
1739
                flush()
1884
 
                max_fulltext_len = chunks_len
 
1740
                max_fulltext_len = len(bytes)
1885
1741
                (found_sha1, start_point, end_point,
1886
 
                 type) = self._compressor.compress(
1887
 
                     record.key, chunks, chunks_len, record.sha1)
 
1742
                 type) = self._compressor.compress(record.key, bytes,
 
1743
                                                   record.sha1)
1888
1744
            if record.key[-1] is None:
1889
 
                key = record.key[:-1] + (b'sha1:' + found_sha1,)
 
1745
                key = record.key[:-1] + ('sha1:' + found_sha1,)
1890
1746
            else:
1891
1747
                key = record.key
1892
1748
            self._unadded_refs[key] = record.parents
1893
 
            yield found_sha1, chunks_len
 
1749
            yield found_sha1
1894
1750
            as_st = static_tuple.StaticTuple.from_sequence
1895
1751
            if record.parents is not None:
1896
1752
                parents = as_st([as_st(p) for p in record.parents])
1897
1753
            else:
1898
1754
                parents = None
1899
1755
            refs = static_tuple.StaticTuple(parents)
1900
 
            keys_to_add.append(
1901
 
                (key, b'%d %d' % (start_point, end_point), refs))
 
1756
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
1902
1757
        if len(keys_to_add):
1903
1758
            flush()
1904
1759
        self._compressor = None
1930
1785
        # but we need to setup a list of records to visit.
1931
1786
        # we need key, position, length
1932
1787
        for key_idx, record in enumerate(self.get_record_stream(keys,
1933
 
                                                                'unordered', True)):
 
1788
            'unordered', True)):
1934
1789
            # XXX: todo - optimise to use less than full texts.
1935
1790
            key = record.key
1936
1791
            if pb is not None:
1937
1792
                pb.update('Walking content', key_idx, total)
1938
1793
            if record.storage_kind == 'absent':
1939
1794
                raise errors.RevisionNotPresent(key, self)
1940
 
            for line in record.iter_bytes_as('lines'):
 
1795
            lines = osutils.split_lines(record.get_bytes_as('fulltext'))
 
1796
            for line in lines:
1941
1797
                yield line, key
1942
1798
        if pb is not None:
1943
1799
            pb.update('Walking content', total, total)
1946
1802
        """See VersionedFiles.keys."""
1947
1803
        if 'evil' in debug.debug_flags:
1948
1804
            trace.mutter_callsite(2, "keys scales with size of history")
1949
 
        sources = [self._index] + self._immediate_fallback_vfs
 
1805
        sources = [self._index] + self._fallback_vfs
1950
1806
        result = set()
1951
1807
        for source in sources:
1952
1808
            result.update(source.keys())
1953
1809
        return result
1954
1810
 
1955
1811
 
1956
 
class _GCBuildDetails(object):
1957
 
    """A blob of data about the build details.
1958
 
 
1959
 
    This stores the minimal data, which then allows compatibility with the old
1960
 
    api, without taking as much memory.
1961
 
    """
1962
 
 
1963
 
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
1964
 
                 '_delta_end', '_parents')
1965
 
 
1966
 
    method = 'group'
1967
 
    compression_parent = None
1968
 
 
1969
 
    def __init__(self, parents, position_info):
1970
 
        self._parents = parents
1971
 
        (self._index, self._group_start, self._group_end, self._basis_end,
1972
 
         self._delta_end) = position_info
1973
 
 
1974
 
    def __repr__(self):
1975
 
        return '%s(%s, %s)' % (self.__class__.__name__,
1976
 
                               self.index_memo, self._parents)
1977
 
 
1978
 
    @property
1979
 
    def index_memo(self):
1980
 
        return (self._index, self._group_start, self._group_end,
1981
 
                self._basis_end, self._delta_end)
1982
 
 
1983
 
    @property
1984
 
    def record_details(self):
1985
 
        return static_tuple.StaticTuple(self.method, None)
1986
 
 
1987
 
    def __getitem__(self, offset):
1988
 
        """Compatibility thunk to act like a tuple."""
1989
 
        if offset == 0:
1990
 
            return self.index_memo
1991
 
        elif offset == 1:
1992
 
            return self.compression_parent  # Always None
1993
 
        elif offset == 2:
1994
 
            return self._parents
1995
 
        elif offset == 3:
1996
 
            return self.record_details
1997
 
        else:
1998
 
            raise IndexError('offset out of range')
1999
 
 
2000
 
    def __len__(self):
2001
 
        return 4
2002
 
 
2003
 
 
2004
1812
class _GCGraphIndex(object):
2005
1813
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
2006
1814
 
2007
1815
    def __init__(self, graph_index, is_locked, parents=True,
2008
 
                 add_callback=None, track_external_parent_refs=False,
2009
 
                 inconsistency_fatal=True, track_new_keys=False):
 
1816
        add_callback=None, track_external_parent_refs=False,
 
1817
        inconsistency_fatal=True, track_new_keys=False):
2010
1818
        """Construct a _GCGraphIndex on a graph_index.
2011
1819
 
2012
 
        :param graph_index: An implementation of breezy.index.GraphIndex.
 
1820
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
2013
1821
        :param is_locked: A callback, returns True if the index is locked and
2014
1822
            thus usable.
2015
1823
        :param parents: If True, record knits parents, if not do not record
2035
1843
        # repeated over and over, this creates a surplus of ints
2036
1844
        self._int_cache = {}
2037
1845
        if track_external_parent_refs:
2038
 
            self._key_dependencies = _KeyRefs(
 
1846
            self._key_dependencies = knit._KeyRefs(
2039
1847
                track_new_keys=track_new_keys)
2040
1848
        else:
2041
1849
            self._key_dependencies = None
2065
1873
                if refs:
2066
1874
                    for ref in refs:
2067
1875
                        if ref:
2068
 
                            raise knit.KnitCorrupt(self,
2069
 
                                                   "attempt to add node with parents "
2070
 
                                                   "in parentless index.")
 
1876
                            raise errors.KnitCorrupt(self,
 
1877
                                "attempt to add node with parents "
 
1878
                                "in parentless index.")
2071
1879
                    refs = ()
2072
1880
                    changed = True
2073
1881
            keys[key] = (value, refs)
2081
1889
                if node_refs != passed[1]:
2082
1890
                    details = '%s %s %s' % (key, (value, node_refs), passed)
2083
1891
                    if self._inconsistency_fatal:
2084
 
                        raise knit.KnitCorrupt(self, "inconsistent details"
2085
 
                                               " in add_records: %s" %
2086
 
                                               details)
 
1892
                        raise errors.KnitCorrupt(self, "inconsistent details"
 
1893
                                                 " in add_records: %s" %
 
1894
                                                 details)
2087
1895
                    else:
2088
1896
                        trace.warning("inconsistent details in skipped"
2089
1897
                                      " record: %s", details)
2092
1900
        if changed:
2093
1901
            result = []
2094
1902
            if self._parents:
2095
 
                for key, (value, node_refs) in viewitems(keys):
 
1903
                for key, (value, node_refs) in keys.iteritems():
2096
1904
                    result.append((key, value, node_refs))
2097
1905
            else:
2098
 
                for key, (value, node_refs) in viewitems(keys):
 
1906
                for key, (value, node_refs) in keys.iteritems():
2099
1907
                    result.append((key, value))
2100
1908
            records = result
2101
1909
        key_dependencies = self._key_dependencies
2181
1989
        :param keys: An iterable of keys.
2182
1990
        :return: A dict of key:
2183
1991
            (index_memo, compression_parent, parents, record_details).
2184
 
 
2185
 
            * index_memo: opaque structure to pass to read_records to extract
2186
 
              the raw data
2187
 
            * compression_parent: Content that this record is built upon, may
2188
 
              be None
2189
 
            * parents: Logical parents of this node
2190
 
            * record_details: extra information about the content which needs
2191
 
              to be passed to Factory.parse_record
 
1992
            index_memo
 
1993
                opaque structure to pass to read_records to extract the raw
 
1994
                data
 
1995
            compression_parent
 
1996
                Content that this record is built upon, may be None
 
1997
            parents
 
1998
                Logical parents of this node
 
1999
            record_details
 
2000
                extra information about the content which needs to be passed to
 
2001
                Factory.parse_record
2192
2002
        """
2193
2003
        self._check_read()
2194
2004
        result = {}
2199
2009
                parents = None
2200
2010
            else:
2201
2011
                parents = entry[3][0]
2202
 
            details = _GCBuildDetails(parents, self._node_to_position(entry))
2203
 
            result[key] = details
 
2012
            method = 'group'
 
2013
            result[key] = (self._node_to_position(entry),
 
2014
                                  None, parents, (method, None))
2204
2015
        return result
2205
2016
 
2206
2017
    def keys(self):
2213
2024
 
2214
2025
    def _node_to_position(self, node):
2215
2026
        """Convert an index value to position details."""
2216
 
        bits = node[2].split(b' ')
 
2027
        bits = node[2].split(' ')
2217
2028
        # It would be nice not to read the entire gzip.
2218
2029
        # start and stop are put into _int_cache because they are very common.
2219
2030
        # They define the 'group' that an entry is in, and many groups can have
2222
2033
        # each, or about 7MB. Note that it might be even more when you consider
2223
2034
        # how PyInt is allocated in separate slabs. And you can't return a slab
2224
2035
        # to the OS if even 1 int on it is in use. Note though that Python uses
2225
 
        # a LIFO when re-using PyInt slots, which might cause more
 
2036
        # a LIFO when re-using PyInt slots, which probably causes more
2226
2037
        # fragmentation.
2227
2038
        start = int(bits[0])
2228
2039
        start = self._int_cache.setdefault(start, start)
2252
2063
            key_dependencies.add_references(node[1], node[3][0])
2253
2064
 
2254
2065
 
2255
 
from ._groupcompress_py import (
 
2066
from bzrlib._groupcompress_py import (
2256
2067
    apply_delta,
2257
2068
    apply_delta_to_source,
2258
2069
    encode_base128_int,
2261
2072
    LinesDeltaIndex,
2262
2073
    )
2263
2074
try:
2264
 
    from ._groupcompress_pyx import (
 
2075
    from bzrlib._groupcompress_pyx import (
2265
2076
        apply_delta,
2266
2077
        apply_delta_to_source,
2267
2078
        DeltaIndex,
2269
2080
        decode_base128_int,
2270
2081
        )
2271
2082
    GroupCompressor = PyrexGroupCompressor
2272
 
except ImportError as e:
 
2083
except ImportError, e:
2273
2084
    osutils.failed_to_load_extension(e)
2274
2085
    GroupCompressor = PythonGroupCompressor
 
2086