32
36
SmartServerRequest,
33
37
SuccessfulSmartServerResponse,
35
from bzrlib.repository import _strip_NULL_ghosts
39
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
36
40
from bzrlib import revision as _mod_revision
41
from bzrlib.util import bencode
42
from bzrlib.versionedfile import NetworkRecordStream
39
45
class SmartServerRepositoryRequest(SmartServerRequest):
107
113
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
108
114
"""Bzr 1.2+ - get parent data for revisions during a graph search."""
116
no_extra_results = False
110
118
def do_repository_request(self, repository, *revision_ids):
111
119
"""Get parent details for some revisions.
113
121
All the parents for revision_ids are returned. Additionally up to 64KB
114
122
of additional parent data found by performing a breadth first search
115
123
from revision_ids is returned. The verb takes a body containing the
174
182
# 64K (compressed) or so. We do one level of depth at a time to
175
183
# stay in sync with the client. The 250000 magic number is
176
184
# based on an estimated compression ratio taken from bzr.dev itself.
177
if first_loop_done and size_so_far > 250000:
185
if self.no_extra_results or (
186
first_loop_done and size_so_far > 250000):
178
187
next_revs = set()
180
189
# don't query things we've already queried
193
202
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
195
204
def do_readlocked_repository_request(self, repository, revision_id):
196
205
"""Return the result of repository.get_revision_graph(revision_id).
198
207
Deprecated as of bzr 1.4, but supported for older clients.
200
209
:param repository: The repository to query in.
201
210
:param revision_id: The utf8 encoded revision_id to get a graph from.
202
211
:return: A smart server response where the body contains a utf8
336
345
return SuccessfulSmartServerResponse(('ok',))
348
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
350
def do_repository_request(self, repository, str_bool_new_value):
351
if str_bool_new_value == 'True':
355
repository.set_make_working_trees(new_value)
356
return SuccessfulSmartServerResponse(('ok',))
339
359
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
340
360
"""Get the raw repository files as a tarball.
342
362
The returned tarball contains a .bzr control directory which in turn
343
363
contains a repository.
345
This takes one parameter, compression, which currently must be
365
This takes one parameter, compression, which currently must be
346
366
"", "gz", or "bz2".
348
368
This is used to implement the Repository.copy_content_into operation.
392
412
tarball.add(dirname, '.bzr') # recursive by default
417
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
419
def do_repository_request(self, repository, resume_tokens):
420
"""StreamSink.insert_stream for a remote repository."""
421
repository.lock_write()
422
tokens = [token for token in resume_tokens.split(' ') if token]
424
self.repository = repository
425
self.stream_decoder = pack.ContainerPushParser()
426
self.src_format = None
427
self.queue = Queue.Queue()
428
self.insert_thread = None
430
def do_chunk(self, body_stream_chunk):
431
self.stream_decoder.accept_bytes(body_stream_chunk)
432
for record in self.stream_decoder.read_pending_records():
433
record_names, record_bytes = record
434
if self.src_format is None:
435
src_format_name = record_bytes
436
src_format = network_format_registry.get(src_format_name)
437
self.src_format = src_format
438
self.insert_thread = threading.Thread(target=self._inserter_thread)
439
self.insert_thread.start()
441
record_name, = record_names
442
substream_type = record_name[0]
443
stream = NetworkRecordStream([record_bytes])
444
for record in stream.read():
445
self.queue.put((substream_type, [record]))
447
def _inserter_thread(self):
449
self.insert_result = self.repository._get_sink().insert_stream(
450
self.blocking_read_stream(), self.src_format, self.tokens)
451
self.insert_ok = True
453
self.insert_exception = sys.exc_info()
454
self.insert_ok = False
456
def blocking_read_stream(self):
458
item = self.queue.get()
459
if item is StopIteration:
465
self.queue.put(StopIteration)
466
if self.insert_thread is not None:
467
self.insert_thread.join()
468
if not self.insert_ok:
469
exc_info = self.insert_exception
470
raise exc_info[0], exc_info[1], exc_info[2]
471
write_group_tokens, missing_keys = self.insert_result
472
if write_group_tokens or missing_keys:
473
# bzip needed? missing keys should typically be a small set.
474
# Should this be a streaming body response ?
475
missing_keys = sorted(missing_keys)
476
bytes = bencode.bencode((write_group_tokens, missing_keys))
477
self.repository.unlock()
478
return SuccessfulSmartServerResponse(('missing-basis', bytes))
480
self.repository.unlock()
481
return SuccessfulSmartServerResponse(('ok', ))