/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Martin Pool
  • Date: 2011-04-19 03:23:05 UTC
  • mto: This revision was merged to the branch mainline in revision 5802.
  • Revision ID: mbp@sourcefrog.net-20110419032305-6qzm1yo30x56dnu2
Fix slightly broken test

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
36
36
    )
37
37
from bzrlib.btree_index import BTreeBuilder
38
38
from bzrlib.lru_cache import LRUSizeCache
 
39
from bzrlib.repofmt import pack_repo
39
40
from bzrlib.tsort import topo_sort
40
41
from bzrlib.versionedfile import (
41
42
    adapter_registry,
101
102
    def __init__(self):
102
103
        # map by key? or just order in file?
103
104
        self._compressor_name = None
104
 
        self._z_content = None
 
105
        self._z_content_chunks = None
105
106
        self._z_content_decompressor = None
106
107
        self._z_content_length = None
107
108
        self._content_length = None
135
136
                self._content = ''.join(self._content_chunks)
136
137
                self._content_chunks = None
137
138
        if self._content is None:
138
 
            if self._z_content is None:
 
139
            # We join self._z_content_chunks here, because if we are
 
140
            # decompressing, then it is *very* likely that we have a single
 
141
            # chunk
 
142
            if self._z_content_chunks is None:
139
143
                raise AssertionError('No content to decompress')
140
 
            if self._z_content == '':
 
144
            z_content = ''.join(self._z_content_chunks)
 
145
            if z_content == '':
141
146
                self._content = ''
142
147
            elif self._compressor_name == 'lzma':
143
148
                # We don't do partial lzma decomp yet
144
 
                self._content = pylzma.decompress(self._z_content)
 
149
                self._content = pylzma.decompress(z_content)
145
150
            elif self._compressor_name == 'zlib':
146
151
                # Start a zlib decompressor
147
152
                if num_bytes * 4 > self._content_length * 3:
148
153
                    # If we are requesting more that 3/4ths of the content,
149
154
                    # just extract the whole thing in a single pass
150
155
                    num_bytes = self._content_length
151
 
                    self._content = zlib.decompress(self._z_content)
 
156
                    self._content = zlib.decompress(z_content)
152
157
                else:
153
158
                    self._z_content_decompressor = zlib.decompressobj()
154
159
                    # Seed the decompressor with the uncompressed bytes, so
155
160
                    # that the rest of the code is simplified
156
161
                    self._content = self._z_content_decompressor.decompress(
157
 
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
162
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
158
163
                    if not self._z_content_decompressor.unconsumed_tail:
159
164
                        self._z_content_decompressor = None
160
165
            else:
207
212
            # XXX: Define some GCCorrupt error ?
208
213
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
209
214
                                 (len(bytes), pos, self._z_content_length))
210
 
        self._z_content = bytes[pos:]
 
215
        self._z_content_chunks = (bytes[pos:],)
 
216
 
 
217
    @property
 
218
    def _z_content(self):
 
219
        """Return z_content_chunks as a simple string.
 
220
 
 
221
        Meant only to be used by the test suite.
 
222
        """
 
223
        if self._z_content_chunks is not None:
 
224
            return ''.join(self._z_content_chunks)
 
225
        return None
211
226
 
212
227
    @classmethod
213
228
    def from_bytes(cls, bytes):
269
284
        self._content_length = length
270
285
        self._content_chunks = content_chunks
271
286
        self._content = None
272
 
        self._z_content = None
 
287
        self._z_content_chunks = None
273
288
 
274
289
    def set_content(self, content):
275
290
        """Set the content of this block."""
276
291
        self._content_length = len(content)
277
292
        self._content = content
278
 
        self._z_content = None
 
293
        self._z_content_chunks = None
279
294
 
280
295
    def _create_z_content_using_lzma(self):
281
296
        if self._content_chunks is not None:
283
298
            self._content_chunks = None
284
299
        if self._content is None:
285
300
            raise AssertionError('Nothing to compress')
286
 
        self._z_content = pylzma.compress(self._content)
287
 
        self._z_content_length = len(self._z_content)
 
301
        z_content = pylzma.compress(self._content)
 
302
        self._z_content_chunks = (z_content,)
 
303
        self._z_content_length = len(z_content)
288
304
 
289
 
    def _create_z_content_from_chunks(self):
 
305
    def _create_z_content_from_chunks(self, chunks):
290
306
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
291
 
        compressed_chunks = map(compressor.compress, self._content_chunks)
 
307
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
 
308
        # (measured peak is maybe 30MB over the above...)
 
309
        compressed_chunks = map(compressor.compress, chunks)
292
310
        compressed_chunks.append(compressor.flush())
293
 
        self._z_content = ''.join(compressed_chunks)
294
 
        self._z_content_length = len(self._z_content)
 
311
        # Ignore empty chunks
 
312
        self._z_content_chunks = [c for c in compressed_chunks if c]
 
313
        self._z_content_length = sum(map(len, self._z_content_chunks))
295
314
 
296
315
    def _create_z_content(self):
297
 
        if self._z_content is not None:
 
316
        if self._z_content_chunks is not None:
298
317
            return
299
318
        if _USE_LZMA:
300
319
            self._create_z_content_using_lzma()
301
320
            return
302
321
        if self._content_chunks is not None:
303
 
            self._create_z_content_from_chunks()
304
 
            return
305
 
        self._z_content = zlib.compress(self._content)
306
 
        self._z_content_length = len(self._z_content)
 
322
            chunks = self._content_chunks
 
323
        else:
 
324
            chunks = (self._content,)
 
325
        self._create_z_content_from_chunks(chunks)
307
326
 
308
 
    def to_bytes(self):
309
 
        """Encode the information into a byte stream."""
 
327
    def to_chunks(self):
 
328
        """Create the byte stream as a series of 'chunks'"""
310
329
        self._create_z_content()
311
330
        if _USE_LZMA:
312
331
            header = self.GCB_LZ_HEADER
313
332
        else:
314
333
            header = self.GCB_HEADER
315
 
        chunks = [header,
316
 
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
317
 
                  self._z_content,
 
334
        chunks = ['%s%d\n%d\n'
 
335
                  % (header, self._z_content_length, self._content_length),
318
336
                 ]
 
337
        chunks.extend(self._z_content_chunks)
 
338
        total_len = sum(map(len, chunks))
 
339
        return total_len, chunks
 
340
 
 
341
    def to_bytes(self):
 
342
        """Encode the information into a byte stream."""
 
343
        total_len, chunks = self.to_chunks()
319
344
        return ''.join(chunks)
320
345
 
321
346
    def _dump(self, include_text=False):
679
704
        z_header_bytes = zlib.compress(header_bytes)
680
705
        del header_bytes
681
706
        z_header_bytes_len = len(z_header_bytes)
682
 
        block_bytes = self._block.to_bytes()
 
707
        block_bytes_len, block_chunks = self._block.to_chunks()
683
708
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
684
 
                                       len(block_bytes)))
 
709
                                       block_bytes_len))
685
710
        lines.append(z_header_bytes)
686
 
        lines.append(block_bytes)
687
 
        del z_header_bytes, block_bytes
 
711
        lines.extend(block_chunks)
 
712
        del z_header_bytes, block_chunks
 
713
        # TODO: This is a point where we will double the memory consumption. To
 
714
        #       avoid this, we probably have to switch to a 'chunked' api
688
715
        return ''.join(lines)
689
716
 
690
717
    @classmethod
691
718
    def from_bytes(cls, bytes):
692
719
        # TODO: This does extra string copying, probably better to do it a
693
 
        #       different way
 
720
        #       different way. At a minimum this creates 2 copies of the
 
721
        #       compressed content
694
722
        (storage_kind, z_header_len, header_len,
695
723
         block_len, rest) = bytes.split('\n', 4)
696
724
        del bytes
854
882
 
855
883
        After calling this, the compressor should no longer be used
856
884
        """
857
 
        # TODO: this causes us to 'bloat' to 2x the size of content in the
858
 
        #       group. This has an impact for 'commit' of large objects.
859
 
        #       One possibility is to use self._content_chunks, and be lazy and
860
 
        #       only fill out self._content as a full string when we actually
861
 
        #       need it. That would at least drop the peak memory consumption
862
 
        #       for 'commit' down to ~1x the size of the largest file, at a
863
 
        #       cost of increased complexity within this code. 2x is still <<
864
 
        #       3x the size of the largest file, so we are doing ok.
865
885
        self._block.set_chunked_content(self.chunks, self.endpoint)
866
886
        self.chunks = None
867
887
        self._delta_index = None
1027
1047
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
1028
1048
            add_callback=graph_index.add_nodes,
1029
1049
            inconsistency_fatal=inconsistency_fatal)
1030
 
        access = knit._DirectPackAccess({})
 
1050
        access = pack_repo._DirectPackAccess({})
1031
1051
        access.set_writer(writer, graph_index, (transport, 'newpack'))
1032
1052
        result = GroupCompressVersionedFiles(index, access, delta)
1033
1053
        result.stream = stream
1167
1187
            _unadded_refs = {}
1168
1188
        self._unadded_refs = _unadded_refs
1169
1189
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1170
 
        self._fallback_vfs = []
 
1190
        self._immediate_fallback_vfs = []
1171
1191
 
1172
1192
    def without_fallbacks(self):
1173
1193
        """Return a clone of this object without any fallbacks configured."""
1247
1267
 
1248
1268
        :param a_versioned_files: A VersionedFiles object.
1249
1269
        """
1250
 
        self._fallback_vfs.append(a_versioned_files)
 
1270
        self._immediate_fallback_vfs.append(a_versioned_files)
1251
1271
 
1252
1272
    def annotate(self, key):
1253
1273
        """See VersionedFiles.annotate."""
1293
1313
        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1294
1314
        # ancestry.
1295
1315
        parent_map, missing_keys = self._index.find_ancestry(keys)
1296
 
        for fallback in self._fallback_vfs:
 
1316
        for fallback in self._transitive_fallbacks():
1297
1317
            if not missing_keys:
1298
1318
                break
1299
1319
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1323
1343
            and so on.
1324
1344
        """
1325
1345
        result = {}
1326
 
        sources = [self._index] + self._fallback_vfs
 
1346
        sources = [self._index] + self._immediate_fallback_vfs
1327
1347
        source_results = []
1328
1348
        missing = set(keys)
1329
1349
        for source in sources:
1430
1450
        parent_map = {}
1431
1451
        key_to_source_map = {}
1432
1452
        source_results = []
1433
 
        for source in self._fallback_vfs:
 
1453
        for source in self._immediate_fallback_vfs:
1434
1454
            if not missing:
1435
1455
                break
1436
1456
            source_parents = source.get_parent_map(missing)
1630
1650
        self._unadded_refs = {}
1631
1651
        keys_to_add = []
1632
1652
        def flush():
1633
 
            bytes = self._compressor.flush().to_bytes()
 
1653
            bytes_len, chunks = self._compressor.flush().to_chunks()
1634
1654
            self._compressor = GroupCompressor()
 
1655
            # Note: At this point we still have 1 copy of the fulltext (in
 
1656
            #       record and the var 'bytes'), and this generates 2 copies of
 
1657
            #       the compressed text (one for bytes, one in chunks)
 
1658
            # TODO: Push 'chunks' down into the _access api, so that we don't
 
1659
            #       have to double compressed memory here
 
1660
            # TODO: Figure out how to indicate that we would be happy to free
 
1661
            #       the fulltext content at this point. Note that sometimes we
 
1662
            #       will want it later (streaming CHK pages), but most of the
 
1663
            #       time we won't (everything else)
 
1664
            bytes = ''.join(chunks)
 
1665
            del chunks
1635
1666
            index, start, length = self._access.add_raw_records(
1636
1667
                [(None, len(bytes))], bytes)[0]
1637
1668
            nodes = []
1802
1833
        """See VersionedFiles.keys."""
1803
1834
        if 'evil' in debug.debug_flags:
1804
1835
            trace.mutter_callsite(2, "keys scales with size of history")
1805
 
        sources = [self._index] + self._fallback_vfs
 
1836
        sources = [self._index] + self._immediate_fallback_vfs
1806
1837
        result = set()
1807
1838
        for source in sources:
1808
1839
            result.update(source.keys())
1809
1840
        return result
1810
1841
 
1811
1842
 
 
1843
class _GCBuildDetails(object):
 
1844
    """A blob of data about the build details.
 
1845
 
 
1846
    This stores the minimal data, which then allows compatibility with the old
 
1847
    api, without taking as much memory.
 
1848
    """
 
1849
 
 
1850
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
 
1851
                 '_delta_end', '_parents')
 
1852
 
 
1853
    method = 'group'
 
1854
    compression_parent = None
 
1855
 
 
1856
    def __init__(self, parents, position_info):
 
1857
        self._parents = parents
 
1858
        (self._index, self._group_start, self._group_end, self._basis_end,
 
1859
         self._delta_end) = position_info
 
1860
 
 
1861
    def __repr__(self):
 
1862
        return '%s(%s, %s)' % (self.__class__.__name__,
 
1863
            self.index_memo, self._parents)
 
1864
 
 
1865
    @property
 
1866
    def index_memo(self):
 
1867
        return (self._index, self._group_start, self._group_end,
 
1868
                self._basis_end, self._delta_end)
 
1869
 
 
1870
    @property
 
1871
    def record_details(self):
 
1872
        return static_tuple.StaticTuple(self.method, None)
 
1873
 
 
1874
    def __getitem__(self, offset):
 
1875
        """Compatibility thunk to act like a tuple."""
 
1876
        if offset == 0:
 
1877
            return self.index_memo
 
1878
        elif offset == 1:
 
1879
            return self.compression_parent # Always None
 
1880
        elif offset == 2:
 
1881
            return self._parents
 
1882
        elif offset == 3:
 
1883
            return self.record_details
 
1884
        else:
 
1885
            raise IndexError('offset out of range')
 
1886
            
 
1887
    def __len__(self):
 
1888
        return 4
 
1889
 
 
1890
 
1812
1891
class _GCGraphIndex(object):
1813
1892
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1814
1893
 
2009
2088
                parents = None
2010
2089
            else:
2011
2090
                parents = entry[3][0]
2012
 
            method = 'group'
2013
 
            result[key] = (self._node_to_position(entry),
2014
 
                                  None, parents, (method, None))
 
2091
            details = _GCBuildDetails(parents, self._node_to_position(entry))
 
2092
            result[key] = details
2015
2093
        return result
2016
2094
 
2017
2095
    def keys(self):
2033
2111
        # each, or about 7MB. Note that it might be even more when you consider
2034
2112
        # how PyInt is allocated in separate slabs. And you can't return a slab
2035
2113
        # to the OS if even 1 int on it is in use. Note though that Python uses
2036
 
        # a LIFO when re-using PyInt slots, which probably causes more
 
2114
        # a LIFO when re-using PyInt slots, which might cause more
2037
2115
        # fragmentation.
2038
2116
        start = int(bits[0])
2039
2117
        start = self._int_cache.setdefault(start, start)