        entries = files.get_record_stream(keys, 'topological', False)
        self.assertAbsentRecord(files, keys, parent_map, entries)

    def assertRecordHasContent(self, record, bytes):
        """Assert that record has the given bytes as its content.

        Both the 'fulltext' and the 'chunked' representations are checked.
        """
        self.assertEqual(bytes, record.get_bytes_as('fulltext'))
        self.assertEqual(bytes, ''.join(record.get_bytes_as('chunked')))
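    # Note: get_bytes_as('chunked') returns an iterable of byte strings whose
    # concatenation must equal the single string from get_bytes_as('fulltext'),
    # which is why the helper above checks both forms against the same bytes.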

    def test_get_record_stream_native_formats_are_wire_ready_one_ft(self):
        files = self.get_versionedfiles()
        key = self.get_simple_key('foo')
        files.add_lines(key, (), ['my text\n', 'content'])
        stream = files.get_record_stream([key], 'unordered', False)
        record = stream.next()
        if record.storage_kind in ('chunked', 'fulltext'):
            # chunked and fulltext representations are for direct use not wire
            # serialisation: check they are able to be used directly. To send
            # such records over the wire translation will be needed.
            self.assertRecordHasContent(record, "my text\ncontent")
        else:
            bytes = [record.get_bytes_as(record.storage_kind)]
            network_stream = versionedfile.NetworkRecordStream(bytes).read()
            source_record = record
            records = []
            for record in network_stream:
                records.append(record)
                self.assertEqual(source_record.storage_kind,
                    record.storage_kind)
                self.assertEqual(source_record.parents, record.parents)
                self.assertEqual(
                    source_record.get_bytes_as(source_record.storage_kind),
                    record.get_bytes_as(record.storage_kind))
            self.assertEqual(1, len(records))
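    # The two helpers below factor out the wire round-trip used by the
    # remaining wire-ready tests: serialise records to bytes, rebuild them
    # with NetworkRecordStream, and compare the result against a reference
    # stream.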

    def assertStreamMetaEqual(self, records, expected, stream):
        """Assert that streams expected and stream have the same records.

        :param records: A list to collect the seen records.
        :return: A generator of the records in stream.
        """
        # We make assertions during copying to catch things early for
        # easier debugging.
        for record, ref_record in izip(stream, expected):
            records.append(record)
            self.assertEqual(ref_record.key, record.key)
            self.assertEqual(ref_record.storage_kind, record.storage_kind)
            self.assertEqual(ref_record.parents, record.parents)
            yield record

    def stream_to_bytes_or_skip_counter(self, skipped_records, full_texts,
        stream):
        """Convert a stream to a bytes iterator.

        :param skipped_records: A list with one element to increment when a
            record is skipped.
        :param full_texts: A dict from key->fulltext representation, for
            checking chunked or fulltext stored records.
        :param stream: A record_stream.
        :return: An iterator over the bytes of each record.
        """
        for record in stream:
            if record.storage_kind in ('chunked', 'fulltext'):
                skipped_records[0] += 1
                # check the content is correct for direct use.
                self.assertRecordHasContent(record, full_texts[record.key])
            else:
                yield record.get_bytes_as(record.storage_kind)
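    # Records already held as 'chunked' or 'fulltext' never cross the wire in
    # the tests below: they are tallied in skipped_records instead, so the
    # assertions compare len(records) + skipped_records[0] with the number of
    # keys requested.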

    def test_get_record_stream_native_formats_are_wire_ready_ft_delta(self):
        files = self.get_versionedfiles()
        target_files = self.get_versionedfiles('target')
        key = self.get_simple_key('ft')
        key_delta = self.get_simple_key('delta')
        files.add_lines(key, (), ['my text\n', 'content'])
        if self.graph:
            delta_parents = (key,)
        else:
            delta_parents = ()
        files.add_lines(key_delta, delta_parents, ['different\n', 'content\n'])
        local = files.get_record_stream([key, key_delta], 'unordered', False)
        ref = files.get_record_stream([key, key_delta], 'unordered', False)
        skipped_records = [0]
        full_texts = {
            key: "my text\ncontent",
            key_delta: "different\ncontent\n",
            }
        byte_stream = self.stream_to_bytes_or_skip_counter(
            skipped_records, full_texts, local)
        network_stream = versionedfile.NetworkRecordStream(byte_stream).read()
        records = []
        # insert the stream from the network into a versioned files object so
        # we can check the content was carried across correctly without doing
        # delta inspection.
        target_files.insert_record_stream(
            self.assertStreamMetaEqual(records, ref, network_stream))
        # No duplicates on the wire thank you!
        self.assertEqual(2, len(records) + skipped_records[0])
        if len(records):
            # if any content was copied it all must have been.
            self.assertIdenticalVersionedFile(files, target_files)
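    # Scenarios that do not keep a graph (self.graph is False) record no
    # parent information, hence the delta_parents switch in these tests.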

    def test_get_record_stream_native_formats_are_wire_ready_delta(self):
        # copy a delta over the wire
        files = self.get_versionedfiles()
        target_files = self.get_versionedfiles('target')
        key = self.get_simple_key('ft')
        key_delta = self.get_simple_key('delta')
        files.add_lines(key, (), ['my text\n', 'content'])
        if self.graph:
            delta_parents = (key,)
        else:
            delta_parents = ()
        files.add_lines(key_delta, delta_parents, ['different\n', 'content\n'])
        # Copy the basis text across so we can reconstruct the delta during
        # insertion into target.
        target_files.insert_record_stream(files.get_record_stream([key],
            'unordered', False))
        local = files.get_record_stream([key_delta], 'unordered', False)
        ref = files.get_record_stream([key_delta], 'unordered', False)
        skipped_records = [0]
        full_texts = {
            key_delta: "different\ncontent\n",
            }
        byte_stream = self.stream_to_bytes_or_skip_counter(
            skipped_records, full_texts, local)
        network_stream = versionedfile.NetworkRecordStream(byte_stream).read()
        records = []
        # insert the stream from the network into a versioned files object so
        # we can check the content was carried across correctly without doing
        # delta inspection during check_stream.
        target_files.insert_record_stream(
            self.assertStreamMetaEqual(records, ref, network_stream))
        # No duplicates on the wire thank you!
        self.assertEqual(1, len(records) + skipped_records[0])
        if len(records):
            # if any content was copied it all must have been.
            self.assertIdenticalVersionedFile(files, target_files)
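    # With include_delta_closure=True, get_record_stream must return records
    # from which full texts can be extracted directly, even for keys stored
    # as deltas; the next test exercises that over the wire.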

    def test_get_record_stream_wire_ready_delta_closure_included(self):
        # copy a delta over the wire with the ability to get its full text.
        files = self.get_versionedfiles()
        key = self.get_simple_key('ft')
        key_delta = self.get_simple_key('delta')
        files.add_lines(key, (), ['my text\n', 'content'])
        if self.graph:
            delta_parents = (key,)
        else:
            delta_parents = ()
        files.add_lines(key_delta, delta_parents, ['different\n', 'content\n'])
        local = files.get_record_stream([key_delta], 'unordered', True)
        ref = files.get_record_stream([key_delta], 'unordered', True)
        skipped_records = [0]
        full_texts = {
            key_delta: "different\ncontent\n",
            }
        byte_stream = self.stream_to_bytes_or_skip_counter(
            skipped_records, full_texts, local)
        network_stream = versionedfile.NetworkRecordStream(byte_stream).read()
        records = []
        # insert the stream from the network into a versioned files object so
        # we can check the content was carried across correctly without doing
        # delta inspection during check_stream.
        for record in self.assertStreamMetaEqual(records, ref, network_stream):
            # we have to be able to get the full text out:
            self.assertRecordHasContent(record, full_texts[record.key])
        # No duplicates on the wire thank you!
        self.assertEqual(1, len(records) + skipped_records[0])

    def assertAbsentRecord(self, files, keys, parents, entries):
        """Helper for test_get_record_stream_missing_records_are_absent."""

        self.assertIdenticalVersionedFile(source, files)

    def get_knit_delta_source(self):
        """Get a source that can produce a stream with knit delta records,
        regardless of this test's scenario.
        """
        mapper = self.get_mapper()
        source_transport = self.get_transport('source')
        source_transport.mkdir('.')
        source = make_file_factory(False, mapper)(source_transport)
        get_diamond_files(source, self.key_length, trailing_eol=True,
            nograph=False, left_only=False)
        return source
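    # get_diamond_files builds the usual five-key diamond ancestry ('origin',
    # 'base', 'left', 'right', 'merged'), so that, with knit deltas, 'merged'
    # is stored against compression parent 'left' - the key the tests below
    # expect to be reported missing.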

    def test_insert_record_stream_delta_missing_basis_no_corruption(self):
        """Insertion where a needed basis is not included notifies the caller
        of the missing basis. In the meantime a record missing its basis is
        not added.
        """
        source = self.get_knit_delta_source()
        keys = [self.get_simple_key('origin'), self.get_simple_key('merged')]
        entries = source.get_record_stream(keys, 'unordered', False)
        files = self.get_versionedfiles()
        if self.support_partial_insertion:
            self.assertEqual([],
                list(files.get_missing_compression_parent_keys()))
            files.insert_record_stream(entries)
            missing_bases = files.get_missing_compression_parent_keys()
            self.assertEqual(set([self.get_simple_key('left')]),
                set(missing_bases))
            self.assertEqual(set(keys), set(files.get_parent_map(keys)))
        else:
            self.assertRaises(
                errors.RevisionNotPresent, files.insert_record_stream, entries)
            files.check()
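    # support_partial_insertion is a per-scenario flag: formats that can
    # buffer records whose compression parents are absent (reporting them via
    # get_missing_compression_parent_keys) let the insertion proceed; other
    # formats must raise RevisionNotPresent instead.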

    def test_insert_record_stream_delta_missing_basis_can_be_added_later(self):
        """Insertion where a needed basis is not included notifies the caller
        of the missing basis. That basis can be added in a second
        insert_record_stream call that does not need to repeat records present
        in the previous stream. The record(s) that required that basis are
        fully inserted once their basis is no longer missing.
        """
        if not self.support_partial_insertion:
            raise TestNotApplicable(
                'versioned file scenario does not support partial insertion')
        source = self.get_knit_delta_source()
        entries = source.get_record_stream([self.get_simple_key('origin'),
            self.get_simple_key('merged')], 'unordered', False)
        files = self.get_versionedfiles()
        files.insert_record_stream(entries)
        missing_bases = files.get_missing_compression_parent_keys()
        self.assertEqual(set([self.get_simple_key('left')]),
            set(missing_bases))
        # 'merged' is inserted (although a commit of a write group involving
        # this versionedfiles would fail).
        merged_key = self.get_simple_key('merged')
        self.assertEqual(
            [merged_key], files.get_parent_map([merged_key]).keys())
        # Add the full delta closure of the missing records
        missing_entries = source.get_record_stream(
            missing_bases, 'unordered', True)
        files.insert_record_stream(missing_entries)
        # Now 'merged' is fully inserted (and a commit would succeed).
        self.assertEqual([], list(files.get_missing_compression_parent_keys()))
        self.assertEqual(
            [merged_key], files.get_parent_map([merged_key]).keys())

    def test_iter_lines_added_or_present_in_keys(self):
        # test that we get at least an equalset of the lines added by