        entries = files.get_record_stream(keys, 'topological', False)
        self.assertAbsentRecord(files, keys, parent_map, entries)

    def assertRecordHasContent(self, record, bytes):
        """Assert that record's content is exactly the byte string bytes."""
        self.assertEqual(bytes, record.get_bytes_as('fulltext'))
        self.assertEqual(bytes, ''.join(record.get_bytes_as('chunked')))
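    # Note: get_bytes_as('fulltext') returns a single byte string, while
    # get_bytes_as('chunked') returns an iterable of byte strings; the helper
    # above checks that both representations carry identical content.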

    def test_get_record_stream_native_formats_are_wire_ready_one_ft(self):
        files = self.get_versionedfiles()
        key = self.get_simple_key('foo')
        files.add_lines(key, (), ['my text\n', 'content'])
        stream = files.get_record_stream([key], 'unordered', False)
        record = stream.next()
        if record.storage_kind in ('chunked', 'fulltext'):
            # chunked and fulltext representations are for direct use, not wire
            # serialisation: check they are able to be used directly. To send
            # such records over the wire a translation will be needed.
            self.assertRecordHasContent(record, "my text\ncontent")
        else:
            bytes = [record.get_bytes_as(record.storage_kind)]
            network_stream = versionedfile.NetworkRecordStream(bytes).read()
            source_record = record
            records = []
            for record in network_stream:
                records.append(record)
                self.assertEqual(source_record.storage_kind,
                    record.storage_kind)
                self.assertEqual(source_record.parents, record.parents)
                self.assertEqual(
                    source_record.get_bytes_as(source_record.storage_kind),
                    record.get_bytes_as(record.storage_kind))
            self.assertEqual(1, len(records))
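    # In outline, the wire round-trip exercised above (a sketch for
    # orientation, not an additional test): a record's native byte
    # representation is fed to NetworkRecordStream, whose read() regenerates
    # an equivalent record.
    #
    #   payload = [record.get_bytes_as(record.storage_kind)]
    #   for rec in versionedfile.NetworkRecordStream(payload).read():
    #       rec.get_bytes_as(rec.storage_kind)  # identical to payload[0]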

    def assertStreamMetaEqual(self, records, expected, stream):
        """Assert that streams expected and stream have the same records.

        :param records: A list to collect the seen records.
        :return: A generator of the records in stream.
        """
        # We make assertions during copying to catch things early for
        # easier debugging.
        for record, ref_record in izip(stream, expected):
            records.append(record)
            self.assertEqual(ref_record.key, record.key)
            self.assertEqual(ref_record.storage_kind, record.storage_kind)
            self.assertEqual(ref_record.parents, record.parents)
            yield record
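    # Note that assertStreamMetaEqual is a generator: its assertions only run
    # as the returned stream is consumed, e.g. by the insert_record_stream
    # calls in the tests below.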

    def stream_to_bytes_or_skip_counter(self, skipped_records, full_texts,
            stream):
        """Convert a stream to a bytes iterator.

        :param skipped_records: A list with one element to increment when a
            record is skipped.
        :param full_texts: A dict from key->fulltext representation, for
            checking chunked or fulltext stored records.
        :param stream: A record_stream.
        :return: An iterator over the bytes of each record.
        """
        for record in stream:
            if record.storage_kind in ('chunked', 'fulltext'):
                skipped_records[0] += 1
                # check the content is correct for direct use.
                self.assertRecordHasContent(record, full_texts[record.key])
            else:
                yield record.get_bytes_as(record.storage_kind)
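    # skipped_records is a single-element list rather than a plain int so the
    # generator can mutate the caller's counter in place (Python 2 has no
    # ``nonlocal``).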

    def test_get_record_stream_native_formats_are_wire_ready_ft_delta(self):
        files = self.get_versionedfiles()
        target_files = self.get_versionedfiles('target')
        key = self.get_simple_key('ft')
        key_delta = self.get_simple_key('delta')
        files.add_lines(key, (), ['my text\n', 'content'])
        if self.graph:
            delta_parents = (key,)
        else:
            delta_parents = ()
        files.add_lines(key_delta, delta_parents, ['different\n', 'content\n'])
        local = files.get_record_stream([key, key_delta], 'unordered', False)
        ref = files.get_record_stream([key, key_delta], 'unordered', False)
        skipped_records = [0]
        full_texts = {
            key: "my text\ncontent",
            key_delta: "different\ncontent\n",
            }
        byte_stream = self.stream_to_bytes_or_skip_counter(
            skipped_records, full_texts, local)
        network_stream = versionedfile.NetworkRecordStream(byte_stream).read()
        records = []
        # insert the stream from the network into a versioned files object so we can
        # check the content was carried across correctly without doing delta
        # inspection during check_stream.
        target_files.insert_record_stream(
            self.assertStreamMetaEqual(records, ref, network_stream))
        # No duplicates on the wire thank you!
        self.assertEqual(2, len(records) + skipped_records[0])
        if len(records):
            # if any content was copied it must all have been copied.
            self.assertIdenticalVersionedFile(files, target_files)
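    # Records surfaced as 'chunked'/'fulltext' are counted in skipped_records
    # instead of being sent over the simulated wire, so
    # len(records) + skipped_records[0] accounts for each record exactly once.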

    def test_get_record_stream_native_formats_are_wire_ready_delta(self):
        # copy a delta over the wire
        files = self.get_versionedfiles()
        target_files = self.get_versionedfiles('target')
        key = self.get_simple_key('ft')
        key_delta = self.get_simple_key('delta')
        files.add_lines(key, (), ['my text\n', 'content'])
        if self.graph:
            delta_parents = (key,)
        else:
            delta_parents = ()
        files.add_lines(key_delta, delta_parents, ['different\n', 'content\n'])
        # Copy the basis text across so we can reconstruct the delta during
        # insertion into target.
        target_files.insert_record_stream(files.get_record_stream([key],
            'unordered', False))
        local = files.get_record_stream([key_delta], 'unordered', False)
        ref = files.get_record_stream([key_delta], 'unordered', False)
        skipped_records = [0]
        full_texts = {
            key_delta: "different\ncontent\n",
            }
        byte_stream = self.stream_to_bytes_or_skip_counter(
            skipped_records, full_texts, local)
        network_stream = versionedfile.NetworkRecordStream(byte_stream).read()
        records = []
        # insert the stream from the network into a versioned files object so we can
        # check the content was carried across correctly without doing delta
        # inspection during check_stream.
        target_files.insert_record_stream(
            self.assertStreamMetaEqual(records, ref, network_stream))
        # No duplicates on the wire thank you!
        self.assertEqual(1, len(records) + skipped_records[0])
        if len(records):
            # if any content was copied it must all have been copied.
            self.assertIdenticalVersionedFile(files, target_files)
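    # The basis text is inserted into target_files first because a delta
    # record is only usable once its compression parent is present; the delta
    # alone cannot be expanded back into a full text.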

    def test_get_record_stream_wire_ready_delta_closure_included(self):
        # copy a delta over the wire with the ability to get its full text.
        files = self.get_versionedfiles()
        key = self.get_simple_key('ft')
        key_delta = self.get_simple_key('delta')
        files.add_lines(key, (), ['my text\n', 'content'])
        if self.graph:
            delta_parents = (key,)
        else:
            delta_parents = ()
        files.add_lines(key_delta, delta_parents, ['different\n', 'content\n'])
        local = files.get_record_stream([key_delta], 'unordered', True)
        ref = files.get_record_stream([key_delta], 'unordered', True)
        skipped_records = [0]
        full_texts = {
            key_delta: "different\ncontent\n",
            }
        byte_stream = self.stream_to_bytes_or_skip_counter(
            skipped_records, full_texts, local)
        network_stream = versionedfile.NetworkRecordStream(byte_stream).read()
        records = []
        # insert the stream from the network into a versioned files object so we can
        # check the content was carried across correctly without doing delta
        # inspection during check_stream.
        for record in self.assertStreamMetaEqual(records, ref, network_stream):
            # we have to be able to get the full text out:
            self.assertRecordHasContent(record, full_texts[record.key])
        # No duplicates on the wire thank you!
        self.assertEqual(1, len(records) + skipped_records[0])
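    # Passing True for include_delta_closure in get_record_stream asks the
    # source to make every record's full text recoverable from the stream
    # itself, which is why no separate basis insertion is needed here.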

    def assertAbsentRecord(self, files, keys, parents, entries):
        """Helper for test_get_record_stream_missing_records_are_absent."""
        self.assertIdenticalVersionedFile(source, files)

    def get_knit_delta_source(self):
        """Get a source that can produce a stream with knit delta records,
        regardless of this test's scenario.
        """
        mapper = self.get_mapper()
        source_transport = self.get_transport('source')
        source_transport.mkdir('.')
        source = make_file_factory(False, mapper)(source_transport)
        get_diamond_files(source, self.key_length, trailing_eol=True,
            nograph=False, left_only=False)
        return source
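    # make_file_factory(False, mapper) builds plain (unannotated) knits, so
    # the streamed records are guaranteed to include binary knit deltas,
    # independent of the scenario under test.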

    def test_insert_record_stream_delta_missing_basis_no_corruption(self):
        """Insertion where a needed basis is not included notifies the caller
        of the missing basis. In the meantime a record missing its basis is
        not added.
        """
        source = self.get_knit_delta_source()
        keys = [self.get_simple_key('origin'), self.get_simple_key('merged')]
        entries = source.get_record_stream(keys, 'unordered', False)
        files = self.get_versionedfiles()
        if self.support_partial_insertion:
            self.assertEqual([],
                list(files.get_missing_compression_parent_keys()))
            files.insert_record_stream(entries)
            missing_bases = files.get_missing_compression_parent_keys()
            self.assertEqual(set([self.get_simple_key('left')]),
                set(missing_bases))
            self.assertEqual(set(keys), set(files.get_parent_map(keys)))
        else:
            self.assertRaises(
                errors.RevisionNotPresent, files.insert_record_stream, entries)
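    # In the diamond fixture produced by get_knit_delta_source, 'merged' is
    # stored as a delta against 'left', so streaming only 'origin' and
    # 'merged' leaves 'left' as the missing compression parent asserted above.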

    def test_insert_record_stream_delta_missing_basis_can_be_added_later(self):
        """Insertion where a needed basis is not included notifies the caller
        of the missing basis. That basis can be added in a second
        insert_record_stream call that does not need to repeat records present
        in the previous stream. The record(s) that required that basis are
        fully inserted once their basis is no longer missing.
        """
        if not self.support_partial_insertion:
            raise TestNotApplicable(
                'versioned file scenario does not support partial insertion')
        source = self.get_knit_delta_source()
        entries = source.get_record_stream([self.get_simple_key('origin'),
            self.get_simple_key('merged')], 'unordered', False)
        files = self.get_versionedfiles()
        files.insert_record_stream(entries)
        missing_bases = files.get_missing_compression_parent_keys()
        self.assertEqual(set([self.get_simple_key('left')]),
            set(missing_bases))
        # 'merged' is inserted (although a commit of a write group involving
        # this versionedfiles would fail).
        merged_key = self.get_simple_key('merged')
        self.assertEqual(
            [merged_key], files.get_parent_map([merged_key]).keys())
        # Add the full delta closure of the missing records
        missing_entries = source.get_record_stream(
            missing_bases, 'unordered', True)
        files.insert_record_stream(missing_entries)
        # Now 'merged' is fully inserted (and a commit would succeed).
        self.assertEqual([], list(files.get_missing_compression_parent_keys()))
        self.assertEqual(
            [merged_key], files.get_parent_map([merged_key]).keys())
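    # The two-phase protocol exercised above: insert what arrived, ask the
    # target for get_missing_compression_parent_keys(), fetch just those keys
    # with the delta closure included, and insert again; nothing from the
    # first stream needs to be retransmitted.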

    def test_iter_lines_added_or_present_in_keys(self):
        # test that we get at least an equal set of the lines added by