18
18
"""GPG signing and checking logic."""
20
from __future__ import absolute_import
24
from StringIO import StringIO
22
from breezy.lazy_import import lazy_import
26
from brzlib.lazy_import import lazy_import
23
27
lazy_import(globals(), """
29
from breezy.i18n import (
37
from brzlib.i18n import (
43
from brzlib.symbol_versioning import (
39
# Verification result codes, numbered consecutively from zero:
#   SIGNATURE_VALID        0
#   SIGNATURE_KEY_MISSING  1
#   SIGNATURE_NOT_VALID    2
#   SIGNATURE_NOT_SIGNED   3
#   SIGNATURE_EXPIRED      4
(SIGNATURE_VALID,
 SIGNATURE_KEY_MISSING,
 SIGNATURE_NOT_VALID,
 SIGNATURE_NOT_SIGNED,
 SIGNATURE_EXPIRED) = range(5)
51
class GpgNotInstalled(errors.DependencyNotPresent):
    """Error raised when the ``gpg`` Python module cannot be imported."""

    # Message template; ``%(error)s`` is presumably filled in by the base
    # class from the value handed to __init__ -- confirm against
    # errors.DependencyNotPresent.
    _fmt = ('python-gpg is not installed, it is needed to create or verify '
            'signatures. %(error)s')

    def __init__(self, error):
        """Initialise the base class for the 'gpg' library.

        :param error: detail describing why gpg is unavailable
        """
        super(GpgNotInstalled, self).__init__('gpg', error)
60
class SigningFailed(errors.BzrError):
    """Error raised when GPG could not sign the supplied data."""

    _fmt = 'Failed to GPG sign data: "%(error)s"'

    def __init__(self, error):
        """Pass ``error`` to the base class as the ``error`` keyword.

        :param error: description of the signing failure
        """
        super(SigningFailed, self).__init__(error=error)
68
class SignatureVerificationFailed(errors.BzrError):
    """Error raised when verifying GPG signature data fails with an error."""

    _fmt = 'Failed to verify GPG signature data with error "%(error)s"'

    def __init__(self, error):
        """Pass ``error`` to the base class as the ``error`` keyword.

        :param error: description of (or exception for) the failure
        """
        super(SignatureVerificationFailed, self).__init__(error=error)
76
56
def bulk_verify_signatures(repository, revids, strategy,
77
process_events_callback=None):
57
process_events_callback=None):
78
58
"""Do verifications on a set of revisions
80
60
:param repository: repository object
226
201
:return: boolean if this strategy can verify signatures
229
import gpg # noqa: F401
206
except ImportError, error:
234
def sign(self, content, mode):
237
except ImportError as error:
238
raise GpgNotInstalled(
239
'Set create_signatures=no to disable creating signatures.')
209
def _command_line(self):
210
key = self._config_stack.get('gpg_signing_key')
211
if key is None or key == 'default':
212
# 'default' or not setting gpg_signing_key at all means we should
213
# use the user email address
214
key = config.extract_email_address(self._config_stack.get('email'))
215
return [self._config_stack.get('gpg_signing_command'), '--clearsign',
241
if isinstance(content, str):
218
def sign(self, content):
219
if isinstance(content, unicode):
242
220
raise errors.BzrBadParameterUnicode('content')
221
ui.ui_factory.clear_term()
244
plain_text = gpg.Data(content)
223
preexec_fn = _set_gpg_tty
224
if sys.platform == 'win32':
225
# Win32 doesn't support preexec_fn, but wouldn't support TTY anyway.
246
output, result = self.context.sign(
248
MODE_DETACH: gpg.constants.sig.mode.DETACH,
249
MODE_CLEAR: gpg.constants.sig.mode.CLEAR,
250
MODE_NORMAL: gpg.constants.sig.mode.NORMAL,
252
except gpg.errors.GPGMEError as error:
253
raise SigningFailed(str(error))
257
def verify(self, signed_data, signature=None):
228
process = subprocess.Popen(self._command_line(),
229
stdin=subprocess.PIPE,
230
stdout=subprocess.PIPE,
231
preexec_fn=preexec_fn)
233
result = process.communicate(content)[0]
234
if process.returncode is None:
236
if process.returncode != 0:
237
raise errors.SigningFailed(self._command_line())
240
if e.errno == errno.EPIPE:
241
raise errors.SigningFailed(self._command_line())
245
# bad subprocess parameters, should never happen.
248
if e.errno == errno.ENOENT:
249
# gpg is not installed
250
raise errors.SigningFailed(self._command_line())
254
def verify(self, content, testament):
258
255
"""Check content has a valid signature.
260
:param signed_data: Signed data
261
:param signature: optional signature (if detached)
257
:param content: the commit signature
258
:param testament: the valid testament string for the commit
263
:return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid, plain text
260
:return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid
267
except ImportError as error:
268
raise GpgNotInstalled(
269
'Set check_signatures=ignore to disable verifying signatures.')
264
except ImportError, error:
265
raise errors.GpgmeNotInstalled(error)
271
signed_data = gpg.Data(signed_data)
273
signature = gpg.Data(signature)
267
signature = StringIO(content)
268
plain_output = StringIO()
275
plain_output, result = self.context.verify(signed_data, signature)
276
except gpg.errors.BadSignatures as error:
277
fingerprint = error.result.signatures[0].fpr
278
if error.result.signatures[0].summary & gpg.constants.SIGSUM_KEY_EXPIRED:
279
expires = self.context.get_key(
280
error.result.signatures[0].fpr).subkeys[0].expires
281
if expires > error.result.signatures[0].timestamp:
282
# The expired key was not expired at time of signing.
283
# test_verify_expired_but_valid()
284
return SIGNATURE_EXPIRED, fingerprint[-8:], None
286
# I can't work out how to create a test where the signature
287
# was expired at the time of signing.
288
return SIGNATURE_NOT_VALID, None, None
290
# GPG does not know this key.
291
# test_verify_unknown_key()
292
if (error.result.signatures[0].summary &
293
gpg.constants.SIGSUM_KEY_MISSING):
294
return SIGNATURE_KEY_MISSING, fingerprint[-8:], None
296
return SIGNATURE_NOT_VALID, None, None
297
except gpg.errors.GPGMEError as error:
298
raise SignatureVerificationFailed(error)
270
result = self.context.verify(signature, None, plain_output)
271
except gpgme.GpgmeError,error:
272
raise errors.SignatureVerificationFailed(error[2])
300
274
# No result if input is invalid.
301
275
# test_verify_invalid()
302
if len(result.signatures) == 0:
303
return SIGNATURE_NOT_VALID, None, plain_output
277
return SIGNATURE_NOT_VALID, None
305
278
# User has specified a list of acceptable keys, check our result is in
306
279
# it. test_verify_unacceptable_key()
307
fingerprint = result.signatures[0].fpr
280
fingerprint = result[0].fpr
308
281
if self.acceptable_keys is not None:
309
if fingerprint not in self.acceptable_keys:
310
return SIGNATURE_KEY_MISSING, fingerprint[-8:], plain_output
311
# Yay gpg set the valid bit.
282
if not fingerprint in self.acceptable_keys:
283
return SIGNATURE_KEY_MISSING, fingerprint[-8:]
284
# Check the signature actually matches the testament.
285
# test_verify_bad_testament()
286
if testament != plain_output.getvalue():
287
return SIGNATURE_NOT_VALID, None
288
# Yay gpgme set the valid bit.
312
289
# Can't write a test for this one as you can't set a key to be
314
if result.signatures[0].summary & gpg.constants.SIGSUM_VALID:
290
# trusted using gpgme.
291
if result[0].summary & gpgme.SIGSUM_VALID:
315
292
key = self.context.get_key(fingerprint)
316
293
name = key.uids[0].name
317
if isinstance(name, bytes):
318
name = name.decode('utf-8')
319
294
email = key.uids[0].email
320
if isinstance(email, bytes):
321
email = email.decode('utf-8')
322
return (SIGNATURE_VALID, name + u" <" + email + u">", plain_output)
295
return SIGNATURE_VALID, name + " <" + email + ">"
323
296
# Sigsum_red indicates a problem, unfortunately I have not been able
324
297
# to write any tests which actually set this.
325
if result.signatures[0].summary & gpg.constants.SIGSUM_RED:
326
return SIGNATURE_NOT_VALID, None, plain_output
298
if result[0].summary & gpgme.SIGSUM_RED:
299
return SIGNATURE_NOT_VALID, None
300
# GPG does not know this key.
301
# test_verify_unknown_key()
302
if result[0].summary & gpgme.SIGSUM_KEY_MISSING:
303
return SIGNATURE_KEY_MISSING, fingerprint[-8:]
327
304
# Summary isn't set if sig is valid but key is untrusted but if user
328
305
# has explicitly set the key as acceptable we can validate it.
329
if (result.signatures[0].summary == 0 and
330
self.acceptable_keys is not None):
306
if result[0].summary == 0 and self.acceptable_keys is not None:
331
307
if fingerprint in self.acceptable_keys:
332
308
# test_verify_untrusted_but_accepted()
333
return SIGNATURE_VALID, None, plain_output
309
return SIGNATURE_VALID, None
334
310
# test_verify_valid_but_untrusted()
335
if result.signatures[0].summary == 0 and self.acceptable_keys is None:
336
return SIGNATURE_NOT_VALID, None, plain_output
311
if result[0].summary == 0 and self.acceptable_keys is None:
312
return SIGNATURE_NOT_VALID, None
313
if result[0].summary & gpgme.SIGSUM_KEY_EXPIRED:
314
expires = self.context.get_key(result[0].fpr).subkeys[0].expires
315
if expires > result[0].timestamp:
316
# The expired key was not expired at time of signing.
317
# test_verify_expired_but_valid()
318
return SIGNATURE_EXPIRED, fingerprint[-8:]
320
# I can't work out how to create a test where the signature
321
# was expired at the time of signing.
322
return SIGNATURE_NOT_VALID, None
323
# A signature from a revoked key gets this.
324
# test_verify_revoked_signature()
325
if ((result[0].summary & gpgme.SIGSUM_SYS_ERROR
326
or result[0].status.strerror == 'Certificate revoked')):
327
return SIGNATURE_NOT_VALID, None
337
328
# Other error types such as revoked keys should (I think) be caught by
338
329
# SIGSUM_RED so anything else means something is buggy.
339
raise SignatureVerificationFailed(
330
raise errors.SignatureVerificationFailed(
340
331
"Unknown GnuPG key verification result")
342
333
def set_acceptable_keys(self, command_line_input):
364
355
trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
365
356
if not found_key:
366
357
trace.note(gettext(
367
"No GnuPG key results for pattern: {0}"
358
"No GnuPG key results for pattern: {0}"
361
@deprecated_method(deprecated_in((2, 6, 0)))
def do_verifications(self, revisions, repository,
                     process_events_callback=None):
    """Verify a set of revisions (deprecated since 2.6.0).

    Thin wrapper that forwards to bulk_verify_signatures(), passing this
    object as the strategy.

    :param revisions: list of revision ids to verify
    :param repository: repository object
    :param process_events_callback: method to call for GUI frontends that
        want to keep their UI refreshed

    :return: count dictionary of results of each type,
             result list for each revision,
             boolean True if all results are verified successfully
    """
    return bulk_verify_signatures(
        repository, revisions, strategy=self,
        process_events_callback=process_events_callback)
378
@deprecated_method(deprecated_in((2, 6, 0)))
def verbose_valid_message(self, result):
    """Deprecated since 2.6.0: list the signed commits in a verify result.

    :param result: verify result, handed unchanged to the free function
        of the same name
    :return: whatever that function returns (a list of strings, per the
        original description)
    """
    signed_commits = verbose_valid_message(result)
    return signed_commits
383
@deprecated_method(deprecated_in((2, 6, 0)))
def verbose_not_valid_message(self, result, repo):
    """Deprecated since 2.6.0: list info on commits that are not valid.

    :param result: verify result
    :param repo: repository, forwarded together with ``result`` to the
        free function of the same name
    :return: whatever that function returns
    """
    not_valid_info = verbose_not_valid_message(result, repo)
    return not_valid_info
388
@deprecated_method(deprecated_in((2, 6, 0)))
def verbose_not_signed_message(self, result, repo):
    """Deprecated since 2.6.0: list info on commits that are not signed.

    :param result: verify result
    :param repo: repository, forwarded together with ``result``
    :return: whatever the free function verbose_not_signed_message()
        returns
    """
    # Every sibling wrapper delegates to the free function of its own
    # name; this one used to call verbose_not_valid_message(), reporting
    # "not valid" data where "not signed" data was requested.
    # NOTE(review): assumes a free verbose_not_signed_message(result, repo)
    # exists alongside the other verbose_* helpers -- confirm.
    return verbose_not_signed_message(result, repo)
393
@deprecated_method(deprecated_in((2, 6, 0)))
def verbose_missing_key_message(self, result):
    """Deprecated since 2.6.0: list info on commits whose key is missing.

    :param result: verify result, handed unchanged to the free function
        of the same name
    :return: whatever that function returns
    """
    missing_key_info = verbose_missing_key_message(result)
    return missing_key_info
398
@deprecated_method(deprecated_in((2, 6, 0)))
def verbose_expired_key_message(self, result, repo):
    """Deprecated since 2.6.0: list info on commits signed by expired keys.

    :param result: verify result
    :param repo: repository, forwarded together with ``result`` to the
        free function of the same name
    :return: whatever that function returns
    """
    expired_key_info = verbose_expired_key_message(result, repo)
    return expired_key_info
403
@deprecated_method(deprecated_in((2, 6, 0)))
def valid_commits_message(self, count):
    """Deprecated since 2.6.0: message for the number of valid commits.

    :param count: passed unchanged to the free function
        valid_commits_message()
    :return: whatever that function returns
    """
    message = valid_commits_message(count)
    return message
408
@deprecated_method(deprecated_in((2, 6, 0)))
def unknown_key_message(self, count):
    """Deprecated since 2.6.0: message for commits with an unknown key.

    :param count: passed unchanged to the free function
        unknown_key_message()
    :return: whatever that function returns
    """
    message = unknown_key_message(count)
    return message
413
@deprecated_method(deprecated_in((2, 6, 0)))
def commit_not_valid_message(self, count):
    """Deprecated since 2.6.0: message for the number of not-valid commits.

    :param count: passed unchanged to the free function of the same name
    :return: whatever that function returns
    """
    message = commit_not_valid_message(count)
    return message
418
@deprecated_method(deprecated_in((2, 6, 0)))
def commit_not_signed_message(self, count):
    """Deprecated since 2.6.0: message for the number of unsigned commits.

    :param count: passed unchanged to the free function of the same name
    :return: whatever that function returns
    """
    message = commit_not_signed_message(count)
    return message
423
@deprecated_method(deprecated_in((2, 6, 0)))
def expired_commit_message(self, count):
    """Deprecated since 2.6.0: message for commits signed by expired keys.

    :param count: passed unchanged to the free function of the same name
    :return: whatever that function returns
    """
    message = expired_commit_message(count)
    return message
371
429
def valid_commits_message(count):
    """Return the summary message for commits with valid signatures.

    :param count: container indexed by SIGNATURE_* result code; only the
        SIGNATURE_VALID entry is read
    :return: the gettext() template formatted with that entry
    """
    template = gettext(u"{0} commits with valid signatures")
    return template.format(count[SIGNATURE_VALID])
377
435
def unknown_key_message(count):