107
107
def test_adding_fails_when_present(self):
108
108
my_store = self.get_store()
109
my_store.add(BytesIO(b'hello'), 'aa')
109
my_store.add(BytesIO(b'hello'), b'aa')
110
110
self.assertRaises(BzrError,
111
my_store.add, BytesIO(b'hello'), 'aa')
111
my_store.add, BytesIO(b'hello'), b'aa')
113
113
def test_total_size(self):
114
114
store = self.get_store()
115
store.add(BytesIO(b'goodbye'), '123123')
116
store.add(BytesIO(b'goodbye2'), '123123.dsc')
115
store.add(BytesIO(b'goodbye'), b'123123')
116
store.add(BytesIO(b'goodbye2'), b'123123.dsc')
117
117
self.assertEqual(store.total_size(), (2, 15))
118
118
# TODO: Switch the exception form UnlistableStore to
119
119
# or make Stores throw UnlistableStore if their
240
240
def test__relpath_unregister_suffixes(self):
241
241
my_store = TransportStore(MockTransport())
242
self.assertRaises(ValueError, my_store._relpath, 'foo', ['gz'])
243
self.assertRaises(ValueError, my_store._relpath, 'foo', ['dsc', 'gz'])
242
self.assertRaises(ValueError, my_store._relpath, b'foo', [b'gz'])
243
self.assertRaises(ValueError, my_store._relpath, b'foo', [b'dsc', b'gz'])
245
245
def test__relpath_simple(self):
246
246
my_store = TransportStore(MockTransport())
247
self.assertEqual("foo", my_store._relpath('foo'))
247
self.assertEqual("foo", my_store._relpath(b'foo'))
249
249
def test__relpath_prefixed(self):
250
250
my_store = TransportStore(MockTransport(), True)
251
self.assertEqual('45/foo', my_store._relpath('foo'))
251
self.assertEqual('45/foo', my_store._relpath(b'foo'))
253
253
def test__relpath_simple_suffixed(self):
254
254
my_store = TransportStore(MockTransport())
255
255
my_store.register_suffix('bar')
256
256
my_store.register_suffix('baz')
257
self.assertEqual('foo.baz', my_store._relpath('foo', ['baz']))
258
self.assertEqual('foo.bar.baz', my_store._relpath('foo', ['bar', 'baz']))
257
self.assertEqual('foo.baz', my_store._relpath(b'foo', ['baz']))
258
self.assertEqual('foo.bar.baz', my_store._relpath(b'foo', ['bar', 'baz']))
260
260
def test__relpath_prefixed_suffixed(self):
261
261
my_store = TransportStore(MockTransport(), True)
262
262
my_store.register_suffix('bar')
263
263
my_store.register_suffix('baz')
264
self.assertEqual('45/foo.baz', my_store._relpath('foo', ['baz']))
264
self.assertEqual('45/foo.baz', my_store._relpath(b'foo', [b'baz']))
265
265
self.assertEqual('45/foo.bar.baz',
266
my_store._relpath('foo', ['bar', 'baz']))
266
my_store._relpath(b'foo', [b'bar', b'baz']))
268
268
def test_add_simple(self):
269
269
stream = BytesIO(b"content")
270
270
my_store = InstrumentedTransportStore(MockTransport())
271
my_store.add(stream, "foo")
271
my_store.add(stream, b"foo")
272
272
self.assertEqual([("_add", "foo", stream)], my_store._calls)
274
274
def test_add_prefixed(self):
275
275
stream = BytesIO(b"content")
276
276
my_store = InstrumentedTransportStore(MockTransport(), True)
277
my_store.add(stream, "foo")
277
my_store.add(stream, b"foo")
278
278
self.assertEqual([("_add", "45/foo", stream)], my_store._calls)
280
280
def test_add_simple_suffixed(self):
281
281
stream = BytesIO(b"content")
282
282
my_store = InstrumentedTransportStore(MockTransport())
283
283
my_store.register_suffix('dsc')
284
my_store.add(stream, "foo", 'dsc')
284
my_store.add(stream, b"foo", b'dsc')
285
285
self.assertEqual([("_add", "foo.dsc", stream)], my_store._calls)
287
287
def test_add_simple_suffixed(self):
288
288
stream = BytesIO(b"content")
289
289
my_store = InstrumentedTransportStore(MockTransport(), True)
290
290
my_store.register_suffix('dsc')
291
my_store.add(stream, "foo", 'dsc')
291
my_store.add(stream, b"foo", b'dsc')
292
292
self.assertEqual([("_add", "45/foo.dsc", stream)], my_store._calls)
294
294
def get_populated_store(self, prefixed=False,
297
297
compressed=compressed)
298
298
my_store.register_suffix('sig')
299
299
stream = BytesIO(b"signature")
300
my_store.add(stream, "foo", 'sig')
300
my_store.add(stream, b"foo", 'sig')
301
301
stream = BytesIO(b"content")
302
my_store.add(stream, "foo")
302
my_store.add(stream, b"foo")
303
303
stream = BytesIO(b"signature for missing base")
304
my_store.add(stream, "missing", 'sig')
304
my_store.add(stream, b"missing", 'sig')
307
307
def test_has_simple(self):