/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/selftest/teststore.py

  • Committer: Robert Collins
  • Date: 2005-10-19 10:11:57 UTC
  • mfrom: (1185.16.78)
  • mto: This revision was merged to the branch mainline in revision 1470.
  • Revision ID: robertc@robertcollins.net-20051019101157-17438d311e746b4f
mergeĀ fromĀ upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2016 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2005 by Canonical Development Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Test Store implementations."""
18
18
 
 
19
from cStringIO import StringIO
19
20
import os
20
 
import gzip
21
21
 
22
 
from ... import errors as errors
23
 
from ...errors import BzrError
24
 
from ...sixish import (
25
 
    BytesIO,
26
 
    )
27
 
from .store import TransportStore
28
 
from .store.text import TextStore
29
 
from .store.versioned import VersionedFileStore
30
 
from ...tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
31
 
from ... import transactions
32
 
from ... import transport
33
 
from ...transport.memory import MemoryTransport
34
 
from ...bzr.weave import WeaveFile
 
22
from bzrlib.errors import BzrError, UnlistableStore
 
23
from bzrlib.store import copy_all
 
24
from bzrlib.transport.local import LocalTransport
 
25
from bzrlib.transport import NoSuchFile
 
26
from bzrlib.store.compressed_text import CompressedTextStore
 
27
from bzrlib.store.text import TextStore
 
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
29
import bzrlib.store as store
 
30
import bzrlib.transport as transport
 
31
from bzrlib.transport.memory import MemoryTransport
35
32
 
36
33
 
37
34
class TestStores(object):
38
 
    """Mixin template class that provides some common tests for stores"""
39
35
 
40
36
    def check_content(self, store, fileid, value):
41
37
        f = store.get(fileid)
42
38
        self.assertEqual(f.read(), value)
43
39
 
44
40
    def fill_store(self, store):
45
 
        store.add(BytesIO(b'hello'), b'a')
46
 
        store.add(BytesIO(b'other'), b'b')
47
 
        store.add(BytesIO(b'something'), b'c')
48
 
        store.add(BytesIO(b'goodbye'), b'123123')
 
41
        store.add(StringIO('hello'), 'a')
 
42
        store.add(StringIO('other'), 'b')
 
43
        store.add(StringIO('something'), 'c')
 
44
        store.add(StringIO('goodbye'), '123123')
 
45
 
 
46
    def test_copy_all(self):
 
47
        """Test copying"""
 
48
        os.mkdir('a')
 
49
        store_a = self.get_store('a')
 
50
        store_a.add('foo', '1')
 
51
        os.mkdir('b')
 
52
        store_b = self.get_store('b')
 
53
        copy_all(store_a, store_b)
 
54
        self.assertEqual(store_a.get('1').read(), 'foo')
 
55
        self.assertEqual(store_b.get('1').read(), 'foo')
 
56
        # TODO: Switch the exception form UnlistableStore to
 
57
        #       or make Stores throw UnlistableStore if their
 
58
        #       Transport doesn't support listing
 
59
        # store_c = RemoteStore('http://example.com/')
 
60
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
49
61
 
50
62
    def test_get(self):
51
63
        store = self.get_store()
52
64
        self.fill_store(store)
53
 
 
54
 
        self.check_content(store, b'a', b'hello')
55
 
        self.check_content(store, b'b', b'other')
56
 
        self.check_content(store, b'c', b'something')
57
 
 
 
65
    
 
66
        self.check_content(store, 'a', 'hello')
 
67
        self.check_content(store, 'b', 'other')
 
68
        self.check_content(store, 'c', 'something')
 
69
    
58
70
        # Make sure that requesting a non-existing file fails
59
 
        self.assertRaises(KeyError, self.check_content, store, b'd', None)
 
71
        self.assertRaises(KeyError, self.check_content, store, 'd', None)
60
72
 
61
73
    def test_multiple_add(self):
62
74
        """Multiple add with same ID should raise a BzrError"""
63
75
        store = self.get_store()
64
76
        self.fill_store(store)
65
 
        self.assertRaises(BzrError, store.add, BytesIO(b'goodbye'), b'123123')
 
77
        self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
66
78
 
67
79
 
68
80
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
69
81
 
70
 
    def get_store(self, path=u'.'):
71
 
        t = transport.get_transport_from_path(path)
72
 
        return TextStore(t, compressed=True)
 
82
    def get_store(self, path='.'):
 
83
        t = LocalTransport(path)
 
84
        return CompressedTextStore(t)
73
85
 
74
86
    def test_total_size(self):
75
 
        store = self.get_store(u'.')
 
87
        store = self.get_store('.')
76
88
        store.register_suffix('dsc')
77
 
        store.add(BytesIO(b'goodbye'), b'123123')
78
 
        store.add(BytesIO(b'goodbye2'), b'123123', 'dsc')
 
89
        store.add(StringIO('goodbye'), '123123')
 
90
        store.add(StringIO('goodbye2'), '123123', 'dsc')
79
91
        # these get gzipped - content should be stable
80
92
        self.assertEqual(store.total_size(), (2, 55))
81
 
 
 
93
        
82
94
    def test__relpath_suffixed(self):
83
 
        my_store = TextStore(MockTransport(),
84
 
                             prefixed=True, compressed=True)
 
95
        my_store = CompressedTextStore(MockTransport(), True)
85
96
        my_store.register_suffix('dsc')
86
 
        self.assertEqual('45/foo.dsc', my_store._relpath(b'foo', ['dsc']))
 
97
        self.assertEqual('45/foo.dsc.gz', my_store._relpath('foo', ['dsc']))
87
98
 
88
99
 
89
100
class TestMemoryStore(TestCase):
90
 
 
 
101
    
91
102
    def get_store(self):
92
 
        return TextStore(MemoryTransport())
 
103
        return store.ImmutableMemoryStore()
 
104
    
 
105
    def test_imports(self):
 
106
        from bzrlib.store import ImmutableMemoryStore
93
107
 
94
108
    def test_add_and_retrieve(self):
95
109
        store = self.get_store()
96
 
        store.add(BytesIO(b'hello'), b'aa')
97
 
        self.assertNotEqual(store.get(b'aa'), None)
98
 
        self.assertEqual(store.get(b'aa').read(), b'hello')
99
 
        store.add(BytesIO(b'hello world'), b'bb')
100
 
        self.assertNotEqual(store.get(b'bb'), None)
101
 
        self.assertEqual(store.get(b'bb').read(), b'hello world')
 
110
        store.add(StringIO('hello'), 'aa')
 
111
        self.assertNotEqual(store.get('aa'), None)
 
112
        self.assertEqual(store.get('aa').read(), 'hello')
 
113
        store.add(StringIO('hello world'), 'bb')
 
114
        self.assertNotEqual(store.get('bb'), None)
 
115
        self.assertEqual(store.get('bb').read(), 'hello world')
102
116
 
103
117
    def test_missing_is_absent(self):
104
118
        store = self.get_store()
105
 
        self.assertNotIn(b'aa', store)
 
119
        self.failIf('aa' in store)
106
120
 
107
121
    def test_adding_fails_when_present(self):
108
122
        my_store = self.get_store()
109
 
        my_store.add(BytesIO(b'hello'), b'aa')
 
123
        my_store.add(StringIO('hello'), 'aa')
110
124
        self.assertRaises(BzrError,
111
 
                          my_store.add, BytesIO(b'hello'), b'aa')
 
125
                          my_store.add, StringIO('hello'), 'aa')
112
126
 
113
127
    def test_total_size(self):
114
128
        store = self.get_store()
115
 
        store.add(BytesIO(b'goodbye'), b'123123')
116
 
        store.add(BytesIO(b'goodbye2'), b'123123.dsc')
 
129
        store.add(StringIO('goodbye'), '123123')
 
130
        store.add(StringIO('goodbye2'), '123123.dsc')
117
131
        self.assertEqual(store.total_size(), (2, 15))
118
132
        # TODO: Switch the exception form UnlistableStore to
119
133
        #       or make Stores throw UnlistableStore if their
124
138
 
125
139
class TestTextStore(TestCaseInTempDir, TestStores):
126
140
 
127
 
    def get_store(self, path=u'.'):
128
 
        t = transport.get_transport_from_path(path)
129
 
        return TextStore(t, compressed=False)
 
141
    def get_store(self, path='.'):
 
142
        t = LocalTransport(path)
 
143
        return TextStore(t)
130
144
 
131
145
    def test_total_size(self):
132
146
        store = self.get_store()
133
 
        store.add(BytesIO(b'goodbye'), b'123123')
134
 
        store.add(BytesIO(b'goodbye2'), b'123123.dsc')
 
147
        store.add(StringIO('goodbye'), '123123')
 
148
        store.add(StringIO('goodbye2'), '123123.dsc')
135
149
        self.assertEqual(store.total_size(), (2, 15))
136
150
        # TODO: Switch the exception form UnlistableStore to
137
151
        #       or make Stores throw UnlistableStore if their
140
154
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
141
155
 
142
156
 
143
 
class TestMixedTextStore(TestCaseInTempDir, TestStores):
144
 
 
145
 
    def get_store(self, path=u'.', compressed=True):
146
 
        t = transport.get_transport_from_path(path)
147
 
        return TextStore(t, compressed=compressed)
148
 
 
149
 
    def test_get_mixed(self):
150
 
        cs = self.get_store(u'.', compressed=True)
151
 
        s = self.get_store(u'.', compressed=False)
152
 
        cs.add(BytesIO(b'hello there'), b'a')
153
 
 
154
 
        self.assertPathExists('a.gz')
155
 
        self.assertFalse(os.path.lexists('a'))
156
 
 
157
 
        with gzip.GzipFile('a.gz') as f:
158
 
            self.assertEqual(f.read(), b'hello there')
159
 
 
160
 
        self.assertEqual(cs.has_id(b'a'), True)
161
 
        self.assertEqual(s.has_id(b'a'), True)
162
 
        self.assertEqual(cs.get(b'a').read(), b'hello there')
163
 
        self.assertEqual(s.get(b'a').read(), b'hello there')
164
 
 
165
 
        self.assertRaises(BzrError, s.add, BytesIO(b'goodbye'), b'a')
166
 
 
167
 
        s.add(BytesIO(b'goodbye'), b'b')
168
 
        self.assertPathExists('b')
169
 
        self.assertFalse(os.path.lexists('b.gz'))
170
 
        with open('b', 'rb') as f:
171
 
            self.assertEqual(f.read(), b'goodbye')
172
 
 
173
 
        self.assertEqual(cs.has_id(b'b'), True)
174
 
        self.assertEqual(s.has_id(b'b'), True)
175
 
        self.assertEqual(cs.get(b'b').read(), b'goodbye')
176
 
        self.assertEqual(s.get(b'b').read(), b'goodbye')
177
 
 
178
 
        self.assertRaises(BzrError, cs.add, BytesIO(b'again'), b'b')
179
 
 
180
 
 
181
157
class MockTransport(transport.Transport):
182
158
    """A fake transport for testing with."""
183
159
 
193
169
        return
194
170
 
195
171
 
196
 
class InstrumentedTransportStore(TransportStore):
 
172
class InstrumentedTransportStore(store.TransportStore):
197
173
    """An instrumented TransportStore.
198
174
 
199
175
    Here we replace template method worker methods with calls that record the
219
195
class TestMockTransport(TestCase):
220
196
 
221
197
    def test_isinstance(self):
222
 
        self.assertIsInstance(MockTransport(), transport.Transport)
 
198
        self.failUnless(isinstance(MockTransport(), transport.Transport))
223
199
 
224
200
    def test_has(self):
225
201
        self.assertEqual(False, MockTransport().has('foo'))
229
205
 
230
206
 
231
207
class TestTransportStore(TestCase):
232
 
 
 
208
    
233
209
    def test__relpath_invalid(self):
234
 
        my_store = TransportStore(MockTransport())
235
 
        self.assertRaises(ValueError, my_store._relpath, b'/foo')
236
 
        self.assertRaises(ValueError, my_store._relpath, b'foo/')
 
210
        my_store = store.TransportStore(MockTransport())
 
211
        self.assertRaises(ValueError, my_store._relpath, '/foo')
 
212
        self.assertRaises(ValueError, my_store._relpath, 'foo/')
237
213
 
238
214
    def test_register_invalid_suffixes(self):
239
 
        my_store = TransportStore(MockTransport())
 
215
        my_store = store.TransportStore(MockTransport())
240
216
        self.assertRaises(ValueError, my_store.register_suffix, '/')
241
217
        self.assertRaises(ValueError, my_store.register_suffix, '.gz/bar')
242
218
 
243
219
    def test__relpath_unregister_suffixes(self):
244
 
        my_store = TransportStore(MockTransport())
245
 
        self.assertRaises(ValueError, my_store._relpath, b'foo', [b'gz'])
246
 
        self.assertRaises(ValueError, my_store._relpath,
247
 
                          b'foo', [b'dsc', b'gz'])
 
220
        my_store = store.TransportStore(MockTransport())
 
221
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
 
222
        self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
248
223
 
249
224
    def test__relpath_simple(self):
250
 
        my_store = TransportStore(MockTransport())
251
 
        self.assertEqual("foo", my_store._relpath(b'foo'))
 
225
        my_store = store.TransportStore(MockTransport())
 
226
        self.assertEqual("foo", my_store._relpath('foo'))
252
227
 
253
228
    def test__relpath_prefixed(self):
254
 
        my_store = TransportStore(MockTransport(), True)
255
 
        self.assertEqual('45/foo', my_store._relpath(b'foo'))
 
229
        my_store = store.TransportStore(MockTransport(), True)
 
230
        self.assertEqual('45/foo', my_store._relpath('foo'))
256
231
 
257
232
    def test__relpath_simple_suffixed(self):
258
 
        my_store = TransportStore(MockTransport())
 
233
        my_store = store.TransportStore(MockTransport())
 
234
        my_store.register_suffix('gz')
259
235
        my_store.register_suffix('bar')
260
 
        my_store.register_suffix('baz')
261
 
        self.assertEqual('foo.baz', my_store._relpath(b'foo', ['baz']))
262
 
        self.assertEqual('foo.bar.baz', my_store._relpath(
263
 
            b'foo', ['bar', 'baz']))
 
236
        self.assertEqual('foo.gz', my_store._relpath('foo', ['gz']))
 
237
        self.assertEqual('foo.gz.bar', my_store._relpath('foo', ['gz', 'bar']))
264
238
 
265
239
    def test__relpath_prefixed_suffixed(self):
266
 
        my_store = TransportStore(MockTransport(), True)
 
240
        my_store = store.TransportStore(MockTransport(), True)
 
241
        my_store.register_suffix('gz')
267
242
        my_store.register_suffix('bar')
268
 
        my_store.register_suffix('baz')
269
 
        self.assertEqual('45/foo.baz', my_store._relpath(b'foo', ['baz']))
270
 
        self.assertEqual('45/foo.bar.baz',
271
 
                         my_store._relpath(b'foo', ['bar', 'baz']))
 
243
        self.assertEqual('45/foo.gz', my_store._relpath('foo', ['gz']))
 
244
        self.assertEqual('45/foo.gz.bar',
 
245
                         my_store._relpath('foo', ['gz', 'bar']))
272
246
 
273
247
    def test_add_simple(self):
274
 
        stream = BytesIO(b"content")
 
248
        stream = StringIO("content")
275
249
        my_store = InstrumentedTransportStore(MockTransport())
276
 
        my_store.add(stream, b"foo")
 
250
        my_store.add(stream, "foo")
277
251
        self.assertEqual([("_add", "foo", stream)], my_store._calls)
278
252
 
279
253
    def test_add_prefixed(self):
280
 
        stream = BytesIO(b"content")
 
254
        stream = StringIO("content")
281
255
        my_store = InstrumentedTransportStore(MockTransport(), True)
282
 
        my_store.add(stream, b"foo")
 
256
        my_store.add(stream, "foo")
283
257
        self.assertEqual([("_add", "45/foo", stream)], my_store._calls)
284
258
 
285
259
    def test_add_simple_suffixed(self):
286
 
        stream = BytesIO(b"content")
 
260
        stream = StringIO("content")
287
261
        my_store = InstrumentedTransportStore(MockTransport())
288
262
        my_store.register_suffix('dsc')
289
 
        my_store.add(stream, b"foo", b'dsc')
 
263
        my_store.add(stream, "foo", 'dsc')
290
264
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
291
 
 
 
265
        
292
266
    def test_add_simple_suffixed(self):
293
 
        stream = BytesIO(b"content")
 
267
        stream = StringIO("content")
294
268
        my_store = InstrumentedTransportStore(MockTransport(), True)
295
269
        my_store.register_suffix('dsc')
296
 
        my_store.add(stream, b"foo", 'dsc')
 
270
        my_store.add(stream, "foo", 'dsc')
297
271
        self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
298
272
 
299
 
    def get_populated_store(self, prefixed=False,
300
 
                            store_class=TextStore, compressed=False):
301
 
        my_store = store_class(MemoryTransport(), prefixed,
302
 
                               compressed=compressed)
 
273
    def get_populated_store(self, prefixed=False, store_class=TextStore):
 
274
        my_store = store_class(MemoryTransport(), prefixed)
303
275
        my_store.register_suffix('sig')
304
 
        stream = BytesIO(b"signature")
305
 
        my_store.add(stream, b"foo", 'sig')
306
 
        stream = BytesIO(b"content")
307
 
        my_store.add(stream, b"foo")
308
 
        stream = BytesIO(b"signature for missing base")
309
 
        my_store.add(stream, b"missing", 'sig')
 
276
        stream = StringIO("signature")
 
277
        my_store.add(stream, "foo", 'sig')
 
278
        stream = StringIO("content")
 
279
        my_store.add(stream, "foo")
 
280
        stream = StringIO("signature for missing base")
 
281
        my_store.add(stream, "missing", 'sig')
310
282
        return my_store
311
 
 
 
283
        
312
284
    def test_has_simple(self):
313
285
        my_store = self.get_populated_store()
314
 
        self.assertEqual(True, my_store.has_id(b'foo'))
 
286
        self.assertEqual(True, my_store.has_id('foo'))
315
287
        my_store = self.get_populated_store(True)
316
 
        self.assertEqual(True, my_store.has_id(b'foo'))
 
288
        self.assertEqual(True, my_store.has_id('foo'))
317
289
 
318
290
    def test_has_suffixed(self):
319
291
        my_store = self.get_populated_store()
320
 
        self.assertEqual(True, my_store.has_id(b'foo', 'sig'))
 
292
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
321
293
        my_store = self.get_populated_store(True)
322
 
        self.assertEqual(True, my_store.has_id(b'foo', 'sig'))
 
294
        self.assertEqual(True, my_store.has_id('foo', 'sig'))
323
295
 
324
296
    def test_has_suffixed_no_base(self):
325
297
        my_store = self.get_populated_store()
326
 
        self.assertEqual(False, my_store.has_id(b'missing'))
 
298
        self.assertEqual(False, my_store.has_id('missing'))
327
299
        my_store = self.get_populated_store(True)
328
 
        self.assertEqual(False, my_store.has_id(b'missing'))
 
300
        self.assertEqual(False, my_store.has_id('missing'))
329
301
 
330
302
    def test_get_simple(self):
331
303
        my_store = self.get_populated_store()
332
 
        self.assertEqual(b'content', my_store.get(b'foo').read())
 
304
        self.assertEqual('content', my_store.get('foo').read())
333
305
        my_store = self.get_populated_store(True)
334
 
        self.assertEqual(b'content', my_store.get(b'foo').read())
 
306
        self.assertEqual('content', my_store.get('foo').read())
335
307
 
336
308
    def test_get_suffixed(self):
337
309
        my_store = self.get_populated_store()
338
 
        self.assertEqual(b'signature', my_store.get(b'foo', 'sig').read())
 
310
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
339
311
        my_store = self.get_populated_store(True)
340
 
        self.assertEqual(b'signature', my_store.get(b'foo', 'sig').read())
 
312
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
341
313
 
342
314
    def test_get_suffixed_no_base(self):
343
315
        my_store = self.get_populated_store()
344
 
        self.assertEqual(b'signature for missing base',
345
 
                         my_store.get(b'missing', 'sig').read())
 
316
        self.assertEqual('signature for missing base',
 
317
                         my_store.get('missing', 'sig').read())
346
318
        my_store = self.get_populated_store(True)
347
 
        self.assertEqual(b'signature for missing base',
348
 
                         my_store.get(b'missing', 'sig').read())
 
319
        self.assertEqual('signature for missing base',
 
320
                         my_store.get('missing', 'sig').read())
349
321
 
350
322
    def test___iter__no_suffix(self):
351
 
        my_store = TextStore(MemoryTransport(),
352
 
                             prefixed=False, compressed=False)
353
 
        stream = BytesIO(b"content")
354
 
        my_store.add(stream, b"foo")
355
 
        self.assertEqual({b'foo'},
 
323
        my_store = TextStore(MemoryTransport(), False)
 
324
        stream = StringIO("content")
 
325
        my_store.add(stream, "foo")
 
326
        self.assertEqual(set(['foo']),
356
327
                         set(my_store.__iter__()))
357
328
 
358
329
    def test___iter__(self):
359
 
        self.assertEqual({b'foo'},
 
330
        self.assertEqual(set(['foo']),
360
331
                         set(self.get_populated_store().__iter__()))
361
 
        self.assertEqual({b'foo'},
 
332
        self.assertEqual(set(['foo']),
362
333
                         set(self.get_populated_store(True).__iter__()))
363
334
 
364
335
    def test___iter__compressed(self):
365
 
        self.assertEqual({b'foo'},
366
 
                         set(self.get_populated_store(
367
 
                             compressed=True).__iter__()))
368
 
        self.assertEqual({b'foo'},
369
 
                         set(self.get_populated_store(
370
 
                             True, compressed=True).__iter__()))
 
336
        self.assertEqual(set(['foo']),
 
337
                         set(self.get_populated_store(
 
338
                             store_class=CompressedTextStore).__iter__()))
 
339
        self.assertEqual(set(['foo']),
 
340
                         set(self.get_populated_store(
 
341
                             True, CompressedTextStore).__iter__()))
371
342
 
372
343
    def test___len__(self):
373
344
        self.assertEqual(1, len(self.get_populated_store()))
374
345
 
375
 
    def test_relpath_escaped(self):
376
 
        my_store = TransportStore(MemoryTransport())
377
 
        self.assertEqual('%25', my_store._relpath(b'%'))
378
 
 
379
 
    def test_escaped_uppercase(self):
380
 
        """Uppercase letters are escaped for safety on Windows"""
381
 
        my_store = TransportStore(MemoryTransport(), prefixed=True,
382
 
                                  escaped=True)
383
 
        # a particularly perverse file-id! :-)
384
 
        self.assertEqual(my_store._relpath(b'C:<>'), 'be/%2543%253a%253c%253e')
385
 
 
386
 
 
387
 
class TestVersionFileStore(TestCaseWithTransport):
388
 
 
389
 
    def get_scope(self):
390
 
        return self._transaction
391
 
 
392
 
    def setUp(self):
393
 
        super(TestVersionFileStore, self).setUp()
394
 
        self.vfstore = VersionedFileStore(MemoryTransport(),
395
 
                                          versionedfile_class=WeaveFile)
396
 
        self.vfstore.get_scope = self.get_scope
397
 
        self._transaction = None
398
 
 
399
 
    def test_get_weave_registers_dirty_in_write(self):
400
 
        self._transaction = transactions.WriteTransaction()
401
 
        vf = self.vfstore.get_weave_or_empty(b'id', self._transaction)
402
 
        self._transaction.finish()
403
 
        self._transaction = None
404
 
        self.assertRaises(errors.OutSideTransaction,
405
 
                          vf.add_lines, b'b', [], [])
406
 
        self._transaction = transactions.WriteTransaction()
407
 
        vf = self.vfstore.get_weave(b'id', self._transaction)
408
 
        self._transaction.finish()
409
 
        self._transaction = None
410
 
        self.assertRaises(errors.OutSideTransaction,
411
 
                          vf.add_lines, b'b', [], [])
412
 
 
413
 
    def test_get_weave_readonly_cant_write(self):
414
 
        self._transaction = transactions.WriteTransaction()
415
 
        vf = self.vfstore.get_weave_or_empty(b'id', self._transaction)
416
 
        self._transaction.finish()
417
 
        self._transaction = transactions.ReadOnlyTransaction()
418
 
        vf = self.vfstore.get_weave_or_empty(b'id', self._transaction)
419
 
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, b'b', [], [])
420
 
 
421
 
    def test___iter__escaped(self):
422
 
        self.vfstore = VersionedFileStore(MemoryTransport(),
423
 
                                          prefixed=True, escaped=True, versionedfile_class=WeaveFile)
424
 
        self.vfstore.get_scope = self.get_scope
425
 
        self._transaction = transactions.WriteTransaction()
426
 
        vf = self.vfstore.get_weave_or_empty(b' ', self._transaction)
427
 
        vf.add_lines(b'a', [], [])
428
 
        del vf
429
 
        self._transaction.finish()
430
 
        self.assertEqual([b' '], list(self.vfstore))
 
346
    def test_copy_suffixes(self):
 
347
        from_store = self.get_populated_store()
 
348
        to_store = CompressedTextStore(MemoryTransport(), True)
 
349
        to_store.register_suffix('sig')
 
350
        copy_all(from_store, to_store)
 
351
        self.assertEqual(1, len(to_store))
 
352
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
 
353
        self.assertEqual('content', to_store.get('foo').read())
 
354
        self.assertEqual('signature', to_store.get('foo', 'sig').read())
 
355
        self.assertRaises(KeyError, to_store.get, 'missing', 'sig')