1
# Copyright (C) 2005, 2007 Canonical Ltd
1
# Copyright (C) 2005 by Canonical Development Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Test Store implementations."""
19
19
from cStringIO import StringIO
23
import bzrlib.errors as errors
24
from bzrlib.errors import BzrError, UnlistableStore, NoSuchFile
22
from bzrlib.errors import BzrError, UnlistableStore
23
from bzrlib.store import copy_all
25
24
from bzrlib.transport.local import LocalTransport
25
from bzrlib.transport import NoSuchFile
26
from bzrlib.store.compressed_text import CompressedTextStore
26
27
from bzrlib.store.text import TextStore
27
from bzrlib.tests import TestCase, TestCaseInTempDir, TestCaseWithTransport
28
from bzrlib.selftest import TestCase, TestCaseInTempDir
28
29
import bzrlib.store as store
29
import bzrlib.store.versioned
30
import bzrlib.transactions as transactions
31
30
import bzrlib.transport as transport
32
31
from bzrlib.transport.memory import MemoryTransport
51
50
store_a = self.get_store('a')
52
store_a.add(StringIO('foo'), '1')
51
store_a.add('foo', '1')
54
53
store_b = self.get_store('b')
55
store_b.copy_all_ids(store_a)
54
copy_all(store_a, store_b)
56
55
self.assertEqual(store_a.get('1').read(), 'foo')
57
56
self.assertEqual(store_b.get('1').read(), 'foo')
58
57
# TODO: Switch the exception form UnlistableStore to
64
63
def test_get(self):
65
64
store = self.get_store()
66
65
self.fill_store(store)
68
67
self.check_content(store, 'a', 'hello')
69
68
self.check_content(store, 'b', 'other')
70
69
self.check_content(store, 'c', 'something')
72
71
# Make sure that requesting a non-existing file fails
73
72
self.assertRaises(KeyError, self.check_content, store, 'd', None)
82
81
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
84
def get_store(self, path=u'.'):
85
t = transport.get_transport(path)
86
return TextStore(t, compressed=True)
83
def get_store(self, path='.'):
84
t = LocalTransport(path)
85
return CompressedTextStore(t)
88
87
def test_total_size(self):
89
store = self.get_store(u'.')
88
store = self.get_store('.')
90
89
store.register_suffix('dsc')
91
90
store.add(StringIO('goodbye'), '123123')
92
91
store.add(StringIO('goodbye2'), '123123', 'dsc')
93
92
# these get gzipped - content should be stable
94
93
self.assertEqual(store.total_size(), (2, 55))
96
95
def test__relpath_suffixed(self):
97
my_store = TextStore(MockTransport(),
98
prefixed=True, compressed=True)
96
my_store = CompressedTextStore(MockTransport(), True)
99
97
my_store.register_suffix('dsc')
100
self.assertEqual('45/foo.dsc', my_store._relpath('foo', ['dsc']))
98
self.assertEqual('45/foo.dsc.gz', my_store._relpath('foo', ['dsc']))
103
101
class TestMemoryStore(TestCase):
105
103
def get_store(self):
106
return TextStore(MemoryTransport())
104
return store.ImmutableMemoryStore()
106
def test_imports(self):
107
from bzrlib.store import ImmutableMemoryStore
108
109
def test_add_and_retrieve(self):
109
110
store = self.get_store()
139
140
class TestTextStore(TestCaseInTempDir, TestStores):
141
def get_store(self, path=u'.'):
142
t = transport.get_transport(path)
143
return TextStore(t, compressed=False)
142
def get_store(self, path='.'):
143
t = LocalTransport(path)
145
146
def test_total_size(self):
146
147
store = self.get_store()
154
155
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
157
class TestMixedTextStore(TestCaseInTempDir, TestStores):
159
def get_store(self, path=u'.', compressed=True):
160
t = transport.get_transport(path)
161
return TextStore(t, compressed=compressed)
163
def test_get_mixed(self):
164
cs = self.get_store(u'.', compressed=True)
165
s = self.get_store(u'.', compressed=False)
166
cs.add(StringIO('hello there'), 'a')
168
self.failUnlessExists('a.gz')
169
self.failIf(os.path.lexists('a'))
171
self.assertEquals(gzip.GzipFile('a.gz').read(), 'hello there')
173
self.assertEquals(cs.has_id('a'), True)
174
self.assertEquals(s.has_id('a'), True)
175
self.assertEquals(cs.get('a').read(), 'hello there')
176
self.assertEquals(s.get('a').read(), 'hello there')
178
self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
180
s.add(StringIO('goodbye'), 'b')
181
self.failUnlessExists('b')
182
self.failIf(os.path.lexists('b.gz'))
183
self.assertEquals(open('b').read(), 'goodbye')
185
self.assertEquals(cs.has_id('b'), True)
186
self.assertEquals(s.has_id('b'), True)
187
self.assertEquals(cs.get('b').read(), 'goodbye')
188
self.assertEquals(s.get('b').read(), 'goodbye')
190
self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
192
158
class MockTransport(transport.Transport):
193
159
"""A fake transport for testing with."""
267
233
def test__relpath_simple_suffixed(self):
268
234
my_store = store.TransportStore(MockTransport())
235
my_store.register_suffix('gz')
269
236
my_store.register_suffix('bar')
270
my_store.register_suffix('baz')
271
self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
272
self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
237
self.assertEqual('foo.gz', my_store._relpath('foo', ['gz']))
238
self.assertEqual('foo.gz.bar', my_store._relpath('foo', ['gz', 'bar']))
274
240
def test__relpath_prefixed_suffixed(self):
275
241
my_store = store.TransportStore(MockTransport(), True)
242
my_store.register_suffix('gz')
276
243
my_store.register_suffix('bar')
277
my_store.register_suffix('baz')
278
self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
279
self.assertEqual('45/foo.bar.baz',
280
my_store._relpath('foo', ['bar', 'baz']))
244
self.assertEqual('45/foo.gz', my_store._relpath('foo', ['gz']))
245
self.assertEqual('45/foo.gz.bar',
246
my_store._relpath('foo', ['gz', 'bar']))
282
248
def test_add_simple(self):
283
249
stream = StringIO("content")
297
263
my_store.register_suffix('dsc')
298
264
my_store.add(stream, "foo", 'dsc')
299
265
self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
301
267
def test_add_simple_suffixed(self):
302
268
stream = StringIO("content")
303
269
my_store = InstrumentedTransportStore(MockTransport(), True)
305
271
my_store.add(stream, "foo", 'dsc')
306
272
self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
308
def get_populated_store(self, prefixed=False,
309
store_class=TextStore, compressed=False):
310
my_store = store_class(MemoryTransport(), prefixed,
311
compressed=compressed)
274
def get_populated_store(self, prefixed=False, store_class=TextStore):
275
my_store = store_class(MemoryTransport(), prefixed)
312
276
my_store.register_suffix('sig')
313
277
stream = StringIO("signature")
314
278
my_store.add(stream, "foo", 'sig')
357
321
my_store.get('missing', 'sig').read())
359
323
def test___iter__no_suffix(self):
360
my_store = TextStore(MemoryTransport(),
361
prefixed=False, compressed=False)
324
my_store = TextStore(MemoryTransport(), False)
362
325
stream = StringIO("content")
363
326
my_store.add(stream, "foo")
364
327
self.assertEqual(set(['foo']),
373
336
def test___iter__compressed(self):
374
337
self.assertEqual(set(['foo']),
375
338
set(self.get_populated_store(
376
compressed=True).__iter__()))
339
store_class=CompressedTextStore).__iter__()))
377
340
self.assertEqual(set(['foo']),
378
341
set(self.get_populated_store(
379
True, compressed=True).__iter__()))
342
True, CompressedTextStore).__iter__()))
381
344
def test___len__(self):
382
345
self.assertEqual(1, len(self.get_populated_store()))
384
347
def test_copy_suffixes(self):
385
348
from_store = self.get_populated_store()
386
to_store = TextStore(MemoryTransport(),
387
prefixed=True, compressed=True)
349
to_store = CompressedTextStore(MemoryTransport(), True)
388
350
to_store.register_suffix('sig')
389
to_store.copy_all_ids(from_store)
351
copy_all(from_store, to_store)
390
352
self.assertEqual(1, len(to_store))
391
353
self.assertEqual(set(['foo']), set(to_store.__iter__()))
392
354
self.assertEqual('content', to_store.get('foo').read())
396
358
def test_relpath_escaped(self):
397
359
my_store = store.TransportStore(MemoryTransport())
398
360
self.assertEqual('%25', my_store._relpath('%'))
400
def test_escaped_uppercase(self):
401
"""Uppercase letters are escaped for safety on Windows"""
402
my_store = store.TransportStore(MemoryTransport(), prefixed=True,
404
# a particularly perverse file-id! :-)
405
self.assertEquals(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
408
class TestVersionFileStore(TestCaseWithTransport):
411
return self._transaction
414
super(TestVersionFileStore, self).setUp()
415
self.vfstore = store.versioned.VersionedFileStore(MemoryTransport())
416
self.vfstore.get_scope = self.get_scope
417
self._transaction = None
419
def test_get_weave_registers_dirty_in_write(self):
420
self._transaction = transactions.WriteTransaction()
421
vf = self.vfstore.get_weave_or_empty('id', self._transaction)
422
self._transaction.finish()
423
self._transaction = None
424
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
425
self._transaction = transactions.WriteTransaction()
426
vf = self.vfstore.get_weave('id', self._transaction)
427
self._transaction.finish()
428
self._transaction = None
429
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
431
def test_get_weave_readonly_cant_write(self):
432
self._transaction = transactions.WriteTransaction()
433
vf = self.vfstore.get_weave_or_empty('id', self._transaction)
434
self._transaction.finish()
435
self._transaction = transactions.ReadOnlyTransaction()
436
vf = self.vfstore.get_weave_or_empty('id', self._transaction)
437
self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
439
def test___iter__escaped(self):
440
self.vfstore = store.versioned.VersionedFileStore(MemoryTransport(),
441
prefixed=True, escaped=True)
442
self.vfstore.get_scope = self.get_scope
443
self._transaction = transactions.WriteTransaction()
444
vf = self.vfstore.get_weave_or_empty(' ', self._transaction)
445
vf.add_lines('a', [], [])
447
self._transaction.finish()
448
self.assertEqual([' '], list(self.vfstore))