18
18
"""GPG signing and checking logic."""
20
from __future__ import absolute_import
23
from bzrlib.lazy_import import lazy_import
25
from breezy.lazy_import import lazy_import
24
26
lazy_import(globals(), """
35
from breezy.i18n import (
50
SIGNATURE_KEY_MISSING = 1
51
SIGNATURE_NOT_VALID = 2
52
SIGNATURE_NOT_SIGNED = 3
60
class GpgNotInstalled(errors.DependencyNotPresent):
62
_fmt = 'python-gpg is not installed, it is needed to verify signatures'
64
def __init__(self, error):
65
errors.DependencyNotPresent.__init__(self, 'gpg', error)
68
class SigningFailed(errors.BzrError):
70
_fmt = 'Failed to GPG sign data: "%(error)s"'
72
def __init__(self, error):
73
errors.BzrError.__init__(self, error=error)
76
class SignatureVerificationFailed(errors.BzrError):
78
_fmt = 'Failed to verify GPG signature data with error "%(error)s"'
80
def __init__(self, error):
81
errors.BzrError.__init__(self, error=error)
84
def bulk_verify_signatures(repository, revids, strategy,
85
process_events_callback=None):
86
"""Do verifications on a set of revisions
88
:param repository: repository object
89
:param revids: list of revision ids to verify
90
:param strategy: GPG strategy to use
91
:param process_events_callback: method to call for GUI frontends that
92
want to keep their UI refreshed
94
:return: count dictionary of results of each type,
95
result list for each revision,
96
boolean True if all results are verified successfully
98
count = {SIGNATURE_VALID: 0,
99
SIGNATURE_KEY_MISSING: 0,
100
SIGNATURE_NOT_VALID: 0,
101
SIGNATURE_NOT_SIGNED: 0,
102
SIGNATURE_EXPIRED: 0}
104
all_verifiable = True
106
with ui.ui_factory.nested_progress_bar() as pb:
107
for i, (rev_id, verification_result, uid) in enumerate(
108
repository.verify_revision_signatures(
110
pb.update("verifying signatures", i, total)
111
result.append([rev_id, verification_result, uid])
112
count[verification_result] += 1
113
if verification_result != SIGNATURE_VALID:
114
all_verifiable = False
115
if process_events_callback is not None:
116
process_events_callback()
117
return (count, result, all_verifiable)
36
120
class DisabledGPGStrategy(object):
37
121
"""A GPG Strategy that makes everything fail."""
124
def verify_signatures_available():
39
127
def __init__(self, ignored):
40
128
"""Real strategies take a configuration."""
42
def sign(self, content):
43
raise errors.SigningFailed('Signing is disabled.')
130
def sign(self, content, mode):
131
raise SigningFailed('Signing is disabled.')
133
def verify(self, signed_data, signature=None):
134
raise SignatureVerificationFailed('Signature verification is \
137
def set_acceptable_keys(self, command_line_input):
46
141
class LoopbackGPGStrategy(object):
47
"""A GPG Strategy that acts like 'cat' - data is just passed through."""
142
"""A GPG Strategy that acts like 'cat' - data is just passed through.
147
def verify_signatures_available():
49
150
def __init__(self, ignored):
50
151
"""Real strategies take a configuration."""
52
def sign(self, content):
53
return ("-----BEGIN PSEUDO-SIGNED CONTENT-----\n" + content +
54
"-----END PSEUDO-SIGNED CONTENT-----\n")
153
def sign(self, content, mode):
154
return (b"-----BEGIN PSEUDO-SIGNED CONTENT-----\n" + content +
155
b"-----END PSEUDO-SIGNED CONTENT-----\n")
157
def verify(self, signed_data, signature=None):
158
plain_text = signed_data.replace(b"-----BEGIN PSEUDO-SIGNED CONTENT-----\n", b"")
159
plain_text = plain_text.replace(b"-----END PSEUDO-SIGNED CONTENT-----\n", b"")
160
return SIGNATURE_VALID, None, plain_text
162
def set_acceptable_keys(self, command_line_input):
163
if command_line_input is not None:
164
patterns = command_line_input.split(",")
165
self.acceptable_keys = []
166
for pattern in patterns:
167
if pattern == "unknown":
170
self.acceptable_keys.append(pattern)
57
173
def _set_gpg_tty():
70
186
class GPGStrategy(object):
71
187
"""GPG Signing and checking facilities."""
73
def _command_line(self):
74
return [self._config.gpg_signing_command(), '--clearsign']
76
def __init__(self, config):
79
def sign(self, content):
80
if isinstance(content, unicode):
189
acceptable_keys = None
191
def __init__(self, config_stack):
192
self._config_stack = config_stack
195
self.context = gpg.Context()
196
self.context.armor = True
197
self.context.signers = self._get_signing_keys()
198
except ImportError as error:
199
pass # can't use verify()
201
def _get_signing_keys(self):
203
keyname = self._config_stack.get('gpg_signing_key')
206
return [self.context.get_key(keyname)]
207
except gpg.errors.KeyNotFound:
210
if keyname is None or keyname == 'default':
211
# 'default' or not setting gpg_signing_key at all means we should
212
# use the user email address
213
keyname = config.extract_email_address(self._config_stack.get('email'))
214
possible_keys = self.context.keylist(keyname, secret=True)
216
return [next(possible_keys)]
217
except StopIteration:
221
def verify_signatures_available():
223
check if this strategy can verify signatures
225
:return: boolean if this strategy can verify signatures
230
except ImportError as error:
233
def sign(self, content, mode):
235
if isinstance(content, text_type):
81
236
raise errors.BzrBadParameterUnicode('content')
82
ui.ui_factory.clear_term()
84
preexec_fn = _set_gpg_tty
85
if sys.platform == 'win32':
86
# Win32 doesn't support preexec_fn, but wouldn't support TTY anyway.
89
process = subprocess.Popen(self._command_line(),
90
stdin=subprocess.PIPE,
91
stdout=subprocess.PIPE,
92
preexec_fn=preexec_fn)
94
result = process.communicate(content)[0]
95
if process.returncode is None:
97
if process.returncode != 0:
98
raise errors.SigningFailed(self._command_line())
101
if e.errno == errno.EPIPE:
102
raise errors.SigningFailed(self._command_line())
238
plain_text = gpg.Data(content)
240
output, result = self.context.sign(
242
MODE_DETACH: gpg.constants.sig.mode.DETACH,
243
MODE_CLEAR: gpg.constants.sig.mode.CLEAR,
244
MODE_NORMAL: gpg.constants.sig.mode.NORMAL,
246
except gpg.errors.GPGMEError as error:
247
raise SigningFailed(str(error))
251
def verify(self, signed_data, signature=None):
252
"""Check content has a valid signature.
254
:param signed_data; Signed data
255
:param signature: optional signature (if detached)
257
:return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid, plain text
261
except ImportError as error:
262
raise errors.GpgNotInstalled(error)
264
signed_data = gpg.Data(signed_data)
266
signature = gpg.Data(signature)
268
plain_output, result = self.context.verify(signed_data, signature)
269
except gpg.errors.BadSignatures as error:
270
fingerprint = error.result.signatures[0].fpr
271
if error.result.signatures[0].summary & gpg.constants.SIGSUM_KEY_EXPIRED:
272
expires = self.context.get_key(error.result.signatures[0].fpr).subkeys[0].expires
273
if expires > error.result.signatures[0].timestamp:
274
# The expired key was not expired at time of signing.
275
# test_verify_expired_but_valid()
276
return SIGNATURE_EXPIRED, fingerprint[-8:], None
106
# bad subprocess parameters, should never happen.
109
if e.errno == errno.ENOENT:
110
# gpg is not installed
111
raise errors.SigningFailed(self._command_line())
278
# I can't work out how to create a test where the signature
279
# was expired at the time of signing.
280
return SIGNATURE_NOT_VALID, None, None
282
# GPG does not know this key.
283
# test_verify_unknown_key()
284
if error.result.signatures[0].summary & gpg.constants.SIGSUM_KEY_MISSING:
285
return SIGNATURE_KEY_MISSING, fingerprint[-8:], None
287
return SIGNATURE_NOT_VALID, None, None
288
except gpg.errors.GPGMEError as error:
289
raise SignatureVerificationFailed(error)
291
# No result if input is invalid.
292
# test_verify_invalid()
293
if len(result.signatures) == 0:
294
return SIGNATURE_NOT_VALID, None, plain_output
296
# User has specified a list of acceptable keys, check our result is in
297
# it. test_verify_unacceptable_key()
298
fingerprint = result.signatures[0].fpr
299
if self.acceptable_keys is not None:
300
if not fingerprint in self.acceptable_keys:
301
return SIGNATURE_KEY_MISSING, fingerprint[-8:], plain_output
302
# Yay gpg set the valid bit.
303
# Can't write a test for this one as you can't set a key to be
305
if result.signatures[0].summary & gpg.constants.SIGSUM_VALID:
306
key = self.context.get_key(fingerprint)
307
name = key.uids[0].name
308
email = key.uids[0].email
309
return SIGNATURE_VALID, name.decode('utf-8') + u" <" + email.decode('utf-8') + u">", plain_output
310
# Sigsum_red indicates a problem, unfortunatly I have not been able
311
# to write any tests which actually set this.
312
if result.signatures[0].summary & gpg.constants.SIGSUM_RED:
313
return SIGNATURE_NOT_VALID, None, plain_output
314
# Summary isn't set if sig is valid but key is untrusted but if user
315
# has explicity set the key as acceptable we can validate it.
316
if result.signatures[0].summary == 0 and self.acceptable_keys is not None:
317
if fingerprint in self.acceptable_keys:
318
# test_verify_untrusted_but_accepted()
319
return SIGNATURE_VALID, None, plain_output
320
# test_verify_valid_but_untrusted()
321
if result.signatures[0].summary == 0 and self.acceptable_keys is None:
322
return SIGNATURE_NOT_VALID, None, plain_output
323
# Other error types such as revoked keys should (I think) be caught by
324
# SIGSUM_RED so anything else means something is buggy.
325
raise SignatureVerificationFailed(
326
"Unknown GnuPG key verification result")
328
def set_acceptable_keys(self, command_line_input):
329
"""Set the acceptable keys for verifying with this GPGStrategy.
331
:param command_line_input: comma separated list of patterns from
336
acceptable_keys_config = self._config_stack.get('acceptable_keys')
337
if acceptable_keys_config is not None:
338
patterns = acceptable_keys_config
339
if command_line_input is not None: # command line overrides config
340
patterns = command_line_input.split(',')
343
self.acceptable_keys = []
344
for pattern in patterns:
345
result = self.context.keylist(pattern)
349
self.acceptable_keys.append(key.subkeys[0].fpr)
350
trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
353
"No GnuPG key results for pattern: {0}"
357
def valid_commits_message(count):
358
"""returns message for number of commits"""
359
return gettext(u"{0} commits with valid signatures").format(
360
count[SIGNATURE_VALID])
363
def unknown_key_message(count):
364
"""returns message for number of commits"""
365
return ngettext(u"{0} commit with unknown key",
366
u"{0} commits with unknown keys",
367
count[SIGNATURE_KEY_MISSING]).format(
368
count[SIGNATURE_KEY_MISSING])
371
def commit_not_valid_message(count):
372
"""returns message for number of commits"""
373
return ngettext(u"{0} commit not valid",
374
u"{0} commits not valid",
375
count[SIGNATURE_NOT_VALID]).format(
376
count[SIGNATURE_NOT_VALID])
379
def commit_not_signed_message(count):
380
"""returns message for number of commits"""
381
return ngettext(u"{0} commit not signed",
382
u"{0} commits not signed",
383
count[SIGNATURE_NOT_SIGNED]).format(
384
count[SIGNATURE_NOT_SIGNED])
387
def expired_commit_message(count):
388
"""returns message for number of commits"""
389
return ngettext(u"{0} commit with key now expired",
390
u"{0} commits with key now expired",
391
count[SIGNATURE_EXPIRED]).format(
392
count[SIGNATURE_EXPIRED])
395
def verbose_expired_key_message(result, repo):
396
"""takes a verify result and returns list of expired key info"""
398
fingerprint_to_authors = {}
399
for rev_id, validity, fingerprint in result:
400
if validity == SIGNATURE_EXPIRED:
401
revision = repo.get_revision(rev_id)
402
authors = ', '.join(revision.get_apparent_authors())
403
signers.setdefault(fingerprint, 0)
404
signers[fingerprint] += 1
405
fingerprint_to_authors[fingerprint] = authors
407
for fingerprint, number in signers.items():
409
ngettext(u"{0} commit by author {1} with key {2} now expired",
410
u"{0} commits by author {1} with key {2} now expired",
412
number, fingerprint_to_authors[fingerprint], fingerprint))
416
def verbose_valid_message(result):
417
"""takes a verify result and returns list of signed commits strings"""
419
for rev_id, validity, uid in result:
420
if validity == SIGNATURE_VALID:
421
signers.setdefault(uid, 0)
424
for uid, number in signers.items():
425
result.append(ngettext(u"{0} signed {1} commit",
426
u"{0} signed {1} commits",
427
number).format(uid, number))
431
def verbose_not_valid_message(result, repo):
432
"""takes a verify result and returns list of not valid commit info"""
434
for rev_id, validity, empty in result:
435
if validity == SIGNATURE_NOT_VALID:
436
revision = repo.get_revision(rev_id)
437
authors = ', '.join(revision.get_apparent_authors())
438
signers.setdefault(authors, 0)
439
signers[authors] += 1
441
for authors, number in signers.items():
442
result.append(ngettext(u"{0} commit by author {1}",
443
u"{0} commits by author {1}",
444
number).format(number, authors))
448
def verbose_not_signed_message(result, repo):
449
"""takes a verify result and returns list of not signed commit info"""
451
for rev_id, validity, empty in result:
452
if validity == SIGNATURE_NOT_SIGNED:
453
revision = repo.get_revision(rev_id)
454
authors = ', '.join(revision.get_apparent_authors())
455
signers.setdefault(authors, 0)
456
signers[authors] += 1
458
for authors, number in signers.items():
459
result.append(ngettext(u"{0} commit by author {1}",
460
u"{0} commits by author {1}",
461
number).format(number, authors))
465
def verbose_missing_key_message(result):
466
"""takes a verify result and returns list of missing key info"""
468
for rev_id, validity, fingerprint in result:
469
if validity == SIGNATURE_KEY_MISSING:
470
signers.setdefault(fingerprint, 0)
471
signers[fingerprint] += 1
473
for fingerprint, number in list(signers.items()):
474
result.append(ngettext(u"Unknown key {0} signed {1} commit",
475
u"Unknown key {0} signed {1} commits",
476
number).format(fingerprint, number))