18
18
"""GPG signing and checking logic."""
20
from __future__ import absolute_import
23
from bzrlib.lazy_import import lazy_import
25
from breezy.lazy_import import lazy_import
24
26
lazy_import(globals(), """
36
from breezy.i18n import (
48
SIGNATURE_KEY_MISSING = 1
49
SIGNATURE_NOT_VALID = 2
50
SIGNATURE_NOT_SIGNED = 3
54
def bulk_verify_signatures(repository, revids, strategy,
55
process_events_callback=None):
56
"""Do verifications on a set of revisions
58
:param repository: repository object
59
:param revids: list of revision ids to verify
60
:param strategy: GPG strategy to use
61
:param process_events_callback: method to call for GUI frontends that
62
want to keep their UI refreshed
64
:return: count dictionary of results of each type,
65
result list for each revision,
66
boolean True if all results are verified successfully
68
count = {SIGNATURE_VALID: 0,
69
SIGNATURE_KEY_MISSING: 0,
70
SIGNATURE_NOT_VALID: 0,
71
SIGNATURE_NOT_SIGNED: 0,
76
pb = ui.ui_factory.nested_progress_bar()
78
for i, (rev_id, verification_result, uid) in enumerate(
79
repository.verify_revision_signatures(
81
pb.update("verifying signatures", i, total)
82
result.append([rev_id, verification_result, uid])
83
count[verification_result] += 1
84
if verification_result != SIGNATURE_VALID:
85
all_verifiable = False
86
if process_events_callback is not None:
87
process_events_callback()
90
return (count, result, all_verifiable)
36
93
class DisabledGPGStrategy(object):
37
94
"""A GPG Strategy that makes everything fail."""
97
def verify_signatures_available():
39
100
def __init__(self, ignored):
40
101
"""Real strategies take a configuration."""
42
103
def sign(self, content):
43
104
raise errors.SigningFailed('Signing is disabled.')
106
def verify(self, content, testament):
107
raise errors.SignatureVerificationFailed('Signature verification is \
110
def set_acceptable_keys(self, command_line_input):
46
114
class LoopbackGPGStrategy(object):
47
"""A GPG Strategy that acts like 'cat' - data is just passed through."""
115
"""A GPG Strategy that acts like 'cat' - data is just passed through.
120
def verify_signatures_available():
49
123
def __init__(self, ignored):
50
124
"""Real strategies take a configuration."""
105
218
except ValueError:
106
219
# bad subprocess parameters, should never happen.
109
222
if e.errno == errno.ENOENT:
110
223
# gpg is not installed
111
224
raise errors.SigningFailed(self._command_line())
228
def verify(self, content, testament):
229
"""Check content has a valid signature.
231
:param content: the commit signature
232
:param testament: the valid testament string for the commit
234
:return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid
238
except ImportError as error:
239
raise errors.GpgmeNotInstalled(error)
241
signature = BytesIO(content)
242
plain_output = BytesIO()
244
result = self.context.verify(signature, None, plain_output)
245
except gpgme.GpgmeError as error:
246
raise errors.SignatureVerificationFailed(error[2])
248
# No result if input is invalid.
249
# test_verify_invalid()
251
return SIGNATURE_NOT_VALID, None
252
# User has specified a list of acceptable keys, check our result is in
253
# it. test_verify_unacceptable_key()
254
fingerprint = result[0].fpr
255
if self.acceptable_keys is not None:
256
if not fingerprint in self.acceptable_keys:
257
return SIGNATURE_KEY_MISSING, fingerprint[-8:]
258
# Check the signature actually matches the testament.
259
# test_verify_bad_testament()
260
if testament != plain_output.getvalue():
261
return SIGNATURE_NOT_VALID, None
262
# Yay gpgme set the valid bit.
263
# Can't write a test for this one as you can't set a key to be
264
# trusted using gpgme.
265
if result[0].summary & gpgme.SIGSUM_VALID:
266
key = self.context.get_key(fingerprint)
267
name = key.uids[0].name
268
email = key.uids[0].email
269
return SIGNATURE_VALID, name + " <" + email + ">"
270
# Sigsum_red indicates a problem, unfortunatly I have not been able
271
# to write any tests which actually set this.
272
if result[0].summary & gpgme.SIGSUM_RED:
273
return SIGNATURE_NOT_VALID, None
274
# GPG does not know this key.
275
# test_verify_unknown_key()
276
if result[0].summary & gpgme.SIGSUM_KEY_MISSING:
277
return SIGNATURE_KEY_MISSING, fingerprint[-8:]
278
# Summary isn't set if sig is valid but key is untrusted but if user
279
# has explicity set the key as acceptable we can validate it.
280
if result[0].summary == 0 and self.acceptable_keys is not None:
281
if fingerprint in self.acceptable_keys:
282
# test_verify_untrusted_but_accepted()
283
return SIGNATURE_VALID, None
284
# test_verify_valid_but_untrusted()
285
if result[0].summary == 0 and self.acceptable_keys is None:
286
return SIGNATURE_NOT_VALID, None
287
if result[0].summary & gpgme.SIGSUM_KEY_EXPIRED:
288
expires = self.context.get_key(result[0].fpr).subkeys[0].expires
289
if expires > result[0].timestamp:
290
# The expired key was not expired at time of signing.
291
# test_verify_expired_but_valid()
292
return SIGNATURE_EXPIRED, fingerprint[-8:]
294
# I can't work out how to create a test where the signature
295
# was expired at the time of signing.
296
return SIGNATURE_NOT_VALID, None
297
# A signature from a revoked key gets this.
298
# test_verify_revoked_signature()
299
if ((result[0].summary & gpgme.SIGSUM_SYS_ERROR
300
or result[0].status.strerror == 'Certificate revoked')):
301
return SIGNATURE_NOT_VALID, None
302
# Other error types such as revoked keys should (I think) be caught by
303
# SIGSUM_RED so anything else means something is buggy.
304
raise errors.SignatureVerificationFailed(
305
"Unknown GnuPG key verification result")
307
def set_acceptable_keys(self, command_line_input):
308
"""Set the acceptable keys for verifying with this GPGStrategy.
310
:param command_line_input: comma separated list of patterns from
315
acceptable_keys_config = self._config_stack.get('acceptable_keys')
316
if acceptable_keys_config is not None:
317
patterns = acceptable_keys_config
318
if command_line_input is not None: # command line overrides config
319
patterns = command_line_input.split(',')
322
self.acceptable_keys = []
323
for pattern in patterns:
324
result = self.context.keylist(pattern)
328
self.acceptable_keys.append(key.subkeys[0].fpr)
329
trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
332
"No GnuPG key results for pattern: {0}"
336
def valid_commits_message(count):
337
"""returns message for number of commits"""
338
return gettext(u"{0} commits with valid signatures").format(
339
count[SIGNATURE_VALID])
342
def unknown_key_message(count):
343
"""returns message for number of commits"""
344
return ngettext(u"{0} commit with unknown key",
345
u"{0} commits with unknown keys",
346
count[SIGNATURE_KEY_MISSING]).format(
347
count[SIGNATURE_KEY_MISSING])
350
def commit_not_valid_message(count):
351
"""returns message for number of commits"""
352
return ngettext(u"{0} commit not valid",
353
u"{0} commits not valid",
354
count[SIGNATURE_NOT_VALID]).format(
355
count[SIGNATURE_NOT_VALID])
358
def commit_not_signed_message(count):
359
"""returns message for number of commits"""
360
return ngettext(u"{0} commit not signed",
361
u"{0} commits not signed",
362
count[SIGNATURE_NOT_SIGNED]).format(
363
count[SIGNATURE_NOT_SIGNED])
366
def expired_commit_message(count):
367
"""returns message for number of commits"""
368
return ngettext(u"{0} commit with key now expired",
369
u"{0} commits with key now expired",
370
count[SIGNATURE_EXPIRED]).format(
371
count[SIGNATURE_EXPIRED])
374
def verbose_expired_key_message(result, repo):
375
"""takes a verify result and returns list of expired key info"""
377
fingerprint_to_authors = {}
378
for rev_id, validity, fingerprint in result:
379
if validity == SIGNATURE_EXPIRED:
380
revision = repo.get_revision(rev_id)
381
authors = ', '.join(revision.get_apparent_authors())
382
signers.setdefault(fingerprint, 0)
383
signers[fingerprint] += 1
384
fingerprint_to_authors[fingerprint] = authors
386
for fingerprint, number in signers.items():
388
ngettext(u"{0} commit by author {1} with key {2} now expired",
389
u"{0} commits by author {1} with key {2} now expired",
391
number, fingerprint_to_authors[fingerprint], fingerprint))
395
def verbose_valid_message(result):
396
"""takes a verify result and returns list of signed commits strings"""
398
for rev_id, validity, uid in result:
399
if validity == SIGNATURE_VALID:
400
signers.setdefault(uid, 0)
403
for uid, number in signers.items():
404
result.append(ngettext(u"{0} signed {1} commit",
405
u"{0} signed {1} commits",
406
number).format(uid, number))
410
def verbose_not_valid_message(result, repo):
411
"""takes a verify result and returns list of not valid commit info"""
413
for rev_id, validity, empty in result:
414
if validity == SIGNATURE_NOT_VALID:
415
revision = repo.get_revision(rev_id)
416
authors = ', '.join(revision.get_apparent_authors())
417
signers.setdefault(authors, 0)
418
signers[authors] += 1
420
for authors, number in signers.items():
421
result.append(ngettext(u"{0} commit by author {1}",
422
u"{0} commits by author {1}",
423
number).format(number, authors))
427
def verbose_not_signed_message(result, repo):
428
"""takes a verify result and returns list of not signed commit info"""
430
for rev_id, validity, empty in result:
431
if validity == SIGNATURE_NOT_SIGNED:
432
revision = repo.get_revision(rev_id)
433
authors = ', '.join(revision.get_apparent_authors())
434
signers.setdefault(authors, 0)
435
signers[authors] += 1
437
for authors, number in signers.items():
438
result.append(ngettext(u"{0} commit by author {1}",
439
u"{0} commits by author {1}",
440
number).format(number, authors))
444
def verbose_missing_key_message(result):
445
"""takes a verify result and returns list of missing key info"""
447
for rev_id, validity, fingerprint in result:
448
if validity == SIGNATURE_KEY_MISSING:
449
signers.setdefault(fingerprint, 0)
450
signers[fingerprint] += 1
452
for fingerprint, number in signers.items():
453
result.append(ngettext(u"Unknown key {0} signed {1} commit",
454
u"Unknown key {0} signed {1} commits",
455
number).format(fingerprint, number))