306
272
search_ids = repository.all_revision_ids()
307
273
search = graph._make_breadth_first_searcher(search_ids)
308
transitive_ids = set(itertools.chain.from_iterable(search))
274
transitive_ids = set()
275
map(transitive_ids.update, list(search))
309
276
parent_map = graph.get_parent_map(transitive_ids)
310
277
revision_graph = _strip_NULL_ghosts(parent_map)
311
278
if revision_id and revision_id not in revision_graph:
312
279
# Note that we return an empty body, rather than omitting the body.
313
280
# This way the client knows that it can always expect to find a body
314
281
# in the response for this method, even in the error case.
315
return FailedSmartServerResponse((b'nosuchrevision', revision_id), b'')
282
return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
317
284
for revision, parents in revision_graph.items():
318
lines.append(b' '.join((revision, ) + tuple(parents)))
285
lines.append(' '.join((revision, ) + tuple(parents)))
320
return SuccessfulSmartServerResponse((b'ok', ), b'\n'.join(lines))
287
return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
323
290
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
325
292
def do_readlocked_repository_request(self, repository, revno,
327
294
"""Find the revid for a given revno, given a known revno/revid pair.
332
found_flag, result = repository.get_rev_id_for_revno(
334
except errors.NoSuchRevision as err:
335
if err.revision != known_pair[1]:
299
found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
300
except errors.RevisionNotPresent, err:
301
if err.revision_id != known_pair[1]:
336
302
raise AssertionError(
337
303
'get_rev_id_for_revno raised RevisionNotPresent for '
338
'non-initial revision: ' + err.revision)
339
return FailedSmartServerResponse(
340
(b'nosuchrevision', err.revision))
341
except errors.RevnoOutOfBounds as e:
342
return FailedSmartServerResponse(
343
(b'revno-outofbounds', e.revno, e.minimum, e.maximum))
304
'non-initial revision: ' + err.revision_id)
305
return FailedSmartServerResponse(
306
('nosuchrevision', err.revision_id))
345
return SuccessfulSmartServerResponse((b'ok', result))
308
return SuccessfulSmartServerResponse(('ok', result))
347
310
earliest_revno, earliest_revid = result
348
311
return SuccessfulSmartServerResponse(
349
(b'history-incomplete', earliest_revno, earliest_revid))
352
class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):
    """Smart request that reports a repository's serializer format."""

    def do_repository_request(self, repository):
        """Look up the serializer format used by ``repository``.

        :param repository: The repository to query
        :return: A SuccessfulSmartServerResponse whose arguments are
            (b'ok', FORMAT), where FORMAT is the value returned by
            ``repository.get_serializer_format()``.
        """
        return SuccessfulSmartServerResponse(
            (b'ok', repository.get_serializer_format()))
312
('history-incomplete', earliest_revno, earliest_revid))
366
315
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
971
792
self.do_insert_stream_request(repository, resume_tokens)
974
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
975
"""Add a revision signature text.
980
def do_repository_request(self, repository, lock_token, revision_id,
981
*write_group_tokens):
982
"""Add a revision signature text.
984
:param repository: Repository to operate on
985
:param lock_token: Lock token
986
:param revision_id: Revision for which to add signature
987
:param write_group_tokens: Write group tokens
989
self._lock_token = lock_token
990
self._revision_id = revision_id
991
self._write_group_tokens = [token.decode(
992
'utf-8') for token in write_group_tokens]
995
def do_body(self, body_bytes):
996
"""Add a signature text.
998
:param body_bytes: GPG signature text
999
:return: SuccessfulSmartServerResponse with arguments 'ok' and
1000
the list of new write group tokens.
1002
with self._repository.lock_write(token=self._lock_token):
1003
self._repository.resume_write_group(self._write_group_tokens)
1005
self._repository.add_signature_text(self._revision_id,
1008
new_write_group_tokens = self._repository.suspend_write_group()
1009
return SuccessfulSmartServerResponse(
1010
(b'ok', ) + tuple([token.encode('utf-8') for token in new_write_group_tokens]))
1013
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
    """Start a write group."""

    def do_repository_request(self, repository, lock_token):
        """Start a write group and immediately suspend it.

        :param repository: Repository to operate on
        :param lock_token: Token passed to ``repository.lock_write()``
        :return: SuccessfulSmartServerResponse with arguments b'ok' and the
            tokens returned by ``repository.suspend_write_group()``, or a
            FailedSmartServerResponse (b'UnsuspendableWriteGroup',) when
            the new write group cannot be suspended.
        """
        with repository.lock_write(token=lock_token):
            repository.start_write_group()
            try:
                suspended = repository.suspend_write_group()
            except errors.UnsuspendableWriteGroup:
                failure_args = (b'UnsuspendableWriteGroup',)
                return FailedSmartServerResponse(failure_args)
        # NOTE(review): the tokens are passed back exactly as returned by
        # suspend_write_group(), without the UTF-8 encoding that the other
        # write-group requests in this file apply -- confirm this is intended.
        return SuccessfulSmartServerResponse((b'ok', suspended))
1030
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1031
"""Commit a write group.
1036
def do_repository_request(self, repository, lock_token,
1037
write_group_tokens):
1038
"""Commit a write group."""
1039
with repository.lock_write(token=lock_token):
1041
repository.resume_write_group(
1042
[token.decode('utf-8') for token in write_group_tokens])
1043
except errors.UnresumableWriteGroup as e:
1044
return FailedSmartServerResponse(
1045
(b'UnresumableWriteGroup', [token.encode('utf-8') for token
1046
in e.write_groups], e.reason.encode('utf-8')))
1048
repository.commit_write_group()
1050
write_group_tokens = repository.suspend_write_group()
1051
# FIXME JRV 2011-11-19: What if the write_group_tokens
1054
return SuccessfulSmartServerResponse((b'ok', ))
1057
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
    """Abort a write group."""

    def do_repository_request(self, repository, lock_token, write_group_tokens):
        """Resume a suspended write group and abort it.

        :param repository: Repository to operate on
        :param lock_token: Token passed to ``repository.lock_write()``
        :param write_group_tokens: UTF-8 encoded tokens naming the
            suspended write group
        :return: SuccessfulSmartServerResponse (b'ok',) once the group has
            been aborted, or a FailedSmartServerResponse
            (b'UnresumableWriteGroup', tokens, reason) when the group
            cannot be resumed.
        """
        with repository.lock_write(token=lock_token):
            resume_tokens = [raw.decode('utf-8') for raw in write_group_tokens]
            try:
                repository.resume_write_group(resume_tokens)
            except errors.UnresumableWriteGroup as err:
                # Tokens and reason go back over the wire as bytes.
                unresumable = [
                    name.encode('utf-8') for name in err.write_groups]
                reason = err.reason.encode('utf-8')
                return FailedSmartServerResponse(
                    (b'UnresumableWriteGroup', unresumable, reason))
            repository.abort_write_group()
        return SuccessfulSmartServerResponse((b'ok', ))
1078
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
    """Check that a write group is still valid."""

    def do_repository_request(self, repository, lock_token, write_group_tokens):
        """Check a suspended write group by resuming and re-suspending it.

        The write group is neither committed nor aborted: a successful
        resume is followed directly by ``suspend_write_group()``.

        :param repository: Repository to operate on
        :param lock_token: Token passed to ``repository.lock_write()``
        :param write_group_tokens: UTF-8 encoded tokens naming the
            suspended write group
        :return: SuccessfulSmartServerResponse (b'ok',) when the group could
            be resumed, or a FailedSmartServerResponse
            (b'UnresumableWriteGroup', tokens, reason) when it could not.
        """
        with repository.lock_write(token=lock_token):
            try:
                repository.resume_write_group(
                    [token.decode('utf-8') for token in write_group_tokens])
            except errors.UnresumableWriteGroup as e:
                return FailedSmartServerResponse(
                    (b'UnresumableWriteGroup',
                     [token.encode('utf-8') for token in e.write_groups],
                     e.reason.encode('utf-8')))
            else:
                # Resuming worked; put the group back into its suspended
                # state so it stays available to later requests.
                repository.suspend_write_group()
        return SuccessfulSmartServerResponse((b'ok', ))
1100
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
    """Retrieve all of the revision ids in a repository."""

    def do_repository_request(self, repository):
        """Return every revision id known to ``repository``.

        :param repository: The repository to query
        :return: SuccessfulSmartServerResponse with argument b"ok" and a
            body made of the revision ids joined by newline bytes.
        """
        body = b"\n".join(repository.all_revision_ids())
        return SuccessfulSmartServerResponse((b"ok", ), body)
1111
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
1112
"""Reconcile a repository.
1117
def do_repository_request(self, repository, lock_token):
1119
repository.lock_write(token=lock_token)
1120
except errors.TokenLockingNotSupported as e:
1121
return FailedSmartServerResponse(
1122
(b'TokenLockingNotSupported', ))
1124
reconciler = repository.reconcile()
1128
b"garbage_inventories: %d\n" % reconciler.garbage_inventories,
1129
b"inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
1131
return SuccessfulSmartServerResponse((b'ok', ), b"".join(body))
1134
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
    """Pack a repository."""

    def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
        """Record the request arguments and wait for the body.

        :param repository: Repository to pack
        :param lock_token: Token later passed to ``repository.lock_write()``
        :param clean_obsolete_packs: The bytes b'True' to request cleaning
            of obsolete packs; any other value is treated as False.
        :return: None, so that the request body is read before responding.
        """
        self._repository = repository
        self._lock_token = lock_token
        self._clean_obsolete_packs = (clean_obsolete_packs == b'True')
        return None

    def do_body(self, body_bytes):
        """Pack the repository, using the body as the pack hint.

        :param body_bytes: The hint, one entry per line. An empty body
            means no hint and ``None`` is passed to ``repository.pack()``.
        :return: SuccessfulSmartServerResponse (b"ok",)
        """
        # The body arrives as bytes, so comparing it with the str "" can
        # never match; test for emptiness instead so that an empty body
        # really selects the "no hint" path.
        if not body_bytes:
            hint = None
        else:
            # NOTE(review): hint entries stay as bytes here; confirm that
            # repository.pack() expects bytes rather than str names.
            hint = body_bytes.splitlines()
        with self._repository.lock_write(token=self._lock_token):
            self._repository.pack(hint, self._clean_obsolete_packs)
        return SuccessfulSmartServerResponse((b"ok", ), )
1159
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1160
"""Iterate over the contents of files.
1162
The client sends a list of desired files to stream, one
1163
per line, and as tuples of file id and revision, separated by
1166
The server replies with a stream. Each entry is preceded by a header,
1167
which can either be:
1169
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1170
list sent by the client. This header is followed by the contents of
1171
the file, bzip2-compressed.
1172
* "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
1173
The client can then raise an appropriate RevisionNotPresent error
1174
or check its fallback repositories.
1179
def body_stream(self, repository, desired_files):
1180
with self._repository.lock_read():
1182
for i, key in enumerate(desired_files):
1184
for record in repository.texts.get_record_stream(text_keys,
1186
identifier = text_keys[record.key]
1187
if record.storage_kind == 'absent':
1188
yield b"absent\0%s\0%s\0%d\n" % (record.key[0],
1189
record.key[1], identifier)
1190
# FIXME: Way to abort early?
1192
yield b"ok\0%d\n" % identifier
1193
compressor = zlib.compressobj()
1194
for bytes in record.iter_bytes_as('chunked'):
1195
data = compressor.compress(bytes)
1198
data = compressor.flush()
1202
def do_body(self, body_bytes):
1204
tuple(l.split(b"\0")) for l in body_bytes.splitlines()]
1205
return SuccessfulSmartServerResponse((b'ok', ),
1206
body_stream=self.body_stream(self._repository, desired_files))
1208
def do_repository_request(self, repository):
1209
# Signal that we want a body
1213
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
    """Stream a list of revisions.

    The client sends a list of newline-separated revision ids in the
    body of the request and the server replies with the serializer format,
    and a stream of zlib-compressed revision texts (using the specified
    serializer format).

    Any revisions the server does not have are omitted from the stream.
    """

    def do_repository_request(self, repository):
        """Remember the repository and wait for the request body.

        :param repository: The repository to stream revisions from
        :return: None, so that the request body is read before responding.
        """
        self._repository = repository
        # Signal there is a body
        return None

    def do_body(self, body_bytes):
        """Start streaming the revisions named in the body.

        :param body_bytes: Revision ids separated by newline bytes
        :return: SuccessfulSmartServerResponse with arguments b'ok' and the
            repository's serializer format, and a body stream of
            compressed revision texts.
        """
        revision_ids = body_bytes.split(b"\n")
        return SuccessfulSmartServerResponse(
            (b'ok', self._repository.get_serializer_format()),
            body_stream=self.body_stream(self._repository, revision_ids))

    def body_stream(self, repository, revision_ids):
        """Yield each present revision's fulltext, zlib-compressed.

        The repository is read-locked for as long as the generator runs.

        :param repository: The repository to read revisions from
        :param revision_ids: Iterable of revision ids to stream
        """
        with self._repository.lock_read():
            keys = [(revid,) for revid in revision_ids]
            for record in repository.revisions.get_record_stream(
                    keys, 'unordered', True):
                if record.storage_kind == 'absent':
                    # Revisions the server does not have are left out.
                    continue
                yield zlib.compress(record.get_bytes_as('fulltext'))
1246
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
1247
"""Get the inventory deltas for a set of revision ids.
1249
This accepts a list of revision ids, and then sends a chain
1250
of deltas for the inventories of those revisions. The first
1251
revision will be empty.
1253
The server writes back zlibbed serialized inventory deltas,
1254
in the ordering specified. The base for each delta is the
1255
inventory generated by the previous delta.
1260
def _inventory_delta_stream(self, repository, ordering, revids):
1261
prev_inv = _mod_inventory.Inventory(root_id=None,
1262
revision_id=_mod_revision.NULL_REVISION)
1263
serializer = inventory_delta.InventoryDeltaSerializer(
1264
repository.supports_rich_root(),
1265
repository._format.supports_tree_reference)
1266
with repository.lock_read():
1267
for inv, revid in repository._iter_inventories(revids, ordering):
1270
inv_delta = inv._make_delta(prev_inv)
1271
lines = serializer.delta_to_lines(
1272
prev_inv.revision_id, inv.revision_id, inv_delta)
1273
yield ChunkedContentFactory(
1274
inv.revision_id, None, None, lines,
1275
chunks_are_lines=True)
1278
def body_stream(self, repository, ordering, revids):
1279
substream = self._inventory_delta_stream(repository,
1281
return _stream_to_byte_stream([('inventory-deltas', substream)],
1284
def do_body(self, body_bytes):
1285
return SuccessfulSmartServerResponse((b'ok', ),
1286
body_stream=self.body_stream(self._repository, self._ordering,
1287
body_bytes.splitlines()))
1289
def do_repository_request(self, repository, ordering):
1290
ordering = ordering.decode('ascii')
1291
if ordering == 'unordered':
1292
# inventory deltas for a topologically sorted stream
1293
# are likely to be smaller
1294
ordering = 'topological'
1295
self._ordering = ordering
1296
# Signal that we want a body
1300
class SmartServerRepositoryGetStreamForMissingKeys(SmartServerRepositoryRequest):
1302
def do_repository_request(self, repository, to_network_name):
1303
"""Get a stream for missing keys.
1305
:param repository: The repository to stream from.
1306
:param to_network_name: The network name of the format of the target
1310
self._to_format = network_format_registry.get(to_network_name)
1312
return FailedSmartServerResponse(
1313
(b'UnknownFormat', b'repository', to_network_name))
1314
return None # Signal that we want a body.
1316
def do_body(self, body_bytes):
1317
repository = self._repository
1318
repository.lock_read()
1320
source = repository._get_source(self._to_format)
1322
for entry in body_bytes.split(b'\n'):
1323
(kind, revid) = entry.split(b'\t')
1324
keys.append((kind.decode('utf-8'), revid))
1325
stream = source.get_stream_for_missing_keys(keys)
1328
# On non-error, unlocking is done by the body stream handler.
1332
return SuccessfulSmartServerResponse((b'ok',),
1333
body_stream=self.body_stream(stream, repository))
1335
def body_stream(self, stream, repository):
1336
byte_stream = _stream_to_byte_stream(stream, repository._format)
1338
for bytes in byte_stream:
1340
except errors.RevisionNotPresent as e:
1341
# This shouldn't be able to happen, but as we don't buffer
1342
# everything it can in theory happen.
1344
yield FailedSmartServerResponse((b'NoSuchRevision', e.revision_id))
1349
class SmartServerRepositoryRevisionArchive(SmartServerRepositoryRequest):
1351
def do_repository_request(self, repository, revision_id, format, name,
1352
root, subdir=None, force_mtime=None):
1353
"""Stream an archive file for a specific revision.
1354
:param repository: The repository to stream from.
1355
:param revision_id: Revision for which to export the tree
1356
:param format: Format (tar, tgz, tbz2, etc)
1357
:param name: Target file name
1358
:param root: Name of root directory (or '')
1359
:param subdir: Subdirectory to export, if not the root
1361
tree = repository.revision_tree(revision_id)
1362
if subdir is not None:
1363
subdir = subdir.decode('utf-8')
1364
if root is not None:
1365
root = root.decode('utf-8')
1366
name = name.decode('utf-8')
1367
return SuccessfulSmartServerResponse((b'ok',),
1368
body_stream=self.body_stream(
1369
tree, format.decode(
1370
'utf-8'), os.path.basename(name), root, subdir,
1373
def body_stream(self, tree, format, name, root, subdir=None, force_mtime=None):
1374
with tree.lock_read():
1375
return tree.archive(format, name, root, subdir, force_mtime)
1378
class SmartServerRepositoryAnnotateFileRevision(SmartServerRepositoryRequest):
    """Annotate a file as it exists in a specific revision."""

    def do_repository_request(self, repository, revision_id, tree_path,
                              file_id=None, default_revision=None):
        """Return the annotation of a file in a specific revision.

        :param repository: The repository to read from.
        :param revision_id: Revision whose tree contains the file
        :param tree_path: The path inside the tree, as UTF-8 encoded bytes
        :param file_id: Optional file_id for the file; accepted but not
            used by this implementation
        :param default_revision: Default revision, passed through to
            ``tree.annotate_iter()``
        :return: SuccessfulSmartServerResponse (b'ok',) whose body is the
            bencoded list produced by ``tree.annotate_iter()``.
        """
        tree = repository.revision_tree(revision_id)
        with tree.lock_read():
            # Materialise the iterator while the tree is still locked.
            annotations = list(tree.annotate_iter(
                tree_path.decode('utf-8'), default_revision))
            body = bencode.bencode(annotations)
        return SuccessfulSmartServerResponse((b'ok',), body=body)