/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_store.py

  • Committer: Richard Wilbur
  • Date: 2016-02-04 19:07:28 UTC
  • mto: This revision was merged to the branch mainline in revision 6618.
  • Revision ID: richard.wilbur@gmail.com-20160204190728-p0zvfii6zase0fw7
Update COPYING.txt from the original http://www.gnu.org/licenses/gpl-2.0.txt  (Only differences were in whitespace.)  Thanks to Petr Stodulka for pointing out the discrepancy.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2016 Canonical Ltd
 
1
# Copyright (C) 2005-2009, 2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Test Store implementations."""
18
18
 
 
19
from cStringIO import StringIO
19
20
import os
20
21
import gzip
21
22
 
22
 
from ... import errors as errors
23
 
from ...errors import BzrError
24
 
from ...sixish import (
25
 
    BytesIO,
26
 
    )
27
 
from .store import TransportStore
28
 
from .store.text import TextStore
29
 
from .store.versioned import VersionedFileStore
30
 
from ...tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
31
 
from ... import transactions
32
 
from ... import transport
33
 
from ...transport.memory import MemoryTransport
34
 
from ...bzr.weave import WeaveFile
 
23
import bzrlib.errors as errors
 
24
from bzrlib.errors import BzrError
 
25
from bzrlib.store import TransportStore
 
26
from bzrlib.store.text import TextStore
 
27
from bzrlib.store.versioned import VersionedFileStore
 
28
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
 
29
import bzrlib.transactions as transactions
 
30
import bzrlib.transport as transport
 
31
from bzrlib.transport.memory import MemoryTransport
 
32
from bzrlib.weave import WeaveFile
35
33
 
36
34
 
37
35
class TestStores(object):
42
40
        self.assertEqual(f.read(), value)
43
41
 
44
42
    def fill_store(self, store):
45
 
        store.add(BytesIO(b'hello'), 'a')
46
 
        store.add(BytesIO(b'other'), 'b')
47
 
        store.add(BytesIO(b'something'), 'c')
48
 
        store.add(BytesIO(b'goodbye'), '123123')
 
43
        store.add(StringIO('hello'), 'a')
 
44
        store.add(StringIO('other'), 'b')
 
45
        store.add(StringIO('something'), 'c')
 
46
        store.add(StringIO('goodbye'), '123123')
 
47
 
 
48
    def test_copy_all(self):
 
49
        """Test copying"""
 
50
        os.mkdir('a')
 
51
        store_a = self.get_store('a')
 
52
        store_a.add(StringIO('foo'), '1')
 
53
        os.mkdir('b')
 
54
        store_b = self.get_store('b')
 
55
        store_b.copy_all_ids(store_a)
 
56
        self.assertEqual(store_a.get('1').read(), 'foo')
 
57
        self.assertEqual(store_b.get('1').read(), 'foo')
 
58
        # TODO: Switch the exception form UnlistableStore to
 
59
        #       or make Stores throw UnlistableStore if their
 
60
        #       Transport doesn't support listing
 
61
        # store_c = RemoteStore('http://example.com/')
 
62
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
49
63
 
50
64
    def test_get(self):
51
65
        store = self.get_store()
52
66
        self.fill_store(store)
53
67
 
54
 
        self.check_content(store, 'a', b'hello')
55
 
        self.check_content(store, 'b', b'other')
56
 
        self.check_content(store, 'c', b'something')
 
68
        self.check_content(store, 'a', 'hello')
 
69
        self.check_content(store, 'b', 'other')
 
70
        self.check_content(store, 'c', 'something')
57
71
 
58
72
        # Make sure that requesting a non-existing file fails
59
 
        self.assertRaises(KeyError, self.check_content, store, b'd', None)
 
73
        self.assertRaises(KeyError, self.check_content, store, 'd', None)
60
74
 
61
75
    def test_multiple_add(self):
62
76
        """Multiple add with same ID should raise a BzrError"""
63
77
        store = self.get_store()
64
78
        self.fill_store(store)
65
 
        self.assertRaises(BzrError, store.add, BytesIO(b'goodbye'), '123123')
 
79
        self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
66
80
 
67
81
 
68
82
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
74
88
    def test_total_size(self):
75
89
        store = self.get_store(u'.')
76
90
        store.register_suffix('dsc')
77
 
        store.add(BytesIO(b'goodbye'), '123123')
78
 
        store.add(BytesIO(b'goodbye2'), '123123', 'dsc')
 
91
        store.add(StringIO('goodbye'), '123123')
 
92
        store.add(StringIO('goodbye2'), '123123', 'dsc')
79
93
        # these get gzipped - content should be stable
80
94
        self.assertEqual(store.total_size(), (2, 55))
81
95
 
93
107
 
94
108
    def test_add_and_retrieve(self):
95
109
        store = self.get_store()
96
 
        store.add(BytesIO(b'hello'), 'aa')
 
110
        store.add(StringIO('hello'), 'aa')
97
111
        self.assertNotEqual(store.get('aa'), None)
98
 
        self.assertEqual(store.get('aa').read(), b'hello')
99
 
        store.add(BytesIO(b'hello world'), 'bb')
 
112
        self.assertEqual(store.get('aa').read(), 'hello')
 
113
        store.add(StringIO('hello world'), 'bb')
100
114
        self.assertNotEqual(store.get('bb'), None)
101
 
        self.assertEqual(store.get('bb').read(), b'hello world')
 
115
        self.assertEqual(store.get('bb').read(), 'hello world')
102
116
 
103
117
    def test_missing_is_absent(self):
104
118
        store = self.get_store()
106
120
 
107
121
    def test_adding_fails_when_present(self):
108
122
        my_store = self.get_store()
109
 
        my_store.add(BytesIO(b'hello'), 'aa')
 
123
        my_store.add(StringIO('hello'), 'aa')
110
124
        self.assertRaises(BzrError,
111
 
                          my_store.add, BytesIO(b'hello'), 'aa')
 
125
                          my_store.add, StringIO('hello'), 'aa')
112
126
 
113
127
    def test_total_size(self):
114
128
        store = self.get_store()
115
 
        store.add(BytesIO(b'goodbye'), '123123')
116
 
        store.add(BytesIO(b'goodbye2'), '123123.dsc')
 
129
        store.add(StringIO('goodbye'), '123123')
 
130
        store.add(StringIO('goodbye2'), '123123.dsc')
117
131
        self.assertEqual(store.total_size(), (2, 15))
118
132
        # TODO: Switch the exception form UnlistableStore to
119
133
        #       or make Stores throw UnlistableStore if their
130
144
 
131
145
    def test_total_size(self):
132
146
        store = self.get_store()
133
 
        store.add(BytesIO(b'goodbye'), '123123')
134
 
        store.add(BytesIO(b'goodbye2'), '123123.dsc')
 
147
        store.add(StringIO('goodbye'), '123123')
 
148
        store.add(StringIO('goodbye2'), '123123.dsc')
135
149
        self.assertEqual(store.total_size(), (2, 15))
136
150
        # TODO: Switch the exception form UnlistableStore to
137
151
        #       or make Stores throw UnlistableStore if their
149
163
    def test_get_mixed(self):
150
164
        cs = self.get_store(u'.', compressed=True)
151
165
        s = self.get_store(u'.', compressed=False)
152
 
        cs.add(BytesIO(b'hello there'), 'a')
 
166
        cs.add(StringIO('hello there'), 'a')
153
167
 
154
168
        self.assertPathExists('a.gz')
155
169
        self.assertFalse(os.path.lexists('a'))
156
170
 
157
 
        self.assertEqual(gzip.GzipFile('a.gz').read(), 'hello there')
158
 
 
159
 
        self.assertEqual(cs.has_id('a'), True)
160
 
        self.assertEqual(s.has_id('a'), True)
161
 
        self.assertEqual(cs.get('a').read(), 'hello there')
162
 
        self.assertEqual(s.get('a').read(), 'hello there')
163
 
 
164
 
        self.assertRaises(BzrError, s.add, BytesIO(b'goodbye'), 'a')
165
 
 
166
 
        s.add(BytesIO(b'goodbye'), 'b')
 
171
        self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
 
172
 
 
173
        self.assertEquals(cs.has_id('a'), True)
 
174
        self.assertEquals(s.has_id('a'), True)
 
175
        self.assertEquals(cs.get('a').read(), 'hello there')
 
176
        self.assertEquals(s.get('a').read(), 'hello there')
 
177
 
 
178
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
 
179
 
 
180
        s.add(StringIO('goodbye'), 'b')
167
181
        self.assertPathExists('b')
168
182
        self.assertFalse(os.path.lexists('b.gz'))
169
 
        self.assertEqual(open('b').read(), 'goodbye')
170
 
 
171
 
        self.assertEqual(cs.has_id('b'), True)
172
 
        self.assertEqual(s.has_id('b'), True)
173
 
        self.assertEqual(cs.get('b').read(), 'goodbye')
174
 
        self.assertEqual(s.get('b').read(), 'goodbye')
175
 
 
176
 
        self.assertRaises(BzrError, cs.add, BytesIO(b'again'), 'b')
 
183
        self.assertEquals(open('b').read(), 'goodbye')
 
184
 
 
185
        self.assertEquals(cs.has_id('b'), True)
 
186
        self.assertEquals(s.has_id('b'), True)
 
187
        self.assertEquals(cs.get('b').read(), 'goodbye')
 
188
        self.assertEquals(s.get('b').read(), 'goodbye')
 
189
 
 
190
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
177
191
 
178
192
class MockTransport(transport.Transport):
179
193
    """A fake transport for testing with."""
266
280
                         my_store._relpath('foo', ['bar', 'baz']))
267
281
 
268
282
    def test_add_simple(self):
269
 
        stream = BytesIO(b"content")
 
283
        stream = StringIO("content")
270
284
        my_store = InstrumentedTransportStore(MockTransport())
271
285
        my_store.add(stream, "foo")
272
286
        self.assertEqual([("_add", "foo", stream)], my_store._calls)
273
287
 
274
288
    def test_add_prefixed(self):
275
 
        stream = BytesIO(b"content")
 
289
        stream = StringIO("content")
276
290
        my_store = InstrumentedTransportStore(MockTransport(), True)
277
291
        my_store.add(stream, "foo")
278
292
        self.assertEqual([("_add", "45/foo", stream)], my_store._calls)
279
293
 
280
294
    def test_add_simple_suffixed(self):
281
 
        stream = BytesIO(b"content")
 
295
        stream = StringIO("content")
282
296
        my_store = InstrumentedTransportStore(MockTransport())
283
297
        my_store.register_suffix('dsc')
284
298
        my_store.add(stream, "foo", 'dsc')
285
299
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
286
300
 
287
301
    def test_add_simple_suffixed(self):
288
 
        stream = BytesIO(b"content")
 
302
        stream = StringIO("content")
289
303
        my_store = InstrumentedTransportStore(MockTransport(), True)
290
304
        my_store.register_suffix('dsc')
291
305
        my_store.add(stream, "foo", 'dsc')
296
310
        my_store = store_class(MemoryTransport(), prefixed,
297
311
                               compressed=compressed)
298
312
        my_store.register_suffix('sig')
299
 
        stream = BytesIO(b"signature")
 
313
        stream = StringIO("signature")
300
314
        my_store.add(stream, "foo", 'sig')
301
 
        stream = BytesIO(b"content")
 
315
        stream = StringIO("content")
302
316
        my_store.add(stream, "foo")
303
 
        stream = BytesIO(b"signature for missing base")
 
317
        stream = StringIO("signature for missing base")
304
318
        my_store.add(stream, "missing", 'sig')
305
319
        return my_store
306
320
 
324
338
 
325
339
    def test_get_simple(self):
326
340
        my_store = self.get_populated_store()
327
 
        self.assertEqual(b'content', my_store.get('foo').read())
 
341
        self.assertEqual('content', my_store.get('foo').read())
328
342
        my_store = self.get_populated_store(True)
329
 
        self.assertEqual(b'content', my_store.get('foo').read())
 
343
        self.assertEqual('content', my_store.get('foo').read())
330
344
 
331
345
    def test_get_suffixed(self):
332
346
        my_store = self.get_populated_store()
333
 
        self.assertEqual(b'signature', my_store.get('foo', 'sig').read())
 
347
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
334
348
        my_store = self.get_populated_store(True)
335
 
        self.assertEqual(b'signature', my_store.get('foo', 'sig').read())
 
349
        self.assertEqual('signature', my_store.get('foo', 'sig').read())
336
350
 
337
351
    def test_get_suffixed_no_base(self):
338
352
        my_store = self.get_populated_store()
339
 
        self.assertEqual(b'signature for missing base',
 
353
        self.assertEqual('signature for missing base',
340
354
                         my_store.get('missing', 'sig').read())
341
355
        my_store = self.get_populated_store(True)
342
 
        self.assertEqual(b'signature for missing base',
 
356
        self.assertEqual('signature for missing base',
343
357
                         my_store.get('missing', 'sig').read())
344
358
 
345
359
    def test___iter__no_suffix(self):
346
360
        my_store = TextStore(MemoryTransport(),
347
361
                             prefixed=False, compressed=False)
348
 
        stream = BytesIO(b"content")
 
362
        stream = StringIO("content")
349
363
        my_store.add(stream, "foo")
350
 
        self.assertEqual({'foo'},
 
364
        self.assertEqual(set(['foo']),
351
365
                         set(my_store.__iter__()))
352
366
 
353
367
    def test___iter__(self):
354
 
        self.assertEqual({'foo'},
 
368
        self.assertEqual(set(['foo']),
355
369
                         set(self.get_populated_store().__iter__()))
356
 
        self.assertEqual({'foo'},
 
370
        self.assertEqual(set(['foo']),
357
371
                         set(self.get_populated_store(True).__iter__()))
358
372
 
359
373
    def test___iter__compressed(self):
360
 
        self.assertEqual({'foo'},
 
374
        self.assertEqual(set(['foo']),
361
375
                         set(self.get_populated_store(
362
376
                             compressed=True).__iter__()))
363
 
        self.assertEqual({'foo'},
 
377
        self.assertEqual(set(['foo']),
364
378
                         set(self.get_populated_store(
365
379
                             True, compressed=True).__iter__()))
366
380
 
367
381
    def test___len__(self):
368
382
        self.assertEqual(1, len(self.get_populated_store()))
369
383
 
 
384
    def test_copy_suffixes(self):
 
385
        from_store = self.get_populated_store()
 
386
        to_store = TextStore(MemoryTransport(),
 
387
                             prefixed=True, compressed=True)
 
388
        to_store.register_suffix('sig')
 
389
        to_store.copy_all_ids(from_store)
 
390
        self.assertEqual(1, len(to_store))
 
391
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
 
392
        self.assertEqual('content', to_store.get('foo').read())
 
393
        self.assertEqual('signature', to_store.get('foo', 'sig').read())
 
394
        self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
 
395
 
370
396
    def test_relpath_escaped(self):
371
397
        my_store = TransportStore(MemoryTransport())
372
398
        self.assertEqual('%25', my_store._relpath('%'))
376
402
        my_store = TransportStore(MemoryTransport(), prefixed=True,
377
403
            escaped=True)
378
404
        # a particularly perverse file-id! :-)
379
 
        self.assertEqual(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
 
405
        self.assertEquals(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
380
406
 
381
407
 
382
408
class TestVersionFileStore(TestCaseWithTransport):