# Copyright (C) 2006-2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Server-side repository related request implementations."""

from __future__ import absolute_import

import bz2
import itertools
import os
import queue
import sys
import tempfile
import threading
import zlib

from .. import (
    bencode,
    errors,
    estimate_compressed_size,
    inventory as _mod_inventory,
    inventory_delta,
    osutils,
    pack,
    trace,
    ui,
    vf_search,
    )
from ..bzrdir import BzrDir
from ..sixish import (
    reraise,
    )
from .request import (
    FailedSmartServerResponse,
    SmartServerRequest,
    SuccessfulSmartServerResponse,
    )
from ..repository import _strip_NULL_ghosts, network_format_registry
from .. import revision as _mod_revision
from ..versionedfile import (
    ChunkedContentFactory,
    NetworkRecordStream,
    record_to_fulltext_bytes,
    )


class SmartServerRepositoryRequest(SmartServerRequest):
    """Common base class for Repository requests."""

    def do(self, path, *args):
        """Execute a repository request.

        All Repository requests take a path to the repository as their first
        argument.  The repository must be at the exact path given by the
        client - no searching is done.

        The actual logic is delegated to self.do_repository_request.

        :param client_path: The path for the repository as received from the
            client.
        :return: A SmartServerResponse from self.do_repository_request().
        """
        transport = self.transport_from_client_path(path)
        bzrdir = BzrDir.open_from_transport(transport)
        # Save the repository for use with do_body.
        self._repository = bzrdir.open_repository()
        return self.do_repository_request(self._repository, *args)

    def do_repository_request(self, repository, *args):
        """Override to provide an implementation for a verb."""
        # No-op for verbs that take bodies (None as a result indicates a body
        # is expected)
        return None

    def recreate_search(self, repository, search_bytes, discard_excess=False):
        """Recreate a search from its serialised form.

        :param discard_excess: If True, and the search refers to data we don't
            have, just silently accept that fact - the verb calling
            recreate_search trusts that clients will look for missing things
            they expected and get it from elsewhere.
        """
        if search_bytes == 'everything':
            return vf_search.EverythingResult(repository), None
        lines = search_bytes.split('\n')
        if lines[0] == 'ancestry-of':
            heads = lines[1:]
            search_result = vf_search.PendingAncestryResult(heads, repository)
            return search_result, None
        elif lines[0] == 'search':
            return self.recreate_search_from_recipe(repository, lines[1:],
                discard_excess=discard_excess)
        else:
            return (None, FailedSmartServerResponse(('BadSearch',)))

    def recreate_search_from_recipe(self, repository, lines,
            discard_excess=False):
        """Recreate a specific revision search (vs a from-tip search).

        :param discard_excess: If True, and the search refers to data we don't
            have, just silently accept that fact - the verb calling
            recreate_search trusts that clients will look for missing things
            they expected and get it from elsewhere.
        """
        start_keys = set(lines[0].split(' '))
        exclude_keys = set(lines[1].split(' '))
        revision_count = int(lines[2])
        repository.lock_read()
        try:
            search = repository.get_graph()._make_breadth_first_searcher(
                start_keys)
            while True:
                try:
                    next_revs = next(search)
                except StopIteration:
                    break
                search.stop_searching_any(exclude_keys.intersection(next_revs))
            (started_keys, excludes, included_keys) = search.get_state()
            if (not discard_excess and len(included_keys) != revision_count):
                # we got back a different amount of data than expected, this
                # gets reported as NoSuchRevision, because less revisions
                # indicates missing revisions, and more should never happen as
                # the excludes list considers ghosts and ensures that ghost
                # filling races are not a problem.
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
            search_result = vf_search.SearchResult(started_keys, excludes,
                len(included_keys), included_keys)
            return (search_result, None)
        finally:
            repository.unlock()
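
    # Illustrative 'search' recipe (revision ids invented for the example):
    # the three lines handed to recreate_search_from_recipe are start keys,
    # exclude keys and an expected revision count, e.g.
    #
    #   start-revision-id another-start-revision-id
    #   excluded-revision-id
    #   14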


class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
    """Calls self.do_readlocked_repository_request."""

    def do_repository_request(self, repository, *args):
        """Read lock a repository for do_readlocked_repository_request."""
        repository.lock_read()
        try:
            return self.do_readlocked_repository_request(repository, *args)
        finally:
            repository.unlock()


class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
    """Break a repository lock."""

    def do_repository_request(self, repository):
        repository.break_lock()
        return SuccessfulSmartServerResponse(('ok', ))


class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
    """Bzr 1.2+ - get parent data for revisions during a graph search."""

    no_extra_results = False

    def do_repository_request(self, repository, *revision_ids):
        """Get parent details for some revisions.

        All the parents for revision_ids are returned. Additionally up to 64KB
        of additional parent data found by performing a breadth first search
        from revision_ids is returned. The verb takes a body containing the
        current search state, see do_body for details.

        If 'include-missing:' is in revision_ids, ghosts encountered in the
        graph traversal for getting parent data are included in the result with
        a prefix of 'missing:'.

        :param repository: The repository to query in.
        :param revision_ids: The utf8 encoded revision_id to answer for.
        """
        self._revision_ids = revision_ids
        return None # Signal that we want a body.

    def do_body(self, body_bytes):
        """Process the current search state and perform the parent lookup.

        :return: A smart server response where the body contains an utf8
            encoded flattened list of the parents of the revisions (the same
            format as Repository.get_revision_graph) which has been bz2
            compressed.
        """
        repository = self._repository
        repository.lock_read()
        try:
            return self._do_repository_request(body_bytes)
        finally:
            repository.unlock()

    def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
                               include_missing, max_size=65536):
        result = {}
        queried_revs = set()
        estimator = estimate_compressed_size.ZLibEstimator(max_size)
        next_revs = revision_ids
        first_loop_done = False
        while next_revs:
            queried_revs.update(next_revs)
            parent_map = repo_graph.get_parent_map(next_revs)
            current_revs = next_revs
            next_revs = set()
            for revision_id in current_revs:
                missing_rev = False
                parents = parent_map.get(revision_id)
                if parents is not None:
                    # adjust for the wire
                    if parents == (_mod_revision.NULL_REVISION,):
                        parents = ()
                    # prepare the next query
                    next_revs.update(parents)
                    encoded_id = revision_id
                else:
                    missing_rev = True
                    encoded_id = "missing:" + revision_id
                    parents = []
                if (revision_id not in client_seen_revs and
                        (not missing_rev or include_missing)):
                    # Client does not have this revision, give it to it.
                    # add parents to the result
                    result[encoded_id] = parents
                    # Approximate the serialized cost of this revision_id.
                    line = '%s %s\n' % (encoded_id, ' '.join(parents))
                    estimator.add_content(line)
            # get all the directly asked for parents, and then flesh out to
            # 64K (compressed) or so. We do one level of depth at a time to
            # stay in sync with the client. The 250000 magic number is
            # estimated compression ratio taken from bzr.dev itself.
            if self.no_extra_results or (first_loop_done and estimator.full()):
                trace.mutter('size: %d, z_size: %d'
                             % (estimator._uncompressed_size_added,
                                estimator._compressed_size_added))
                next_revs = set()
                break
            # don't query things we've already queried
            next_revs = next_revs.difference(queried_revs)
            first_loop_done = True
        return result

    def _do_repository_request(self, body_bytes):
        repository = self._repository
        revision_ids = set(self._revision_ids)
        include_missing = 'include-missing:' in revision_ids
        if include_missing:
            revision_ids.remove('include-missing:')
        body_lines = body_bytes.split('\n')
        search_result, error = self.recreate_search_from_recipe(
            repository, body_lines)
        if error is not None:
            return error
        # TODO might be nice to start up the search again; but thats not
        # written or tested yet.
        client_seen_revs = set(search_result.get_keys())
        # Always include the requested ids.
        client_seen_revs.difference_update(revision_ids)

        repo_graph = repository.get_graph()
        result = self._expand_requested_revs(repo_graph, revision_ids,
                                             client_seen_revs, include_missing)

        # sorting trivially puts lexicographically similar revision ids
        # together.
        lines = []
        for revision, parents in sorted(result.items()):
            lines.append(' '.join((revision, ) + tuple(parents)))

        return SuccessfulSmartServerResponse(
            ('ok', ), bz2.compress('\n'.join(lines)))
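
    # Illustrative flattened body (before bz2 compression); the revision ids
    # are invented.  Each line is a revision id followed by its parents, and
    # a 'missing:' prefix marks a ghost when 'include-missing:' was requested:
    #
    #   revid-b revid-a
    #   revid-c revid-b revid-a
    #   missing:ghost-revid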


class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):

    def do_readlocked_repository_request(self, repository, revision_id):
        """Return the result of repository.get_revision_graph(revision_id).

        Deprecated as of bzr 1.4, but supported for older clients.

        :param repository: The repository to query in.
        :param revision_id: The utf8 encoded revision_id to get a graph from.
        :return: A smart server response where the body contains an utf8
            encoded flattened list of the revision graph.
        """
        lines = []
        graph = repository.get_graph()
        if revision_id:
            search_ids = [revision_id]
        else:
            search_ids = repository.all_revision_ids()
        search = graph._make_breadth_first_searcher(search_ids)
        transitive_ids = set(itertools.chain.from_iterable(search))
        parent_map = graph.get_parent_map(transitive_ids)
        revision_graph = _strip_NULL_ghosts(parent_map)
        if revision_id and revision_id not in revision_graph:
            # Note that we return an empty body, rather than omitting the body.
            # This way the client knows that it can always expect to find a body
            # in the response for this method, even in the error case.
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')

        for revision, parents in revision_graph.items():
            lines.append(' '.join((revision, ) + tuple(parents)))

        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))


class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):

    def do_readlocked_repository_request(self, repository, revno,
            known_pair):
        """Find the revid for a given revno, given a known revno/revid pair.

        New in 1.17.
        """
        try:
            found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
        except errors.RevisionNotPresent as err:
            if err.revision_id != known_pair[1]:
                raise AssertionError(
                    'get_rev_id_for_revno raised RevisionNotPresent for '
                    'non-initial revision: ' + err.revision_id)
            return FailedSmartServerResponse(
                ('nosuchrevision', err.revision_id))
        if found_flag:
            return SuccessfulSmartServerResponse(('ok', result))
        else:
            earliest_revno, earliest_revid = result
            return SuccessfulSmartServerResponse(
                ('history-incomplete', earliest_revno, earliest_revid))
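
    # Illustrative example (values invented): a client that knows mainline
    # revno 10 is 'rev-ten' can ask for revno 5 with known_pair=(10, 'rev-ten');
    # the server answers ('ok', 'rev-five') if it has enough history, or
    # ('history-incomplete', earliest_revno, earliest_revid) if it can only
    # walk back part of the way.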


class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):

    def do_repository_request(self, repository):
        """Return the serializer format for this repository.

        :param repository: The repository to query
        :return: A smart server response ('ok', FORMAT)
        """
        serializer = repository.get_serializer_format()
        return SuccessfulSmartServerResponse(('ok', serializer))


class SmartServerRequestHasRevision(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, revision_id):
        """Return ok if a specific revision is in the repository at path.

        :param repository: The repository to query in.
        :param revision_id: The utf8 encoded revision_id to lookup.
        :return: A smart server response of ('yes', ) if the revision is
            present. ('no', ) if it is missing.
        """
        if repository.has_revision(revision_id):
            return SuccessfulSmartServerResponse(('yes', ))
        else:
            return SuccessfulSmartServerResponse(('no', ))


class SmartServerRequestHasSignatureForRevisionId(
        SmartServerRepositoryRequest):

    def do_repository_request(self, repository, revision_id):
        """Return ok if a signature is present for a revision.

        Introduced in bzr 2.5.0.

        :param repository: The repository to query in.
        :param revision_id: The utf8 encoded revision_id to lookup.
        :return: A smart server response of ('yes', ) if a
            signature for the revision is present,
            ('no', ) if it is missing.
        """
        try:
            if repository.has_signature_for_revision_id(revision_id):
                return SuccessfulSmartServerResponse(('yes', ))
            else:
                return SuccessfulSmartServerResponse(('no', ))
        except errors.NoSuchRevision:
            return FailedSmartServerResponse(
                ('nosuchrevision', revision_id))


class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, revid, committers):
        """Return the result of repository.gather_stats().

        :param repository: The repository to query in.
        :param revid: utf8 encoded rev id or an empty string to indicate None
        :param committers: 'yes' or 'no'.

        :return: A SmartServerResponse ('ok',), an encoded body looking like
              committers: 1
              firstrev: 1234.230 0
              latestrev: 345.700 3600
              revisions: 2

              But containing only fields returned by the gather_stats() call
        """
        if revid == '':
            decoded_revision_id = None
        else:
            decoded_revision_id = revid
        if committers == 'yes':
            decoded_committers = True
        else:
            decoded_committers = None
        try:
            stats = repository.gather_stats(decoded_revision_id,
                decoded_committers)
        except errors.NoSuchRevision:
            return FailedSmartServerResponse(('nosuchrevision', revid))

        body = ''
        if 'committers' in stats:
            body += 'committers: %d\n' % stats['committers']
        if 'firstrev' in stats:
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
        if 'latestrev' in stats:
            body += 'latestrev: %.3f %d\n' % stats['latestrev']
        if 'revisions' in stats:
            body += 'revisions: %d\n' % stats['revisions']
        if 'size' in stats:
            body += 'size: %d\n' % stats['size']

        return SuccessfulSmartServerResponse(('ok', ), body)


class SmartServerRepositoryGetRevisionSignatureText(
        SmartServerRepositoryRequest):
    """Return the signature text of a revision.

    New in 2.5.
    """

    def do_repository_request(self, repository, revision_id):
        """Return the result of repository.get_signature_text().

        :param repository: The repository to query in.
        :return: A smart server response with the signature text as
            the body.
        """
        try:
            text = repository.get_signature_text(revision_id)
        except errors.NoSuchRevision as err:
            return FailedSmartServerResponse(
                ('nosuchrevision', err.revision))
        return SuccessfulSmartServerResponse(('ok', ), text)


class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):

    def do_repository_request(self, repository):
        """Return the result of repository.is_shared().

        :param repository: The repository to query in.
        :return: A smart server response of ('yes', ) if the repository is
            shared, and ('no', ) if it is not.
        """
        if repository.is_shared():
            return SuccessfulSmartServerResponse(('yes', ))
        else:
            return SuccessfulSmartServerResponse(('no', ))


class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):

    def do_repository_request(self, repository):
        """Return the result of repository.make_working_trees().

        Introduced in bzr 2.5.0.

        :param repository: The repository to query in.
        :return: A smart server response of ('yes', ) if the repository uses
            working trees, and ('no', ) if it does not.
        """
        if repository.make_working_trees():
            return SuccessfulSmartServerResponse(('yes', ))
        else:
            return SuccessfulSmartServerResponse(('no', ))


class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, token=''):
        # XXX: this probably should not have a token.
        if token == '':
            token = None
        try:
            token = repository.lock_write(token=token).repository_token
        except errors.LockContention as e:
            return FailedSmartServerResponse(('LockContention',))
        except errors.UnlockableTransport:
            return FailedSmartServerResponse(('UnlockableTransport',))
        except errors.LockFailed as e:
            return FailedSmartServerResponse(('LockFailed',
                str(e.lock), str(e.why)))
        if token is not None:
            repository.leave_lock_in_place()
        repository.unlock()
        if token is None:
            token = ''
        return SuccessfulSmartServerResponse(('ok', token))


class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, to_network_name):
        """Get a stream for inserting into a to_format repository.

        The request body is 'search_bytes', a description of the revisions
        being requested.

        In 2.3 this verb added support for search_bytes == 'everything'. Older
        implementations will respond with a BadSearch error, and clients should
        catch this and fallback appropriately.

        :param repository: The repository to stream from.
        :param to_network_name: The network name of the format of the target
            repository.
        """
        self._to_format = network_format_registry.get(to_network_name)
        if self._should_fake_unknown():
            return FailedSmartServerResponse(
                ('UnknownMethod', 'Repository.get_stream'))
        return None # Signal that we want a body.

    def _should_fake_unknown(self):
        """Return True if we should return UnknownMethod to the client.

        This is a workaround for bugs in pre-1.19 clients that claim to
        support receiving streams of CHK repositories. The pre-1.19 client
        expects inventory records to be serialized in the format defined by
        to_network_name, but in pre-1.19 (at least) that format definition
        tries to use the xml5 serializer, which does not correctly handle
        rich-roots.  After 1.19 the client can also accept inventory-deltas
        (which avoids this issue), and those clients will use the
        Repository.get_stream_1.19 verb instead of this one.
        So: if this repository is CHK, and the to_format doesn't match,
        we should just fake an UnknownSmartMethod error so that the client
        will fallback to VFS, rather than sending it a stream we know it
        cannot handle.
        """
        from_format = self._repository._format
        to_format = self._to_format
        if not from_format.supports_chks:
            # Source not CHK: that's ok
            return False
        if (to_format.supports_chks and
                from_format.repository_class is to_format.repository_class and
                from_format._serializer == to_format._serializer):
            # Source is CHK, but target matches: that's ok
            # (e.g. 2a->2a, or CHK2->2a)
            return False
        # Source is CHK, and target is not CHK or incompatible CHK.  We can't
        # generate a compatible stream.
        return True

    def do_body(self, body_bytes):
        repository = self._repository
        repository.lock_read()
        try:
            search_result, error = self.recreate_search(repository, body_bytes,
                discard_excess=True)
            if error is not None:
                repository.unlock()
                return error
            source = repository._get_source(self._to_format)
            stream = source.get_stream(search_result)
        except Exception:
            exc_info = sys.exc_info()
            # On non-error, unlocking is done by the body stream handler.
            repository.unlock()
            reraise(*exc_info)
        return SuccessfulSmartServerResponse(('ok',),
            body_stream=self.body_stream(stream, repository))

    def body_stream(self, stream, repository):
        byte_stream = _stream_to_byte_stream(stream, repository._format)
        try:
            for bytes in byte_stream:
                yield bytes
        except errors.RevisionNotPresent as e:
            # This shouldn't be able to happen, but as we don't buffer
            # everything it can in theory happen.
            repository.unlock()
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
        else:
            repository.unlock()


class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
    """The same as Repository.get_stream, but will return stream CHK formats to
    clients.

    See SmartServerRepositoryGetStream._should_fake_unknown.

    New in 1.19.
    """

    def _should_fake_unknown(self):
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
        return False


def _stream_to_byte_stream(stream, src_format):
    """Convert a record stream to a self delimited byte stream."""
    pack_writer = pack.ContainerSerialiser()
    yield pack_writer.begin()
    yield pack_writer.bytes_record(src_format.network_name(), '')
    for substream_type, substream in stream:
        for record in substream:
            if record.storage_kind in ('chunked', 'fulltext'):
                serialised = record_to_fulltext_bytes(record)
            elif record.storage_kind == 'absent':
                raise ValueError("Absent factory for %s" % (record.key,))
            else:
                serialised = record.get_bytes_as(record.storage_kind)
            if serialised:
                # Some streams embed the whole stream into the wire
                # representation of the first record, which means that
                # later records have no wire representation: we skip them.
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
    yield pack_writer.end()
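
# A rough sketch of the resulting byte stream (record contents invented for
# illustration): a pack container begin marker, one unnamed record carrying
# the source format's network name, then one record per stream record named
# by its substream type, and finally the container end marker, e.g.
#
#   <container begin>
#   <record names=()                body=<source format network name>>
#   <record names=('revisions',)    body=<serialised revision record>>
#   <record names=('inventories',)  body=<serialised inventory record>>
#   <container end>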


class _ByteStreamDecoder(object):
    """Helper for _byte_stream_to_stream.

    The expected usage of this class is via the function _byte_stream_to_stream
    which creates a _ByteStreamDecoder, pops off the stream format and then
    yields the output of record_stream(), the main entry point to
    _ByteStreamDecoder.

    Broadly this class has to unwrap two layers of iterators:
    (type, substream)
    (substream details)

    This is complicated by wishing to return type, iterator_for_type, but
    getting the data for iterator_for_type when we find out type: we can't
    simply pass a generator down to the NetworkRecordStream parser, instead
    we have a little local state to seed each NetworkRecordStream instance,
    and gather the type that we'll be yielding.

    :ivar byte_stream: The byte stream being decoded.
    :ivar stream_decoder: A pack parser used to decode the bytestream
    :ivar current_type: The current type, used to join adjacent records of the
        same type into a single stream.
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
    """

    def __init__(self, byte_stream, record_counter):
        """Create a _ByteStreamDecoder."""
        self.stream_decoder = pack.ContainerPushParser()
        self.current_type = None
        self.first_bytes = None
        self.byte_stream = byte_stream
        self._record_counter = record_counter
        self.key_count = 0

    def iter_stream_decoder(self):
        """Iterate the contents of the pack from stream_decoder."""
        # dequeue pending items
        for record in self.stream_decoder.read_pending_records():
            yield record
        # Pull bytes off the wire, decode them to records, yield those records.
        for bytes in self.byte_stream:
            self.stream_decoder.accept_bytes(bytes)
            for record in self.stream_decoder.read_pending_records():
                yield record

    def iter_substream_bytes(self):
        if self.first_bytes is not None:
            yield self.first_bytes
            # If we run out of pack records, signal the outer layer to stop.
            self.first_bytes = None
        for record in self.iter_pack_records:
            record_names, record_bytes = record
            record_name, = record_names
            substream_type = record_name[0]
            if substream_type != self.current_type:
                # end of a substream, seed the next substream.
                self.current_type = substream_type
                self.first_bytes = record_bytes
                return
            yield record_bytes

    def record_stream(self):
        """Yield substream_type, substream from the byte stream."""
        def wrap_and_count(pb, rc, substream):
            """Yield records from stream while showing progress."""
            counter = 0
            if rc:
                if self.current_type != 'revisions' and self.key_count != 0:
                    # As we know the number of revisions now (in self.key_count)
                    # we can setup and use record_counter (rc).
                    if not rc.is_initialized():
                        rc.setup(self.key_count, self.key_count)
            for record in substream.read():
                if rc:
                    if rc.is_initialized() and counter == rc.STEP:
                        rc.increment(counter)
                        pb.update('Estimate', rc.current, rc.max)
                        counter = 0
                    if self.current_type == 'revisions':
                        # Total records is proportional to number of revs
                        # to fetch. With remote, we used self.key_count to
                        # track the number of revs. Once we have the revs
                        # counts in self.key_count, the progress bar changes
                        # from 'Estimating..' to 'Estimate' above.
                        self.key_count += 1
                        if counter == rc.STEP:
                            pb.update('Estimating..', self.key_count)
                            counter = 0
                counter += 1
                yield record

        self.seed_state()
        pb = ui.ui_factory.nested_progress_bar()
        rc = self._record_counter
        try:
            # Make and consume sub generators, one per substream type:
            while self.first_bytes is not None:
                substream = NetworkRecordStream(self.iter_substream_bytes())
                # after substream is fully consumed, self.current_type is set
                # to the next type, and self.first_bytes is set to the matching
                # bytes.
                yield self.current_type, wrap_and_count(pb, rc, substream)
        finally:
            if rc:
                pb.update('Done', rc.max, rc.max)
            pb.finished()

    def seed_state(self):
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
        # Set a single generator we can use to get data from the pack stream.
        self.iter_pack_records = self.iter_stream_decoder()
        # Seed the very first subiterator with content; after this each one
        # seeds the next.
        list(self.iter_substream_bytes())


def _byte_stream_to_stream(byte_stream, record_counter=None):
    """Convert a byte stream into a format and a stream.

    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
    :return: (RepositoryFormat, stream_generator)
    """
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
    for bytes in byte_stream:
        decoder.stream_decoder.accept_bytes(bytes)
        for record in decoder.stream_decoder.read_pending_records(max=1):
            record_names, src_format_name = record
            src_format = network_format_registry.get(src_format_name)
            return src_format, decoder.record_stream()
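
# Sketch of how the receiving side uses this function (this mirrors what the
# insert-stream verbs below do; the variable names here are illustrative):
#
#   src_format, stream = _byte_stream_to_stream(byte_stream)
#   sink = repository._get_sink()
#   result = sink.insert_stream(stream, src_format, resume_tokens)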


class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, token):
        try:
            repository.lock_write(token=token)
        except errors.TokenMismatch as e:
            return FailedSmartServerResponse(('TokenMismatch',))
        repository.dont_leave_lock_in_place()
        repository.unlock()
        return SuccessfulSmartServerResponse(('ok',))


class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
    """Get the physical lock status for a repository.

    New in 2.5.
    """

    def do_repository_request(self, repository):
        if repository.get_physical_lock_status():
            return SuccessfulSmartServerResponse(('yes', ))
        else:
            return SuccessfulSmartServerResponse(('no', ))


class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):

    def do_repository_request(self, repository, str_bool_new_value):
        if str_bool_new_value == 'True':
            new_value = True
        else:
            new_value = False
        repository.set_make_working_trees(new_value)
        return SuccessfulSmartServerResponse(('ok',))


class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
    """Get the raw repository files as a tarball.

    The returned tarball contains a .bzr control directory which in turn
    contains a repository.

    This takes one parameter, compression, which currently must be
    "", "gz", or "bz2".

    This is used to implement the Repository.copy_content_into operation.
    """

    def do_repository_request(self, repository, compression):
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
        try:
            controldir_name = tmp_dirname + '/.bzr'
            return self._tarfile_response(controldir_name, compression)
        finally:
            osutils.rmtree(tmp_dirname)

    def _copy_to_tempdir(self, from_repo):
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
        from_repo.copy_content_into(tmp_repo)
        return tmp_dirname, tmp_repo

    def _tarfile_response(self, tmp_dirname, compression):
        temp = tempfile.NamedTemporaryFile()
        try:
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
            # all finished; write the tempfile out to the network
            temp.seek(0)
            return SuccessfulSmartServerResponse(('ok',), temp.read())
            # FIXME: Don't read the whole thing into memory here; rather stream
            # it out from the file onto the network. mbp 20070411
        finally:
            temp.close()

    def _tarball_of_dir(self, dirname, compression, ofile):
        import tarfile
        filename = os.path.basename(ofile.name)
        tarball = tarfile.open(fileobj=ofile, name=filename,
            mode='w|' + compression)
        try:
            # The tarball module only accepts ascii names, and (I guess)
            # packs them with their 8bit names.  We know all the files
            # within the repository have ASCII names so they should be safe
            # to pack in.
            dirname = dirname.encode(sys.getfilesystemencoding())
            # python's tarball module includes the whole path by default so
            # override it
            if not dirname.endswith('.bzr'):
                raise ValueError(dirname)
            tarball.add(dirname, '.bzr') # recursive by default
        finally:
            tarball.close()


class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
    """Insert a record stream from a RemoteSink into a repository.

    This gets bytes pushed to it by the network infrastructure and turns that
    into a bytes iterator using a thread. That is then processed by
    _byte_stream_to_stream.

    New in 1.14.
    """

    def do_repository_request(self, repository, resume_tokens, lock_token):
        """StreamSink.insert_stream for a remote repository."""
        repository.lock_write(token=lock_token)
        self.do_insert_stream_request(repository, resume_tokens)

    def do_insert_stream_request(self, repository, resume_tokens):
        tokens = [token for token in resume_tokens.split(' ') if token]
        self.tokens = tokens
        self.repository = repository
        self.queue = queue.Queue()
        self.insert_thread = threading.Thread(target=self._inserter_thread)
        self.insert_thread.start()

    def do_chunk(self, body_stream_chunk):
        self.queue.put(body_stream_chunk)

    def _inserter_thread(self):
        try:
            src_format, stream = _byte_stream_to_stream(
                self.blocking_byte_stream())
            self.insert_result = self.repository._get_sink().insert_stream(
                stream, src_format, self.tokens)
            self.insert_ok = True
        except:
            self.insert_exception = sys.exc_info()
            self.insert_ok = False

    def blocking_byte_stream(self):
        while True:
            bytes = self.queue.get()
            if bytes is StopIteration:
                return
            else:
                yield bytes

    def do_end(self):
        self.queue.put(StopIteration)
        if self.insert_thread is not None:
            self.insert_thread.join()
        if not self.insert_ok:
            try:
                reraise(*self.insert_exception)
            finally:
                del self.insert_exception
        write_group_tokens, missing_keys = self.insert_result
        if write_group_tokens or missing_keys:
            # bzip needed? missing keys should typically be a small set.
            # Should this be a streaming body response ?
            missing_keys = sorted(missing_keys)
            bytes = bencode.bencode((write_group_tokens, missing_keys))
            self.repository.unlock()
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
        else:
            self.repository.unlock()
            return SuccessfulSmartServerResponse(('ok', ))
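
    # Illustrative note: the 'missing-basis' body above is simply the bencoded
    # pair (write_group_tokens, missing_keys).  For example,
    # bencode.bencode((['token-1'], [('inventories', 'rev-1')])) would yield
    # the bytes sent back to the client; the token and key values here are
    # invented.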


class SmartServerRepositoryInsertStream_1_19(SmartServerRepositoryInsertStreamLocked):
    """Insert a record stream from a RemoteSink into a repository.

    Same as SmartServerRepositoryInsertStreamLocked, except:
     - the lock token argument is optional
     - servers that implement this verb accept 'inventory-delta' records in the
       stream.

    New in 1.19.
    """

    def do_repository_request(self, repository, resume_tokens, lock_token=None):
        """StreamSink.insert_stream for a remote repository."""
        SmartServerRepositoryInsertStreamLocked.do_repository_request(
            self, repository, resume_tokens, lock_token)


class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
    """Insert a record stream from a RemoteSink into an unlocked repository.

    This is the same as SmartServerRepositoryInsertStreamLocked, except it
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
    like pack format) repository.

    New in 1.13.
    """

    def do_repository_request(self, repository, resume_tokens):
        """StreamSink.insert_stream for a remote repository."""
        repository.lock_write()
        self.do_insert_stream_request(repository, resume_tokens)


class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
    """Add a revision signature text.

    New in 2.5.
    """

    def do_repository_request(self, repository, lock_token, revision_id,
            *write_group_tokens):
        """Add a revision signature text.

        :param repository: Repository to operate on
        :param lock_token: Lock token
        :param revision_id: Revision for which to add signature
        :param write_group_tokens: Write group tokens
        """
        self._lock_token = lock_token
        self._revision_id = revision_id
        self._write_group_tokens = write_group_tokens
        return None

    def do_body(self, body_bytes):
        """Add a signature text.

        :param body_bytes: GPG signature text
        :return: SuccessfulSmartServerResponse with arguments 'ok' and
            the list of new write group tokens.
        """
        self._repository.lock_write(token=self._lock_token)
        try:
            self._repository.resume_write_group(self._write_group_tokens)
            try:
                self._repository.add_signature_text(self._revision_id,
                    body_bytes)
            finally:
                new_write_group_tokens = self._repository.suspend_write_group()
        finally:
            self._repository.unlock()
        return SuccessfulSmartServerResponse(
            ('ok', ) + tuple(new_write_group_tokens))


class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
    """Start a write group.

    New in 2.5.
    """

    def do_repository_request(self, repository, lock_token):
        """Start a write group."""
        repository.lock_write(token=lock_token)
        try:
            repository.start_write_group()
            try:
                tokens = repository.suspend_write_group()
            except errors.UnsuspendableWriteGroup:
                return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', tokens))


class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
    """Commit a write group.

    New in 2.5.
    """

    def do_repository_request(self, repository, lock_token,
            write_group_tokens):
        """Commit a write group."""
        repository.lock_write(token=lock_token)
        try:
            try:
                repository.resume_write_group(write_group_tokens)
            except errors.UnresumableWriteGroup as e:
                return FailedSmartServerResponse(
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
            try:
                repository.commit_write_group()
            except:
                write_group_tokens = repository.suspend_write_group()
                # FIXME JRV 2011-11-19: What if the write_group_tokens
                # have changed?
                raise
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', ))


class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
    """Abort a write group.

    New in 2.5.
    """

    def do_repository_request(self, repository, lock_token, write_group_tokens):
        """Abort a write group."""
        repository.lock_write(token=lock_token)
        try:
            try:
                repository.resume_write_group(write_group_tokens)
            except errors.UnresumableWriteGroup as e:
                return FailedSmartServerResponse(
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
            repository.abort_write_group()
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', ))


class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
    """Check that a write group is still valid.

    New in 2.5.
    """

    def do_repository_request(self, repository, lock_token, write_group_tokens):
        """Check a write group."""
        repository.lock_write(token=lock_token)
        try:
            try:
                repository.resume_write_group(write_group_tokens)
            except errors.UnresumableWriteGroup as e:
                return FailedSmartServerResponse(
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
            repository.suspend_write_group()
        finally:
            repository.unlock()
        return SuccessfulSmartServerResponse(('ok', ))


class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
    """Retrieve all of the revision ids in a repository.

    New in 2.5.
    """

    def do_repository_request(self, repository):
        revids = repository.all_revision_ids()
        return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))


class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
    """Reconcile a repository.

    New in 2.5.
    """

    def do_repository_request(self, repository, lock_token):
        try:
            repository.lock_write(token=lock_token)
        except errors.TokenLockingNotSupported as e:
            return FailedSmartServerResponse(
                ('TokenLockingNotSupported', ))
        try:
            reconciler = repository.reconcile()
        finally:
            repository.unlock()
        body = [
            "garbage_inventories: %d\n" % reconciler.garbage_inventories,
            "inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
            ]
        return SuccessfulSmartServerResponse(('ok', ), "".join(body))


class SmartServerRepositoryPack(SmartServerRepositoryRequest):
    """Pack a repository.

    New in 2.5.
    """

    def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
        self._repository = repository
        self._lock_token = lock_token
        if clean_obsolete_packs == 'True':
            self._clean_obsolete_packs = True
        else:
            self._clean_obsolete_packs = False
        return None

    def do_body(self, body_bytes):
        if body_bytes == "":
            hint = None
        else:
            hint = body_bytes.splitlines()
        self._repository.lock_write(token=self._lock_token)
        try:
            self._repository.pack(hint, self._clean_obsolete_packs)
        finally:
            self._repository.unlock()
        return SuccessfulSmartServerResponse(("ok", ), )


class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
    """Iterate over the contents of files.

    The client sends a list of desired files to stream, one
    per line, and as tuples of file id and revision, separated by
    \0.

    The server replies with a stream. Each entry is preceded by a header,
    which can either be:

    * "ok\x00IDX\n" where IDX is the index of the entry in the desired files
      list sent by the client. This header is followed by the contents of
      the file, zlib-compressed.
    * "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
      The client can then raise an appropriate RevisionNotPresent error
      or check its fallback repositories.

    New in 2.5.
    """

    def body_stream(self, repository, desired_files):
        self._repository.lock_read()
        try:
            text_keys = {}
            for i, key in enumerate(desired_files):
                text_keys[key] = i
            for record in repository.texts.get_record_stream(text_keys,
                    'unordered', True):
                identifier = text_keys[record.key]
                if record.storage_kind == 'absent':
                    yield "absent\0%s\0%s\0%d\n" % (record.key[0],
                        record.key[1], identifier)
                    # FIXME: Way to abort early?
                    continue
                yield "ok\0%d\n" % identifier
                compressor = zlib.compressobj()
                for bytes in record.get_bytes_as('chunked'):
                    data = compressor.compress(bytes)
                    if data:
                        yield data
                data = compressor.flush()
                if data:
                    yield data
        finally:
            self._repository.unlock()

    def do_body(self, body_bytes):
        desired_files = [
            tuple(l.split("\0")) for l in body_bytes.splitlines()]
        return SuccessfulSmartServerResponse(('ok', ),
            body_stream=self.body_stream(self._repository, desired_files))

    def do_repository_request(self, repository):
        # Signal that we want a body
        return None
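
    # Illustrative request body for this verb (file ids and revision ids are
    # invented): one "FILEID\x00REVISION" pair per line, e.g.
    #
    #   file-one-id\x00revision-one-id
    #   file-two-id\x00revision-two-id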


class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
    """Stream a list of revisions.

    The client sends a list of newline-separated revision ids in the
    body of the request and the server replies with the serializer format,
    and a stream of zlib-compressed revision texts (using the specified
    serializer format).

    Any revisions the server does not have are omitted from the stream.

    New in 2.5.
    """

    def do_repository_request(self, repository):
        self._repository = repository
        # Signal there is a body
        return None

    def do_body(self, body_bytes):
        revision_ids = body_bytes.split("\n")
        return SuccessfulSmartServerResponse(
            ('ok', self._repository.get_serializer_format()),
            body_stream=self.body_stream(self._repository, revision_ids))

    def body_stream(self, repository, revision_ids):
        self._repository.lock_read()
        try:
            for record in repository.revisions.get_record_stream(
                    [(revid,) for revid in revision_ids], 'unordered', True):
                if record.storage_kind == 'absent':
                    continue
                yield zlib.compress(record.get_bytes_as('fulltext'))
        finally:
            self._repository.unlock()


class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
    """Get the inventory deltas for a set of revision ids.

    This accepts a list of revision ids, and then sends a chain
    of deltas for the inventories of those revisions. The first
    revision will be empty.

    The server writes back zlibbed serialized inventory deltas,
    in the ordering specified. The base for each delta is the
    inventory generated by the previous delta.

    New in 2.5.
    """

    def _inventory_delta_stream(self, repository, ordering, revids):
        prev_inv = _mod_inventory.Inventory(root_id=None,
            revision_id=_mod_revision.NULL_REVISION)
        serializer = inventory_delta.InventoryDeltaSerializer(
            repository.supports_rich_root(),
            repository._format.supports_tree_reference)
        repository.lock_read()
        try:
            for inv, revid in repository._iter_inventories(revids, ordering):
                if inv is None:
                    continue
                inv_delta = inv._make_delta(prev_inv)
                lines = serializer.delta_to_lines(
                    prev_inv.revision_id, inv.revision_id, inv_delta)
                yield ChunkedContentFactory(inv.revision_id, None, None, lines)
                prev_inv = inv
        finally:
            repository.unlock()

    def body_stream(self, repository, ordering, revids):
        substream = self._inventory_delta_stream(repository,
            ordering, revids)
        return _stream_to_byte_stream([('inventory-deltas', substream)],
            repository._format)

    def do_body(self, body_bytes):
        return SuccessfulSmartServerResponse(('ok', ),
            body_stream=self.body_stream(self._repository, self._ordering,
                body_bytes.splitlines()))

    def do_repository_request(self, repository, ordering):
        if ordering == 'unordered':
            # inventory deltas for a topologically sorted stream
            # are likely to be smaller
            ordering = 'topological'
        self._ordering = ordering
        # Signal that we want a body
        return None