/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/selftest/teststore.py

[merge] robertc's integration, updated tests to check for retcode=3

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2016 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2005 by Canonical Development Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Test Store implementations."""
18
18
 
 
19
from cStringIO import StringIO
19
20
import os
20
21
import gzip
21
22
 
22
 
from ... import errors as errors
23
 
from ...errors import BzrError
24
 
from ...sixish import (
25
 
    BytesIO,
26
 
    )
27
 
from .store import TransportStore
28
 
from .store.text import TextStore
29
 
from .store.versioned import VersionedFileStore
30
 
from ...tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
31
 
from ... import transactions
32
 
from ... import transport
33
 
from ...transport.memory import MemoryTransport
34
 
from ...bzr.weave import WeaveFile
 
23
from bzrlib.errors import BzrError, UnlistableStore
 
24
from bzrlib.store import copy_all
 
25
from bzrlib.transport.local import LocalTransport
 
26
from bzrlib.transport import NoSuchFile
 
27
from bzrlib.store.text import TextStore
 
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
29
import bzrlib.store as store
 
30
import bzrlib.transport as transport
 
31
from bzrlib.transport.memory import MemoryTransport
35
32
 
36
33
 
37
34
class TestStores(object):
42
39
        self.assertEqual(f.read(), value)
43
40
 
44
41
    def fill_store(self, store):
45
 
        store.add(BytesIO(b'hello'), b'a')
46
 
        store.add(BytesIO(b'other'), b'b')
47
 
        store.add(BytesIO(b'something'), b'c')
48
 
        store.add(BytesIO(b'goodbye'), b'123123')
 
42
        store.add(StringIO('hello'), 'a')
 
43
        store.add(StringIO('other'), 'b')
 
44
        store.add(StringIO('something'), 'c')
 
45
        store.add(StringIO('goodbye'), '123123')
 
46
 
 
47
    def test_copy_all(self):
 
48
        """Test copying"""
 
49
        os.mkdir('a')
 
50
        store_a = self.get_store('a')
 
51
        store_a.add('foo', '1')
 
52
        os.mkdir('b')
 
53
        store_b = self.get_store('b')
 
54
        copy_all(store_a, store_b)
 
55
        self.assertEqual(store_a.get('1').read(), 'foo')
 
56
        self.assertEqual(store_b.get('1').read(), 'foo')
 
57
        # TODO: Switch the exception form UnlistableStore to
 
58
        #       or make Stores throw UnlistableStore if their
 
59
        #       Transport doesn't support listing
 
60
        # store_c = RemoteStore('http://example.com/')
 
61
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
49
62
 
50
63
    def test_get(self):
51
64
        store = self.get_store()
52
65
        self.fill_store(store)
53
 
 
54
 
        self.check_content(store, b'a', b'hello')
55
 
        self.check_content(store, b'b', b'other')
56
 
        self.check_content(store, b'c', b'something')
57
 
 
 
66
    
 
67
        self.check_content(store, 'a', 'hello')
 
68
        self.check_content(store, 'b', 'other')
 
69
        self.check_content(store, 'c', 'something')
 
70
    
58
71
        # Make sure that requesting a non-existing file fails
59
 
        self.assertRaises(KeyError, self.check_content, store, b'd', None)
 
72
        self.assertRaises(KeyError, self.check_content, store, 'd', None)
60
73
 
61
74
    def test_multiple_add(self):
62
75
        """Multiple add with same ID should raise a BzrError"""
63
76
        store = self.get_store()
64
77
        self.fill_store(store)
65
 
        self.assertRaises(BzrError, store.add, BytesIO(b'goodbye'), b'123123')
 
78
        self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
66
79
 
67
80
 
68
81
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
69
82
 
70
 
    def get_store(self, path=u'.'):
71
 
        t = transport.get_transport_from_path(path)
 
83
    def get_store(self, path='.'):
 
84
        t = LocalTransport(path)
72
85
        return TextStore(t, compressed=True)
73
86
 
74
87
    def test_total_size(self):
75
 
        store = self.get_store(u'.')
 
88
        store = self.get_store('.')
76
89
        store.register_suffix('dsc')
77
 
        store.add(BytesIO(b'goodbye'), b'123123')
78
 
        store.add(BytesIO(b'goodbye2'), b'123123', 'dsc')
 
90
        store.add(StringIO('goodbye'), '123123')
 
91
        store.add(StringIO('goodbye2'), '123123', 'dsc')
79
92
        # these get gzipped - content should be stable
80
93
        self.assertEqual(store.total_size(), (2, 55))
81
 
 
 
94
        
82
95
    def test__relpath_suffixed(self):
83
 
        my_store = TextStore(MockTransport(),
84
 
                             prefixed=True, compressed=True)
 
96
        my_store = TextStore(MockTransport(), True, compressed=True)
85
97
        my_store.register_suffix('dsc')
86
 
        self.assertEqual('45/foo.dsc', my_store._relpath(b'foo', ['dsc']))
 
98
        self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
87
99
 
88
100
 
89
101
class TestMemoryStore(TestCase):
90
 
 
 
102
    
91
103
    def get_store(self):
92
 
        return TextStore(MemoryTransport())
 
104
        return store.ImmutableMemoryStore()
 
105
    
 
106
    def test_imports(self):
 
107
        from bzrlib.store import ImmutableMemoryStore
93
108
 
94
109
    def test_add_and_retrieve(self):
95
110
        store = self.get_store()
96
 
        store.add(BytesIO(b'hello'), b'aa')
97
 
        self.assertNotEqual(store.get(b'aa'), None)
98
 
        self.assertEqual(store.get(b'aa').read(), b'hello')
99
 
        store.add(BytesIO(b'hello world'), b'bb')
100
 
        self.assertNotEqual(store.get(b'bb'), None)
101
 
        self.assertEqual(store.get(b'bb').read(), b'hello world')
 
111
        store.add(StringIO('hello'), 'aa')
 
112
        self.assertNotEqual(store.get('aa'), None)
 
113
        self.assertEqual(store.get('aa').read(), 'hello')
 
114
        store.add(StringIO('hello world'), 'bb')
 
115
        self.assertNotEqual(store.get('bb'), None)
 
116
        self.assertEqual(store.get('bb').read(), 'hello world')
102
117
 
103
118
    def test_missing_is_absent(self):
104
119
        store = self.get_store()
105
 
        self.assertNotIn(b'aa', store)
 
120
        self.failIf('aa' in store)
106
121
 
107
122
    def test_adding_fails_when_present(self):
108
123
        my_store = self.get_store()
109
 
        my_store.add(BytesIO(b'hello'), b'aa')
 
124
        my_store.add(StringIO('hello'), 'aa')
110
125
        self.assertRaises(BzrError,
111
 
                          my_store.add, BytesIO(b'hello'), b'aa')
 
126
                          my_store.add, StringIO('hello'), 'aa')
112
127
 
113
128
    def test_total_size(self):
114
129
        store = self.get_store()
115
 
        store.add(BytesIO(b'goodbye'), b'123123')
116
 
        store.add(BytesIO(b'goodbye2'), b'123123.dsc')
 
130
        store.add(StringIO('goodbye'), '123123')
 
131
        store.add(StringIO('goodbye2'), '123123.dsc')
117
132
        self.assertEqual(store.total_size(), (2, 15))
118
133
        # TODO: Switch the exception form UnlistableStore to
119
134
        #       or make Stores throw UnlistableStore if their
124
139
 
125
140
class TestTextStore(TestCaseInTempDir, TestStores):
126
141
 
127
 
    def get_store(self, path=u'.'):
128
 
        t = transport.get_transport_from_path(path)
 
142
    def get_store(self, path='.'):
 
143
        t = LocalTransport(path)
129
144
        return TextStore(t, compressed=False)
130
145
 
131
146
    def test_total_size(self):
132
147
        store = self.get_store()
133
 
        store.add(BytesIO(b'goodbye'), b'123123')
134
 
        store.add(BytesIO(b'goodbye2'), b'123123.dsc')
 
148
        store.add(StringIO('goodbye'), '123123')
 
149
        store.add(StringIO('goodbye2'), '123123.dsc')
135
150
        self.assertEqual(store.total_size(), (2, 15))
136
151
        # TODO: Switch the exception form UnlistableStore to
137
152
        #       or make Stores throw UnlistableStore if their
142
157
 
143
158
class TestMixedTextStore(TestCaseInTempDir, TestStores):
144
159
 
145
 
    def get_store(self, path=u'.', compressed=True):
146
 
        t = transport.get_transport_from_path(path)
 
160
    def get_store(self, path='.', compressed=True):
 
161
        t = LocalTransport(path)
147
162
        return TextStore(t, compressed=compressed)
148
163
 
149
164
    def test_get_mixed(self):
150
 
        cs = self.get_store(u'.', compressed=True)
151
 
        s = self.get_store(u'.', compressed=False)
152
 
        cs.add(BytesIO(b'hello there'), b'a')
153
 
 
154
 
        self.assertPathExists('a.gz')
155
 
        self.assertFalse(os.path.lexists('a'))
156
 
 
157
 
        with gzip.GzipFile('a.gz') as f:
158
 
            self.assertEqual(f.read(), b'hello there')
159
 
 
160
 
        self.assertEqual(cs.has_id(b'a'), True)
161
 
        self.assertEqual(s.has_id(b'a'), True)
162
 
        self.assertEqual(cs.get(b'a').read(), b'hello there')
163
 
        self.assertEqual(s.get(b'a').read(), b'hello there')
164
 
 
165
 
        self.assertRaises(BzrError, s.add, BytesIO(b'goodbye'), b'a')
166
 
 
167
 
        s.add(BytesIO(b'goodbye'), b'b')
168
 
        self.assertPathExists('b')
169
 
        self.assertFalse(os.path.lexists('b.gz'))
170
 
        with open('b', 'rb') as f:
171
 
            self.assertEqual(f.read(), b'goodbye')
172
 
 
173
 
        self.assertEqual(cs.has_id(b'b'), True)
174
 
        self.assertEqual(s.has_id(b'b'), True)
175
 
        self.assertEqual(cs.get(b'b').read(), b'goodbye')
176
 
        self.assertEqual(s.get(b'b').read(), b'goodbye')
177
 
 
178
 
        self.assertRaises(BzrError, cs.add, BytesIO(b'again'), b'b')
179
 
 
 
165
        cs = self.get_store('.', compressed=True)
 
166
        s = self.get_store('.', compressed=False)
 
167
        cs.add(StringIO('hello there'), 'a')
 
168
 
 
169
        self.failUnlessExists('a.gz')
 
170
        self.failIf(os.path.lexists('a'))
 
171
 
 
172
        self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
 
173
 
 
174
        self.assertEquals(cs.has_id('a'), True)
 
175
        self.assertEquals(s.has_id('a'), True)
 
176
        self.assertEquals(cs.get('a').read(), 'hello there')
 
177
        self.assertEquals(s.get('a').read(), 'hello there')
 
178
        
 
179
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
 
180
 
 
181
        s.add(StringIO('goodbye'), 'b')
 
182
        self.failUnlessExists('b')
 
183
        self.failIf(os.path.lexists('b.gz'))
 
184
        self.assertEquals(open('b').read(), 'goodbye')
 
185
 
 
186
        self.assertEquals(cs.has_id('b'), True)
 
187
        self.assertEquals(s.has_id('b'), True)
 
188
        self.assertEquals(cs.get('b').read(), 'goodbye')
 
189
        self.assertEquals(s.get('b').read(), 'goodbye')
 
190
        
 
191
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
180
192
 
181
193
class MockTransport(transport.Transport):
182
194
    """A fake transport for testing with."""
193
205
        return
194
206
 
195
207
 
196
 
class InstrumentedTransportStore(TransportStore):
 
208
class InstrumentedTransportStore(store.TransportStore):
197
209
    """An instrumented TransportStore.
198
210
 
199
211
    Here we replace template method worker methods with calls that record the
219
231
class TestMockTransport(TestCase):
220
232
 
221
233
    def test_isinstance(self):
222
 
        self.assertIsInstance(MockTransport(), transport.Transport)
 
234
        self.failUnless(isinstance(MockTransport(), transport.Transport))
223
235
 
224
236
    def test_has(self):
225
237
        self.assertEqual(False, MockTransport().has('foo'))
229
241
 
230
242
 
231
243
class TestTransportStore(TestCase):
232
 
 
 
244
    
233
245
    def test__relpath_invalid(self):
234
 
        my_store = TransportStore(MockTransport())
235
 
        self.assertRaises(ValueError, my_store._relpath, b'/foo')
236
 
        self.assertRaises(ValueError, my_store._relpath, b'foo/')
 
246
        my_store = store.TransportStore(MockTransport())
 
247
        self.assertRaises(ValueError, my_store._relpath, '/foo')
 
248
        self.assertRaises(ValueError, my_store._relpath, 'foo/')
237
249
 
238
250
    def test_register_invalid_suffixes(self):
239
 
        my_store = TransportStore(MockTransport())
 
251
        my_store = store.TransportStore(MockTransport())
240
252
        self.assertRaises(ValueError, my_store.register_suffix, '/')
241
253
        self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
242
254
 
243
255
    def test__relpath_unregister_suffixes(self):
244
 
        my_store = TransportStore(MockTransport())
245
 
        self.assertRaises(ValueError, my_store._relpath, b'foo', [b'gz'])
246
 
        self.assertRaises(ValueError, my_store._relpath,
247
 
                          b'foo', [b'dsc', b'gz'])
 
256
        my_store = store.TransportStore(MockTransport())
 
257
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
 
258
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
248
259
 
249
260
    def test__relpath_simple(self):
250
 
        my_store = TransportStore(MockTransport())
251
 
        self.assertEqual("foo", my_store._relpath(b'foo'))
 
261
        my_store = store.TransportStore(MockTransport())
 
262
        self.assertEqual("foo", my_store._relpath('foo'))
252
263
 
253
264
    def test__relpath_prefixed(self):
254
 
        my_store = TransportStore(MockTransport(), True)
255
 
        self.assertEqual('45/foo', my_store._relpath(b'foo'))
 
265
        my_store = store.TransportStore(MockTransport(), True)
 
266
        self.assertEqual('45/foo', my_store._relpath('foo'))
256
267
 
257
268
    def test__relpath_simple_suffixed(self):
258
 
        my_store = TransportStore(MockTransport())
 
269
        my_store = store.TransportStore(MockTransport())
259
270
        my_store.register_suffix('bar')
260
271
        my_store.register_suffix('baz')
261
 
        self.assertEqual('foo.baz', my_store._relpath(b'foo', ['baz']))
262
 
        self.assertEqual('foo.bar.baz', my_store._relpath(
263
 
            b'foo', ['bar', 'baz']))
 
272
        self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
 
273
        self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
264
274
 
265
275
    def test__relpath_prefixed_suffixed(self):
266
 
        my_store = TransportStore(MockTransport(), True)
 
276
        my_store = store.TransportStore(MockTransport(), True)
267
277
        my_store.register_suffix('bar')
268
278
        my_store.register_suffix('baz')
269
 
        self.assertEqual('45/foo.baz', my_store._relpath(b'foo', ['baz']))
 
279
        self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
270
280
        self.assertEqual('45/foo.bar.baz',
271
 
                         my_store._relpath(b'foo', ['bar', 'baz']))
 
281
                         my_store._relpath('foo', ['bar', 'baz']))
272
282
 
273
283
    def test_add_simple(self):
274
 
        stream = BytesIO(b"content")
 
284
        stream = StringIO("content")
275
285
        my_store = InstrumentedTransportStore(MockTransport())
276
 
        my_store.add(stream, b"foo")
 
286
        my_store.add(stream, "foo")
277
287
        self.assertEqual([("_add", "foo", stream)], my_store._calls)
278
288
 
279
289
    def test_add_prefixed(self):
280
 
        stream = BytesIO(b"content")
 
290
        stream = StringIO("content")
281
291
        my_store = InstrumentedTransportStore(MockTransport(), True)
282
 
        my_store.add(stream, b"foo")
 
292
        my_store.add(stream, "foo")
283
293
        self.assertEqual([("_add", "45/foo", stream)], my_store._calls)
284
294
 
285
295
    def test_add_simple_suffixed(self):
286
 
        stream = BytesIO(b"content")
 
296
        stream = StringIO("content")
287
297
        my_store = InstrumentedTransportStore(MockTransport())
288
298
        my_store.register_suffix('dsc')
289
 
        my_store.add(stream, b"foo", b'dsc')
 
299
        my_store.add(stream, "foo", 'dsc')
290
300
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
291
 
 
 
301
        
292
302
    def test_add_simple_suffixed(self):
293
 
        stream = BytesIO(b"content")
 
303
        stream = StringIO("content")
294
304
        my_store = InstrumentedTransportStore(MockTransport(), True)
295
305
        my_store.register_suffix('dsc')
296
 
        my_store.add(stream, b"foo", 'dsc')
 
306
        my_store.add(stream, "foo", 'dsc')
297
307
        self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
298
308
 
299
309
    def get_populated_store(self, prefixed=False,
300
 
                            store_class=TextStore, compressed=False):
 
310
            store_class=TextStore, compressed=False):
301
311
        my_store = store_class(MemoryTransport(), prefixed,
302
312
                               compressed=compressed)
303
313
        my_store.register_suffix('sig')
304
 
        stream = BytesIO(b"signature")
305
 
        my_store.add(stream, b"foo", 'sig')
306
 
        stream = BytesIO(b"content")
307
 
        my_store.add(stream, b"foo")
308
 
        stream = BytesIO(b"signature for missing base")
309
 
        my_store.add(stream, b"missing", 'sig')
 
314
        stream = StringIO("signature")
 
315
        my_store.add(stream, "foo", 'sig')
 
316
        stream = StringIO("content")
 
317
        my_store.add(stream, "foo")
 
318
        stream = StringIO("signature for missing base")
 
319
        my_store.add(stream, "missing", 'sig')
310
320
        return my_store
311
 
 
 
321
        
312
322
    def test_has_simple(self):
313
323
        my_store = self.get_populated_store()
314
 
        self.assertEqual(True, my_store.has_id(b'foo'))
 
324
        self.assertEqual(True, my_store.has_id('foo'))
315
325
        my_store = self.get_populated_store(True)
316
 
        self.assertEqual(True, my_store.has_id(b'foo'))
 
326
        self.assertEqual(True, my_store.has_id('foo'))
317
327
 
318
328
    def test_has_suffixed(self):
319
329
        my_store = self.get_populated_store()
320
 
        self.assertEqual(True, my_store.has_id(b'foo', 'sig'))
 
330
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
321
331
        my_store = self.get_populated_store(True)
322
 
        self.assertEqual(True, my_store.has_id(b'foo', 'sig'))
 
332
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
323
333
 
324
334
    def test_has_suffixed_no_base(self):
325
335
        my_store = self.get_populated_store()
326
 
        self.assertEqual(False, my_store.has_id(b'missing'))
 
336
        self.assertEqual(False, my_store.has_id('missing'))
327
337
        my_store = self.get_populated_store(True)
328
 
        self.assertEqual(False, my_store.has_id(b'missing'))
 
338
        self.assertEqual(False, my_store.has_id('missing'))
329
339
 
330
340
    def test_get_simple(self):
331
341
        my_store = self.get_populated_store()
332
 
        self.assertEqual(b'content', my_store.get(b'foo').read())
 
342
        self.assertEqual('content', my_store.get('foo').read())
333
343
        my_store = self.get_populated_store(True)
334
 
        self.assertEqual(b'content', my_store.get(b'foo').read())
 
344
        self.assertEqual('content', my_store.get('foo').read())
335
345
 
336
346
    def test_get_suffixed(self):
337
347
        my_store = self.get_populated_store()
338
 
        self.assertEqual(b'signature', my_store.get(b'foo', 'sig').read())
 
348
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
339
349
        my_store = self.get_populated_store(True)
340
 
        self.assertEqual(b'signature', my_store.get(b'foo', 'sig').read())
 
350
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
341
351
 
342
352
    def test_get_suffixed_no_base(self):
343
353
        my_store = self.get_populated_store()
344
 
        self.assertEqual(b'signature for missing base',
345
 
                         my_store.get(b'missing', 'sig').read())
 
354
        self.assertEqual('signature for missing base',
 
355
                         my_store.get('missing', 'sig').read())
346
356
        my_store = self.get_populated_store(True)
347
 
        self.assertEqual(b'signature for missing base',
348
 
                         my_store.get(b'missing', 'sig').read())
 
357
        self.assertEqual('signature for missing base',
 
358
                         my_store.get('missing', 'sig').read())
349
359
 
350
360
    def test___iter__no_suffix(self):
351
 
        my_store = TextStore(MemoryTransport(),
352
 
                             prefixed=False, compressed=False)
353
 
        stream = BytesIO(b"content")
354
 
        my_store.add(stream, b"foo")
355
 
        self.assertEqual({b'foo'},
 
361
        my_store = TextStore(MemoryTransport(), False, compressed=False)
 
362
        stream = StringIO("content")
 
363
        my_store.add(stream, "foo")
 
364
        self.assertEqual(set(['foo']),
356
365
                         set(my_store.__iter__()))
357
366
 
358
367
    def test___iter__(self):
359
 
        self.assertEqual({b'foo'},
 
368
        self.assertEqual(set(['foo']),
360
369
                         set(self.get_populated_store().__iter__()))
361
 
        self.assertEqual({b'foo'},
 
370
        self.assertEqual(set(['foo']),
362
371
                         set(self.get_populated_store(True).__iter__()))
363
372
 
364
373
    def test___iter__compressed(self):
365
 
        self.assertEqual({b'foo'},
 
374
        self.assertEqual(set(['foo']),
366
375
                         set(self.get_populated_store(
367
376
                             compressed=True).__iter__()))
368
 
        self.assertEqual({b'foo'},
 
377
        self.assertEqual(set(['foo']),
369
378
                         set(self.get_populated_store(
370
379
                             True, compressed=True).__iter__()))
371
380
 
372
381
    def test___len__(self):
373
382
        self.assertEqual(1, len(self.get_populated_store()))
374
383
 
 
384
    def test_copy_suffixes(self):
 
385
        from_store = self.get_populated_store()
 
386
        to_store = TextStore(MemoryTransport(), True, compressed=True)
 
387
        to_store.register_suffix('sig')
 
388
        copy_all(from_store, to_store)
 
389
        self.assertEqual(1, len(to_store))
 
390
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
 
391
        self.assertEqual('content', to_store.get('foo').read())
 
392
        self.assertEqual('signature', to_store.get('foo', 'sig').read())
 
393
        self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
 
394
 
375
395
    def test_relpath_escaped(self):
376
 
        my_store = TransportStore(MemoryTransport())
377
 
        self.assertEqual('%25', my_store._relpath(b'%'))
378
 
 
379
 
    def test_escaped_uppercase(self):
380
 
        """Uppercase letters are escaped for safety on Windows"""
381
 
        my_store = TransportStore(MemoryTransport(), prefixed=True,
382
 
                                  escaped=True)
383
 
        # a particularly perverse file-id! :-)
384
 
        self.assertEqual(my_store._relpath(b'C:<>'), 'be/%2543%253a%253c%253e')
385
 
 
386
 
 
387
 
class TestVersionFileStore(TestCaseWithTransport):
388
 
 
389
 
    def get_scope(self):
390
 
        return self._transaction
391
 
 
392
 
    def setUp(self):
393
 
        super(TestVersionFileStore, self).setUp()
394
 
        self.vfstore = VersionedFileStore(MemoryTransport(),
395
 
                                          versionedfile_class=WeaveFile)
396
 
        self.vfstore.get_scope = self.get_scope
397
 
        self._transaction = None
398
 
 
399
 
    def test_get_weave_registers_dirty_in_write(self):
400
 
        self._transaction = transactions.WriteTransaction()
401
 
        vf = self.vfstore.get_weave_or_empty(b'id', self._transaction)
402
 
        self._transaction.finish()
403
 
        self._transaction = None
404
 
        self.assertRaises(errors.OutSideTransaction,
405
 
                          vf.add_lines, b'b', [], [])
406
 
        self._transaction = transactions.WriteTransaction()
407
 
        vf = self.vfstore.get_weave(b'id', self._transaction)
408
 
        self._transaction.finish()
409
 
        self._transaction = None
410
 
        self.assertRaises(errors.OutSideTransaction,
411
 
                          vf.add_lines, b'b', [], [])
412
 
 
413
 
    def test_get_weave_readonly_cant_write(self):
414
 
        self._transaction = transactions.WriteTransaction()
415
 
        vf = self.vfstore.get_weave_or_empty(b'id', self._transaction)
416
 
        self._transaction.finish()
417
 
        self._transaction = transactions.ReadOnlyTransaction()
418
 
        vf = self.vfstore.get_weave_or_empty(b'id', self._transaction)
419
 
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, b'b', [], [])
420
 
 
421
 
    def test___iter__escaped(self):
422
 
        self.vfstore = VersionedFileStore(MemoryTransport(),
423
 
                                          prefixed=True, escaped=True, versionedfile_class=WeaveFile)
424
 
        self.vfstore.get_scope = self.get_scope
425
 
        self._transaction = transactions.WriteTransaction()
426
 
        vf = self.vfstore.get_weave_or_empty(b' ', self._transaction)
427
 
        vf.add_lines(b'a', [], [])
428
 
        del vf
429
 
        self._transaction.finish()
430
 
        self.assertEqual([b' '], list(self.vfstore))
 
396
        my_store = store.TransportStore(MemoryTransport())
 
397
        self.assertEqual('%25', my_store._relpath('%'))