/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/bzr/groupcompress.py

  • Committer: Jelmer Vernooij
  • Date: 2020-07-18 23:14:00 UTC
  • mfrom: (7490.40.62 work)
  • mto: This revision was merged to the branch mainline in revision 7519.
  • Revision ID: jelmer@jelmer.uk-20200718231400-jaes9qltn8oi8xss
Merge lp:brz/3.1.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Core compression logic for compressing streams of related files."""
 
18
 
 
19
import time
 
20
import zlib
 
21
 
 
22
from ..lazy_import import lazy_import
 
23
lazy_import(globals(), """
 
24
from breezy import (
 
25
    annotate,
 
26
    config,
 
27
    debug,
 
28
    osutils,
 
29
    static_tuple,
 
30
    trace,
 
31
    tsort,
 
32
    )
 
33
from breezy.bzr import (
 
34
    knit,
 
35
    pack,
 
36
    pack_repo,
 
37
    )
 
38
 
 
39
from breezy.i18n import gettext
 
40
""")
 
41
 
 
42
from .. import (
 
43
    errors,
 
44
    )
 
45
from .btree_index import BTreeBuilder
 
46
from ..lru_cache import LRUSizeCache
 
47
from .versionedfile import (
 
48
    _KeyRefs,
 
49
    adapter_registry,
 
50
    AbsentContentFactory,
 
51
    ChunkedContentFactory,
 
52
    ExistingContent,
 
53
    FulltextContentFactory,
 
54
    VersionedFilesWithFallbacks,
 
55
    UnavailableRepresentation,
 
56
    )
 
57
 
 
58
# Minimum number of uncompressed bytes to try fetch at once when retrieving
 
59
# groupcompress blocks.
 
60
BATCH_SIZE = 2**16
 
61
 
 
62
# osutils.sha_string(b'')
 
63
_null_sha1 = b'da39a3ee5e6b4b0d3255bfef95601890afd80709'
 
64
 
 
65
 
 
66
def sort_gc_optimal(parent_map):
 
67
    """Sort and group the keys in parent_map into groupcompress order.
 
68
 
 
69
    groupcompress is defined (currently) as reverse-topological order, grouped
 
70
    by the key prefix.
 
71
 
 
72
    :return: A sorted-list of keys
 
73
    """
 
74
    # groupcompress ordering is approximately reverse topological,
 
75
    # properly grouped by file-id.
 
76
    per_prefix_map = {}
 
77
    for key, value in parent_map.items():
 
78
        if isinstance(key, bytes) or len(key) == 1:
 
79
            prefix = b''
 
80
        else:
 
81
            prefix = key[0]
 
82
        try:
 
83
            per_prefix_map[prefix][key] = value
 
84
        except KeyError:
 
85
            per_prefix_map[prefix] = {key: value}
 
86
 
 
87
    present_keys = []
 
88
    for prefix in sorted(per_prefix_map):
 
89
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
 
90
    return present_keys
 
91
 
 
92
 
 
93
class DecompressCorruption(errors.BzrError):
 
94
 
 
95
    _fmt = "Corruption while decompressing repository file%(orig_error)s"
 
96
 
 
97
    def __init__(self, orig_error=None):
 
98
        if orig_error is not None:
 
99
            self.orig_error = ", %s" % (orig_error,)
 
100
        else:
 
101
            self.orig_error = ""
 
102
        errors.BzrError.__init__(self)
 
103
 
 
104
 
 
105
# The max zlib window size is 32kB, so if we set 'max_size' output of the
 
106
# decompressor to the requested bytes + 32kB, then we should guarantee
 
107
# num_bytes coming out.
 
108
_ZLIB_DECOMP_WINDOW = 32 * 1024
 
109
 
 
110
 
 
111
class GroupCompressBlock(object):
 
112
    """An object which maintains the internal structure of the compressed data.
 
113
 
 
114
    This tracks the meta info (start of text, length, type, etc.)
 
115
    """
 
116
 
 
117
    # Group Compress Block v1 Zlib
 
118
    GCB_HEADER = b'gcb1z\n'
 
119
    # Group Compress Block v1 Lzma
 
120
    GCB_LZ_HEADER = b'gcb1l\n'
 
121
    GCB_KNOWN_HEADERS = (GCB_HEADER, GCB_LZ_HEADER)
 
122
 
 
123
    def __init__(self):
 
124
        # map by key? or just order in file?
 
125
        self._compressor_name = None
 
126
        self._z_content_chunks = None
 
127
        self._z_content_decompressor = None
 
128
        self._z_content_length = None
 
129
        self._content_length = None
 
130
        self._content = None
 
131
        self._content_chunks = None
 
132
 
 
133
    def __len__(self):
 
134
        # This is the maximum number of bytes this object will reference if
 
135
        # everything is decompressed. However, if we decompress less than
 
136
        # everything... (this would cause some problems for LRUSizeCache)
 
137
        return self._content_length + self._z_content_length
 
138
 
 
139
    def _ensure_content(self, num_bytes=None):
 
140
        """Make sure that content has been expanded enough.
 
141
 
 
142
        :param num_bytes: Ensure that we have extracted at least num_bytes of
 
143
            content. If None, consume everything
 
144
        """
 
145
        if self._content_length is None:
 
146
            raise AssertionError('self._content_length should never be None')
 
147
        if num_bytes is None:
 
148
            num_bytes = self._content_length
 
149
        elif (self._content_length is not None
 
150
              and num_bytes > self._content_length):
 
151
            raise AssertionError(
 
152
                'requested num_bytes (%d) > content length (%d)'
 
153
                % (num_bytes, self._content_length))
 
154
        # Expand the content if required
 
155
        if self._content is None:
 
156
            if self._content_chunks is not None:
 
157
                self._content = b''.join(self._content_chunks)
 
158
                self._content_chunks = None
 
159
        if self._content is None:
 
160
            # We join self._z_content_chunks here, because if we are
 
161
            # decompressing, then it is *very* likely that we have a single
 
162
            # chunk
 
163
            if self._z_content_chunks is None:
 
164
                raise AssertionError('No content to decompress')
 
165
            z_content = b''.join(self._z_content_chunks)
 
166
            if z_content == b'':
 
167
                self._content = b''
 
168
            elif self._compressor_name == 'lzma':
 
169
                # We don't do partial lzma decomp yet
 
170
                import pylzma
 
171
                self._content = pylzma.decompress(z_content)
 
172
            elif self._compressor_name == 'zlib':
 
173
                # Start a zlib decompressor
 
174
                if num_bytes * 4 > self._content_length * 3:
 
175
                    # If we are requesting more that 3/4ths of the content,
 
176
                    # just extract the whole thing in a single pass
 
177
                    num_bytes = self._content_length
 
178
                    self._content = zlib.decompress(z_content)
 
179
                else:
 
180
                    self._z_content_decompressor = zlib.decompressobj()
 
181
                    # Seed the decompressor with the uncompressed bytes, so
 
182
                    # that the rest of the code is simplified
 
183
                    self._content = self._z_content_decompressor.decompress(
 
184
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
185
                    if not self._z_content_decompressor.unconsumed_tail:
 
186
                        self._z_content_decompressor = None
 
187
            else:
 
188
                raise AssertionError('Unknown compressor: %r'
 
189
                                     % self._compressor_name)
 
190
        # Any bytes remaining to be decompressed will be in the decompressors
 
191
        # 'unconsumed_tail'
 
192
 
 
193
        # Do we have enough bytes already?
 
194
        if len(self._content) >= num_bytes:
 
195
            return
 
196
        # If we got this far, and don't have a decompressor, something is wrong
 
197
        if self._z_content_decompressor is None:
 
198
            raise AssertionError(
 
199
                'No decompressor to decompress %d bytes' % num_bytes)
 
200
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
 
201
        if not remaining_decomp:
 
202
            raise AssertionError('Nothing left to decompress')
 
203
        needed_bytes = num_bytes - len(self._content)
 
204
        # We always set max_size to 32kB over the minimum needed, so that
 
205
        # zlib will give us as much as we really want.
 
206
        # TODO: If this isn't good enough, we could make a loop here,
 
207
        #       that keeps expanding the request until we get enough
 
208
        self._content += self._z_content_decompressor.decompress(
 
209
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
 
210
        if len(self._content) < num_bytes:
 
211
            raise AssertionError('%d bytes wanted, only %d available'
 
212
                                 % (num_bytes, len(self._content)))
 
213
        if not self._z_content_decompressor.unconsumed_tail:
 
214
            # The stream is finished
 
215
            self._z_content_decompressor = None
 
216
 
 
217
    def _parse_bytes(self, data, pos):
 
218
        """Read the various lengths from the header.
 
219
 
 
220
        This also populates the various 'compressed' buffers.
 
221
 
 
222
        :return: The position in bytes just after the last newline
 
223
        """
 
224
        # At present, we have 2 integers for the compressed and uncompressed
 
225
        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
 
226
        # checking too far, cap the search to 14 bytes.
 
227
        pos2 = data.index(b'\n', pos, pos + 14)
 
228
        self._z_content_length = int(data[pos:pos2])
 
229
        pos = pos2 + 1
 
230
        pos2 = data.index(b'\n', pos, pos + 14)
 
231
        self._content_length = int(data[pos:pos2])
 
232
        pos = pos2 + 1
 
233
        if len(data) != (pos + self._z_content_length):
 
234
            # XXX: Define some GCCorrupt error ?
 
235
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
 
236
                                 (len(data), pos, self._z_content_length))
 
237
        self._z_content_chunks = (data[pos:],)
 
238
 
 
239
    @property
 
240
    def _z_content(self):
 
241
        """Return z_content_chunks as a simple string.
 
242
 
 
243
        Meant only to be used by the test suite.
 
244
        """
 
245
        if self._z_content_chunks is not None:
 
246
            return b''.join(self._z_content_chunks)
 
247
        return None
 
248
 
 
249
    @classmethod
 
250
    def from_bytes(cls, bytes):
 
251
        out = cls()
 
252
        header = bytes[:6]
 
253
        if header not in cls.GCB_KNOWN_HEADERS:
 
254
            raise ValueError('bytes did not start with any of %r'
 
255
                             % (cls.GCB_KNOWN_HEADERS,))
 
256
        if header == cls.GCB_HEADER:
 
257
            out._compressor_name = 'zlib'
 
258
        elif header == cls.GCB_LZ_HEADER:
 
259
            out._compressor_name = 'lzma'
 
260
        else:
 
261
            raise ValueError('unknown compressor: %r' % (header,))
 
262
        out._parse_bytes(bytes, 6)
 
263
        return out
 
264
 
 
265
    def extract(self, key, start, end, sha1=None):
 
266
        """Extract the text for a specific key.
 
267
 
 
268
        :param key: The label used for this content
 
269
        :param sha1: TODO (should we validate only when sha1 is supplied?)
 
270
        :return: The bytes for the content
 
271
        """
 
272
        if start == end == 0:
 
273
            return []
 
274
        self._ensure_content(end)
 
275
        # The bytes are 'f' or 'd' for the type, then a variable-length
 
276
        # base128 integer for the content size, then the actual content
 
277
        # We know that the variable-length integer won't be longer than 5
 
278
        # bytes (it takes 5 bytes to encode 2^32)
 
279
        c = self._content[start:start + 1]
 
280
        if c == b'f':
 
281
            type = 'fulltext'
 
282
        else:
 
283
            if c != b'd':
 
284
                raise ValueError('Unknown content control code: %s'
 
285
                                 % (c,))
 
286
            type = 'delta'
 
287
        content_len, len_len = decode_base128_int(
 
288
            self._content[start + 1:start + 6])
 
289
        content_start = start + 1 + len_len
 
290
        if end != content_start + content_len:
 
291
            raise ValueError('end != len according to field header'
 
292
                             ' %s != %s' % (end, content_start + content_len))
 
293
        if c == b'f':
 
294
            return [self._content[content_start:end]]
 
295
        # Must be type delta as checked above
 
296
        return [apply_delta_to_source(self._content, content_start, end)]
 
297
 
 
298
    def set_chunked_content(self, content_chunks, length):
 
299
        """Set the content of this block to the given chunks."""
 
300
        # If we have lots of short lines, it is may be more efficient to join
 
301
        # the content ahead of time. If the content is <10MiB, we don't really
 
302
        # care about the extra memory consumption, so we can just pack it and
 
303
        # be done. However, timing showed 18s => 17.9s for repacking 1k revs of
 
304
        # mysql, which is below the noise margin
 
305
        self._content_length = length
 
306
        self._content_chunks = content_chunks
 
307
        self._content = None
 
308
        self._z_content_chunks = None
 
309
 
 
310
    def set_content(self, content):
 
311
        """Set the content of this block."""
 
312
        self._content_length = len(content)
 
313
        self._content = content
 
314
        self._z_content_chunks = None
 
315
 
 
316
    def _create_z_content_from_chunks(self, chunks):
 
317
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
 
318
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
 
319
        # (measured peak is maybe 30MB over the above...)
 
320
        compressed_chunks = list(map(compressor.compress, chunks))
 
321
        compressed_chunks.append(compressor.flush())
 
322
        # Ignore empty chunks
 
323
        self._z_content_chunks = [c for c in compressed_chunks if c]
 
324
        self._z_content_length = sum(map(len, self._z_content_chunks))
 
325
 
 
326
    def _create_z_content(self):
 
327
        if self._z_content_chunks is not None:
 
328
            return
 
329
        if self._content_chunks is not None:
 
330
            chunks = self._content_chunks
 
331
        else:
 
332
            chunks = (self._content,)
 
333
        self._create_z_content_from_chunks(chunks)
 
334
 
 
335
    def to_chunks(self):
 
336
        """Create the byte stream as a series of 'chunks'"""
 
337
        self._create_z_content()
 
338
        header = self.GCB_HEADER
 
339
        chunks = [b'%s%d\n%d\n'
 
340
                  % (header, self._z_content_length, self._content_length),
 
341
                  ]
 
342
        chunks.extend(self._z_content_chunks)
 
343
        total_len = sum(map(len, chunks))
 
344
        return total_len, chunks
 
345
 
 
346
    def to_bytes(self):
 
347
        """Encode the information into a byte stream."""
 
348
        total_len, chunks = self.to_chunks()
 
349
        return b''.join(chunks)
 
350
 
 
351
    def _dump(self, include_text=False):
 
352
        """Take this block, and spit out a human-readable structure.
 
353
 
 
354
        :param include_text: Inserts also include text bits, chose whether you
 
355
            want this displayed in the dump or not.
 
356
        :return: A dump of the given block. The layout is something like:
 
357
            [('f', length), ('d', delta_length, text_length, [delta_info])]
 
358
            delta_info := [('i', num_bytes, text), ('c', offset, num_bytes),
 
359
            ...]
 
360
        """
 
361
        self._ensure_content()
 
362
        result = []
 
363
        pos = 0
 
364
        while pos < self._content_length:
 
365
            kind = self._content[pos:pos + 1]
 
366
            pos += 1
 
367
            if kind not in (b'f', b'd'):
 
368
                raise ValueError('invalid kind character: %r' % (kind,))
 
369
            content_len, len_len = decode_base128_int(
 
370
                self._content[pos:pos + 5])
 
371
            pos += len_len
 
372
            if content_len + pos > self._content_length:
 
373
                raise ValueError('invalid content_len %d for record @ pos %d'
 
374
                                 % (content_len, pos - len_len - 1))
 
375
            if kind == b'f':  # Fulltext
 
376
                if include_text:
 
377
                    text = self._content[pos:pos + content_len]
 
378
                    result.append((b'f', content_len, text))
 
379
                else:
 
380
                    result.append((b'f', content_len))
 
381
            elif kind == b'd':  # Delta
 
382
                delta_content = self._content[pos:pos + content_len]
 
383
                delta_info = []
 
384
                # The first entry in a delta is the decompressed length
 
385
                decomp_len, delta_pos = decode_base128_int(delta_content)
 
386
                result.append((b'd', content_len, decomp_len, delta_info))
 
387
                measured_len = 0
 
388
                while delta_pos < content_len:
 
389
                    c = delta_content[delta_pos]
 
390
                    delta_pos += 1
 
391
                    if c & 0x80:  # Copy
 
392
                        (offset, length,
 
393
                         delta_pos) = decode_copy_instruction(delta_content, c,
 
394
                                                              delta_pos)
 
395
                        if include_text:
 
396
                            text = self._content[offset:offset + length]
 
397
                            delta_info.append((b'c', offset, length, text))
 
398
                        else:
 
399
                            delta_info.append((b'c', offset, length))
 
400
                        measured_len += length
 
401
                    else:  # Insert
 
402
                        if include_text:
 
403
                            txt = delta_content[delta_pos:delta_pos + c]
 
404
                        else:
 
405
                            txt = b''
 
406
                        delta_info.append((b'i', c, txt))
 
407
                        measured_len += c
 
408
                        delta_pos += c
 
409
                if delta_pos != content_len:
 
410
                    raise ValueError('Delta consumed a bad number of bytes:'
 
411
                                     ' %d != %d' % (delta_pos, content_len))
 
412
                if measured_len != decomp_len:
 
413
                    raise ValueError('Delta claimed fulltext was %d bytes, but'
 
414
                                     ' extraction resulted in %d bytes'
 
415
                                     % (decomp_len, measured_len))
 
416
            pos += content_len
 
417
        return result
 
418
 
 
419
 
 
420
class _LazyGroupCompressFactory(object):
 
421
    """Yield content from a GroupCompressBlock on demand."""
 
422
 
 
423
    def __init__(self, key, parents, manager, start, end, first):
 
424
        """Create a _LazyGroupCompressFactory
 
425
 
 
426
        :param key: The key of just this record
 
427
        :param parents: The parents of this key (possibly None)
 
428
        :param gc_block: A GroupCompressBlock object
 
429
        :param start: Offset of the first byte for this record in the
 
430
            uncompressd content
 
431
        :param end: Offset of the byte just after the end of this record
 
432
            (ie, bytes = content[start:end])
 
433
        :param first: Is this the first Factory for the given block?
 
434
        """
 
435
        self.key = key
 
436
        self.parents = parents
 
437
        self.sha1 = None
 
438
        self.size = None
 
439
        # Note: This attribute coupled with Manager._factories creates a
 
440
        #       reference cycle. Perhaps we would rather use a weakref(), or
 
441
        #       find an appropriate time to release the ref. After the first
 
442
        #       get_bytes_as call? After Manager.get_record_stream() returns
 
443
        #       the object?
 
444
        self._manager = manager
 
445
        self._chunks = None
 
446
        self.storage_kind = 'groupcompress-block'
 
447
        if not first:
 
448
            self.storage_kind = 'groupcompress-block-ref'
 
449
        self._first = first
 
450
        self._start = start
 
451
        self._end = end
 
452
 
 
453
    def __repr__(self):
 
454
        return '%s(%s, first=%s)' % (self.__class__.__name__,
 
455
                                     self.key, self._first)
 
456
 
 
457
    def _extract_bytes(self):
 
458
        # Grab and cache the raw bytes for this entry
 
459
        # and break the ref-cycle with _manager since we don't need it
 
460
        # anymore
 
461
        try:
 
462
            self._manager._prepare_for_extract()
 
463
        except zlib.error as value:
 
464
            raise DecompressCorruption("zlib: " + str(value))
 
465
        block = self._manager._block
 
466
        self._chunks = block.extract(self.key, self._start, self._end)
 
467
        # There are code paths that first extract as fulltext, and then
 
468
        # extract as storage_kind (smart fetch). So we don't break the
 
469
        # refcycle here, but instead in manager.get_record_stream()
 
470
 
 
471
    def get_bytes_as(self, storage_kind):
 
472
        if storage_kind == self.storage_kind:
 
473
            if self._first:
 
474
                # wire bytes, something...
 
475
                return self._manager._wire_bytes()
 
476
            else:
 
477
                return b''
 
478
        if storage_kind in ('fulltext', 'chunked', 'lines'):
 
479
            if self._chunks is None:
 
480
                self._extract_bytes()
 
481
            if storage_kind == 'fulltext':
 
482
                return b''.join(self._chunks)
 
483
            elif storage_kind == 'chunked':
 
484
                return self._chunks
 
485
            else:
 
486
                return osutils.chunks_to_lines(self._chunks)
 
487
        raise UnavailableRepresentation(self.key, storage_kind,
 
488
                                               self.storage_kind)
 
489
 
 
490
    def iter_bytes_as(self, storage_kind):
 
491
        if self._chunks is None:
 
492
            self._extract_bytes()
 
493
        if storage_kind == 'chunked':
 
494
            return iter(self._chunks)
 
495
        elif storage_kind == 'lines':
 
496
            return iter(osutils.chunks_to_lines(self._chunks))
 
497
        raise UnavailableRepresentation(self.key, storage_kind,
 
498
                                               self.storage_kind)
 
499
 
 
500
 
 
501
class _LazyGroupContentManager(object):
 
502
    """This manages a group of _LazyGroupCompressFactory objects."""
 
503
 
 
504
    _max_cut_fraction = 0.75  # We allow a block to be trimmed to 75% of
 
505
    # current size, and still be considered
 
506
    # resuable
 
507
    _full_block_size = 4 * 1024 * 1024
 
508
    _full_mixed_block_size = 2 * 1024 * 1024
 
509
    _full_enough_block_size = 3 * 1024 * 1024  # size at which we won't repack
 
510
    _full_enough_mixed_block_size = 2 * 768 * 1024  # 1.5MB
 
511
 
 
512
    def __init__(self, block, get_compressor_settings=None):
 
513
        self._block = block
 
514
        # We need to preserve the ordering
 
515
        self._factories = []
 
516
        self._last_byte = 0
 
517
        self._get_settings = get_compressor_settings
 
518
        self._compressor_settings = None
 
519
 
 
520
    def _get_compressor_settings(self):
 
521
        if self._compressor_settings is not None:
 
522
            return self._compressor_settings
 
523
        settings = None
 
524
        if self._get_settings is not None:
 
525
            settings = self._get_settings()
 
526
        if settings is None:
 
527
            vf = GroupCompressVersionedFiles
 
528
            settings = vf._DEFAULT_COMPRESSOR_SETTINGS
 
529
        self._compressor_settings = settings
 
530
        return self._compressor_settings
 
531
 
 
532
    def add_factory(self, key, parents, start, end):
 
533
        if not self._factories:
 
534
            first = True
 
535
        else:
 
536
            first = False
 
537
        # Note that this creates a reference cycle....
 
538
        factory = _LazyGroupCompressFactory(key, parents, self,
 
539
                                            start, end, first=first)
 
540
        # max() works here, but as a function call, doing a compare seems to be
 
541
        # significantly faster, timeit says 250ms for max() and 100ms for the
 
542
        # comparison
 
543
        if end > self._last_byte:
 
544
            self._last_byte = end
 
545
        self._factories.append(factory)
 
546
 
 
547
    def get_record_stream(self):
 
548
        """Get a record for all keys added so far."""
 
549
        for factory in self._factories:
 
550
            yield factory
 
551
            # Break the ref-cycle
 
552
            factory._bytes = None
 
553
            factory._manager = None
 
554
        # TODO: Consider setting self._factories = None after the above loop,
 
555
        #       as it will break the reference cycle
 
556
 
 
557
    def _trim_block(self, last_byte):
 
558
        """Create a new GroupCompressBlock, with just some of the content."""
 
559
        # None of the factories need to be adjusted, because the content is
 
560
        # located in an identical place. Just that some of the unreferenced
 
561
        # trailing bytes are stripped
 
562
        trace.mutter('stripping trailing bytes from groupcompress block'
 
563
                     ' %d => %d', self._block._content_length, last_byte)
 
564
        new_block = GroupCompressBlock()
 
565
        self._block._ensure_content(last_byte)
 
566
        new_block.set_content(self._block._content[:last_byte])
 
567
        self._block = new_block
 
568
 
 
569
    def _make_group_compressor(self):
 
570
        return GroupCompressor(self._get_compressor_settings())
 
571
 
 
572
    def _rebuild_block(self):
 
573
        """Create a new GroupCompressBlock with only the referenced texts."""
 
574
        compressor = self._make_group_compressor()
 
575
        tstart = time.time()
 
576
        old_length = self._block._content_length
 
577
        end_point = 0
 
578
        for factory in self._factories:
 
579
            chunks = factory.get_bytes_as('chunked')
 
580
            chunks_len = factory.size
 
581
            if chunks_len is None:
 
582
                chunks_len = sum(map(len, chunks))
 
583
            (found_sha1, start_point, end_point,
 
584
             type) = compressor.compress(
 
585
                 factory.key, chunks, chunks_len, factory.sha1)
 
586
            # Now update this factory with the new offsets, etc
 
587
            factory.sha1 = found_sha1
 
588
            factory._start = start_point
 
589
            factory._end = end_point
 
590
        self._last_byte = end_point
 
591
        new_block = compressor.flush()
 
592
        # TODO: Should we check that new_block really *is* smaller than the old
 
593
        #       block? It seems hard to come up with a method that it would
 
594
        #       expand, since we do full compression again. Perhaps based on a
 
595
        #       request that ends up poorly ordered?
 
596
        # TODO: If the content would have expanded, then we would want to
 
597
        #       handle a case where we need to split the block.
 
598
        #       Now that we have a user-tweakable option
 
599
        #       (max_bytes_to_index), it is possible that one person set it
 
600
        #       to a very low value, causing poor compression.
 
601
        delta = time.time() - tstart
 
602
        self._block = new_block
 
603
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
 
604
                     ' %d bytes => %d bytes', delta, old_length,
 
605
                     self._block._content_length)
 
606
 
 
607
    def _prepare_for_extract(self):
 
608
        """A _LazyGroupCompressFactory is about to extract to fulltext."""
 
609
        # We expect that if one child is going to fulltext, all will be. This
 
610
        # helps prevent all of them from extracting a small amount at a time.
 
611
        # Which in itself isn't terribly expensive, but resizing 2MB 32kB at a
 
612
        # time (self._block._content) is a little expensive.
 
613
        self._block._ensure_content(self._last_byte)
 
614
 
 
615
    def _check_rebuild_action(self):
 
616
        """Check to see if our block should be repacked."""
 
617
        total_bytes_used = 0
 
618
        last_byte_used = 0
 
619
        for factory in self._factories:
 
620
            total_bytes_used += factory._end - factory._start
 
621
            if last_byte_used < factory._end:
 
622
                last_byte_used = factory._end
 
623
        # If we are using more than half of the bytes from the block, we have
 
624
        # nothing else to check
 
625
        if total_bytes_used * 2 >= self._block._content_length:
 
626
            return None, last_byte_used, total_bytes_used
 
627
        # We are using less than 50% of the content. Is the content we are
 
628
        # using at the beginning of the block? If so, we can just trim the
 
629
        # tail, rather than rebuilding from scratch.
 
630
        if total_bytes_used * 2 > last_byte_used:
 
631
            return 'trim', last_byte_used, total_bytes_used
 
632
 
 
633
        # We are using a small amount of the data, and it isn't just packed
 
634
        # nicely at the front, so rebuild the content.
 
635
        # Note: This would be *nicer* as a strip-data-from-group, rather than
 
636
        #       building it up again from scratch
 
637
        #       It might be reasonable to consider the fulltext sizes for
 
638
        #       different bits when deciding this, too. As you may have a small
 
639
        #       fulltext, and a trivial delta, and you are just trading around
 
640
        #       for another fulltext. If we do a simple 'prune' you may end up
 
641
        #       expanding many deltas into fulltexts, as well.
 
642
        #       If we build a cheap enough 'strip', then we could try a strip,
 
643
        #       if that expands the content, we then rebuild.
 
644
        return 'rebuild', last_byte_used, total_bytes_used
 
645
 
 
646
    def check_is_well_utilized(self):
 
647
        """Is the current block considered 'well utilized'?
 
648
 
 
649
        This heuristic asks if the current block considers itself to be a fully
 
650
        developed group, rather than just a loose collection of data.
 
651
        """
 
652
        if len(self._factories) == 1:
 
653
            # A block of length 1 could be improved by combining with other
 
654
            # groups - don't look deeper. Even larger than max size groups
 
655
            # could compress well with adjacent versions of the same thing.
 
656
            return False
 
657
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
 
658
        block_size = self._block._content_length
 
659
        if total_bytes_used < block_size * self._max_cut_fraction:
 
660
            # This block wants to trim itself small enough that we want to
 
661
            # consider it under-utilized.
 
662
            return False
 
663
        # TODO: This code is meant to be the twin of _insert_record_stream's
 
664
        #       'start_new_block' logic. It would probably be better to factor
 
665
        #       out that logic into a shared location, so that it stays
 
666
        #       together better
 
667
        # We currently assume a block is properly utilized whenever it is >75%
 
668
        # of the size of a 'full' block. In normal operation, a block is
 
669
        # considered full when it hits 4MB of same-file content. So any block
 
670
        # >3MB is 'full enough'.
 
671
        # The only time this isn't true is when a given block has large-object
 
672
        # content. (a single file >4MB, etc.)
 
673
        # Under these circumstances, we allow a block to grow to
 
674
        # 2 x largest_content.  Which means that if a given block had a large
 
675
        # object, it may actually be under-utilized. However, given that this
 
676
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
 
677
        # content blobs on-the-fly. Note that because we return False for all
 
678
        # 1-item blobs, we will repack them; we may wish to reevaluate our
 
679
        # treatment of large object blobs in the future.
 
680
        if block_size >= self._full_enough_block_size:
 
681
            return True
 
682
        # If a block is <3MB, it still may be considered 'full' if it contains
 
683
        # mixed content. The current rule is 2MB of mixed content is considered
 
684
        # full. So check to see if this block contains mixed content, and
 
685
        # set the threshold appropriately.
 
686
        common_prefix = None
 
687
        for factory in self._factories:
 
688
            prefix = factory.key[:-1]
 
689
            if common_prefix is None:
 
690
                common_prefix = prefix
 
691
            elif prefix != common_prefix:
 
692
                # Mixed content, check the size appropriately
 
693
                if block_size >= self._full_enough_mixed_block_size:
 
694
                    return True
 
695
                break
 
696
        # The content failed both the mixed check and the single-content check
 
697
        # so obviously it is not fully utilized
 
698
        # TODO: there is one other constraint that isn't being checked
 
699
        #       namely, that the entries in the block are in the appropriate
 
700
        #       order. For example, you could insert the entries in exactly
 
701
        #       reverse groupcompress order, and we would think that is ok.
 
702
        #       (all the right objects are in one group, and it is fully
 
703
        #       utilized, etc.) For now, we assume that case is rare,
 
704
        #       especially since we should always fetch in 'groupcompress'
 
705
        #       order.
 
706
        return False
 
707
 
 
708
    def _check_rebuild_block(self):
 
709
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
 
710
        if action is None:
 
711
            return
 
712
        if action == 'trim':
 
713
            self._trim_block(last_byte_used)
 
714
        elif action == 'rebuild':
 
715
            self._rebuild_block()
 
716
        else:
 
717
            raise ValueError('unknown rebuild action: %r' % (action,))
 
718
 
 
719
    def _wire_bytes(self):
 
720
        """Return a byte stream suitable for transmitting over the wire."""
 
721
        self._check_rebuild_block()
 
722
        # The outer block starts with:
 
723
        #   'groupcompress-block\n'
 
724
        #   <length of compressed key info>\n
 
725
        #   <length of uncompressed info>\n
 
726
        #   <length of gc block>\n
 
727
        #   <header bytes>
 
728
        #   <gc-block>
 
729
        lines = [b'groupcompress-block\n']
 
730
        # The minimal info we need is the key, the start offset, and the
 
731
        # parents. The length and type are encoded in the record itself.
 
732
        # However, passing in the other bits makes it easier.  The list of
 
733
        # keys, and the start offset, the length
 
734
        # 1 line key
 
735
        # 1 line with parents, '' for ()
 
736
        # 1 line for start offset
 
737
        # 1 line for end byte
 
738
        header_lines = []
 
739
        for factory in self._factories:
 
740
            key_bytes = b'\x00'.join(factory.key)
 
741
            parents = factory.parents
 
742
            if parents is None:
 
743
                parent_bytes = b'None:'
 
744
            else:
 
745
                parent_bytes = b'\t'.join(b'\x00'.join(key) for key in parents)
 
746
            record_header = b'%s\n%s\n%d\n%d\n' % (
 
747
                key_bytes, parent_bytes, factory._start, factory._end)
 
748
            header_lines.append(record_header)
 
749
            # TODO: Can we break the refcycle at this point and set
 
750
            #       factory._manager = None?
 
751
        header_bytes = b''.join(header_lines)
 
752
        del header_lines
 
753
        header_bytes_len = len(header_bytes)
 
754
        z_header_bytes = zlib.compress(header_bytes)
 
755
        del header_bytes
 
756
        z_header_bytes_len = len(z_header_bytes)
 
757
        block_bytes_len, block_chunks = self._block.to_chunks()
 
758
        lines.append(b'%d\n%d\n%d\n' % (
 
759
            z_header_bytes_len, header_bytes_len, block_bytes_len))
 
760
        lines.append(z_header_bytes)
 
761
        lines.extend(block_chunks)
 
762
        del z_header_bytes, block_chunks
 
763
        # TODO: This is a point where we will double the memory consumption. To
 
764
        #       avoid this, we probably have to switch to a 'chunked' api
 
765
        return b''.join(lines)
 
766
 
 
767
    @classmethod
 
768
    def from_bytes(cls, bytes):
 
769
        # TODO: This does extra string copying, probably better to do it a
 
770
        #       different way. At a minimum this creates 2 copies of the
 
771
        #       compressed content
 
772
        (storage_kind, z_header_len, header_len,
 
773
         block_len, rest) = bytes.split(b'\n', 4)
 
774
        del bytes
 
775
        if storage_kind != b'groupcompress-block':
 
776
            raise ValueError('Unknown storage kind: %s' % (storage_kind,))
 
777
        z_header_len = int(z_header_len)
 
778
        if len(rest) < z_header_len:
 
779
            raise ValueError('Compressed header len shorter than all bytes')
 
780
        z_header = rest[:z_header_len]
 
781
        header_len = int(header_len)
 
782
        header = zlib.decompress(z_header)
 
783
        if len(header) != header_len:
 
784
            raise ValueError('invalid length for decompressed bytes')
 
785
        del z_header
 
786
        block_len = int(block_len)
 
787
        if len(rest) != z_header_len + block_len:
 
788
            raise ValueError('Invalid length for block')
 
789
        block_bytes = rest[z_header_len:]
 
790
        del rest
 
791
        # So now we have a valid GCB, we just need to parse the factories that
 
792
        # were sent to us
 
793
        header_lines = header.split(b'\n')
 
794
        del header
 
795
        last = header_lines.pop()
 
796
        if last != b'':
 
797
            raise ValueError('header lines did not end with a trailing'
 
798
                             ' newline')
 
799
        if len(header_lines) % 4 != 0:
 
800
            raise ValueError('The header was not an even multiple of 4 lines')
 
801
        block = GroupCompressBlock.from_bytes(block_bytes)
 
802
        del block_bytes
 
803
        result = cls(block)
 
804
        for start in range(0, len(header_lines), 4):
 
805
            # intern()?
 
806
            key = tuple(header_lines[start].split(b'\x00'))
 
807
            parents_line = header_lines[start + 1]
 
808
            if parents_line == b'None:':
 
809
                parents = None
 
810
            else:
 
811
                parents = tuple([tuple(segment.split(b'\x00'))
 
812
                                 for segment in parents_line.split(b'\t')
 
813
                                 if segment])
 
814
            start_offset = int(header_lines[start + 2])
 
815
            end_offset = int(header_lines[start + 3])
 
816
            result.add_factory(key, parents, start_offset, end_offset)
 
817
        return result
 
818
 
 
819
 
 
820
def network_block_to_records(storage_kind, bytes, line_end):
 
821
    if storage_kind != 'groupcompress-block':
 
822
        raise ValueError('Unknown storage kind: %s' % (storage_kind,))
 
823
    manager = _LazyGroupContentManager.from_bytes(bytes)
 
824
    return manager.get_record_stream()
 
825
 
 
826
 
 
827
class _CommonGroupCompressor(object):
 
828
 
 
829
    def __init__(self, settings=None):
 
830
        """Create a GroupCompressor."""
 
831
        self.chunks = []
 
832
        self._last = None
 
833
        self.endpoint = 0
 
834
        self.input_bytes = 0
 
835
        self.labels_deltas = {}
 
836
        self._delta_index = None  # Set by the children
 
837
        self._block = GroupCompressBlock()
 
838
        if settings is None:
 
839
            self._settings = {}
 
840
        else:
 
841
            self._settings = settings
 
842
 
 
843
    def compress(self, key, chunks, length, expected_sha, nostore_sha=None,
 
844
                 soft=False):
 
845
        """Compress lines with label key.
 
846
 
 
847
        :param key: A key tuple. It is stored in the output
 
848
            for identification of the text during decompression. If the last
 
849
            element is b'None' it is replaced with the sha1 of the text -
 
850
            e.g. sha1:xxxxxxx.
 
851
        :param chunks: Chunks of bytes to be compressed
 
852
        :param length: Length of chunks
 
853
        :param expected_sha: If non-None, the sha the lines are believed to
 
854
            have. During compression the sha is calculated; a mismatch will
 
855
            cause an error.
 
856
        :param nostore_sha: If the computed sha1 sum matches, we will raise
 
857
            ExistingContent rather than adding the text.
 
858
        :param soft: Do a 'soft' compression. This means that we require larger
 
859
            ranges to match to be considered for a copy command.
 
860
 
 
861
        :return: The sha1 of lines, the start and end offsets in the delta, and
 
862
            the type ('fulltext' or 'delta').
 
863
 
 
864
        :seealso VersionedFiles.add_lines:
 
865
        """
 
866
        if length == 0:  # empty, like a dir entry, etc
 
867
            if nostore_sha == _null_sha1:
 
868
                raise ExistingContent()
 
869
            return _null_sha1, 0, 0, 'fulltext'
 
870
        # we assume someone knew what they were doing when they passed it in
 
871
        if expected_sha is not None:
 
872
            sha1 = expected_sha
 
873
        else:
 
874
            sha1 = osutils.sha_strings(chunks)
 
875
        if nostore_sha is not None:
 
876
            if sha1 == nostore_sha:
 
877
                raise ExistingContent()
 
878
        if key[-1] is None:
 
879
            key = key[:-1] + (b'sha1:' + sha1,)
 
880
 
 
881
        start, end, type = self._compress(key, chunks, length, length / 2, soft)
 
882
        return sha1, start, end, type
 
883
 
 
884
    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
 
885
        """Compress lines with label key.
 
886
 
 
887
        :param key: A key tuple. It is stored in the output for identification
 
888
            of the text during decompression.
 
889
 
 
890
        :param chunks: The chunks of bytes to be compressed
 
891
 
 
892
        :param input_len: The length of the chunks
 
893
 
 
894
        :param max_delta_size: The size above which we issue a fulltext instead
 
895
            of a delta.
 
896
 
 
897
        :param soft: Do a 'soft' compression. This means that we require larger
 
898
            ranges to match to be considered for a copy command.
 
899
 
 
900
        :return: The sha1 of lines, the start and end offsets in the delta, and
 
901
            the type ('fulltext' or 'delta').
 
902
        """
 
903
        raise NotImplementedError(self._compress)
 
904
 
 
905
    def extract(self, key):
 
906
        """Extract a key previously added to the compressor.
 
907
 
 
908
        :param key: The key to extract.
 
909
        :return: An iterable over chunks and the sha1.
 
910
        """
 
911
        (start_byte, start_chunk, end_byte,
 
912
         end_chunk) = self.labels_deltas[key]
 
913
        delta_chunks = self.chunks[start_chunk:end_chunk]
 
914
        stored_bytes = b''.join(delta_chunks)
 
915
        kind = stored_bytes[:1]
 
916
        if kind == b'f':
 
917
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
 
918
            data_len = fulltext_len + 1 + offset
 
919
            if data_len != len(stored_bytes):
 
920
                raise ValueError('Index claimed fulltext len, but stored bytes'
 
921
                                 ' claim %s != %s'
 
922
                                 % (len(stored_bytes), data_len))
 
923
            data = [stored_bytes[offset + 1:]]
 
924
        else:
 
925
            if kind != b'd':
 
926
                raise ValueError('Unknown content kind, bytes claim %s' % kind)
 
927
            # XXX: This is inefficient at best
 
928
            source = b''.join(self.chunks[:start_chunk])
 
929
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
 
930
            data_len = delta_len + 1 + offset
 
931
            if data_len != len(stored_bytes):
 
932
                raise ValueError('Index claimed delta len, but stored bytes'
 
933
                                 ' claim %s != %s'
 
934
                                 % (len(stored_bytes), data_len))
 
935
            data = [apply_delta(source, stored_bytes[offset + 1:])]
 
936
        data_sha1 = osutils.sha_strings(data)
 
937
        return data, data_sha1
 
938
 
 
939
    def flush(self):
 
940
        """Finish this group, creating a formatted stream.
 
941
 
 
942
        After calling this, the compressor should no longer be used
 
943
        """
 
944
        self._block.set_chunked_content(self.chunks, self.endpoint)
 
945
        self.chunks = None
 
946
        self._delta_index = None
 
947
        return self._block
 
948
 
 
949
    def pop_last(self):
 
950
        """Call this if you want to 'revoke' the last compression.
 
951
 
 
952
        After this, the data structures will be rolled back, but you cannot do
 
953
        more compression.
 
954
        """
 
955
        self._delta_index = None
 
956
        del self.chunks[self._last[0]:]
 
957
        self.endpoint = self._last[1]
 
958
        self._last = None
 
959
 
 
960
    def ratio(self):
 
961
        """Return the overall compression ratio."""
 
962
        return float(self.input_bytes) / float(self.endpoint)
 
963
 
 
964
 
 
965
class PythonGroupCompressor(_CommonGroupCompressor):
 
966
 
 
967
    def __init__(self, settings=None):
 
968
        """Create a GroupCompressor.
 
969
 
 
970
        Used only if the pyrex version is not available.
 
971
        """
 
972
        super(PythonGroupCompressor, self).__init__(settings)
 
973
        self._delta_index = LinesDeltaIndex([])
 
974
        # The actual content is managed by LinesDeltaIndex
 
975
        self.chunks = self._delta_index.lines
 
976
 
 
977
    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
 
978
        """see _CommonGroupCompressor._compress"""
 
979
        new_lines = osutils.chunks_to_lines(chunks)
 
980
        out_lines, index_lines = self._delta_index.make_delta(
 
981
            new_lines, bytes_length=input_len, soft=soft)
 
982
        delta_length = sum(map(len, out_lines))
 
983
        if delta_length > max_delta_size:
 
984
            # The delta is longer than the fulltext, insert a fulltext
 
985
            type = 'fulltext'
 
986
            out_lines = [b'f', encode_base128_int(input_len)]
 
987
            out_lines.extend(new_lines)
 
988
            index_lines = [False, False]
 
989
            index_lines.extend([True] * len(new_lines))
 
990
        else:
 
991
            # this is a worthy delta, output it
 
992
            type = 'delta'
 
993
            out_lines[0] = b'd'
 
994
            # Update the delta_length to include those two encoded integers
 
995
            out_lines[1] = encode_base128_int(delta_length)
 
996
        # Before insertion
 
997
        start = self.endpoint
 
998
        chunk_start = len(self.chunks)
 
999
        self._last = (chunk_start, self.endpoint)
 
1000
        self._delta_index.extend_lines(out_lines, index_lines)
 
1001
        self.endpoint = self._delta_index.endpoint
 
1002
        self.input_bytes += input_len
 
1003
        chunk_end = len(self.chunks)
 
1004
        self.labels_deltas[key] = (start, chunk_start,
 
1005
                                   self.endpoint, chunk_end)
 
1006
        return start, self.endpoint, type
 
1007
 
 
1008
 
 
1009
class PyrexGroupCompressor(_CommonGroupCompressor):
 
1010
    """Produce a serialised group of compressed texts.
 
1011
 
 
1012
    It contains code very similar to SequenceMatcher because of having a similar
 
1013
    task. However some key differences apply:
 
1014
 
 
1015
    * there is no junk, we want a minimal edit not a human readable diff.
 
1016
    * we don't filter very common lines (because we don't know where a good
 
1017
      range will start, and after the first text we want to be emitting minmal
 
1018
      edits only.
 
1019
    * we chain the left side, not the right side
 
1020
    * we incrementally update the adjacency matrix as new lines are provided.
 
1021
    * we look for matches in all of the left side, so the routine which does
 
1022
      the analagous task of find_longest_match does not need to filter on the
 
1023
      left side.
 
1024
    """
 
1025
 
 
1026
    def __init__(self, settings=None):
 
1027
        super(PyrexGroupCompressor, self).__init__(settings)
 
1028
        max_bytes_to_index = self._settings.get('max_bytes_to_index', 0)
 
1029
        self._delta_index = DeltaIndex(max_bytes_to_index=max_bytes_to_index)
 
1030
 
 
1031
    def _compress(self, key, chunks, input_len, max_delta_size, soft=False):
 
1032
        """see _CommonGroupCompressor._compress"""
 
1033
        # By having action/label/sha1/len, we can parse the group if the index
 
1034
        # was ever destroyed, we have the key in 'label', we know the final
 
1035
        # bytes are valid from sha1, and we know where to find the end of this
 
1036
        # record because of 'len'. (the delta record itself will store the
 
1037
        # total length for the expanded record)
 
1038
        # 'len: %d\n' costs approximately 1% increase in total data
 
1039
        # Having the labels at all costs us 9-10% increase, 38% increase for
 
1040
        # inventory pages, and 5.8% increase for text pages
 
1041
        # new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
 
1042
        if self._delta_index._source_offset != self.endpoint:
 
1043
            raise AssertionError('_source_offset != endpoint'
 
1044
                                 ' somehow the DeltaIndex got out of sync with'
 
1045
                                 ' the output lines')
 
1046
        bytes = b''.join(chunks)
 
1047
        delta = self._delta_index.make_delta(bytes, max_delta_size)
 
1048
        if delta is None:
 
1049
            type = 'fulltext'
 
1050
            enc_length = encode_base128_int(input_len)
 
1051
            len_mini_header = 1 + len(enc_length)
 
1052
            self._delta_index.add_source(bytes, len_mini_header)
 
1053
            new_chunks = [b'f', enc_length] + chunks
 
1054
        else:
 
1055
            type = 'delta'
 
1056
            enc_length = encode_base128_int(len(delta))
 
1057
            len_mini_header = 1 + len(enc_length)
 
1058
            new_chunks = [b'd', enc_length, delta]
 
1059
            self._delta_index.add_delta_source(delta, len_mini_header)
 
1060
        # Before insertion
 
1061
        start = self.endpoint
 
1062
        chunk_start = len(self.chunks)
 
1063
        # Now output these bytes
 
1064
        self._output_chunks(new_chunks)
 
1065
        self.input_bytes += input_len
 
1066
        chunk_end = len(self.chunks)
 
1067
        self.labels_deltas[key] = (start, chunk_start,
 
1068
                                   self.endpoint, chunk_end)
 
1069
        if not self._delta_index._source_offset == self.endpoint:
 
1070
            raise AssertionError('the delta index is out of sync'
 
1071
                                 'with the output lines %s != %s'
 
1072
                                 % (self._delta_index._source_offset, self.endpoint))
 
1073
        return start, self.endpoint, type
 
1074
 
 
1075
    def _output_chunks(self, new_chunks):
 
1076
        """Output some chunks.
 
1077
 
 
1078
        :param new_chunks: The chunks to output.
 
1079
        """
 
1080
        self._last = (len(self.chunks), self.endpoint)
 
1081
        endpoint = self.endpoint
 
1082
        self.chunks.extend(new_chunks)
 
1083
        endpoint += sum(map(len, new_chunks))
 
1084
        self.endpoint = endpoint
 
1085
 
 
1086
 
 
1087
def make_pack_factory(graph, delta, keylength, inconsistency_fatal=True):
 
1088
    """Create a factory for creating a pack based groupcompress.
 
1089
 
 
1090
    This is only functional enough to run interface tests, it doesn't try to
 
1091
    provide a full pack environment.
 
1092
 
 
1093
    :param graph: Store a graph.
 
1094
    :param delta: Delta compress contents.
 
1095
    :param keylength: How long should keys be.
 
1096
    """
 
1097
    def factory(transport):
 
1098
        parents = graph
 
1099
        ref_length = 0
 
1100
        if graph:
 
1101
            ref_length = 1
 
1102
        graph_index = BTreeBuilder(reference_lists=ref_length,
 
1103
                                   key_elements=keylength)
 
1104
        stream = transport.open_write_stream('newpack')
 
1105
        writer = pack.ContainerWriter(stream.write)
 
1106
        writer.begin()
 
1107
        index = _GCGraphIndex(graph_index, lambda: True, parents=parents,
 
1108
                              add_callback=graph_index.add_nodes,
 
1109
                              inconsistency_fatal=inconsistency_fatal)
 
1110
        access = pack_repo._DirectPackAccess({})
 
1111
        access.set_writer(writer, graph_index, (transport, 'newpack'))
 
1112
        result = GroupCompressVersionedFiles(index, access, delta)
 
1113
        result.stream = stream
 
1114
        result.writer = writer
 
1115
        return result
 
1116
    return factory
 
1117
 
 
1118
 
 
1119
def cleanup_pack_group(versioned_files):
 
1120
    versioned_files.writer.end()
 
1121
    versioned_files.stream.close()
 
1122
 
 
1123
 
 
1124
class _BatchingBlockFetcher(object):
 
1125
    """Fetch group compress blocks in batches.
 
1126
 
 
1127
    :ivar total_bytes: int of expected number of bytes needed to fetch the
 
1128
        currently pending batch.
 
1129
    """
 
1130
 
 
1131
    def __init__(self, gcvf, locations, get_compressor_settings=None):
 
1132
        self.gcvf = gcvf
 
1133
        self.locations = locations
 
1134
        self.keys = []
 
1135
        self.batch_memos = {}
 
1136
        self.memos_to_get = []
 
1137
        self.total_bytes = 0
 
1138
        self.last_read_memo = None
 
1139
        self.manager = None
 
1140
        self._get_compressor_settings = get_compressor_settings
 
1141
 
 
1142
    def add_key(self, key):
 
1143
        """Add another to key to fetch.
 
1144
 
 
1145
        :return: The estimated number of bytes needed to fetch the batch so
 
1146
            far.
 
1147
        """
 
1148
        self.keys.append(key)
 
1149
        index_memo, _, _, _ = self.locations[key]
 
1150
        read_memo = index_memo[0:3]
 
1151
        # Three possibilities for this read_memo:
 
1152
        #  - it's already part of this batch; or
 
1153
        #  - it's not yet part of this batch, but is already cached; or
 
1154
        #  - it's not yet part of this batch and will need to be fetched.
 
1155
        if read_memo in self.batch_memos:
 
1156
            # This read memo is already in this batch.
 
1157
            return self.total_bytes
 
1158
        try:
 
1159
            cached_block = self.gcvf._group_cache[read_memo]
 
1160
        except KeyError:
 
1161
            # This read memo is new to this batch, and the data isn't cached
 
1162
            # either.
 
1163
            self.batch_memos[read_memo] = None
 
1164
            self.memos_to_get.append(read_memo)
 
1165
            byte_length = read_memo[2]
 
1166
            self.total_bytes += byte_length
 
1167
        else:
 
1168
            # This read memo is new to this batch, but cached.
 
1169
            # Keep a reference to the cached block in batch_memos because it's
 
1170
            # certain that we'll use it when this batch is processed, but
 
1171
            # there's a risk that it would fall out of _group_cache between now
 
1172
            # and then.
 
1173
            self.batch_memos[read_memo] = cached_block
 
1174
        return self.total_bytes
 
1175
 
 
1176
    def _flush_manager(self):
 
1177
        if self.manager is not None:
 
1178
            for factory in self.manager.get_record_stream():
 
1179
                yield factory
 
1180
            self.manager = None
 
1181
            self.last_read_memo = None
 
1182
 
 
1183
    def yield_factories(self, full_flush=False):
 
1184
        """Yield factories for keys added since the last yield.  They will be
 
1185
        returned in the order they were added via add_key.
 
1186
 
 
1187
        :param full_flush: by default, some results may not be returned in case
 
1188
            they can be part of the next batch.  If full_flush is True, then
 
1189
            all results are returned.
 
1190
        """
 
1191
        if self.manager is None and not self.keys:
 
1192
            return
 
1193
        # Fetch all memos in this batch.
 
1194
        blocks = self.gcvf._get_blocks(self.memos_to_get)
 
1195
        # Turn blocks into factories and yield them.
 
1196
        memos_to_get_stack = list(self.memos_to_get)
 
1197
        memos_to_get_stack.reverse()
 
1198
        for key in self.keys:
 
1199
            index_memo, _, parents, _ = self.locations[key]
 
1200
            read_memo = index_memo[:3]
 
1201
            if self.last_read_memo != read_memo:
 
1202
                # We are starting a new block. If we have a
 
1203
                # manager, we have found everything that fits for
 
1204
                # now, so yield records
 
1205
                for factory in self._flush_manager():
 
1206
                    yield factory
 
1207
                # Now start a new manager.
 
1208
                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
 
1209
                    # The next block from _get_blocks will be the block we
 
1210
                    # need.
 
1211
                    block_read_memo, block = next(blocks)
 
1212
                    if block_read_memo != read_memo:
 
1213
                        raise AssertionError(
 
1214
                            "block_read_memo out of sync with read_memo"
 
1215
                            "(%r != %r)" % (block_read_memo, read_memo))
 
1216
                    self.batch_memos[read_memo] = block
 
1217
                    memos_to_get_stack.pop()
 
1218
                else:
 
1219
                    block = self.batch_memos[read_memo]
 
1220
                self.manager = _LazyGroupContentManager(block,
 
1221
                                                        get_compressor_settings=self._get_compressor_settings)
 
1222
                self.last_read_memo = read_memo
 
1223
            start, end = index_memo[3:5]
 
1224
            self.manager.add_factory(key, parents, start, end)
 
1225
        if full_flush:
 
1226
            for factory in self._flush_manager():
 
1227
                yield factory
 
1228
        del self.keys[:]
 
1229
        self.batch_memos.clear()
 
1230
        del self.memos_to_get[:]
 
1231
        self.total_bytes = 0
 
1232
 
 
1233
 
 
1234
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
 
1235
    """A group-compress based VersionedFiles implementation."""
 
1236
 
 
1237
    # This controls how the GroupCompress DeltaIndex works. Basically, we
 
1238
    # compute hash pointers into the source blocks (so hash(text) => text).
 
1239
    # However each of these references costs some memory in trade against a
 
1240
    # more accurate match result. For very large files, they either are
 
1241
    # pre-compressed and change in bulk whenever they change, or change in just
 
1242
    # local blocks. Either way, 'improved resolution' is not very helpful,
 
1243
    # versus running out of memory trying to track everything. The default max
 
1244
    # gives 100% sampling of a 1MB file.
 
1245
    _DEFAULT_MAX_BYTES_TO_INDEX = 1024 * 1024
 
1246
    _DEFAULT_COMPRESSOR_SETTINGS = {'max_bytes_to_index':
 
1247
                                    _DEFAULT_MAX_BYTES_TO_INDEX}
 
1248
 
 
1249
    def __init__(self, index, access, delta=True, _unadded_refs=None,
 
1250
                 _group_cache=None):
 
1251
        """Create a GroupCompressVersionedFiles object.
 
1252
 
 
1253
        :param index: The index object storing access and graph data.
 
1254
        :param access: The access object storing raw data.
 
1255
        :param delta: Whether to delta compress or just entropy compress.
 
1256
        :param _unadded_refs: private parameter, don't use.
 
1257
        :param _group_cache: private parameter, don't use.
 
1258
        """
 
1259
        self._index = index
 
1260
        self._access = access
 
1261
        self._delta = delta
 
1262
        if _unadded_refs is None:
 
1263
            _unadded_refs = {}
 
1264
        self._unadded_refs = _unadded_refs
 
1265
        if _group_cache is None:
 
1266
            _group_cache = LRUSizeCache(max_size=50 * 1024 * 1024)
 
1267
        self._group_cache = _group_cache
 
1268
        self._immediate_fallback_vfs = []
 
1269
        self._max_bytes_to_index = None
 
1270
 
 
1271
    def without_fallbacks(self):
 
1272
        """Return a clone of this object without any fallbacks configured."""
 
1273
        return GroupCompressVersionedFiles(self._index, self._access,
 
1274
                                           self._delta, _unadded_refs=dict(
 
1275
                                               self._unadded_refs),
 
1276
                                           _group_cache=self._group_cache)
 
1277
 
 
1278
    def add_lines(self, key, parents, lines, parent_texts=None,
 
1279
                  left_matching_blocks=None, nostore_sha=None, random_id=False,
 
1280
                  check_content=True):
 
1281
        """Add a text to the store.
 
1282
 
 
1283
        :param key: The key tuple of the text to add.
 
1284
        :param parents: The parents key tuples of the text to add.
 
1285
        :param lines: A list of lines. Each line must be a bytestring. And all
 
1286
            of them except the last must be terminated with \\n and contain no
 
1287
            other \\n's. The last line may either contain no \\n's or a single
 
1288
            terminating \\n. If the lines list does meet this constraint the
 
1289
            add routine may error or may succeed - but you will be unable to
 
1290
            read the data back accurately. (Checking the lines have been split
 
1291
            correctly is expensive and extremely unlikely to catch bugs so it
 
1292
            is not done at runtime unless check_content is True.)
 
1293
        :param parent_texts: An optional dictionary containing the opaque
 
1294
            representations of some or all of the parents of version_id to
 
1295
            allow delta optimisations.  VERY IMPORTANT: the texts must be those
 
1296
            returned by add_lines or data corruption can be caused.
 
1297
        :param left_matching_blocks: a hint about which areas are common
 
1298
            between the text and its left-hand-parent.  The format is
 
1299
            the SequenceMatcher.get_matching_blocks format.
 
1300
        :param nostore_sha: Raise ExistingContent and do not add the lines to
 
1301
            the versioned file if the digest of the lines matches this.
 
1302
        :param random_id: If True a random id has been selected rather than
 
1303
            an id determined by some deterministic process such as a converter
 
1304
            from a foreign VCS. When True the backend may choose not to check
 
1305
            for uniqueness of the resulting key within the versioned file, so
 
1306
            this should only be done when the result is expected to be unique
 
1307
            anyway.
 
1308
        :param check_content: If True, the lines supplied are verified to be
 
1309
            bytestrings that are correctly formed lines.
 
1310
        :return: The text sha1, the number of bytes in the text, and an opaque
 
1311
                 representation of the inserted version which can be provided
 
1312
                 back to future add_lines calls in the parent_texts dictionary.
 
1313
        """
 
1314
        self._index._check_write_ok()
 
1315
        if check_content:
 
1316
            self._check_lines_not_unicode(lines)
 
1317
            self._check_lines_are_lines(lines)
 
1318
        return self.add_content(
 
1319
            ChunkedContentFactory(
 
1320
                key, parents, osutils.sha_strings(lines), lines, chunks_are_lines=True),
 
1321
            parent_texts, left_matching_blocks, nostore_sha, random_id)
 
1322
 
 
1323
    def add_content(self, factory, parent_texts=None,
 
1324
                    left_matching_blocks=None, nostore_sha=None,
 
1325
                    random_id=False):
 
1326
        """Add a text to the store.
 
1327
 
 
1328
        :param factory: A ContentFactory that can be used to retrieve the key,
 
1329
            parents and contents.
 
1330
        :param parent_texts: An optional dictionary containing the opaque
 
1331
            representations of some or all of the parents of version_id to
 
1332
            allow delta optimisations.  VERY IMPORTANT: the texts must be those
 
1333
            returned by add_lines or data corruption can be caused.
 
1334
        :param left_matching_blocks: a hint about which areas are common
 
1335
            between the text and its left-hand-parent.  The format is
 
1336
            the SequenceMatcher.get_matching_blocks format.
 
1337
        :param nostore_sha: Raise ExistingContent and do not add the lines to
 
1338
            the versioned file if the digest of the lines matches this.
 
1339
        :param random_id: If True a random id has been selected rather than
 
1340
            an id determined by some deterministic process such as a converter
 
1341
            from a foreign VCS. When True the backend may choose not to check
 
1342
            for uniqueness of the resulting key within the versioned file, so
 
1343
            this should only be done when the result is expected to be unique
 
1344
            anyway.
 
1345
        :return: The text sha1, the number of bytes in the text, and an opaque
 
1346
                 representation of the inserted version which can be provided
 
1347
                 back to future add_lines calls in the parent_texts dictionary.
 
1348
        """
 
1349
        self._index._check_write_ok()
 
1350
        parents = factory.parents
 
1351
        self._check_add(factory.key, random_id)
 
1352
        if parents is None:
 
1353
            # The caller might pass None if there is no graph data, but kndx
 
1354
            # indexes can't directly store that, so we give them
 
1355
            # an empty tuple instead.
 
1356
            parents = ()
 
1357
        # double handling for now. Make it work until then.
 
1358
        sha1, length = list(self._insert_record_stream(
 
1359
            [factory], random_id=random_id, nostore_sha=nostore_sha))[0]
 
1360
        return sha1, length, None
 
1361
 
 
1362
    def add_fallback_versioned_files(self, a_versioned_files):
 
1363
        """Add a source of texts for texts not present in this knit.
 
1364
 
 
1365
        :param a_versioned_files: A VersionedFiles object.
 
1366
        """
 
1367
        self._immediate_fallback_vfs.append(a_versioned_files)
 
1368
 
 
1369
    def annotate(self, key):
 
1370
        """See VersionedFiles.annotate."""
 
1371
        ann = annotate.Annotator(self)
 
1372
        return ann.annotate_flat(key)
 
1373
 
 
1374
    def get_annotator(self):
 
1375
        return annotate.Annotator(self)
 
1376
 
 
1377
    def check(self, progress_bar=None, keys=None):
 
1378
        """See VersionedFiles.check()."""
 
1379
        if keys is None:
 
1380
            keys = self.keys()
 
1381
            for record in self.get_record_stream(keys, 'unordered', True):
 
1382
                for chunk in record.iter_bytes_as('chunked'):
 
1383
                    pass
 
1384
        else:
 
1385
            return self.get_record_stream(keys, 'unordered', True)
 
1386
 
 
1387
    def clear_cache(self):
 
1388
        """See VersionedFiles.clear_cache()"""
 
1389
        self._group_cache.clear()
 
1390
        self._index._graph_index.clear_cache()
 
1391
        self._index._int_cache.clear()
 
1392
 
 
1393
    def _check_add(self, key, random_id):
 
1394
        """check that version_id and lines are safe to add."""
 
1395
        version_id = key[-1]
 
1396
        if version_id is not None:
 
1397
            if osutils.contains_whitespace(version_id):
 
1398
                raise errors.InvalidRevisionId(version_id, self)
 
1399
        self.check_not_reserved_id(version_id)
 
1400
        # TODO: If random_id==False and the key is already present, we should
 
1401
        # probably check that the existing content is identical to what is
 
1402
        # being inserted, and otherwise raise an exception.  This would make
 
1403
        # the bundle code simpler.
 
1404
 
 
1405
    def get_parent_map(self, keys):
 
1406
        """Get a map of the graph parents of keys.
 
1407
 
 
1408
        :param keys: The keys to look up parents for.
 
1409
        :return: A mapping from keys to parents. Absent keys are absent from
 
1410
            the mapping.
 
1411
        """
 
1412
        return self._get_parent_map_with_sources(keys)[0]
 
1413
 
 
1414
    def _get_parent_map_with_sources(self, keys):
 
1415
        """Get a map of the parents of keys.
 
1416
 
 
1417
        :param keys: The keys to look up parents for.
 
1418
        :return: A tuple. The first element is a mapping from keys to parents.
 
1419
            Absent keys are absent from the mapping. The second element is a
 
1420
            list with the locations each key was found in. The first element
 
1421
            is the in-this-knit parents, the second the first fallback source,
 
1422
            and so on.
 
1423
        """
 
1424
        result = {}
 
1425
        sources = [self._index] + self._immediate_fallback_vfs
 
1426
        source_results = []
 
1427
        missing = set(keys)
 
1428
        for source in sources:
 
1429
            if not missing:
 
1430
                break
 
1431
            new_result = source.get_parent_map(missing)
 
1432
            source_results.append(new_result)
 
1433
            result.update(new_result)
 
1434
            missing.difference_update(set(new_result))
 
1435
        return result, source_results
 
1436
 
 
1437
    def _get_blocks(self, read_memos):
 
1438
        """Get GroupCompressBlocks for the given read_memos.
 
1439
 
 
1440
        :returns: a series of (read_memo, block) pairs, in the order they were
 
1441
            originally passed.
 
1442
        """
 
1443
        cached = {}
 
1444
        for read_memo in read_memos:
 
1445
            try:
 
1446
                block = self._group_cache[read_memo]
 
1447
            except KeyError:
 
1448
                pass
 
1449
            else:
 
1450
                cached[read_memo] = block
 
1451
        not_cached = []
 
1452
        not_cached_seen = set()
 
1453
        for read_memo in read_memos:
 
1454
            if read_memo in cached:
 
1455
                # Don't fetch what we already have
 
1456
                continue
 
1457
            if read_memo in not_cached_seen:
 
1458
                # Don't try to fetch the same data twice
 
1459
                continue
 
1460
            not_cached.append(read_memo)
 
1461
            not_cached_seen.add(read_memo)
 
1462
        raw_records = self._access.get_raw_records(not_cached)
 
1463
        for read_memo in read_memos:
 
1464
            try:
 
1465
                yield read_memo, cached[read_memo]
 
1466
            except KeyError:
 
1467
                # Read the block, and cache it.
 
1468
                zdata = next(raw_records)
 
1469
                block = GroupCompressBlock.from_bytes(zdata)
 
1470
                self._group_cache[read_memo] = block
 
1471
                cached[read_memo] = block
 
1472
                yield read_memo, block
 
1473
 
 
1474
    def get_missing_compression_parent_keys(self):
 
1475
        """Return the keys of missing compression parents.
 
1476
 
 
1477
        Missing compression parents occur when a record stream was missing
 
1478
        basis texts, or a index was scanned that had missing basis texts.
 
1479
        """
 
1480
        # GroupCompress cannot currently reference texts that are not in the
 
1481
        # group, so this is valid for now
 
1482
        return frozenset()
 
1483
 
 
1484
    def get_record_stream(self, keys, ordering, include_delta_closure):
 
1485
        """Get a stream of records for keys.
 
1486
 
 
1487
        :param keys: The keys to include.
 
1488
        :param ordering: Either 'unordered' or 'topological'. A topologically
 
1489
            sorted stream has compression parents strictly before their
 
1490
            children.
 
1491
        :param include_delta_closure: If True then the closure across any
 
1492
            compression parents will be included (in the opaque data).
 
1493
        :return: An iterator of ContentFactory objects, each of which is only
 
1494
            valid until the iterator is advanced.
 
1495
        """
 
1496
        # keys might be a generator
 
1497
        orig_keys = list(keys)
 
1498
        keys = set(keys)
 
1499
        if not keys:
 
1500
            return
 
1501
        if (not self._index.has_graph
 
1502
                and ordering in ('topological', 'groupcompress')):
 
1503
            # Cannot topological order when no graph has been stored.
 
1504
            # but we allow 'as-requested' or 'unordered'
 
1505
            ordering = 'unordered'
 
1506
 
 
1507
        remaining_keys = keys
 
1508
        while True:
 
1509
            try:
 
1510
                keys = set(remaining_keys)
 
1511
                for content_factory in self._get_remaining_record_stream(keys,
 
1512
                                                                         orig_keys, ordering, include_delta_closure):
 
1513
                    remaining_keys.discard(content_factory.key)
 
1514
                    yield content_factory
 
1515
                return
 
1516
            except errors.RetryWithNewPacks as e:
 
1517
                self._access.reload_or_raise(e)
 
1518
 
 
1519
    def _find_from_fallback(self, missing):
 
1520
        """Find whatever keys you can from the fallbacks.
 
1521
 
 
1522
        :param missing: A set of missing keys. This set will be mutated as keys
 
1523
            are found from a fallback_vfs
 
1524
        :return: (parent_map, key_to_source_map, source_results)
 
1525
            parent_map  the overall key => parent_keys
 
1526
            key_to_source_map   a dict from {key: source}
 
1527
            source_results      a list of (source: keys)
 
1528
        """
 
1529
        parent_map = {}
 
1530
        key_to_source_map = {}
 
1531
        source_results = []
 
1532
        for source in self._immediate_fallback_vfs:
 
1533
            if not missing:
 
1534
                break
 
1535
            source_parents = source.get_parent_map(missing)
 
1536
            parent_map.update(source_parents)
 
1537
            source_parents = list(source_parents)
 
1538
            source_results.append((source, source_parents))
 
1539
            key_to_source_map.update((key, source) for key in source_parents)
 
1540
            missing.difference_update(source_parents)
 
1541
        return parent_map, key_to_source_map, source_results
 
1542
 
 
1543
    def _get_ordered_source_keys(self, ordering, parent_map, key_to_source_map):
 
1544
        """Get the (source, [keys]) list.
 
1545
 
 
1546
        The returned objects should be in the order defined by 'ordering',
 
1547
        which can weave between different sources.
 
1548
 
 
1549
        :param ordering: Must be one of 'topological' or 'groupcompress'
 
1550
        :return: List of [(source, [keys])] tuples, such that all keys are in
 
1551
            the defined order, regardless of source.
 
1552
        """
 
1553
        if ordering == 'topological':
 
1554
            present_keys = tsort.topo_sort(parent_map)
 
1555
        else:
 
1556
            # ordering == 'groupcompress'
 
1557
            # XXX: This only optimizes for the target ordering. We may need
 
1558
            #      to balance that with the time it takes to extract
 
1559
            #      ordering, by somehow grouping based on
 
1560
            #      locations[key][0:3]
 
1561
            present_keys = sort_gc_optimal(parent_map)
 
1562
        # Now group by source:
 
1563
        source_keys = []
 
1564
        current_source = None
 
1565
        for key in present_keys:
 
1566
            source = key_to_source_map.get(key, self)
 
1567
            if source is not current_source:
 
1568
                source_keys.append((source, []))
 
1569
                current_source = source
 
1570
            source_keys[-1][1].append(key)
 
1571
        return source_keys
 
1572
 
 
1573
    def _get_as_requested_source_keys(self, orig_keys, locations, unadded_keys,
 
1574
                                      key_to_source_map):
 
1575
        source_keys = []
 
1576
        current_source = None
 
1577
        for key in orig_keys:
 
1578
            if key in locations or key in unadded_keys:
 
1579
                source = self
 
1580
            elif key in key_to_source_map:
 
1581
                source = key_to_source_map[key]
 
1582
            else:  # absent
 
1583
                continue
 
1584
            if source is not current_source:
 
1585
                source_keys.append((source, []))
 
1586
                current_source = source
 
1587
            source_keys[-1][1].append(key)
 
1588
        return source_keys
 
1589
 
 
1590
    def _get_io_ordered_source_keys(self, locations, unadded_keys,
 
1591
                                    source_result):
 
1592
        def get_group(key):
 
1593
            # This is the group the bytes are stored in, followed by the
 
1594
            # location in the group
 
1595
            return locations[key][0]
 
1596
        # We don't have an ordering for keys in the in-memory object, but
 
1597
        # lets process the in-memory ones first.
 
1598
        present_keys = list(unadded_keys)
 
1599
        present_keys.extend(sorted(locations, key=get_group))
 
1600
        # Now grab all of the ones from other sources
 
1601
        source_keys = [(self, present_keys)]
 
1602
        source_keys.extend(source_result)
 
1603
        return source_keys
 
1604
 
 
1605
    def _get_remaining_record_stream(self, keys, orig_keys, ordering,
 
1606
                                     include_delta_closure):
 
1607
        """Get a stream of records for keys.
 
1608
 
 
1609
        :param keys: The keys to include.
 
1610
        :param ordering: one of 'unordered', 'topological', 'groupcompress' or
 
1611
            'as-requested'
 
1612
        :param include_delta_closure: If True then the closure across any
 
1613
            compression parents will be included (in the opaque data).
 
1614
        :return: An iterator of ContentFactory objects, each of which is only
 
1615
            valid until the iterator is advanced.
 
1616
        """
 
1617
        # Cheap: iterate
 
1618
        locations = self._index.get_build_details(keys)
 
1619
        unadded_keys = set(self._unadded_refs).intersection(keys)
 
1620
        missing = keys.difference(locations)
 
1621
        missing.difference_update(unadded_keys)
 
1622
        (fallback_parent_map, key_to_source_map,
 
1623
         source_result) = self._find_from_fallback(missing)
 
1624
        if ordering in ('topological', 'groupcompress'):
 
1625
            # would be better to not globally sort initially but instead
 
1626
            # start with one key, recurse to its oldest parent, then grab
 
1627
            # everything in the same group, etc.
 
1628
            parent_map = dict((key, details[2]) for key, details in
 
1629
                              locations.items())
 
1630
            for key in unadded_keys:
 
1631
                parent_map[key] = self._unadded_refs[key]
 
1632
            parent_map.update(fallback_parent_map)
 
1633
            source_keys = self._get_ordered_source_keys(ordering, parent_map,
 
1634
                                                        key_to_source_map)
 
1635
        elif ordering == 'as-requested':
 
1636
            source_keys = self._get_as_requested_source_keys(orig_keys,
 
1637
                                                             locations, unadded_keys, key_to_source_map)
 
1638
        else:
 
1639
            # We want to yield the keys in a semi-optimal (read-wise) ordering.
 
1640
            # Otherwise we thrash the _group_cache and destroy performance
 
1641
            source_keys = self._get_io_ordered_source_keys(locations,
 
1642
                                                           unadded_keys, source_result)
 
1643
        for key in missing:
 
1644
            yield AbsentContentFactory(key)
 
1645
        # Batch up as many keys as we can until either:
 
1646
        #  - we encounter an unadded ref, or
 
1647
        #  - we run out of keys, or
 
1648
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
 
1649
        batcher = _BatchingBlockFetcher(self, locations,
 
1650
                                        get_compressor_settings=self._get_compressor_settings)
 
1651
        for source, keys in source_keys:
 
1652
            if source is self:
 
1653
                for key in keys:
 
1654
                    if key in self._unadded_refs:
 
1655
                        # Flush batch, then yield unadded ref from
 
1656
                        # self._compressor.
 
1657
                        for factory in batcher.yield_factories(full_flush=True):
 
1658
                            yield factory
 
1659
                        chunks, sha1 = self._compressor.extract(key)
 
1660
                        parents = self._unadded_refs[key]
 
1661
                        yield ChunkedContentFactory(key, parents, sha1, chunks)
 
1662
                        continue
 
1663
                    if batcher.add_key(key) > BATCH_SIZE:
 
1664
                        # Ok, this batch is big enough.  Yield some results.
 
1665
                        for factory in batcher.yield_factories():
 
1666
                            yield factory
 
1667
            else:
 
1668
                for factory in batcher.yield_factories(full_flush=True):
 
1669
                    yield factory
 
1670
                for record in source.get_record_stream(keys, ordering,
 
1671
                                                       include_delta_closure):
 
1672
                    yield record
 
1673
        for factory in batcher.yield_factories(full_flush=True):
 
1674
            yield factory
 
1675
 
 
1676
    def get_sha1s(self, keys):
 
1677
        """See VersionedFiles.get_sha1s()."""
 
1678
        result = {}
 
1679
        for record in self.get_record_stream(keys, 'unordered', True):
 
1680
            if record.sha1 is not None:
 
1681
                result[record.key] = record.sha1
 
1682
            else:
 
1683
                if record.storage_kind != 'absent':
 
1684
                    result[record.key] = osutils.sha_strings(
 
1685
                        record.iter_bytes_as('chunked'))
 
1686
        return result
 
1687
 
 
1688
    def insert_record_stream(self, stream):
 
1689
        """Insert a record stream into this container.
 
1690
 
 
1691
        :param stream: A stream of records to insert.
 
1692
        :return: None
 
1693
        :seealso VersionedFiles.get_record_stream:
 
1694
        """
 
1695
        # XXX: Setting random_id=True makes
 
1696
        # test_insert_record_stream_existing_keys fail for groupcompress and
 
1697
        # groupcompress-nograph, this needs to be revisited while addressing
 
1698
        # 'bzr branch' performance issues.
 
1699
        for _, _ in self._insert_record_stream(stream, random_id=False):
 
1700
            pass
 
1701
 
 
1702
    def _get_compressor_settings(self):
 
1703
        if self._max_bytes_to_index is None:
 
1704
            # TODO: VersionedFiles don't know about their containing
 
1705
            #       repository, so they don't have much of an idea about their
 
1706
            #       location. So for now, this is only a global option.
 
1707
            c = config.GlobalConfig()
 
1708
            val = c.get_user_option('bzr.groupcompress.max_bytes_to_index')
 
1709
            if val is not None:
 
1710
                try:
 
1711
                    val = int(val)
 
1712
                except ValueError as e:
 
1713
                    trace.warning('Value for '
 
1714
                                  '"bzr.groupcompress.max_bytes_to_index"'
 
1715
                                  ' %r is not an integer'
 
1716
                                  % (val,))
 
1717
                    val = None
 
1718
            if val is None:
 
1719
                val = self._DEFAULT_MAX_BYTES_TO_INDEX
 
1720
            self._max_bytes_to_index = val
 
1721
        return {'max_bytes_to_index': self._max_bytes_to_index}
 
1722
 
 
1723
    def _make_group_compressor(self):
 
1724
        return GroupCompressor(self._get_compressor_settings())
 
1725
 
 
1726
    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
 
1727
                              reuse_blocks=True):
 
1728
        """Internal core to insert a record stream into this container.
 
1729
 
 
1730
        This helper function has a different interface than insert_record_stream
 
1731
        to allow add_lines to be minimal, but still return the needed data.
 
1732
 
 
1733
        :param stream: A stream of records to insert.
 
1734
        :param nostore_sha: If the sha1 of a given text matches nostore_sha,
 
1735
            raise ExistingContent, rather than committing the new text.
 
1736
        :param reuse_blocks: If the source is streaming from
 
1737
            groupcompress-blocks, just insert the blocks as-is, rather than
 
1738
            expanding the texts and inserting again.
 
1739
        :return: An iterator over (sha1, length) of the inserted records.
 
1740
        :seealso insert_record_stream:
 
1741
        :seealso add_lines:
 
1742
        """
 
1743
        adapters = {}
 
1744
 
 
1745
        def get_adapter(adapter_key):
 
1746
            try:
 
1747
                return adapters[adapter_key]
 
1748
            except KeyError:
 
1749
                adapter_factory = adapter_registry.get(adapter_key)
 
1750
                adapter = adapter_factory(self)
 
1751
                adapters[adapter_key] = adapter
 
1752
                return adapter
 
1753
        # This will go up to fulltexts for gc to gc fetching, which isn't
 
1754
        # ideal.
 
1755
        self._compressor = self._make_group_compressor()
 
1756
        self._unadded_refs = {}
 
1757
        keys_to_add = []
 
1758
 
 
1759
        def flush():
 
1760
            bytes_len, chunks = self._compressor.flush().to_chunks()
 
1761
            self._compressor = self._make_group_compressor()
 
1762
            # Note: At this point we still have 1 copy of the fulltext (in
 
1763
            #       record and the var 'bytes'), and this generates 2 copies of
 
1764
            #       the compressed text (one for bytes, one in chunks)
 
1765
            # TODO: Figure out how to indicate that we would be happy to free
 
1766
            #       the fulltext content at this point. Note that sometimes we
 
1767
            #       will want it later (streaming CHK pages), but most of the
 
1768
            #       time we won't (everything else)
 
1769
            index, start, length = self._access.add_raw_record(
 
1770
                None, bytes_len, chunks)
 
1771
            nodes = []
 
1772
            for key, reads, refs in keys_to_add:
 
1773
                nodes.append((key, b"%d %d %s" % (start, length, reads), refs))
 
1774
            self._index.add_records(nodes, random_id=random_id)
 
1775
            self._unadded_refs = {}
 
1776
            del keys_to_add[:]
 
1777
 
 
1778
        last_prefix = None
 
1779
        max_fulltext_len = 0
 
1780
        max_fulltext_prefix = None
 
1781
        insert_manager = None
 
1782
        block_start = None
 
1783
        block_length = None
 
1784
        # XXX: TODO: remove this, it is just for safety checking for now
 
1785
        inserted_keys = set()
 
1786
        reuse_this_block = reuse_blocks
 
1787
        for record in stream:
 
1788
            # Raise an error when a record is missing.
 
1789
            if record.storage_kind == 'absent':
 
1790
                raise errors.RevisionNotPresent(record.key, self)
 
1791
            if random_id:
 
1792
                if record.key in inserted_keys:
 
1793
                    trace.note(gettext('Insert claimed random_id=True,'
 
1794
                                       ' but then inserted %r two times'), record.key)
 
1795
                    continue
 
1796
                inserted_keys.add(record.key)
 
1797
            if reuse_blocks:
 
1798
                # If the reuse_blocks flag is set, check to see if we can just
 
1799
                # copy a groupcompress block as-is.
 
1800
                # We only check on the first record (groupcompress-block) not
 
1801
                # on all of the (groupcompress-block-ref) entries.
 
1802
                # The reuse_this_block flag is then kept for as long as
 
1803
                if record.storage_kind == 'groupcompress-block':
 
1804
                    # Check to see if we really want to re-use this block
 
1805
                    insert_manager = record._manager
 
1806
                    reuse_this_block = insert_manager.check_is_well_utilized()
 
1807
            else:
 
1808
                reuse_this_block = False
 
1809
            if reuse_this_block:
 
1810
                # We still want to reuse this block
 
1811
                if record.storage_kind == 'groupcompress-block':
 
1812
                    # Insert the raw block into the target repo
 
1813
                    insert_manager = record._manager
 
1814
                    bytes_len, chunks = record._manager._block.to_chunks()
 
1815
                    _, start, length = self._access.add_raw_record(
 
1816
                        None, bytes_len, chunks)
 
1817
                    block_start = start
 
1818
                    block_length = length
 
1819
                if record.storage_kind in ('groupcompress-block',
 
1820
                                           'groupcompress-block-ref'):
 
1821
                    if insert_manager is None:
 
1822
                        raise AssertionError('No insert_manager set')
 
1823
                    if insert_manager is not record._manager:
 
1824
                        raise AssertionError('insert_manager does not match'
 
1825
                                             ' the current record, we cannot be positive'
 
1826
                                             ' that the appropriate content was inserted.'
 
1827
                                             )
 
1828
                    value = b"%d %d %d %d" % (block_start, block_length,
 
1829
                                              record._start, record._end)
 
1830
                    nodes = [(record.key, value, (record.parents,))]
 
1831
                    # TODO: Consider buffering up many nodes to be added, not
 
1832
                    #       sure how much overhead this has, but we're seeing
 
1833
                    #       ~23s / 120s in add_records calls
 
1834
                    self._index.add_records(nodes, random_id=random_id)
 
1835
                    continue
 
1836
            try:
 
1837
                chunks = record.get_bytes_as('chunked')
 
1838
            except UnavailableRepresentation:
 
1839
                adapter_key = record.storage_kind, 'chunked'
 
1840
                adapter = get_adapter(adapter_key)
 
1841
                chunks = adapter.get_bytes(record, 'chunked')
 
1842
            chunks_len = record.size
 
1843
            if chunks_len is None:
 
1844
                chunks_len = sum(map(len, chunks))
 
1845
            if len(record.key) > 1:
 
1846
                prefix = record.key[0]
 
1847
                soft = (prefix == last_prefix)
 
1848
            else:
 
1849
                prefix = None
 
1850
                soft = False
 
1851
            if max_fulltext_len < chunks_len:
 
1852
                max_fulltext_len = chunks_len
 
1853
                max_fulltext_prefix = prefix
 
1854
            (found_sha1, start_point, end_point,
 
1855
             type) = self._compressor.compress(
 
1856
                 record.key, chunks, chunks_len, record.sha1, soft=soft,
 
1857
                 nostore_sha=nostore_sha)
 
1858
            # delta_ratio = float(chunks_len) / (end_point - start_point)
 
1859
            # Check if we want to continue to include that text
 
1860
            if (prefix == max_fulltext_prefix
 
1861
                    and end_point < 2 * max_fulltext_len):
 
1862
                # As long as we are on the same file_id, we will fill at least
 
1863
                # 2 * max_fulltext_len
 
1864
                start_new_block = False
 
1865
            elif end_point > 4 * 1024 * 1024:
 
1866
                start_new_block = True
 
1867
            elif (prefix is not None and prefix != last_prefix
 
1868
                  and end_point > 2 * 1024 * 1024):
 
1869
                start_new_block = True
 
1870
            else:
 
1871
                start_new_block = False
 
1872
            last_prefix = prefix
 
1873
            if start_new_block:
 
1874
                self._compressor.pop_last()
 
1875
                flush()
 
1876
                max_fulltext_len = chunks_len
 
1877
                (found_sha1, start_point, end_point,
 
1878
                 type) = self._compressor.compress(
 
1879
                     record.key, chunks, chunks_len, record.sha1)
 
1880
            if record.key[-1] is None:
 
1881
                key = record.key[:-1] + (b'sha1:' + found_sha1,)
 
1882
            else:
 
1883
                key = record.key
 
1884
            self._unadded_refs[key] = record.parents
 
1885
            yield found_sha1, chunks_len
 
1886
            as_st = static_tuple.StaticTuple.from_sequence
 
1887
            if record.parents is not None:
 
1888
                parents = as_st([as_st(p) for p in record.parents])
 
1889
            else:
 
1890
                parents = None
 
1891
            refs = static_tuple.StaticTuple(parents)
 
1892
            keys_to_add.append(
 
1893
                (key, b'%d %d' % (start_point, end_point), refs))
 
1894
        if len(keys_to_add):
 
1895
            flush()
 
1896
        self._compressor = None
 
1897
 
 
1898
    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
 
1899
        """Iterate over the lines in the versioned files from keys.
 
1900
 
 
1901
        This may return lines from other keys. Each item the returned
 
1902
        iterator yields is a tuple of a line and a text version that that line
 
1903
        is present in (not introduced in).
 
1904
 
 
1905
        Ordering of results is in whatever order is most suitable for the
 
1906
        underlying storage format.
 
1907
 
 
1908
        If a progress bar is supplied, it may be used to indicate progress.
 
1909
        The caller is responsible for cleaning up progress bars (because this
 
1910
        is an iterator).
 
1911
 
 
1912
        NOTES:
 
1913
         * Lines are normalised by the underlying store: they will all have \n
 
1914
           terminators.
 
1915
         * Lines are returned in arbitrary order.
 
1916
 
 
1917
        :return: An iterator over (line, key).
 
1918
        """
 
1919
        keys = set(keys)
 
1920
        total = len(keys)
 
1921
        # we don't care about inclusions, the caller cares.
 
1922
        # but we need to setup a list of records to visit.
 
1923
        # we need key, position, length
 
1924
        for key_idx, record in enumerate(self.get_record_stream(keys,
 
1925
                                                                'unordered', True)):
 
1926
            # XXX: todo - optimise to use less than full texts.
 
1927
            key = record.key
 
1928
            if pb is not None:
 
1929
                pb.update('Walking content', key_idx, total)
 
1930
            if record.storage_kind == 'absent':
 
1931
                raise errors.RevisionNotPresent(key, self)
 
1932
            for line in record.iter_bytes_as('lines'):
 
1933
                yield line, key
 
1934
        if pb is not None:
 
1935
            pb.update('Walking content', total, total)
 
1936
 
 
1937
    def keys(self):
 
1938
        """See VersionedFiles.keys."""
 
1939
        if 'evil' in debug.debug_flags:
 
1940
            trace.mutter_callsite(2, "keys scales with size of history")
 
1941
        sources = [self._index] + self._immediate_fallback_vfs
 
1942
        result = set()
 
1943
        for source in sources:
 
1944
            result.update(source.keys())
 
1945
        return result
 
1946
 
 
1947
 
 
1948
class _GCBuildDetails(object):
 
1949
    """A blob of data about the build details.
 
1950
 
 
1951
    This stores the minimal data, which then allows compatibility with the old
 
1952
    api, without taking as much memory.
 
1953
    """
 
1954
 
 
1955
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
 
1956
                 '_delta_end', '_parents')
 
1957
 
 
1958
    method = 'group'
 
1959
    compression_parent = None
 
1960
 
 
1961
    def __init__(self, parents, position_info):
 
1962
        self._parents = parents
 
1963
        (self._index, self._group_start, self._group_end, self._basis_end,
 
1964
         self._delta_end) = position_info
 
1965
 
 
1966
    def __repr__(self):
 
1967
        return '%s(%s, %s)' % (self.__class__.__name__,
 
1968
                               self.index_memo, self._parents)
 
1969
 
 
1970
    @property
 
1971
    def index_memo(self):
 
1972
        return (self._index, self._group_start, self._group_end,
 
1973
                self._basis_end, self._delta_end)
 
1974
 
 
1975
    @property
 
1976
    def record_details(self):
 
1977
        return static_tuple.StaticTuple(self.method, None)
 
1978
 
 
1979
    def __getitem__(self, offset):
 
1980
        """Compatibility thunk to act like a tuple."""
 
1981
        if offset == 0:
 
1982
            return self.index_memo
 
1983
        elif offset == 1:
 
1984
            return self.compression_parent  # Always None
 
1985
        elif offset == 2:
 
1986
            return self._parents
 
1987
        elif offset == 3:
 
1988
            return self.record_details
 
1989
        else:
 
1990
            raise IndexError('offset out of range')
 
1991
 
 
1992
    def __len__(self):
 
1993
        return 4
 
1994
 
 
1995
 
 
1996
class _GCGraphIndex(object):
 
1997
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
 
1998
 
 
1999
    def __init__(self, graph_index, is_locked, parents=True,
 
2000
                 add_callback=None, track_external_parent_refs=False,
 
2001
                 inconsistency_fatal=True, track_new_keys=False):
 
2002
        """Construct a _GCGraphIndex on a graph_index.
 
2003
 
 
2004
        :param graph_index: An implementation of breezy.index.GraphIndex.
 
2005
        :param is_locked: A callback, returns True if the index is locked and
 
2006
            thus usable.
 
2007
        :param parents: If True, record knits parents, if not do not record
 
2008
            parents.
 
2009
        :param add_callback: If not None, allow additions to the index and call
 
2010
            this callback with a list of added GraphIndex nodes:
 
2011
            [(node, value, node_refs), ...]
 
2012
        :param track_external_parent_refs: As keys are added, keep track of the
 
2013
            keys they reference, so that we can query get_missing_parents(),
 
2014
            etc.
 
2015
        :param inconsistency_fatal: When asked to add records that are already
 
2016
            present, and the details are inconsistent with the existing
 
2017
            record, raise an exception instead of warning (and skipping the
 
2018
            record).
 
2019
        """
 
2020
        self._add_callback = add_callback
 
2021
        self._graph_index = graph_index
 
2022
        self._parents = parents
 
2023
        self.has_graph = parents
 
2024
        self._is_locked = is_locked
 
2025
        self._inconsistency_fatal = inconsistency_fatal
 
2026
        # GroupCompress records tend to have the same 'group' start + offset
 
2027
        # repeated over and over, this creates a surplus of ints
 
2028
        self._int_cache = {}
 
2029
        if track_external_parent_refs:
 
2030
            self._key_dependencies = _KeyRefs(
 
2031
                track_new_keys=track_new_keys)
 
2032
        else:
 
2033
            self._key_dependencies = None
 
2034
 
 
2035
    def add_records(self, records, random_id=False):
 
2036
        """Add multiple records to the index.
 
2037
 
 
2038
        This function does not insert data into the Immutable GraphIndex
 
2039
        backing the KnitGraphIndex, instead it prepares data for insertion by
 
2040
        the caller and checks that it is safe to insert then calls
 
2041
        self._add_callback with the prepared GraphIndex nodes.
 
2042
 
 
2043
        :param records: a list of tuples:
 
2044
                         (key, options, access_memo, parents).
 
2045
        :param random_id: If True the ids being added were randomly generated
 
2046
            and no check for existence will be performed.
 
2047
        """
 
2048
        if not self._add_callback:
 
2049
            raise errors.ReadOnlyError(self)
 
2050
        # we hope there are no repositories with inconsistent parentage
 
2051
        # anymore.
 
2052
 
 
2053
        changed = False
 
2054
        keys = {}
 
2055
        for (key, value, refs) in records:
 
2056
            if not self._parents:
 
2057
                if refs:
 
2058
                    for ref in refs:
 
2059
                        if ref:
 
2060
                            raise knit.KnitCorrupt(self,
 
2061
                                                   "attempt to add node with parents "
 
2062
                                                   "in parentless index.")
 
2063
                    refs = ()
 
2064
                    changed = True
 
2065
            keys[key] = (value, refs)
 
2066
        # check for dups
 
2067
        if not random_id:
 
2068
            present_nodes = self._get_entries(keys)
 
2069
            for (index, key, value, node_refs) in present_nodes:
 
2070
                # Sometimes these are passed as a list rather than a tuple
 
2071
                node_refs = static_tuple.as_tuples(node_refs)
 
2072
                passed = static_tuple.as_tuples(keys[key])
 
2073
                if node_refs != passed[1]:
 
2074
                    details = '%s %s %s' % (key, (value, node_refs), passed)
 
2075
                    if self._inconsistency_fatal:
 
2076
                        raise knit.KnitCorrupt(self, "inconsistent details"
 
2077
                                               " in add_records: %s" %
 
2078
                                               details)
 
2079
                    else:
 
2080
                        trace.warning("inconsistent details in skipped"
 
2081
                                      " record: %s", details)
 
2082
                del keys[key]
 
2083
                changed = True
 
2084
        if changed:
 
2085
            result = []
 
2086
            if self._parents:
 
2087
                for key, (value, node_refs) in keys.items():
 
2088
                    result.append((key, value, node_refs))
 
2089
            else:
 
2090
                for key, (value, node_refs) in keys.items():
 
2091
                    result.append((key, value))
 
2092
            records = result
 
2093
        key_dependencies = self._key_dependencies
 
2094
        if key_dependencies is not None:
 
2095
            if self._parents:
 
2096
                for key, value, refs in records:
 
2097
                    parents = refs[0]
 
2098
                    key_dependencies.add_references(key, parents)
 
2099
            else:
 
2100
                for key, value, refs in records:
 
2101
                    new_keys.add_key(key)
 
2102
        self._add_callback(records)
 
2103
 
 
2104
    def _check_read(self):
 
2105
        """Raise an exception if reads are not permitted."""
 
2106
        if not self._is_locked():
 
2107
            raise errors.ObjectNotLocked(self)
 
2108
 
 
2109
    def _check_write_ok(self):
 
2110
        """Raise an exception if writes are not permitted."""
 
2111
        if not self._is_locked():
 
2112
            raise errors.ObjectNotLocked(self)
 
2113
 
 
2114
    def _get_entries(self, keys, check_present=False):
 
2115
        """Get the entries for keys.
 
2116
 
 
2117
        Note: Callers are responsible for checking that the index is locked
 
2118
        before calling this method.
 
2119
 
 
2120
        :param keys: An iterable of index key tuples.
 
2121
        """
 
2122
        keys = set(keys)
 
2123
        found_keys = set()
 
2124
        if self._parents:
 
2125
            for node in self._graph_index.iter_entries(keys):
 
2126
                yield node
 
2127
                found_keys.add(node[1])
 
2128
        else:
 
2129
            # adapt parentless index to the rest of the code.
 
2130
            for node in self._graph_index.iter_entries(keys):
 
2131
                yield node[0], node[1], node[2], ()
 
2132
                found_keys.add(node[1])
 
2133
        if check_present:
 
2134
            missing_keys = keys.difference(found_keys)
 
2135
            if missing_keys:
 
2136
                raise errors.RevisionNotPresent(missing_keys.pop(), self)
 
2137
 
 
2138
    def find_ancestry(self, keys):
 
2139
        """See CombinedGraphIndex.find_ancestry"""
 
2140
        return self._graph_index.find_ancestry(keys, 0)
 
2141
 
 
2142
    def get_parent_map(self, keys):
 
2143
        """Get a map of the parents of keys.
 
2144
 
 
2145
        :param keys: The keys to look up parents for.
 
2146
        :return: A mapping from keys to parents. Absent keys are absent from
 
2147
            the mapping.
 
2148
        """
 
2149
        self._check_read()
 
2150
        nodes = self._get_entries(keys)
 
2151
        result = {}
 
2152
        if self._parents:
 
2153
            for node in nodes:
 
2154
                result[node[1]] = node[3][0]
 
2155
        else:
 
2156
            for node in nodes:
 
2157
                result[node[1]] = None
 
2158
        return result
 
2159
 
 
2160
    def get_missing_parents(self):
 
2161
        """Return the keys of missing parents."""
 
2162
        # Copied from _KnitGraphIndex.get_missing_parents
 
2163
        # We may have false positives, so filter those out.
 
2164
        self._key_dependencies.satisfy_refs_for_keys(
 
2165
            self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
 
2166
        return frozenset(self._key_dependencies.get_unsatisfied_refs())
 
2167
 
 
2168
    def get_build_details(self, keys):
 
2169
        """Get the various build details for keys.
 
2170
 
 
2171
        Ghosts are omitted from the result.
 
2172
 
 
2173
        :param keys: An iterable of keys.
 
2174
        :return: A dict of key:
 
2175
            (index_memo, compression_parent, parents, record_details).
 
2176
 
 
2177
            * index_memo: opaque structure to pass to read_records to extract
 
2178
              the raw data
 
2179
            * compression_parent: Content that this record is built upon, may
 
2180
              be None
 
2181
            * parents: Logical parents of this node
 
2182
            * record_details: extra information about the content which needs
 
2183
              to be passed to Factory.parse_record
 
2184
        """
 
2185
        self._check_read()
 
2186
        result = {}
 
2187
        entries = self._get_entries(keys)
 
2188
        for entry in entries:
 
2189
            key = entry[1]
 
2190
            if not self._parents:
 
2191
                parents = None
 
2192
            else:
 
2193
                parents = entry[3][0]
 
2194
            details = _GCBuildDetails(parents, self._node_to_position(entry))
 
2195
            result[key] = details
 
2196
        return result
 
2197
 
 
2198
    def keys(self):
 
2199
        """Get all the keys in the collection.
 
2200
 
 
2201
        The keys are not ordered.
 
2202
        """
 
2203
        self._check_read()
 
2204
        return [node[1] for node in self._graph_index.iter_all_entries()]
 
2205
 
 
2206
    def _node_to_position(self, node):
 
2207
        """Convert an index value to position details."""
 
2208
        bits = node[2].split(b' ')
 
2209
        # It would be nice not to read the entire gzip.
 
2210
        # start and stop are put into _int_cache because they are very common.
 
2211
        # They define the 'group' that an entry is in, and many groups can have
 
2212
        # thousands of objects.
 
2213
        # Branching Launchpad, for example, saves ~600k integers, at 12 bytes
 
2214
        # each, or about 7MB. Note that it might be even more when you consider
 
2215
        # how PyInt is allocated in separate slabs. And you can't return a slab
 
2216
        # to the OS if even 1 int on it is in use. Note though that Python uses
 
2217
        # a LIFO when re-using PyInt slots, which might cause more
 
2218
        # fragmentation.
 
2219
        start = int(bits[0])
 
2220
        start = self._int_cache.setdefault(start, start)
 
2221
        stop = int(bits[1])
 
2222
        stop = self._int_cache.setdefault(stop, stop)
 
2223
        basis_end = int(bits[2])
 
2224
        delta_end = int(bits[3])
 
2225
        # We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
 
2226
        # instance...
 
2227
        return (node[0], start, stop, basis_end, delta_end)
 
2228
 
 
2229
    def scan_unvalidated_index(self, graph_index):
 
2230
        """Inform this _GCGraphIndex that there is an unvalidated index.
 
2231
 
 
2232
        This allows this _GCGraphIndex to keep track of any missing
 
2233
        compression parents we may want to have filled in to make those
 
2234
        indices valid.  It also allows _GCGraphIndex to track any new keys.
 
2235
 
 
2236
        :param graph_index: A GraphIndex
 
2237
        """
 
2238
        key_dependencies = self._key_dependencies
 
2239
        if key_dependencies is None:
 
2240
            return
 
2241
        for node in graph_index.iter_all_entries():
 
2242
            # Add parent refs from graph_index (and discard parent refs
 
2243
            # that the graph_index has).
 
2244
            key_dependencies.add_references(node[1], node[3][0])
 
2245
 
 
2246
 
 
2247
from ._groupcompress_py import (
 
2248
    apply_delta,
 
2249
    apply_delta_to_source,
 
2250
    encode_base128_int,
 
2251
    decode_base128_int,
 
2252
    decode_copy_instruction,
 
2253
    LinesDeltaIndex,
 
2254
    )
 
2255
try:
 
2256
    from ._groupcompress_pyx import (
 
2257
        apply_delta,
 
2258
        apply_delta_to_source,
 
2259
        DeltaIndex,
 
2260
        encode_base128_int,
 
2261
        decode_base128_int,
 
2262
        )
 
2263
    GroupCompressor = PyrexGroupCompressor
 
2264
except ImportError as e:
 
2265
    osutils.failed_to_load_extension(e)
 
2266
    GroupCompressor = PythonGroupCompressor