18
18
"""GPG signing and checking logic."""
20
from __future__ import absolute_import
23
from bzrlib.lazy_import import lazy_import
25
from breezy.lazy_import import lazy_import
24
26
lazy_import(globals(), """
35
from breezy.i18n import (
50
# Signature verification result codes.  They are used as keys of the tally
# built by bulk_verify_signatures() and compared against by the message
# helpers in this module.
# The signing key is unknown to GPG, or is not one of the acceptable keys.
SIGNATURE_KEY_MISSING = 1
# The signature did not validate.
SIGNATURE_NOT_VALID = 2
# The commit is not signed.
SIGNATURE_NOT_SIGNED = 3
56
class GpgNotInstalled(errors.DependencyNotPresent):
    """Error reporting that the python-gpg bindings are unavailable.

    :param error: the underlying error, forwarded unchanged to
        ``DependencyNotPresent``
    """

    _fmt = 'python-gpg is not installed, it is needed to verify signatures'

    def __init__(self, error):
        # 'gpg' is presumably the dependency name expected by
        # DependencyNotPresent -- confirm against breezy.errors.
        errors.DependencyNotPresent.__init__(self, 'gpg', error)
64
class SigningFailed(errors.BzrError):
    """Error raised when data could not be GPG-signed.

    :param error: description of the failure; it is passed to ``BzrError``
        under the keyword ``error``, matching the ``%(error)s`` placeholder
        in ``_fmt``
    """

    _fmt = 'Failed to GPG sign data: "%(error)s"'

    def __init__(self, error):
        errors.BzrError.__init__(self, error=error)
72
class SignatureVerificationFailed(errors.BzrError):
    """Error raised when GPG signature data could not be verified.

    :param error: description of the failure; it is passed to ``BzrError``
        under the keyword ``error``, matching the ``%(error)s`` placeholder
        in ``_fmt``
    """

    _fmt = 'Failed to verify GPG signature data with error "%(error)s"'

    def __init__(self, error):
        errors.BzrError.__init__(self, error=error)
80
def bulk_verify_signatures(repository, revids, strategy,
81
process_events_callback=None):
82
"""Do verifications on a set of revisions
84
:param repository: repository object
85
:param revids: list of revision ids to verify
86
:param strategy: GPG strategy to use
87
:param process_events_callback: method to call for GUI frontends that
88
want to keep their UI refreshed
90
:return: count dictionary of results of each type,
91
result list for each revision,
92
boolean True if all results are verified successfully
94
count = {SIGNATURE_VALID: 0,
95
SIGNATURE_KEY_MISSING: 0,
96
SIGNATURE_NOT_VALID: 0,
97
SIGNATURE_NOT_SIGNED: 0,
100
all_verifiable = True
102
pb = ui.ui_factory.nested_progress_bar()
104
for i, (rev_id, verification_result, uid) in enumerate(
105
repository.verify_revision_signatures(
107
pb.update("verifying signatures", i, total)
108
result.append([rev_id, verification_result, uid])
109
count[verification_result] += 1
110
if verification_result != SIGNATURE_VALID:
111
all_verifiable = False
112
if process_events_callback is not None:
113
process_events_callback()
116
return (count, result, all_verifiable)
36
119
class DisabledGPGStrategy(object):
37
120
"""A GPG Strategy that makes everything fail."""
123
def verify_signatures_available():
39
126
def __init__(self, ignored):
40
127
"""Real strategies take a configuration."""
42
129
def sign(self, content):
43
raise errors.SigningFailed('Signing is disabled.')
130
raise SigningFailed('Signing is disabled.')
132
def verify(self, content, testament):
133
raise SignatureVerificationFailed('Signature verification is \
136
def set_acceptable_keys(self, command_line_input):
46
140
class LoopbackGPGStrategy(object):
47
"""A GPG Strategy that acts like 'cat' - data is just passed through."""
141
"""A GPG Strategy that acts like 'cat' - data is just passed through.
146
def verify_signatures_available():
49
149
def __init__(self, ignored):
50
150
"""Real strategies take a configuration."""
70
183
class GPGStrategy(object):
71
184
"""GPG Signing and checking facilities."""
73
def _command_line(self):
74
return [self._config.gpg_signing_command(), '--clearsign']
76
def __init__(self, config):
186
acceptable_keys = None
188
def __init__(self, config_stack):
189
self._config_stack = config_stack
192
self.context = gpg.Context()
193
except ImportError as error:
194
pass # can't use verify()
196
self.context.signers = self._get_signing_keys()
198
def _get_signing_keys(self):
200
keyname = self._config_stack.get('gpg_signing_key')
203
return [self.context.get_key(keyname)]
204
except gpg.errors.KeyNotFound:
207
if keyname is None or keyname == 'default':
208
# 'default' or not setting gpg_signing_key at all means we should
209
# use the user email address
210
keyname = config.extract_email_address(self._config_stack.get('email'))
211
possible_keys = self.context.keylist(keyname, secret=True)
213
return [possible_keys.next()]
214
except StopIteration:
218
def verify_signatures_available():
220
check if this strategy can verify signatures
222
:return: boolean if this strategy can verify signatures
227
except ImportError as error:
79
230
def sign(self, content):
80
232
if isinstance(content, unicode):
81
233
raise errors.BzrBadParameterUnicode('content')
82
ui.ui_factory.clear_term()
84
preexec_fn = _set_gpg_tty
85
if sys.platform == 'win32':
86
# Win32 doesn't support preexec_fn, but wouldn't support TTY anyway.
89
process = subprocess.Popen(self._command_line(),
90
stdin=subprocess.PIPE,
91
stdout=subprocess.PIPE,
92
preexec_fn=preexec_fn)
94
result = process.communicate(content)[0]
95
if process.returncode is None:
97
if process.returncode != 0:
98
raise errors.SigningFailed(self._command_line())
101
if e.errno == errno.EPIPE:
102
raise errors.SigningFailed(self._command_line())
235
plain_text = gpg.Data(content)
237
output, result = self.context.sign(
238
plain_text, mode=gpg.constants.sig.mode.CLEAR)
239
except gpg.errors.GPGMEError as error:
240
raise SigningFailed(str(error))
244
def verify(self, content, testament):
245
"""Check content has a valid signature.
247
:param content: the commit signature
248
:param testament: the valid testament string for the commit
250
:return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid
254
except ImportError as error:
255
raise errors.GpgNotInstalled(error)
257
signature = gpg.Data(content)
260
plain_output, result = self.context.verify(signature)
261
except gpg.errors.BadSignatures as error:
262
fingerprint = error.result.signatures[0].fpr
263
if error.result.signatures[0].summary & gpg.constants.SIGSUM_KEY_EXPIRED:
264
expires = self.context.get_key(error.result.signatures[0].fpr).subkeys[0].expires
265
if expires > error.result.signatures[0].timestamp:
266
# The expired key was not expired at time of signing.
267
# test_verify_expired_but_valid()
268
return SIGNATURE_EXPIRED, fingerprint[-8:]
106
# bad subprocess parameters, should never happen.
109
if e.errno == errno.ENOENT:
110
# gpg is not installed
111
raise errors.SigningFailed(self._command_line())
270
# I can't work out how to create a test where the signature
271
# was expired at the time of signing.
272
return SIGNATURE_NOT_VALID, None
274
# GPG does not know this key.
275
# test_verify_unknown_key()
276
if error.result.signatures[0].summary & gpg.constants.SIGSUM_KEY_MISSING:
277
return SIGNATURE_KEY_MISSING, fingerprint[-8:]
279
return SIGNATURE_NOT_VALID, None
280
except gpg.errors.GPGMEError as error:
281
raise SignatureVerificationFailed(error[2])
283
# No result if input is invalid.
284
# test_verify_invalid()
285
if len(result.signatures) == 0:
286
return SIGNATURE_NOT_VALID, None
287
# User has specified a list of acceptable keys, check our result is in
288
# it. test_verify_unacceptable_key()
289
fingerprint = result.signatures[0].fpr
290
if self.acceptable_keys is not None:
291
if not fingerprint in self.acceptable_keys:
292
return SIGNATURE_KEY_MISSING, fingerprint[-8:]
293
# Check the signature actually matches the testament.
294
# test_verify_bad_testament()
295
if testament != plain_output:
296
return SIGNATURE_NOT_VALID, None
297
# Yay gpg set the valid bit.
298
# Can't write a test for this one as you can't set a key to be
300
if result.signatures[0].summary & gpg.constants.SIGSUM_VALID:
301
key = self.context.get_key(fingerprint)
302
name = key.uids[0].name
303
email = key.uids[0].email
304
return SIGNATURE_VALID, name + " <" + email + ">"
305
# Sigsum_red indicates a problem, unfortunately I have not been able
306
# to write any tests which actually set this.
307
if result.signatures[0].summary & gpg.constants.SIGSUM_RED:
308
return SIGNATURE_NOT_VALID, None
309
# Summary isn't set if sig is valid but key is untrusted but if user
310
# has explicitly set the key as acceptable we can validate it.
311
if result.signatures[0].summary == 0 and self.acceptable_keys is not None:
312
if fingerprint in self.acceptable_keys:
313
# test_verify_untrusted_but_accepted()
314
return SIGNATURE_VALID, None
315
# test_verify_valid_but_untrusted()
316
if result.signatures[0].summary == 0 and self.acceptable_keys is None:
317
return SIGNATURE_NOT_VALID, None
318
# Other error types such as revoked keys should (I think) be caught by
319
# SIGSUM_RED so anything else means something is buggy.
320
raise SignatureVerificationFailed(
321
"Unknown GnuPG key verification result")
323
def set_acceptable_keys(self, command_line_input):
324
"""Set the acceptable keys for verifying with this GPGStrategy.
326
:param command_line_input: comma separated list of patterns from
331
acceptable_keys_config = self._config_stack.get('acceptable_keys')
332
if acceptable_keys_config is not None:
333
patterns = acceptable_keys_config
334
if command_line_input is not None: # command line overrides config
335
patterns = command_line_input.split(',')
338
self.acceptable_keys = []
339
for pattern in patterns:
340
result = self.context.keylist(pattern)
344
self.acceptable_keys.append(key.subkeys[0].fpr)
345
trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
348
"No GnuPG key results for pattern: {0}"
352
def valid_commits_message(count):
    """Return the summary message for commits with valid signatures.

    :param count: mapping from SIGNATURE_* result code to a number of
        commits; only the SIGNATURE_VALID entry is read
    :return: the translated message with the number filled in
    """
    return gettext(u"{0} commits with valid signatures").format(
        count[SIGNATURE_VALID])
358
def unknown_key_message(count):
    """Return the summary message for commits signed with an unknown key.

    :param count: mapping from SIGNATURE_* result code to a number of
        commits; only the SIGNATURE_KEY_MISSING entry is read
    :return: the translated message, singular or plural form chosen by
        the number, with the number filled in
    """
    number = count[SIGNATURE_KEY_MISSING]
    return ngettext(u"{0} commit with unknown key",
                    u"{0} commits with unknown keys",
                    number).format(number)
366
def commit_not_valid_message(count):
    """Return the summary message for commits whose signature is not valid.

    :param count: mapping from SIGNATURE_* result code to a number of
        commits; only the SIGNATURE_NOT_VALID entry is read
    :return: the translated message, singular or plural form chosen by
        the number, with the number filled in
    """
    number = count[SIGNATURE_NOT_VALID]
    return ngettext(u"{0} commit not valid",
                    u"{0} commits not valid",
                    number).format(number)
374
def commit_not_signed_message(count):
    """Return the summary message for commits that are not signed.

    :param count: mapping from SIGNATURE_* result code to a number of
        commits; only the SIGNATURE_NOT_SIGNED entry is read
    :return: the translated message, singular or plural form chosen by
        the number, with the number filled in
    """
    number = count[SIGNATURE_NOT_SIGNED]
    return ngettext(u"{0} commit not signed",
                    u"{0} commits not signed",
                    number).format(number)
382
def expired_commit_message(count):
    """Return the summary message for commits whose key has now expired.

    :param count: mapping from SIGNATURE_* result code to a number of
        commits; only the SIGNATURE_EXPIRED entry is read
    :return: the translated message, singular or plural form chosen by
        the number, with the number filled in
    """
    number = count[SIGNATURE_EXPIRED]
    return ngettext(u"{0} commit with key now expired",
                    u"{0} commits with key now expired",
                    number).format(number)
390
def verbose_expired_key_message(result, repo):
391
"""takes a verify result and returns list of expired key info"""
393
fingerprint_to_authors = {}
394
for rev_id, validity, fingerprint in result:
395
if validity == SIGNATURE_EXPIRED:
396
revision = repo.get_revision(rev_id)
397
authors = ', '.join(revision.get_apparent_authors())
398
signers.setdefault(fingerprint, 0)
399
signers[fingerprint] += 1
400
fingerprint_to_authors[fingerprint] = authors
402
for fingerprint, number in signers.items():
404
ngettext(u"{0} commit by author {1} with key {2} now expired",
405
u"{0} commits by author {1} with key {2} now expired",
407
number, fingerprint_to_authors[fingerprint], fingerprint))
411
def verbose_valid_message(result):
412
"""takes a verify result and returns list of signed commits strings"""
414
for rev_id, validity, uid in result:
415
if validity == SIGNATURE_VALID:
416
signers.setdefault(uid, 0)
419
for uid, number in signers.items():
420
result.append(ngettext(u"{0} signed {1} commit",
421
u"{0} signed {1} commits",
422
number).format(uid, number))
426
def verbose_not_valid_message(result, repo):
427
"""takes a verify result and returns list of not valid commit info"""
429
for rev_id, validity, empty in result:
430
if validity == SIGNATURE_NOT_VALID:
431
revision = repo.get_revision(rev_id)
432
authors = ', '.join(revision.get_apparent_authors())
433
signers.setdefault(authors, 0)
434
signers[authors] += 1
436
for authors, number in signers.items():
437
result.append(ngettext(u"{0} commit by author {1}",
438
u"{0} commits by author {1}",
439
number).format(number, authors))
443
def verbose_not_signed_message(result, repo):
444
"""takes a verify result and returns list of not signed commit info"""
446
for rev_id, validity, empty in result:
447
if validity == SIGNATURE_NOT_SIGNED:
448
revision = repo.get_revision(rev_id)
449
authors = ', '.join(revision.get_apparent_authors())
450
signers.setdefault(authors, 0)
451
signers[authors] += 1
453
for authors, number in signers.items():
454
result.append(ngettext(u"{0} commit by author {1}",
455
u"{0} commits by author {1}",
456
number).format(number, authors))
460
def verbose_missing_key_message(result):
461
"""takes a verify result and returns list of missing key info"""
463
for rev_id, validity, fingerprint in result:
464
if validity == SIGNATURE_KEY_MISSING:
465
signers.setdefault(fingerprint, 0)
466
signers[fingerprint] += 1
468
for fingerprint, number in list(signers.items()):
469
result.append(ngettext(u"Unknown key {0} signed {1} commit",
470
u"Unknown key {0} signed {1} commits",
471
number).format(fingerprint, number))