@@ -137 +134 @@
                              % (num_bytes, self._content_length))
         # Expand the content if required
         if self._content is None:
+            if self._content_chunks is not None:
+                self._content = ''.join(self._content_chunks)
+                self._content_chunks = None
+        if self._content is None:
             if self._z_content is None:
                 raise AssertionError('No content to decompress')
             if self._z_content == '':
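The added lines let _ensure_content start from a list of chunks: the flat string is only built when something actually needs it, and the chunk list is dropped as soon as it has been joined so the text is not held twice. The same pattern in isolation (a standalone sketch with made-up variable names, not bzrlib code):

content_chunks = ['first chunk\n', 'second chunk\n']
content = None

# Later, when a flat string is actually required:
if content is None and content_chunks is not None:
    content = ''.join(content_chunks)
    content_chunks = None   # release the duplicate copy right away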
@@ -273 +274 @@
             bytes = apply_delta_to_source(self._content, content_start, end)
         return bytes
 
+    def set_chunked_content(self, content_chunks, length):
+        """Set the content of this block to the given chunks."""
+        # If we have lots of short lines, it may be more efficient to join
+        # the content ahead of time. If the content is <10MiB, we don't really
+        # care about the extra memory consumption, so we can just pack it and
+        # be done. However, timing showed 18s => 17.9s for repacking 1k revs of
+        # mysql, which is below the noise margin
+        self._content_length = length
+        self._content_chunks = content_chunks
+        self._content = None
+        self._z_content = None
+
     def set_content(self, content):
         """Set the content of this block."""
         self._content_length = len(content)
         self._content = content
         self._z_content = None
 
+    def _create_z_content_using_lzma(self):
+        if self._content_chunks is not None:
+            self._content = ''.join(self._content_chunks)
+            self._content_chunks = None
+        if self._content is None:
+            raise AssertionError('Nothing to compress')
+        self._z_content = pylzma.compress(self._content)
+        self._z_content_length = len(self._z_content)
+
+    def _create_z_content_from_chunks(self):
+        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
+        compressed_chunks = map(compressor.compress, self._content_chunks)
+        compressed_chunks.append(compressor.flush())
+        self._z_content = ''.join(compressed_chunks)
+        self._z_content_length = len(self._z_content)
+
+    def _create_z_content(self):
+        if self._z_content is not None:
+            return
+        if _USE_LZMA:
+            self._create_z_content_using_lzma()
+            return
+        if self._content_chunks is not None:
+            self._create_z_content_from_chunks()
+            return
+        self._z_content = zlib.compress(self._content)
+        self._z_content_length = len(self._z_content)
+
     def to_bytes(self):
         """Encode the information into a byte stream."""
-        compress = zlib.compress
-        if _USE_LZMA:
-            compress = pylzma.compress
-        if self._z_content is None:
-            if self._content is None:
-                raise AssertionError('Nothing to compress')
-            self._z_content = compress(self._content)
-            self._z_content_length = len(self._z_content)
+        self._create_z_content()
         if _USE_LZMA:
             header = self.GCB_LZ_HEADER
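_create_z_content_from_chunks feeds every chunk through a single zlib compressobj, so the chunks never have to be joined into one big string before compression. A self-contained sketch of the same idea using only the standard zlib module:

import zlib

def compress_chunks(chunks):
    # One compression stream for all chunks; flush() emits the final bytes.
    compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
    compressed = [compressor.compress(chunk) for chunk in chunks]
    compressed.append(compressor.flush())
    return ''.join(compressed)

chunks = ['first chunk of text\n', 'second chunk of text\n']
# The streamed result decompresses to the same bytes as the joined content.
assert zlib.decompress(compress_chunks(chunks)) == ''.join(chunks)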
@@ -324 +358 @@
                 raise ValueError('invalid content_len %d for record @ pos %d'
                                  % (content_len, pos - len_len - 1))
             if kind == 'f': # Fulltext
-                result.append(('f', content_len))
+                if include_text:
+                    text = self._content[pos:pos+content_len]
+                    result.append(('f', content_len, text))
+                else:
+                    result.append(('f', content_len))
             elif kind == 'd': # Delta
                 delta_content = self._content[pos:pos+content_len]
@@ -340 +378 @@
                          delta_pos) = decode_copy_instruction(delta_content, c,
                                                               delta_pos)
-                        delta_info.append(('c', offset, length))
+                        if include_text:
+                            text = self._content[offset:offset+length]
+                            delta_info.append(('c', offset, length, text))
+                        else:
+                            delta_info.append(('c', offset, length))
                         measured_len += length
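With the new include_text flag the dump entries gain an optional text member: fulltext records become ('f', length, text) instead of ('f', length), and copy instructions become ('c', offset, length, text). Purely illustrative values:

# include_text=False (default)        include_text=True
fulltext_entry = ('f', 12)            # ('f', 12, 'hello world\n')
copy_instruction = ('c', 0, 12)       # ('c', 0, 12, 'hello world\n')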
@@ -747 +789 @@
         After calling this, the compressor should no longer be used
         """
-        content = ''.join(self.chunks)
+        # TODO: this causes us to 'bloat' to 2x the size of content in the
+        # group. This has an impact for 'commit' of large objects.
+        # One possibility is to use self._content_chunks, and be lazy and
+        # only fill out self._content as a full string when we actually
+        # need it. That would at least drop the peak memory consumption
+        # for 'commit' down to ~1x the size of the largest file, at a
+        # cost of increased complexity within this code. 2x is still <<
+        # 3x the size of the largest file, so we are doing ok.
+        self._block.set_chunked_content(self.chunks, self.endpoint)
         self.chunks = None
         self._delta_index = None
-        self._block.set_content(content)
         return self._block
 
     def pop_last(self):
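The TODO comment is the rationale for switching flush() from set_content to set_chunked_content: joining the chunks eagerly keeps both the list and the joined string alive, roughly doubling peak memory for a large group. A sketch of the two call patterns against GroupCompressBlock (assuming bzrlib is importable; the chunk values are invented):

from bzrlib.groupcompress import GroupCompressBlock

chunks = ['some text\n', 'more of the group\n']

# Old pattern: join first, so the chunk list and the joined copy coexist.
block = GroupCompressBlock()
block.set_content(''.join(chunks))

# New pattern: hand over the chunk list; a flat string (or a chunk-by-chunk
# zlib stream) is only built when the block needs one, e.g. in to_bytes().
block = GroupCompressBlock()
block.set_chunked_content(chunks, sum(map(len, chunks)))
raw_bytes = block.to_bytes()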
@@ -889 +938 @@
         self.endpoint = endpoint
@@ -892 +941 @@
-def make_pack_factory(graph, delta, keylength):
+def make_pack_factory(graph, delta, keylength, inconsistency_fatal=True):
     """Create a factory for creating a pack based groupcompress.
 
     This is only functional enough to run interface tests, it doesn't try to
@@ -910 +959 @@
         writer = pack.ContainerWriter(stream.write)
         writer.begin()
         index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
-            add_callback=graph_index.add_nodes)
-        access = _DirectPackAccess({})
+            add_callback=graph_index.add_nodes,
+            inconsistency_fatal=inconsistency_fatal)
+        access = knit._DirectPackAccess({})
         access.set_writer(writer, graph_index, (transport, 'newpack'))
         result = GroupCompressVersionedFiles(index, access, delta)
         result.stream = stream
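make_pack_factory returns a factory taking a transport, so the new inconsistency_fatal knob is fixed when the factory is created and threaded through to _GCGraphIndex. A hedged usage sketch (the memory transport and the add_lines call are assumptions, mirroring how the interface tests drive this factory):

from bzrlib import transport
from bzrlib.groupcompress import make_pack_factory

# Indexes built by this factory warn (rather than raise KnitCorrupt) when a
# record is re-added with inconsistent details.
factory = make_pack_factory(graph=True, delta=False, keylength=1,
                            inconsistency_fatal=False)
vf = factory(transport.get_transport('memory:///'))
vf.add_lines(('rev-1',), [], ['some content\n'])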
@@ -992 +1042 @@
                                                nostore_sha=nostore_sha))[0]
         return sha1, length, None
 
+    def _add_text(self, key, parents, text, nostore_sha=None, random_id=False):
+        """See VersionedFiles._add_text()."""
+        self._index._check_write_ok()
+        self._check_add(key, None, random_id, check_content=False)
+        if text.__class__ is not str:
+            raise errors.BzrBadParameterUnicode("text")
+        if parents is None:
+            # The caller might pass None if there is no graph data, but kndx
+            # indexes can't directly store that, so we give them
+            # an empty tuple instead.
+            parents = ()
+        # double handling for now. Make it work until then.
+        length = len(text)
+        record = FulltextContentFactory(key, parents, None, text)
+        sha1 = list(self._insert_record_stream([record], random_id=random_id,
+                                               nostore_sha=nostore_sha))[0]
+        return sha1, length, None
+
     def add_fallback_versioned_files(self, a_versioned_files):
         """Add a source of texts for texts not present in this knit.
@@ -1006 +1074 @@
         if not parent_map:
             raise errors.RevisionNotPresent(key, self)
         if parent_map[key] is not None:
-            search = graph._make_breadth_first_searcher([key])
-            keys = set()
-            while True:
-                try:
-                    present, ghosts = search.next_with_ghosts()
-                except StopIteration:
-                    break
-                keys.update(present)
-            parent_map = self.get_parent_map(keys)
+            parent_map = dict((k, v) for k, v in graph.iter_ancestry([key])
+                              if v is not None)
+            keys = parent_map.keys()
         else:
             keys = [key]
             parent_map = {key:()}
-        # So we used Graph(self) to load the parent_map, but now that we have
-        # it, we can just query the parent map directly, so create a new Graph
-        # object
-        graph = _mod_graph.Graph(_mod_graph.DictParentsProvider(parent_map))
-        head_cache = _mod_graph.FrozenHeadsCache(graph)
+        # We used Graph(self) to load the parent_map, but now that we have it,
+        # we can just query the parent map directly, so create a KnownGraph
+        heads_provider = _mod_graph.KnownGraph(parent_map)
         parent_cache = {}
         reannotate = annotate.reannotate
         for record in self.get_record_stream(keys, 'topological', True):
             key = record.key
             lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
             parent_lines = [parent_cache[parent] for parent in parent_map[key]]
             parent_cache[key] = list(
-                reannotate(parent_lines, lines, key, None, head_cache))
+                reannotate(parent_lines, lines, key, None, heads_provider))
         return parent_cache[key]
 
     def check(self, progress_bar=None):
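annotate now builds a KnownGraph directly from the parent_map it already loaded and passes it to reannotate as the heads provider, instead of wrapping the map in Graph plus FrozenHeadsCache. A small sketch of the provider it relies on (the tiny parent map is invented; keys are one-element tuples as elsewhere in bzrlib):

from bzrlib import graph as _mod_graph

parent_map = {
    ('rev-1',): (),
    ('rev-2',): (('rev-1',),),
    ('rev-3',): (('rev-1',),),
    ('rev-4',): (('rev-2',), ('rev-3',)),
}
heads_provider = _mod_graph.KnownGraph(parent_map)
# reannotate only needs heads(); neither side of the merge dominates the
# other, so both are reported as heads.
print heads_provider.heads([('rev-2',), ('rev-3',)])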
@@ -1524 +1582 @@
             'unordered', True)):
             # XXX: todo - optimise to use less than full texts.
             key = record.key
-            pb.update('Walking content', key_idx, total)
+            if pb is not None:
+                pb.update('Walking content', key_idx, total)
             if record.storage_kind == 'absent':
                 raise errors.RevisionNotPresent(key, self)
             lines = osutils.split_lines(record.get_bytes_as('fulltext'))
             for line in lines:
                 yield line, key
-        pb.update('Walking content', total, total)
+        if pb is not None:
+            pb.update('Walking content', total, total)
 
     def keys(self):
         """See VersionedFiles.keys."""
@@ -1547 +1607 @@
     """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
 
     def __init__(self, graph_index, is_locked, parents=True,
-        add_callback=None):
+        add_callback=None, track_external_parent_refs=False,
+        inconsistency_fatal=True):
         """Construct a _GCGraphIndex on a graph_index.
 
         :param graph_index: An implementation of bzrlib.index.GraphIndex.
@@ -1558 +1619 @@
         :param add_callback: If not None, allow additions to the index and call
             this callback with a list of added GraphIndex nodes:
             [(node, value, node_refs), ...]
+        :param track_external_parent_refs: As keys are added, keep track of the
+            keys they reference, so that we can query get_missing_parents(),
+            etc.
+        :param inconsistency_fatal: When asked to add records that are already
+            present, and the details are inconsistent with the existing
+            record, raise an exception instead of warning (and skipping the
+            record).
         """
         self._add_callback = add_callback
         self._graph_index = graph_index
         self._parents = parents
         self.has_graph = parents
         self._is_locked = is_locked
+        self._inconsistency_fatal = inconsistency_fatal
+        if track_external_parent_refs:
+            self._key_dependencies = knit._KeyRefs()
+        else:
+            self._key_dependencies = None
 
     def add_records(self, records, random_id=False):
         """Add multiple records to the index.
@@ -1601 +1674 @@
             present_nodes = self._get_entries(keys)
             for (index, key, value, node_refs) in present_nodes:
                 if node_refs != keys[key][1]:
-                    raise errors.KnitCorrupt(self, "inconsistent details in add_records"
-                        ": %s %s" % ((value, node_refs), keys[key]))
+                    details = '%s %s %s' % (key, (value, node_refs), keys[key])
+                    if self._inconsistency_fatal:
+                        raise errors.KnitCorrupt(self, "inconsistent details"
+                                                 " in add_records: %s" %
+                                                 details)
+                    else:
+                        trace.warning("inconsistent details in skipped"
+                                      " record: %s", details)
@@ -1614 +1693 @@
             for key, (value, node_refs) in keys.iteritems():
                 result.append((key, value))
             records = result
+        key_dependencies = self._key_dependencies
+        if key_dependencies is not None and self._parents:
+            for key, value, refs in records:
+                parents = refs[0]
+                key_dependencies.add_references(key, parents)
         self._add_callback(records)
 
     def _check_read(self):
@@ -1648 +1732 @@
         if check_present:
             missing_keys = keys.difference(found_keys)
             if missing_keys:
-                raise RevisionNotPresent(missing_keys.pop(), self)
+                raise errors.RevisionNotPresent(missing_keys.pop(), self)
 
     def get_parent_map(self, keys):
         """Get a map of the parents of keys.
@@ -1668 +1752 @@
                 result[node[1]] = None
         return result
 
+    def get_missing_parents(self):
+        """Return the keys of missing parents."""
+        # Copied from _KnitGraphIndex.get_missing_parents
+        # We may have false positives, so filter those out.
+        self._key_dependencies.add_keys(
+            self.get_parent_map(self._key_dependencies.get_unsatisfied_refs()))
+        return frozenset(self._key_dependencies.get_unsatisfied_refs())
+
     def get_build_details(self, keys):
         """Get the various build details for keys.
@@ -1719 +1811 @@
             delta_end = int(bits[3])
             return node[0], start, stop, basis_end, delta_end
 
+    def scan_unvalidated_index(self, graph_index):
+        """Inform this _GCGraphIndex that there is an unvalidated index.
+
+        This allows this _GCGraphIndex to keep track of any missing
+        compression parents we may want to have filled in to make those
+        indices valid.
+
+        :param graph_index: A GraphIndex
+        """
+        if self._key_dependencies is not None:
+            # Add parent refs from graph_index (and discard parent refs that
+            # the graph_index has).
+            add_refs = self._key_dependencies.add_references
+            for node in graph_index.iter_all_entries():
+                add_refs(node[1], node[3][0])
+
 
 from bzrlib._groupcompress_py import (
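Together with track_external_parent_refs in __init__, scan_unvalidated_index and get_missing_parents give callers a way to find compression parents that still have to be filled in: scanning records every reference held by a new index, and get_missing_parents reports the ones not yet satisfied. A hedged end-to-end sketch using in-memory BTreeBuilder indexes (keys and values are invented, and the node layout follows the add_nodes convention used elsewhere in this diff):

from bzrlib import btree_index
from bzrlib.groupcompress import _GCGraphIndex

builder = btree_index.BTreeBuilder(reference_lists=1, key_elements=1)
index = _GCGraphIndex(builder, lambda: True, parents=True,
                      add_callback=builder.add_nodes,
                      track_external_parent_refs=True)

# A freshly written, not yet validated index whose only record references
# ('rev-0',), which exists nowhere.
unvalidated = btree_index.BTreeBuilder(reference_lists=1, key_elements=1)
unvalidated.add_nodes([(('rev-1',), 'value', ((('rev-0',),),))])

index.scan_unvalidated_index(unvalidated)
print index.get_missing_parents()   # expected: frozenset([('rev-0',)])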