1
# Copyright (C) 2005, 2006, 2007, 2009, 2011, 2012, 2013, 2016 Canonical Ltd
2
# Authors: Robert Collins <robert.collins@canonical.com>
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
"""GPG signing and checking logic."""
20
from __future__ import absolute_import
24
from breezy.lazy_import import lazy_import
25
lazy_import(globals(), """
31
from breezy.i18n import (
41
# verification results
43
SIGNATURE_KEY_MISSING = 1
44
SIGNATURE_NOT_VALID = 2
45
SIGNATURE_NOT_SIGNED = 3
53
class GpgNotInstalled(errors.DependencyNotPresent):
    """Error raised when the python-gpg bindings are needed but missing."""

    # %(error)s is filled with the hint passed to the constructor.
    _fmt = (
        'python-gpg is not installed, it is needed to create or '
        'verify signatures. %(error)s'
    )

    def __init__(self, error):
        """Create the error for the 'gpg' dependency.

        :param error: extra text appended to the message; forwarded to
            DependencyNotPresent together with the library name 'gpg'
        """
        super(GpgNotInstalled, self).__init__('gpg', error)
62
class SigningFailed(errors.BzrError):
    """Error raised when data could not be GPG signed."""

    # %(error)s is filled from the ``error`` keyword given to BzrError.
    _fmt = 'Failed to GPG sign data: "%(error)s"'

    def __init__(self, error):
        """Create the error.

        :param error: description of the failure, interpolated into the
            message as %(error)s
        """
        super(SigningFailed, self).__init__(error=error)
70
class SignatureVerificationFailed(errors.BzrError):
    """Error raised when GPG signature data could not be verified."""

    # %(error)s is filled from the ``error`` keyword given to BzrError.
    _fmt = 'Failed to verify GPG signature data with error "%(error)s"'

    def __init__(self, error):
        """Create the error.

        :param error: description of the failure, interpolated into the
            message as %(error)s
        """
        super(SignatureVerificationFailed, self).__init__(error=error)
78
def bulk_verify_signatures(repository, revids, strategy,
79
process_events_callback=None):
80
"""Do verifications on a set of revisions
82
:param repository: repository object
83
:param revids: list of revision ids to verify
84
:param strategy: GPG strategy to use
85
:param process_events_callback: method to call for GUI frontends that
86
want to keep their UI refreshed
88
:return: count dictionary of results of each type,
89
result list for each revision,
90
boolean True if all results are verified successfully
92
count = {SIGNATURE_VALID: 0,
93
SIGNATURE_KEY_MISSING: 0,
94
SIGNATURE_NOT_VALID: 0,
95
SIGNATURE_NOT_SIGNED: 0,
100
with ui.ui_factory.nested_progress_bar() as pb:
101
for i, (rev_id, verification_result, uid) in enumerate(
102
repository.verify_revision_signatures(
104
pb.update("verifying signatures", i, total)
105
result.append([rev_id, verification_result, uid])
106
count[verification_result] += 1
107
if verification_result != SIGNATURE_VALID:
108
all_verifiable = False
109
if process_events_callback is not None:
110
process_events_callback()
111
return (count, result, all_verifiable)
114
class DisabledGPGStrategy(object):
115
"""A GPG Strategy that makes everything fail."""
118
def verify_signatures_available():
121
def __init__(self, ignored):
122
"""Real strategies take a configuration."""
124
def sign(self, content, mode):
    """Refuse to sign; this strategy never produces a signature.

    :param content: unused
    :param mode: unused
    :raises SigningFailed: always
    """
    reason = 'Signing is disabled.'
    raise SigningFailed(reason)
127
def verify(self, signed_data, signature=None):
128
raise SignatureVerificationFailed('Signature verification is \
131
def set_acceptable_keys(self, command_line_input):
135
class LoopbackGPGStrategy(object):
136
"""A GPG Strategy that acts like 'cat' - data is just passed through.
141
def verify_signatures_available():
144
def __init__(self, ignored):
145
"""Real strategies take a configuration."""
147
def sign(self, content, mode):
    """Wrap ``content`` between the pseudo-signature marker lines.

    :param content: bytes to "sign"; returned unchanged between the
        BEGIN and END markers
    :param mode: unused
    :return: BEGIN marker + content + END marker
    """
    begin_marker = b"-----BEGIN PSEUDO-SIGNED CONTENT-----\n"
    end_marker = b"-----END PSEUDO-SIGNED CONTENT-----\n"
    return begin_marker + content + end_marker
151
def verify(self, signed_data, signature=None):
    """Strip the pseudo-signature markers and report the data as valid.

    :param signed_data: bytes, normally as produced by ``sign``
    :param signature: unused
    :return: tuple of (SIGNATURE_VALID, None, signed_data with every
        occurrence of the BEGIN and END marker lines removed)
    """
    stripped = signed_data
    # Same order as before: BEGIN marker first, then END marker.
    for marker in (b"-----BEGIN PSEUDO-SIGNED CONTENT-----\n",
                   b"-----END PSEUDO-SIGNED CONTENT-----\n"):
        stripped = stripped.replace(marker, b"")
    return (SIGNATURE_VALID, None, stripped)
158
def set_acceptable_keys(self, command_line_input):
159
if command_line_input is not None:
160
patterns = command_line_input.split(",")
161
self.acceptable_keys = []
162
for pattern in patterns:
163
if pattern == "unknown":
166
self.acceptable_keys.append(pattern)
170
tty = os.environ.get('TTY')
172
os.environ['GPG_TTY'] = tty
173
trace.mutter('setting GPG_TTY=%s', tty)
175
# This is not quite worthy of a warning, because some people
176
# don't need GPG_TTY to be set. But it is worthy of a big mark
177
# in brz.log, so that people can debug it if it happens to them
178
trace.mutter('** Env var TTY empty, cannot set GPG_TTY.'
182
class GPGStrategy(object):
183
"""GPG Signing and checking facilities."""
185
acceptable_keys = None
187
def __init__(self, config_stack):
188
self._config_stack = config_stack
191
self.context = gpg.Context()
192
self.context.armor = True
193
self.context.signers = self._get_signing_keys()
195
pass # can't use verify()
197
def _get_signing_keys(self):
199
keyname = self._config_stack.get('gpg_signing_key')
200
if keyname == 'default':
201
# Leave things to gpg
206
return [self.context.get_key(keyname)]
207
except gpg.errors.KeyNotFound:
211
# not setting gpg_signing_key at all means we should
212
# use the user email address
213
keyname = config.extract_email_address(
214
self._config_stack.get('email'))
215
if keyname == 'default':
217
possible_keys = self.context.keylist(keyname, secret=True)
219
return [next(possible_keys)]
220
except StopIteration:
224
def verify_signatures_available():
226
check if this strategy can verify signatures
228
:return: boolean if this strategy can verify signatures
231
import gpg # noqa: F401
236
def sign(self, content, mode):
239
except ImportError as error:
240
raise GpgNotInstalled(
241
'Set create_signatures=no to disable creating signatures.')
243
if isinstance(content, str):
244
raise errors.BzrBadParameterUnicode('content')
246
plain_text = gpg.Data(content)
248
output, result = self.context.sign(
250
MODE_DETACH: gpg.constants.sig.mode.DETACH,
251
MODE_CLEAR: gpg.constants.sig.mode.CLEAR,
252
MODE_NORMAL: gpg.constants.sig.mode.NORMAL,
254
except gpg.errors.GPGMEError as error:
255
raise SigningFailed(str(error))
259
def verify(self, signed_data, signature=None):
260
"""Check content has a valid signature.
262
:param signed_data: Signed data
263
:param signature: optional signature (if detached)
265
:return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid, plain text
269
except ImportError as error:
270
raise GpgNotInstalled(
271
'Set check_signatures=ignore to disable verifying signatures.')
273
signed_data = gpg.Data(signed_data)
275
signature = gpg.Data(signature)
277
plain_output, result = self.context.verify(signed_data, signature)
278
except gpg.errors.BadSignatures as error:
279
fingerprint = error.result.signatures[0].fpr
280
if error.result.signatures[0].summary & gpg.constants.SIGSUM_KEY_EXPIRED:
281
expires = self.context.get_key(
282
error.result.signatures[0].fpr).subkeys[0].expires
283
if expires > error.result.signatures[0].timestamp:
284
# The expired key was not expired at time of signing.
285
# test_verify_expired_but_valid()
286
return SIGNATURE_EXPIRED, fingerprint[-8:], None
288
# I can't work out how to create a test where the signature
289
# was expired at the time of signing.
290
return SIGNATURE_NOT_VALID, None, None
292
# GPG does not know this key.
293
# test_verify_unknown_key()
294
if (error.result.signatures[0].summary &
295
gpg.constants.SIGSUM_KEY_MISSING):
296
return SIGNATURE_KEY_MISSING, fingerprint[-8:], None
298
return SIGNATURE_NOT_VALID, None, None
299
except gpg.errors.GPGMEError as error:
300
raise SignatureVerificationFailed(error)
302
# No result if input is invalid.
303
# test_verify_invalid()
304
if len(result.signatures) == 0:
305
return SIGNATURE_NOT_VALID, None, plain_output
307
# User has specified a list of acceptable keys, check our result is in
308
# it. test_verify_unacceptable_key()
309
fingerprint = result.signatures[0].fpr
310
if self.acceptable_keys is not None:
311
if fingerprint not in self.acceptable_keys:
312
return SIGNATURE_KEY_MISSING, fingerprint[-8:], plain_output
313
# Yay gpg set the valid bit.
314
# Can't write a test for this one as you can't set a key to be
316
if result.signatures[0].summary & gpg.constants.SIGSUM_VALID:
317
key = self.context.get_key(fingerprint)
318
name = key.uids[0].name
319
if isinstance(name, bytes):
320
name = name.decode('utf-8')
321
email = key.uids[0].email
322
if isinstance(email, bytes):
323
email = email.decode('utf-8')
324
return (SIGNATURE_VALID, name + u" <" + email + u">", plain_output)
325
# Sigsum_red indicates a problem, unfortunately I have not been able
326
# to write any tests which actually set this.
327
if result.signatures[0].summary & gpg.constants.SIGSUM_RED:
328
return SIGNATURE_NOT_VALID, None, plain_output
329
# Summary isn't set if sig is valid but key is untrusted but if user
330
# has explicitly set the key as acceptable we can validate it.
331
if (result.signatures[0].summary == 0 and
332
self.acceptable_keys is not None):
333
if fingerprint in self.acceptable_keys:
334
# test_verify_untrusted_but_accepted()
335
return SIGNATURE_VALID, None, plain_output
336
# test_verify_valid_but_untrusted()
337
if result.signatures[0].summary == 0 and self.acceptable_keys is None:
338
return SIGNATURE_NOT_VALID, None, plain_output
339
# Other error types such as revoked keys should (I think) be caught by
340
# SIGSUM_RED so anything else means something is buggy.
341
raise SignatureVerificationFailed(
342
"Unknown GnuPG key verification result")
344
def set_acceptable_keys(self, command_line_input):
345
"""Set the acceptable keys for verifying with this GPGStrategy.
347
:param command_line_input: comma separated list of patterns from
352
acceptable_keys_config = self._config_stack.get('acceptable_keys')
353
if acceptable_keys_config is not None:
354
patterns = acceptable_keys_config
355
if command_line_input is not None: # command line overrides config
356
patterns = command_line_input.split(',')
359
self.acceptable_keys = []
360
for pattern in patterns:
361
result = self.context.keylist(pattern)
365
self.acceptable_keys.append(key.subkeys[0].fpr)
366
trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
369
"No GnuPG key results for pattern: {0}"
373
def valid_commits_message(count):
    """Return the summary line for commits with valid signatures.

    :param count: container indexed by SIGNATURE_* result codes giving
        the number of commits for each result
    :return: translated message with the SIGNATURE_VALID count filled in
    """
    # NOTE(review): unlike the sibling helpers this uses gettext with a
    # plural-only string, so a count of 1 reads "1 commits ...".
    template = gettext(u"{0} commits with valid signatures")
    return template.format(count[SIGNATURE_VALID])
379
def unknown_key_message(count):
    """Return the summary line for commits signed with an unknown key.

    :param count: container indexed by SIGNATURE_* result codes giving
        the number of commits for each result
    :return: translated message, singular or plural according to the
        SIGNATURE_KEY_MISSING count
    """
    missing = count[SIGNATURE_KEY_MISSING]
    template = ngettext(
        u"{0} commit with unknown key",
        u"{0} commits with unknown keys",
        missing)
    return template.format(missing)
387
def commit_not_valid_message(count):
    """Return the summary line for commits whose signature is not valid.

    :param count: container indexed by SIGNATURE_* result codes giving
        the number of commits for each result
    :return: translated message, singular or plural according to the
        SIGNATURE_NOT_VALID count
    """
    invalid = count[SIGNATURE_NOT_VALID]
    template = ngettext(
        u"{0} commit not valid",
        u"{0} commits not valid",
        invalid)
    return template.format(invalid)
395
def commit_not_signed_message(count):
    """Return the summary line for commits that carry no signature.

    :param count: container indexed by SIGNATURE_* result codes giving
        the number of commits for each result
    :return: translated message, singular or plural according to the
        SIGNATURE_NOT_SIGNED count
    """
    unsigned = count[SIGNATURE_NOT_SIGNED]
    template = ngettext(
        u"{0} commit not signed",
        u"{0} commits not signed",
        unsigned)
    return template.format(unsigned)
403
def expired_commit_message(count):
    """Return the summary line for commits whose signing key has expired.

    :param count: container indexed by SIGNATURE_* result codes giving
        the number of commits for each result
    :return: translated message, singular or plural according to the
        SIGNATURE_EXPIRED count
    """
    expired = count[SIGNATURE_EXPIRED]
    template = ngettext(
        u"{0} commit with key now expired",
        u"{0} commits with key now expired",
        expired)
    return template.format(expired)
411
def verbose_expired_key_message(result, repo):
412
"""takes a verify result and returns list of expired key info"""
414
fingerprint_to_authors = {}
415
for rev_id, validity, fingerprint in result:
416
if validity == SIGNATURE_EXPIRED:
417
revision = repo.get_revision(rev_id)
418
authors = ', '.join(revision.get_apparent_authors())
419
signers.setdefault(fingerprint, 0)
420
signers[fingerprint] += 1
421
fingerprint_to_authors[fingerprint] = authors
423
for fingerprint, number in signers.items():
425
ngettext(u"{0} commit by author {1} with key {2} now expired",
426
u"{0} commits by author {1} with key {2} now expired",
428
number, fingerprint_to_authors[fingerprint], fingerprint))
432
def verbose_valid_message(result):
433
"""takes a verify result and returns list of signed commits strings"""
435
for rev_id, validity, uid in result:
436
if validity == SIGNATURE_VALID:
437
signers.setdefault(uid, 0)
440
for uid, number in signers.items():
441
result.append(ngettext(u"{0} signed {1} commit",
442
u"{0} signed {1} commits",
443
number).format(uid, number))
447
def verbose_not_valid_message(result, repo):
448
"""takes a verify result and returns list of not valid commit info"""
450
for rev_id, validity, empty in result:
451
if validity == SIGNATURE_NOT_VALID:
452
revision = repo.get_revision(rev_id)
453
authors = ', '.join(revision.get_apparent_authors())
454
signers.setdefault(authors, 0)
455
signers[authors] += 1
457
for authors, number in signers.items():
458
result.append(ngettext(u"{0} commit by author {1}",
459
u"{0} commits by author {1}",
460
number).format(number, authors))
464
def verbose_not_signed_message(result, repo):
465
"""takes a verify result and returns list of not signed commit info"""
467
for rev_id, validity, empty in result:
468
if validity == SIGNATURE_NOT_SIGNED:
469
revision = repo.get_revision(rev_id)
470
authors = ', '.join(revision.get_apparent_authors())
471
signers.setdefault(authors, 0)
472
signers[authors] += 1
474
for authors, number in signers.items():
475
result.append(ngettext(u"{0} commit by author {1}",
476
u"{0} commits by author {1}",
477
number).format(number, authors))
481
def verbose_missing_key_message(result):
482
"""takes a verify result and returns list of missing key info"""
484
for rev_id, validity, fingerprint in result:
485
if validity == SIGNATURE_KEY_MISSING:
486
signers.setdefault(fingerprint, 0)
487
signers[fingerprint] += 1
489
for fingerprint, number in list(signers.items()):
490
result.append(ngettext(u"Unknown key {0} signed {1} commit",
491
u"Unknown key {0} signed {1} commits",
492
number).format(fingerprint, number))