73
# Parse the body of a search request and rebuild the search it describes.
# Returns a (search_result, error_response) pair; exactly one is None in the
# branches visible here.
# NOTE(review): this span is diff-viewer residue. Bare integers are viewer
# line numbers, and the two `def` lines are the old (recipe_bytes) and new
# (search_bytes) revisions of the same method.
def recreate_search(self, repository, recipe_bytes):
74
lines = recipe_bytes.split('\n')
74
def recreate_search(self, repository, search_bytes):
75
# The body is newline-separated; its first line names the kind of search.
lines = search_bytes.split('\n')
76
if lines[0] == 'ancestry-of':
78
# NOTE(review): the statement binding `heads` (viewer line 77) is not
# visible here; presumably it is taken from lines[1:] -- confirm.
search_result = graph.PendingAncestryResult(heads, repository)
79
return search_result, None
80
elif lines[0] == 'search':
81
# The remaining lines are the recipe; the helper produces the return value.
return self.recreate_search_from_recipe(repository, lines[1:])
83
# No search result, plus a 'BadSearch' failure response.
# NOTE(review): presumably the fallback `else:` branch (viewer line 82 is
# not visible).
return (None, FailedSmartServerResponse(('BadSearch',)))
85
# Rebuild a search from the lines of its recipe.
# NOTE(review): only the first three statements of the body are visible in
# this view; what is done with the parsed values cannot be seen from here.
def recreate_search_from_recipe(self, repository, lines):
75
86
# lines[0]: space-separated start keys, collected into a set.
start_keys = set(lines[0].split(' '))
76
87
# lines[1]: space-separated keys to exclude, collected into a set.
exclude_keys = set(lines[1].split(' '))
77
88
# lines[2]: the revision count, parsed as a base-10 integer.
revision_count = int(lines[2])
330
346
return SuccessfulSmartServerResponse(('ok', token))
349
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
351
# Look up and remember the target format named by to_network_name, then
# return None so that a request body is expected.
# NOTE(review): the docstring below has lost its final lines (including the
# closing quotes) in this view; viewer lines 356-357 are not visible.
def do_repository_request(self, repository, to_network_name):
352
"""Get a stream for inserting into a to_format repository.
354
:param repository: The repository to stream from.
355
:param to_network_name: The network name of the format of the target
358
self._to_format = network_format_registry.get(to_network_name)
359
return None # Signal that we want a body.
361
# Handle the request body: rebuild the search it describes and answer 'ok'
# with a body stream of the matching records.
# NOTE(review): the try/except scaffolding and the error-return path are on
# viewer lines that are not visible here (364, 367-368, 371, 373, 375-376).
def do_body(self, body_bytes):
362
repository = self._repository
363
# Read-lock the repository before rebuilding the search.
repository.lock_read()
365
search_result, error = self.recreate_search(repository, body_bytes)
366
# NOTE(review): what happens when error is set is not visible in this view.
if error is not None:
369
# The source is chosen for the target format recorded as self._to_format.
source = repository._get_source(self._to_format)
370
stream = source.get_stream(search_result)
372
# Capture the active exception so it can be re-raised below.
exc_info = sys.exc_info()
374
# On non-error, unlocking is done by the body stream handler.
377
# Python 2 three-expression raise: type, value, traceback.
raise exc_info[0], exc_info[1], exc_info[2]
378
return SuccessfulSmartServerResponse(('ok',),
379
body_stream=self.body_stream(stream, repository))
381
# Generator for the response body: serialises `stream` to bytes and, if a
# revision turns out to be missing, yields a 'NoSuchRevision' failure.
# NOTE(review): the `try:`, the loop body and the unlock calls are on viewer
# lines that are not visible here (383, 385, 389, 391-392).
def body_stream(self, stream, repository):
382
# The byte stream is tagged with the streamed repository's own format.
byte_stream = _stream_to_byte_stream(stream, repository._format)
384
# NOTE(review): presumably each chunk is yielded unchanged -- confirm.
for bytes in byte_stream:
386
except errors.RevisionNotPresent, e:
387
# This shouldn't be able to happen, but as we don't buffer
388
# everything it can in theory happen.
390
# The failure response carries the id of the revision that was missing.
yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
395
def _stream_to_byte_stream(stream, src_format):
    """Convert a record stream to a self delimited byte stream.

    :param stream: An iterable of (substream_type, substream) pairs; each
        substream is an iterable of records exposing ``storage_kind`` and
        ``get_bytes_as``.
    :param src_format: The format of the source; its ``network_name()`` is
        written as the first record, with an empty names argument.
    :return: A generator of serialised chunks: the container start, the
        format-name record, one record per stream record that has a wire
        representation, and finally the container end.
    """
    pack_writer = pack.ContainerSerialiser()
    yield pack_writer.begin()
    yield pack_writer.bytes_record(src_format.network_name(), '')
    for substream_type, substream in stream:
        for record in substream:
            # Chunked and fulltext records share one fulltext wire form;
            # every other kind is sent in its native storage form.
            if record.storage_kind in ('chunked', 'fulltext'):
                serialised = record_to_fulltext_bytes(record)
            else:
                serialised = record.get_bytes_as(record.storage_kind)
            if serialised:
                # Some streams embed the whole stream into the wire
                # representation of the first record, which means that
                # later records have no wire representation: we skip them.
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
    yield pack_writer.end()
414
def _byte_stream_to_stream(byte_stream):
    """Convert a byte stream into a format and a stream.

    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
    :return: (RepositoryFormat, stream_generator).  If byte_stream runs out
        before the first record (the format name) has been parsed, the
        function falls off the end and returns None.
    """
    # Pin a single iterator: the header loop at the bottom and the
    # record_stream closure must consume the same sequence of chunks, with
    # the closure resuming where the header loop stopped.
    byte_stream = iter(byte_stream)
    stream_decoder = pack.ContainerPushParser()

    def pending_substreams():
        """Yield (substream_type, records) for each record parsed so far."""
        for record_names, record_bytes in stream_decoder.read_pending_records():
            # Each record carries exactly one name, whose first element is
            # the substream type.
            record_name, = record_names
            substream_type = record_name[0]
            substream = NetworkRecordStream([record_bytes])
            yield substream_type, substream.read()

    def record_stream():
        """Closure to return the substreams."""
        # May have fully parsed records already.
        for item in pending_substreams():
            yield item
        for chunk in byte_stream:
            stream_decoder.accept_bytes(chunk)
            for item in pending_substreams():
                yield item

    # Feed chunks until the first record appears: its payload is the network
    # name of the source format.  Everything after it is left for
    # record_stream to decode lazily.
    for chunk in byte_stream:
        stream_decoder.accept_bytes(chunk)
        for record in stream_decoder.read_pending_records(max=1):
            record_names, src_format_name = record
            src_format = network_format_registry.get(src_format_name)
            return src_format, record_stream()
333
446
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
335
448
def do_repository_request(self, repository, token):
414
527
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
528
"""Insert a record stream from a RemoteSink into a repository.
530
This gets bytes pushed to it by the network infrastructure and turns that
531
into a bytes iterator using a thread. That is then processed by
532
_byte_stream_to_stream.
416
535
# Start a stream insert: write-lock the repository, parse the resume tokens,
# and set up the queue and the inserter thread.
# NOTE(review): diff-viewer residue. Lines preceded by two numbers appear to
# be unchanged context, a single 4xx number the old revision, and a single
# 5xx number the new revision; the `if`/`else` around the write-group calls
# and viewer line 539 are not visible here.
def do_repository_request(self, repository, resume_tokens):
417
536
"""StreamSink.insert_stream for a remote repository."""
418
537
repository.lock_write()
419
538
# resume_tokens is space-separated; empty fields are dropped.
tokens = [token for token in resume_tokens.split(' ') if token]
421
repository.resume_write_group(tokens)
423
repository.start_write_group()
424
540
self.repository = repository
425
self.stream_decoder = pack.ContainerPushParser()
426
# None until the first record (the source format name) has been seen.
self.src_format = None
427
541
# Hands work from the request methods to the inserter thread.
self.queue = Queue.Queue()
428
self.insert_thread = None
542
self.insert_thread = threading.Thread(target=self._inserter_thread)
543
self.insert_thread.start()
430
545
# Accept one chunk of the request body and pass it on via self.queue.
# NOTE(review): diff-viewer residue. The 4xx-numbered lines appear to be the
# old revision, which decoded records here; the single 5xx-numbered line
# appears to be the whole new body, which queues the raw chunk instead.
def do_chunk(self, body_stream_chunk):
431
self.stream_decoder.accept_bytes(body_stream_chunk)
432
for record in self.stream_decoder.read_pending_records():
433
record_names, record_bytes = record
434
# The first record seen carries the network name of the source format.
if self.src_format is None:
435
src_format_name = record_bytes
436
src_format = network_format_registry.get(src_format_name)
437
self.src_format = src_format
438
self.insert_thread = threading.Thread(target=self._inserter_thread)
439
self.insert_thread.start()
441
# Each record carries exactly one name; its first element is the
# substream type.
record_name, = record_names
442
substream_type = record_name[0]
443
stream = NetworkRecordStream([record_bytes])
444
for record in stream.read():
445
# Queue each decoded record as a one-element substream.
self.queue.put((substream_type, [record]))
546
self.queue.put(body_stream_chunk)
447
548
# Thread target: insert the incoming stream into the repository's sink and
# record the outcome on self for do_end to inspect.
# NOTE(review): diff-viewer residue. The 448-numbered call appears to be the
# old revision (its continuation line is not visible); the `try:`/`except`
# lines around the new body (viewer lines 549 and 555) are not visible either.
def _inserter_thread(self):
448
self.repository._get_sink().insert_stream(self.blocking_read_stream(),
550
# Decode the queued byte chunks into a source format and a record stream.
src_format, stream = _byte_stream_to_stream(
551
self.blocking_byte_stream())
552
self.insert_result = self.repository._get_sink().insert_stream(
553
stream, src_format, self.tokens)
554
self.insert_ok = True
556
# Keep the exception info so that do_end can re-raise it.
self.insert_exception = sys.exc_info()
557
self.insert_ok = False
451
# Read items from self.queue, treating the StopIteration class itself as the
# end-of-input sentinel (do_end enqueues it).
# NOTE(review): diff-viewer residue. The two `def` lines appear to be the old
# (blocking_read_stream) and new (blocking_byte_stream) revisions; the
# enclosing loop and the return/yield lines are not visible here.
def blocking_read_stream(self):
559
def blocking_byte_stream(self):
453
# Queue.get() blocks until an item is available.
item = self.queue.get()
454
if item is StopIteration:
561
bytes = self.queue.get()
562
if bytes is StopIteration:
459
567
# Finish the insert: signal end of input, wait for the inserter thread, then
# report either the missing basis keys or plain success.
# NOTE(review): diff-viewer residue. The 4xx-numbered lines appear to be the
# old revision and the 5xx-numbered lines the new; several control-flow lines
# (`try:`, `else:`, closing brackets) are not visible here.
def do_end(self):
460
568
# StopIteration is the end-of-input sentinel for the queue's consumer.
self.queue.put(StopIteration)
461
569
if self.insert_thread is not None:
462
570
self.insert_thread.join()
465
for prefix, versioned_file in (
466
('texts', self.repository.texts),
467
('inventories', self.repository.inventories),
468
('revisions', self.repository.revisions),
469
('signatures', self.repository.signatures),
471
missing_keys.update((prefix,) + key for key in
472
versioned_file.get_missing_compression_parent_keys())
473
except NotImplementedError:
474
# cannot even attempt suspending.
571
# Re-raise, in this thread, whatever the inserter thread captured.
if not self.insert_ok:
572
exc_info = self.insert_exception
573
raise exc_info[0], exc_info[1], exc_info[2]
574
write_group_tokens, missing_keys = self.insert_result
575
if write_group_tokens or missing_keys:
576
# bzip needed? missing keys should typically be a small set.
577
# Should this be a streaming body response ?
578
# Sort for a stable order, then bencode tokens and keys together.
missing_keys = sorted(missing_keys)
579
bytes = bencode.bencode((write_group_tokens, missing_keys))
580
self.repository.unlock()
581
return SuccessfulSmartServerResponse(('missing-basis', bytes))
478
# suspend the write group and tell the caller what is
479
# missing. We know we can suspend or else we would not have
480
# entered this code path. (All repositories that can handle
481
# missing keys can handle suspending a write group).
482
write_group_tokens = self.repository.suspend_write_group()
483
# bzip needed? missing keys should typically be a small set.
484
# Should this be a streaming body response ?
485
missing_keys = sorted(missing_keys)
486
bytes = bencode.bencode((write_group_tokens, missing_keys))
487
return SuccessfulSmartServerResponse(('missing-basis', bytes))
489
self.repository.commit_write_group()
490
self.repository.unlock()
491
return SuccessfulSmartServerResponse(('ok', ))
583
self.repository.unlock()
584
return SuccessfulSmartServerResponse(('ok', ))