/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

Viewing changes to bzrlib/tests/test_versionedfile.py

Merge bzr.dev 4032. Resolve the new streaming fetch.

XXX: We cheat a bit for CHK fetching. CHK serializers happen to still
have legacy 'read_inventory_from_string' and 'write_inventory_to_string'
functions that convert the paged representation to a single-string XML
representation.

So when converting between formats, we just go down to the
whole-inventory XML form.

At least it works for now, even if it is grossly inefficient.
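Concretely, the round trip looks something like the sketch below. This is a
minimal illustration only: the two legacy serializer functions are the ones
named above, but the surrounding helper and its arguments are hypothetical,
not part of this revision.

    # Sketch of the format-conversion cheat described above (hypothetical
    # helper; only read_inventory_from_string/write_inventory_to_string are
    # the serializer methods named in this revision).
    def convert_inventory_via_xml(source_serializer, target_serializer,
                                  inventory_bytes):
        # Expand the serialized (possibly paged/CHK) form into a full
        # Inventory object.
        inv = source_serializer.read_inventory_from_string(inventory_bytes)
        # Re-serialize the whole inventory as one XML string in the target
        # format. Correct, but grossly inefficient for large trees.
        return target_serializer.write_inventory_to_string(inv)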

@@ -21,7 +21,7 @@
 # TODO: might be nice to create a versionedfile with some type of corruption
 # considered typical and check that it can be detected/corrected.
 
-from itertools import chain
+from itertools import chain, izip
 from StringIO import StringIO
 
 import bzrlib
@@ -47,6 +47,7 @@
 from bzrlib.tests import (
     TestCase,
     TestCaseWithMemoryTransport,
+    TestNotApplicable,
     TestScenarioApplier,
     TestSkipped,
     condition_isinstance,
@@ -94,30 +95,35 @@
                 ConstantMapper('inventory')),
             'graph':True,
             'key_length':1,
+            'support_partial_insertion': False,
             }),
         ('named-knit', {
             'cleanup':None,
             'factory':make_file_factory(False, ConstantMapper('revisions')),
             'graph':True,
             'key_length':1,
+            'support_partial_insertion': False,
             }),
-        ('named-nograph-knit-pack', {
+        ('named-nograph-nodelta-knit-pack', {
             'cleanup':cleanup_pack_knit,
             'factory':make_pack_factory(False, False, 1),
             'graph':False,
             'key_length':1,
+            'support_partial_insertion': False,
             }),
         ('named-graph-knit-pack', {
             'cleanup':cleanup_pack_knit,
             'factory':make_pack_factory(True, True, 1),
             'graph':True,
             'key_length':1,
+            'support_partial_insertion': True,
             }),
         ('named-graph-nodelta-knit-pack', {
             'cleanup':cleanup_pack_knit,
             'factory':make_pack_factory(True, False, 1),
             'graph':True,
             'key_length':1,
+            'support_partial_insertion': False,
             }),
         ]
     len_two_adapter.scenarios = [
@@ -127,18 +133,21 @@
                 PrefixMapper()),
             'graph':True,
             'key_length':2,
+            'support_partial_insertion': False,
             }),
         ('annotated-knit-escape', {
             'cleanup':None,
             'factory':make_file_factory(True, HashEscapedPrefixMapper()),
             'graph':True,
             'key_length':2,
+            'support_partial_insertion': False,
             }),
         ('plain-knit-pack', {
             'cleanup':cleanup_pack_knit,
             'factory':make_pack_factory(True, True, 2),
             'graph':True,
             'key_length':2,
+            'support_partial_insertion': True,
             }),
         ]
     for test in iter_suite_tests(to_adapt):
@@ -1288,12 +1297,11 @@
         # origin is a fulltext
         entries = f.get_record_stream([('origin',)], 'unordered', False)
         base = entries.next()
-        ft_data = ft_adapter.get_bytes(base, base.get_bytes_as(base.storage_kind))
+        ft_data = ft_adapter.get_bytes(base)
         # merged is both a delta and multiple parents.
         entries = f.get_record_stream([('merged',)], 'unordered', False)
         merged = entries.next()
-        delta_data = delta_adapter.get_bytes(merged,
-            merged.get_bytes_as(merged.storage_kind))
+        delta_data = delta_adapter.get_bytes(merged)
         return ft_data, delta_data
 
     def test_deannotation_noeol(self):
@@ -1624,7 +1632,8 @@
             ['mpdiff', 'knit-annotated-ft', 'knit-annotated-delta',
              'knit-ft', 'knit-delta', 'chunked', 'fulltext',
              'knit-annotated-ft-gz', 'knit-annotated-delta-gz', 'knit-ft-gz',
-             'knit-delta-gz'])
+             'knit-delta-gz',
+             'knit-delta-closure', 'knit-delta-closure-ref'])
 
     def capture_stream(self, f, entries, on_seen, parents):
         """Capture a stream for testing."""
@@ -1773,6 +1782,171 @@
         entries = files.get_record_stream(keys, 'topological', False)
         self.assertAbsentRecord(files, keys, parent_map, entries)
 
+    def assertRecordHasContent(self, record, bytes):
+        """Assert that record has the bytes bytes."""
+        self.assertEqual(bytes, record.get_bytes_as('fulltext'))
+        self.assertEqual(bytes, ''.join(record.get_bytes_as('chunked')))
+
+    def test_get_record_stream_native_formats_are_wire_ready_one_ft(self):
+        files = self.get_versionedfiles()
+        key = self.get_simple_key('foo')
+        files.add_lines(key, (), ['my text\n', 'content'])
+        stream = files.get_record_stream([key], 'unordered', False)
+        record = stream.next()
+        if record.storage_kind in ('chunked', 'fulltext'):
+            # chunked and fulltext representations are for direct use not wire
+            # serialisation: check they are able to be used directly. To send
+            # such records over the wire translation will be needed.
+            self.assertRecordHasContent(record, "my text\ncontent")
+        else:
+            bytes = [record.get_bytes_as(record.storage_kind)]
+            network_stream = versionedfile.NetworkRecordStream(bytes).read()
+            source_record = record
+            records = []
+            for record in network_stream:
+                records.append(record)
+                self.assertEqual(source_record.storage_kind,
+                    record.storage_kind)
+                self.assertEqual(source_record.parents, record.parents)
+                self.assertEqual(
+                    source_record.get_bytes_as(source_record.storage_kind),
+                    record.get_bytes_as(record.storage_kind))
+            self.assertEqual(1, len(records))
+
+    def assertStreamMetaEqual(self, records, expected, stream):
+        """Assert that streams expected and stream have the same records.
+
+        :param records: A list to collect the seen records.
+        :return: A generator of the records in stream.
+        """
+        # We make assertions during copying to catch things early for
+        # easier debugging.
+        for record, ref_record in izip(stream, expected):
+            records.append(record)
+            self.assertEqual(ref_record.key, record.key)
+            self.assertEqual(ref_record.storage_kind, record.storage_kind)
+            self.assertEqual(ref_record.parents, record.parents)
+            yield record
+
+    def stream_to_bytes_or_skip_counter(self, skipped_records, full_texts,
+        stream):
+        """Convert a stream to a bytes iterator.
+
+        :param skipped_records: A list with one element to increment when a
+            record is skipped.
+        :param full_texts: A dict from key->fulltext representation, for
+            checking chunked or fulltext stored records.
+        :param stream: A record_stream.
+        :return: An iterator over the bytes of each record.
+        """
+        for record in stream:
+            if record.storage_kind in ('chunked', 'fulltext'):
+                skipped_records[0] += 1
+                # check the content is correct for direct use.
+                self.assertRecordHasContent(record, full_texts[record.key])
+            else:
+                yield record.get_bytes_as(record.storage_kind)
+
+    def test_get_record_stream_native_formats_are_wire_ready_ft_delta(self):
+        files = self.get_versionedfiles()
+        target_files = self.get_versionedfiles('target')
+        key = self.get_simple_key('ft')
+        key_delta = self.get_simple_key('delta')
+        files.add_lines(key, (), ['my text\n', 'content'])
+        if self.graph:
+            delta_parents = (key,)
+        else:
+            delta_parents = ()
+        files.add_lines(key_delta, delta_parents, ['different\n', 'content\n'])
+        local = files.get_record_stream([key, key_delta], 'unordered', False)
+        ref = files.get_record_stream([key, key_delta], 'unordered', False)
+        skipped_records = [0]
+        full_texts = {
+            key: "my text\ncontent",
+            key_delta: "different\ncontent\n",
+            }
+        byte_stream = self.stream_to_bytes_or_skip_counter(
+            skipped_records, full_texts, local)
+        network_stream = versionedfile.NetworkRecordStream(byte_stream).read()
+        records = []
+        # insert the stream from the network into a versioned files object so
+        # we can check the content was carried across correctly without doing
+        # delta inspection.
+        target_files.insert_record_stream(
+            self.assertStreamMetaEqual(records, ref, network_stream))
+        # No duplicates on the wire thank you!
+        self.assertEqual(2, len(records) + skipped_records[0])
+        if len(records):
+            # if any content was copied it must all have been.
+            self.assertIdenticalVersionedFile(files, target_files)
+
+    def test_get_record_stream_native_formats_are_wire_ready_delta(self):
+        # copy a delta over the wire
+        files = self.get_versionedfiles()
+        target_files = self.get_versionedfiles('target')
+        key = self.get_simple_key('ft')
+        key_delta = self.get_simple_key('delta')
+        files.add_lines(key, (), ['my text\n', 'content'])
+        if self.graph:
+            delta_parents = (key,)
+        else:
+            delta_parents = ()
+        files.add_lines(key_delta, delta_parents, ['different\n', 'content\n'])
+        # Copy the basis text across so we can reconstruct the delta during
+        # insertion into target.
+        target_files.insert_record_stream(files.get_record_stream([key],
+            'unordered', False))
+        local = files.get_record_stream([key_delta], 'unordered', False)
+        ref = files.get_record_stream([key_delta], 'unordered', False)
+        skipped_records = [0]
+        full_texts = {
+            key_delta: "different\ncontent\n",
+            }
+        byte_stream = self.stream_to_bytes_or_skip_counter(
+            skipped_records, full_texts, local)
+        network_stream = versionedfile.NetworkRecordStream(byte_stream).read()
+        records = []
+        # insert the stream from the network into a versioned files object so
+        # we can check the content was carried across correctly without doing
+        # delta inspection during check_stream.
+        target_files.insert_record_stream(
+            self.assertStreamMetaEqual(records, ref, network_stream))
+        # No duplicates on the wire thank you!
+        self.assertEqual(1, len(records) + skipped_records[0])
+        if len(records):
+            # if any content was copied it must all have been
+            self.assertIdenticalVersionedFile(files, target_files)
+
+    def test_get_record_stream_wire_ready_delta_closure_included(self):
+        # copy a delta over the wire with the ability to get its full text.
+        files = self.get_versionedfiles()
+        key = self.get_simple_key('ft')
+        key_delta = self.get_simple_key('delta')
+        files.add_lines(key, (), ['my text\n', 'content'])
+        if self.graph:
+            delta_parents = (key,)
+        else:
+            delta_parents = ()
+        files.add_lines(key_delta, delta_parents, ['different\n', 'content\n'])
+        local = files.get_record_stream([key_delta], 'unordered', True)
+        ref = files.get_record_stream([key_delta], 'unordered', True)
+        skipped_records = [0]
+        full_texts = {
+            key_delta: "different\ncontent\n",
+            }
+        byte_stream = self.stream_to_bytes_or_skip_counter(
+            skipped_records, full_texts, local)
+        network_stream = versionedfile.NetworkRecordStream(byte_stream).read()
+        records = []
+        # insert the stream from the network into a versioned files object so
+        # we can check the content was carried across correctly without doing
+        # delta inspection during check_stream.
+        for record in self.assertStreamMetaEqual(records, ref, network_stream):
+            # we have to be able to get the full text out:
+            self.assertRecordHasContent(record, full_texts[record.key])
+        # No duplicates on the wire thank you!
+        self.assertEqual(1, len(records) + skipped_records[0])
+
     def assertAbsentRecord(self, files, keys, parents, entries):
         """Helper for test_get_record_stream_missing_records_are_absent."""
         seen = set()
@@ -2033,20 +2207,72 @@
         else:
            self.assertIdenticalVersionedFile(source, files)
 
+    def get_knit_delta_source(self):
+        """Get a source that can produce a stream with knit delta records,
+        regardless of this test's scenario.
+        """
+        mapper = self.get_mapper()
+        source_transport = self.get_transport('source')
+        source_transport.mkdir('.')
+        source = make_file_factory(False, mapper)(source_transport)
+        get_diamond_files(source, self.key_length, trailing_eol=True,
+            nograph=False, left_only=False)
+        return source
+
     def test_insert_record_stream_delta_missing_basis_no_corruption(self):
-        """Insertion where a needed basis is not included aborts safely."""
-        # We use a knit always here to be sure we are getting a binary delta.
-        mapper = self.get_mapper()
-        source_transport = self.get_transport('source')
-        source_transport.mkdir('.')
-        source = make_file_factory(False, mapper)(source_transport)
-        self.get_diamond_files(source)
-        entries = source.get_record_stream(['origin', 'merged'], 'unordered', False)
-        files = self.get_versionedfiles()
-        self.assertRaises(RevisionNotPresent, files.insert_record_stream,
-            entries)
+        """Insertion where a needed basis is not included notifies the caller
+        of the missing basis.  In the meantime a record missing its basis is
+        not added.
+        """
+        source = self.get_knit_delta_source()
+        keys = [self.get_simple_key('origin'), self.get_simple_key('merged')]
+        entries = source.get_record_stream(keys, 'unordered', False)
+        files = self.get_versionedfiles()
+        if self.support_partial_insertion:
+            self.assertEqual([],
+                list(files.get_missing_compression_parent_keys()))
+            files.insert_record_stream(entries)
+            missing_bases = files.get_missing_compression_parent_keys()
+            self.assertEqual(set([self.get_simple_key('left')]),
+                set(missing_bases))
+            self.assertEqual(set(keys), set(files.get_parent_map(keys)))
+        else:
+            self.assertRaises(
+                errors.RevisionNotPresent, files.insert_record_stream, entries)
+            files.check()
+
+    def test_insert_record_stream_delta_missing_basis_can_be_added_later(self):
+        """Insertion where a needed basis is not included notifies the caller
+        of the missing basis.  That basis can be added in a second
+        insert_record_stream call that does not need to repeat records present
+        in the previous stream.  The record(s) that required that basis are
+        fully inserted once their basis is no longer missing.
+        """
+        if not self.support_partial_insertion:
+            raise TestNotApplicable(
+                'versioned file scenario does not support partial insertion')
+        source = self.get_knit_delta_source()
+        entries = source.get_record_stream([self.get_simple_key('origin'),
+            self.get_simple_key('merged')], 'unordered', False)
+        files = self.get_versionedfiles()
+        files.insert_record_stream(entries)
+        missing_bases = files.get_missing_compression_parent_keys()
+        self.assertEqual(set([self.get_simple_key('left')]),
+            set(missing_bases))
+        # 'merged' is inserted (although a commit of a write group involving
+        # this versionedfiles would fail).
+        merged_key = self.get_simple_key('merged')
+        self.assertEqual(
+            [merged_key], files.get_parent_map([merged_key]).keys())
+        # Add the full delta closure of the missing records
+        missing_entries = source.get_record_stream(
+            missing_bases, 'unordered', True)
+        files.insert_record_stream(missing_entries)
+        # Now 'merged' is fully inserted (and a commit would succeed).
+        self.assertEqual([], list(files.get_missing_compression_parent_keys()))
+        self.assertEqual(
+            [merged_key], files.get_parent_map([merged_key]).keys())
         files.check()
-        self.assertEqual({}, files.get_parent_map([]))
 
     def test_iter_lines_added_or_present_in_keys(self):
         # test that we get at least an equalset of the lines added by
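For reference, the calling convention that the new partial-insertion tests
exercise can be summarised as the sketch below. This is a hypothetical caller
built only from the methods the tests above use; it is not code from this
revision.

    # Insert a stream that may reference absent compression parents, then
    # repair the gap with a second, self-contained stream.
    def insert_with_missing_basis_retry(files, source, keys):
        files.insert_record_stream(
            source.get_record_stream(keys, 'unordered', False))
        missing = list(files.get_missing_compression_parent_keys())
        if missing:
            # Ask for the delta closure (include_delta_closure=True) so the
            # second stream can be inserted without further dependencies.
            files.insert_record_stream(
                source.get_record_stream(missing, 'unordered', True))
        # Once nothing is missing, the pending records are fully inserted.
        files.check()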