1718
1718
entries = files.get_record_stream(keys, 'topological', False)
1719
1719
self.assertAbsentRecord(files, keys, parent_map, entries)
1721
def assertRecordHasContent(self, record, bytes):
    """Assert that record yields the text ``bytes`` in both direct forms.

    :param record: A stream record whose ``get_bytes_as`` accepts the
        'fulltext' and 'chunked' representations.
    :param bytes: The expected complete text of the record.
    """
    self.assertEqual(bytes, record.get_bytes_as('fulltext'))
    # The chunked form is a sequence of strings; joined together they must
    # reproduce the full text exactly.
    self.assertEqual(bytes, ''.join(record.get_bytes_as('chunked')))
1726
def test_get_record_stream_native_formats_are_wire_ready_one_ft(self):
    """A lone fulltext record is directly usable or survives the wire."""
    files = self.get_versionedfiles()
    key = self.get_simple_key('foo')
    files.add_lines(key, (), ['my text\n', 'content'])
    stream = files.get_record_stream([key], 'unordered', False)
    record = stream.next()
    if record.storage_kind in ('chunked', 'fulltext'):
        # chunked and fulltext representations are for direct use not wire
        # serialisation: check they are able to be used directly. To send
        # such records over the wire translation will be needed.
        self.assertRecordHasContent(record, "my text\ncontent")
    else:
        # Any other storage kind is sent as-is: serialise the single
        # record and parse it back through the network stream reader.
        wire_bytes = [record.get_bytes_as(record.storage_kind)]
        network_stream = versionedfile.NetworkRecordStream(
            wire_bytes).read()
        source_record = record
        records = []
        for wire_record in network_stream:
            records.append(wire_record)
            self.assertEqual(source_record.storage_kind,
                wire_record.storage_kind)
            self.assertEqual(source_record.parents, wire_record.parents)
            self.assertEqual(
                source_record.get_bytes_as(source_record.storage_kind),
                wire_record.get_bytes_as(wire_record.storage_kind))
        # Exactly one record went in, so exactly one must come out.
        self.assertEqual(1, len(records))
1752
def assertStreamMetaEqual(self, records, expected, stream):
    """Assert that streams expected and stream have the same records.

    Only the key, storage kind and parents of each pair of records are
    compared. Records are paired in order with ``izip``, so comparison
    stops as soon as either stream is exhausted; callers check the length
    of ``records`` afterwards to detect missing or extra records.

    :param records: A list to collect the seen records.
    :param expected: The reference stream to compare against.
    :param stream: The stream being checked.
    :return: A generator of the records in stream.
    """
    # We make assertions during copying to catch things early for
    # easier debugging.
    for record, ref_record in izip(stream, expected):
        records.append(record)
        self.assertEqual(ref_record.key, record.key)
        self.assertEqual(ref_record.storage_kind, record.storage_kind)
        self.assertEqual(ref_record.parents, record.parents)
        # Pass the checked record on so the consumer can iterate or insert
        # the stream while it is being verified.
        yield record
1767
def stream_to_bytes_or_skip_counter(self, skipped_records, full_texts,
    stream):
    """Convert a stream to a bytes iterator.

    Records stored as 'chunked' or 'fulltext' are not serialised: they are
    counted as skipped and their content is checked directly instead.

    :param skipped_records: A list with one element to increment when a
        record is skipped.
    :param full_texts: A dict from key->fulltext representation, for
        checking chunked or fulltext stored records.
    :param stream: A record_stream.
    :return: An iterator over the bytes of each record.
    """
    for record in stream:
        if record.storage_kind in ('chunked', 'fulltext'):
            skipped_records[0] += 1
            # check the content is correct for direct use.
            self.assertRecordHasContent(record, full_texts[record.key])
        else:
            yield record.get_bytes_as(record.storage_kind)
1786
def test_get_record_stream_native_formats_are_wire_ready_ft_delta(self):
    """Copy a fulltext and a dependent text over the wire in one stream."""
    files = self.get_versionedfiles()
    target_files = self.get_versionedfiles('target')
    key = self.get_simple_key('ft')
    key_delta = self.get_simple_key('delta')
    files.add_lines(key, (), ['my text\n', 'content'])
    # NOTE(review): the second text always names the first as its parent
    # here; confirm this suits implementations that do not record parents.
    delta_parents = (key,)
    files.add_lines(key_delta, delta_parents, ['different\n', 'content\n'])
    # Two streams over the same keys: ``local`` is serialised, ``ref`` is
    # the reference its metadata is compared against.
    local = files.get_record_stream([key, key_delta], 'unordered', False)
    ref = files.get_record_stream([key, key_delta], 'unordered', False)
    skipped_records = [0]
    full_texts = {
        key: "my text\ncontent",
        key_delta: "different\ncontent\n",
        }
    byte_stream = self.stream_to_bytes_or_skip_counter(
        skipped_records, full_texts, local)
    network_stream = versionedfile.NetworkRecordStream(byte_stream).read()
    records = []
    # insert the stream from the network into a versioned files object so
    # we can check the content was carried across correctly without doing
    # delta inspection.
    target_files.insert_record_stream(
        self.assertStreamMetaEqual(records, ref, network_stream))
    # No duplicates on the wire thank you!
    self.assertEqual(2, len(records) + skipped_records[0])
    if records:
        # if any content was copied it all must have been; when every
        # record was skipped nothing reached the target to compare.
        self.assertIdenticalVersionedFile(files, target_files)
1819
def test_get_record_stream_native_formats_are_wire_ready_delta(self):
    """Copy only the dependent text over the wire; its basis goes first."""
    # copy a delta over the wire
    files = self.get_versionedfiles()
    target_files = self.get_versionedfiles('target')
    key = self.get_simple_key('ft')
    key_delta = self.get_simple_key('delta')
    files.add_lines(key, (), ['my text\n', 'content'])
    # NOTE(review): the second text always names the first as its parent
    # here; confirm this suits implementations that do not record parents.
    delta_parents = (key,)
    files.add_lines(key_delta, delta_parents, ['different\n', 'content\n'])
    # Copy the basis text across so we can reconstruct the delta during
    # insertion into target.
    target_files.insert_record_stream(files.get_record_stream([key],
        'unordered', False))
    # ``local`` is serialised; ``ref`` is the reference stream its metadata
    # is compared against.
    local = files.get_record_stream([key_delta], 'unordered', False)
    ref = files.get_record_stream([key_delta], 'unordered', False)
    skipped_records = [0]
    full_texts = {
        key_delta: "different\ncontent\n",
        }
    byte_stream = self.stream_to_bytes_or_skip_counter(
        skipped_records, full_texts, local)
    network_stream = versionedfile.NetworkRecordStream(byte_stream).read()
    records = []
    # insert the stream from the network into a versioned files object so
    # we can check the content was carried across correctly without doing
    # delta inspection during check_stream.
    target_files.insert_record_stream(
        self.assertStreamMetaEqual(records, ref, network_stream))
    # No duplicates on the wire thank you!
    self.assertEqual(1, len(records) + skipped_records[0])
    if records:
        # if any content was copied it all must have been; when the record
        # was skipped the target only holds the basis text.
        self.assertIdenticalVersionedFile(files, target_files)
1856
def test_get_record_stream_wire_ready_delta_closure_included(self):
    """A record sent over the wire must still yield its full text."""
    # copy a delta over the wire with the ability to get its full text.
    files = self.get_versionedfiles()
    key = self.get_simple_key('ft')
    key_delta = self.get_simple_key('delta')
    files.add_lines(key, (), ['my text\n', 'content'])
    # NOTE(review): the second text always names the first as its parent
    # here; confirm this suits implementations that do not record parents.
    delta_parents = (key,)
    files.add_lines(key_delta, delta_parents, ['different\n', 'content\n'])
    # Unlike the sibling tests the streams are requested with the third
    # argument True. ``local`` is serialised; ``ref`` is the reference
    # stream its metadata is compared against.
    local = files.get_record_stream([key_delta], 'unordered', True)
    ref = files.get_record_stream([key_delta], 'unordered', True)
    skipped_records = [0]
    full_texts = {
        key_delta: "different\ncontent\n",
        }
    byte_stream = self.stream_to_bytes_or_skip_counter(
        skipped_records, full_texts, local)
    network_stream = versionedfile.NetworkRecordStream(byte_stream).read()
    records = []
    # Nothing is inserted into a target here: each record read back from
    # the network stream is checked directly for its content.
    for record in self.assertStreamMetaEqual(records, ref, network_stream):
        # we have to be able to get the full text out:
        self.assertRecordHasContent(record, full_texts[record.key])
    # No duplicates on the wire thank you!
    self.assertEqual(1, len(records) + skipped_records[0])
1721
1886
def assertAbsentRecord(self, files, keys, parents, entries):
1722
1887
"""Helper for test_get_record_stream_missing_records_are_absent."""