            self.storage_kind)


class LazyKnitContentFactory(ContentFactory):
    """A ContentFactory which can either generate full text or a wire form.

    :seealso ContentFactory:
    """

    def __init__(self, key, parents, generator, first):
        """Create a LazyKnitContentFactory.

        :param key: The key of the record.
        :param parents: The parents of the record.
        :param generator: A _ContentMapGenerator containing the record for this
            key.
        :param first: Is this the first content object returned from generator?
            if it is, its storage kind is knit-delta-closure, otherwise it is
            knit-delta-closure-ref
        """
        self.parents = parents
        self._generator = generator
        self.storage_kind = "knit-delta-closure"
        if not first:
            self.storage_kind = self.storage_kind + "-ref"

    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            return self._generator._wire_bytes()
            # all the keys etc are contained in the bytes returned in the
            # first record.
        if storage_kind in ('chunked', 'fulltext'):
            chunks = self._generator._get_one_work(self.key).text()
            if storage_kind == 'chunked':
                return ''.join(chunks)
        raise errors.UnavailableRepresentation(self.key, storage_kind,
            self.storage_kind)
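
    # A minimal usage sketch (hypothetical consumer code, not part of this
    # module): the first factory yielded by a generator reports
    # 'knit-delta-closure' and returns the whole serialised closure, while
    # the later '-ref' factories share that data and can still hand back
    # individual texts via 'fulltext' or 'chunked'.
    #
    #   factories = list(generator.get_record_stream())
    #   wire = factories[0].get_bytes_as('knit-delta-closure')
    #   text = factories[1].get_bytes_as('fulltext')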

def knit_delta_closure_to_records(storage_kind, bytes, line_end):
    """Convert a network record to an iterator over stream records.

    :param storage_kind: The storage kind of the record.
        Must be 'knit-delta-closure'.
    :param bytes: The bytes of the record on the network.
    """
    generator = _NetworkContentMapGenerator(bytes, line_end)
    return generator.get_record_stream()

def knit_network_to_record(storage_kind, bytes, line_end):
    """Convert a network record to a record object.

    :param storage_kind: The storage kind of the record.
    :param bytes: The bytes of the record on the network.
    """
    line_end = bytes.find('\n', start)
    key = tuple(bytes[start:line_end].split('\x00'))
    line_end = bytes.find('\n', start)
    parent_line = bytes[start:line_end]
    if parent_line == 'None:':
        parents = None
    else:
        parents = tuple(
            [tuple(segment.split('\x00')) for segment in parent_line.split('\t')
    noeol = bytes[start] == 'N'
    if 'ft' in storage_kind:
        method = 'fulltext'
    else:
        method = 'line-delta'
    build_details = (method, noeol)
    raw_record = bytes[start:]
    annotated = 'annotated' in storage_kind
    return [KnitContentFactory(key, parents, build_details, None, raw_record,
        annotated, network_bytes=bytes)]
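
# Rough shape of the network bytes parsed above, reconstructed from the
# slicing logic (illustrative only, not a normative description of the
# format):
#
#   <key segments joined by '\x00'>'\n'
#   <'None:' or parent tuples joined by '\t', segments by '\x00'>'\n'
#   <flags line starting with 'N' when the text has no trailing EOL>'\n'
#   <the raw knit record bytes through to the end of the buffer>
#
# The method is not stored on the wire here; it is recovered from
# storage_kind ('ft' meaning fulltext, otherwise a line delta).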


class KnitContent(object):
    """Content of a knit version to which deltas can be applied.

    This is always stored in memory as a list of lines with \n at the end,
    plus a flag saying if the final ending is really there or not, because that
    corresponds to the on-disk knit representation.
    """
            if not self.get_parent_map([key]):
                raise RevisionNotPresent(key, self)
            return cached_version
        text_map, contents_map = self._get_content_maps([key])
        return contents_map[key]

    def _get_content_maps(self, keys, nonlocal_keys=None):
        """Produce maps of text and KnitContents

        :param keys: The keys to produce content maps for.
        :param nonlocal_keys: An iterable of keys (possibly intersecting keys)
            which are known to not be in this knit, but rather in one of the
            fallback knits.
        :return: (text_map, content_map) where text_map contains the texts for
            the requested versions and content_map contains the KnitContents.
        """
        # FUTURE: This function could be improved for the 'extract many' case
        # by tracking each component and only doing the copy when the number of
        # children that need to apply deltas to it is > 1 or it is part of the
        multiple_versions = len(keys) != 1
        record_map = self._get_record_map(keys, allow_missing=True)
        if nonlocal_keys is None:
            nonlocal_keys = set()
        else:
            nonlocal_keys = frozenset(nonlocal_keys)
        missing_keys = set(nonlocal_keys)
        for source in self._fallback_vfs:
            if not missing_keys:
                break
            for record in source.get_record_stream(missing_keys,
                if record.storage_kind == 'absent':
                    continue
                missing_keys.remove(record.key)
                lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
                text_map[record.key] = lines
                content_map[record.key] = PlainKnitContent(lines, record.key)
                if record.key in keys:
                    final_content[record.key] = content_map[record.key]
            if key in nonlocal_keys:
                continue
            while cursor is not None:
                try:
                    record, record_details, digest, next = record_map[cursor]
                except KeyError:
                    raise RevisionNotPresent(cursor, self)
                components.append((cursor, record, record_details, digest))
                if cursor in content_map:
                    # no need to plan further back
                    components.append((cursor, None, None, None))
                    break
            for (component_id, record, record_details,
                 digest) in reversed(components):
                if component_id in content_map:
                    content = content_map[component_id]
                else:
                    content, delta = self._factory.parse_record(key[-1],
                        record, record_details, content,
                        copy_base_content=multiple_versions)
                    if multiple_versions:
                        content_map[component_id] = content
            final_content[key] = content

            # digest here is the digest from the last applied component.
            text = content.text()
            actual_sha = sha_strings(text)
            if actual_sha != digest:
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
            text_map[key] = text
        return text_map, final_content
        generator = _VFContentMapGenerator(self, [key])
        return generator._get_content(key)

    def get_parent_map(self, keys):
        """Get a map of the graph parents of keys.
        This should be revisited if _get_content_maps() can ever cross
        file-id boundaries.

        The keys for a given file_id are kept in the same relative order.
        Ordering between file_ids is not, though prefix_order will return the
        order that the key was first seen.

        :param keys: An iterable of key tuples
        :return: A dict of {prefix: [key_list]}
        :return: (split_map, prefix_order)
            split_map       A dictionary mapping prefix => keys
            prefix_order    The order that we saw the various prefixes
        """
        split_by_prefix = {}
        for key in keys:
            if len(key) == 1:
                split_by_prefix.setdefault('', []).append(key)
            else:
                split_by_prefix.setdefault(key[0], []).append(key)
        return split_by_prefix

            if prefix in split_by_prefix:
                split_by_prefix[prefix].append(key)
            else:
                split_by_prefix[prefix] = [key]
                prefix_order.append(prefix)
        return split_by_prefix, prefix_order
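
    # A minimal sketch of the expected behaviour (hypothetical keys; kvf
    # stands in for a KnitVersionedFiles instance):
    #
    #   >>> keys = [('f1', 'r1'), ('f2', 'r1'), ('f1', 'r2')]
    #   >>> split_map, prefix_order = kvf._split_by_prefix(keys)
    #   >>> split_map
    #   {'f1': [('f1', 'r1'), ('f1', 'r2')], 'f2': [('f2', 'r1')]}
    #   >>> prefix_order
    #   ['f1', 'f2']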

    def _group_keys_for_io(self, keys, non_local_keys, positions,
                           _min_buffer_size=_STREAM_MIN_BUFFER_SIZE):
        """For the given keys, group them into 'best-sized' requests.

        The idea is to avoid making 1 request per file, but to never try to
        unpack an entire 1.5GB source tree in a single pass. Also when
        possible, we should try to group requests to the same pack file
        together.

        :return: list of (keys, non_local) tuples that indicate what keys
            should be fetched next.
        """
        # TODO: Ideally we would group on 2 factors. We want to extract texts
        #       from the same pack file together, and we want to extract all
        #       the texts for a given build-chain together. Ultimately it
        #       probably needs a better global view.
        total_keys = len(keys)
        prefix_split_keys, prefix_order = self._split_by_prefix(keys)
        prefix_split_non_local_keys, _ = self._split_by_prefix(non_local_keys)
        cur_non_local = set()
        for prefix in prefix_order:
            keys = prefix_split_keys[prefix]
            non_local = prefix_split_non_local_keys.get(prefix, [])
            this_size = self._index._get_total_build_size(keys, positions)
            cur_size += this_size
            cur_keys.extend(keys)
            cur_non_local.update(non_local)
            if cur_size > _min_buffer_size:
                result.append((cur_keys, cur_non_local))
                sizes.append(cur_size)
                cur_non_local = set()
        result.append((cur_keys, cur_non_local))
        sizes.append(cur_size)
        trace.mutter('Collapsed %d keys into %d requests w/ %d file_ids'
                     ' w/ sizes: %s', total_keys, len(result),
                     len(prefix_split_keys), sizes)
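
    # Rough sketch of the grouping behaviour (names and sizes are made up):
    # prefixes are walked in first-seen order and a request is emitted each
    # time the accumulated build size passes _min_buffer_size, so several
    # small files share one request while a large file ends up on its own.
    #
    #   requests = kvf._group_keys_for_io(keys, non_local, positions)
    #   # -> [([('small-1', 'r1'), ('small-2', 'r1')], set()),
    #   #     ([('big-file', 'r1')], set())]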

    def get_record_stream(self, keys, ordering, include_delta_closure):
        """Get a stream of records for keys.
                elif record.storage_kind == 'chunked':
                    self.add_lines(record.key, parents,
                        osutils.chunks_to_lines(record.get_bytes_as('chunked')))
                elif record.storage_kind == 'fulltext':
                    self.add_lines(record.key, parents,
                        split_lines(record.get_bytes_as('fulltext')))
                else:
                    # Not a fulltext, and not suitable for direct insertion as a
                    # delta, either because it's not the right format, or this
                    # KnitVersionedFiles doesn't permit deltas (_max_delta_chain ==
                    # 0) or because it depends on a base only present in the
                    # fallback kvfs.
                    adapter_key = record.storage_kind, 'fulltext'
                    adapter = get_adapter(adapter_key)
                    lines = split_lines(adapter.get_bytes(
                        record, record.get_bytes_as(record.storage_kind)))
                    try:
                        # Try getting a fulltext directly from the record.
                        bytes = record.get_bytes_as('fulltext')
                    except errors.UnavailableRepresentation:
                        adapter_key = record.storage_kind, 'fulltext'
                        adapter = get_adapter(adapter_key)
                        bytes = adapter.get_bytes(record)
                    lines = split_lines(bytes)
                try:
                    self.add_lines(record.key, parents, lines)
                except errors.RevisionAlreadyPresent:
                    pass
                # Add any records whose basis parent is now available.
                added_keys = [record.key]
                while added_keys:
                    key = added_keys.pop(0)
                    if key in buffered_index_entries:
                        index_entries = buffered_index_entries[key]
                        self._index.add_records(index_entries)
                        added_keys.extend(
                            [index_entry[0] for index_entry in index_entries])
                        del buffered_index_entries[key]
        # If there were any deltas which had a missing basis parent, error.
        if buffered_index_entries:
            from pprint import pformat
            raise errors.BzrCheckError(
                "record_stream refers to compression parents not in %r:\n%s"
                % (self, pformat(sorted(buffered_index_entries.keys()))))
            # There were index entries buffered at the end of the stream,
            # So these need to be added (if the index supports holding such
            # entries for later insertion)
            for key in buffered_index_entries:
                index_entries = buffered_index_entries[key]
                self._index.add_records(index_entries,
                    missing_compression_parents=True)

    def get_missing_compression_parent_keys(self):
        """Return an iterable of keys of missing compression parents.

        Check this after calling insert_record_stream to find out if there are
        any missing compression parents.  If there are, the records that
        depend on them are not able to be inserted safely. For atomic
        KnitVersionedFiles built on packs, the transaction should be aborted or
        suspended - commit will fail at this point. Nonatomic knits will error
        earlier because they have no staging area to put pending entries into.
        """
        return self._index.get_missing_compression_parents()
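
    # Sketched usage after streaming records in (names are illustrative):
    #
    #   kvf.insert_record_stream(stream)
    #   missing = kvf.get_missing_compression_parent_keys()
    #   if missing:
    #       # abort or suspend the enclosing pack transaction rather than
    #       # committing; the buffered records cannot be completed yet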

    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
        """Iterate over the lines in the versioned files from keys.


class _ContentMapGenerator(object):
    """Generate texts or expose raw deltas for a set of texts."""

    def _get_content(self, key):
        """Get the content object for key."""
        # Note that _get_content is only called when the _ContentMapGenerator
        # has been constructed with just one key requested for reconstruction.
        if key in self.nonlocal_keys:
            record = self.get_record_stream().next()
            # Create a content object on the fly
            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
            return PlainKnitContent(lines, record.key)
        else:
            # local keys we can ask for directly
            return self._get_one_work(key)

    def get_record_stream(self):
        """Get a record stream for the keys requested during __init__."""
        for record in self._work():
            yield record

    def _work(self):
        """Produce maps of text and KnitContents as dicts.

        :return: (text_map, content_map) where text_map contains the texts for
            the requested versions and content_map contains the KnitContents.
        """
        # NB: By definition we never need to read remote sources unless texts
        # are requested from them: we don't delta across stores - and we
        # explicitly do not want to, to prevent data loss situations.
        if self.global_map is None:
            self.global_map = self.vf.get_parent_map(self.keys)
        nonlocal_keys = self.nonlocal_keys
        missing_keys = set(nonlocal_keys)
        # Read from remote versioned file instances and provide to our caller.
        for source in self.vf._fallback_vfs:
            if not missing_keys:
                break
            # Loop over fallback repositories asking them for texts - ignore
            # any missing from a particular fallback.
            for record in source.get_record_stream(missing_keys,
                if record.storage_kind == 'absent':
                    # Not in this particular stream, may be in one of the
                    # other fallback vfs objects.
                    continue
                missing_keys.remove(record.key)

        self._raw_record_map = self.vf._get_record_map_unparsed(self.keys,
        for key in self.keys:
            if key in self.nonlocal_keys:
                continue
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)

    def _get_one_work(self, requested_key):
        # Now, if we have calculated everything already, just return the
        if requested_key in self._contents_map:
            return self._contents_map[requested_key]
        # To simplify things, parse everything at once - code that wants one text
        # probably wants them all.
        # FUTURE: This function could be improved for the 'extract many' case
        # by tracking each component and only doing the copy when the number of
        # children that need to apply deltas to it is > 1 or it is part of the
        multiple_versions = len(self.keys) != 1
        if self._record_map is None:
            self._record_map = self.vf._raw_map_to_record_map(
                self._raw_record_map)
        record_map = self._record_map
        # raw_record_map is key:
        # Have read and parsed records at this point.
        for key in self.keys:
            if key in self.nonlocal_keys:
                continue
            while cursor is not None:
                try:
                    record, record_details, digest, next = record_map[cursor]
                except KeyError:
                    raise RevisionNotPresent(cursor, self)
                components.append((cursor, record, record_details, digest))
                if cursor in self._contents_map:
                    # no need to plan further back
                    components.append((cursor, None, None, None))
                    break
            for (component_id, record, record_details,
                 digest) in reversed(components):
                if component_id in self._contents_map:
                    content = self._contents_map[component_id]
                else:
                    content, delta = self._factory.parse_record(key[-1],
                        record, record_details, content,
                        copy_base_content=multiple_versions)
                    if multiple_versions:
                        self._contents_map[component_id] = content

            # digest here is the digest from the last applied component.
            text = content.text()
            actual_sha = sha_strings(text)
            if actual_sha != digest:
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
        if multiple_versions:
            return self._contents_map[requested_key]

    def _wire_bytes(self):
        """Get the bytes to put on the wire for 'key'.

        The first collection of bytes asked for returns the serialised
        raw_record_map and the additional details (key, parent) for key.
        Subsequent calls return just the additional details (key, parent).

        The wire storage_kind given for the first key is 'knit-delta-closure',
        for subsequent keys it is 'knit-delta-closure-ref'.

        :param key: A key from the content generator.
        :return: Bytes to put on the wire.
        """
        # kind marker for dispatch on the far side,
        lines.append('knit-delta-closure')
        if self.vf._factory.annotated:
            lines.append('annotated')
        # then the list of keys
        lines.append('\t'.join(['\x00'.join(key) for key in self.keys
            if key not in self.nonlocal_keys]))
        # then the _raw_record_map in serialised form:
        # for each item in the map:
        # 1 line with parents if the key is to be yielded (None: for None, '' for ())
        # one line with method
        # one line with noeol
        # one line with next ('' for None)
        # one line with byte count of the record bytes
        for key, (record_bytes, (method, noeol), next) in \
                self._raw_record_map.iteritems():
            key_bytes = '\x00'.join(key)
            parents = self.global_map.get(key, None)
            if parents is None:
                parent_bytes = 'None:'
            else:
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
            method_bytes = method
            next_bytes = '\x00'.join(next)
            map_byte_list.append('%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
                key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
                len(record_bytes), record_bytes))
        map_bytes = ''.join(map_byte_list)
        lines.append(map_bytes)
        bytes = '\n'.join(lines)
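
        # Schematic of the resulting payload, reconstructed from the format
        # string above (illustrative only; field values are hypothetical):
        #
        #   knit-delta-closure            <- kind marker
        #   annotated                     <- annotation flag line
        #   f1\x00r2                      <- '\t'-separated list of local keys
        #   f1\x00r2                      <- per record: key
        #   f1\x00r1                      <-   parents ('None:' when None)
        #   line-delta                    <-   method
        #   <noeol flag>                  <-   noeol marker line
        #   f1\x00r1                      <-   next build component ('' for None)
        #   1234                          <-   length of the raw record bytes
        #   <1234 bytes of raw knit record data>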


class _VFContentMapGenerator(_ContentMapGenerator):
    """Content map generator reading from a VersionedFiles object."""

    def __init__(self, versioned_files, keys, nonlocal_keys=None,
                 global_map=None, raw_record_map=None):
        """Create a _ContentMapGenerator.

        :param versioned_files: The versioned files that the texts are being
            extracted from.
        :param keys: The keys to produce content maps for.
        :param nonlocal_keys: An iterable of keys (possibly intersecting keys)
            which are known to not be in this knit, but rather in one of the
            fallback knits.
        :param global_map: The result of get_parent_map(keys) (or a supermap).
            This is required if get_record_stream() is to be used.
        :param raw_record_map: An unparsed raw record map to use for answering
        """
        # The vf to source data from
        self.vf = versioned_files
        self.keys = list(keys)
        # Keys known to be in fallback vfs objects
        if nonlocal_keys is None:
            self.nonlocal_keys = set()
        else:
            self.nonlocal_keys = frozenset(nonlocal_keys)
        # Parents data for keys to be returned in get_record_stream
        self.global_map = global_map
        # The chunked lists for self.keys in text form
        # A cache of KnitContent objects used in extracting texts.
        self._contents_map = {}
        # All the knit records needed to assemble the requested keys as full
        self._record_map = None
        if raw_record_map is None:
            self._raw_record_map = self.vf._get_record_map_unparsed(keys,
        else:
            self._raw_record_map = raw_record_map
        # the factory for parsing records
        self._factory = self.vf._factory


class _NetworkContentMapGenerator(_ContentMapGenerator):
    """Content map generator sourced from a network stream."""

    def __init__(self, bytes, line_end):
        """Construct a _NetworkContentMapGenerator from a bytes block."""
        self.global_map = {}
        self._raw_record_map = {}
        self._contents_map = {}
        self._record_map = None
        self.nonlocal_keys = []
        # Get access to record parsing facilities
        self.vf = KnitVersionedFiles(None, None)
        line_end = bytes.find('\n', start)
        line = bytes[start:line_end]
        start = line_end + 1
        if line == 'annotated':
            self._factory = KnitAnnotateFactory()
        else:
            self._factory = KnitPlainFactory()
        # list of keys to emit in get_record_stream
        line_end = bytes.find('\n', start)
        line = bytes[start:line_end]
        start = line_end + 1
        self.keys = [
            tuple(segment.split('\x00')) for segment in line.split('\t')
        # now a loop until the end. XXX: It would be nice if this was just a
        # bunch of the same records as get_record_stream(..., False) gives, but
        # there is a decent sized gap stopping that at the moment.
            line_end = bytes.find('\n', start)
            key = tuple(bytes[start:line_end].split('\x00'))
            start = line_end + 1
            # 1 line with parents (None: for None, '' for ())
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
                [tuple(segment.split('\x00')) for segment in line.split('\t')
            self.global_map[key] = parents
            start = line_end + 1
            # one line with method
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
            start = line_end + 1
            # one line with noeol
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
            start = line_end + 1
            # one line with next ('' for None)
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
                next = tuple(bytes[start:line_end].split('\x00'))
            start = line_end + 1
            # one line with byte count of the record bytes
            line_end = bytes.find('\n', start)
            line = bytes[start:line_end]
            start = line_end + 1
            record_bytes = bytes[start:start+count]
            start = start + count
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)
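
        # Sketch of one parsed entry (hypothetical values); the tuple layout
        # mirrors what _wire_bytes serialises above:
        #
        #   self._raw_record_map[('f1', 'r2')] = (
        #       record_bytes,            # raw knit record read above
        #       ('line-delta', False),   # (method, noeol)
        #       ('f1', 'r1'))            # next component in the build chain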

    def get_record_stream(self):
        """Get a record stream for the keys requested by the bytestream."""
        first = True
        for key in self.keys:
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
            first = False

    def _wire_bytes(self):


class _KndxIndex(object):
    """Manages knit index files