1
# Copyright (C) 2005, 2006, 2007, 2009, 2011, 2012, 2013, 2016 Canonical Ltd
2
# Authors: Robert Collins <robert.collins@canonical.com>
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
"""GPG signing and checking logic."""
20
from __future__ import absolute_import
25
from breezy.lazy_import import lazy_import
26
lazy_import(globals(), """
36
from breezy.i18n import (
48
SIGNATURE_KEY_MISSING = 1
49
SIGNATURE_NOT_VALID = 2
50
SIGNATURE_NOT_SIGNED = 3
54
def bulk_verify_signatures(repository, revids, strategy,
55
process_events_callback=None):
56
"""Do verifications on a set of revisions
58
:param repository: repository object
59
:param revids: list of revision ids to verify
60
:param strategy: GPG strategy to use
61
:param process_events_callback: method to call for GUI frontends that
62
want to keep their UI refreshed
64
:return: count dictionary of results of each type,
65
result list for each revision,
66
boolean True if all results are verified successfully
68
count = {SIGNATURE_VALID: 0,
69
SIGNATURE_KEY_MISSING: 0,
70
SIGNATURE_NOT_VALID: 0,
71
SIGNATURE_NOT_SIGNED: 0,
76
pb = ui.ui_factory.nested_progress_bar()
78
for i, (rev_id, verification_result, uid) in enumerate(
79
repository.verify_revision_signatures(
81
pb.update("verifying signatures", i, total)
82
result.append([rev_id, verification_result, uid])
83
count[verification_result] += 1
84
if verification_result != SIGNATURE_VALID:
85
all_verifiable = False
86
if process_events_callback is not None:
87
process_events_callback()
90
return (count, result, all_verifiable)
93
class DisabledGPGStrategy(object):
94
"""A GPG Strategy that makes everything fail."""
97
def verify_signatures_available():
100
def __init__(self, ignored):
101
"""Real strategies take a configuration."""
103
def sign(self, content):
104
raise errors.SigningFailed('Signing is disabled.')
106
def verify(self, content, testament):
107
raise errors.SignatureVerificationFailed('Signature verification is \
110
def set_acceptable_keys(self, command_line_input):
114
class LoopbackGPGStrategy(object):
115
"""A GPG Strategy that acts like 'cat' - data is just passed through.
120
def verify_signatures_available():
123
def __init__(self, ignored):
124
"""Real strategies take a configuration."""
126
def sign(self, content):
127
return ("-----BEGIN PSEUDO-SIGNED CONTENT-----\n" + content +
128
"-----END PSEUDO-SIGNED CONTENT-----\n")
130
def verify(self, content, testament):
131
return SIGNATURE_VALID, None
133
def set_acceptable_keys(self, command_line_input):
134
if command_line_input is not None:
135
patterns = command_line_input.split(",")
136
self.acceptable_keys = []
137
for pattern in patterns:
138
if pattern == "unknown":
141
self.acceptable_keys.append(pattern)
145
tty = os.environ.get('TTY')
147
os.environ['GPG_TTY'] = tty
148
trace.mutter('setting GPG_TTY=%s', tty)
150
# This is not quite worthy of a warning, because some people
151
# don't need GPG_TTY to be set. But it is worthy of a big mark
152
# in ~/.brz.log, so that people can debug it if it happens to them
153
trace.mutter('** Env var TTY empty, cannot set GPG_TTY.'
157
class GPGStrategy(object):
158
"""GPG Signing and checking facilities."""
160
acceptable_keys = None
162
def __init__(self, config_stack):
163
self._config_stack = config_stack
166
self.context = gpgme.Context()
167
except ImportError as error:
168
pass # can't use verify()
171
def verify_signatures_available():
173
check if this strategy can verify signatures
175
:return: boolean if this strategy can verify signatures
180
except ImportError as error:
183
def _command_line(self):
184
key = self._config_stack.get('gpg_signing_key')
185
if key is None or key == 'default':
186
# 'default' or not setting gpg_signing_key at all means we should
187
# use the user email address
188
key = config.extract_email_address(self._config_stack.get('email'))
189
return [self._config_stack.get('gpg_signing_command'), '--clearsign',
192
def sign(self, content):
193
if isinstance(content, unicode):
194
raise errors.BzrBadParameterUnicode('content')
195
ui.ui_factory.clear_term()
197
preexec_fn = _set_gpg_tty
198
if sys.platform == 'win32':
199
# Win32 doesn't support preexec_fn, but wouldn't support TTY anyway.
202
process = subprocess.Popen(self._command_line(),
203
stdin=subprocess.PIPE,
204
stdout=subprocess.PIPE,
205
preexec_fn=preexec_fn)
207
result = process.communicate(content)[0]
208
if process.returncode is None:
210
if process.returncode != 0:
211
raise errors.SigningFailed(self._command_line())
214
if e.errno == errno.EPIPE:
215
raise errors.SigningFailed(self._command_line())
219
# bad subprocess parameters, should never happen.
222
if e.errno == errno.ENOENT:
223
# gpg is not installed
224
raise errors.SigningFailed(self._command_line())
228
def verify(self, content, testament):
229
"""Check content has a valid signature.
231
:param content: the commit signature
232
:param testament: the valid testament string for the commit
234
:return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid
238
except ImportError as error:
239
raise errors.GpgmeNotInstalled(error)
241
signature = BytesIO(content)
242
plain_output = BytesIO()
244
result = self.context.verify(signature, None, plain_output)
245
except gpgme.GpgmeError as error:
246
raise errors.SignatureVerificationFailed(error[2])
248
# No result if input is invalid.
249
# test_verify_invalid()
251
return SIGNATURE_NOT_VALID, None
252
# User has specified a list of acceptable keys, check our result is in
253
# it. test_verify_unacceptable_key()
254
fingerprint = result[0].fpr
255
if self.acceptable_keys is not None:
256
if not fingerprint in self.acceptable_keys:
257
return SIGNATURE_KEY_MISSING, fingerprint[-8:]
258
# Check the signature actually matches the testament.
259
# test_verify_bad_testament()
260
if testament != plain_output.getvalue():
261
return SIGNATURE_NOT_VALID, None
262
# Yay gpgme set the valid bit.
263
# Can't write a test for this one as you can't set a key to be
264
# trusted using gpgme.
265
if result[0].summary & gpgme.SIGSUM_VALID:
266
key = self.context.get_key(fingerprint)
267
name = key.uids[0].name
268
email = key.uids[0].email
269
return SIGNATURE_VALID, name + " <" + email + ">"
270
# Sigsum_red indicates a problem, unfortunatly I have not been able
271
# to write any tests which actually set this.
272
if result[0].summary & gpgme.SIGSUM_RED:
273
return SIGNATURE_NOT_VALID, None
274
# GPG does not know this key.
275
# test_verify_unknown_key()
276
if result[0].summary & gpgme.SIGSUM_KEY_MISSING:
277
return SIGNATURE_KEY_MISSING, fingerprint[-8:]
278
# Summary isn't set if sig is valid but key is untrusted but if user
279
# has explicity set the key as acceptable we can validate it.
280
if result[0].summary == 0 and self.acceptable_keys is not None:
281
if fingerprint in self.acceptable_keys:
282
# test_verify_untrusted_but_accepted()
283
return SIGNATURE_VALID, None
284
# test_verify_valid_but_untrusted()
285
if result[0].summary == 0 and self.acceptable_keys is None:
286
return SIGNATURE_NOT_VALID, None
287
if result[0].summary & gpgme.SIGSUM_KEY_EXPIRED:
288
expires = self.context.get_key(result[0].fpr).subkeys[0].expires
289
if expires > result[0].timestamp:
290
# The expired key was not expired at time of signing.
291
# test_verify_expired_but_valid()
292
return SIGNATURE_EXPIRED, fingerprint[-8:]
294
# I can't work out how to create a test where the signature
295
# was expired at the time of signing.
296
return SIGNATURE_NOT_VALID, None
297
# A signature from a revoked key gets this.
298
# test_verify_revoked_signature()
299
if ((result[0].summary & gpgme.SIGSUM_SYS_ERROR
300
or result[0].status.strerror == 'Certificate revoked')):
301
return SIGNATURE_NOT_VALID, None
302
# Other error types such as revoked keys should (I think) be caught by
303
# SIGSUM_RED so anything else means something is buggy.
304
raise errors.SignatureVerificationFailed(
305
"Unknown GnuPG key verification result")
307
def set_acceptable_keys(self, command_line_input):
308
"""Set the acceptable keys for verifying with this GPGStrategy.
310
:param command_line_input: comma separated list of patterns from
315
acceptable_keys_config = self._config_stack.get('acceptable_keys')
316
if acceptable_keys_config is not None:
317
patterns = acceptable_keys_config
318
if command_line_input is not None: # command line overrides config
319
patterns = command_line_input.split(',')
322
self.acceptable_keys = []
323
for pattern in patterns:
324
result = self.context.keylist(pattern)
328
self.acceptable_keys.append(key.subkeys[0].fpr)
329
trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
332
"No GnuPG key results for pattern: {0}"
336
def valid_commits_message(count):
337
"""returns message for number of commits"""
338
return gettext(u"{0} commits with valid signatures").format(
339
count[SIGNATURE_VALID])
342
def unknown_key_message(count):
343
"""returns message for number of commits"""
344
return ngettext(u"{0} commit with unknown key",
345
u"{0} commits with unknown keys",
346
count[SIGNATURE_KEY_MISSING]).format(
347
count[SIGNATURE_KEY_MISSING])
350
def commit_not_valid_message(count):
351
"""returns message for number of commits"""
352
return ngettext(u"{0} commit not valid",
353
u"{0} commits not valid",
354
count[SIGNATURE_NOT_VALID]).format(
355
count[SIGNATURE_NOT_VALID])
358
def commit_not_signed_message(count):
359
"""returns message for number of commits"""
360
return ngettext(u"{0} commit not signed",
361
u"{0} commits not signed",
362
count[SIGNATURE_NOT_SIGNED]).format(
363
count[SIGNATURE_NOT_SIGNED])
366
def expired_commit_message(count):
367
"""returns message for number of commits"""
368
return ngettext(u"{0} commit with key now expired",
369
u"{0} commits with key now expired",
370
count[SIGNATURE_EXPIRED]).format(
371
count[SIGNATURE_EXPIRED])
374
def verbose_expired_key_message(result, repo):
375
"""takes a verify result and returns list of expired key info"""
377
fingerprint_to_authors = {}
378
for rev_id, validity, fingerprint in result:
379
if validity == SIGNATURE_EXPIRED:
380
revision = repo.get_revision(rev_id)
381
authors = ', '.join(revision.get_apparent_authors())
382
signers.setdefault(fingerprint, 0)
383
signers[fingerprint] += 1
384
fingerprint_to_authors[fingerprint] = authors
386
for fingerprint, number in signers.items():
388
ngettext(u"{0} commit by author {1} with key {2} now expired",
389
u"{0} commits by author {1} with key {2} now expired",
391
number, fingerprint_to_authors[fingerprint], fingerprint))
395
def verbose_valid_message(result):
396
"""takes a verify result and returns list of signed commits strings"""
398
for rev_id, validity, uid in result:
399
if validity == SIGNATURE_VALID:
400
signers.setdefault(uid, 0)
403
for uid, number in signers.items():
404
result.append(ngettext(u"{0} signed {1} commit",
405
u"{0} signed {1} commits",
406
number).format(uid, number))
410
def verbose_not_valid_message(result, repo):
411
"""takes a verify result and returns list of not valid commit info"""
413
for rev_id, validity, empty in result:
414
if validity == SIGNATURE_NOT_VALID:
415
revision = repo.get_revision(rev_id)
416
authors = ', '.join(revision.get_apparent_authors())
417
signers.setdefault(authors, 0)
418
signers[authors] += 1
420
for authors, number in signers.items():
421
result.append(ngettext(u"{0} commit by author {1}",
422
u"{0} commits by author {1}",
423
number).format(number, authors))
427
def verbose_not_signed_message(result, repo):
428
"""takes a verify result and returns list of not signed commit info"""
430
for rev_id, validity, empty in result:
431
if validity == SIGNATURE_NOT_SIGNED:
432
revision = repo.get_revision(rev_id)
433
authors = ', '.join(revision.get_apparent_authors())
434
signers.setdefault(authors, 0)
435
signers[authors] += 1
437
for authors, number in signers.items():
438
result.append(ngettext(u"{0} commit by author {1}",
439
u"{0} commits by author {1}",
440
number).format(number, authors))
444
def verbose_missing_key_message(result):
445
"""takes a verify result and returns list of missing key info"""
447
for rev_id, validity, fingerprint in result:
448
if validity == SIGNATURE_KEY_MISSING:
449
signers.setdefault(fingerprint, 0)
450
signers[fingerprint] += 1
452
for fingerprint, number in list(signers.items()):
453
result.append(ngettext(u"Unknown key {0} signed {1} commit",
454
u"Unknown key {0} signed {1} commits",
455
number).format(fingerprint, number))