395
token = repository.lock_write(token=token)
394
token = repository.lock_write(token=token).repository_token
396
395
except errors.LockContention, e:
397
396
return FailedSmartServerResponse(('LockContention',))
398
397
except errors.UnlockableTransport:
505
504
for record in substream:
506
505
if record.storage_kind in ('chunked', 'fulltext'):
507
506
serialised = record_to_fulltext_bytes(record)
508
elif record.storage_kind == 'inventory-delta':
509
serialised = record_to_inventory_delta_bytes(record)
510
507
elif record.storage_kind == 'absent':
511
508
raise ValueError("Absent factory for %s" % (record.key,))
544
541
:ivar first_bytes: The first bytes to give the next NetworkRecordStream.
547
def __init__(self, byte_stream):
544
def __init__(self, byte_stream, record_counter):
548
545
"""Create a _ByteStreamDecoder."""
549
546
self.stream_decoder = pack.ContainerPushParser()
550
547
self.current_type = None
551
548
self.first_bytes = None
552
549
self.byte_stream = byte_stream
550
self._record_counter = record_counter
554
553
def iter_stream_decoder(self):
555
554
"""Iterate the contents of the pack from stream_decoder."""
581
580
def record_stream(self):
582
581
"""Yield substream_type, substream from the byte stream."""
582
def wrap_and_count(pb, rc, substream):
583
"""Yield records from stream while showing progress."""
586
if self.current_type != 'revisions' and self.key_count != 0:
587
# As we know the number of revisions now (in self.key_count)
588
# we can setup and use record_counter (rc).
589
if not rc.is_initialized():
590
rc.setup(self.key_count, self.key_count)
591
for record in substream.read():
593
if rc.is_initialized() and counter == rc.STEP:
594
rc.increment(counter)
595
pb.update('Estimate', rc.current, rc.max)
597
if self.current_type == 'revisions':
598
# Total records is proportional to number of revs
599
# to fetch. With remote, we used self.key_count to
600
# track the number of revs. Once we have the revs
601
# counts in self.key_count, the progress bar changes
602
# from 'Estimating..' to 'Estimate' above.
604
if counter == rc.STEP:
605
pb.update('Estimating..', self.key_count)
583
610
self.seed_state()
611
pb = ui.ui_factory.nested_progress_bar()
612
rc = self._record_counter
584
613
# Make and consume sub generators, one per substream type:
585
614
while self.first_bytes is not None:
586
615
substream = NetworkRecordStream(self.iter_substream_bytes())
587
616
# after substream is fully consumed, self.current_type is set to
588
617
# the next type, and self.first_bytes is set to the matching bytes.
589
yield self.current_type, substream.read()
618
yield self.current_type, wrap_and_count(pb, rc, substream)
620
pb.update('Done', rc.max, rc.max)
591
623
def seed_state(self):
592
624
"""Prepare the _ByteStreamDecoder to decode from the pack stream."""
597
629
list(self.iter_substream_bytes())
600
def _byte_stream_to_stream(byte_stream):
632
def _byte_stream_to_stream(byte_stream, record_counter=None):
601
633
"""Convert a byte stream into a format and a stream.
603
635
:param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
604
636
:return: (RepositoryFormat, stream_generator)
606
decoder = _ByteStreamDecoder(byte_stream)
638
decoder = _ByteStreamDecoder(byte_stream, record_counter)
607
639
for bytes in byte_stream:
608
640
decoder.stream_decoder.accept_bytes(bytes)
609
641
for record in decoder.stream_decoder.read_pending_records(max=1):