40
40
self.assertEqual(f.read(), value)
42
42
def fill_store(self, store):
43
store.add(BytesIO(b'hello'), b'a')
44
store.add(BytesIO(b'other'), b'b')
45
store.add(BytesIO(b'something'), b'c')
46
store.add(BytesIO(b'goodbye'), b'123123')
43
store.add(StringIO('hello'), 'a')
44
store.add(StringIO('other'), 'b')
45
store.add(StringIO('something'), 'c')
46
store.add(StringIO('goodbye'), '123123')
48
def test_copy_all(self):
51
store_a = self.get_store('a')
52
store_a.add(StringIO('foo'), '1')
54
store_b = self.get_store('b')
55
store_b.copy_all_ids(store_a)
56
self.assertEqual(store_a.get('1').read(), 'foo')
57
self.assertEqual(store_b.get('1').read(), 'foo')
58
# TODO: Switch the exception form UnlistableStore to
59
# or make Stores throw UnlistableStore if their
60
# Transport doesn't support listing
61
# store_c = RemoteStore('http://example.com/')
62
# self.assertRaises(UnlistableStore, copy_all, store_c, store_b)
48
64
def test_get(self):
49
65
store = self.get_store()
50
66
self.fill_store(store)
52
self.check_content(store, b'a', b'hello')
53
self.check_content(store, b'b', b'other')
54
self.check_content(store, b'c', b'something')
68
self.check_content(store, 'a', 'hello')
69
self.check_content(store, 'b', 'other')
70
self.check_content(store, 'c', 'something')
56
72
# Make sure that requesting a non-existing file fails
57
self.assertRaises(KeyError, self.check_content, store, b'd', None)
73
self.assertRaises(KeyError, self.check_content, store, 'd', None)
59
75
def test_multiple_add(self):
60
76
"""Multiple add with same ID should raise a BzrError"""
61
77
store = self.get_store()
62
78
self.fill_store(store)
63
self.assertRaises(BzrError, store.add, BytesIO(b'goodbye'), b'123123')
79
self.assertRaises(BzrError, store.add, StringIO('goodbye'), '123123')
66
82
class TestCompressedTextStore(TestCaseInTempDir, TestStores):
147
163
def test_get_mixed(self):
148
164
cs = self.get_store(u'.', compressed=True)
149
165
s = self.get_store(u'.', compressed=False)
150
cs.add(BytesIO(b'hello there'), b'a')
166
cs.add(StringIO('hello there'), 'a')
152
168
self.assertPathExists('a.gz')
153
169
self.assertFalse(os.path.lexists('a'))
155
with gzip.GzipFile('a.gz') as f:
156
self.assertEqual(f.read(), b'hello there')
158
self.assertEqual(cs.has_id(b'a'), True)
159
self.assertEqual(s.has_id(b'a'), True)
160
self.assertEqual(cs.get(b'a').read(), b'hello there')
161
self.assertEqual(s.get(b'a').read(), b'hello there')
163
self.assertRaises(BzrError, s.add, BytesIO(b'goodbye'), b'a')
165
s.add(BytesIO(b'goodbye'), b'b')
171
self.assertEqual(gzip.GzipFile('a.gz').read(), 'hello there')
173
self.assertEqual(cs.has_id('a'), True)
174
self.assertEqual(s.has_id('a'), True)
175
self.assertEqual(cs.get('a').read(), 'hello there')
176
self.assertEqual(s.get('a').read(), 'hello there')
178
self.assertRaises(BzrError, s.add, StringIO('goodbye'), 'a')
180
s.add(StringIO('goodbye'), 'b')
166
181
self.assertPathExists('b')
167
182
self.assertFalse(os.path.lexists('b.gz'))
168
with open('b', 'rb') as f:
169
self.assertEqual(f.read(), b'goodbye')
171
self.assertEqual(cs.has_id(b'b'), True)
172
self.assertEqual(s.has_id(b'b'), True)
173
self.assertEqual(cs.get(b'b').read(), b'goodbye')
174
self.assertEqual(s.get(b'b').read(), b'goodbye')
176
self.assertRaises(BzrError, cs.add, BytesIO(b'again'), b'b')
183
self.assertEqual(open('b').read(), 'goodbye')
185
self.assertEqual(cs.has_id('b'), True)
186
self.assertEqual(s.has_id('b'), True)
187
self.assertEqual(cs.get('b').read(), 'goodbye')
188
self.assertEqual(s.get('b').read(), 'goodbye')
190
self.assertRaises(BzrError, cs.add, StringIO('again'), 'b')
179
192
class MockTransport(transport.Transport):
180
193
"""A fake transport for testing with."""
241
254
def test__relpath_unregister_suffixes(self):
242
255
my_store = TransportStore(MockTransport())
243
self.assertRaises(ValueError, my_store._relpath, b'foo', [b'gz'])
244
self.assertRaises(ValueError, my_store._relpath,
245
b'foo', [b'dsc', b'gz'])
256
self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
257
self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
247
259
def test__relpath_simple(self):
248
260
my_store = TransportStore(MockTransport())
249
self.assertEqual("foo", my_store._relpath(b'foo'))
261
self.assertEqual("foo", my_store._relpath('foo'))
251
263
def test__relpath_prefixed(self):
252
264
my_store = TransportStore(MockTransport(), True)
253
self.assertEqual('45/foo', my_store._relpath(b'foo'))
265
self.assertEqual('45/foo', my_store._relpath('foo'))
255
267
def test__relpath_simple_suffixed(self):
256
268
my_store = TransportStore(MockTransport())
257
269
my_store.register_suffix('bar')
258
270
my_store.register_suffix('baz')
259
self.assertEqual('foo.baz', my_store._relpath(b'foo', ['baz']))
260
self.assertEqual('foo.bar.baz', my_store._relpath(
261
b'foo', ['bar', 'baz']))
271
self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
272
self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
263
274
def test__relpath_prefixed_suffixed(self):
264
275
my_store = TransportStore(MockTransport(), True)
265
276
my_store.register_suffix('bar')
266
277
my_store.register_suffix('baz')
267
self.assertEqual('45/foo.baz', my_store._relpath(b'foo', ['baz']))
278
self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
268
279
self.assertEqual('45/foo.bar.baz',
269
my_store._relpath(b'foo', ['bar', 'baz']))
280
my_store._relpath('foo', ['bar', 'baz']))
271
282
def test_add_simple(self):
272
stream = BytesIO(b"content")
283
stream = StringIO("content")
273
284
my_store = InstrumentedTransportStore(MockTransport())
274
my_store.add(stream, b"foo")
285
my_store.add(stream, "foo")
275
286
self.assertEqual([("_add", "foo", stream)], my_store._calls)
277
288
def test_add_prefixed(self):
278
stream = BytesIO(b"content")
289
stream = StringIO("content")
279
290
my_store = InstrumentedTransportStore(MockTransport(), True)
280
my_store.add(stream, b"foo")
291
my_store.add(stream, "foo")
281
292
self.assertEqual([("_add", "45/foo", stream)], my_store._calls)
283
294
def test_add_simple_suffixed(self):
284
stream = BytesIO(b"content")
295
stream = StringIO("content")
285
296
my_store = InstrumentedTransportStore(MockTransport())
286
297
my_store.register_suffix('dsc')
287
my_store.add(stream, b"foo", b'dsc')
298
my_store.add(stream, "foo", 'dsc')
288
299
self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
290
301
def test_add_simple_suffixed(self):
291
stream = BytesIO(b"content")
302
stream = StringIO("content")
292
303
my_store = InstrumentedTransportStore(MockTransport(), True)
293
304
my_store.register_suffix('dsc')
294
my_store.add(stream, b"foo", 'dsc')
305
my_store.add(stream, "foo", 'dsc')
295
306
self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
297
308
def get_populated_store(self, prefixed=False,
298
store_class=TextStore, compressed=False):
309
store_class=TextStore, compressed=False):
299
310
my_store = store_class(MemoryTransport(), prefixed,
300
311
compressed=compressed)
301
312
my_store.register_suffix('sig')
302
stream = BytesIO(b"signature")
303
my_store.add(stream, b"foo", 'sig')
304
stream = BytesIO(b"content")
305
my_store.add(stream, b"foo")
306
stream = BytesIO(b"signature for missing base")
307
my_store.add(stream, b"missing", 'sig')
313
stream = StringIO("signature")
314
my_store.add(stream, "foo", 'sig')
315
stream = StringIO("content")
316
my_store.add(stream, "foo")
317
stream = StringIO("signature for missing base")
318
my_store.add(stream, "missing", 'sig')
310
321
def test_has_simple(self):
311
322
my_store = self.get_populated_store()
312
self.assertEqual(True, my_store.has_id(b'foo'))
323
self.assertEqual(True, my_store.has_id('foo'))
313
324
my_store = self.get_populated_store(True)
314
self.assertEqual(True, my_store.has_id(b'foo'))
325
self.assertEqual(True, my_store.has_id('foo'))
316
327
def test_has_suffixed(self):
317
328
my_store = self.get_populated_store()
318
self.assertEqual(True, my_store.has_id(b'foo', 'sig'))
329
self.assertEqual(True, my_store.has_id('foo', 'sig'))
319
330
my_store = self.get_populated_store(True)
320
self.assertEqual(True, my_store.has_id(b'foo', 'sig'))
331
self.assertEqual(True, my_store.has_id('foo', 'sig'))
322
333
def test_has_suffixed_no_base(self):
323
334
my_store = self.get_populated_store()
324
self.assertEqual(False, my_store.has_id(b'missing'))
335
self.assertEqual(False, my_store.has_id('missing'))
325
336
my_store = self.get_populated_store(True)
326
self.assertEqual(False, my_store.has_id(b'missing'))
337
self.assertEqual(False, my_store.has_id('missing'))
328
339
def test_get_simple(self):
329
340
my_store = self.get_populated_store()
330
self.assertEqual(b'content', my_store.get(b'foo').read())
341
self.assertEqual('content', my_store.get('foo').read())
331
342
my_store = self.get_populated_store(True)
332
self.assertEqual(b'content', my_store.get(b'foo').read())
343
self.assertEqual('content', my_store.get('foo').read())
334
345
def test_get_suffixed(self):
335
346
my_store = self.get_populated_store()
336
self.assertEqual(b'signature', my_store.get(b'foo', 'sig').read())
347
self.assertEqual('signature', my_store.get('foo', 'sig').read())
337
348
my_store = self.get_populated_store(True)
338
self.assertEqual(b'signature', my_store.get(b'foo', 'sig').read())
349
self.assertEqual('signature', my_store.get('foo', 'sig').read())
340
351
def test_get_suffixed_no_base(self):
341
352
my_store = self.get_populated_store()
342
self.assertEqual(b'signature for missing base',
343
my_store.get(b'missing', 'sig').read())
353
self.assertEqual('signature for missing base',
354
my_store.get('missing', 'sig').read())
344
355
my_store = self.get_populated_store(True)
345
self.assertEqual(b'signature for missing base',
346
my_store.get(b'missing', 'sig').read())
356
self.assertEqual('signature for missing base',
357
my_store.get('missing', 'sig').read())
348
359
def test___iter__no_suffix(self):
349
360
my_store = TextStore(MemoryTransport(),
350
361
prefixed=False, compressed=False)
351
stream = BytesIO(b"content")
352
my_store.add(stream, b"foo")
353
self.assertEqual({b'foo'},
362
stream = StringIO("content")
363
my_store.add(stream, "foo")
364
self.assertEqual(set(['foo']),
354
365
set(my_store.__iter__()))
356
367
def test___iter__(self):
357
self.assertEqual({b'foo'},
368
self.assertEqual(set(['foo']),
358
369
set(self.get_populated_store().__iter__()))
359
self.assertEqual({b'foo'},
370
self.assertEqual(set(['foo']),
360
371
set(self.get_populated_store(True).__iter__()))
362
373
def test___iter__compressed(self):
363
self.assertEqual({b'foo'},
374
self.assertEqual(set(['foo']),
364
375
set(self.get_populated_store(
365
376
compressed=True).__iter__()))
366
self.assertEqual({b'foo'},
377
self.assertEqual(set(['foo']),
367
378
set(self.get_populated_store(
368
379
True, compressed=True).__iter__()))
370
381
def test___len__(self):
371
382
self.assertEqual(1, len(self.get_populated_store()))
384
def test_copy_suffixes(self):
385
from_store = self.get_populated_store()
386
to_store = TextStore(MemoryTransport(),
387
prefixed=True, compressed=True)
388
to_store.register_suffix('sig')
389
to_store.copy_all_ids(from_store)
390
self.assertEqual(1, len(to_store))
391
self.assertEqual(set(['foo']), set(to_store.__iter__()))
392
self.assertEqual('content', to_store.get('foo').read())
393
self.assertEqual('signature', to_store.get('foo', 'sig').read())
394
self.assertRaises(KeyError, to_store.get, 'missing', 'sig')
373
396
def test_relpath_escaped(self):
374
397
my_store = TransportStore(MemoryTransport())
375
self.assertEqual('%25', my_store._relpath(b'%'))
398
self.assertEqual('%25', my_store._relpath('%'))
377
400
def test_escaped_uppercase(self):
378
401
"""Uppercase letters are escaped for safety on Windows"""
379
402
my_store = TransportStore(MemoryTransport(), prefixed=True,
381
404
# a particularly perverse file-id! :-)
382
self.assertEqual(my_store._relpath(b'C:<>'), 'be/%2543%253a%253c%253e')
405
self.assertEqual(my_store._relpath('C:<>'), 'be/%2543%253a%253c%253e')
385
408
class TestVersionFileStore(TestCaseWithTransport):
391
414
super(TestVersionFileStore, self).setUp()
392
415
self.vfstore = VersionedFileStore(MemoryTransport(),
393
versionedfile_class=WeaveFile)
416
versionedfile_class=WeaveFile)
394
417
self.vfstore.get_scope = self.get_scope
395
418
self._transaction = None
397
420
def test_get_weave_registers_dirty_in_write(self):
398
421
self._transaction = transactions.WriteTransaction()
399
vf = self.vfstore.get_weave_or_empty(b'id', self._transaction)
422
vf = self.vfstore.get_weave_or_empty('id', self._transaction)
400
423
self._transaction.finish()
401
424
self._transaction = None
402
self.assertRaises(errors.OutSideTransaction,
403
vf.add_lines, b'b', [], [])
425
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
404
426
self._transaction = transactions.WriteTransaction()
405
vf = self.vfstore.get_weave(b'id', self._transaction)
427
vf = self.vfstore.get_weave('id', self._transaction)
406
428
self._transaction.finish()
407
429
self._transaction = None
408
self.assertRaises(errors.OutSideTransaction,
409
vf.add_lines, b'b', [], [])
430
self.assertRaises(errors.OutSideTransaction, vf.add_lines, 'b', [], [])
411
432
def test_get_weave_readonly_cant_write(self):
412
433
self._transaction = transactions.WriteTransaction()
413
vf = self.vfstore.get_weave_or_empty(b'id', self._transaction)
434
vf = self.vfstore.get_weave_or_empty('id', self._transaction)
414
435
self._transaction.finish()
415
436
self._transaction = transactions.ReadOnlyTransaction()
416
vf = self.vfstore.get_weave_or_empty(b'id', self._transaction)
417
self.assertRaises(errors.ReadOnlyError, vf.add_lines, b'b', [], [])
437
vf = self.vfstore.get_weave_or_empty('id', self._transaction)
438
self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'b', [], [])
419
440
def test___iter__escaped(self):
420
441
self.vfstore = VersionedFileStore(MemoryTransport(),
421
prefixed=True, escaped=True, versionedfile_class=WeaveFile)
442
prefixed=True, escaped=True, versionedfile_class=WeaveFile)
422
443
self.vfstore.get_scope = self.get_scope
423
444
self._transaction = transactions.WriteTransaction()
424
vf = self.vfstore.get_weave_or_empty(b' ', self._transaction)
425
vf.add_lines(b'a', [], [])
445
vf = self.vfstore.get_weave_or_empty(' ', self._transaction)
446
vf.add_lines('a', [], [])
427
448
self._transaction.finish()
428
self.assertEqual([b' '], list(self.vfstore))
449
self.assertEqual([' '], list(self.vfstore))