/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/gpg.py

  • Committer: Jelmer Vernooij
  • Date: 2020-03-22 01:35:14 UTC
  • mfrom: (7490.7.6 work)
  • mto: This revision was merged to the branch mainline in revision 7499.
  • Revision ID: jelmer@jelmer.uk-20200322013514-7vw1ntwho04rcuj3
merge lp:brz/3.1.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
 
18
18
"""GPG signing and checking logic."""
19
19
 
20
 
from __future__ import absolute_import
21
 
 
22
20
import os
23
 
import sys
24
21
 
25
22
from breezy.lazy_import import lazy_import
26
23
lazy_import(globals(), """
27
 
import errno
28
 
import subprocess
29
 
 
30
24
from breezy import (
31
25
    config,
32
26
    trace,
41
35
from . import (
42
36
    errors,
43
37
    )
44
 
from .sixish import (
45
 
    BytesIO,
46
 
    )
47
38
 
48
 
#verification results
 
39
# verification results
49
40
SIGNATURE_VALID = 0
50
41
SIGNATURE_KEY_MISSING = 1
51
42
SIGNATURE_NOT_VALID = 2
52
43
SIGNATURE_NOT_SIGNED = 3
53
44
SIGNATURE_EXPIRED = 4
54
45
 
 
46
MODE_NORMAL = 0
 
47
MODE_DETACH = 1
 
48
MODE_CLEAR = 2
 
49
 
55
50
 
56
51
class GpgNotInstalled(errors.DependencyNotPresent):
57
52
 
58
 
    _fmt = 'python-gpg is not installed, it is needed to verify signatures'
 
53
    _fmt = ('python-gpg is not installed, it is needed to create or '
 
54
            'verify signatures. %(error)s')
59
55
 
60
56
    def __init__(self, error):
61
57
        errors.DependencyNotPresent.__init__(self, 'gpg', error)
78
74
 
79
75
 
80
76
def bulk_verify_signatures(repository, revids, strategy,
81
 
        process_events_callback=None):
 
77
                           process_events_callback=None):
82
78
    """Do verifications on a set of revisions
83
79
 
84
80
    :param repository: repository object
99
95
    result = []
100
96
    all_verifiable = True
101
97
    total = len(revids)
102
 
    pb = ui.ui_factory.nested_progress_bar()
103
 
    try:
 
98
    with ui.ui_factory.nested_progress_bar() as pb:
104
99
        for i, (rev_id, verification_result, uid) in enumerate(
105
100
                repository.verify_revision_signatures(
106
101
                    revids, strategy)):
111
106
                all_verifiable = False
112
107
            if process_events_callback is not None:
113
108
                process_events_callback()
114
 
    finally:
115
 
        pb.finished()
116
109
    return (count, result, all_verifiable)
117
110
 
118
111
 
126
119
    def __init__(self, ignored):
127
120
        """Real strategies take a configuration."""
128
121
 
129
 
    def sign(self, content):
 
122
    def sign(self, content, mode):
130
123
        raise SigningFailed('Signing is disabled.')
131
124
 
132
 
    def verify(self, content, testament):
 
125
    def verify(self, signed_data, signature=None):
133
126
        raise SignatureVerificationFailed('Signature verification is \
134
127
disabled.')
135
128
 
149
142
    def __init__(self, ignored):
150
143
        """Real strategies take a configuration."""
151
144
 
152
 
    def sign(self, content):
153
 
        return ("-----BEGIN PSEUDO-SIGNED CONTENT-----\n" + content +
154
 
                "-----END PSEUDO-SIGNED CONTENT-----\n")
 
145
    def sign(self, content, mode):
 
146
        return (b"-----BEGIN PSEUDO-SIGNED CONTENT-----\n" + content
 
147
                + b"-----END PSEUDO-SIGNED CONTENT-----\n")
155
148
 
156
 
    def verify(self, content, testament):
157
 
        return SIGNATURE_VALID, None
 
149
    def verify(self, signed_data, signature=None):
 
150
        plain_text = signed_data.replace(
 
151
            b"-----BEGIN PSEUDO-SIGNED CONTENT-----\n", b"")
 
152
        plain_text = plain_text.replace(
 
153
            b"-----END PSEUDO-SIGNED CONTENT-----\n", b"")
 
154
        return SIGNATURE_VALID, None, plain_text
158
155
 
159
156
    def set_acceptable_keys(self, command_line_input):
160
157
        if command_line_input is not None:
175
172
    else:
176
173
        # This is not quite worthy of a warning, because some people
177
174
        # don't need GPG_TTY to be set. But it is worthy of a big mark
178
 
        # in ~/.brz.log, so that people can debug it if it happens to them
 
175
        # in brz.log, so that people can debug it if it happens to them
179
176
        trace.mutter('** Env var TTY empty, cannot set GPG_TTY.'
180
177
                     '  Is TTY exported?')
181
178
 
190
187
        try:
191
188
            import gpg
192
189
            self.context = gpg.Context()
193
 
        except ImportError as error:
194
 
            pass # can't use verify()
195
 
 
196
 
        self.context.signers = self._get_signing_keys()
 
190
            self.context.armor = True
 
191
            self.context.signers = self._get_signing_keys()
 
192
        except ImportError:
 
193
            pass  # can't use verify()
197
194
 
198
195
    def _get_signing_keys(self):
199
196
        import gpg
200
197
        keyname = self._config_stack.get('gpg_signing_key')
 
198
        if keyname == 'default':
 
199
            # Leave things to gpg
 
200
            return []
 
201
 
201
202
        if keyname:
202
203
            try:
203
204
                return [self.context.get_key(keyname)]
204
205
            except gpg.errors.KeyNotFound:
205
206
                pass
206
207
 
207
 
        if keyname is None or keyname == 'default':
208
 
            # 'default' or not setting gpg_signing_key at all means we should
 
208
        if keyname is None:
 
209
            # not setting gpg_signing_key at all means we should
209
210
            # use the user email address
210
 
            keyname = config.extract_email_address(self._config_stack.get('email'))
 
211
            keyname = config.extract_email_address(
 
212
                self._config_stack.get('email'))
 
213
        if keyname == 'default':
 
214
            return []
211
215
        possible_keys = self.context.keylist(keyname, secret=True)
212
216
        try:
213
 
            return [possible_keys.next()]
 
217
            return [next(possible_keys)]
214
218
        except StopIteration:
215
219
            return []
216
220
 
222
226
        :return: boolean if this strategy can verify signatures
223
227
        """
224
228
        try:
225
 
            import gpg
 
229
            import gpg  # noqa: F401
226
230
            return True
227
 
        except ImportError as error:
 
231
        except ImportError:
228
232
            return False
229
233
 
230
 
    def sign(self, content):
231
 
        import gpg
232
 
        if isinstance(content, unicode):
 
234
    def sign(self, content, mode):
 
235
        try:
 
236
            import gpg
 
237
        except ImportError as error:
 
238
            raise GpgNotInstalled(
 
239
                'Set create_signatures=no to disable creating signatures.')
 
240
 
 
241
        if isinstance(content, str):
233
242
            raise errors.BzrBadParameterUnicode('content')
234
243
 
235
244
        plain_text = gpg.Data(content)
236
245
        try:
237
246
            output, result = self.context.sign(
238
 
                plain_text, mode=gpg.constants.sig.mode.CLEAR)
 
247
                plain_text, mode={
 
248
                    MODE_DETACH: gpg.constants.sig.mode.DETACH,
 
249
                    MODE_CLEAR: gpg.constants.sig.mode.CLEAR,
 
250
                    MODE_NORMAL: gpg.constants.sig.mode.NORMAL,
 
251
                    }[mode])
239
252
        except gpg.errors.GPGMEError as error:
240
253
            raise SigningFailed(str(error))
241
254
 
242
255
        return output
243
256
 
244
 
    def verify(self, content, testament):
 
257
    def verify(self, signed_data, signature=None):
245
258
        """Check content has a valid signature.
246
259
 
247
 
        :param content: the commit signature
248
 
        :param testament: the valid testament string for the commit
 
260
        :param signed_data; Signed data
 
261
        :param signature: optional signature (if detached)
249
262
 
250
 
        :return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid
 
263
        :return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid, plain text
251
264
        """
252
265
        try:
253
266
            import gpg
254
267
        except ImportError as error:
255
 
            raise errors.GpgNotInstalled(error)
 
268
            raise GpgNotInstalled(
 
269
                'Set check_signatures=ignore to disable verifying signatures.')
256
270
 
257
 
        signature = gpg.Data(content)
258
 
        sink = gpg.Data()
 
271
        signed_data = gpg.Data(signed_data)
 
272
        if signature:
 
273
            signature = gpg.Data(signature)
259
274
        try:
260
 
            plain_output, result = self.context.verify(signature)
 
275
            plain_output, result = self.context.verify(signed_data, signature)
261
276
        except gpg.errors.BadSignatures as error:
262
277
            fingerprint = error.result.signatures[0].fpr
263
278
            if error.result.signatures[0].summary & gpg.constants.SIGSUM_KEY_EXPIRED:
264
 
                expires = self.context.get_key(error.result.signatures[0].fpr).subkeys[0].expires
 
279
                expires = self.context.get_key(
 
280
                    error.result.signatures[0].fpr).subkeys[0].expires
265
281
                if expires > error.result.signatures[0].timestamp:
266
282
                    # The expired key was not expired at time of signing.
267
283
                    # test_verify_expired_but_valid()
268
 
                    return SIGNATURE_EXPIRED, fingerprint[-8:]
 
284
                    return SIGNATURE_EXPIRED, fingerprint[-8:], None
269
285
                else:
270
286
                    # I can't work out how to create a test where the signature
271
287
                    # was expired at the time of signing.
272
 
                    return SIGNATURE_NOT_VALID, None
 
288
                    return SIGNATURE_NOT_VALID, None, None
273
289
 
274
290
            # GPG does not know this key.
275
291
            # test_verify_unknown_key()
276
 
            if error.result.signatures[0].summary & gpg.constants.SIGSUM_KEY_MISSING:
277
 
                return SIGNATURE_KEY_MISSING, fingerprint[-8:]
 
292
            if (error.result.signatures[0].summary &
 
293
                    gpg.constants.SIGSUM_KEY_MISSING):
 
294
                return SIGNATURE_KEY_MISSING, fingerprint[-8:], None
278
295
 
279
 
            return SIGNATURE_NOT_VALID, None
 
296
            return SIGNATURE_NOT_VALID, None, None
280
297
        except gpg.errors.GPGMEError as error:
281
 
            raise SignatureVerificationFailed(error[2])
 
298
            raise SignatureVerificationFailed(error)
282
299
 
283
300
        # No result if input is invalid.
284
301
        # test_verify_invalid()
285
302
        if len(result.signatures) == 0:
286
 
            return SIGNATURE_NOT_VALID, None
 
303
            return SIGNATURE_NOT_VALID, None, plain_output
 
304
 
287
305
        # User has specified a list of acceptable keys, check our result is in
288
306
        # it.  test_verify_unacceptable_key()
289
307
        fingerprint = result.signatures[0].fpr
290
308
        if self.acceptable_keys is not None:
291
 
            if not fingerprint in self.acceptable_keys:
292
 
                return SIGNATURE_KEY_MISSING, fingerprint[-8:]
293
 
        # Check the signature actually matches the testament.
294
 
        # test_verify_bad_testament()
295
 
        if testament != plain_output:
296
 
            return SIGNATURE_NOT_VALID, None
 
309
            if fingerprint not in self.acceptable_keys:
 
310
                return SIGNATURE_KEY_MISSING, fingerprint[-8:], plain_output
297
311
        # Yay gpg set the valid bit.
298
312
        # Can't write a test for this one as you can't set a key to be
299
313
        # trusted using gpg.
300
314
        if result.signatures[0].summary & gpg.constants.SIGSUM_VALID:
301
315
            key = self.context.get_key(fingerprint)
302
316
            name = key.uids[0].name
 
317
            if isinstance(name, bytes):
 
318
                name = name.decode('utf-8')
303
319
            email = key.uids[0].email
304
 
            return SIGNATURE_VALID, name + " <" + email + ">"
 
320
            if isinstance(email, bytes):
 
321
                email = email.decode('utf-8')
 
322
            return (SIGNATURE_VALID, name + u" <" + email + u">", plain_output)
305
323
        # Sigsum_red indicates a problem, unfortunatly I have not been able
306
324
        # to write any tests which actually set this.
307
325
        if result.signatures[0].summary & gpg.constants.SIGSUM_RED:
308
 
            return SIGNATURE_NOT_VALID, None
 
326
            return SIGNATURE_NOT_VALID, None, plain_output
309
327
        # Summary isn't set if sig is valid but key is untrusted but if user
310
328
        # has explicity set the key as acceptable we can validate it.
311
 
        if result.signatures[0].summary == 0 and self.acceptable_keys is not None:
 
329
        if (result.signatures[0].summary == 0 and
 
330
                self.acceptable_keys is not None):
312
331
            if fingerprint in self.acceptable_keys:
313
332
                # test_verify_untrusted_but_accepted()
314
 
                return SIGNATURE_VALID, None
 
333
                return SIGNATURE_VALID, None, plain_output
315
334
        # test_verify_valid_but_untrusted()
316
335
        if result.signatures[0].summary == 0 and self.acceptable_keys is None:
317
 
            return SIGNATURE_NOT_VALID, None
 
336
            return SIGNATURE_NOT_VALID, None, plain_output
318
337
        # Other error types such as revoked keys should (I think) be caught by
319
338
        # SIGSUM_RED so anything else means something is buggy.
320
339
        raise SignatureVerificationFailed(
331
350
        acceptable_keys_config = self._config_stack.get('acceptable_keys')
332
351
        if acceptable_keys_config is not None:
333
352
            patterns = acceptable_keys_config
334
 
        if command_line_input is not None: # command line overrides config
 
353
        if command_line_input is not None:  # command line overrides config
335
354
            patterns = command_line_input.split(',')
336
355
 
337
356
        if patterns:
345
364
                    trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
346
365
                if not found_key:
347
366
                    trace.note(gettext(
348
 
                            "No GnuPG key results for pattern: {0}"
349
 
                                ).format(pattern))
 
367
                        "No GnuPG key results for pattern: {0}"
 
368
                        ).format(pattern))
350
369
 
351
370
 
352
371
def valid_commits_message(count):
353
372
    """returns message for number of commits"""
354
373
    return gettext(u"{0} commits with valid signatures").format(
355
 
                                    count[SIGNATURE_VALID])
 
374
        count[SIGNATURE_VALID])
356
375
 
357
376
 
358
377
def unknown_key_message(count):
360
379
    return ngettext(u"{0} commit with unknown key",
361
380
                    u"{0} commits with unknown keys",
362
381
                    count[SIGNATURE_KEY_MISSING]).format(
363
 
                                    count[SIGNATURE_KEY_MISSING])
 
382
        count[SIGNATURE_KEY_MISSING])
364
383
 
365
384
 
366
385
def commit_not_valid_message(count):
368
387
    return ngettext(u"{0} commit not valid",
369
388
                    u"{0} commits not valid",
370
389
                    count[SIGNATURE_NOT_VALID]).format(
371
 
                                        count[SIGNATURE_NOT_VALID])
 
390
        count[SIGNATURE_NOT_VALID])
372
391
 
373
392
 
374
393
def commit_not_signed_message(count):
376
395
    return ngettext(u"{0} commit not signed",
377
396
                    u"{0} commits not signed",
378
397
                    count[SIGNATURE_NOT_SIGNED]).format(
379
 
                                    count[SIGNATURE_NOT_SIGNED])
 
398
        count[SIGNATURE_NOT_SIGNED])
380
399
 
381
400
 
382
401
def expired_commit_message(count):
384
403
    return ngettext(u"{0} commit with key now expired",
385
404
                    u"{0} commits with key now expired",
386
405
                    count[SIGNATURE_EXPIRED]).format(
387
 
                                count[SIGNATURE_EXPIRED])
 
406
        count[SIGNATURE_EXPIRED])
388
407
 
389
408
 
390
409
def verbose_expired_key_message(result, repo):
417
436
            signers[uid] += 1
418
437
    result = []
419
438
    for uid, number in signers.items():
420
 
         result.append(ngettext(u"{0} signed {1} commit",
421
 
                                u"{0} signed {1} commits",
422
 
                                number).format(uid, number))
 
439
        result.append(ngettext(u"{0} signed {1} commit",
 
440
                               u"{0} signed {1} commits",
 
441
                               number).format(uid, number))
423
442
    return result
424
443
 
425
444