/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/selftest/teststore.py

Fixing mutter() calls to not have to do string processing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2007 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2005 by Canonical Development Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Test Store implementations."""
18
18
 
19
19
from cStringIO import StringIO
20
20
import os
21
 
import gzip
22
21
 
23
 
import bzrlib.errors as errors
24
 
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
 
22
from bzrlib.errors import BzrError, UnlistableStore
 
23
from bzrlib.store import copy_all
25
24
from bzrlib.transport.local import LocalTransport
 
25
from bzrlib.transport import NoSuchFile
 
26
from bzrlib.store.compressed_text import CompressedTextStore
26
27
from bzrlib.store.text import TextStore
27
 
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
 
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
28
29
import bzrlib.store as store
29
 
import bzrlib.store.versioned
30
 
import bzrlib.transactions as transactions
31
30
import bzrlib.transport as transport
32
31
from bzrlib.transport.memory import MemoryTransport
33
32
 
49
48
        """Test copying"""
50
49
        os.mkdir('a')
51
50
        store_a = self.get_store('a')
52
 
        store_a.add(StringIO('foo'), '1')
 
51
        store_a.add('foo', '1')
53
52
        os.mkdir('b')
54
53
        store_b = self.get_store('b')
55
 
        store_b.copy_all_ids(store_a)
 
54
        copy_all(store_a, store_b)
56
55
        self.assertEqual(store_a.get('1').read(), 'foo')
57
56
        self.assertEqual(store_b.get('1').read(), 'foo')
58
57
        # TODO: Switch the exception form UnlistableStore to
64
63
    def test_get(self):
65
64
        store = self.get_store()
66
65
        self.fill_store(store)
67
 
 
 
66
    
68
67
        self.check_content(store, 'a', 'hello')
69
68
        self.check_content(store, 'b', 'other')
70
69
        self.check_content(store, 'c', 'something')
71
 
 
 
70
    
72
71
        # Make sure that requesting a non-existing file fails
73
72
        self.assertRaises(KeyError, self.check_content, store, 'd', None)
74
73
 
81
80
 
82
81
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
83
82
 
84
 
    def get_store(self, path=u'.'):
85
 
        t = transport.get_transport(path)
86
 
        return TextStore(t, compressed=True)
 
83
    def get_store(self, path='.'):
 
84
        t = LocalTransport(path)
 
85
        return CompressedTextStore(t)
87
86
 
88
87
    def test_total_size(self):
89
 
        store = self.get_store(u'.')
 
88
        store = self.get_store('.')
90
89
        store.register_suffix('dsc')
91
90
        store.add(StringIO('goodbye'), '123123')
92
91
        store.add(StringIO('goodbye2'), '123123', 'dsc')
93
92
        # these get gzipped - content should be stable
94
93
        self.assertEqual(store.total_size(), (2, 55))
95
 
 
 
94
        
96
95
    def test__relpath_suffixed(self):
97
 
        my_store = TextStore(MockTransport(),
98
 
                             prefixed=True, compressed=True)
 
96
        my_store = CompressedTextStore(MockTransport(), True)
99
97
        my_store.register_suffix('dsc')
100
 
        self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
 
98
        self.assertEqual('45/foo.dsc.gz', my_store._relpath('foo', ['dsc']))
101
99
 
102
100
 
103
101
class TestMemoryStore(TestCase):
104
 
 
 
102
    
105
103
    def get_store(self):
106
 
        return TextStore(MemoryTransport())
 
104
        return store.ImmutableMemoryStore()
 
105
    
 
106
    def test_imports(self):
 
107
        from bzrlib.store import ImmutableMemoryStore
107
108
 
108
109
    def test_add_and_retrieve(self):
109
110
        store = self.get_store()
138
139
 
139
140
class TestTextStore(TestCaseInTempDir, TestStores):
140
141
 
141
 
    def get_store(self, path=u'.'):
142
 
        t = transport.get_transport(path)
143
 
        return TextStore(t, compressed=False)
 
142
    def get_store(self, path='.'):
 
143
        t = LocalTransport(path)
 
144
        return TextStore(t)
144
145
 
145
146
    def test_total_size(self):
146
147
        store = self.get_store()
154
155
        # self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
155
156
 
156
157
 
157
 
class TestMixedTextStore(TestCaseInTempDir, TestStores):
158
 
 
159
 
    def get_store(self, path=u'.', compressed=True):
160
 
        t = transport.get_transport(path)
161
 
        return TextStore(t, compressed=compressed)
162
 
 
163
 
    def test_get_mixed(self):
164
 
        cs = self.get_store(u'.', compressed=True)
165
 
        s = self.get_store(u'.', compressed=False)
166
 
        cs.add(StringIO('hello there'), 'a')
167
 
 
168
 
        self.failUnlessExists('a.gz')
169
 
        self.failIf(os.path.lexists('a'))
170
 
 
171
 
        self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
172
 
 
173
 
        self.assertEquals(cs.has_id('a'), True)
174
 
        self.assertEquals(s.has_id('a'), True)
175
 
        self.assertEquals(cs.get('a').read(), 'hello there')
176
 
        self.assertEquals(s.get('a').read(), 'hello there')
177
 
 
178
 
        self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
179
 
 
180
 
        s.add(StringIO('goodbye'), 'b')
181
 
        self.failUnlessExists('b')
182
 
        self.failIf(os.path.lexists('b.gz'))
183
 
        self.assertEquals(open('b').read(), 'goodbye')
184
 
 
185
 
        self.assertEquals(cs.has_id('b'), True)
186
 
        self.assertEquals(s.has_id('b'), True)
187
 
        self.assertEquals(cs.get('b').read(), 'goodbye')
188
 
        self.assertEquals(s.get('b').read(), 'goodbye')
189
 
 
190
 
        self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
191
 
 
192
158
class MockTransport(transport.Transport):
193
159
    """A fake transport for testing with."""
194
160
 
240
206
 
241
207
 
242
208
class TestTransportStore(TestCase):
243
 
 
 
209
    
244
210
    def test__relpath_invalid(self):
245
211
        my_store = store.TransportStore(MockTransport())
246
212
        self.assertRaises(ValueError, my_store._relpath, '/foo')
266
232
 
267
233
    def test__relpath_simple_suffixed(self):
268
234
        my_store = store.TransportStore(MockTransport())
 
235
        my_store.register_suffix('gz')
269
236
        my_store.register_suffix('bar')
270
 
        my_store.register_suffix('baz')
271
 
        self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
272
 
        self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
 
237
        self.assertEqual('foo.gz', my_store._relpath('foo', ['gz']))
 
238
        self.assertEqual('foo.gz.bar', my_store._relpath('foo', ['gz', 'bar']))
273
239
 
274
240
    def test__relpath_prefixed_suffixed(self):
275
241
        my_store = store.TransportStore(MockTransport(), True)
 
242
        my_store.register_suffix('gz')
276
243
        my_store.register_suffix('bar')
277
 
        my_store.register_suffix('baz')
278
 
        self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
279
 
        self.assertEqual('45/foo.bar.baz',
280
 
                         my_store._relpath('foo', ['bar', 'baz']))
 
244
        self.assertEqual('45/foo.gz', my_store._relpath('foo', ['gz']))
 
245
        self.assertEqual('45/foo.gz.bar',
 
246
                         my_store._relpath('foo', ['gz', 'bar']))
281
247
 
282
248
    def test_add_simple(self):
283
249
        stream = StringIO("content")
297
263
        my_store.register_suffix('dsc')
298
264
        my_store.add(stream, "foo", 'dsc')
299
265
        self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
300
 
 
 
266
        
301
267
    def test_add_simple_suffixed(self):
302
268
        stream = StringIO("content")
303
269
        my_store = InstrumentedTransportStore(MockTransport(), True)
305
271
        my_store.add(stream, "foo", 'dsc')
306
272
        self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
307
273
 
308
 
    def get_populated_store(self, prefixed=False,
309
 
            store_class=TextStore, compressed=False):
310
 
        my_store = store_class(MemoryTransport(), prefixed,
311
 
                               compressed=compressed)
 
274
    def get_populated_store(self, prefixed=False, store_class=TextStore):
 
275
        my_store = store_class(MemoryTransport(), prefixed)
312
276
        my_store.register_suffix('sig')
313
277
        stream = StringIO("signature")
314
278
        my_store.add(stream, "foo", 'sig')
317
281
        stream = StringIO("signature for missing base")
318
282
        my_store.add(stream, "missing", 'sig')
319
283
        return my_store
320
 
 
 
284
        
321
285
    def test_has_simple(self):
322
286
        my_store = self.get_populated_store()
323
287
        self.assertEqual(True, my_store.has_id('foo'))
357
321
                         my_store.get('missing', 'sig').read())
358
322
 
359
323
    def test___iter__no_suffix(self):
360
 
        my_store = TextStore(MemoryTransport(),
361
 
                             prefixed=False, compressed=False)
 
324
        my_store = TextStore(MemoryTransport(), False)
362
325
        stream = StringIO("content")
363
326
        my_store.add(stream, "foo")
364
327
        self.assertEqual(set(['foo']),
373
336
    def test___iter__compressed(self):
374
337
        self.assertEqual(set(['foo']),
375
338
                         set(self.get_populated_store(
376
 
                             compressed=True).__iter__()))
 
339
                             store_class=CompressedTextStore).__iter__()))
377
340
        self.assertEqual(set(['foo']),
378
341
                         set(self.get_populated_store(
379
 
                             True, compressed=True).__iter__()))
 
342
                             True, CompressedTextStore).__iter__()))
380
343
 
381
344
    def test___len__(self):
382
345
        self.assertEqual(1, len(self.get_populated_store()))
383
346
 
384
347
    def test_copy_suffixes(self):
385
348
        from_store = self.get_populated_store()
386
 
        to_store = TextStore(MemoryTransport(),
387
 
                             prefixed=True, compressed=True)
 
349
        to_store = CompressedTextStore(MemoryTransport(), True)
388
350
        to_store.register_suffix('sig')
389
 
        to_store.copy_all_ids(from_store)
 
351
        copy_all(from_store, to_store)
390
352
        self.assertEqual(1, len(to_store))
391
353
        self.assertEqual(set(['foo']), set(to_store.__iter__()))
392
354
        self.assertEqual('content', to_store.get('foo').read())
396
358
    def test_relpath_escaped(self):
397
359
        my_store = store.TransportStore(MemoryTransport())
398
360
        self.assertEqual('%25', my_store._relpath('%'))
399
 
 
400
 
    def test_escaped_uppercase(self):
401
 
        """Uppercase letters are escaped for safety on Windows"""
402
 
        my_store = store.TransportStore(MemoryTransport(), prefixed=True,
403
 
            escaped=True)
404
 
        # a particularly perverse file-id! :-)
405
 
        self.assertEquals(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
406
 
 
407
 
 
408
 
class TestVersionFileStore(TestCaseWithTransport):
409
 
 
410
 
    def get_scope(self):
411
 
        return self._transaction
412
 
 
413
 
    def setUp(self):
414
 
        super(TestVersionFileStore, self).setUp()
415
 
        self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
416
 
        self.vfstore.get_scope = self.get_scope
417
 
        self._transaction = None
418
 
 
419
 
    def test_get_weave_registers_dirty_in_write(self):
420
 
        self._transaction = transactions.WriteTransaction()
421
 
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
422
 
        self._transaction.finish()
423
 
        self._transaction = None
424
 
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
425
 
        self._transaction = transactions.WriteTransaction()
426
 
        vf = self.vfstore.get_weave('id', self._transaction)
427
 
        self._transaction.finish()
428
 
        self._transaction = None
429
 
        self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
430
 
 
431
 
    def test_get_weave_readonly_cant_write(self):
432
 
        self._transaction = transactions.WriteTransaction()
433
 
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
434
 
        self._transaction.finish()
435
 
        self._transaction = transactions.ReadOnlyTransaction()
436
 
        vf = self.vfstore.get_weave_or_empty('id', self._transaction)
437
 
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
438
 
 
439
 
    def test___iter__escaped(self):
440
 
        self.vfstore = store.versioned.VersionedFileStore(MemoryTransport(),
441
 
            prefixed=True, escaped=True)
442
 
        self.vfstore.get_scope = self.get_scope
443
 
        self._transaction = transactions.WriteTransaction()
444
 
        vf = self.vfstore.get_weave_or_empty(' ', self._transaction)
445
 
        vf.add_lines('a', [], [])
446
 
        del vf
447
 
        self._transaction.finish()
448
 
        self.assertEqual([' '], list(self.vfstore))