18
18
"""GPG signing and checking logic."""
20
from __future__ import absolute_import
24
from breezy.lazy_import import lazy_import
23
from bzrlib.lazy_import import lazy_import
25
24
lazy_import(globals(), """
31
from breezy.i18n import (
44
# verification results
46
SIGNATURE_KEY_MISSING = 1
47
SIGNATURE_NOT_VALID = 2
48
SIGNATURE_NOT_SIGNED = 3
56
class GpgNotInstalled(errors.DependencyNotPresent):
    """Raised when the python-gpg bindings cannot be imported.

    The dependency is always reported under the library name 'gpg'; the
    caller-supplied text is interpolated into the message as ``error``.
    """

    _fmt = ('python-gpg is not installed, it is needed to create or verify '
            'signatures. %(error)s')

    def __init__(self, error):
        """Record the missing 'gpg' library together with ``error``.

        :param error: extra detail appended to the formatted message
        """
        super(GpgNotInstalled, self).__init__('gpg', error)
65
class SigningFailed(errors.BzrError):
    """Raised when data could not be GPG signed."""

    _fmt = 'Failed to GPG sign data: "%(error)s"'

    def __init__(self, error):
        """Store ``error`` so it can be interpolated into the message.

        :param error: description of why signing failed
        """
        super(SigningFailed, self).__init__(error=error)
73
class SignatureVerificationFailed(errors.BzrError):
    """Raised when GPG signature data could not be verified."""

    _fmt = 'Failed to verify GPG signature data with error "%(error)s"'

    def __init__(self, error):
        """Store ``error`` so it can be interpolated into the message.

        :param error: description of why verification failed
        """
        super(SignatureVerificationFailed, self).__init__(error=error)
81
def bulk_verify_signatures(repository, revids, strategy,
82
process_events_callback=None):
83
"""Do verifications on a set of revisions
85
:param repository: repository object
86
:param revids: list of revision ids to verify
87
:param strategy: GPG strategy to use
88
:param process_events_callback: method to call for GUI frontends that
89
want to keep their UI refreshed
91
:return: count dictionary of results of each type,
92
result list for each revision,
93
boolean True if all results are verified successfully
95
count = {SIGNATURE_VALID: 0,
96
SIGNATURE_KEY_MISSING: 0,
97
SIGNATURE_NOT_VALID: 0,
98
SIGNATURE_NOT_SIGNED: 0,
101
all_verifiable = True
103
with ui.ui_factory.nested_progress_bar() as pb:
104
for i, (rev_id, verification_result, uid) in enumerate(
105
repository.verify_revision_signatures(
107
pb.update("verifying signatures", i, total)
108
result.append([rev_id, verification_result, uid])
109
count[verification_result] += 1
110
if verification_result != SIGNATURE_VALID:
111
all_verifiable = False
112
if process_events_callback is not None:
113
process_events_callback()
114
return (count, result, all_verifiable)
117
36
class DisabledGPGStrategy(object):
118
37
"""A GPG Strategy that makes everything fail."""
121
def verify_signatures_available():
124
39
def __init__(self, ignored):
125
40
"""Real strategies take a configuration."""
127
def sign(self, content, mode):
    """Refuse to sign; neither ``content`` nor ``mode`` is looked at.

    :raises SigningFailed: always
    """
    reason = 'Signing is disabled.'
    raise SigningFailed(reason)
130
def verify(self, signed_data, signature=None):
131
raise SignatureVerificationFailed('Signature verification is \
134
def set_acceptable_keys(self, command_line_input):
42
def sign(self, content):
43
raise errors.SigningFailed('Signing is disabled.')
138
46
class LoopbackGPGStrategy(object):
139
"""A GPG Strategy that acts like 'cat' - data is just passed through.
144
def verify_signatures_available():
47
"""A GPG Strategy that acts like 'cat' - data is just passed through."""
147
49
def __init__(self, ignored):
148
50
"""Real strategies take a configuration."""
150
def sign(self, content, mode):
    """Wrap ``content`` between the pseudo-signature marker lines.

    :param content: bytes to "sign"
    :param mode: accepted but not used by this method
    :return: the begin marker, then ``content``, then the end marker
    """
    header = b"-----BEGIN PSEUDO-SIGNED CONTENT-----\n"
    footer = b"-----END PSEUDO-SIGNED CONTENT-----\n"
    return header + content + footer
154
def verify(self, signed_data, signature=None):
    """Strip the pseudo-signature markers and report the data as valid.

    :param signed_data: bytes, normally the output of ``sign``
    :param signature: accepted but not used by this method
    :return: tuple of (SIGNATURE_VALID, None, signed_data with every
        occurrence of the begin and end marker lines removed)
    """
    markers = (
        b"-----BEGIN PSEUDO-SIGNED CONTENT-----\n",
        b"-----END PSEUDO-SIGNED CONTENT-----\n",
    )
    payload = signed_data
    # Remove the begin marker first, then the end marker.
    for marker in markers:
        payload = payload.replace(marker, b"")
    return SIGNATURE_VALID, None, payload
161
def set_acceptable_keys(self, command_line_input):
162
if command_line_input is not None:
163
patterns = command_line_input.split(",")
164
self.acceptable_keys = []
165
for pattern in patterns:
166
if pattern == "unknown":
169
self.acceptable_keys.append(pattern)
52
def sign(self, content):
53
return ("-----BEGIN PSEUDO-SIGNED CONTENT-----\n" + content +
54
"-----END PSEUDO-SIGNED CONTENT-----\n")
172
57
def _set_gpg_tty():
185
70
class GPGStrategy(object):
186
71
"""GPG Signing and checking facilities."""
188
acceptable_keys = None
190
def __init__(self, config_stack):
191
self._config_stack = config_stack
73
def _command_line(self):
74
return [self._config.gpg_signing_command(), '--clearsign']
76
def __init__(self, config):
79
def sign(self, content):
80
if isinstance(content, unicode):
81
raise errors.BzrBadParameterUnicode('content')
82
ui.ui_factory.clear_term()
84
preexec_fn = _set_gpg_tty
85
if sys.platform == 'win32':
86
# Win32 doesn't support preexec_fn, but wouldn't support TTY anyway.
194
self.context = gpg.Context()
195
self.context.armor = True
196
self.context.signers = self._get_signing_keys()
198
pass # can't use verify()
200
def _get_signing_keys(self):
202
keyname = self._config_stack.get('gpg_signing_key')
203
if keyname == 'default':
204
# Leave things to gpg
89
process = subprocess.Popen(self._command_line(),
90
stdin=subprocess.PIPE,
91
stdout=subprocess.PIPE,
92
preexec_fn=preexec_fn)
209
return [self.context.get_key(keyname)]
210
except gpg.errors.KeyNotFound:
214
# not setting gpg_signing_key at all means we should
215
# use the user email address
216
keyname = config.extract_email_address(
217
self._config_stack.get('email'))
218
if keyname == 'default':
220
possible_keys = self.context.keylist(keyname, secret=True)
222
return [next(possible_keys)]
223
except StopIteration:
227
def verify_signatures_available():
229
check if this strategy can verify signatures
231
:return: boolean if this strategy can verify signatures
234
import gpg # noqa: F401
239
def sign(self, content, mode):
242
except ImportError as error:
243
raise GpgNotInstalled(
244
'Set create_signatures=no to disable creating signatures.')
246
if isinstance(content, text_type):
247
raise errors.BzrBadParameterUnicode('content')
249
plain_text = gpg.Data(content)
251
output, result = self.context.sign(
253
MODE_DETACH: gpg.constants.sig.mode.DETACH,
254
MODE_CLEAR: gpg.constants.sig.mode.CLEAR,
255
MODE_NORMAL: gpg.constants.sig.mode.NORMAL,
257
except gpg.errors.GPGMEError as error:
258
raise SigningFailed(str(error))
262
def verify(self, signed_data, signature=None):
263
"""Check content has a valid signature.
265
:param signed_data: Signed data
266
:param signature: optional signature (if detached)
268
:return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid, plain text
272
except ImportError as error:
273
raise GpgNotInstalled(
274
'Set check_signatures=ignore to disable verifying signatures.')
276
signed_data = gpg.Data(signed_data)
278
signature = gpg.Data(signature)
280
plain_output, result = self.context.verify(signed_data, signature)
281
except gpg.errors.BadSignatures as error:
282
fingerprint = error.result.signatures[0].fpr
283
if error.result.signatures[0].summary & gpg.constants.SIGSUM_KEY_EXPIRED:
284
expires = self.context.get_key(
285
error.result.signatures[0].fpr).subkeys[0].expires
286
if expires > error.result.signatures[0].timestamp:
287
# The expired key was not expired at time of signing.
288
# test_verify_expired_but_valid()
289
return SIGNATURE_EXPIRED, fingerprint[-8:], None
94
result = process.communicate(content)[0]
95
if process.returncode is None:
97
if process.returncode != 0:
98
raise errors.SigningFailed(self._command_line())
101
if e.errno == errno.EPIPE:
102
raise errors.SigningFailed(self._command_line())
291
# I can't work out how to create a test where the signature
292
# was expired at the time of signing.
293
return SIGNATURE_NOT_VALID, None, None
295
# GPG does not know this key.
296
# test_verify_unknown_key()
297
if (error.result.signatures[0].summary &
298
gpg.constants.SIGSUM_KEY_MISSING):
299
return SIGNATURE_KEY_MISSING, fingerprint[-8:], None
301
return SIGNATURE_NOT_VALID, None, None
302
except gpg.errors.GPGMEError as error:
303
raise SignatureVerificationFailed(error)
305
# No result if input is invalid.
306
# test_verify_invalid()
307
if len(result.signatures) == 0:
308
return SIGNATURE_NOT_VALID, None, plain_output
310
# User has specified a list of acceptable keys, check our result is in
311
# it. test_verify_unacceptable_key()
312
fingerprint = result.signatures[0].fpr
313
if self.acceptable_keys is not None:
314
if fingerprint not in self.acceptable_keys:
315
return SIGNATURE_KEY_MISSING, fingerprint[-8:], plain_output
316
# Yay gpg set the valid bit.
317
# Can't write a test for this one as you can't set a key to be
319
if result.signatures[0].summary & gpg.constants.SIGSUM_VALID:
320
key = self.context.get_key(fingerprint)
321
name = key.uids[0].name
322
if isinstance(name, bytes):
323
name = name.decode('utf-8')
324
email = key.uids[0].email
325
if isinstance(email, bytes):
326
email = email.decode('utf-8')
327
return (SIGNATURE_VALID, name + u" <" + email + u">", plain_output)
328
# Sigsum_red indicates a problem, unfortunately I have not been able
329
# to write any tests which actually set this.
330
if result.signatures[0].summary & gpg.constants.SIGSUM_RED:
331
return SIGNATURE_NOT_VALID, None, plain_output
332
# Summary isn't set if sig is valid but key is untrusted but if user
333
# has explicitly set the key as acceptable we can validate it.
334
if (result.signatures[0].summary == 0 and
335
self.acceptable_keys is not None):
336
if fingerprint in self.acceptable_keys:
337
# test_verify_untrusted_but_accepted()
338
return SIGNATURE_VALID, None, plain_output
339
# test_verify_valid_but_untrusted()
340
if result.signatures[0].summary == 0 and self.acceptable_keys is None:
341
return SIGNATURE_NOT_VALID, None, plain_output
342
# Other error types such as revoked keys should (I think) be caught by
343
# SIGSUM_RED so anything else means something is buggy.
344
raise SignatureVerificationFailed(
345
"Unknown GnuPG key verification result")
347
def set_acceptable_keys(self, command_line_input):
348
"""Set the acceptable keys for verifying with this GPGStrategy.
350
:param command_line_input: comma separated list of patterns from
355
acceptable_keys_config = self._config_stack.get('acceptable_keys')
356
if acceptable_keys_config is not None:
357
patterns = acceptable_keys_config
358
if command_line_input is not None: # command line overrides config
359
patterns = command_line_input.split(',')
362
self.acceptable_keys = []
363
for pattern in patterns:
364
result = self.context.keylist(pattern)
368
self.acceptable_keys.append(key.subkeys[0].fpr)
369
trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
372
"No GnuPG key results for pattern: {0}"
376
def valid_commits_message(count):
    """Build the summary line for commits whose signature is valid.

    :param count: mapping indexed by the SIGNATURE_* result codes, holding
        the number of commits with each result
    :return: the gettext() message with the SIGNATURE_VALID tally filled in
    """
    valid_total = count[SIGNATURE_VALID]
    template = gettext(u"{0} commits with valid signatures")
    return template.format(valid_total)
382
def unknown_key_message(count):
    """Build the summary line for commits signed with an unknown key.

    :param count: mapping indexed by the SIGNATURE_* result codes, holding
        the number of commits with each result
    :return: the singular or plural message picked by ngettext(), with the
        SIGNATURE_KEY_MISSING tally filled in
    """
    missing_total = count[SIGNATURE_KEY_MISSING]
    template = ngettext(u"{0} commit with unknown key",
                        u"{0} commits with unknown keys",
                        missing_total)
    return template.format(missing_total)
390
def commit_not_valid_message(count):
    """Build the summary line for commits whose signature is not valid.

    :param count: mapping indexed by the SIGNATURE_* result codes, holding
        the number of commits with each result
    :return: the singular or plural message picked by ngettext(), with the
        SIGNATURE_NOT_VALID tally filled in
    """
    invalid_total = count[SIGNATURE_NOT_VALID]
    template = ngettext(u"{0} commit not valid",
                        u"{0} commits not valid",
                        invalid_total)
    return template.format(invalid_total)
398
def commit_not_signed_message(count):
    """Build the summary line for commits that carry no signature.

    :param count: mapping indexed by the SIGNATURE_* result codes, holding
        the number of commits with each result
    :return: the singular or plural message picked by ngettext(), with the
        SIGNATURE_NOT_SIGNED tally filled in
    """
    unsigned_total = count[SIGNATURE_NOT_SIGNED]
    template = ngettext(u"{0} commit not signed",
                        u"{0} commits not signed",
                        unsigned_total)
    return template.format(unsigned_total)
406
def expired_commit_message(count):
    """Build the summary line for commits signed with a now-expired key.

    :param count: mapping indexed by the SIGNATURE_* result codes, holding
        the number of commits with each result
    :return: the singular or plural message picked by ngettext(), with the
        SIGNATURE_EXPIRED tally filled in
    """
    expired_total = count[SIGNATURE_EXPIRED]
    template = ngettext(u"{0} commit with key now expired",
                        u"{0} commits with key now expired",
                        expired_total)
    return template.format(expired_total)
414
def verbose_expired_key_message(result, repo):
415
"""takes a verify result and returns list of expired key info"""
417
fingerprint_to_authors = {}
418
for rev_id, validity, fingerprint in result:
419
if validity == SIGNATURE_EXPIRED:
420
revision = repo.get_revision(rev_id)
421
authors = ', '.join(revision.get_apparent_authors())
422
signers.setdefault(fingerprint, 0)
423
signers[fingerprint] += 1
424
fingerprint_to_authors[fingerprint] = authors
426
for fingerprint, number in signers.items():
428
ngettext(u"{0} commit by author {1} with key {2} now expired",
429
u"{0} commits by author {1} with key {2} now expired",
431
number, fingerprint_to_authors[fingerprint], fingerprint))
435
def verbose_valid_message(result):
436
"""takes a verify result and returns list of signed commits strings"""
438
for rev_id, validity, uid in result:
439
if validity == SIGNATURE_VALID:
440
signers.setdefault(uid, 0)
443
for uid, number in signers.items():
444
result.append(ngettext(u"{0} signed {1} commit",
445
u"{0} signed {1} commits",
446
number).format(uid, number))
450
def verbose_not_valid_message(result, repo):
451
"""takes a verify result and returns list of not valid commit info"""
453
for rev_id, validity, empty in result:
454
if validity == SIGNATURE_NOT_VALID:
455
revision = repo.get_revision(rev_id)
456
authors = ', '.join(revision.get_apparent_authors())
457
signers.setdefault(authors, 0)
458
signers[authors] += 1
460
for authors, number in signers.items():
461
result.append(ngettext(u"{0} commit by author {1}",
462
u"{0} commits by author {1}",
463
number).format(number, authors))
467
def verbose_not_signed_message(result, repo):
468
"""takes a verify result and returns list of not signed commit info"""
470
for rev_id, validity, empty in result:
471
if validity == SIGNATURE_NOT_SIGNED:
472
revision = repo.get_revision(rev_id)
473
authors = ', '.join(revision.get_apparent_authors())
474
signers.setdefault(authors, 0)
475
signers[authors] += 1
477
for authors, number in signers.items():
478
result.append(ngettext(u"{0} commit by author {1}",
479
u"{0} commits by author {1}",
480
number).format(number, authors))
484
def verbose_missing_key_message(result):
485
"""takes a verify result and returns list of missing key info"""
487
for rev_id, validity, fingerprint in result:
488
if validity == SIGNATURE_KEY_MISSING:
489
signers.setdefault(fingerprint, 0)
490
signers[fingerprint] += 1
492
for fingerprint, number in list(signers.items()):
493
result.append(ngettext(u"Unknown key {0} signed {1} commit",
494
u"Unknown key {0} signed {1} commits",
495
number).format(fingerprint, number))
106
# bad subprocess parameters, should never happen.
109
if e.errno == errno.ENOENT:
110
# gpg is not installed
111
raise errors.SigningFailed(self._command_line())