272
298
search_ids = repository.all_revision_ids()
273
299
search = graph._make_breadth_first_searcher(search_ids)
274
transitive_ids = set()
275
map(transitive_ids.update, list(search))
300
transitive_ids = set(itertools.chain.from_iterable(search))
276
301
parent_map = graph.get_parent_map(transitive_ids)
277
302
revision_graph = _strip_NULL_ghosts(parent_map)
278
303
if revision_id and revision_id not in revision_graph:
279
304
# Note that we return an empty body, rather than omitting the body.
280
305
# This way the client knows that it can always expect to find a body
281
306
# in the response for this method, even in the error case.
282
return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
307
return FailedSmartServerResponse((b'nosuchrevision', revision_id), b'')
284
309
for revision, parents in revision_graph.items():
285
lines.append(' '.join((revision, ) + tuple(parents)))
310
lines.append(b' '.join((revision, ) + tuple(parents)))
287
return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
312
return SuccessfulSmartServerResponse((b'ok', ), b'\n'.join(lines))
290
315
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
292
317
def do_readlocked_repository_request(self, repository, revno,
294
319
"""Find the revid for a given revno, given a known revno/revid pair.
299
found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
300
except errors.RevisionNotPresent, err:
301
if err.revision_id != known_pair[1]:
324
found_flag, result = repository.get_rev_id_for_revno(
326
except errors.NoSuchRevision as err:
327
if err.revision != known_pair[1]:
302
328
raise AssertionError(
303
329
'get_rev_id_for_revno raised RevisionNotPresent for '
304
'non-initial revision: ' + err.revision_id)
305
return FailedSmartServerResponse(
306
('nosuchrevision', err.revision_id))
330
'non-initial revision: ' + err.revision)
331
return FailedSmartServerResponse(
332
(b'nosuchrevision', err.revision))
333
except errors.RevnoOutOfBounds as e:
334
return FailedSmartServerResponse(
335
(b'revno-outofbounds', e.revno, e.minimum, e.maximum))
308
return SuccessfulSmartServerResponse(('ok', result))
337
return SuccessfulSmartServerResponse((b'ok', result))
310
339
earliest_revno, earliest_revid = result
311
340
return SuccessfulSmartServerResponse(
312
('history-incomplete', earliest_revno, earliest_revid))
341
(b'history-incomplete', earliest_revno, earliest_revid))
344
class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):
346
def do_repository_request(self, repository):
347
"""Return the serializer format for this repository.
351
:param repository: The repository to query
352
:return: A smart server response (b'ok', FORMAT)
354
serializer = repository.get_serializer_format()
355
return SuccessfulSmartServerResponse((b'ok', serializer))
315
358
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
792
964
self.do_insert_stream_request(repository, resume_tokens)
967
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
968
"""Add a revision signature text.
973
def do_repository_request(self, repository, lock_token, revision_id,
974
*write_group_tokens):
975
"""Add a revision signature text.
977
:param repository: Repository to operate on
978
:param lock_token: Lock token
979
:param revision_id: Revision for which to add signature
980
:param write_group_tokens: Write group tokens
982
self._lock_token = lock_token
983
self._revision_id = revision_id
984
self._write_group_tokens = [token.decode(
985
'utf-8') for token in write_group_tokens]
988
def do_body(self, body_bytes):
989
"""Add a signature text.
991
:param body_bytes: GPG signature text
992
:return: SuccessfulSmartServerResponse with arguments 'ok' and
993
the list of new write group tokens.
995
with self._repository.lock_write(token=self._lock_token):
996
self._repository.resume_write_group(self._write_group_tokens)
998
self._repository.add_signature_text(self._revision_id,
1001
new_write_group_tokens = self._repository.suspend_write_group()
1002
return SuccessfulSmartServerResponse(
1003
(b'ok', ) + tuple([token.encode('utf-8') for token in new_write_group_tokens]))
1006
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
1007
"""Start a write group.
1012
def do_repository_request(self, repository, lock_token):
1013
"""Start a write group."""
1014
with repository.lock_write(token=lock_token):
1015
repository.start_write_group()
1017
tokens = repository.suspend_write_group()
1018
except errors.UnsuspendableWriteGroup:
1019
return FailedSmartServerResponse((b'UnsuspendableWriteGroup',))
1020
return SuccessfulSmartServerResponse((b'ok', tokens))
1023
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
1024
"""Commit a write group.
1029
def do_repository_request(self, repository, lock_token,
1030
write_group_tokens):
1031
"""Commit a write group."""
1032
with repository.lock_write(token=lock_token):
1034
repository.resume_write_group(
1035
[token.decode('utf-8') for token in write_group_tokens])
1036
except errors.UnresumableWriteGroup as e:
1037
return FailedSmartServerResponse(
1038
(b'UnresumableWriteGroup', [token.encode('utf-8') for token
1039
in e.write_groups], e.reason.encode('utf-8')))
1041
repository.commit_write_group()
1043
write_group_tokens = repository.suspend_write_group()
1044
# FIXME JRV 2011-11-19: What if the write_group_tokens
1047
return SuccessfulSmartServerResponse((b'ok', ))
1050
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
1051
"""Abort a write group.
1056
def do_repository_request(self, repository, lock_token, write_group_tokens):
1057
"""Abort a write group."""
1058
with repository.lock_write(token=lock_token):
1060
repository.resume_write_group(
1061
[token.decode('utf-8') for token in write_group_tokens])
1062
except errors.UnresumableWriteGroup as e:
1063
return FailedSmartServerResponse(
1064
(b'UnresumableWriteGroup',
1065
[token.encode('utf-8') for token in e.write_groups],
1066
e.reason.encode('utf-8')))
1067
repository.abort_write_group()
1068
return SuccessfulSmartServerResponse((b'ok', ))
1071
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
1072
"""Check that a write group is still valid.
1077
def do_repository_request(self, repository, lock_token, write_group_tokens):
1078
"""Abort a write group."""
1079
with repository.lock_write(token=lock_token):
1081
repository.resume_write_group(
1082
[token.decode('utf-8') for token in write_group_tokens])
1083
except errors.UnresumableWriteGroup as e:
1084
return FailedSmartServerResponse(
1085
(b'UnresumableWriteGroup',
1086
[token.encode('utf-8') for token in e.write_groups],
1087
e.reason.encode('utf-8')))
1089
repository.suspend_write_group()
1090
return SuccessfulSmartServerResponse((b'ok', ))
1093
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
1094
"""Retrieve all of the revision ids in a repository.
1099
def do_repository_request(self, repository):
1100
revids = repository.all_revision_ids()
1101
return SuccessfulSmartServerResponse((b"ok", ), b"\n".join(revids))
1104
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
1105
"""Reconcile a repository.
1110
def do_repository_request(self, repository, lock_token):
1112
repository.lock_write(token=lock_token)
1113
except errors.TokenLockingNotSupported as e:
1114
return FailedSmartServerResponse(
1115
(b'TokenLockingNotSupported', ))
1117
reconciler = repository.reconcile()
1121
b"garbage_inventories: %d\n" % reconciler.garbage_inventories,
1122
b"inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
1124
return SuccessfulSmartServerResponse((b'ok', ), b"".join(body))
1127
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
1128
"""Pack a repository.
1133
def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
1134
self._repository = repository
1135
self._lock_token = lock_token
1136
if clean_obsolete_packs == b'True':
1137
self._clean_obsolete_packs = True
1139
self._clean_obsolete_packs = False
1142
def do_body(self, body_bytes):
1143
if body_bytes == "":
1146
hint = body_bytes.splitlines()
1147
with self._repository.lock_write(token=self._lock_token):
1148
self._repository.pack(hint, self._clean_obsolete_packs)
1149
return SuccessfulSmartServerResponse((b"ok", ), )
1152
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
1153
"""Iterate over the contents of files.
1155
The client sends a list of desired files to stream, one
1156
per line, and as tuples of file id and revision, separated by
1159
The server replies with a stream. Each entry is preceded by a header,
1160
which can either be:
1162
* "ok\x00IDX\n" where IDX is the index of the entry in the desired files
1163
list sent by the client. This header is followed by the contents of
1164
the file, bzip2-compressed.
1165
* "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
1166
The client can then raise an appropriate RevisionNotPresent error
1167
or check its fallback repositories.
1172
def body_stream(self, repository, desired_files):
1173
with self._repository.lock_read():
1175
for i, key in enumerate(desired_files):
1177
for record in repository.texts.get_record_stream(text_keys,
1179
identifier = text_keys[record.key]
1180
if record.storage_kind == 'absent':
1181
yield b"absent\0%s\0%s\0%d\n" % (record.key[0],
1182
record.key[1], identifier)
1183
# FIXME: Way to abort early?
1185
yield b"ok\0%d\n" % identifier
1186
compressor = zlib.compressobj()
1187
for bytes in record.iter_bytes_as('chunked'):
1188
data = compressor.compress(bytes)
1191
data = compressor.flush()
1195
def do_body(self, body_bytes):
1197
tuple(l.split(b"\0")) for l in body_bytes.splitlines()]
1198
return SuccessfulSmartServerResponse((b'ok', ),
1199
body_stream=self.body_stream(self._repository, desired_files))
1201
def do_repository_request(self, repository):
1202
# Signal that we want a body
1206
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
1207
"""Stream a list of revisions.
1209
The client sends a list of newline-separated revision ids in the
1210
body of the request and the server replies with the serializer format,
1211
and a stream of bzip2-compressed revision texts (using the specified
1214
Any revisions the server does not have are omitted from the stream.
1219
def do_repository_request(self, repository):
1220
self._repository = repository
1221
# Signal there is a body
1224
def do_body(self, body_bytes):
1225
revision_ids = body_bytes.split(b"\n")
1226
return SuccessfulSmartServerResponse(
1227
(b'ok', self._repository.get_serializer_format()),
1228
body_stream=self.body_stream(self._repository, revision_ids))
1230
def body_stream(self, repository, revision_ids):
1231
with self._repository.lock_read():
1232
for record in repository.revisions.get_record_stream(
1233
[(revid,) for revid in revision_ids], 'unordered', True):
1234
if record.storage_kind == 'absent':
1236
yield zlib.compress(record.get_bytes_as('fulltext'))
1239
class SmartServerRepositoryGetInventories(SmartServerRepositoryRequest):
1240
"""Get the inventory deltas for a set of revision ids.
1242
This accepts a list of revision ids, and then sends a chain
1243
of deltas for the inventories of those revisions. The first
1244
revision will be empty.
1246
The server writes back zlibbed serialized inventory deltas,
1247
in the ordering specified. The base for each delta is the
1248
inventory generated by the previous delta.
1253
def _inventory_delta_stream(self, repository, ordering, revids):
1254
prev_inv = _mod_inventory.Inventory(root_id=None,
1255
revision_id=_mod_revision.NULL_REVISION)
1256
serializer = inventory_delta.InventoryDeltaSerializer(
1257
repository.supports_rich_root(),
1258
repository._format.supports_tree_reference)
1259
with repository.lock_read():
1260
for inv, revid in repository._iter_inventories(revids, ordering):
1263
inv_delta = inv._make_delta(prev_inv)
1264
lines = serializer.delta_to_lines(
1265
prev_inv.revision_id, inv.revision_id, inv_delta)
1266
yield ChunkedContentFactory(
1267
inv.revision_id, None, None, lines,
1268
chunks_are_lines=True)
1271
def body_stream(self, repository, ordering, revids):
1272
substream = self._inventory_delta_stream(repository,
1274
return _stream_to_byte_stream([('inventory-deltas', substream)],
1277
def do_body(self, body_bytes):
1278
return SuccessfulSmartServerResponse((b'ok', ),
1279
body_stream=self.body_stream(self._repository, self._ordering,
1280
body_bytes.splitlines()))
1282
def do_repository_request(self, repository, ordering):
1283
ordering = ordering.decode('ascii')
1284
if ordering == 'unordered':
1285
# inventory deltas for a topologically sorted stream
1286
# are likely to be smaller
1287
ordering = 'topological'
1288
self._ordering = ordering
1289
# Signal that we want a body
1293
class SmartServerRepositoryGetStreamForMissingKeys(SmartServerRepositoryRequest):
1295
def do_repository_request(self, repository, to_network_name):
1296
"""Get a stream for missing keys.
1298
:param repository: The repository to stream from.
1299
:param to_network_name: The network name of the format of the target
1303
self._to_format = network_format_registry.get(to_network_name)
1305
return FailedSmartServerResponse(
1306
(b'UnknownFormat', b'repository', to_network_name))
1307
return None # Signal that we want a body.
1309
def do_body(self, body_bytes):
1310
repository = self._repository
1311
repository.lock_read()
1313
source = repository._get_source(self._to_format)
1315
for entry in body_bytes.split(b'\n'):
1316
(kind, revid) = entry.split(b'\t')
1317
keys.append((kind.decode('utf-8'), revid))
1318
stream = source.get_stream_for_missing_keys(keys)
1321
# On non-error, unlocking is done by the body stream handler.
1325
return SuccessfulSmartServerResponse((b'ok',),
1326
body_stream=self.body_stream(stream, repository))
1328
def body_stream(self, stream, repository):
1329
byte_stream = _stream_to_byte_stream(stream, repository._format)
1331
for bytes in byte_stream:
1333
except errors.RevisionNotPresent as e:
1334
# This shouldn't be able to happen, but as we don't buffer
1335
# everything it can in theory happen.
1337
yield FailedSmartServerResponse((b'NoSuchRevision', e.revision_id))
1342
class SmartServerRepositoryRevisionArchive(SmartServerRepositoryRequest):
1344
def do_repository_request(self, repository, revision_id, format, name,
1345
root, subdir=None, force_mtime=None):
1346
"""Stream an archive file for a specific revision.
1347
:param repository: The repository to stream from.
1348
:param revision_id: Revision for which to export the tree
1349
:param format: Format (tar, tgz, tbz2, etc)
1350
:param name: Target file name
1351
:param root: Name of root directory (or '')
1352
:param subdir: Subdirectory to export, if not the root
1354
tree = repository.revision_tree(revision_id)
1355
if subdir is not None:
1356
subdir = subdir.decode('utf-8')
1357
if root is not None:
1358
root = root.decode('utf-8')
1359
name = name.decode('utf-8')
1360
return SuccessfulSmartServerResponse((b'ok',),
1361
body_stream=self.body_stream(
1362
tree, format.decode(
1363
'utf-8'), os.path.basename(name), root, subdir,
1366
def body_stream(self, tree, format, name, root, subdir=None, force_mtime=None):
1367
with tree.lock_read():
1368
return tree.archive(format, name, root, subdir, force_mtime)
1371
class SmartServerRepositoryAnnotateFileRevision(SmartServerRepositoryRequest):
1373
def do_repository_request(self, repository, revision_id, tree_path,
1374
file_id=None, default_revision=None):
1375
"""Stream an archive file for a specific revision.
1377
:param repository: The repository to stream from.
1378
:param revision_id: Revision for which to export the tree
1379
:param tree_path: The path inside the tree
1380
:param file_id: Optional file_id for the file
1381
:param default_revision: Default revision
1383
tree = repository.revision_tree(revision_id)
1384
with tree.lock_read():
1385
body = bencode.bencode(list(tree.annotate_iter(
1386
tree_path.decode('utf-8'), default_revision)))
1387
return SuccessfulSmartServerResponse((b'ok',), body=body)