18
18
"""GPG signing and checking logic."""
20
from __future__ import absolute_import
23
from bzrlib.lazy_import import lazy_import
24
from breezy.lazy_import import lazy_import
24
25
lazy_import(globals(), """
31
from breezy.i18n import (
44
# verification results
46
SIGNATURE_KEY_MISSING = 1
47
SIGNATURE_NOT_VALID = 2
48
SIGNATURE_NOT_SIGNED = 3
56
class GpgNotInstalled(errors.DependencyNotPresent):
    """Error for a missing python-gpg dependency.

    The message template interpolates ``%(error)s``; the constructor
    forwards the library name ``'gpg'`` and the caller-supplied ``error``
    text to ``errors.DependencyNotPresent``.

    :param error: extra text for the ``%(error)s`` slot of the message
    """

    _fmt = ('python-gpg is not installed, it is needed to create or '
            'verify signatures. %(error)s')

    def __init__(self, error):
        super(GpgNotInstalled, self).__init__('gpg', error)
65
class SigningFailed(errors.BzrError):
    """Error reporting that GPG signing of data did not succeed.

    :param error: description of the failure; it fills the ``%(error)s``
        slot of the message template
    """

    _fmt = 'Failed to GPG sign data: "%(error)s"'

    def __init__(self, error):
        super(SigningFailed, self).__init__(error=error)
73
class SignatureVerificationFailed(errors.BzrError):
    """Error reporting that a GPG signature could not be verified.

    :param error: description of the failure; it fills the ``%(error)s``
        slot of the message template
    """

    _fmt = 'Failed to verify GPG signature data with error "%(error)s"'

    def __init__(self, error):
        super(SignatureVerificationFailed, self).__init__(error=error)
81
def bulk_verify_signatures(repository, revids, strategy,
                           process_events_callback=None):
    """Do verifications on a set of revisions

    :param repository: repository object
    :param revids: list of revision ids to verify
    :param strategy: GPG strategy to use
    :param process_events_callback: method to call for GUI frontends that
        want to keep their UI refreshed

    :return: count dictionary of results of each type,
             result list for each revision,
             boolean True if all results are verified successfully
    """
    # One counter per SIGNATURE_* code.  SIGNATURE_EXPIRED must be present
    # too: verify() can return it, and the increment below would otherwise
    # raise KeyError.
    count = {SIGNATURE_VALID: 0,
             SIGNATURE_KEY_MISSING: 0,
             SIGNATURE_NOT_VALID: 0,
             SIGNATURE_NOT_SIGNED: 0,
             SIGNATURE_EXPIRED: 0}
    result = []
    all_verifiable = True
    total = len(revids)
    with ui.ui_factory.nested_progress_bar() as pb:
        # NOTE(review): the arguments of verify_revision_signatures() are
        # assumed to be (revids, strategy) -- confirm against the
        # repository API.
        for i, (rev_id, verification_result, uid) in enumerate(
                repository.verify_revision_signatures(
                    revids, strategy)):
            pb.update("verifying signatures", i, total)
            result.append([rev_id, verification_result, uid])
            count[verification_result] += 1
            if verification_result != SIGNATURE_VALID:
                all_verifiable = False
            # Give GUI frontends a chance to refresh between revisions.
            if process_events_callback is not None:
                process_events_callback()
    return (count, result, all_verifiable)
36
117
class DisabledGPGStrategy(object):
37
118
"""A GPG Strategy that makes everything fail."""
121
def verify_signatures_available():
39
124
def __init__(self, ignored):
40
125
"""Real strategies take a configuration."""
42
def sign(self, content):
43
raise errors.SigningFailed('Signing is disabled.')
127
def sign(self, content, mode):
128
raise SigningFailed('Signing is disabled.')
130
def verify(self, signed_data, signature=None):
131
raise SignatureVerificationFailed('Signature verification is \
134
def set_acceptable_keys(self, command_line_input):
46
138
class LoopbackGPGStrategy(object):
47
"""A GPG Strategy that acts like 'cat' - data is just passed through."""
139
"""A GPG Strategy that acts like 'cat' - data is just passed through.
144
def verify_signatures_available():
49
147
def __init__(self, ignored):
50
148
"""Real strategies take a configuration."""
52
def sign(self, content):
53
return ("-----BEGIN PSEUDO-SIGNED CONTENT-----\n" + content +
54
"-----END PSEUDO-SIGNED CONTENT-----\n")
150
def sign(self, content, mode):
151
return (b"-----BEGIN PSEUDO-SIGNED CONTENT-----\n" + content
152
+ b"-----END PSEUDO-SIGNED CONTENT-----\n")
154
def verify(self, signed_data, signature=None):
155
plain_text = signed_data.replace(
156
b"-----BEGIN PSEUDO-SIGNED CONTENT-----\n", b"")
157
plain_text = plain_text.replace(
158
b"-----END PSEUDO-SIGNED CONTENT-----\n", b"")
159
return SIGNATURE_VALID, None, plain_text
161
def set_acceptable_keys(self, command_line_input):
162
if command_line_input is not None:
163
patterns = command_line_input.split(",")
164
self.acceptable_keys = []
165
for pattern in patterns:
166
if pattern == "unknown":
169
self.acceptable_keys.append(pattern)
57
172
def _set_gpg_tty():
70
185
class GPGStrategy(object):
71
186
"""GPG Signing and checking facilities."""
73
def _command_line(self):
74
return [self._config.gpg_signing_command(), '--clearsign']
76
def __init__(self, config):
79
def sign(self, content):
80
if isinstance(content, unicode):
188
acceptable_keys = None
190
def __init__(self, config_stack):
191
self._config_stack = config_stack
194
self.context = gpg.Context()
195
self.context.armor = True
196
self.context.signers = self._get_signing_keys()
198
pass # can't use verify()
200
def _get_signing_keys(self):
202
keyname = self._config_stack.get('gpg_signing_key')
205
return [self.context.get_key(keyname)]
206
except gpg.errors.KeyNotFound:
209
if keyname is None or keyname == 'default':
210
# 'default' or not setting gpg_signing_key at all means we should
211
# use the user email address
212
keyname = config.extract_email_address(
213
self._config_stack.get('email'))
214
possible_keys = self.context.keylist(keyname, secret=True)
216
return [next(possible_keys)]
217
except StopIteration:
221
def verify_signatures_available():
223
check if this strategy can verify signatures
225
:return: boolean if this strategy can verify signatures
228
import gpg # noqa: F401
233
def sign(self, content, mode):
236
except ImportError as error:
237
raise GpgNotInstalled(
238
'Set create_signatures=no to disable creating signatures.')
240
if isinstance(content, text_type):
81
241
raise errors.BzrBadParameterUnicode('content')
82
ui.ui_factory.clear_term()
84
preexec_fn = _set_gpg_tty
85
if sys.platform == 'win32':
86
# Win32 doesn't support preexec_fn, but wouldn't support TTY anyway.
89
process = subprocess.Popen(self._command_line(),
90
stdin=subprocess.PIPE,
91
stdout=subprocess.PIPE,
92
preexec_fn=preexec_fn)
94
result = process.communicate(content)[0]
95
if process.returncode is None:
97
if process.returncode != 0:
98
raise errors.SigningFailed(self._command_line())
101
if e.errno == errno.EPIPE:
102
raise errors.SigningFailed(self._command_line())
243
plain_text = gpg.Data(content)
245
output, result = self.context.sign(
247
MODE_DETACH: gpg.constants.sig.mode.DETACH,
248
MODE_CLEAR: gpg.constants.sig.mode.CLEAR,
249
MODE_NORMAL: gpg.constants.sig.mode.NORMAL,
251
except gpg.errors.GPGMEError as error:
252
raise SigningFailed(str(error))
256
def verify(self, signed_data, signature=None):
257
"""Check content has a valid signature.
259
:param signed_data: Signed data
260
:param signature: optional signature (if detached)
262
:return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid, plain text
266
except ImportError as error:
267
raise GpgNotInstalled(
268
'Set check_signatures=ignore to disable verifying signatures.')
270
signed_data = gpg.Data(signed_data)
272
signature = gpg.Data(signature)
274
plain_output, result = self.context.verify(signed_data, signature)
275
except gpg.errors.BadSignatures as error:
276
fingerprint = error.result.signatures[0].fpr
277
if error.result.signatures[0].summary & gpg.constants.SIGSUM_KEY_EXPIRED:
278
expires = self.context.get_key(
279
error.result.signatures[0].fpr).subkeys[0].expires
280
if expires > error.result.signatures[0].timestamp:
281
# The expired key was not expired at time of signing.
282
# test_verify_expired_but_valid()
283
return SIGNATURE_EXPIRED, fingerprint[-8:], None
106
# bad subprocess parameters, should never happen.
109
if e.errno == errno.ENOENT:
110
# gpg is not installed
111
raise errors.SigningFailed(self._command_line())
285
# I can't work out how to create a test where the signature
286
# was expired at the time of signing.
287
return SIGNATURE_NOT_VALID, None, None
289
# GPG does not know this key.
290
# test_verify_unknown_key()
291
if (error.result.signatures[0].summary &
292
gpg.constants.SIGSUM_KEY_MISSING):
293
return SIGNATURE_KEY_MISSING, fingerprint[-8:], None
295
return SIGNATURE_NOT_VALID, None, None
296
except gpg.errors.GPGMEError as error:
297
raise SignatureVerificationFailed(error)
299
# No result if input is invalid.
300
# test_verify_invalid()
301
if len(result.signatures) == 0:
302
return SIGNATURE_NOT_VALID, None, plain_output
304
# User has specified a list of acceptable keys, check our result is in
305
# it. test_verify_unacceptable_key()
306
fingerprint = result.signatures[0].fpr
307
if self.acceptable_keys is not None:
308
if fingerprint not in self.acceptable_keys:
309
return SIGNATURE_KEY_MISSING, fingerprint[-8:], plain_output
310
# Yay gpg set the valid bit.
311
# Can't write a test for this one as you can't set a key to be
313
if result.signatures[0].summary & gpg.constants.SIGSUM_VALID:
314
key = self.context.get_key(fingerprint)
315
name = key.uids[0].name
316
email = key.uids[0].email
319
name.decode('utf-8') + u" <" + email.decode('utf-8') + u">",
321
# Sigsum_red indicates a problem, unfortunately I have not been able
322
# to write any tests which actually set this.
323
if result.signatures[0].summary & gpg.constants.SIGSUM_RED:
324
return SIGNATURE_NOT_VALID, None, plain_output
325
# Summary isn't set if sig is valid but key is untrusted but if user
326
# has explicitly set the key as acceptable we can validate it.
327
if (result.signatures[0].summary == 0 and
328
self.acceptable_keys is not None):
329
if fingerprint in self.acceptable_keys:
330
# test_verify_untrusted_but_accepted()
331
return SIGNATURE_VALID, None, plain_output
332
# test_verify_valid_but_untrusted()
333
if result.signatures[0].summary == 0 and self.acceptable_keys is None:
334
return SIGNATURE_NOT_VALID, None, plain_output
335
# Other error types such as revoked keys should (I think) be caught by
336
# SIGSUM_RED so anything else means something is buggy.
337
raise SignatureVerificationFailed(
338
"Unknown GnuPG key verification result")
340
def set_acceptable_keys(self, command_line_input):
341
"""Set the acceptable keys for verifying with this GPGStrategy.
343
:param command_line_input: comma separated list of patterns from
348
acceptable_keys_config = self._config_stack.get('acceptable_keys')
349
if acceptable_keys_config is not None:
350
patterns = acceptable_keys_config
351
if command_line_input is not None: # command line overrides config
352
patterns = command_line_input.split(',')
355
self.acceptable_keys = []
356
for pattern in patterns:
357
result = self.context.keylist(pattern)
361
self.acceptable_keys.append(key.subkeys[0].fpr)
362
trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
365
"No GnuPG key results for pattern: {0}"
369
def valid_commits_message(count):
    """Build the summary line for commits with valid signatures.

    :param count: mapping indexed by SIGNATURE_* result code
    :return: the gettext() template with count[SIGNATURE_VALID] substituted
    """
    template = gettext(u"{0} commits with valid signatures")
    valid_total = count[SIGNATURE_VALID]
    return template.format(valid_total)
375
def unknown_key_message(count):
    """Build the summary line for commits signed with an unknown key.

    :param count: mapping indexed by SIGNATURE_* result code
    :return: singular or plural ngettext() template, chosen by
        count[SIGNATURE_KEY_MISSING], with that number substituted
    """
    missing_total = count[SIGNATURE_KEY_MISSING]
    template = ngettext(u"{0} commit with unknown key",
                        u"{0} commits with unknown keys",
                        missing_total)
    return template.format(missing_total)
383
def commit_not_valid_message(count):
    """Build the summary line for commits whose signature is not valid.

    :param count: mapping indexed by SIGNATURE_* result code
    :return: singular or plural ngettext() template, chosen by
        count[SIGNATURE_NOT_VALID], with that number substituted
    """
    invalid_total = count[SIGNATURE_NOT_VALID]
    template = ngettext(u"{0} commit not valid",
                        u"{0} commits not valid",
                        invalid_total)
    return template.format(invalid_total)
391
def commit_not_signed_message(count):
    """Build the summary line for commits that carry no signature.

    :param count: mapping indexed by SIGNATURE_* result code
    :return: singular or plural ngettext() template, chosen by
        count[SIGNATURE_NOT_SIGNED], with that number substituted
    """
    unsigned_total = count[SIGNATURE_NOT_SIGNED]
    template = ngettext(u"{0} commit not signed",
                        u"{0} commits not signed",
                        unsigned_total)
    return template.format(unsigned_total)
399
def expired_commit_message(count):
    """Build the summary line for commits signed with a now-expired key.

    :param count: mapping indexed by SIGNATURE_* result code
    :return: singular or plural ngettext() template, chosen by
        count[SIGNATURE_EXPIRED], with that number substituted
    """
    expired_total = count[SIGNATURE_EXPIRED]
    template = ngettext(u"{0} commit with key now expired",
                        u"{0} commits with key now expired",
                        expired_total)
    return template.format(expired_total)
407
def verbose_expired_key_message(result, repo):
    """Take a verify result and return a list of expired key info.

    :param result: iterable of (rev_id, validity, fingerprint) entries
    :param repo: object whose get_revision(rev_id) returns a revision
        providing get_apparent_authors()
    :return: list of message strings, one per distinct fingerprint whose
        entries have validity SIGNATURE_EXPIRED
    """
    signers = {}
    fingerprint_to_authors = {}
    for rev_id, validity, fingerprint in result:
        if validity == SIGNATURE_EXPIRED:
            revision = repo.get_revision(rev_id)
            authors = ', '.join(revision.get_apparent_authors())
            signers.setdefault(fingerprint, 0)
            signers[fingerprint] += 1
            # Overwritten on every match: only the authors of the last
            # matching revision are reported for a fingerprint.
            fingerprint_to_authors[fingerprint] = authors
    # Collect into a fresh list so the caller's ``result`` is not modified.
    messages = []
    for fingerprint, number in signers.items():
        messages.append(
            ngettext(u"{0} commit by author {1} with key {2} now expired",
                     u"{0} commits by author {1} with key {2} now expired",
                     number).format(
                number, fingerprint_to_authors[fingerprint], fingerprint))
    return messages
428
def verbose_valid_message(result):
    """Take a verify result and return a list of signed commit strings.

    :param result: iterable of (rev_id, validity, uid) entries
    :return: list of message strings, one per distinct uid whose entries
        have validity SIGNATURE_VALID
    """
    signers = {}
    for rev_id, validity, uid in result:
        if validity == SIGNATURE_VALID:
            signers.setdefault(uid, 0)
            signers[uid] += 1
    # Collect into a fresh list so the caller's ``result`` is not modified.
    messages = []
    for uid, number in signers.items():
        messages.append(ngettext(u"{0} signed {1} commit",
                                 u"{0} signed {1} commits",
                                 number).format(uid, number))
    return messages
443
def verbose_not_valid_message(result, repo):
    """Take a verify result and return a list of not valid commit info.

    :param result: iterable of (rev_id, validity, <unused>) entries
    :param repo: object whose get_revision(rev_id) returns a revision
        providing get_apparent_authors()
    :return: list of message strings, one per distinct author string whose
        entries have validity SIGNATURE_NOT_VALID
    """
    signers = {}
    for rev_id, validity, empty in result:
        if validity == SIGNATURE_NOT_VALID:
            revision = repo.get_revision(rev_id)
            authors = ', '.join(revision.get_apparent_authors())
            signers.setdefault(authors, 0)
            signers[authors] += 1
    # Collect into a fresh list so the caller's ``result`` is not modified.
    messages = []
    for authors, number in signers.items():
        messages.append(ngettext(u"{0} commit by author {1}",
                                 u"{0} commits by author {1}",
                                 number).format(number, authors))
    return messages
460
def verbose_not_signed_message(result, repo):
    """Take a verify result and return a list of not signed commit info.

    :param result: iterable of (rev_id, validity, <unused>) entries
    :param repo: object whose get_revision(rev_id) returns a revision
        providing get_apparent_authors()
    :return: list of message strings, one per distinct author string whose
        entries have validity SIGNATURE_NOT_SIGNED
    """
    signers = {}
    for rev_id, validity, empty in result:
        if validity == SIGNATURE_NOT_SIGNED:
            revision = repo.get_revision(rev_id)
            authors = ', '.join(revision.get_apparent_authors())
            signers.setdefault(authors, 0)
            signers[authors] += 1
    # Collect into a fresh list so the caller's ``result`` is not modified.
    messages = []
    for authors, number in signers.items():
        messages.append(ngettext(u"{0} commit by author {1}",
                                 u"{0} commits by author {1}",
                                 number).format(number, authors))
    return messages
477
def verbose_missing_key_message(result):
478
"""takes a verify result and returns list of missing key info"""
480
for rev_id, validity, fingerprint in result:
481
if validity == SIGNATURE_KEY_MISSING:
482
signers.setdefault(fingerprint, 0)
483
signers[fingerprint] += 1
485
for fingerprint, number in list(signers.items()):
486
result.append(ngettext(u"Unknown key {0} signed {1} commit",
487
u"Unknown key {0} signed {1} commits",
488
number).format(fingerprint, number))