227
225
# add parents to the result
228
226
result[encoded_id] = parents
229
227
# Approximate the serialized cost of this revision_id.
230
size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
228
line = '%s %s\n' % (encoded_id, ' '.join(parents))
229
estimator.add_content(line)
231
230
# get all the directly asked for parents, and then flesh out to
232
231
# 64K (compressed) or so. We do one level of depth at a time to
233
232
# stay in sync with the client. The 250000 magic number is
234
233
# estimated compression ratio taken from bzr.dev itself.
235
if self.no_extra_results or (
236
first_loop_done and size_so_far > 250000):
234
if self.no_extra_results or (first_loop_done and estimator.full()):
235
trace.mutter('size: %d, z_size: %d'
236
% (estimator._uncompressed_size_added,
237
estimator._compressed_size_added))
237
238
next_revs = set()
239
240
# don't query things we've already queried
240
next_revs.difference_update(queried_revs)
241
next_revs = next_revs.difference(queried_revs)
241
242
first_loop_done = True
245
def _do_repository_request(self, body_bytes):
246
repository = self._repository
247
revision_ids = set(self._revision_ids)
248
include_missing = 'include-missing:' in revision_ids
250
revision_ids.remove('include-missing:')
251
body_lines = body_bytes.split('\n')
252
search_result, error = self.recreate_search_from_recipe(
253
repository, body_lines)
254
if error is not None:
256
# TODO might be nice to start up the search again; but that's not
257
# written or tested yet.
258
client_seen_revs = set(search_result.get_keys())
259
# Always include the requested ids.
260
client_seen_revs.difference_update(revision_ids)
262
repo_graph = repository.get_graph()
263
result = self._expand_requested_revs(repo_graph, revision_ids,
264
client_seen_revs, include_missing)
243
266
# Sorting trivially puts lexicographically similar revision ids together.
244
267
# Compression FTW.
245
269
for revision, parents in sorted(result.items()):
246
270
lines.append(' '.join((revision, ) + tuple(parents)))
581
700
def record_stream(self):
582
701
"""Yield substream_type, substream from the byte stream."""
702
def wrap_and_count(pb, rc, substream):
703
"""Yield records from stream while showing progress."""
706
if self.current_type != 'revisions' and self.key_count != 0:
707
# As we know the number of revisions now (in self.key_count)
708
# we can setup and use record_counter (rc).
709
if not rc.is_initialized():
710
rc.setup(self.key_count, self.key_count)
711
for record in substream.read():
713
if rc.is_initialized() and counter == rc.STEP:
714
rc.increment(counter)
715
pb.update('Estimate', rc.current, rc.max)
717
if self.current_type == 'revisions':
718
# Total records is proportional to number of revs
719
# to fetch. With remote, we used self.key_count to
720
# track the number of revs. Once we have the revs
721
# counts in self.key_count, the progress bar changes
722
# from 'Estimating..' to 'Estimate' above.
724
if counter == rc.STEP:
725
pb.update('Estimating..', self.key_count)
583
730
self.seed_state()
731
pb = ui.ui_factory.nested_progress_bar()
732
rc = self._record_counter
584
733
# Make and consume sub generators, one per substream type:
585
734
while self.first_bytes is not None:
586
735
substream = NetworkRecordStream(self.iter_substream_bytes())
587
736
# after substream is fully consumed, self.current_type is set to
588
737
# the next type, and self.first_bytes is set to the matching bytes.
589
yield self.current_type, substream.read()
738
yield self.current_type, wrap_and_count(pb, rc, substream)
740
pb.update('Done', rc.max, rc.max)
591
743
def seed_state(self):
592
744
"""Prepare the _ByteStreamDecoder to decode from the pack stream."""
792
957
self.do_insert_stream_request(repository, resume_tokens)
960
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
961
"""Add a revision signature text.
966
def do_repository_request(self, repository, lock_token, revision_id,
967
*write_group_tokens):
968
"""Add a revision signature text.
970
:param repository: Repository to operate on
971
:param lock_token: Lock token
972
:param revision_id: Revision for which to add signature
973
:param write_group_tokens: Write group tokens
975
self._lock_token = lock_token
976
self._revision_id = revision_id
977
self._write_group_tokens = write_group_tokens
980
def do_body(self, body_bytes):
981
"""Add a signature text.
983
:param body_bytes: GPG signature text
984
:return: SuccessfulSmartServerResponse with arguments 'ok' and
985
the list of new write group tokens.
987
self._repository.lock_write(token=self._lock_token)
989
self._repository.resume_write_group(self._write_group_tokens)
991
self._repository.add_signature_text(self._revision_id,
994
new_write_group_tokens = self._repository.suspend_write_group()
996
self._repository.unlock()
997
return SuccessfulSmartServerResponse(
998
('ok', ) + tuple(new_write_group_tokens))
1001
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
1002
"""Start a write group.
1007
def do_repository_request(self, repository, lock_token):
1008
"""Start a write group."""
1009
repository.lock_write(token=lock_token)
1011
repository.start_write_group()
1013
tokens = repository.suspend_write_group()
1014
except errors.UnsuspendableWriteGroup:
1015
return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
1018
return SuccessfulSmartServerResponse(('ok', tokens))
1021
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1022
"""Commit a write group.
1027
def do_repository_request(self, repository, lock_token,
1028
write_group_tokens):
1029
"""Commit a write group."""
1030
repository.lock_write(token=lock_token)
1033
repository.resume_write_group(write_group_tokens)
1034
except errors.UnresumableWriteGroup, e:
1035
return FailedSmartServerResponse(
1036
('UnresumableWriteGroup', e.write_groups, e.reason))
1038
repository.commit_write_group()
1040
write_group_tokens = repository.suspend_write_group()
1041
# FIXME JRV 2011-11-19: What if the write_group_tokens
1046
return SuccessfulSmartServerResponse(('ok', ))
1049
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
1050
"""Abort a write group.
1055
def do_repository_request(self, repository, lock_token, write_group_tokens):
1056
"""Abort a write group."""
1057
repository.lock_write(token=lock_token)
1060
repository.resume_write_group(write_group_tokens)
1061
except errors.UnresumableWriteGroup, e:
1062
return FailedSmartServerResponse(
1063
('UnresumableWriteGroup', e.write_groups, e.reason))
1064
repository.abort_write_group()
1067
return SuccessfulSmartServerResponse(('ok', ))
1070
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
1071
"""Check that a write group is still valid.
1076
def do_repository_request(self, repository, lock_token, write_group_tokens):
1077
"""Check that a write group is still valid."""
1078
repository.lock_write(token=lock_token)
1081
repository.resume_write_group(write_group_tokens)
1082
except errors.UnresumableWriteGroup, e:
1083
return FailedSmartServerResponse(
1084
('UnresumableWriteGroup', e.write_groups, e.reason))
1086
repository.suspend_write_group()
1089
return SuccessfulSmartServerResponse(('ok', ))
1092
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
1093
"""Retrieve all of the revision ids in a repository.
1098
def do_repository_request(self, repository):
    """Return every revision id known to the repository.

    :param repository: Repository to query.
    :return: SuccessfulSmartServerResponse with the argument 'ok' and a
        body holding the revision ids joined by newlines.
    """
    # One revision id per line; the body is empty when there are none.
    response_body = "\n".join(repository.all_revision_ids())
    return SuccessfulSmartServerResponse(("ok", ), response_body)
1103
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
1104
"""Pack a repository.
1109
def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
1110
self._repository = repository
1111
self._lock_token = lock_token
1112
if clean_obsolete_packs == 'True':
1113
self._clean_obsolete_packs = True
1115
self._clean_obsolete_packs = False
1118
def do_body(self, body_bytes):
1119
if body_bytes == "":
1122
hint = body_bytes.splitlines()
1123
self._repository.lock_write(token=self._lock_token)
1125
self._repository.pack(hint, self._clean_obsolete_packs)
1127
self._repository.unlock()
1128
return SuccessfulSmartServerResponse(("ok", ), )
1131
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1132
"""Iterate over the contents of files.
1134
The client sends a list of desired files to stream, one
1135
per line, and as tuples of file id and revision, separated by
1138
The server replies with a stream. Each entry is preceded by a header,
1139
which can either be:
1141
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1142
list sent by the client. This header is followed by the contents of
1143
the file, zlib-compressed.
1144
* "absent\x00FILEID\x00REVISION\x00IDX\n" to indicate a text is missing.
1145
The client can then raise an appropriate RevisionNotPresent error
1146
or check its fallback repositories.
1151
def body_stream(self, repository, desired_files):
1152
self._repository.lock_read()
1155
for i, key in enumerate(desired_files):
1157
for record in repository.texts.get_record_stream(text_keys,
1159
identifier = text_keys[record.key]
1160
if record.storage_kind == 'absent':
1161
yield "absent\0%s\0%s\0%d\n" % (record.key[0],
1162
record.key[1], identifier)
1163
# FIXME: Way to abort early?
1165
yield "ok\0%d\n" % identifier
1166
compressor = zlib.compressobj()
1167
for bytes in record.get_bytes_as('chunked'):
1168
data = compressor.compress(bytes)
1171
data = compressor.flush()
1175
self._repository.unlock()
1177
def do_body(self, body_bytes):
1179
tuple(l.split("\0")) for l in body_bytes.splitlines()]
1180
return SuccessfulSmartServerResponse(('ok', ),
1181
body_stream=self.body_stream(self._repository, desired_files))
1183
def do_repository_request(self, repository):
1184
# Signal that we want a body
1188
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
1189
"""Stream a list of revisions.
1191
The client sends a list of newline-separated revision ids in the
1192
body of the request and the server replies with the serializer format,
1193
and a stream of zlib-compressed revision texts (using the specified
1196
Any revisions the server does not have are omitted from the stream.
1201
def do_repository_request(self, repository):
1202
self._repository = repository
1203
# Signal there is a body
1206
def do_body(self, body_bytes):
    """Start streaming the revisions named in the request body.

    :param body_bytes: Newline-separated revision ids.
    :return: SuccessfulSmartServerResponse whose arguments are 'ok' and
        the repository's serializer format, and whose body stream is
        produced by body_stream() for the requested revision ids.
    """
    repo = self._repository
    requested_ids = body_bytes.split("\n")
    response_args = ('ok', repo.get_serializer_format())
    # body_stream() is only called here; its records are produced as the
    # response body stream is consumed.
    stream = self.body_stream(repo, requested_ids)
    return SuccessfulSmartServerResponse(response_args, body_stream=stream)
1212
def body_stream(self, repository, revision_ids):
1213
self._repository.lock_read()
1215
for record in repository.revisions.get_record_stream(
1216
[(revid,) for revid in revision_ids], 'unordered', True):
1217
if record.storage_kind == 'absent':
1219
yield zlib.compress(record.get_bytes_as('fulltext'))
1221
self._repository.unlock()