32
36
SmartServerRequest,
33
37
SuccessfulSmartServerResponse,
35
from bzrlib.repository import _strip_NULL_ghosts
39
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
36
40
from bzrlib import revision as _mod_revision
41
from bzrlib.util import bencode
42
from bzrlib.versionedfile import NetworkRecordStream
39
45
class SmartServerRepositoryRequest(SmartServerRequest):
336
342
return SuccessfulSmartServerResponse(('ok',))
345
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
347
def do_repository_request(self, repository, str_bool_new_value):
348
if str_bool_new_value == 'True':
352
repository.set_make_working_trees(new_value)
353
return SuccessfulSmartServerResponse(('ok',))
339
356
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
340
357
"""Get the raw repository files as a tarball.
392
409
tarball.add(dirname, '.bzr') # recursive by default
414
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
416
def do_repository_request(self, repository):
    """StreamSink.insert_stream for a remote repository.

    Write-locks ``repository``, opens a write group on it and resets the
    per-request state (container parser, source format, record queue and
    inserter thread) that is used while the request body is received.

    :param repository: The repository the incoming stream is inserted
        into.
    :raises: Whatever ``repository.start_write_group()`` raises; the write
        lock taken here is released before that error propagates.
    """
    repository.lock_write()
    write_group_started = False
    try:
        repository.start_write_group()
        write_group_started = True
    finally:
        # Don't leave the repository write-locked when the write group
        # could not be opened: nothing later in this request would get
        # the chance to unlock it.
        if not write_group_started:
            repository.unlock()
    self.repository = repository
    self.stream_decoder = pack.ContainerPushParser()
    # Stays None until the first decoded record names the source format.
    self.src_format = None
    self.queue = Queue.Queue()
    # The inserter thread is only created once the source format is known.
    self.insert_thread = None
426
def do_chunk(self, body_stream_chunk):
427
self.stream_decoder.accept_bytes(body_stream_chunk)
428
for record in self.stream_decoder.read_pending_records():
429
record_names, record_bytes = record
430
if self.src_format is None:
431
src_format_name = record_bytes
432
src_format = network_format_registry.get(src_format_name)
433
self.src_format = src_format
434
self.insert_thread = threading.Thread(target=self._inserter_thread)
435
self.insert_thread.start()
437
record_name, = record_names
438
substream_type = record_name[0]
439
stream = NetworkRecordStream([record_bytes])
440
for record in stream.read():
441
self.queue.put((substream_type, [record]))
443
def _inserter_thread(self):
444
self.repository._get_sink().insert_stream(self.blocking_read_stream(),
447
def blocking_read_stream(self):
449
item = self.queue.get()
450
if item is StopIteration:
456
self.queue.put(StopIteration)
457
if self.insert_thread is not None:
458
self.insert_thread.join()
459
self.repository.commit_write_group()
460
self.repository.unlock()
461
return SuccessfulSmartServerResponse(('ok', ))