18
18
"""GPG signing and checking logic."""
20
from __future__ import absolute_import
23
from bzrlib.lazy_import import lazy_import
25
from breezy.lazy_import import lazy_import
24
26
lazy_import(globals(), """
36
from breezy.i18n import (
45
from .symbol_versioning import (
52
SIGNATURE_KEY_MISSING = 1
53
SIGNATURE_NOT_VALID = 2
54
SIGNATURE_NOT_SIGNED = 3
58
def bulk_verify_signatures(repository, revids, strategy,
59
process_events_callback=None):
60
"""Do verifications on a set of revisions
62
:param repository: repository object
63
:param revids: list of revision ids to verify
64
:param strategy: GPG strategy to use
65
:param process_events_callback: method to call for GUI frontends that
66
want to keep their UI refreshed
68
:return: count dictionary of results of each type,
69
result list for each revision,
70
boolean True if all results are verified successfully
72
count = {SIGNATURE_VALID: 0,
73
SIGNATURE_KEY_MISSING: 0,
74
SIGNATURE_NOT_VALID: 0,
75
SIGNATURE_NOT_SIGNED: 0,
80
pb = ui.ui_factory.nested_progress_bar()
82
for i, (rev_id, verification_result, uid) in enumerate(
83
repository.verify_revision_signatures(
85
pb.update("verifying signatures", i, total)
86
result.append([rev_id, verification_result, uid])
87
count[verification_result] += 1
88
if verification_result != SIGNATURE_VALID:
89
all_verifiable = False
90
if process_events_callback is not None:
91
process_events_callback()
94
return (count, result, all_verifiable)
36
97
class DisabledGPGStrategy(object):
37
98
"""A GPG Strategy that makes everything fail."""
101
def verify_signatures_available():
39
104
def __init__(self, ignored):
40
105
"""Real strategies take a configuration."""
42
107
def sign(self, content):
43
108
raise errors.SigningFailed('Signing is disabled.')
110
def verify(self, content, testament):
111
raise errors.SignatureVerificationFailed('Signature verification is \
114
def set_acceptable_keys(self, command_line_input):
46
118
class LoopbackGPGStrategy(object):
47
"""A GPG Strategy that acts like 'cat' - data is just passed through."""
119
"""A GPG Strategy that acts like 'cat' - data is just passed through.
124
def verify_signatures_available():
49
127
def __init__(self, ignored):
50
128
"""Real strategies take a configuration."""
105
246
except ValueError:
106
247
# bad subprocess parameters, should never happen.
109
250
if e.errno == errno.ENOENT:
110
251
# gpg is not installed
111
252
raise errors.SigningFailed(self._command_line())
256
def verify(self, content, testament):
257
"""Check content has a valid signature.
259
:param content: the commit signature
260
:param testament: the valid testament string for the commit
262
:return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid
266
except ImportError as error:
267
raise errors.GpgmeNotInstalled(error)
269
signature = BytesIO(content)
270
plain_output = BytesIO()
272
result = self.context.verify(signature, None, plain_output)
273
except gpgme.GpgmeError as error:
274
raise errors.SignatureVerificationFailed(error[2])
276
# No result if input is invalid.
277
# test_verify_invalid()
279
return SIGNATURE_NOT_VALID, None
280
# User has specified a list of acceptable keys, check our result is in
281
# it. test_verify_unacceptable_key()
282
fingerprint = result[0].fpr
283
if self.acceptable_keys is not None:
284
if not fingerprint in self.acceptable_keys:
285
return SIGNATURE_KEY_MISSING, fingerprint[-8:]
286
# Check the signature actually matches the testament.
287
# test_verify_bad_testament()
288
if testament != plain_output.getvalue():
289
return SIGNATURE_NOT_VALID, None
290
# Yay gpgme set the valid bit.
291
# Can't write a test for this one as you can't set a key to be
292
# trusted using gpgme.
293
if result[0].summary & gpgme.SIGSUM_VALID:
294
key = self.context.get_key(fingerprint)
295
name = key.uids[0].name
296
email = key.uids[0].email
297
return SIGNATURE_VALID, name + " <" + email + ">"
298
# Sigsum_red indicates a problem, unfortunately I have not been able
299
# to write any tests which actually set this.
300
if result[0].summary & gpgme.SIGSUM_RED:
301
return SIGNATURE_NOT_VALID, None
302
# GPG does not know this key.
303
# test_verify_unknown_key()
304
if result[0].summary & gpgme.SIGSUM_KEY_MISSING:
305
return SIGNATURE_KEY_MISSING, fingerprint[-8:]
306
# Summary isn't set if sig is valid but key is untrusted but if user
307
# has explicitly set the key as acceptable we can validate it.
308
if result[0].summary == 0 and self.acceptable_keys is not None:
309
if fingerprint in self.acceptable_keys:
310
# test_verify_untrusted_but_accepted()
311
return SIGNATURE_VALID, None
312
# test_verify_valid_but_untrusted()
313
if result[0].summary == 0 and self.acceptable_keys is None:
314
return SIGNATURE_NOT_VALID, None
315
if result[0].summary & gpgme.SIGSUM_KEY_EXPIRED:
316
expires = self.context.get_key(result[0].fpr).subkeys[0].expires
317
if expires > result[0].timestamp:
318
# The expired key was not expired at time of signing.
319
# test_verify_expired_but_valid()
320
return SIGNATURE_EXPIRED, fingerprint[-8:]
322
# I can't work out how to create a test where the signature
323
# was expired at the time of signing.
324
return SIGNATURE_NOT_VALID, None
325
# A signature from a revoked key gets this.
326
# test_verify_revoked_signature()
327
if ((result[0].summary & gpgme.SIGSUM_SYS_ERROR
328
or result[0].status.strerror == 'Certificate revoked')):
329
return SIGNATURE_NOT_VALID, None
330
# Other error types such as revoked keys should (I think) be caught by
331
# SIGSUM_RED so anything else means something is buggy.
332
raise errors.SignatureVerificationFailed(
333
"Unknown GnuPG key verification result")
335
def set_acceptable_keys(self, command_line_input):
336
"""Set the acceptable keys for verifying with this GPGStrategy.
338
:param command_line_input: comma separated list of patterns from
343
acceptable_keys_config = self._config_stack.get('acceptable_keys')
344
if acceptable_keys_config is not None:
345
patterns = acceptable_keys_config
346
if command_line_input is not None: # command line overrides config
347
patterns = command_line_input.split(',')
350
self.acceptable_keys = []
351
for pattern in patterns:
352
result = self.context.keylist(pattern)
356
self.acceptable_keys.append(key.subkeys[0].fpr)
357
trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
360
"No GnuPG key results for pattern: {0}"
363
@deprecated_method(deprecated_in((2, 6, 0)))
def do_verifications(self, revisions, repository,
                     process_events_callback=None):
    """Verify a set of revisions, using this object as the strategy.

    Marked deprecated as of (2, 6, 0). The method only reorders its
    arguments and hands them to ``bulk_verify_signatures``.

    :param revisions: list of revision ids to verify
    :param repository: repository object
    :param process_events_callback: method to call for GUI frontends that
        want to keep their UI refreshed; forwarded unchanged
    :return: whatever ``bulk_verify_signatures`` returns
    """
    outcome = bulk_verify_signatures(
        repository, revisions, self, process_events_callback)
    return outcome
380
@deprecated_method(deprecated_in((2, 6, 0)))
def verbose_valid_message(self, result):
    """Deprecated wrapper around the ``verbose_valid_message`` function.

    :param result: verify result, forwarded as-is
    :return: the list of signed-commit strings built by that function
    """
    messages = verbose_valid_message(result)
    return messages
385
@deprecated_method(deprecated_in((2, 6, 0)))
def verbose_not_valid_message(self, result, repo):
    """Deprecated wrapper around the ``verbose_not_valid_message`` function.

    :param result: verify result, forwarded as-is
    :param repo: repository, forwarded as-is
    :return: the list of not-valid commit info built by that function
    """
    messages = verbose_not_valid_message(result, repo)
    return messages
390
@deprecated_method(deprecated_in((2, 6, 0)))
def verbose_not_signed_message(self, result, repo):
    """Deprecated wrapper around the ``verbose_not_signed_message`` function.

    :param result: verify result, forwarded as-is
    :param repo: repository, forwarded as-is
    :return: the list of not-signed commit info built by that function
    """
    # Must forward to the not-signed helper: verbose_not_valid_message
    # filters on SIGNATURE_NOT_VALID, not on SIGNATURE_NOT_SIGNED, so
    # calling it here would report the wrong set of commits.
    return verbose_not_signed_message(result, repo)
395
@deprecated_method(deprecated_in((2, 6, 0)))
def verbose_missing_key_message(self, result):
    """Deprecated wrapper around the ``verbose_missing_key_message`` function.

    :param result: verify result, forwarded as-is
    :return: the list of missing-key info built by that function
    """
    messages = verbose_missing_key_message(result)
    return messages
400
@deprecated_method(deprecated_in((2, 6, 0)))
def verbose_expired_key_message(self, result, repo):
    """Deprecated wrapper around the ``verbose_expired_key_message`` function.

    :param result: verify result, forwarded as-is
    :param repo: repository, forwarded as-is
    :return: the list of expired-key info built by that function
    """
    messages = verbose_expired_key_message(result, repo)
    return messages
405
@deprecated_method(deprecated_in((2, 6, 0)))
def valid_commits_message(self, count):
    """Deprecated wrapper around the ``valid_commits_message`` function.

    :param count: per-result-type count dictionary, forwarded as-is
    :return: the message built by that function
    """
    message = valid_commits_message(count)
    return message
410
@deprecated_method(deprecated_in((2, 6, 0)))
def unknown_key_message(self, count):
    """Deprecated wrapper around the ``unknown_key_message`` function.

    :param count: per-result-type count dictionary, forwarded as-is
    :return: the message built by that function
    """
    message = unknown_key_message(count)
    return message
415
@deprecated_method(deprecated_in((2, 6, 0)))
def commit_not_valid_message(self, count):
    """Deprecated wrapper around the ``commit_not_valid_message`` function.

    :param count: per-result-type count dictionary, forwarded as-is
    :return: the message built by that function
    """
    message = commit_not_valid_message(count)
    return message
420
@deprecated_method(deprecated_in((2, 6, 0)))
def commit_not_signed_message(self, count):
    """Deprecated wrapper around the ``commit_not_signed_message`` function.

    :param count: per-result-type count dictionary, forwarded as-is
    :return: the message built by that function
    """
    message = commit_not_signed_message(count)
    return message
425
@deprecated_method(deprecated_in((2, 6, 0)))
def expired_commit_message(self, count):
    """Deprecated wrapper around the ``expired_commit_message`` function.

    :param count: per-result-type count dictionary, forwarded as-is
    :return: the message built by that function
    """
    message = expired_commit_message(count)
    return message
431
def valid_commits_message(count):
    """Build the summary line for commits with valid signatures.

    :param count: count dictionary indexed by the SIGNATURE_* codes; only
        the SIGNATURE_VALID entry is read
    :return: the gettext-processed message with that tally substituted
    """
    # NOTE(review): unlike the sibling *_message helpers this goes through
    # gettext rather than ngettext, so the wording is plural for any tally.
    template = gettext(u"{0} commits with valid signatures")
    return template.format(count[SIGNATURE_VALID])
437
def unknown_key_message(count):
    """Build the summary line for commits signed with an unknown key.

    :param count: count dictionary indexed by the SIGNATURE_* codes; only
        the SIGNATURE_KEY_MISSING entry is read
    :return: the ngettext-selected message with that tally substituted
    """
    missing = count[SIGNATURE_KEY_MISSING]
    template = ngettext(u"{0} commit with unknown key",
                        u"{0} commits with unknown keys",
                        missing)
    return template.format(missing)
445
def commit_not_valid_message(count):
    """Build the summary line for commits whose signature is not valid.

    :param count: count dictionary indexed by the SIGNATURE_* codes; only
        the SIGNATURE_NOT_VALID entry is read
    :return: the ngettext-selected message with that tally substituted
    """
    invalid = count[SIGNATURE_NOT_VALID]
    template = ngettext(u"{0} commit not valid",
                        u"{0} commits not valid",
                        invalid)
    return template.format(invalid)
453
def commit_not_signed_message(count):
    """Build the summary line for commits that carry no signature.

    :param count: count dictionary indexed by the SIGNATURE_* codes; only
        the SIGNATURE_NOT_SIGNED entry is read
    :return: the ngettext-selected message with that tally substituted
    """
    unsigned = count[SIGNATURE_NOT_SIGNED]
    template = ngettext(u"{0} commit not signed",
                        u"{0} commits not signed",
                        unsigned)
    return template.format(unsigned)
461
def expired_commit_message(count):
    """Build the summary line for commits signed with a now-expired key.

    :param count: count dictionary indexed by the SIGNATURE_* codes; only
        the SIGNATURE_EXPIRED entry is read
    :return: the ngettext-selected message with that tally substituted
    """
    expired = count[SIGNATURE_EXPIRED]
    template = ngettext(u"{0} commit with key now expired",
                        u"{0} commits with key now expired",
                        expired)
    return template.format(expired)
469
def verbose_expired_key_message(result, repo):
470
"""takes a verify result and returns list of expired key info"""
472
fingerprint_to_authors = {}
473
for rev_id, validity, fingerprint in result:
474
if validity == SIGNATURE_EXPIRED:
475
revision = repo.get_revision(rev_id)
476
authors = ', '.join(revision.get_apparent_authors())
477
signers.setdefault(fingerprint, 0)
478
signers[fingerprint] += 1
479
fingerprint_to_authors[fingerprint] = authors
481
for fingerprint, number in signers.items():
483
ngettext(u"{0} commit by author {1} with key {2} now expired",
484
u"{0} commits by author {1} with key {2} now expired",
486
number, fingerprint_to_authors[fingerprint], fingerprint))
490
def verbose_valid_message(result):
491
"""takes a verify result and returns list of signed commits strings"""
493
for rev_id, validity, uid in result:
494
if validity == SIGNATURE_VALID:
495
signers.setdefault(uid, 0)
498
for uid, number in signers.items():
499
result.append(ngettext(u"{0} signed {1} commit",
500
u"{0} signed {1} commits",
501
number).format(uid, number))
505
def verbose_not_valid_message(result, repo):
506
"""takes a verify result and returns list of not valid commit info"""
508
for rev_id, validity, empty in result:
509
if validity == SIGNATURE_NOT_VALID:
510
revision = repo.get_revision(rev_id)
511
authors = ', '.join(revision.get_apparent_authors())
512
signers.setdefault(authors, 0)
513
signers[authors] += 1
515
for authors, number in signers.items():
516
result.append(ngettext(u"{0} commit by author {1}",
517
u"{0} commits by author {1}",
518
number).format(number, authors))
522
def verbose_not_signed_message(result, repo):
523
"""takes a verify result and returns list of not signed commit info"""
525
for rev_id, validity, empty in result:
526
if validity == SIGNATURE_NOT_SIGNED:
527
revision = repo.get_revision(rev_id)
528
authors = ', '.join(revision.get_apparent_authors())
529
signers.setdefault(authors, 0)
530
signers[authors] += 1
532
for authors, number in signers.items():
533
result.append(ngettext(u"{0} commit by author {1}",
534
u"{0} commits by author {1}",
535
number).format(number, authors))
539
def verbose_missing_key_message(result):
540
"""takes a verify result and returns list of missing key info"""
542
for rev_id, validity, fingerprint in result:
543
if validity == SIGNATURE_KEY_MISSING:
544
signers.setdefault(fingerprint, 0)
545
signers[fingerprint] += 1
547
for fingerprint, number in signers.items():
548
result.append(ngettext(u"Unknown key {0} signed {1} commit",
549
u"Unknown key {0} signed {1} commits",
550
number).format(fingerprint, number))