228
226
# add parents to the result
229
227
result[encoded_id] = parents
230
228
# Approximate the serialized cost of this revision_id.
231
size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
229
line = '%s %s\n' % (encoded_id, ' '.join(parents))
230
estimator.add_content(line)
232
231
# get all the directly asked for parents, and then flesh out to
233
232
# 64K (compressed) or so. We do one level of depth at a time to
234
233
# stay in sync with the client. The 250000 magic number is
235
234
# estimated compression ratio taken from bzr.dev itself.
236
if self.no_extra_results or (
237
first_loop_done and size_so_far > 250000):
235
if self.no_extra_results or (first_loop_done and estimator.full()):
236
trace.mutter('size: %d, z_size: %d'
237
% (estimator._uncompressed_size_added,
238
estimator._compressed_size_added))
238
239
next_revs = set()
240
241
# don't query things we've already queried
241
next_revs.difference_update(queried_revs)
242
next_revs = next_revs.difference(queried_revs)
242
243
first_loop_done = True
246
def _do_repository_request(self, body_bytes):
247
repository = self._repository
248
revision_ids = set(self._revision_ids)
249
include_missing = 'include-missing:' in revision_ids
251
revision_ids.remove('include-missing:')
252
body_lines = body_bytes.split('\n')
253
search_result, error = self.recreate_search_from_recipe(
254
repository, body_lines)
255
if error is not None:
257
# TODO might be nice to start up the search again; but thats not
258
# written or tested yet.
259
client_seen_revs = set(search_result.get_keys())
260
# Always include the requested ids.
261
client_seen_revs.difference_update(revision_ids)
263
repo_graph = repository.get_graph()
264
result = self._expand_requested_revs(repo_graph, revision_ids,
265
client_seen_revs, include_missing)
244
267
# sorting trivially puts lexographically similar revision ids together.
245
268
# Compression FTW.
246
270
for revision, parents in sorted(result.items()):
247
271
lines.append(' '.join((revision, ) + tuple(parents)))
828
958
self.do_insert_stream_request(repository, resume_tokens)
961
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
962
"""Add a revision signature text.
967
def do_repository_request(self, repository, lock_token, revision_id,
968
*write_group_tokens):
969
"""Add a revision signature text.
971
:param repository: Repository to operate on
972
:param lock_token: Lock token
973
:param revision_id: Revision for which to add signature
974
:param write_group_tokens: Write group tokens
976
self._lock_token = lock_token
977
self._revision_id = revision_id
978
self._write_group_tokens = write_group_tokens
981
def do_body(self, body_bytes):
982
"""Add a signature text.
984
:param body_bytes: GPG signature text
985
:return: SuccessfulSmartServerResponse with arguments 'ok' and
986
the list of new write group tokens.
988
self._repository.lock_write(token=self._lock_token)
990
self._repository.resume_write_group(self._write_group_tokens)
992
self._repository.add_signature_text(self._revision_id,
995
new_write_group_tokens = self._repository.suspend_write_group()
997
self._repository.unlock()
998
return SuccessfulSmartServerResponse(
999
('ok', ) + tuple(new_write_group_tokens))
1002
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
1003
"""Start a write group.
1008
def do_repository_request(self, repository, lock_token):
1009
"""Start a write group."""
1010
repository.lock_write(token=lock_token)
1012
repository.start_write_group()
1014
tokens = repository.suspend_write_group()
1015
except errors.UnsuspendableWriteGroup:
1016
return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
1019
return SuccessfulSmartServerResponse(('ok', tokens))
1022
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1023
"""Commit a write group.
1028
def do_repository_request(self, repository, lock_token,
1029
write_group_tokens):
1030
"""Commit a write group."""
1031
repository.lock_write(token=lock_token)
1034
repository.resume_write_group(write_group_tokens)
1035
except errors.UnresumableWriteGroup, e:
1036
return FailedSmartServerResponse(
1037
('UnresumableWriteGroup', e.write_groups, e.reason))
1039
repository.commit_write_group()
1041
write_group_tokens = repository.suspend_write_group()
1042
# FIXME JRV 2011-11-19: What if the write_group_tokens
1047
return SuccessfulSmartServerResponse(('ok', ))
1050
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
1051
"""Abort a write group.
1056
def do_repository_request(self, repository, lock_token, write_group_tokens):
1057
"""Abort a write group."""
1058
repository.lock_write(token=lock_token)
1061
repository.resume_write_group(write_group_tokens)
1062
except errors.UnresumableWriteGroup, e:
1063
return FailedSmartServerResponse(
1064
('UnresumableWriteGroup', e.write_groups, e.reason))
1065
repository.abort_write_group()
1068
return SuccessfulSmartServerResponse(('ok', ))
1071
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
1072
"""Check that a write group is still valid.
1077
def do_repository_request(self, repository, lock_token, write_group_tokens):
1078
"""Abort a write group."""
1079
repository.lock_write(token=lock_token)
1082
repository.resume_write_group(write_group_tokens)
1083
except errors.UnresumableWriteGroup, e:
1084
return FailedSmartServerResponse(
1085
('UnresumableWriteGroup', e.write_groups, e.reason))
1087
repository.suspend_write_group()
1090
return SuccessfulSmartServerResponse(('ok', ))
1093
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
1094
"""Retrieve all of the revision ids in a repository.
1099
def do_repository_request(self, repository):
1100
revids = repository.all_revision_ids()
1101
return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
1104
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
1105
"""Reconcile a repository.
1110
def do_repository_request(self, repository, lock_token):
1112
repository.lock_write(token=lock_token)
1113
except errors.TokenLockingNotSupported, e:
1114
return FailedSmartServerResponse(
1115
('TokenLockingNotSupported', ))
1117
reconciler = repository.reconcile()
1121
"garbage_inventories: %d\n" % reconciler.garbage_inventories,
1122
"inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
1124
return SuccessfulSmartServerResponse(('ok', ), "".join(body))
1127
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
1128
"""Pack a repository.
1133
def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
1134
self._repository = repository
1135
self._lock_token = lock_token
1136
if clean_obsolete_packs == 'True':
1137
self._clean_obsolete_packs = True
1139
self._clean_obsolete_packs = False
1142
def do_body(self, body_bytes):
1143
if body_bytes == "":
1146
hint = body_bytes.splitlines()
1147
self._repository.lock_write(token=self._lock_token)
1149
self._repository.pack(hint, self._clean_obsolete_packs)
1151
self._repository.unlock()
1152
return SuccessfulSmartServerResponse(("ok", ), )
1155
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1156
"""Iterate over the contents of files.
1158
The client sends a list of desired files to stream, one
1159
per line, and as tuples of file id and revision, separated by
1162
The server replies with a stream. Each entry is preceded by a header,
1163
which can either be:
1165
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1166
list sent by the client. This header is followed by the contents of
1167
the file, bzip2-compressed.
1168
* "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
1169
The client can then raise an appropriate RevisionNotPresent error
1170
or check its fallback repositories.
1175
def body_stream(self, repository, desired_files):
1176
self._repository.lock_read()
1179
for i, key in enumerate(desired_files):
1181
for record in repository.texts.get_record_stream(text_keys,
1183
identifier = text_keys[record.key]
1184
if record.storage_kind == 'absent':
1185
yield "absent\0%s\0%s\0%d\n" % (record.key[0],
1186
record.key[1], identifier)
1187
# FIXME: Way to abort early?
1189
yield "ok\0%d\n" % identifier
1190
compressor = zlib.compressobj()
1191
for bytes in record.get_bytes_as('chunked'):
1192
data = compressor.compress(bytes)
1195
data = compressor.flush()
1199
self._repository.unlock()
1201
def do_body(self, body_bytes):
1203
tuple(l.split("\0")) for l in body_bytes.splitlines()]
1204
return SuccessfulSmartServerResponse(('ok', ),
1205
body_stream=self.body_stream(self._repository, desired_files))
1207
def do_repository_request(self, repository):
1208
# Signal that we want a body
1212
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
1213
"""Stream a list of revisions.
1215
The client sends a list of newline-separated revision ids in the
1216
body of the request and the server replies with the serializer format,
1217
and a stream of bzip2-compressed revision texts (using the specified
1220
Any revisions the server does not have are omitted from the stream.
1225
def do_repository_request(self, repository):
1226
self._repository = repository
1227
# Signal there is a body
1230
def do_body(self, body_bytes):
1231
revision_ids = body_bytes.split("\n")
1232
return SuccessfulSmartServerResponse(
1233
('ok', self._repository.get_serializer_format()),
1234
body_stream=self.body_stream(self._repository, revision_ids))
1236
def body_stream(self, repository, revision_ids):
1237
self._repository.lock_read()
1239
for record in repository.revisions.get_record_stream(
1240
[(revid,) for revid in revision_ids], 'unordered', True):
1241
if record.storage_kind == 'absent':
1243
yield zlib.compress(record.get_bytes_as('fulltext'))
1245
self._repository.unlock()