# Copyright (C) 2006-2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Server-side repository related request implementations."""

from __future__ import absolute_import
import bz2
import itertools
import os
import sys
import tarfile
import tempfile
import threading
import zlib

try:
    import queue
except ImportError:  # Python 2 compatibility shim
    import Queue as queue

from .. import (
    bencode,
    errors,
    estimate_compressed_size,
    inventory as _mod_inventory,
    inventory_delta,
    osutils,
    pack,
    trace,
    ui,
    vf_search,
    )
from ..bzr.bzrdir import BzrDir
from ..sixish import (
    reraise,
    )
from .request import (
    FailedSmartServerResponse,
    SmartServerRequest,
    SuccessfulSmartServerResponse,
    )
from ..repository import _strip_NULL_ghosts, network_format_registry
from .. import revision as _mod_revision
from ..bzr.versionedfile import (
    ChunkedContentFactory,
    NetworkRecordStream,
    record_to_fulltext_bytes,
    )


class SmartServerRepositoryRequest(SmartServerRequest):
    """Common base class for Repository requests."""

    def do(self, path, *args):
        """Execute a repository request.

        All Repository requests take a path to the repository as their first
        argument.  The repository must be at the exact path given by the
        client - no searching is done.

        The actual logic is delegated to self.do_repository_request.

        :param client_path: The path for the repository as received from the
            client.
        :return: A SmartServerResponse from self.do_repository_request().
        """
        transport = self.transport_from_client_path(path)
        bzrdir = BzrDir.open_from_transport(transport)
        # Save the repository for use with do_body.
        self._repository = bzrdir.open_repository()
        return self.do_repository_request(self._repository, *args)

    def do_repository_request(self, repository, *args):
        """Override to provide an implementation for a verb."""
        # No-op for verbs that take bodies (None as a result indicates a body
        # is expected)

    def recreate_search(self, repository, search_bytes, discard_excess=False):
        """Recreate a search from its serialised form.

        :param discard_excess: If True, and the search refers to data we don't
            have, just silently accept that fact - the verb calling
            recreate_search trusts that clients will look for missing things
            they expected and get it from elsewhere.
        """
        if search_bytes == 'everything':
            return vf_search.EverythingResult(repository), None
        lines = search_bytes.split('\n')
        if lines[0] == 'ancestry-of':
            heads = lines[1:]
            search_result = vf_search.PendingAncestryResult(heads, repository)
            return search_result, None
        elif lines[0] == 'search':
            return self.recreate_search_from_recipe(repository, lines[1:],
                discard_excess=discard_excess)
        else:
            return (None, FailedSmartServerResponse(('BadSearch',)))

    def recreate_search_from_recipe(self, repository, lines,
            discard_excess=False):
        """Recreate a specific revision search (vs a from-tip search).

        :param discard_excess: If True, and the search refers to data we don't
            have, just silently accept that fact - the verb calling
            recreate_search trusts that clients will look for missing things
            they expected and get it from elsewhere.
        """
        start_keys = set(lines[0].split(' '))
        exclude_keys = set(lines[1].split(' '))
        revision_count = int(lines[2])
        repository.lock_read()
        try:
            search = repository.get_graph()._make_breadth_first_searcher(
                start_keys)
            while True:
                try:
                    next_revs = next(search)
                except StopIteration:
                    break
                search.stop_searching_any(exclude_keys.intersection(next_revs))
            (started_keys, excludes, included_keys) = search.get_state()
            if (not discard_excess and len(included_keys) != revision_count):
                # we got back a different amount of data than expected; this
                # gets reported as NoSuchRevision, because fewer revisions
                # indicates missing revisions, and more should never happen as
                # the excludes list considers ghosts and ensures that ghost
                # filling races are not a problem.
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
            search_result = vf_search.SearchResult(started_keys, excludes,
                len(included_keys), included_keys)
            return (search_result, None)
        finally:
            repository.unlock()
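
# A minimal sketch (illustrative; the revision ids are hypothetical) of the
# serialised body that recreate_search()/recreate_search_from_recipe() expect:
# either the literal 'everything', an 'ancestry-of' request, or a 'search'
# recipe made of start keys, exclude keys and the expected revision count.
def _example_search_recipe_body():
    """Build an example serialised 'search' recipe body."""
    start_keys = ['rev-new-1', 'rev-new-2']      # tips the client wants
    exclude_keys = ['rev-common-base']           # revisions it already has
    revision_count = 17                          # revisions it expects back
    return '\n'.join([
        'search',
        ' '.join(start_keys),
        ' '.join(exclude_keys),
        str(revision_count),
        ])
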


class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
    """Calls self.do_readlocked_repository_request."""

    def do_repository_request(self, repository, *args):
        """Read lock a repository for do_readlocked_repository_request."""
        repository.lock_read()
        try:
            return self.do_readlocked_repository_request(repository, *args)
        finally:
            repository.unlock()


class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
    """Break a repository lock."""

    def do_repository_request(self, repository):
        repository.break_lock()
        return SuccessfulSmartServerResponse(('ok', ))


class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
    """Bzr 1.2+ - get parent data for revisions during a graph search."""

    no_extra_results = False

    def do_repository_request(self, repository, *revision_ids):
        """Get parent details for some revisions.

        All the parents for revision_ids are returned. Additionally up to 64KB
        of additional parent data found by performing a breadth first search
        from revision_ids is returned. The verb takes a body containing the
        current search state, see do_body for details.

        If 'include-missing:' is in revision_ids, ghosts encountered in the
        graph traversal for getting parent data are included in the result with
        a prefix of 'missing:'.

        :param repository: The repository to query in.
        :param revision_ids: The utf8 encoded revision ids to answer for.
        """
        self._revision_ids = revision_ids
        return None # Signal that we want a body.

    def do_body(self, body_bytes):
        """Process the current search state and perform the parent lookup.

        :return: A smart server response where the body contains a utf8
            encoded flattened list of the parents of the revisions (the same
            format as Repository.get_revision_graph) which has been bz2
            compressed.
        """
        repository = self._repository
        repository.lock_read()
        try:
            return self._do_repository_request(body_bytes)
        finally:
            repository.unlock()

    def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
                               include_missing, max_size=65536):
        result = {}
        queried_revs = set()
        estimator = estimate_compressed_size.ZLibEstimator(max_size)
        next_revs = revision_ids
        first_loop_done = False
        while next_revs:
            queried_revs.update(next_revs)
            parent_map = repo_graph.get_parent_map(next_revs)
            current_revs = next_revs
            next_revs = set()
            for revision_id in current_revs:
                missing_rev = False
                parents = parent_map.get(revision_id)
                if parents is not None:
                    # adjust for the wire
                    if parents == (_mod_revision.NULL_REVISION,):
                        parents = ()
                    # prepare the next query
                    next_revs.update(parents)
                    encoded_id = revision_id
                else:
                    missing_rev = True
                    encoded_id = "missing:" + revision_id
                    parents = []
                if (revision_id not in client_seen_revs and
                    (not missing_rev or include_missing)):
                    # Client does not have this revision, give it to it.
                    # add parents to the result
                    result[encoded_id] = parents
                    # Approximate the serialized cost of this revision_id.
                    line = '%s %s\n' % (encoded_id, ' '.join(parents))
                    estimator.add_content(line)
            # get all the directly asked for parents, and then flesh out to
            # 64K (compressed) or so. We do one level of depth at a time to
            # stay in sync with the client. The 250000 magic number is an
            # estimated compression ratio taken from bzr.dev itself.
            if self.no_extra_results or (first_loop_done and estimator.full()):
                trace.mutter('size: %d, z_size: %d'
                             % (estimator._uncompressed_size_added,
                                estimator._compressed_size_added))
                next_revs = set()
                break
            # don't query things we've already queried
            next_revs = next_revs.difference(queried_revs)
            first_loop_done = True
        return result

    def _do_repository_request(self, body_bytes):
        repository = self._repository
        revision_ids = set(self._revision_ids)
        include_missing = 'include-missing:' in revision_ids
        if include_missing:
            revision_ids.remove('include-missing:')
        body_lines = body_bytes.split('\n')
        search_result, error = self.recreate_search_from_recipe(
            repository, body_lines)
        if error is not None:
            return error
        # TODO might be nice to start up the search again; but that's not
        # written or tested yet.
        client_seen_revs = set(search_result.get_keys())
        # Always include the requested ids.
        client_seen_revs.difference_update(revision_ids)

        repo_graph = repository.get_graph()
        result = self._expand_requested_revs(repo_graph, revision_ids,
                                             client_seen_revs, include_missing)

        # sorting trivially puts lexicographically similar revision ids
        # together.
        lines = []
        for revision, parents in sorted(result.items()):
            lines.append(' '.join((revision, ) + tuple(parents)))

        return SuccessfulSmartServerResponse(
            ('ok', ), bz2.compress('\n'.join(lines)))
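
# A minimal client-side sketch (illustrative only) of how the bz2 body
# produced by this verb can be decoded: decompress, then read one revision
# per line, the revision id followed by its parents, all space separated.
def _example_decode_parent_map_body(body):
    """Decode a Repository.get_parent_map body into {revision_id: parents}."""
    parent_map = {}
    for line in bz2.decompress(body).split('\n'):
        if not line:
            continue
        ids = line.split(' ')
        parent_map[ids[0]] = tuple(ids[1:])
    return parent_map
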


class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):

    def do_readlocked_repository_request(self, repository, revision_id):
        """Return the result of repository.get_revision_graph(revision_id).

        Deprecated as of bzr 1.4, but supported for older clients.

        :param repository: The repository to query in.
        :param revision_id: The utf8 encoded revision_id to get a graph from.
        :return: A smart server response where the body contains a utf8
            encoded flattened list of the revision graph.
        """
        lines = []
        graph = repository.get_graph()
        if revision_id:
            search_ids = [revision_id]
        else:
            search_ids = repository.all_revision_ids()
        search = graph._make_breadth_first_searcher(search_ids)
        transitive_ids = set(itertools.chain.from_iterable(search))
        parent_map = graph.get_parent_map(transitive_ids)
        revision_graph = _strip_NULL_ghosts(parent_map)
        if revision_id and revision_id not in revision_graph:
            # Note that we return an empty body, rather than omitting the body.
            # This way the client knows that it can always expect to find a body
            # in the response for this method, even in the error case.
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')

        for revision, parents in revision_graph.items():
            lines.append(' '.join((revision, ) + tuple(parents)))

        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))


class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):

    def do_readlocked_repository_request(self, repository, revno,
            known_pair):
        """Find the revid for a given revno, given a known revno/revid pair.
        """
        try:
            found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
        except errors.RevisionNotPresent as err:
            if err.revision_id != known_pair[1]:
                raise AssertionError(
                    'get_rev_id_for_revno raised RevisionNotPresent for '
                    'non-initial revision: ' + err.revision_id)
            return FailedSmartServerResponse(
                ('nosuchrevision', err.revision_id))
        if found_flag:
            return SuccessfulSmartServerResponse(('ok', result))
        else:
            earliest_revno, earliest_revid = result
            return SuccessfulSmartServerResponse(
                ('history-incomplete', earliest_revno, earliest_revid))


class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):

    def do_repository_request(self, repository):
        """Return the serializer format for this repository.

        :param repository: The repository to query
        :return: A smart server response ('ok', FORMAT)
        """
        serializer = repository.get_serializer_format()
        return SuccessfulSmartServerResponse(('ok', serializer))


class SmartServerRequestHasRevision(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, revision_id):
        """Return ok if a specific revision is in the repository at path.

        :param repository: The repository to query in.
        :param revision_id: The utf8 encoded revision_id to look up.
        :return: A smart server response of ('yes', ) if the revision is
            present, ('no', ) if it is missing.
        """
        if repository.has_revision(revision_id):
            return SuccessfulSmartServerResponse(('yes', ))
        else:
            return SuccessfulSmartServerResponse(('no', ))


class SmartServerRequestHasSignatureForRevisionId(
        SmartServerRepositoryRequest):

    def do_repository_request(self, repository, revision_id):
        """Return ok if a signature is present for a revision.

        Introduced in bzr 2.5.0.

        :param repository: The repository to query in.
        :param revision_id: The utf8 encoded revision_id to look up.
        :return: A smart server response of ('yes', ) if a
            signature for the revision is present,
            ('no', ) if it is missing.
        """
        try:
            if repository.has_signature_for_revision_id(revision_id):
                return SuccessfulSmartServerResponse(('yes', ))
            else:
                return SuccessfulSmartServerResponse(('no', ))
        except errors.NoSuchRevision:
            return FailedSmartServerResponse(
                ('nosuchrevision', revision_id))


class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, revid, committers):
        """Return the result of repository.gather_stats().

        :param repository: The repository to query in.
        :param revid: utf8 encoded rev id or an empty string to indicate None
        :param committers: 'yes' or 'no'.

        :return: A SmartServerResponse ('ok',), an encoded body looking like
              committers: 25
              firstrev: 1234.230 0
              latestrev: 345.700 3600
              revisions: 2
              size: 18

              But containing only fields returned by the gather_stats() call
        """
        decoded_revision_id = None
        if revid:
            decoded_revision_id = revid
        if committers == 'yes':
            decoded_committers = True
        else:
            decoded_committers = None
        try:
            stats = repository.gather_stats(decoded_revision_id,
                                            decoded_committers)
        except errors.NoSuchRevision:
            return FailedSmartServerResponse(('nosuchrevision', revid))

        body = ''
        if 'committers' in stats:
            body += 'committers: %d\n' % stats['committers']
        if 'firstrev' in stats:
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
        if 'latestrev' in stats:
            body += 'latestrev: %.3f %d\n' % stats['latestrev']
        if 'revisions' in stats:
            body += 'revisions: %d\n' % stats['revisions']
        if 'size' in stats:
            body += 'size: %d\n' % stats['size']

        return SuccessfulSmartServerResponse(('ok', ), body)
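
# A minimal client-side sketch (illustrative only): parse the statistics body
# produced above back into a dict, turning counts into ints and the
# firstrev/latestrev fields into (timestamp, timezone-offset) pairs.
def _example_parse_gather_stats_body(body):
    """Parse a Repository.gather_stats response body."""
    stats = {}
    for line in body.splitlines():
        key, value = line.split(': ', 1)
        if key in ('committers', 'revisions', 'size'):
            stats[key] = int(value)
        elif key in ('firstrev', 'latestrev'):
            timestamp, timezone = value.split(' ')
            stats[key] = (float(timestamp), int(timezone))
    return stats
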


class SmartServerRepositoryGetRevisionSignatureText(
        SmartServerRepositoryRequest):
    """Return the signature text of a revision."""

    def do_repository_request(self, repository, revision_id):
        """Return the result of repository.get_signature_text().

        :param repository: The repository to query in.
        :return: A smart server response with the signature text as
            the response body.
        """
        try:
            text = repository.get_signature_text(revision_id)
        except errors.NoSuchRevision as err:
            return FailedSmartServerResponse(
                ('nosuchrevision', err.revision))
        return SuccessfulSmartServerResponse(('ok', ), text)


class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):

    def do_repository_request(self, repository):
        """Return the result of repository.is_shared().

        :param repository: The repository to query in.
        :return: A smart server response of ('yes', ) if the repository is
            shared, and ('no', ) if it is not.
        """
        if repository.is_shared():
            return SuccessfulSmartServerResponse(('yes', ))
        else:
            return SuccessfulSmartServerResponse(('no', ))


class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):

    def do_repository_request(self, repository):
        """Return the result of repository.make_working_trees().

        Introduced in bzr 2.5.0.

        :param repository: The repository to query in.
        :return: A smart server response of ('yes', ) if the repository uses
            working trees, and ('no', ) if it does not.
        """
        if repository.make_working_trees():
            return SuccessfulSmartServerResponse(('yes', ))
        else:
            return SuccessfulSmartServerResponse(('no', ))


class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, token=''):
        # XXX: this probably should not have a token.
        if token == '':
            token = None
        try:
            token = repository.lock_write(token=token).repository_token
        except errors.LockContention as e:
            return FailedSmartServerResponse(('LockContention',))
        except errors.UnlockableTransport:
            return FailedSmartServerResponse(('UnlockableTransport',))
        except errors.LockFailed as e:
            return FailedSmartServerResponse(('LockFailed',
                str(e.lock), str(e.why)))
        if token is not None:
            repository.leave_lock_in_place()
        repository.unlock()
        if token is None:
            token = ''
        return SuccessfulSmartServerResponse(('ok', token))


class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, to_network_name):
        """Get a stream for inserting into a to_format repository.

        The request body is 'search_bytes', a description of the revisions
        being requested.

        In 2.3 this verb added support for search_bytes == 'everything'. Older
        implementations will respond with a BadSearch error, and clients should
        catch this and fall back appropriately.

        :param repository: The repository to stream from.
        :param to_network_name: The network name of the format of the target
            repository.
        """
        self._to_format = network_format_registry.get(to_network_name)
        if self._should_fake_unknown():
            return FailedSmartServerResponse(
                ('UnknownMethod', 'Repository.get_stream'))
        return None # Signal that we want a body.

    def _should_fake_unknown(self):
        """Return True if we should return UnknownMethod to the client.

        This is a workaround for bugs in pre-1.19 clients that claim to
        support receiving streams of CHK repositories. The pre-1.19 client
        expects inventory records to be serialized in the format defined by
        to_network_name, but in pre-1.19 (at least) that format definition
        tries to use the xml5 serializer, which does not correctly handle
        rich-roots. After 1.19 the client can also accept inventory-deltas
        (which avoids this issue), and those clients will use the
        Repository.get_stream_1.19 verb instead of this one.
        So: if this repository is CHK, and the to_format doesn't match,
        we should just fake an UnknownSmartMethod error so that the client
        will fall back to VFS, rather than sending it a stream we know it
        can't handle.
        """
        from_format = self._repository._format
        to_format = self._to_format
        if not from_format.supports_chks:
            # Source not CHK: that's ok
            return False
        if (to_format.supports_chks and
            from_format.repository_class is to_format.repository_class and
            from_format._serializer == to_format._serializer):
            # Source is CHK, but target matches: that's ok
            # (e.g. 2a->2a, or CHK2->2a)
            return False
        # Source is CHK, and target is not CHK or incompatible CHK. We can't
        # generate a compatible stream.
        return True

    def do_body(self, body_bytes):
        repository = self._repository
        repository.lock_read()
        try:
            search_result, error = self.recreate_search(repository, body_bytes,
                discard_excess=True)
            if error is not None:
                repository.unlock()
                return error
            source = repository._get_source(self._to_format)
            stream = source.get_stream(search_result)
        except Exception:
            exc_info = sys.exc_info()
            # On non-error, unlocking is done by the body stream handler.
            repository.unlock()
            reraise(*exc_info)
        return SuccessfulSmartServerResponse(('ok',),
            body_stream=self.body_stream(stream, repository))

    def body_stream(self, stream, repository):
        byte_stream = _stream_to_byte_stream(stream, repository._format)
        try:
            for bytes in byte_stream:
                yield bytes
        except errors.RevisionNotPresent as e:
            # This shouldn't be able to happen, but as we don't buffer
            # everything, it can in theory happen.
            repository.unlock()
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
        else:
            repository.unlock()


class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
    """The same as Repository.get_stream, but will return stream CHK formats to
    clients.

    See SmartServerRepositoryGetStream._should_fake_unknown.

    New in 1.19.
    """

    def _should_fake_unknown(self):
        """Returns False; we don't need to work around bugs in 1.19+ clients."""
        return False


def _stream_to_byte_stream(stream, src_format):
    """Convert a record stream to a self-delimited byte stream."""
    pack_writer = pack.ContainerSerialiser()
    yield pack_writer.begin()
    yield pack_writer.bytes_record(src_format.network_name(), '')
    for substream_type, substream in stream:
        for record in substream:
            if record.storage_kind in ('chunked', 'fulltext'):
                serialised = record_to_fulltext_bytes(record)
            elif record.storage_kind == 'absent':
                raise ValueError("Absent factory for %s" % (record.key,))
            else:
                serialised = record.get_bytes_as(record.storage_kind)
            if serialised:
                # Some streams embed the whole stream into the wire
                # representation of the first record, which means that
                # later records have no wire representation: we skip them.
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
    yield pack_writer.end()
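
# Decoding sketch (illustrative, not used by the verbs in this module): the
# byte stream produced above is a standard pack container whose first record
# carries the source repository's network format name and whose remaining
# records are each named with the substream type they belong to.
def _example_list_record_names(byte_stream):
    """Return the name tuples of every record in a serialised byte stream."""
    parser = pack.ContainerPushParser()
    names = []
    for data in byte_stream:
        parser.accept_bytes(data)
        for record_names, record_bytes in parser.read_pending_records():
            names.append(record_names)
    return names
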


class _ByteStreamDecoder(object):
    """Helper for _byte_stream_to_stream.

    The expected usage of this class is via the function _byte_stream_to_stream
    which creates a _ByteStreamDecoder, pops off the stream format and then
    yields the output of record_stream(), the main entry point to
    _ByteStreamDecoder.

    Broadly this class has to unwrap two layers of iterators:
    (type, substream)
    (substream details)

    This is complicated by wishing to return type, iterator_for_type, but
    getting the data for iterator_for_type when we find out type: we can't
    simply pass a generator down to the NetworkRecordStream parser, instead
    we have a little local state to seed each NetworkRecordStream instance,
    and gather the type that we'll be yielding.

    :ivar byte_stream: The byte stream being decoded.
    :ivar stream_decoder: A pack parser used to decode the bytestream
    :ivar current_type: The current type, used to join adjacent records of the
        same type into a single stream.
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
    """

    def __init__(self, byte_stream, record_counter):
        """Create a _ByteStreamDecoder."""
        self.stream_decoder = pack.ContainerPushParser()
        self.current_type = None
        self.first_bytes = None
        self.byte_stream = byte_stream
        self._record_counter = record_counter
        self.key_count = 0

    def iter_stream_decoder(self):
        """Iterate the contents of the pack from stream_decoder."""
        # dequeue pending items
        for record in self.stream_decoder.read_pending_records():
            yield record
        # Pull bytes off the wire, decode them to records, yield those records.
        for bytes in self.byte_stream:
            self.stream_decoder.accept_bytes(bytes)
            for record in self.stream_decoder.read_pending_records():
                yield record

    def iter_substream_bytes(self):
        if self.first_bytes is not None:
            yield self.first_bytes
            # If we run out of pack records, signal the outer layer to stop.
            self.first_bytes = None
        for record in self.iter_pack_records:
            record_names, record_bytes = record
            record_name, = record_names
            substream_type = record_name[0]
            if substream_type != self.current_type:
                # end of a substream, seed the next substream.
                self.current_type = substream_type
                self.first_bytes = record_bytes
                return
            yield record_bytes

    def record_stream(self):
        """Yield substream_type, substream from the byte stream."""
        def wrap_and_count(pb, rc, substream):
            """Yield records from stream while showing progress."""
            counter = 0
            if rc:
                if self.current_type != 'revisions' and self.key_count != 0:
                    # As we know the number of revisions now (in self.key_count)
                    # we can set up and use record_counter (rc).
                    if not rc.is_initialized():
                        rc.setup(self.key_count, self.key_count)
            for record in substream.read():
                if rc:
                    if rc.is_initialized() and counter == rc.STEP:
                        rc.increment(counter)
                        pb.update('Estimate', rc.current, rc.max)
                        counter = 0
                    if self.current_type == 'revisions':
                        # Total records is proportional to number of revs
                        # to fetch. With remote, we used self.key_count to
                        # track the number of revs. Once we have the revs
                        # counts in self.key_count, the progress bar changes
                        # from 'Estimating..' to 'Estimate' above.
                        self.key_count += 1
                        if counter == rc.STEP:
                            pb.update('Estimating..', self.key_count)
                            counter = 0
                counter += 1
                yield record

        self.seed_state()
        pb = ui.ui_factory.nested_progress_bar()
        rc = self._record_counter
        try:
            # Make and consume sub generators, one per substream type:
            while self.first_bytes is not None:
                substream = NetworkRecordStream(self.iter_substream_bytes())
                # after substream is fully consumed, self.current_type is set
                # to the next type, and self.first_bytes is set to the matching
                # bytes.
                yield self.current_type, wrap_and_count(pb, rc, substream)
        finally:
            if rc:
                pb.update('Done', rc.max, rc.max)
            pb.finished()

    def seed_state(self):
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
        # Set a single generator we can use to get data from the pack stream.
        self.iter_pack_records = self.iter_stream_decoder()
        # Seed the very first subiterator with content; after this each one
        # seeds the next.
        list(self.iter_substream_bytes())


def _byte_stream_to_stream(byte_stream, record_counter=None):
    """Convert a byte stream into a format and a stream.

    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
    :return: (RepositoryFormat, stream_generator)
    """
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
    for bytes in byte_stream:
        decoder.stream_decoder.accept_bytes(bytes)
        for record in decoder.stream_decoder.read_pending_records(max=1):
            record_names, src_format_name = record
            src_format = network_format_registry.get(src_format_name)
            return src_format, decoder.record_stream()
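
# A minimal round-trip sketch (illustrative only): serialise a source
# repository stream with _stream_to_byte_stream and decode it again with
# _byte_stream_to_stream.  It assumes a `repository` object and a
# `search_result` (for example from
# SmartServerRepositoryRequest.recreate_search).
def _example_stream_roundtrip(repository, search_result):
    """Serialise a stream to bytes and decode it back into records."""
    source = repository._get_source(repository._format)
    stream = source.get_stream(search_result)
    byte_stream = _stream_to_byte_stream(stream, repository._format)
    src_format, record_stream = _byte_stream_to_stream(byte_stream)
    for substream_type, substream in record_stream:
        for record in substream:
            pass  # e.g. hand the records to a sink, as the insert verbs do
    return src_format
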


class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, token):
        try:
            repository.lock_write(token=token)
        except errors.TokenMismatch as e:
            return FailedSmartServerResponse(('TokenMismatch',))
        repository.dont_leave_lock_in_place()
        repository.unlock()
        return SuccessfulSmartServerResponse(('ok',))


class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
    """Get the physical lock status for a repository."""

    def do_repository_request(self, repository):
        if repository.get_physical_lock_status():
            return SuccessfulSmartServerResponse(('yes', ))
        else:
            return SuccessfulSmartServerResponse(('no', ))


class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, str_bool_new_value):
        if str_bool_new_value == 'True':
            new_value = True
        else:
            new_value = False
        repository.set_make_working_trees(new_value)
        return SuccessfulSmartServerResponse(('ok',))


class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
    """Get the raw repository files as a tarball.

    The returned tarball contains a .bzr control directory which in turn
    contains a repository.

    This takes one parameter, compression, which currently must be
    '', 'gz', or 'bz2'.

    This is used to implement the Repository.copy_content_into operation.
    """

    def do_repository_request(self, repository, compression):
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
        try:
            controldir_name = tmp_dirname + '/.bzr'
            return self._tarfile_response(controldir_name, compression)
        finally:
            osutils.rmtree(tmp_dirname)

    def _copy_to_tempdir(self, from_repo):
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
        tmp_bzrdir = from_repo.controldir._format.initialize(tmp_dirname)
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
        from_repo.copy_content_into(tmp_repo)
        return tmp_dirname, tmp_repo

    def _tarfile_response(self, tmp_dirname, compression):
        temp = tempfile.NamedTemporaryFile()
        try:
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
            # all finished; write the tempfile out to the network
            temp.seek(0)
            return SuccessfulSmartServerResponse(('ok',), temp.read())
            # FIXME: Don't read the whole thing into memory here; rather stream
            # it out from the file onto the network. mbp 20070411
        finally:
            temp.close()

    def _tarball_of_dir(self, dirname, compression, ofile):
        filename = os.path.basename(ofile.name)
        tarball = tarfile.open(fileobj=ofile, name=filename,
            mode='w|' + compression)
        try:
            # The tarfile module only accepts ascii names, and (I guess)
            # packs them with their 8bit names. We know all the files
            # within the repository have ASCII names so they should be safe
            # to pack in.
            dirname = dirname.encode(sys.getfilesystemencoding())
            # python's tarfile module includes the whole path by default so
            # override it
            if not dirname.endswith('.bzr'):
                raise ValueError(dirname)
            tarball.add(dirname, '.bzr') # recursive by default
        finally:
            tarball.close()
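
# Client-side sketch (illustrative, not part of the verb): the body returned
# by the Repository.tarball request is a tar archive, optionally gzip or bz2
# compressed, whose single top-level entry is '.bzr'.
def _example_list_tarball_members(body, compression='gz'):
    """Return the member names contained in a Repository.tarball body."""
    import io
    tar = tarfile.open(mode='r|' + compression, fileobj=io.BytesIO(body))
    try:
        return tar.getnames()
    finally:
        tar.close()
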


class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
    """Insert a record stream from a RemoteSink into a repository.

    This gets bytes pushed to it by the network infrastructure and turns that
    into a bytes iterator using a thread. That is then processed by
    _byte_stream_to_stream.
    """

    def do_repository_request(self, repository, resume_tokens, lock_token):
        """StreamSink.insert_stream for a remote repository."""
        repository.lock_write(token=lock_token)
        self.do_insert_stream_request(repository, resume_tokens)

    def do_insert_stream_request(self, repository, resume_tokens):
        tokens = [token for token in resume_tokens.split(' ') if token]
        self.tokens = tokens
        self.repository = repository
        self.queue = queue.Queue()
        self.insert_thread = threading.Thread(target=self._inserter_thread)
        self.insert_thread.start()

    def do_chunk(self, body_stream_chunk):
        self.queue.put(body_stream_chunk)

    def _inserter_thread(self):
        try:
            src_format, stream = _byte_stream_to_stream(
                self.blocking_byte_stream())
            self.insert_result = self.repository._get_sink().insert_stream(
                stream, src_format, self.tokens)
            self.insert_ok = True
        except:
            self.insert_exception = sys.exc_info()
            self.insert_ok = False

    def blocking_byte_stream(self):
        while True:
            bytes = self.queue.get()
            if bytes is StopIteration:
                return
            else:
                yield bytes

    def do_end(self):
        self.queue.put(StopIteration)
        if self.insert_thread is not None:
            self.insert_thread.join()
        if not self.insert_ok:
            try:
                reraise(*self.insert_exception)
            finally:
                del self.insert_exception
        write_group_tokens, missing_keys = self.insert_result
        if write_group_tokens or missing_keys:
            # bzip needed? missing keys should typically be a small set.
            # Should this be a streaming body response ?
            missing_keys = sorted(missing_keys)
            bytes = bencode.bencode((write_group_tokens, missing_keys))
            self.repository.unlock()
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
        else:
            self.repository.unlock()
            return SuccessfulSmartServerResponse(('ok', ))
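
# A minimal sketch (hypothetical names, illustrative only) of the pattern
# used above: chunks pushed by the network layer are put on a Queue and
# exposed to a worker thread as a plain iterator, with the StopIteration
# class used as the end-of-stream sentinel, mirroring
# do_chunk()/blocking_byte_stream()/do_end().
def _example_chunks_to_iterator(chunks):
    """Feed `chunks` through a queue to a consumer thread; return them."""
    chunk_queue = queue.Queue()
    consumed = []

    def byte_iterator():
        while True:
            chunk = chunk_queue.get()
            if chunk is StopIteration:
                return
            yield chunk

    worker = threading.Thread(target=lambda: consumed.extend(byte_iterator()))
    worker.start()
    for chunk in chunks:
        chunk_queue.put(chunk)
    chunk_queue.put(StopIteration)
    worker.join()
    return consumed
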


class SmartServerRepositoryInsertStream_1_19(SmartServerRepositoryInsertStreamLocked):
    """Insert a record stream from a RemoteSink into a repository.

    Same as SmartServerRepositoryInsertStreamLocked, except:
     - the lock token argument is optional
     - servers that implement this verb accept 'inventory-delta' records in the
       stream.

    New in 1.19.
    """

    def do_repository_request(self, repository, resume_tokens, lock_token=None):
        """StreamSink.insert_stream for a remote repository."""
        SmartServerRepositoryInsertStreamLocked.do_repository_request(
            self, repository, resume_tokens, lock_token)


class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
    """Insert a record stream from a RemoteSink into an unlocked repository.

    This is the same as SmartServerRepositoryInsertStreamLocked, except it
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
    like pack format) repository.
    """

    def do_repository_request(self, repository, resume_tokens):
        """StreamSink.insert_stream for a remote repository."""
        repository.lock_write()
        self.do_insert_stream_request(repository, resume_tokens)


class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
    """Add a revision signature text."""

    def do_repository_request(self, repository, lock_token, revision_id,
            *write_group_tokens):
        """Add a revision signature text.

        :param repository: Repository to operate on
        :param lock_token: Lock token
        :param revision_id: Revision for which to add signature
        :param write_group_tokens: Write group tokens
        """
        self._lock_token = lock_token
        self._revision_id = revision_id
        self._write_group_tokens = write_group_tokens
        return None  # Signal that we want a body.

    def do_body(self, body_bytes):
        """Add a signature text.

        :param body_bytes: GPG signature text
        :return: SuccessfulSmartServerResponse with arguments 'ok' and
            the list of new write group tokens.
        """
        self._repository.lock_write(token=self._lock_token)
        try:
            self._repository.resume_write_group(self._write_group_tokens)
            try:
                self._repository.add_signature_text(self._revision_id,
                    body_bytes)
            finally:
                new_write_group_tokens = self._repository.suspend_write_group()
        finally:
            self._repository.unlock()
        return SuccessfulSmartServerResponse(
            ('ok', ) + tuple(new_write_group_tokens))


class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
    """Start a write group."""

    def do_repository_request(self, repository, lock_token):
        """Start a write group."""
        repository.lock_write(token=lock_token)
        try:
            repository.start_write_group()
            try:
                tokens = repository.suspend_write_group()
            except errors.UnsuspendableWriteGroup:
                return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', tokens))


class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
    """Commit a write group."""

    def do_repository_request(self, repository, lock_token,
            write_group_tokens):
        """Commit a write group."""
        repository.lock_write(token=lock_token)
        try:
            try:
                repository.resume_write_group(write_group_tokens)
            except errors.UnresumableWriteGroup as e:
                return FailedSmartServerResponse(
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
            try:
                repository.commit_write_group()
            except:
                write_group_tokens = repository.suspend_write_group()
                # FIXME JRV 2011-11-19: What if the write_group_tokens
                # have changed?
                raise
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', ))


class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
    """Abort a write group."""

    def do_repository_request(self, repository, lock_token, write_group_tokens):
        """Abort a write group."""
        repository.lock_write(token=lock_token)
        try:
            try:
                repository.resume_write_group(write_group_tokens)
            except errors.UnresumableWriteGroup as e:
                return FailedSmartServerResponse(
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
            repository.abort_write_group()
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', ))


class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
    """Check that a write group is still valid."""

    def do_repository_request(self, repository, lock_token, write_group_tokens):
        """Check that a write group is still valid."""
        repository.lock_write(token=lock_token)
        try:
            try:
                repository.resume_write_group(write_group_tokens)
            except errors.UnresumableWriteGroup as e:
                return FailedSmartServerResponse(
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
            repository.suspend_write_group()
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', ))


class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
    """Retrieve all of the revision ids in a repository."""

    def do_repository_request(self, repository):
        revids = repository.all_revision_ids()
        return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))


class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
    """Reconcile a repository."""

    def do_repository_request(self, repository, lock_token):
        try:
            repository.lock_write(token=lock_token)
        except errors.TokenLockingNotSupported as e:
            return FailedSmartServerResponse(
                ('TokenLockingNotSupported', ))
        try:
            reconciler = repository.reconcile()
        finally:
            repository.unlock()
        body = [
            "garbage_inventories: %d\n" % reconciler.garbage_inventories,
            "inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
            ]
        return SuccessfulSmartServerResponse(('ok', ), "".join(body))


class SmartServerRepositoryPack(SmartServerRepositoryRequest):
    """Pack a repository."""

    def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
        self._repository = repository
        self._lock_token = lock_token
        if clean_obsolete_packs == 'True':
            self._clean_obsolete_packs = True
        else:
            self._clean_obsolete_packs = False
        return None  # Signal that we want a body.

    def do_body(self, body_bytes):
        if body_bytes == "":
            hint = None
        else:
            hint = body_bytes.splitlines()
        self._repository.lock_write(token=self._lock_token)
        try:
            self._repository.pack(hint, self._clean_obsolete_packs)
        finally:
            self._repository.unlock()
        return SuccessfulSmartServerResponse(("ok", ), )


class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
    """Iterate over the contents of files.

    The client sends a list of desired files to stream, one
    per line, and as tuples of file id and revision, separated by
    \0.

    The server replies with a stream. Each entry is preceded by a header,
    which can either be:

    * "ok\x00IDX\n" where IDX is the index of the entry in the desired files
        list sent by the client. This header is followed by the contents of
        the file, zlib-compressed.
    * "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
        The client can then raise an appropriate RevisionNotPresent error
        or check its fallback repositories.
    """

    def body_stream(self, repository, desired_files):
        self._repository.lock_read()
        try:
            text_keys = {}
            for i, key in enumerate(desired_files):
                text_keys[key] = i
            for record in repository.texts.get_record_stream(text_keys,
                    'unordered', True):
                identifier = text_keys[record.key]
                if record.storage_kind == 'absent':
                    yield "absent\0%s\0%s\0%d\n" % (record.key[0],
                        record.key[1], identifier)
                    # FIXME: Way to abort early?
                    continue
                yield "ok\0%d\n" % identifier
                compressor = zlib.compressobj()
                for bytes in record.get_bytes_as('chunked'):
                    data = compressor.compress(bytes)
                    if data:
                        yield data
                data = compressor.flush()
                if data:
                    yield data
        finally:
            self._repository.unlock()

    def do_body(self, body_bytes):
        desired_files = [
            tuple(l.split("\0")) for l in body_bytes.splitlines()]
        return SuccessfulSmartServerResponse(('ok', ),
            body_stream=self.body_stream(self._repository, desired_files))

    def do_repository_request(self, repository):
        # Signal that we want a body
        return None
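
# Request-body sketch (illustrative; the file and revision ids are
# hypothetical): one desired text per line, each line being a file id and a
# revision id separated by a NUL byte, exactly as do_body() above parses it.
def _example_iter_files_bytes_body():
    """Build an example request body for Repository.iter_files_bytes."""
    desired_files = [('file-id-1', 'rev-id-1'), ('file-id-2', 'rev-id-2')]
    return '\n'.join('%s\0%s' % (file_id, revision)
                     for file_id, revision in desired_files)
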


class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
    """Stream a list of revisions.

    The client sends a list of newline-separated revision ids in the
    body of the request and the server replies with the serializer format,
    and a stream of zlib-compressed revision texts (using the specified
    serialization format).

    Any revisions the server does not have are omitted from the stream.
    """

    def do_repository_request(self, repository):
        self._repository = repository
        # Signal there is a body
        return None

    def do_body(self, body_bytes):
        revision_ids = body_bytes.split("\n")
        return SuccessfulSmartServerResponse(
            ('ok', self._repository.get_serializer_format()),
            body_stream=self.body_stream(self._repository, revision_ids))

    def body_stream(self, repository, revision_ids):
        self._repository.lock_read()
        try:
            for record in repository.revisions.get_record_stream(
                [(revid,) for revid in revision_ids], 'unordered', True):
                if record.storage_kind == 'absent':
                    continue
                yield zlib.compress(record.get_bytes_as('fulltext'))
        finally:
            self._repository.unlock()


class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
    """Get the inventory deltas for a set of revision ids.

    This accepts a list of revision ids, and then sends a chain
    of deltas for the inventories of those revisions. The base of
    the first delta is the empty inventory.

    The server writes back zlibbed serialized inventory deltas,
    in the ordering specified. The base for each delta is the
    inventory generated by the previous delta.
    """

    def _inventory_delta_stream(self, repository, ordering, revids):
        prev_inv = _mod_inventory.Inventory(root_id=None,
            revision_id=_mod_revision.NULL_REVISION)
        serializer = inventory_delta.InventoryDeltaSerializer(
            repository.supports_rich_root(),
            repository._format.supports_tree_reference)
        repository.lock_read()
        try:
            for inv, revid in repository._iter_inventories(revids, ordering):
                if inv is None:
                    continue
                inv_delta = inv._make_delta(prev_inv)
                lines = serializer.delta_to_lines(
                    prev_inv.revision_id, inv.revision_id, inv_delta)
                yield ChunkedContentFactory(inv.revision_id, None, None, lines)
                prev_inv = inv
        finally:
            repository.unlock()

    def body_stream(self, repository, ordering, revids):
        substream = self._inventory_delta_stream(repository,
            ordering, revids)
        return _stream_to_byte_stream([('inventory-deltas', substream)],
            self._repository._format)

    def do_body(self, body_bytes):
        return SuccessfulSmartServerResponse(('ok', ),
            body_stream=self.body_stream(self._repository, self._ordering,
                body_bytes.splitlines()))

    def do_repository_request(self, repository, ordering):
        if ordering == 'unordered':
            # inventory deltas for a topologically sorted stream
            # are likely to be smaller
            ordering = 'topological'
        self._ordering = ordering
        # Signal that we want a body
        return None
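
# Client-side sketch (illustrative; the exact Inventory API used here is an
# assumption): the inventory-deltas substream produced above forms a chain,
# each delta applying on top of the inventory built from the previous one,
# starting from an empty inventory.
def _example_apply_inventory_delta_chain(deltas_with_revids):
    """Apply an ordered chain of (revision_id, delta) pairs; return the last."""
    current = _mod_inventory.Inventory(root_id=None,
        revision_id=_mod_revision.NULL_REVISION)
    for revision_id, delta in deltas_with_revids:
        current.apply_delta(delta)
        current.revision_id = revision_id
    return current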