1
# Copyright (C) 2006-2010 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Server-side repository related request implementations."""
19
from __future__ import absolute_import
36
estimate_compressed_size,
42
inventory as _mod_inventory,
47
from ..bzrdir import BzrDir
48
from ...sixish import (
51
from .request import (
52
FailedSmartServerResponse,
54
SuccessfulSmartServerResponse,
56
from ...repository import (
58
network_format_registry,
60
from ... import revision as _mod_revision
61
from ..versionedfile import (
62
ChunkedContentFactory,
64
record_to_fulltext_bytes,
68
class SmartServerRepositoryRequest(SmartServerRequest):
69
"""Common base class for Repository requests."""
71
def do(self, path, *args):
72
"""Execute a repository request.
74
All Repository requests take a path to the repository as their first
75
argument. The repository must be at the exact path given by the
76
client - no searching is done.
78
The actual logic is delegated to self.do_repository_request.
80
:param client_path: The path for the repository as received from the
82
:return: A SmartServerResponse from self.do_repository_request().
84
transport = self.transport_from_client_path(path)
85
bzrdir = BzrDir.open_from_transport(transport)
86
# Save the repository for use with do_body.
87
self._repository = bzrdir.open_repository()
88
return self.do_repository_request(self._repository, *args)
90
def do_repository_request(self, repository, *args):
91
"""Override to provide an implementation for a verb."""
92
# No-op for verbs that take bodies (None as a result indicates a body
96
def recreate_search(self, repository, search_bytes, discard_excess=False):
97
"""Recreate a search from its serialised form.
99
:param discard_excess: If True, and the search refers to data we don't
100
have, just silently accept that fact - the verb calling
101
recreate_search trusts that clients will look for missing things
102
they expected and get it from elsewhere.
104
if search_bytes == b'everything':
105
return vf_search.EverythingResult(repository), None
106
lines = search_bytes.split(b'\n')
107
if lines[0] == b'ancestry-of':
109
search_result = vf_search.PendingAncestryResult(heads, repository)
110
return search_result, None
111
elif lines[0] == b'search':
112
return self.recreate_search_from_recipe(repository, lines[1:],
113
discard_excess=discard_excess)
115
return (None, FailedSmartServerResponse((b'BadSearch',)))
117
def recreate_search_from_recipe(self, repository, lines,
118
discard_excess=False):
119
"""Recreate a specific revision search (vs a from-tip search).
121
:param discard_excess: If True, and the search refers to data we don't
122
have, just silently accept that fact - the verb calling
123
recreate_search trusts that clients will look for missing things
124
they expected and get it from elsewhere.
126
start_keys = set(lines[0].split(b' '))
127
exclude_keys = set(lines[1].split(b' '))
128
revision_count = int(lines[2].decode('ascii'))
129
with repository.lock_read():
130
search = repository.get_graph()._make_breadth_first_searcher(
134
next_revs = next(search)
135
except StopIteration:
137
search.stop_searching_any(exclude_keys.intersection(next_revs))
138
(started_keys, excludes, included_keys) = search.get_state()
139
if (not discard_excess and len(included_keys) != revision_count):
140
# we got back a different amount of data than expected, this
141
# gets reported as NoSuchRevision, because less revisions
142
# indicates missing revisions, and more should never happen as
143
# the excludes list considers ghosts and ensures that ghost
144
# filling races are not a problem.
145
return (None, FailedSmartServerResponse((b'NoSuchRevision',)))
146
search_result = vf_search.SearchResult(started_keys, excludes,
147
len(included_keys), included_keys)
148
return (search_result, None)
151
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
152
"""Calls self.do_readlocked_repository_request."""
154
def do_repository_request(self, repository, *args):
155
"""Read lock a repository for do_readlocked_repository_request."""
156
with repository.lock_read():
157
return self.do_readlocked_repository_request(repository, *args)
160
class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
161
"""Break a repository lock."""
163
def do_repository_request(self, repository):
164
repository.break_lock()
165
return SuccessfulSmartServerResponse((b'ok', ))
171
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
172
"""Bzr 1.2+ - get parent data for revisions during a graph search."""
174
no_extra_results = False
176
def do_repository_request(self, repository, *revision_ids):
177
"""Get parent details for some revisions.
179
All the parents for revision_ids are returned. Additionally up to 64KB
180
of additional parent data found by performing a breadth first search
181
from revision_ids is returned. The verb takes a body containing the
182
current search state, see do_body for details.
184
If 'include-missing:' is in revision_ids, ghosts encountered in the
185
graph traversal for getting parent data are included in the result with
186
a prefix of 'missing:'.
188
:param repository: The repository to query in.
189
:param revision_ids: The utf8 encoded revision_id to answer for.
191
self._revision_ids = revision_ids
192
return None # Signal that we want a body.
194
def do_body(self, body_bytes):
195
"""Process the current search state and perform the parent lookup.
197
:return: A smart server response where the body contains an utf8
198
encoded flattened list of the parents of the revisions (the same
199
format as Repository.get_revision_graph) which has been bz2
202
repository = self._repository
203
with repository.lock_read():
204
return self._do_repository_request(body_bytes)
206
def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
207
include_missing, max_size=65536):
210
estimator = estimate_compressed_size.ZLibEstimator(max_size)
211
next_revs = revision_ids
212
first_loop_done = False
214
queried_revs.update(next_revs)
215
parent_map = repo_graph.get_parent_map(next_revs)
216
current_revs = next_revs
218
for revision_id in current_revs:
220
parents = parent_map.get(revision_id)
221
if parents is not None:
222
# adjust for the wire
223
if parents == (_mod_revision.NULL_REVISION,):
225
# prepare the next query
226
next_revs.update(parents)
227
encoded_id = revision_id
230
encoded_id = b"missing:" + revision_id
232
if (revision_id not in client_seen_revs
233
and (not missing_rev or include_missing)):
234
# Client does not have this revision, give it to it.
235
# add parents to the result
236
result[encoded_id] = parents
237
# Approximate the serialized cost of this revision_id.
238
line = encoded_id + b' ' + b' '.join(parents) + b'\n'
239
estimator.add_content(line)
240
# get all the directly asked for parents, and then flesh out to
241
# 64K (compressed) or so. We do one level of depth at a time to
242
# stay in sync with the client. The 250000 magic number is
243
# estimated compression ratio taken from bzr.dev itself.
244
if self.no_extra_results or (first_loop_done and estimator.full()):
245
trace.mutter('size: %d, z_size: %d'
246
% (estimator._uncompressed_size_added,
247
estimator._compressed_size_added))
250
# don't query things we've already queried
251
next_revs = next_revs.difference(queried_revs)
252
first_loop_done = True
255
def _do_repository_request(self, body_bytes):
256
repository = self._repository
257
revision_ids = set(self._revision_ids)
258
include_missing = b'include-missing:' in revision_ids
260
revision_ids.remove(b'include-missing:')
261
body_lines = body_bytes.split(b'\n')
262
search_result, error = self.recreate_search_from_recipe(
263
repository, body_lines)
264
if error is not None:
266
# TODO might be nice to start up the search again; but thats not
267
# written or tested yet.
268
client_seen_revs = set(search_result.get_keys())
269
# Always include the requested ids.
270
client_seen_revs.difference_update(revision_ids)
272
repo_graph = repository.get_graph()
273
result = self._expand_requested_revs(repo_graph, revision_ids,
274
client_seen_revs, include_missing)
276
# sorting trivially puts lexographically similar revision ids together.
279
for revision, parents in sorted(result.items()):
280
lines.append(b' '.join((revision, ) + tuple(parents)))
282
return SuccessfulSmartServerResponse(
283
(b'ok', ), bz2.compress(b'\n'.join(lines)))
286
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
288
def do_readlocked_repository_request(self, repository, revision_id):
289
"""Return the result of repository.get_revision_graph(revision_id).
291
Deprecated as of bzr 1.4, but supported for older clients.
293
:param repository: The repository to query in.
294
:param revision_id: The utf8 encoded revision_id to get a graph from.
295
:return: A smart server response where the body contains an utf8
296
encoded flattened list of the revision graph.
302
graph = repository.get_graph()
304
search_ids = [revision_id]
306
search_ids = repository.all_revision_ids()
307
search = graph._make_breadth_first_searcher(search_ids)
308
transitive_ids = set(itertools.chain.from_iterable(search))
309
parent_map = graph.get_parent_map(transitive_ids)
310
revision_graph = _strip_NULL_ghosts(parent_map)
311
if revision_id and revision_id not in revision_graph:
312
# Note that we return an empty body, rather than omitting the body.
313
# This way the client knows that it can always expect to find a body
314
# in the response for this method, even in the error case.
315
return FailedSmartServerResponse((b'nosuchrevision', revision_id), b'')
317
for revision, parents in revision_graph.items():
318
lines.append(b' '.join((revision, ) + tuple(parents)))
320
return SuccessfulSmartServerResponse((b'ok', ), b'\n'.join(lines))
323
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
325
def do_readlocked_repository_request(self, repository, revno,
327
"""Find the revid for a given revno, given a known revno/revid pair.
332
found_flag, result = repository.get_rev_id_for_revno(
334
except errors.NoSuchRevision as err:
335
if err.revision != known_pair[1]:
336
raise AssertionError(
337
'get_rev_id_for_revno raised RevisionNotPresent for '
338
'non-initial revision: ' + err.revision)
339
return FailedSmartServerResponse(
340
(b'nosuchrevision', err.revision))
341
except errors.RevnoOutOfBounds as e:
342
return FailedSmartServerResponse(
343
(b'revno-outofbounds', e.revno, e.minimum, e.maximum))
345
return SuccessfulSmartServerResponse((b'ok', result))
347
earliest_revno, earliest_revid = result
348
return SuccessfulSmartServerResponse(
349
(b'history-incomplete', earliest_revno, earliest_revid))
352
class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):
354
def do_repository_request(self, repository):
355
"""Return the serializer format for this repository.
359
:param repository: The repository to query
360
:return: A smart server response (b'ok', FORMAT)
362
serializer = repository.get_serializer_format()
363
return SuccessfulSmartServerResponse((b'ok', serializer))
366
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
368
def do_repository_request(self, repository, revision_id):
369
"""Return ok if a specific revision is in the repository at path.
371
:param repository: The repository to query in.
372
:param revision_id: The utf8 encoded revision_id to lookup.
373
:return: A smart server response of ('yes', ) if the revision is
374
present. ('no', ) if it is missing.
376
if repository.has_revision(revision_id):
377
return SuccessfulSmartServerResponse((b'yes', ))
379
return SuccessfulSmartServerResponse((b'no', ))
382
class SmartServerRequestHasSignatureForRevisionId(
383
SmartServerRepositoryRequest):
385
def do_repository_request(self, repository, revision_id):
386
"""Return ok if a signature is present for a revision.
388
Introduced in bzr 2.5.0.
390
:param repository: The repository to query in.
391
:param revision_id: The utf8 encoded revision_id to lookup.
392
:return: A smart server response of ('yes', ) if a
393
signature for the revision is present,
394
('no', ) if it is missing.
397
if repository.has_signature_for_revision_id(revision_id):
398
return SuccessfulSmartServerResponse((b'yes', ))
400
return SuccessfulSmartServerResponse((b'no', ))
401
except errors.NoSuchRevision:
402
return FailedSmartServerResponse(
403
(b'nosuchrevision', revision_id))
406
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
408
def do_repository_request(self, repository, revid, committers):
409
"""Return the result of repository.gather_stats().
411
:param repository: The repository to query in.
412
:param revid: utf8 encoded rev id or an empty string to indicate None
413
:param committers: 'yes' or 'no'.
415
:return: A SmartServerResponse (b'ok',), a encoded body looking like
418
latestrev: 345.700 3600
421
But containing only fields returned by the gather_stats() call
424
decoded_revision_id = None
426
decoded_revision_id = revid
427
if committers == b'yes':
428
decoded_committers = True
430
decoded_committers = None
432
stats = repository.gather_stats(decoded_revision_id,
434
except errors.NoSuchRevision:
435
return FailedSmartServerResponse((b'nosuchrevision', revid))
438
if 'committers' in stats:
439
body += b'committers: %d\n' % stats['committers']
440
if 'firstrev' in stats:
441
body += b'firstrev: %.3f %d\n' % stats['firstrev']
442
if 'latestrev' in stats:
443
body += b'latestrev: %.3f %d\n' % stats['latestrev']
444
if 'revisions' in stats:
445
body += b'revisions: %d\n' % stats['revisions']
447
body += b'size: %d\n' % stats['size']
449
return SuccessfulSmartServerResponse((b'ok', ), body)
452
class SmartServerRepositoryGetRevisionSignatureText(
453
SmartServerRepositoryRequest):
454
"""Return the signature text of a revision.
459
def do_repository_request(self, repository, revision_id):
460
"""Return the result of repository.get_signature_text().
462
:param repository: The repository to query in.
463
:return: A smart server response of with the signature text as
467
text = repository.get_signature_text(revision_id)
468
except errors.NoSuchRevision as err:
469
return FailedSmartServerResponse(
470
(b'nosuchrevision', err.revision))
471
return SuccessfulSmartServerResponse((b'ok', ), text)
474
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
476
def do_repository_request(self, repository):
477
"""Return the result of repository.is_shared().
479
:param repository: The repository to query in.
480
:return: A smart server response of ('yes', ) if the repository is
481
shared, and ('no', ) if it is not.
483
if repository.is_shared():
484
return SuccessfulSmartServerResponse((b'yes', ))
486
return SuccessfulSmartServerResponse((b'no', ))
489
class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):
491
def do_repository_request(self, repository):
492
"""Return the result of repository.make_working_trees().
494
Introduced in bzr 2.5.0.
496
:param repository: The repository to query in.
497
:return: A smart server response of ('yes', ) if the repository uses
498
working trees, and ('no', ) if it is not.
500
if repository.make_working_trees():
501
return SuccessfulSmartServerResponse((b'yes', ))
503
return SuccessfulSmartServerResponse((b'no', ))
506
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
508
def do_repository_request(self, repository, token=b''):
509
# XXX: this probably should not have a token.
513
token = repository.lock_write(token=token).repository_token
514
except errors.LockContention as e:
515
return FailedSmartServerResponse((b'LockContention',))
516
except errors.UnlockableTransport:
517
return FailedSmartServerResponse((b'UnlockableTransport',))
518
except errors.LockFailed as e:
519
return FailedSmartServerResponse((b'LockFailed',
520
str(e.lock), str(e.why)))
521
if token is not None:
522
repository.leave_lock_in_place()
526
return SuccessfulSmartServerResponse((b'ok', token))
529
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
531
def do_repository_request(self, repository, to_network_name):
532
"""Get a stream for inserting into a to_format repository.
534
The request body is 'search_bytes', a description of the revisions
537
In 2.3 this verb added support for search_bytes == 'everything'. Older
538
implementations will respond with a BadSearch error, and clients should
539
catch this and fallback appropriately.
541
:param repository: The repository to stream from.
542
:param to_network_name: The network name of the format of the target
545
self._to_format = network_format_registry.get(to_network_name)
546
if self._should_fake_unknown():
547
return FailedSmartServerResponse(
548
(b'UnknownMethod', b'Repository.get_stream'))
549
return None # Signal that we want a body.
551
def _should_fake_unknown(self):
552
"""Return True if we should return UnknownMethod to the client.
554
This is a workaround for bugs in pre-1.19 clients that claim to
555
support receiving streams of CHK repositories. The pre-1.19 client
556
expects inventory records to be serialized in the format defined by
557
to_network_name, but in pre-1.19 (at least) that format definition
558
tries to use the xml5 serializer, which does not correctly handle
559
rich-roots. After 1.19 the client can also accept inventory-deltas
560
(which avoids this issue), and those clients will use the
561
Repository.get_stream_1.19 verb instead of this one.
562
So: if this repository is CHK, and the to_format doesn't match,
563
we should just fake an UnknownSmartMethod error so that the client
564
will fallback to VFS, rather than sending it a stream we know it
567
from_format = self._repository._format
568
to_format = self._to_format
569
if not from_format.supports_chks:
570
# Source not CHK: that's ok
572
if (to_format.supports_chks
573
and from_format.repository_class is to_format.repository_class
574
and from_format._serializer == to_format._serializer):
575
# Source is CHK, but target matches: that's ok
576
# (e.g. 2a->2a, or CHK2->2a)
578
# Source is CHK, and target is not CHK or incompatible CHK. We can't
579
# generate a compatible stream.
582
def do_body(self, body_bytes):
583
repository = self._repository
584
repository.lock_read()
586
search_result, error = self.recreate_search(repository, body_bytes,
588
if error is not None:
591
source = repository._get_source(self._to_format)
592
stream = source.get_stream(search_result)
595
# On non-error, unlocking is done by the body stream handler.
599
return SuccessfulSmartServerResponse((b'ok',),
600
body_stream=self.body_stream(stream, repository))
602
def body_stream(self, stream, repository):
603
byte_stream = _stream_to_byte_stream(stream, repository._format)
605
for bytes in byte_stream:
607
except errors.RevisionNotPresent as e:
608
# This shouldn't be able to happen, but as we don't buffer
609
# everything it can in theory happen.
611
yield FailedSmartServerResponse((b'NoSuchRevision', e.revision_id))
616
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
617
"""The same as Repository.get_stream, but will return stream CHK formats to
620
See SmartServerRepositoryGetStream._should_fake_unknown.
625
def _should_fake_unknown(self):
626
"""Returns False; we don't need to workaround bugs in 1.19+ clients."""
630
def _stream_to_byte_stream(stream, src_format):
631
"""Convert a record stream to a self delimited byte stream."""
632
pack_writer = pack.ContainerSerialiser()
633
yield pack_writer.begin()
634
yield pack_writer.bytes_record(src_format.network_name(), b'')
635
for substream_type, substream in stream:
636
for record in substream:
637
if record.storage_kind in ('chunked', 'fulltext'):
638
serialised = record_to_fulltext_bytes(record)
639
elif record.storage_kind == 'absent':
640
raise ValueError("Absent factory for %s" % (record.key,))
642
serialised = record.get_bytes_as(record.storage_kind)
644
# Some streams embed the whole stream into the wire
645
# representation of the first record, which means that
646
# later records have no wire representation: we skip them.
647
yield pack_writer.bytes_record(serialised, [(substream_type.encode('ascii'),)])
648
yield pack_writer.end()
651
class _ByteStreamDecoder(object):
652
"""Helper for _byte_stream_to_stream.
654
The expected usage of this class is via the function _byte_stream_to_stream
655
which creates a _ByteStreamDecoder, pops off the stream format and then
656
yields the output of record_stream(), the main entry point to
659
Broadly this class has to unwrap two layers of iterators:
663
This is complicated by wishing to return type, iterator_for_type, but
664
getting the data for iterator_for_type when we find out type: we can't
665
simply pass a generator down to the NetworkRecordStream parser, instead
666
we have a little local state to seed each NetworkRecordStream instance,
667
and gather the type that we'll be yielding.
669
:ivar byte_stream: The byte stream being decoded.
670
:ivar stream_decoder: A pack parser used to decode the bytestream
671
:ivar current_type: The current type, used to join adjacent records of the
672
same type into a single stream.
673
:ivar first_bytes: The first bytes to give the next NetworkRecordStream.
676
def __init__(self, byte_stream, record_counter):
677
"""Create a _ByteStreamDecoder."""
678
self.stream_decoder = pack.ContainerPushParser()
679
self.current_type = None
680
self.first_bytes = None
681
self.byte_stream = byte_stream
682
self._record_counter = record_counter
685
def iter_stream_decoder(self):
686
"""Iterate the contents of the pack from stream_decoder."""
687
# dequeue pending items
688
for record in self.stream_decoder.read_pending_records():
690
# Pull bytes of the wire, decode them to records, yield those records.
691
for bytes in self.byte_stream:
692
self.stream_decoder.accept_bytes(bytes)
693
for record in self.stream_decoder.read_pending_records():
696
def iter_substream_bytes(self):
697
if self.first_bytes is not None:
698
yield self.first_bytes
699
# If we run out of pack records, single the outer layer to stop.
700
self.first_bytes = None
701
for record in self.iter_pack_records:
702
record_names, record_bytes = record
703
record_name, = record_names
704
substream_type = record_name[0]
705
if substream_type != self.current_type:
706
# end of a substream, seed the next substream.
707
self.current_type = substream_type
708
self.first_bytes = record_bytes
712
def record_stream(self):
713
"""Yield substream_type, substream from the byte stream."""
714
def wrap_and_count(pb, rc, substream):
715
"""Yield records from stream while showing progress."""
718
if self.current_type != 'revisions' and self.key_count != 0:
719
# As we know the number of revisions now (in self.key_count)
720
# we can setup and use record_counter (rc).
721
if not rc.is_initialized():
722
rc.setup(self.key_count, self.key_count)
723
for record in substream.read():
725
if rc.is_initialized() and counter == rc.STEP:
726
rc.increment(counter)
727
pb.update('Estimate', rc.current, rc.max)
729
if self.current_type == 'revisions':
730
# Total records is proportional to number of revs
731
# to fetch. With remote, we used self.key_count to
732
# track the number of revs. Once we have the revs
733
# counts in self.key_count, the progress bar changes
734
# from 'Estimating..' to 'Estimate' above.
736
if counter == rc.STEP:
737
pb.update('Estimating..', self.key_count)
743
with ui.ui_factory.nested_progress_bar() as pb:
744
rc = self._record_counter
746
# Make and consume sub generators, one per substream type:
747
while self.first_bytes is not None:
748
substream = NetworkRecordStream(
749
self.iter_substream_bytes())
750
# after substream is fully consumed, self.current_type is set
751
# to the next type, and self.first_bytes is set to the matching
753
yield self.current_type.decode('ascii'), wrap_and_count(pb, rc, substream)
756
pb.update('Done', rc.max, rc.max)
758
def seed_state(self):
759
"""Prepare the _ByteStreamDecoder to decode from the pack stream."""
760
# Set a single generator we can use to get data from the pack stream.
761
self.iter_pack_records = self.iter_stream_decoder()
762
# Seed the very first subiterator with content; after this each one
764
list(self.iter_substream_bytes())
767
def _byte_stream_to_stream(byte_stream, record_counter=None):
768
"""Convert a byte stream into a format and a stream.
770
:param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
771
:return: (RepositoryFormat, stream_generator)
773
decoder = _ByteStreamDecoder(byte_stream, record_counter)
774
for bytes in byte_stream:
775
decoder.stream_decoder.accept_bytes(bytes)
776
for record in decoder.stream_decoder.read_pending_records(max=1):
777
record_names, src_format_name = record
778
src_format = network_format_registry.get(src_format_name)
779
return src_format, decoder.record_stream()
782
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
784
def do_repository_request(self, repository, token):
786
repository.lock_write(token=token)
787
except errors.TokenMismatch as e:
788
return FailedSmartServerResponse((b'TokenMismatch',))
789
repository.dont_leave_lock_in_place()
791
return SuccessfulSmartServerResponse((b'ok',))
794
class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
795
"""Get the physical lock status for a repository.
800
def do_repository_request(self, repository):
801
if repository.get_physical_lock_status():
802
return SuccessfulSmartServerResponse((b'yes', ))
804
return SuccessfulSmartServerResponse((b'no', ))
807
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
809
def do_repository_request(self, repository, str_bool_new_value):
810
if str_bool_new_value == b'True':
814
repository.set_make_working_trees(new_value)
815
return SuccessfulSmartServerResponse((b'ok',))
818
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
819
"""Get the raw repository files as a tarball.
821
The returned tarball contains a .bzr control directory which in turn
822
contains a repository.
824
This takes one parameter, compression, which currently must be
827
This is used to implement the Repository.copy_content_into operation.
830
def do_repository_request(self, repository, compression):
831
tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
833
controldir_name = tmp_dirname + '/.bzr'
834
return self._tarfile_response(controldir_name, compression)
836
osutils.rmtree(tmp_dirname)
838
def _copy_to_tempdir(self, from_repo):
839
tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
840
tmp_bzrdir = from_repo.controldir._format.initialize(tmp_dirname)
841
tmp_repo = from_repo._format.initialize(tmp_bzrdir)
842
from_repo.copy_content_into(tmp_repo)
843
return tmp_dirname, tmp_repo
845
def _tarfile_response(self, tmp_dirname, compression):
846
with tempfile.NamedTemporaryFile() as temp:
847
self._tarball_of_dir(tmp_dirname, compression, temp.file)
848
# all finished; write the tempfile out to the network
850
return SuccessfulSmartServerResponse((b'ok',), temp.read())
851
# FIXME: Don't read the whole thing into memory here; rather stream
852
# it out from the file onto the network. mbp 20070411
854
def _tarball_of_dir(self, dirname, compression, ofile):
856
filename = os.path.basename(ofile.name)
857
with tarfile.open(fileobj=ofile, name=filename,
858
mode='w|' + compression) as tarball:
859
# The tarball module only accepts ascii names, and (i guess)
860
# packs them with their 8bit names. We know all the files
861
# within the repository have ASCII names so the should be safe
863
dirname = dirname.encode(sys.getfilesystemencoding())
864
# python's tarball module includes the whole path by default so
866
if not dirname.endswith('.bzr'):
867
raise ValueError(dirname)
868
tarball.add(dirname, '.bzr') # recursive by default
871
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
872
"""Insert a record stream from a RemoteSink into a repository.
874
This gets bytes pushed to it by the network infrastructure and turns that
875
into a bytes iterator using a thread. That is then processed by
876
_byte_stream_to_stream.
881
def do_repository_request(self, repository, resume_tokens, lock_token):
882
"""StreamSink.insert_stream for a remote repository."""
883
repository.lock_write(token=lock_token)
884
self.do_insert_stream_request(repository, resume_tokens)
886
def do_insert_stream_request(self, repository, resume_tokens):
887
tokens = [token.decode('utf-8')
888
for token in resume_tokens.split(b' ') if token]
890
self.repository = repository
891
self.queue = queue.Queue()
892
self.insert_thread = threading.Thread(target=self._inserter_thread)
893
self.insert_thread.start()
895
def do_chunk(self, body_stream_chunk):
896
self.queue.put(body_stream_chunk)
898
def _inserter_thread(self):
900
src_format, stream = _byte_stream_to_stream(
901
self.blocking_byte_stream())
902
self.insert_result = self.repository._get_sink().insert_stream(
903
stream, src_format, self.tokens)
904
self.insert_ok = True
906
self.insert_exception = sys.exc_info()
907
self.insert_ok = False
909
def blocking_byte_stream(self):
911
bytes = self.queue.get()
912
if bytes is StopIteration:
918
self.queue.put(StopIteration)
919
if self.insert_thread is not None:
920
self.insert_thread.join()
921
if not self.insert_ok:
923
reraise(*self.insert_exception)
925
del self.insert_exception
926
write_group_tokens, missing_keys = self.insert_result
927
if write_group_tokens or missing_keys:
928
# bzip needed? missing keys should typically be a small set.
929
# Should this be a streaming body response ?
930
missing_keys = sorted(
931
[(entry[0].encode('utf-8'),) + entry[1:] for entry in missing_keys])
932
bytes = bencode.bencode((
933
[token.encode('utf-8') for token in write_group_tokens], missing_keys))
934
self.repository.unlock()
935
return SuccessfulSmartServerResponse((b'missing-basis', bytes))
937
self.repository.unlock()
938
return SuccessfulSmartServerResponse((b'ok', ))
941
class SmartServerRepositoryInsertStream_1_19(SmartServerRepositoryInsertStreamLocked):
942
"""Insert a record stream from a RemoteSink into a repository.
944
Same as SmartServerRepositoryInsertStreamLocked, except:
945
- the lock token argument is optional
946
- servers that implement this verb accept 'inventory-delta' records in the
952
def do_repository_request(self, repository, resume_tokens, lock_token=None):
953
"""StreamSink.insert_stream for a remote repository."""
954
SmartServerRepositoryInsertStreamLocked.do_repository_request(
955
self, repository, resume_tokens, lock_token)
958
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
959
"""Insert a record stream from a RemoteSink into an unlocked repository.
961
This is the same as SmartServerRepositoryInsertStreamLocked, except it
962
takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
963
like pack format) repository.
968
def do_repository_request(self, repository, resume_tokens):
969
"""StreamSink.insert_stream for a remote repository."""
970
repository.lock_write()
971
self.do_insert_stream_request(repository, resume_tokens)
974
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
975
"""Add a revision signature text.
980
def do_repository_request(self, repository, lock_token, revision_id,
981
*write_group_tokens):
982
"""Add a revision signature text.
984
:param repository: Repository to operate on
985
:param lock_token: Lock token
986
:param revision_id: Revision for which to add signature
987
:param write_group_tokens: Write group tokens
989
self._lock_token = lock_token
990
self._revision_id = revision_id
991
self._write_group_tokens = [token.decode(
992
'utf-8') for token in write_group_tokens]
995
def do_body(self, body_bytes):
996
"""Add a signature text.
998
:param body_bytes: GPG signature text
999
:return: SuccessfulSmartServerResponse with arguments 'ok' and
1000
the list of new write group tokens.
1002
with self._repository.lock_write(token=self._lock_token):
1003
self._repository.resume_write_group(self._write_group_tokens)
1005
self._repository.add_signature_text(self._revision_id,
1008
new_write_group_tokens = self._repository.suspend_write_group()
1009
return SuccessfulSmartServerResponse(
1010
(b'ok', ) + tuple([token.encode('utf-8') for token in new_write_group_tokens]))
1013
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
1014
"""Start a write group.
1019
def do_repository_request(self, repository, lock_token):
1020
"""Start a write group."""
1021
with repository.lock_write(token=lock_token):
1022
repository.start_write_group()
1024
tokens = repository.suspend_write_group()
1025
except errors.UnsuspendableWriteGroup:
1026
return FailedSmartServerResponse((b'UnsuspendableWriteGroup',))
1027
return SuccessfulSmartServerResponse((b'ok', tokens))
1030
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1031
"""Commit a write group.
1036
def do_repository_request(self, repository, lock_token,
1037
write_group_tokens):
1038
"""Commit a write group."""
1039
with repository.lock_write(token=lock_token):
1041
repository.resume_write_group(
1042
[token.decode('utf-8') for token in write_group_tokens])
1043
except errors.UnresumableWriteGroup as e:
1044
return FailedSmartServerResponse(
1045
(b'UnresumableWriteGroup', [token.encode('utf-8') for token
1046
in e.write_groups], e.reason.encode('utf-8')))
1048
repository.commit_write_group()
1050
write_group_tokens = repository.suspend_write_group()
1051
# FIXME JRV 2011-11-19: What if the write_group_tokens
1054
return SuccessfulSmartServerResponse((b'ok', ))
1057
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
1058
"""Abort a write group.
1063
def do_repository_request(self, repository, lock_token, write_group_tokens):
1064
"""Abort a write group."""
1065
with repository.lock_write(token=lock_token):
1067
repository.resume_write_group(
1068
[token.decode('utf-8') for token in write_group_tokens])
1069
except errors.UnresumableWriteGroup as e:
1070
return FailedSmartServerResponse(
1071
(b'UnresumableWriteGroup',
1072
[token.encode('utf-8') for token in e.write_groups],
1073
e.reason.encode('utf-8')))
1074
repository.abort_write_group()
1075
return SuccessfulSmartServerResponse((b'ok', ))
1078
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
1079
"""Check that a write group is still valid.
1084
def do_repository_request(self, repository, lock_token, write_group_tokens):
1085
"""Abort a write group."""
1086
with repository.lock_write(token=lock_token):
1088
repository.resume_write_group(
1089
[token.decode('utf-8') for token in write_group_tokens])
1090
except errors.UnresumableWriteGroup as e:
1091
return FailedSmartServerResponse(
1092
(b'UnresumableWriteGroup',
1093
[token.encode('utf-8') for token in e.write_groups],
1094
e.reason.encode('utf-8')))
1096
repository.suspend_write_group()
1097
return SuccessfulSmartServerResponse((b'ok', ))
1100
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
1101
"""Retrieve all of the revision ids in a repository.
1106
def do_repository_request(self, repository):
1107
revids = repository.all_revision_ids()
1108
return SuccessfulSmartServerResponse((b"ok", ), b"\n".join(revids))
1111
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
1112
"""Reconcile a repository.
1117
def do_repository_request(self, repository, lock_token):
1119
repository.lock_write(token=lock_token)
1120
except errors.TokenLockingNotSupported as e:
1121
return FailedSmartServerResponse(
1122
(b'TokenLockingNotSupported', ))
1124
reconciler = repository.reconcile()
1128
b"garbage_inventories: %d\n" % reconciler.garbage_inventories,
1129
b"inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
1131
return SuccessfulSmartServerResponse((b'ok', ), b"".join(body))
1134
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
1135
"""Pack a repository.
1140
def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
1141
self._repository = repository
1142
self._lock_token = lock_token
1143
if clean_obsolete_packs == b'True':
1144
self._clean_obsolete_packs = True
1146
self._clean_obsolete_packs = False
1149
def do_body(self, body_bytes):
1150
if body_bytes == "":
1153
hint = body_bytes.splitlines()
1154
with self._repository.lock_write(token=self._lock_token):
1155
self._repository.pack(hint, self._clean_obsolete_packs)
1156
return SuccessfulSmartServerResponse((b"ok", ), )
1159
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1160
"""Iterate over the contents of files.
1162
The client sends a list of desired files to stream, one
1163
per line, and as tuples of file id and revision, separated by
1166
The server replies with a stream. Each entry is preceded by a header,
1167
which can either be:
1169
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1170
list sent by the client. This header is followed by the contents of
1171
the file, bzip2-compressed.
1172
* "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
1173
The client can then raise an appropriate RevisionNotPresent error
1174
or check its fallback repositories.
1179
def body_stream(self, repository, desired_files):
1180
with self._repository.lock_read():
1182
for i, key in enumerate(desired_files):
1184
for record in repository.texts.get_record_stream(text_keys,
1186
identifier = text_keys[record.key]
1187
if record.storage_kind == 'absent':
1188
yield b"absent\0%s\0%s\0%d\n" % (record.key[0],
1189
record.key[1], identifier)
1190
# FIXME: Way to abort early?
1192
yield b"ok\0%d\n" % identifier
1193
compressor = zlib.compressobj()
1194
for bytes in record.iter_bytes_as('chunked'):
1195
data = compressor.compress(bytes)
1198
data = compressor.flush()
1202
def do_body(self, body_bytes):
1204
tuple(l.split(b"\0")) for l in body_bytes.splitlines()]
1205
return SuccessfulSmartServerResponse((b'ok', ),
1206
body_stream=self.body_stream(self._repository, desired_files))
1208
def do_repository_request(self, repository):
1209
# Signal that we want a body
1213
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
1214
"""Stream a list of revisions.
1216
The client sends a list of newline-separated revision ids in the
1217
body of the request and the server replies with the serializer format,
1218
and a stream of bzip2-compressed revision texts (using the specified
1221
Any revisions the server does not have are omitted from the stream.
1226
def do_repository_request(self, repository):
1227
self._repository = repository
1228
# Signal there is a body
1231
def do_body(self, body_bytes):
1232
revision_ids = body_bytes.split(b"\n")
1233
return SuccessfulSmartServerResponse(
1234
(b'ok', self._repository.get_serializer_format()),
1235
body_stream=self.body_stream(self._repository, revision_ids))
1237
def body_stream(self, repository, revision_ids):
1238
with self._repository.lock_read():
1239
for record in repository.revisions.get_record_stream(
1240
[(revid,) for revid in revision_ids], 'unordered', True):
1241
if record.storage_kind == 'absent':
1243
yield zlib.compress(record.get_bytes_as('fulltext'))
1246
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
1247
"""Get the inventory deltas for a set of revision ids.
1249
This accepts a list of revision ids, and then sends a chain
1250
of deltas for the inventories of those revisions. The first
1251
revision will be empty.
1253
The server writes back zlibbed serialized inventory deltas,
1254
in the ordering specified. The base for each delta is the
1255
inventory generated by the previous delta.
1260
def _inventory_delta_stream(self, repository, ordering, revids):
1261
prev_inv = _mod_inventory.Inventory(root_id=None,
1262
revision_id=_mod_revision.NULL_REVISION)
1263
serializer = inventory_delta.InventoryDeltaSerializer(
1264
repository.supports_rich_root(),
1265
repository._format.supports_tree_reference)
1266
with repository.lock_read():
1267
for inv, revid in repository._iter_inventories(revids, ordering):
1270
inv_delta = inv._make_delta(prev_inv)
1271
lines = serializer.delta_to_lines(
1272
prev_inv.revision_id, inv.revision_id, inv_delta)
1273
yield ChunkedContentFactory(
1274
inv.revision_id, None, None, lines,
1275
chunks_are_lines=True)
1278
def body_stream(self, repository, ordering, revids):
1279
substream = self._inventory_delta_stream(repository,
1281
return _stream_to_byte_stream([('inventory-deltas', substream)],
1284
def do_body(self, body_bytes):
1285
return SuccessfulSmartServerResponse((b'ok', ),
1286
body_stream=self.body_stream(self._repository, self._ordering,
1287
body_bytes.splitlines()))
1289
def do_repository_request(self, repository, ordering):
1290
ordering = ordering.decode('ascii')
1291
if ordering == 'unordered':
1292
# inventory deltas for a topologically sorted stream
1293
# are likely to be smaller
1294
ordering = 'topological'
1295
self._ordering = ordering
1296
# Signal that we want a body
1300
class SmartServerRepositoryGetStreamForMissingKeys(SmartServerRepositoryRequest):
1302
def do_repository_request(self, repository, to_network_name):
1303
"""Get a stream for missing keys.
1305
:param repository: The repository to stream from.
1306
:param to_network_name: The network name of the format of the target
1310
self._to_format = network_format_registry.get(to_network_name)
1312
return FailedSmartServerResponse(
1313
(b'UnknownFormat', b'repository', to_network_name))
1314
return None # Signal that we want a body.
1316
def do_body(self, body_bytes):
1317
repository = self._repository
1318
repository.lock_read()
1320
source = repository._get_source(self._to_format)
1322
for entry in body_bytes.split(b'\n'):
1323
(kind, revid) = entry.split(b'\t')
1324
keys.append((kind.decode('utf-8'), revid))
1325
stream = source.get_stream_for_missing_keys(keys)
1328
# On non-error, unlocking is done by the body stream handler.
1332
return SuccessfulSmartServerResponse((b'ok',),
1333
body_stream=self.body_stream(stream, repository))
1335
def body_stream(self, stream, repository):
1336
byte_stream = _stream_to_byte_stream(stream, repository._format)
1338
for bytes in byte_stream:
1340
except errors.RevisionNotPresent as e:
1341
# This shouldn't be able to happen, but as we don't buffer
1342
# everything it can in theory happen.
1344
yield FailedSmartServerResponse((b'NoSuchRevision', e.revision_id))
1349
class SmartServerRepositoryRevisionArchive(SmartServerRepositoryRequest):
1351
def do_repository_request(self, repository, revision_id, format, name,
1352
root, subdir=None, force_mtime=None):
1353
"""Stream an archive file for a specific revision.
1354
:param repository: The repository to stream from.
1355
:param revision_id: Revision for which to export the tree
1356
:param format: Format (tar, tgz, tbz2, etc)
1357
:param name: Target file name
1358
:param root: Name of root directory (or '')
1359
:param subdir: Subdirectory to export, if not the root
1361
tree = repository.revision_tree(revision_id)
1362
if subdir is not None:
1363
subdir = subdir.decode('utf-8')
1364
if root is not None:
1365
root = root.decode('utf-8')
1366
name = name.decode('utf-8')
1367
return SuccessfulSmartServerResponse((b'ok',),
1368
body_stream=self.body_stream(
1369
tree, format.decode(
1370
'utf-8'), os.path.basename(name), root, subdir,
1373
def body_stream(self, tree, format, name, root, subdir=None, force_mtime=None):
1374
with tree.lock_read():
1375
return tree.archive(format, name, root, subdir, force_mtime)
1378
class SmartServerRepositoryAnnotateFileRevision(SmartServerRepositoryRequest):
1380
def do_repository_request(self, repository, revision_id, tree_path,
1381
file_id=None, default_revision=None):
1382
"""Stream an archive file for a specific revision.
1384
:param repository: The repository to stream from.
1385
:param revision_id: Revision for which to export the tree
1386
:param tree_path: The path inside the tree
1387
:param file_id: Optional file_id for the file
1388
:param default_revision: Default revision
1390
tree = repository.revision_tree(revision_id)
1391
with tree.lock_read():
1392
body = bencode.bencode(list(tree.annotate_iter(
1393
tree_path.decode('utf-8'), default_revision)))
1394
return SuccessfulSmartServerResponse((b'ok',), body=body)