227
231
# add parents to the result
228
232
result[encoded_id] = parents
229
233
# Approximate the serialized cost of this revision_id.
230
size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
234
line = '%s %s\n' % (encoded_id, ' '.join(parents))
235
estimator.add_content(line)
231
236
# get all the directly asked for parents, and then flesh out to
232
237
# 64K (compressed) or so. We do one level of depth at a time to
233
238
# stay in sync with the client. The 250000 magic number is
234
239
# estimated compression ratio taken from bzr.dev itself.
235
if self.no_extra_results or (
236
first_loop_done and size_so_far > 250000):
240
if self.no_extra_results or (first_loop_done and estimator.full()):
241
trace.mutter('size: %d, z_size: %d'
242
% (estimator._uncompressed_size_added,
243
estimator._compressed_size_added))
237
244
next_revs = set()
239
246
# don't query things we've already queried
240
next_revs.difference_update(queried_revs)
247
next_revs = next_revs.difference(queried_revs)
241
248
first_loop_done = True
251
def _do_repository_request(self, body_bytes):
252
repository = self._repository
253
revision_ids = set(self._revision_ids)
254
include_missing = 'include-missing:' in revision_ids
256
revision_ids.remove('include-missing:')
257
body_lines = body_bytes.split('\n')
258
search_result, error = self.recreate_search_from_recipe(
259
repository, body_lines)
260
if error is not None:
262
# TODO might be nice to start up the search again; but that's not
263
# written or tested yet.
264
client_seen_revs = set(search_result.get_keys())
265
# Always include the requested ids.
266
client_seen_revs.difference_update(revision_ids)
268
repo_graph = repository.get_graph()
269
result = self._expand_requested_revs(repo_graph, revision_ids,
270
client_seen_revs, include_missing)
243
272
# sorting trivially puts lexicographically similar revision ids together.
244
273
# Compression FTW.
245
275
for revision, parents in sorted(result.items()):
246
276
lines.append(' '.join((revision, ) + tuple(parents)))
581
704
def record_stream(self):
582
705
"""Yield substream_type, substream from the byte stream."""
706
def wrap_and_count(pb, rc, substream):
707
"""Yield records from stream while showing progress."""
710
if self.current_type != 'revisions' and self.key_count != 0:
711
# As we know the number of revisions now (in self.key_count)
712
# we can setup and use record_counter (rc).
713
if not rc.is_initialized():
714
rc.setup(self.key_count, self.key_count)
715
for record in substream.read():
717
if rc.is_initialized() and counter == rc.STEP:
718
rc.increment(counter)
719
pb.update('Estimate', rc.current, rc.max)
721
if self.current_type == 'revisions':
722
# Total records is proportional to number of revs
723
# to fetch. With remote, we used self.key_count to
724
# track the number of revs. Once we have the revs
725
# counts in self.key_count, the progress bar changes
726
# from 'Estimating..' to 'Estimate' above.
728
if counter == rc.STEP:
729
pb.update('Estimating..', self.key_count)
583
734
self.seed_state()
584
# Make and consume sub generators, one per substream type:
585
while self.first_bytes is not None:
586
substream = NetworkRecordStream(self.iter_substream_bytes())
587
# after substream is fully consumed, self.current_type is set to
588
# the next type, and self.first_bytes is set to the matching bytes.
589
yield self.current_type, substream.read()
735
with ui.ui_factory.nested_progress_bar() as pb:
736
rc = self._record_counter
738
# Make and consume sub generators, one per substream type:
739
while self.first_bytes is not None:
740
substream = NetworkRecordStream(self.iter_substream_bytes())
741
# after substream is fully consumed, self.current_type is set
742
# to the next type, and self.first_bytes is set to the matching
744
yield self.current_type, wrap_and_count(pb, rc, substream)
747
pb.update('Done', rc.max, rc.max)
591
749
def seed_state(self):
592
750
"""Prepare the _ByteStreamDecoder to decode from the pack stream."""
792
962
self.do_insert_stream_request(repository, resume_tokens)
965
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
966
"""Add a revision signature text.
971
def do_repository_request(self, repository, lock_token, revision_id,
972
*write_group_tokens):
973
"""Add a revision signature text.
975
:param repository: Repository to operate on
976
:param lock_token: Lock token
977
:param revision_id: Revision for which to add signature
978
:param write_group_tokens: Write group tokens
980
self._lock_token = lock_token
981
self._revision_id = revision_id
982
self._write_group_tokens = write_group_tokens
985
def do_body(self, body_bytes):
986
"""Add a signature text.
988
:param body_bytes: GPG signature text
989
:return: SuccessfulSmartServerResponse with arguments 'ok' and
990
the list of new write group tokens.
992
self._repository.lock_write(token=self._lock_token)
994
self._repository.resume_write_group(self._write_group_tokens)
996
self._repository.add_signature_text(self._revision_id,
999
new_write_group_tokens = self._repository.suspend_write_group()
1001
self._repository.unlock()
1002
return SuccessfulSmartServerResponse(
1003
('ok', ) + tuple(new_write_group_tokens))
1006
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
1007
"""Start a write group.
1012
def do_repository_request(self, repository, lock_token):
1013
"""Start a write group."""
1014
repository.lock_write(token=lock_token)
1016
repository.start_write_group()
1018
tokens = repository.suspend_write_group()
1019
except errors.UnsuspendableWriteGroup:
1020
return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
1023
return SuccessfulSmartServerResponse(('ok', tokens))
1026
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1027
"""Commit a write group.
1032
def do_repository_request(self, repository, lock_token,
1033
write_group_tokens):
1034
"""Commit a write group."""
1035
repository.lock_write(token=lock_token)
1038
repository.resume_write_group(write_group_tokens)
1039
except errors.UnresumableWriteGroup as e:
1040
return FailedSmartServerResponse(
1041
('UnresumableWriteGroup', e.write_groups, e.reason))
1043
repository.commit_write_group()
1045
write_group_tokens = repository.suspend_write_group()
1046
# FIXME JRV 2011-11-19: What if the write_group_tokens
1051
return SuccessfulSmartServerResponse(('ok', ))
1054
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
1055
"""Abort a write group.
1060
def do_repository_request(self, repository, lock_token, write_group_tokens):
1061
"""Abort a write group."""
1062
repository.lock_write(token=lock_token)
1065
repository.resume_write_group(write_group_tokens)
1066
except errors.UnresumableWriteGroup as e:
1067
return FailedSmartServerResponse(
1068
('UnresumableWriteGroup', e.write_groups, e.reason))
1069
repository.abort_write_group()
1072
return SuccessfulSmartServerResponse(('ok', ))
1075
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
1076
"""Check that a write group is still valid.
1081
def do_repository_request(self, repository, lock_token, write_group_tokens):
1082
"""Check that a write group is still valid."""
1083
repository.lock_write(token=lock_token)
1086
repository.resume_write_group(write_group_tokens)
1087
except errors.UnresumableWriteGroup as e:
1088
return FailedSmartServerResponse(
1089
('UnresumableWriteGroup', e.write_groups, e.reason))
1091
repository.suspend_write_group()
1094
return SuccessfulSmartServerResponse(('ok', ))
1097
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
1098
"""Retrieve all of the revision ids in a repository.
1103
def do_repository_request(self, repository):
    """Answer with every revision id known to the repository.

    :param repository: Repository to query; its ``all_revision_ids()``
        result is used as-is.
    :return: SuccessfulSmartServerResponse with the argument tuple
        ``("ok", )`` and a body made of the revision ids joined by
        newline characters.
    """
    # One revision id per line in the response body.
    response_body = "\n".join(repository.all_revision_ids())
    return SuccessfulSmartServerResponse(("ok", ), response_body)
1108
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
1109
"""Reconcile a repository.
1114
def do_repository_request(self, repository, lock_token):
1116
repository.lock_write(token=lock_token)
1117
except errors.TokenLockingNotSupported as e:
1118
return FailedSmartServerResponse(
1119
('TokenLockingNotSupported', ))
1121
reconciler = repository.reconcile()
1125
"garbage_inventories: %d\n" % reconciler.garbage_inventories,
1126
"inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
1128
return SuccessfulSmartServerResponse(('ok', ), "".join(body))
1131
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
1132
"""Pack a repository.
1137
def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
1138
self._repository = repository
1139
self._lock_token = lock_token
1140
if clean_obsolete_packs == 'True':
1141
self._clean_obsolete_packs = True
1143
self._clean_obsolete_packs = False
1146
def do_body(self, body_bytes):
1147
if body_bytes == "":
1150
hint = body_bytes.splitlines()
1151
self._repository.lock_write(token=self._lock_token)
1153
self._repository.pack(hint, self._clean_obsolete_packs)
1155
self._repository.unlock()
1156
return SuccessfulSmartServerResponse(("ok", ), )
1159
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1160
"""Iterate over the contents of files.
1162
The client sends a list of desired files to stream, one
1163
per line, and as tuples of file id and revision, separated by
1166
The server replies with a stream. Each entry is preceded by a header,
1167
which can either be:
1169
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1170
list sent by the client. This header is followed by the contents of
1171
the file, zlib-compressed.
1172
* "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
1173
The client can then raise an appropriate RevisionNotPresent error
1174
or check its fallback repositories.
1179
def body_stream(self, repository, desired_files):
1180
with self._repository.lock_read():
1182
for i, key in enumerate(desired_files):
1184
for record in repository.texts.get_record_stream(text_keys,
1186
identifier = text_keys[record.key]
1187
if record.storage_kind == 'absent':
1188
yield "absent\0%s\0%s\0%d\n" % (record.key[0],
1189
record.key[1], identifier)
1190
# FIXME: Way to abort early?
1192
yield "ok\0%d\n" % identifier
1193
compressor = zlib.compressobj()
1194
for bytes in record.get_bytes_as('chunked'):
1195
data = compressor.compress(bytes)
1198
data = compressor.flush()
1202
def do_body(self, body_bytes):
1204
tuple(l.split("\0")) for l in body_bytes.splitlines()]
1205
return SuccessfulSmartServerResponse(('ok', ),
1206
body_stream=self.body_stream(self._repository, desired_files))
1208
def do_repository_request(self, repository):
1209
# Signal that we want a body
1213
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
1214
"""Stream a list of revisions.
1216
The client sends a list of newline-separated revision ids in the
1217
body of the request and the server replies with the serializer format,
1218
and a stream of zlib-compressed revision texts (using the specified
1221
Any revisions the server does not have are omitted from the stream.
1226
def do_repository_request(self, repository):
1227
self._repository = repository
1228
# Signal there is a body
1231
def do_body(self, body_bytes):
    """Start streaming the revisions named in the request body.

    :param body_bytes: Request body; it is split on ``"\\n"`` and each
        piece is treated as a revision id.
    :return: SuccessfulSmartServerResponse whose arguments are ``'ok'``
        and the repository's serializer format, and whose body stream is
        whatever ``self.body_stream`` produces for those revision ids.
    """
    requested_revids = body_bytes.split("\n")
    response_args = ('ok', self._repository.get_serializer_format())
    revision_stream = self.body_stream(self._repository, requested_revids)
    return SuccessfulSmartServerResponse(
        response_args, body_stream=revision_stream)
1237
def body_stream(self, repository, revision_ids):
1238
with self._repository.lock_read():
1239
for record in repository.revisions.get_record_stream(
1240
[(revid,) for revid in revision_ids], 'unordered', True):
1241
if record.storage_kind == 'absent':
1243
yield zlib.compress(record.get_bytes_as('fulltext'))
1246
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
1247
"""Get the inventory deltas for a set of revision ids.
1249
This accepts a list of revision ids, and then sends a chain
1250
of deltas for the inventories of those revisions. The first
1251
revision will be empty.
1253
The server writes back zlibbed serialized inventory deltas,
1254
in the ordering specified. The base for each delta is the
1255
inventory generated by the previous delta.
1260
def _inventory_delta_stream(self, repository, ordering, revids):
1261
prev_inv = _mod_inventory.Inventory(root_id=None,
1262
revision_id=_mod_revision.NULL_REVISION)
1263
serializer = inventory_delta.InventoryDeltaSerializer(
1264
repository.supports_rich_root(),
1265
repository._format.supports_tree_reference)
1266
with repository.lock_read():
1267
for inv, revid in repository._iter_inventories(revids, ordering):
1270
inv_delta = inv._make_delta(prev_inv)
1271
lines = serializer.delta_to_lines(
1272
prev_inv.revision_id, inv.revision_id, inv_delta)
1273
yield ChunkedContentFactory(inv.revision_id, None, None, lines)
1276
def body_stream(self, repository, ordering, revids):
1277
substream = self._inventory_delta_stream(repository,
1279
return _stream_to_byte_stream([('inventory-deltas', substream)],
1282
def do_body(self, body_bytes):
    """Start streaming inventory deltas for the revisions in the body.

    :param body_bytes: Request body; each line (per ``splitlines()``) is
        passed on as one revision id.
    :return: SuccessfulSmartServerResponse with the argument tuple
        ``('ok', )`` and a body stream built by ``self.body_stream`` from
        the stored repository, the stored ordering and the revision ids.
    """
    delta_stream = self.body_stream(
        self._repository, self._ordering, body_bytes.splitlines())
    return SuccessfulSmartServerResponse(('ok', ), body_stream=delta_stream)
1287
def do_repository_request(self, repository, ordering):
1288
if ordering == 'unordered':
1289
# inventory deltas for a topologically sorted stream
1290
# are likely to be smaller
1291
ordering = 'topological'
1292
self._ordering = ordering
1293
# Signal that we want a body