227
240
# add parents to the result
228
241
result[encoded_id] = parents
229
242
# Approximate the serialized cost of this revision_id.
230
size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
243
line = '%s %s\n' % (encoded_id, ' '.join(parents))
244
estimator.add_content(line)
231
245
# get all the directly asked for parents, and then flesh out to
232
246
# 64K (compressed) or so. We do one level of depth at a time to
233
247
# stay in sync with the client. The 250000 magic number is
234
248
# estimated compression ratio taken from bzr.dev itself.
235
if self.no_extra_results or (
236
first_loop_done and size_so_far > 250000):
249
if self.no_extra_results or (first_loop_done and estimator.full()):
250
trace.mutter('size: %d, z_size: %d'
251
% (estimator._uncompressed_size_added,
252
estimator._compressed_size_added))
237
253
next_revs = set()
239
255
# don't query things we've already queried
240
next_revs.difference_update(queried_revs)
256
next_revs = next_revs.difference(queried_revs)
241
257
first_loop_done = True
260
def _do_repository_request(self, body_bytes):
261
repository = self._repository
262
revision_ids = set(self._revision_ids)
263
include_missing = 'include-missing:' in revision_ids
265
revision_ids.remove('include-missing:')
266
body_lines = body_bytes.split('\n')
267
search_result, error = self.recreate_search_from_recipe(
268
repository, body_lines)
269
if error is not None:
271
# TODO might be nice to start up the search again; but that's not
272
# written or tested yet.
273
client_seen_revs = set(search_result.get_keys())
274
# Always include the requested ids.
275
client_seen_revs.difference_update(revision_ids)
277
repo_graph = repository.get_graph()
278
result = self._expand_requested_revs(repo_graph, revision_ids,
279
client_seen_revs, include_missing)
243
281
# sorting trivially puts lexicographically similar revision ids together.
244
282
# Compression FTW.
245
284
for revision, parents in sorted(result.items()):
246
285
lines.append(' '.join((revision, ) + tuple(parents)))
581
713
def record_stream(self):
582
714
"""Yield substream_type, substream from the byte stream."""
715
def wrap_and_count(pb, rc, substream):
716
"""Yield records from stream while showing progress."""
719
if self.current_type != 'revisions' and self.key_count != 0:
720
# As we know the number of revisions now (in self.key_count)
721
# we can setup and use record_counter (rc).
722
if not rc.is_initialized():
723
rc.setup(self.key_count, self.key_count)
724
for record in substream.read():
726
if rc.is_initialized() and counter == rc.STEP:
727
rc.increment(counter)
728
pb.update('Estimate', rc.current, rc.max)
730
if self.current_type == 'revisions':
731
# Total records is proportional to number of revs
732
# to fetch. With remote, we used self.key_count to
733
# track the number of revs. Once we have the revs
734
# counts in self.key_count, the progress bar changes
735
# from 'Estimating..' to 'Estimate' above.
737
if counter == rc.STEP:
738
pb.update('Estimating..', self.key_count)
583
743
self.seed_state()
584
# Make and consume sub generators, one per substream type:
585
while self.first_bytes is not None:
586
substream = NetworkRecordStream(self.iter_substream_bytes())
587
# after substream is fully consumed, self.current_type is set to
588
# the next type, and self.first_bytes is set to the matching bytes.
589
yield self.current_type, substream.read()
744
pb = ui.ui_factory.nested_progress_bar()
745
rc = self._record_counter
747
# Make and consume sub generators, one per substream type:
748
while self.first_bytes is not None:
749
substream = NetworkRecordStream(self.iter_substream_bytes())
750
# after substream is fully consumed, self.current_type is set
751
# to the next type, and self.first_bytes is set to the matching
753
yield self.current_type, wrap_and_count(pb, rc, substream)
756
pb.update('Done', rc.max, rc.max)
591
759
def seed_state(self):
592
760
"""Prepare the _ByteStreamDecoder to decode from the pack stream."""
792
975
self.do_insert_stream_request(repository, resume_tokens)
978
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
979
"""Add a revision signature text.
984
def do_repository_request(self, repository, lock_token, revision_id,
985
*write_group_tokens):
986
"""Add a revision signature text.
988
:param repository: Repository to operate on
989
:param lock_token: Lock token
990
:param revision_id: Revision for which to add signature
991
:param write_group_tokens: Write group tokens
993
self._lock_token = lock_token
994
self._revision_id = revision_id
995
self._write_group_tokens = write_group_tokens
998
def do_body(self, body_bytes):
999
"""Add a signature text.
1001
:param body_bytes: GPG signature text
1002
:return: SuccessfulSmartServerResponse with arguments 'ok' and
1003
the list of new write group tokens.
1005
self._repository.lock_write(token=self._lock_token)
1007
self._repository.resume_write_group(self._write_group_tokens)
1009
self._repository.add_signature_text(self._revision_id,
1012
new_write_group_tokens = self._repository.suspend_write_group()
1014
self._repository.unlock()
1015
return SuccessfulSmartServerResponse(
1016
('ok', ) + tuple(new_write_group_tokens))
1019
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
1020
"""Start a write group.
1025
def do_repository_request(self, repository, lock_token):
1026
"""Start a write group."""
1027
repository.lock_write(token=lock_token)
1029
repository.start_write_group()
1031
tokens = repository.suspend_write_group()
1032
except errors.UnsuspendableWriteGroup:
1033
return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
1036
return SuccessfulSmartServerResponse(('ok', tokens))
1039
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1040
"""Commit a write group.
1045
def do_repository_request(self, repository, lock_token,
1046
write_group_tokens):
1047
"""Commit a write group."""
1048
repository.lock_write(token=lock_token)
1051
repository.resume_write_group(write_group_tokens)
1052
except errors.UnresumableWriteGroup as e:
1053
return FailedSmartServerResponse(
1054
('UnresumableWriteGroup', e.write_groups, e.reason))
1056
repository.commit_write_group()
1058
write_group_tokens = repository.suspend_write_group()
1059
# FIXME JRV 2011-11-19: What if the write_group_tokens
1064
return SuccessfulSmartServerResponse(('ok', ))
1067
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
1068
"""Abort a write group.
1073
def do_repository_request(self, repository, lock_token, write_group_tokens):
1074
"""Abort a write group."""
1075
repository.lock_write(token=lock_token)
1078
repository.resume_write_group(write_group_tokens)
1079
except errors.UnresumableWriteGroup as e:
1080
return FailedSmartServerResponse(
1081
('UnresumableWriteGroup', e.write_groups, e.reason))
1082
repository.abort_write_group()
1085
return SuccessfulSmartServerResponse(('ok', ))
1088
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
1089
"""Check that a write group is still valid.
1094
def do_repository_request(self, repository, lock_token, write_group_tokens):
1095
"""Check that a write group is still valid."""
1096
repository.lock_write(token=lock_token)
1099
repository.resume_write_group(write_group_tokens)
1100
except errors.UnresumableWriteGroup as e:
1101
return FailedSmartServerResponse(
1102
('UnresumableWriteGroup', e.write_groups, e.reason))
1104
repository.suspend_write_group()
1107
return SuccessfulSmartServerResponse(('ok', ))
1110
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
1111
"""Retrieve all of the revision ids in a repository.
1116
def do_repository_request(self, repository):
1117
revids = repository.all_revision_ids()
1118
return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
1121
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
1122
"""Reconcile a repository.
1127
def do_repository_request(self, repository, lock_token):
1129
repository.lock_write(token=lock_token)
1130
except errors.TokenLockingNotSupported as e:
1131
return FailedSmartServerResponse(
1132
('TokenLockingNotSupported', ))
1134
reconciler = repository.reconcile()
1138
"garbage_inventories: %d\n" % reconciler.garbage_inventories,
1139
"inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
1141
return SuccessfulSmartServerResponse(('ok', ), "".join(body))
1144
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
1145
"""Pack a repository.
1150
def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
1151
self._repository = repository
1152
self._lock_token = lock_token
1153
if clean_obsolete_packs == 'True':
1154
self._clean_obsolete_packs = True
1156
self._clean_obsolete_packs = False
1159
def do_body(self, body_bytes):
1160
if body_bytes == "":
1163
hint = body_bytes.splitlines()
1164
self._repository.lock_write(token=self._lock_token)
1166
self._repository.pack(hint, self._clean_obsolete_packs)
1168
self._repository.unlock()
1169
return SuccessfulSmartServerResponse(("ok", ), )
1172
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1173
"""Iterate over the contents of files.
1175
The client sends a list of desired files to stream, one
1176
per line, and as tuples of file id and revision, separated by
1179
The server replies with a stream. Each entry is preceded by a header,
1180
which can either be:
1182
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1183
list sent by the client. This header is followed by the contents of
1184
the file, zlib-compressed.
1185
* "absent\x00FILEID\x00REVISION\x00IDX\n" to indicate a text is missing.
1186
The client can then raise an appropriate RevisionNotPresent error
1187
or check its fallback repositories.
1192
def body_stream(self, repository, desired_files):
1193
self._repository.lock_read()
1196
for i, key in enumerate(desired_files):
1198
for record in repository.texts.get_record_stream(text_keys,
1200
identifier = text_keys[record.key]
1201
if record.storage_kind == 'absent':
1202
yield "absent\0%s\0%s\0%d\n" % (record.key[0],
1203
record.key[1], identifier)
1204
# FIXME: Way to abort early?
1206
yield "ok\0%d\n" % identifier
1207
compressor = zlib.compressobj()
1208
for bytes in record.get_bytes_as('chunked'):
1209
data = compressor.compress(bytes)
1212
data = compressor.flush()
1216
self._repository.unlock()
1218
def do_body(self, body_bytes):
1220
tuple(l.split("\0")) for l in body_bytes.splitlines()]
1221
return SuccessfulSmartServerResponse(('ok', ),
1222
body_stream=self.body_stream(self._repository, desired_files))
1224
def do_repository_request(self, repository):
1225
# Signal that we want a body
1229
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
1230
"""Stream a list of revisions.
1232
The client sends a list of newline-separated revision ids in the
1233
body of the request and the server replies with the serializer format,
1234
and a stream of zlib-compressed revision texts (using the specified
1237
Any revisions the server does not have are omitted from the stream.
1242
def do_repository_request(self, repository):
1243
self._repository = repository
1244
# Signal there is a body
1247
def do_body(self, body_bytes):
    """Reply with the serializer format and a stream for the given ids.

    :param body_bytes: Revision ids separated by newlines.
    :return: SuccessfulSmartServerResponse whose arguments are 'ok' and
        the repository's serializer format, and whose body stream is
        produced by self.body_stream for the requested revision ids.
    """
    repository = self._repository
    requested_ids = body_bytes.split("\n")
    response_args = ('ok', repository.get_serializer_format())
    stream = self.body_stream(repository, requested_ids)
    return SuccessfulSmartServerResponse(response_args, body_stream=stream)
1253
def body_stream(self, repository, revision_ids):
1254
self._repository.lock_read()
1256
for record in repository.revisions.get_record_stream(
1257
[(revid,) for revid in revision_ids], 'unordered', True):
1258
if record.storage_kind == 'absent':
1260
yield zlib.compress(record.get_bytes_as('fulltext'))
1262
self._repository.unlock()
1265
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
1266
"""Get the inventory deltas for a set of revision ids.
1268
This accepts a list of revision ids, and then sends a chain
1269
of deltas for the inventories of those revisions. The first
1270
revision will be empty.
1272
The server writes back zlibbed serialized inventory deltas,
1273
in the ordering specified. The base for each delta is the
1274
inventory generated by the previous delta.
1279
def _inventory_delta_stream(self, repository, ordering, revids):
1280
prev_inv = _mod_inventory.Inventory(root_id=None,
1281
revision_id=_mod_revision.NULL_REVISION)
1282
serializer = inventory_delta.InventoryDeltaSerializer(
1283
repository.supports_rich_root(),
1284
repository._format.supports_tree_reference)
1285
repository.lock_read()
1287
for inv, revid in repository._iter_inventories(revids, ordering):
1290
inv_delta = inv._make_delta(prev_inv)
1291
lines = serializer.delta_to_lines(
1292
prev_inv.revision_id, inv.revision_id, inv_delta)
1293
yield ChunkedContentFactory(inv.revision_id, None, None, lines)
1298
def body_stream(self, repository, ordering, revids):
1299
substream = self._inventory_delta_stream(repository,
1301
return _stream_to_byte_stream([('inventory-deltas', substream)],
1304
def do_body(self, body_bytes):
    """Reply 'ok' with a body stream built from the ids in the body.

    :param body_bytes: Revision ids, one per line.
    :return: SuccessfulSmartServerResponse with the single argument 'ok'
        and, as body stream, the result of self.body_stream for the
        stored repository, the stored ordering and the parsed ids.
    """
    stream = self.body_stream(
        self._repository, self._ordering, body_bytes.splitlines())
    return SuccessfulSmartServerResponse(('ok', ), body_stream=stream)
1309
def do_repository_request(self, repository, ordering):
1310
if ordering == 'unordered':
1311
# inventory deltas for a topologically sorted stream
1312
# are likely to be smaller
1313
ordering = 'topological'
1314
self._ordering = ordering
1315
# Signal that we want a body