227
238
# add parents to the result
228
239
result[encoded_id] = parents
229
240
# Approximate the serialized cost of this revision_id.
230
size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
241
line = '%s %s\n' % (encoded_id, ' '.join(parents))
242
estimator.add_content(line)
231
243
# get all the directly asked for parents, and then flesh out to
232
244
# 64K (compressed) or so. We do one level of depth at a time to
233
245
# stay in sync with the client. The 250000 magic number is
234
246
# estimated compression ratio taken from bzr.dev itself.
235
if self.no_extra_results or (
236
first_loop_done and size_so_far > 250000):
247
if self.no_extra_results or (first_loop_done and estimator.full()):
248
trace.mutter('size: %d, z_size: %d'
249
% (estimator._uncompressed_size_added,
250
estimator._compressed_size_added))
237
251
next_revs = set()
239
253
# don't query things we've already queried
240
next_revs.difference_update(queried_revs)
254
next_revs = next_revs.difference(queried_revs)
241
255
first_loop_done = True
258
def _do_repository_request(self, body_bytes):
259
repository = self._repository
260
revision_ids = set(self._revision_ids)
261
include_missing = 'include-missing:' in revision_ids
263
revision_ids.remove('include-missing:')
264
body_lines = body_bytes.split('\n')
265
search_result, error = self.recreate_search_from_recipe(
266
repository, body_lines)
267
if error is not None:
269
# TODO might be nice to start up the search again; but that's not
270
# written or tested yet.
271
client_seen_revs = set(search_result.get_keys())
272
# Always include the requested ids.
273
client_seen_revs.difference_update(revision_ids)
275
repo_graph = repository.get_graph()
276
result = self._expand_requested_revs(repo_graph, revision_ids,
277
client_seen_revs, include_missing)
243
279
# sorting trivially puts lexicographically similar revision ids together.
244
280
# Compression FTW.
245
282
for revision, parents in sorted(result.items()):
246
283
lines.append(' '.join((revision, ) + tuple(parents)))
581
711
def record_stream(self):
582
712
"""Yield substream_type, substream from the byte stream."""
713
def wrap_and_count(pb, rc, substream):
714
"""Yield records from stream while showing progress."""
717
if self.current_type != 'revisions' and self.key_count != 0:
718
# As we know the number of revisions now (in self.key_count)
719
# we can setup and use record_counter (rc).
720
if not rc.is_initialized():
721
rc.setup(self.key_count, self.key_count)
722
for record in substream.read():
724
if rc.is_initialized() and counter == rc.STEP:
725
rc.increment(counter)
726
pb.update('Estimate', rc.current, rc.max)
728
if self.current_type == 'revisions':
729
# Total records is proportional to number of revs
730
# to fetch. With remote, we used self.key_count to
731
# track the number of revs. Once we have the revs
732
# counts in self.key_count, the progress bar changes
733
# from 'Estimating..' to 'Estimate' above.
735
if counter == rc.STEP:
736
pb.update('Estimating..', self.key_count)
583
741
self.seed_state()
584
# Make and consume sub generators, one per substream type:
585
while self.first_bytes is not None:
586
substream = NetworkRecordStream(self.iter_substream_bytes())
587
# after substream is fully consumed, self.current_type is set to
588
# the next type, and self.first_bytes is set to the matching bytes.
589
yield self.current_type, substream.read()
742
pb = ui.ui_factory.nested_progress_bar()
743
rc = self._record_counter
745
# Make and consume sub generators, one per substream type:
746
while self.first_bytes is not None:
747
substream = NetworkRecordStream(self.iter_substream_bytes())
748
# after substream is fully consumed, self.current_type is set
749
# to the next type, and self.first_bytes is set to the matching
751
yield self.current_type, wrap_and_count(pb, rc, substream)
754
pb.update('Done', rc.max, rc.max)
591
757
def seed_state(self):
592
758
"""Prepare the _ByteStreamDecoder to decode from the pack stream."""
792
973
self.do_insert_stream_request(repository, resume_tokens)
976
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
977
"""Add a revision signature text.
982
def do_repository_request(self, repository, lock_token, revision_id,
983
*write_group_tokens):
984
"""Add a revision signature text.
986
:param repository: Repository to operate on
987
:param lock_token: Lock token
988
:param revision_id: Revision for which to add signature
989
:param write_group_tokens: Write group tokens
991
self._lock_token = lock_token
992
self._revision_id = revision_id
993
self._write_group_tokens = write_group_tokens
996
def do_body(self, body_bytes):
997
"""Add a signature text.
999
:param body_bytes: GPG signature text
1000
:return: SuccessfulSmartServerResponse with arguments 'ok' and
1001
the list of new write group tokens.
1003
self._repository.lock_write(token=self._lock_token)
1005
self._repository.resume_write_group(self._write_group_tokens)
1007
self._repository.add_signature_text(self._revision_id,
1010
new_write_group_tokens = self._repository.suspend_write_group()
1012
self._repository.unlock()
1013
return SuccessfulSmartServerResponse(
1014
('ok', ) + tuple(new_write_group_tokens))
1017
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
1018
"""Start a write group.
1023
def do_repository_request(self, repository, lock_token):
1024
"""Start a write group."""
1025
repository.lock_write(token=lock_token)
1027
repository.start_write_group()
1029
tokens = repository.suspend_write_group()
1030
except errors.UnsuspendableWriteGroup:
1031
return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
1034
return SuccessfulSmartServerResponse(('ok', tokens))
1037
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1038
"""Commit a write group.
1043
def do_repository_request(self, repository, lock_token,
1044
write_group_tokens):
1045
"""Commit a write group."""
1046
repository.lock_write(token=lock_token)
1049
repository.resume_write_group(write_group_tokens)
1050
except errors.UnresumableWriteGroup as e:
1051
return FailedSmartServerResponse(
1052
('UnresumableWriteGroup', e.write_groups, e.reason))
1054
repository.commit_write_group()
1056
write_group_tokens = repository.suspend_write_group()
1057
# FIXME JRV 2011-11-19: What if the write_group_tokens
1062
return SuccessfulSmartServerResponse(('ok', ))
1065
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
1066
"""Abort a write group.
1071
def do_repository_request(self, repository, lock_token, write_group_tokens):
1072
"""Abort a write group."""
1073
repository.lock_write(token=lock_token)
1076
repository.resume_write_group(write_group_tokens)
1077
except errors.UnresumableWriteGroup as e:
1078
return FailedSmartServerResponse(
1079
('UnresumableWriteGroup', e.write_groups, e.reason))
1080
repository.abort_write_group()
1083
return SuccessfulSmartServerResponse(('ok', ))
1086
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
1087
"""Check that a write group is still valid.
1092
def do_repository_request(self, repository, lock_token, write_group_tokens):
1093
"""Check that a write group is still valid."""
1094
repository.lock_write(token=lock_token)
1097
repository.resume_write_group(write_group_tokens)
1098
except errors.UnresumableWriteGroup as e:
1099
return FailedSmartServerResponse(
1100
('UnresumableWriteGroup', e.write_groups, e.reason))
1102
repository.suspend_write_group()
1105
return SuccessfulSmartServerResponse(('ok', ))
1108
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
1109
"""Retrieve all of the revision ids in a repository.
1114
def do_repository_request(self, repository):
    """Return every revision id known to the repository.

    :param repository: Repository to operate on
    :return: SuccessfulSmartServerResponse with argument 'ok' and a body
        made of the revision ids joined by newlines.
    """
    body = "\n".join(repository.all_revision_ids())
    return SuccessfulSmartServerResponse(("ok", ), body)
1119
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
1120
"""Reconcile a repository.
1125
def do_repository_request(self, repository, lock_token):
1127
repository.lock_write(token=lock_token)
1128
except errors.TokenLockingNotSupported as e:
1129
return FailedSmartServerResponse(
1130
('TokenLockingNotSupported', ))
1132
reconciler = repository.reconcile()
1136
"garbage_inventories: %d\n" % reconciler.garbage_inventories,
1137
"inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
1139
return SuccessfulSmartServerResponse(('ok', ), "".join(body))
1142
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
1143
"""Pack a repository.
1148
def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
1149
self._repository = repository
1150
self._lock_token = lock_token
1151
if clean_obsolete_packs == 'True':
1152
self._clean_obsolete_packs = True
1154
self._clean_obsolete_packs = False
1157
def do_body(self, body_bytes):
1158
if body_bytes == "":
1161
hint = body_bytes.splitlines()
1162
self._repository.lock_write(token=self._lock_token)
1164
self._repository.pack(hint, self._clean_obsolete_packs)
1166
self._repository.unlock()
1167
return SuccessfulSmartServerResponse(("ok", ), )
1170
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1171
"""Iterate over the contents of files.
1173
The client sends a list of desired files to stream, one
1174
per line, and as tuples of file id and revision, separated by
1177
The server replies with a stream. Each entry is preceded by a header,
1178
which can either be:
1180
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1181
list sent by the client. This header is followed by the contents of
1182
the file, zlib-compressed.
1183
* "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
1184
The client can then raise an appropriate RevisionNotPresent error
1185
or check its fallback repositories.
1190
def body_stream(self, repository, desired_files):
1191
self._repository.lock_read()
1194
for i, key in enumerate(desired_files):
1196
for record in repository.texts.get_record_stream(text_keys,
1198
identifier = text_keys[record.key]
1199
if record.storage_kind == 'absent':
1200
yield "absent\0%s\0%s\0%d\n" % (record.key[0],
1201
record.key[1], identifier)
1202
# FIXME: Way to abort early?
1204
yield "ok\0%d\n" % identifier
1205
compressor = zlib.compressobj()
1206
for bytes in record.get_bytes_as('chunked'):
1207
data = compressor.compress(bytes)
1210
data = compressor.flush()
1214
self._repository.unlock()
1216
def do_body(self, body_bytes):
1218
tuple(l.split("\0")) for l in body_bytes.splitlines()]
1219
return SuccessfulSmartServerResponse(('ok', ),
1220
body_stream=self.body_stream(self._repository, desired_files))
1222
def do_repository_request(self, repository):
1223
# Signal that we want a body
1227
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
1228
"""Stream a list of revisions.
1230
The client sends a list of newline-separated revision ids in the
1231
body of the request and the server replies with the serializer format,
1232
and a stream of zlib-compressed revision texts (using the specified
1235
Any revisions the server does not have are omitted from the stream.
1240
def do_repository_request(self, repository):
1241
self._repository = repository
1242
# Signal there is a body
1245
def do_body(self, body_bytes):
    """Start streaming the revisions named in the request body.

    :param body_bytes: Revision ids separated by newlines
    :return: SuccessfulSmartServerResponse with arguments 'ok' and the
        repository's serializer format, whose body stream is produced
        by self.body_stream for the requested revision ids.
    """
    repo = self._repository
    requested = body_bytes.split("\n")
    # Ask for the serializer format before creating the stream, keeping
    # the same call order as the response expression it replaces.
    response_args = ('ok', repo.get_serializer_format())
    stream = self.body_stream(repo, requested)
    return SuccessfulSmartServerResponse(response_args, body_stream=stream)
1251
def body_stream(self, repository, revision_ids):
1252
self._repository.lock_read()
1254
for record in repository.revisions.get_record_stream(
1255
[(revid,) for revid in revision_ids], 'unordered', True):
1256
if record.storage_kind == 'absent':
1258
yield zlib.compress(record.get_bytes_as('fulltext'))
1260
self._repository.unlock()
1263
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
1264
"""Get the inventory deltas for a set of revision ids.
1266
This accepts a list of revision ids, and then sends a chain
1267
of deltas for the inventories of those revisions. The first
1268
revision will be empty.
1270
The server writes back zlibbed serialized inventory deltas,
1271
in the ordering specified. The base for each delta is the
1272
inventory generated by the previous delta.
1277
def _inventory_delta_stream(self, repository, ordering, revids):
1278
prev_inv = _mod_inventory.Inventory(root_id=None,
1279
revision_id=_mod_revision.NULL_REVISION)
1280
serializer = inventory_delta.InventoryDeltaSerializer(
1281
repository.supports_rich_root(),
1282
repository._format.supports_tree_reference)
1283
repository.lock_read()
1285
for inv, revid in repository._iter_inventories(revids, ordering):
1288
inv_delta = inv._make_delta(prev_inv)
1289
lines = serializer.delta_to_lines(
1290
prev_inv.revision_id, inv.revision_id, inv_delta)
1291
yield ChunkedContentFactory(inv.revision_id, None, None, lines)
1296
def body_stream(self, repository, ordering, revids):
1297
substream = self._inventory_delta_stream(repository,
1299
return _stream_to_byte_stream([('inventory-deltas', substream)],
1302
def do_body(self, body_bytes):
    """Start streaming inventory deltas for the revisions in the body.

    :param body_bytes: Revision ids, one per line
    :return: SuccessfulSmartServerResponse with argument 'ok' whose body
        stream is produced by self.body_stream, using the repository and
        ordering stored on this request object.
    """
    repo, ordering = self._repository, self._ordering
    revids = body_bytes.splitlines()
    stream = self.body_stream(repo, ordering, revids)
    return SuccessfulSmartServerResponse(('ok', ), body_stream=stream)
1307
def do_repository_request(self, repository, ordering):
1308
if ordering == 'unordered':
1309
# inventory deltas for a topologically sorted stream
1310
# are likely to be smaller
1311
ordering = 'topological'
1312
self._ordering = ordering
1313
# Signal that we want a body