2219
2214
self.assertEqual([], client._calls)
2222
class TestRepositoryInsertStream(TestRemoteRepository):
2224
def test_unlocked_repo(self):
2225
transport_path = 'quack'
2226
repo, client = self.setup_fake_client_and_repository(transport_path)
2227
client.add_expected_call(
2228
'Repository.insert_stream', ('quack/', ''),
2230
client.add_expected_call(
2231
'Repository.insert_stream', ('quack/', ''),
2233
sink = repo._get_sink()
2234
fmt = repository.RepositoryFormat.get_default_format()
2235
resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2236
self.assertEqual([], resume_tokens)
2237
self.assertEqual(set(), missing_keys)
2238
self.assertFinished(client)
2240
def test_locked_repo_with_no_lock_token(self):
2241
transport_path = 'quack'
2242
repo, client = self.setup_fake_client_and_repository(transport_path)
2243
client.add_expected_call(
2244
'Repository.lock_write', ('quack/', ''),
2245
'success', ('ok', ''))
2246
client.add_expected_call(
2247
'Repository.insert_stream', ('quack/', ''),
2249
client.add_expected_call(
2250
'Repository.insert_stream', ('quack/', ''),
2253
sink = repo._get_sink()
2254
fmt = repository.RepositoryFormat.get_default_format()
2255
resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2256
self.assertEqual([], resume_tokens)
2257
self.assertEqual(set(), missing_keys)
2258
self.assertFinished(client)
2260
def test_locked_repo_with_lock_token(self):
2261
transport_path = 'quack'
2262
repo, client = self.setup_fake_client_and_repository(transport_path)
2263
client.add_expected_call(
2264
'Repository.lock_write', ('quack/', ''),
2265
'success', ('ok', 'a token'))
2266
client.add_expected_call(
2267
'Repository.insert_stream_locked', ('quack/', '', 'a token'),
2269
client.add_expected_call(
2270
'Repository.insert_stream_locked', ('quack/', '', 'a token'),
2273
sink = repo._get_sink()
2274
fmt = repository.RepositoryFormat.get_default_format()
2275
resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2276
self.assertEqual([], resume_tokens)
2277
self.assertEqual(set(), missing_keys)
2278
self.assertFinished(client)
2217
class TestRepositoryInsertStreamBase(TestRemoteRepository):
2218
"""Base class for Repository.insert_stream and .insert_stream_1.19
2222
def checkInsertEmptyStream(self, repo, client):
2223
"""Insert an empty stream, checking the result.
2225
This checks that there are no resume_tokens or missing_keys, and that
2226
the client is finished.
2228
sink = repo._get_sink()
2229
fmt = repository.RepositoryFormat.get_default_format()
2230
resume_tokens, missing_keys = sink.insert_stream([], fmt, [])
2231
self.assertEqual([], resume_tokens)
2232
self.assertEqual(set(), missing_keys)
2233
self.assertFinished(client)
2236
class TestRepositoryInsertStream(TestRepositoryInsertStreamBase):
2237
"""Tests for using Repository.insert_stream verb when the _1.19 variant is
2240
This test case is very similar to TestRepositoryInsertStream_1_19.
2244
TestRemoteRepository.setUp(self)
2245
self.disable_verb('Repository.insert_stream_1.19')
2247
def test_unlocked_repo(self):
2248
transport_path = 'quack'
2249
repo, client = self.setup_fake_client_and_repository(transport_path)
2250
client.add_expected_call(
2251
'Repository.insert_stream_1.19', ('quack/', ''),
2252
'unknown', ('Repository.insert_stream_1.19',))
2253
client.add_expected_call(
2254
'Repository.insert_stream', ('quack/', ''),
2256
client.add_expected_call(
2257
'Repository.insert_stream', ('quack/', ''),
2259
self.checkInsertEmptyStream(repo, client)
2261
def test_locked_repo_with_no_lock_token(self):
2262
transport_path = 'quack'
2263
repo, client = self.setup_fake_client_and_repository(transport_path)
2264
client.add_expected_call(
2265
'Repository.lock_write', ('quack/', ''),
2266
'success', ('ok', ''))
2267
client.add_expected_call(
2268
'Repository.insert_stream_1.19', ('quack/', ''),
2269
'unknown', ('Repository.insert_stream_1.19',))
2270
client.add_expected_call(
2271
'Repository.insert_stream', ('quack/', ''),
2273
client.add_expected_call(
2274
'Repository.insert_stream', ('quack/', ''),
2277
self.checkInsertEmptyStream(repo, client)
2279
def test_locked_repo_with_lock_token(self):
2280
transport_path = 'quack'
2281
repo, client = self.setup_fake_client_and_repository(transport_path)
2282
client.add_expected_call(
2283
'Repository.lock_write', ('quack/', ''),
2284
'success', ('ok', 'a token'))
2285
client.add_expected_call(
2286
'Repository.insert_stream_1.19', ('quack/', '', 'a token'),
2287
'unknown', ('Repository.insert_stream_1.19',))
2288
client.add_expected_call(
2289
'Repository.insert_stream_locked', ('quack/', '', 'a token'),
2291
client.add_expected_call(
2292
'Repository.insert_stream_locked', ('quack/', '', 'a token'),
2295
self.checkInsertEmptyStream(repo, client)
2297
def test_stream_with_inventory_deltas(self):
2298
"""'inventory-deltas' substreams cannot be sent to the
2299
Repository.insert_stream verb, because not all servers that implement
2300
that verb will accept them. So when one is encountered the RemoteSink
2301
immediately stops using that verb and falls back to VFS insert_stream.
2303
transport_path = 'quack'
2304
repo, client = self.setup_fake_client_and_repository(transport_path)
2305
client.add_expected_call(
2306
'Repository.insert_stream_1.19', ('quack/', ''),
2307
'unknown', ('Repository.insert_stream_1.19',))
2308
client.add_expected_call(
2309
'Repository.insert_stream', ('quack/', ''),
2311
client.add_expected_call(
2312
'Repository.insert_stream', ('quack/', ''),
2314
# Create a fake real repository for insert_stream to fall back on, so
2315
# that we can directly see the records the RemoteSink passes to the
2320
def insert_stream(self, stream, src_format, resume_tokens):
2321
for substream_kind, substream in stream:
2322
self.records.append(
2323
(substream_kind, [record.key for record in substream]))
2324
return ['fake tokens'], ['fake missing keys']
2325
fake_real_sink = FakeRealSink()
2326
class FakeRealRepository:
2327
def _get_sink(self):
2328
return fake_real_sink
2329
repo._real_repository = FakeRealRepository()
2330
sink = repo._get_sink()
2331
fmt = repository.RepositoryFormat.get_default_format()
2332
stream = self.make_stream_with_inv_deltas(fmt)
2333
resume_tokens, missing_keys = sink.insert_stream(stream, fmt, [])
2334
# Every record from the first inventory delta should have been sent to
2336
expected_records = [
2337
('inventory-deltas', [('rev2',), ('rev3',)]),
2338
('texts', [('some-rev', 'some-file')])]
2339
self.assertEqual(expected_records, fake_real_sink.records)
2340
# The return values from the real sink's insert_stream are propagated
2341
# back to the original caller.
2342
self.assertEqual(['fake tokens'], resume_tokens)
2343
self.assertEqual(['fake missing keys'], missing_keys)
2344
self.assertFinished(client)
2346
def make_stream_with_inv_deltas(self, fmt):
2347
"""Make a simple stream with an inventory delta followed by more
2348
records and more substreams to test that all records and substreams
2349
from that point on are used.
2351
This sends, in order:
2352
* inventories substream: rev1, rev2, rev3. rev2 and rev3 are
2354
* texts substream: (some-rev, some-file)
2356
# Define a stream using generators so that it isn't rewindable.
2357
inv = inventory.Inventory(revision_id='rev1')
2358
inv.root.revision = 'rev1'
2359
def stream_with_inv_delta():
2360
yield ('inventories', inventories_substream())
2361
yield ('inventory-deltas', inventory_delta_substream())
2363
versionedfile.FulltextContentFactory(
2364
('some-rev', 'some-file'), (), None, 'content')])
2365
def inventories_substream():
2366
# An empty inventory fulltext. This will be streamed normally.
2367
text = fmt._serializer.write_inventory_to_string(inv)
2368
yield versionedfile.FulltextContentFactory(
2369
('rev1',), (), None, text)
2370
def inventory_delta_substream():
2371
# An inventory delta. This can't be streamed via this verb, so it
2372
# will trigger a fallback to VFS insert_stream.
2373
entry = inv.make_entry(
2374
'directory', 'newdir', inv.root.file_id, 'newdir-id')
2375
entry.revision = 'ghost'
2376
delta = [(None, 'newdir', 'newdir-id', entry)]
2377
serializer = inventory_delta.InventoryDeltaSerializer(
2378
versioned_root=True, tree_references=False)
2379
lines = serializer.delta_to_lines('rev1', 'rev2', delta)
2380
yield versionedfile.ChunkedContentFactory(
2381
('rev2',), (('rev1',)), None, lines)
2383
lines = serializer.delta_to_lines('rev1', 'rev3', delta)
2384
yield versionedfile.ChunkedContentFactory(
2385
('rev3',), (('rev1',)), None, lines)
2386
return stream_with_inv_delta()
2389
class TestRepositoryInsertStream_1_19(TestRepositoryInsertStreamBase):
2391
def test_unlocked_repo(self):
2392
transport_path = 'quack'
2393
repo, client = self.setup_fake_client_and_repository(transport_path)
2394
client.add_expected_call(
2395
'Repository.insert_stream_1.19', ('quack/', ''),
2397
client.add_expected_call(
2398
'Repository.insert_stream_1.19', ('quack/', ''),
2400
self.checkInsertEmptyStream(repo, client)
2402
def test_locked_repo_with_no_lock_token(self):
2403
transport_path = 'quack'
2404
repo, client = self.setup_fake_client_and_repository(transport_path)
2405
client.add_expected_call(
2406
'Repository.lock_write', ('quack/', ''),
2407
'success', ('ok', ''))
2408
client.add_expected_call(
2409
'Repository.insert_stream_1.19', ('quack/', ''),
2411
client.add_expected_call(
2412
'Repository.insert_stream_1.19', ('quack/', ''),
2415
self.checkInsertEmptyStream(repo, client)
2417
def test_locked_repo_with_lock_token(self):
2418
transport_path = 'quack'
2419
repo, client = self.setup_fake_client_and_repository(transport_path)
2420
client.add_expected_call(
2421
'Repository.lock_write', ('quack/', ''),
2422
'success', ('ok', 'a token'))
2423
client.add_expected_call(
2424
'Repository.insert_stream_1.19', ('quack/', '', 'a token'),
2426
client.add_expected_call(
2427
'Repository.insert_stream_1.19', ('quack/', '', 'a token'),
2430
self.checkInsertEmptyStream(repo, client)
2281
2433
class TestRepositoryTarball(TestRemoteRepository):