# Copyright (C) 2005, 2006, 2007, 2009, 2011, 2012, 2013, 2016 Canonical Ltd
# Authors: Robert Collins <robert.collins@canonical.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
"""GPG signing and checking logic."""
22
from breezy.lazy_import import lazy_import
23
lazy_import(globals(), """
29
from breezy.i18n import (
39
# verification results
# Every code below is used as a key of the ``count`` dictionary built by
# bulk_verify_signatures() and as the first element of the tuples returned
# by the strategies' verify() methods.
SIGNATURE_VALID = 0
SIGNATURE_KEY_MISSING = 1
SIGNATURE_NOT_VALID = 2
SIGNATURE_NOT_SIGNED = 3
SIGNATURE_EXPIRED = 4

# signing modes accepted by GPGStrategy.sign(), which maps them onto the
# corresponding gpg.constants.sig.mode values.
# NOTE(review): numeric values restored from memory of the upstream module --
# confirm against callers that persist or compare them.
MODE_NORMAL = 0
MODE_DETACH = 1
MODE_CLEAR = 2
51
class GpgNotInstalled(errors.DependencyNotPresent):
    """Error raised when the ``gpg`` Python module cannot be imported.

    :param error: extra text handed to ``DependencyNotPresent`` together with
        the library name ``'gpg'``; the message template refers to it as
        ``%(error)s``.
    """

    _fmt = ('python-gpg is not installed, it is needed to create or '
            'verify signatures. %(error)s')

    def __init__(self, error):
        errors.DependencyNotPresent.__init__(self, 'gpg', error)
60
class SigningFailed(errors.BzrError):
    """Error raised when data could not be GPG signed.

    :param error: description of the failure, stored as the ``error`` field
        referenced by the message template.
    """

    _fmt = 'Failed to GPG sign data: "%(error)s"'

    def __init__(self, error):
        errors.BzrError.__init__(self, error=error)
68
class SignatureVerificationFailed(errors.BzrError):
    """Error raised when a GPG signature could not be verified at all.

    :param error: description of the failure, stored as the ``error`` field
        referenced by the message template.
    """

    _fmt = 'Failed to verify GPG signature data with error "%(error)s"'

    def __init__(self, error):
        errors.BzrError.__init__(self, error=error)
76
def bulk_verify_signatures(repository, revids, strategy,
                           process_events_callback=None):
    """Do verifications on a set of revisions

    :param repository: repository object
    :param revids: list of revision ids to verify
    :param strategy: GPG strategy to use
    :param process_events_callback: method to call for GUI frontends that
        want to keep their UI refreshed

    :return: count dictionary of results of each type,
             result list for each revision,
             boolean True if all results are verified successfully
    """
    # One counter per verification outcome so callers can index the
    # dictionary with any SIGNATURE_* code without a KeyError.
    count = {SIGNATURE_VALID: 0,
             SIGNATURE_KEY_MISSING: 0,
             SIGNATURE_NOT_VALID: 0,
             SIGNATURE_NOT_SIGNED: 0,
             SIGNATURE_EXPIRED: 0}
    result = []
    all_verifiable = True
    total = len(revids)
    with ui.ui_factory.nested_progress_bar() as pb:
        for i, (rev_id, verification_result, uid) in enumerate(
                repository.verify_revision_signatures(
                    revids, strategy)):
            pb.update("verifying signatures", i, total)
            result.append([rev_id, verification_result, uid])
            count[verification_result] += 1
            if verification_result != SIGNATURE_VALID:
                all_verifiable = False
            # Give GUI frontends a chance to refresh after every revision.
            if process_events_callback is not None:
                process_events_callback()
    return (count, result, all_verifiable)
112
class DisabledGPGStrategy(object):
    """A GPG Strategy that makes everything fail."""

    @staticmethod
    def verify_signatures_available():
        """Report that this strategy can be asked to verify.

        :return: always True; verify() itself then raises.
        """
        return True

    def __init__(self, ignored):
        """Real strategies take a configuration."""

    def sign(self, content, mode):
        """Always refuse to sign.

        :raises SigningFailed: unconditionally.
        """
        raise SigningFailed('Signing is disabled.')

    def verify(self, signed_data, signature=None):
        """Always refuse to verify.

        :raises SignatureVerificationFailed: unconditionally.
        """
        raise SignatureVerificationFailed(
            'Signature verification is disabled.')

    def set_acceptable_keys(self, command_line_input):
        """Accept and ignore the key patterns; nothing is ever verified."""
        pass
133
class LoopbackGPGStrategy(object):
    """A GPG Strategy that acts like 'cat' - data is just passed through.

    sign() only wraps the content in pseudo-signature marker lines and
    verify() strips those markers again; no cryptography is involved.
    """

    _BEGIN_MARKER = b"-----BEGIN PSEUDO-SIGNED CONTENT-----\n"
    _END_MARKER = b"-----END PSEUDO-SIGNED CONTENT-----\n"

    @staticmethod
    def verify_signatures_available():
        """Report that this strategy can verify signatures.

        :return: always True
        """
        return True

    def __init__(self, ignored):
        """Real strategies take a configuration."""

    def sign(self, content, mode):
        """Wrap ``content`` (bytes) between the pseudo-signature markers.

        :param content: bytes to "sign"
        :param mode: ignored
        :return: the marker-wrapped bytes
        """
        return self._BEGIN_MARKER + content + self._END_MARKER

    def verify(self, signed_data, signature=None):
        """Strip the pseudo-signature markers from ``signed_data``.

        :param signed_data: bytes as produced by sign()
        :param signature: ignored
        :return: (SIGNATURE_VALID, None, plain text bytes)
        """
        plain_text = signed_data.replace(self._BEGIN_MARKER, b"")
        plain_text = plain_text.replace(self._END_MARKER, b"")
        return SIGNATURE_VALID, None, plain_text

    def set_acceptable_keys(self, command_line_input):
        """Record the acceptable key patterns given on the command line.

        :param command_line_input: comma separated list of patterns, or
            None to leave the strategy untouched.  The literal pattern
            "unknown" is skipped.
        """
        if command_line_input is not None:
            patterns = command_line_input.split(",")
            self.acceptable_keys = [
                pattern for pattern in patterns if pattern != "unknown"]
168
def _set_gpg_tty():
    """Copy the TTY environment variable into GPG_TTY when it is set.

    NOTE(review): the ``def`` line, the ``if``/``else`` lines and the tail of
    the second log message were missing from the text this was restored
    from; the function name and message ending are reconstructed -- confirm
    against the upstream module.
    """
    tty = os.environ.get('TTY')
    if tty is not None:
        os.environ['GPG_TTY'] = tty
        trace.mutter('setting GPG_TTY=%s', tty)
    else:
        # This is not quite worthy of a warning, because some people
        # don't need GPG_TTY to be set. But it is worthy of a big mark
        # in brz.log, so that people can debug it if it happens to them
        trace.mutter('** Env var TTY empty, cannot set GPG_TTY.'
                     '  Is TTY exported?')
180
class GPGStrategy(object):
    """GPG Signing and checking facilities."""

    # None means "no explicit list of acceptable keys"; set_acceptable_keys()
    # replaces it with a list of key fingerprints.
    acceptable_keys = None

    def __init__(self, config_stack):
        """Create a strategy reading its settings from ``config_stack``.

        When the ``gpg`` module cannot be imported no context is created;
        sign() and verify() then raise GpgNotInstalled.
        """
        self._config_stack = config_stack
        try:
            import gpg
            self.context = gpg.Context()
            self.context.armor = True
            self.context.signers = self._get_signing_keys()
        except ImportError:
            pass  # can't use verify()

    def _get_signing_keys(self):
        """Pick the signing keys from the 'gpg_signing_key' / 'email' options.

        :return: a list with at most one key; an empty list leaves the
            choice of key to gpg.
        """
        import gpg
        keyname = self._config_stack.get('gpg_signing_key')
        if keyname == 'default':
            # Leave things to gpg
            return []

        if keyname:
            try:
                return [self.context.get_key(keyname)]
            except gpg.errors.KeyNotFound:
                # Fall through and try the name as a keylist pattern.
                pass

        if keyname is None:
            # not setting gpg_signing_key at all means we should
            # use the user email address
            keyname = config.extract_email_address(
                self._config_stack.get('email'))
        if keyname == 'default':
            return []
        possible_keys = self.context.keylist(keyname, secret=True)
        try:
            return [next(possible_keys)]
        except StopIteration:
            return []

    @staticmethod
    def verify_signatures_available():
        """
        check if this strategy can verify signatures

        :return: boolean if this strategy can verify signatures
        """
        try:
            import gpg  # noqa: F401
            return True
        except ImportError:
            return False

    def sign(self, content, mode):
        """Sign ``content`` with the configured signing keys.

        :param content: bytes to sign; ``str`` is rejected
        :param mode: one of MODE_DETACH, MODE_CLEAR or MODE_NORMAL
        :return: the output produced by ``gpg.Context.sign``
        :raises GpgNotInstalled: if the gpg module cannot be imported
        :raises errors.BzrBadParameterUnicode: if ``content`` is a ``str``
        :raises SigningFailed: if gpg reports an error while signing
        """
        try:
            import gpg
        except ImportError:
            raise GpgNotInstalled(
                'Set create_signatures=no to disable creating signatures.')

        if isinstance(content, str):
            raise errors.BzrBadParameterUnicode('content')

        plain_text = gpg.Data(content)
        try:
            output, result = self.context.sign(
                plain_text, mode={
                    MODE_DETACH: gpg.constants.sig.mode.DETACH,
                    MODE_CLEAR: gpg.constants.sig.mode.CLEAR,
                    MODE_NORMAL: gpg.constants.sig.mode.NORMAL,
                    }[mode])
        except gpg.errors.GPGMEError as error:
            raise SigningFailed(str(error))

        return output

    def verify(self, signed_data, signature=None):
        """Check content has a valid signature.

        :param signed_data: Signed data
        :param signature: optional signature (if detached)

        :return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if
            valid, plain text
        :raises GpgNotInstalled: if the gpg module cannot be imported
        :raises SignatureVerificationFailed: on a gpg error or an
            unrecognised verification result
        """
        try:
            import gpg
        except ImportError:
            raise GpgNotInstalled(
                'Set check_signatures=ignore to disable verifying signatures.')

        signed_data = gpg.Data(signed_data)
        if signature:
            signature = gpg.Data(signature)
        try:
            plain_output, result = self.context.verify(signed_data, signature)
        except gpg.errors.BadSignatures as error:
            bad_signature = error.result.signatures[0]
            fingerprint = bad_signature.fpr
            if bad_signature.summary & gpg.constants.SIGSUM_KEY_EXPIRED:
                expires = self.context.get_key(
                    bad_signature.fpr).subkeys[0].expires
                if expires > bad_signature.timestamp:
                    # The expired key was not expired at time of signing.
                    # test_verify_expired_but_valid()
                    return SIGNATURE_EXPIRED, fingerprint[-8:], None
                else:
                    # I can't work out how to create a test where the signature
                    # was expired at the time of signing.
                    return SIGNATURE_NOT_VALID, None, None

            # GPG does not know this key.
            # test_verify_unknown_key()
            if bad_signature.summary & gpg.constants.SIGSUM_KEY_MISSING:
                return SIGNATURE_KEY_MISSING, fingerprint[-8:], None

            return SIGNATURE_NOT_VALID, None, None
        except gpg.errors.GPGMEError as error:
            raise SignatureVerificationFailed(error)

        # No result if input is invalid.
        # test_verify_invalid()
        if len(result.signatures) == 0:
            return SIGNATURE_NOT_VALID, None, plain_output

        # User has specified a list of acceptable keys, check our result is in
        # it. test_verify_unacceptable_key()
        first_signature = result.signatures[0]
        fingerprint = first_signature.fpr
        if self.acceptable_keys is not None:
            if fingerprint not in self.acceptable_keys:
                return SIGNATURE_KEY_MISSING, fingerprint[-8:], plain_output
        # Yay gpg set the valid bit.
        # Can't write a test for this one as you can't set a key to be
        # trusted using gpg.
        if first_signature.summary & gpg.constants.SIGSUM_VALID:
            key = self.context.get_key(fingerprint)
            name = key.uids[0].name
            if isinstance(name, bytes):
                name = name.decode('utf-8')
            email = key.uids[0].email
            if isinstance(email, bytes):
                email = email.decode('utf-8')
            return (SIGNATURE_VALID, name + u" <" + email + u">", plain_output)
        # Sigsum_red indicates a problem, unfortunately I have not been able
        # to write any tests which actually set this.
        if first_signature.summary & gpg.constants.SIGSUM_RED:
            return SIGNATURE_NOT_VALID, None, plain_output
        # Summary isn't set if sig is valid but key is untrusted but if user
        # has explicitly set the key as acceptable we can validate it.
        if (first_signature.summary == 0 and
                self.acceptable_keys is not None):
            if fingerprint in self.acceptable_keys:
                # test_verify_untrusted_but_accepted()
                return SIGNATURE_VALID, None, plain_output
        # test_verify_valid_but_untrusted()
        if first_signature.summary == 0 and self.acceptable_keys is None:
            return SIGNATURE_NOT_VALID, None, plain_output
        # Other error types such as revoked keys should (I think) be caught by
        # SIGSUM_RED so anything else means something is buggy.
        raise SignatureVerificationFailed(
            "Unknown GnuPG key verification result")

    def set_acceptable_keys(self, command_line_input):
        """Set the acceptable keys for verifying with this GPGStrategy.

        :param command_line_input: comma separated list of patterns from
                                command line
        :return: nothing
        """
        patterns = None
        acceptable_keys_config = self._config_stack.get('acceptable_keys')
        if acceptable_keys_config is not None:
            patterns = acceptable_keys_config
        if command_line_input is not None:  # command line overrides config
            patterns = command_line_input.split(',')

        if patterns:
            self.acceptable_keys = []
            for pattern in patterns:
                result = self.context.keylist(pattern)
                found_key = False
                for key in result:
                    found_key = True
                    self.acceptable_keys.append(key.subkeys[0].fpr)
                    trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
                if not found_key:
                    trace.note(gettext(
                        "No GnuPG key results for pattern: {0}"
                        ).format(pattern))
371
def valid_commits_message(count):
    """returns message for number of commits

    :param count: dictionary of totals keyed by SIGNATURE_* result code
    :return: translated summary of the SIGNATURE_VALID total
    """
    return gettext(u"{0} commits with valid signatures").format(
        count[SIGNATURE_VALID])
377
def unknown_key_message(count):
    """returns message for number of commits

    :param count: dictionary of totals keyed by SIGNATURE_* result code
    :return: translated, pluralised summary of the SIGNATURE_KEY_MISSING
        total
    """
    missing = count[SIGNATURE_KEY_MISSING]
    return ngettext(u"{0} commit with unknown key",
                    u"{0} commits with unknown keys",
                    missing).format(missing)
385
def commit_not_valid_message(count):
    """returns message for number of commits

    :param count: dictionary of totals keyed by SIGNATURE_* result code
    :return: translated, pluralised summary of the SIGNATURE_NOT_VALID total
    """
    not_valid = count[SIGNATURE_NOT_VALID]
    return ngettext(u"{0} commit not valid",
                    u"{0} commits not valid",
                    not_valid).format(not_valid)
393
def commit_not_signed_message(count):
    """returns message for number of commits

    :param count: dictionary of totals keyed by SIGNATURE_* result code
    :return: translated, pluralised summary of the SIGNATURE_NOT_SIGNED total
    """
    not_signed = count[SIGNATURE_NOT_SIGNED]
    return ngettext(u"{0} commit not signed",
                    u"{0} commits not signed",
                    not_signed).format(not_signed)
401
def expired_commit_message(count):
    """returns message for number of commits

    :param count: dictionary of totals keyed by SIGNATURE_* result code
    :return: translated, pluralised summary of the SIGNATURE_EXPIRED total
    """
    expired = count[SIGNATURE_EXPIRED]
    return ngettext(u"{0} commit with key now expired",
                    u"{0} commits with key now expired",
                    expired).format(expired)
409
def verbose_expired_key_message(result, repo):
    """takes a verify result and returns list of expired key info

    :param result: iterable of (rev_id, validity, fingerprint) entries
    :param repo: object whose ``get_revision(rev_id)`` yields revisions
        offering ``get_apparent_authors()``
    :return: list with one translated message per key fingerprint that has
        SIGNATURE_EXPIRED entries
    """
    signers = {}
    fingerprint_to_authors = {}
    for rev_id, validity, fingerprint in result:
        if validity == SIGNATURE_EXPIRED:
            revision = repo.get_revision(rev_id)
            authors = ', '.join(revision.get_apparent_authors())
            signers[fingerprint] = signers.get(fingerprint, 0) + 1
            # Only the authors of the last matching revision are kept for
            # each fingerprint.
            fingerprint_to_authors[fingerprint] = authors
    # Build a fresh list instead of reusing the ``result`` argument.
    messages = []
    for fingerprint, number in signers.items():
        messages.append(
            ngettext(u"{0} commit by author {1} with key {2} now expired",
                     u"{0} commits by author {1} with key {2} now expired",
                     number).format(
                number, fingerprint_to_authors[fingerprint], fingerprint))
    return messages
430
def verbose_valid_message(result):
    """takes a verify result and returns list of signed commits strings

    :param result: iterable of (rev_id, validity, uid) entries
    :return: list with one translated message per uid that has
        SIGNATURE_VALID entries
    """
    signers = {}
    for rev_id, validity, uid in result:
        if validity == SIGNATURE_VALID:
            signers[uid] = signers.get(uid, 0) + 1
    # Build a fresh list instead of reusing the ``result`` argument.
    messages = []
    for uid, number in signers.items():
        messages.append(ngettext(u"{0} signed {1} commit",
                                 u"{0} signed {1} commits",
                                 number).format(uid, number))
    return messages
445
def verbose_not_valid_message(result, repo):
    """takes a verify result and returns list of not valid commit info

    :param result: iterable of (rev_id, validity, ignored) entries
    :param repo: object whose ``get_revision(rev_id)`` yields revisions
        offering ``get_apparent_authors()``
    :return: list with one translated message per author string that has
        SIGNATURE_NOT_VALID entries
    """
    signers = {}
    for rev_id, validity, empty in result:
        if validity == SIGNATURE_NOT_VALID:
            revision = repo.get_revision(rev_id)
            authors = ', '.join(revision.get_apparent_authors())
            signers[authors] = signers.get(authors, 0) + 1
    # Build a fresh list instead of reusing the ``result`` argument.
    messages = []
    for authors, number in signers.items():
        messages.append(ngettext(u"{0} commit by author {1}",
                                 u"{0} commits by author {1}",
                                 number).format(number, authors))
    return messages
462
def verbose_not_signed_message(result, repo):
    """takes a verify result and returns list of not signed commit info

    :param result: iterable of (rev_id, validity, ignored) entries
    :param repo: object whose ``get_revision(rev_id)`` yields revisions
        offering ``get_apparent_authors()``
    :return: list with one translated message per author string that has
        SIGNATURE_NOT_SIGNED entries
    """
    signers = {}
    for rev_id, validity, empty in result:
        if validity == SIGNATURE_NOT_SIGNED:
            revision = repo.get_revision(rev_id)
            authors = ', '.join(revision.get_apparent_authors())
            signers[authors] = signers.get(authors, 0) + 1
    # Build a fresh list instead of reusing the ``result`` argument.
    messages = []
    for authors, number in signers.items():
        messages.append(ngettext(u"{0} commit by author {1}",
                                 u"{0} commits by author {1}",
                                 number).format(number, authors))
    return messages
479
def verbose_missing_key_message(result):
    """takes a verify result and returns list of missing key info

    :param result: iterable of (rev_id, validity, fingerprint) entries
    :return: list with one translated message per key fingerprint that has
        SIGNATURE_KEY_MISSING entries
    """
    signers = {}
    for rev_id, validity, fingerprint in result:
        if validity == SIGNATURE_KEY_MISSING:
            signers[fingerprint] = signers.get(fingerprint, 0) + 1
    # Build a fresh list instead of reusing the ``result`` argument.
    messages = []
    for fingerprint, number in signers.items():
        messages.append(ngettext(u"Unknown key {0} signed {1} commit",
                                 u"Unknown key {0} signed {1} commits",
                                 number).format(fingerprint, number))
    return messages