/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_smtp_connection.py

  • Committer: Jelmer Vernooij
  • Date: 2020-02-18 01:57:45 UTC
  • mto: This revision was merged to the branch mainline in revision 7493.
  • Revision ID: jelmer@jelmer.uk-20200218015745-q2ss9tsk74h4nh61
drop use of future.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2007, 2009 Canonical Ltd
 
1
# Copyright (C) 2007, 2009, 2010, 2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
from cStringIO import StringIO
18
 
from email.Message import Message
 
17
from email.message import Message
19
18
import errno
20
19
import smtplib
21
20
import socket
22
21
 
23
 
from bzrlib import (
 
22
from breezy import (
24
23
    config,
25
24
    email_message,
26
 
    errors,
27
25
    smtp_connection,
28
26
    tests,
29
27
    ui,
40
38
 
41
39
class StubSMTPFactory(object):
42
40
    """A fake SMTP connection to test the connection setup."""
 
41
 
43
42
    def __init__(self, fail_on=None, smtp_features=None):
44
43
        self._fail_on = fail_on or []
45
44
        self._calls = []
91
90
class TestSMTPConnection(tests.TestCaseInTempDir):
92
91
 
93
92
    def get_connection(self, text, smtp_factory=None):
94
 
        my_config = config.GlobalConfig()
95
 
        config_file = StringIO(text)
96
 
        my_config._get_parser(config_file)
97
 
        return smtp_connection.SMTPConnection(my_config,
98
 
                                              _smtp_factory=smtp_factory)
 
93
        my_config = config.MemoryStack(text)
 
94
        return smtp_connection.SMTPConnection(
 
95
            my_config, _smtp_factory=smtp_factory)
99
96
 
100
97
    def test_defaults(self):
101
 
        conn = self.get_connection('')
 
98
        conn = self.get_connection(b'')
102
99
        self.assertEqual('localhost', conn._smtp_server)
103
100
        self.assertEqual(None, conn._smtp_username)
104
101
        self.assertEqual(None, conn._smtp_password)
105
102
 
106
103
    def test_smtp_server(self):
107
 
        conn = self.get_connection('[DEFAULT]\nsmtp_server=host:10\n')
 
104
        conn = self.get_connection(b'smtp_server=host:10')
108
105
        self.assertEqual('host:10', conn._smtp_server)
109
106
 
110
107
    def test_missing_server(self):
111
 
        conn = self.get_connection('', smtp_factory=connection_refuser)
112
 
        self.assertRaises(errors.DefaultSMTPConnectionRefused, conn._connect)
113
 
        conn = self.get_connection('[DEFAULT]\nsmtp_server=smtp.example.com\n',
 
108
        conn = self.get_connection(b'', smtp_factory=connection_refuser)
 
109
        self.assertRaises(smtp_connection.DefaultSMTPConnectionRefused,
 
110
                          conn._connect)
 
111
        conn = self.get_connection(b'smtp_server=smtp.example.com',
114
112
                                   smtp_factory=connection_refuser)
115
 
        self.assertRaises(errors.SMTPConnectionRefused, conn._connect)
 
113
        self.assertRaises(smtp_connection.SMTPConnectionRefused, conn._connect)
116
114
 
117
115
    def test_smtp_username(self):
118
 
        conn = self.get_connection('')
 
116
        conn = self.get_connection(b'')
119
117
        self.assertIs(None, conn._smtp_username)
120
118
 
121
 
        conn = self.get_connection('[DEFAULT]\nsmtp_username=joebody\n')
 
119
        conn = self.get_connection(b'smtp_username=joebody')
122
120
        self.assertEqual(u'joebody', conn._smtp_username)
123
121
 
124
122
    def test_smtp_password_from_config(self):
125
 
        conn = self.get_connection('')
 
123
        conn = self.get_connection(b'')
126
124
        self.assertIs(None, conn._smtp_password)
127
125
 
128
 
        conn = self.get_connection('[DEFAULT]\nsmtp_password=mypass\n')
 
126
        conn = self.get_connection(b'smtp_password=mypass')
129
127
        self.assertEqual(u'mypass', conn._smtp_password)
130
128
 
131
129
    def test_smtp_password_from_user(self):
132
130
        user = 'joe'
133
131
        password = 'hispass'
134
132
        factory = WideOpenSMTPFactory()
135
 
        conn = self.get_connection('[DEFAULT]\nsmtp_username=%s\n' % user,
136
 
                                   smtp_factory=factory)
 
133
        conn = self.get_connection(
 
134
            b'[DEFAULT]\nsmtp_username=%s\n' % user.encode('ascii'),
 
135
            smtp_factory=factory)
137
136
        self.assertIs(None, conn._smtp_password)
138
137
 
139
138
        ui.ui_factory = ui.CannedInputUIFactory([password])
144
143
        user = 'joe'
145
144
        password = 'hispass'
146
145
        factory = WideOpenSMTPFactory()
147
 
        conn = self.get_connection('[DEFAULT]\nsmtp_username=%s\n' % user,
148
 
                                   smtp_factory=factory)
 
146
        conn = self.get_connection(
 
147
            b'[DEFAULT]\nsmtp_username=%s\n' % user.encode('ascii'),
 
148
            smtp_factory=factory)
149
149
        self.assertEqual(user, conn._smtp_username)
150
150
        self.assertIs(None, conn._smtp_password)
151
151
        # Create a config file with the right password
152
152
        conf = config.AuthenticationConfig()
153
153
        conf._get_config().update({'smtptest':
154
 
                                       {'scheme': 'smtp', 'user':user,
155
 
                                        'password': password}})
 
154
                                   {'scheme': 'smtp', 'user': user,
 
155
                                    'password': password}})
156
156
        conf._save()
157
157
 
158
158
        conn._connect()
159
159
        self.assertEqual(password, conn._smtp_password)
160
160
 
161
161
    def test_authenticate_with_byte_strings(self):
162
 
        user = 'joe'
163
 
        password = 'h\xC3\xACspass'
 
162
        user = b'joe'
 
163
        unicode_pass = u'h\xECspass'
 
164
        utf8_pass = unicode_pass.encode('utf-8')
164
165
        factory = WideOpenSMTPFactory()
165
166
        conn = self.get_connection(
166
 
            '[DEFAULT]\nsmtp_username=%s\nsmtp_password=%s\n'
167
 
            % (user, password), smtp_factory=factory)
168
 
        self.assertEqual(u'h\xECspass', conn._smtp_password)
 
167
            b'[DEFAULT]\nsmtp_username=%s\nsmtp_password=%s\n'
 
168
            % (user, utf8_pass), smtp_factory=factory)
 
169
        self.assertEqual(unicode_pass, conn._smtp_password)
169
170
        conn._connect()
170
171
        self.assertEqual([('connect', 'localhost'),
171
172
                          ('ehlo',),
172
173
                          ('has_extn', 'starttls'),
173
 
                          ('login', user, password)], factory._calls)
 
174
                          ('login', user, utf8_pass)], factory._calls)
174
175
        smtp_username, smtp_password = factory._calls[-1][1:]
175
 
        self.assertIsInstance(smtp_username, str)
176
 
        self.assertIsInstance(smtp_password, str)
 
176
        self.assertIsInstance(smtp_username, bytes)
 
177
        self.assertIsInstance(smtp_password, bytes)
177
178
 
178
179
    def test_create_connection(self):
179
180
        factory = StubSMTPFactory()
180
 
        conn = self.get_connection('', smtp_factory=factory)
 
181
        conn = self.get_connection(b'', smtp_factory=factory)
181
182
        conn._create_connection()
182
183
        self.assertEqual([('connect', 'localhost'),
183
184
                          ('ehlo',),
186
187
    def test_create_connection_ehlo_fails(self):
187
188
        # Check that we call HELO if EHLO failed.
188
189
        factory = StubSMTPFactory(fail_on=['ehlo'])
189
 
        conn = self.get_connection('', smtp_factory=factory)
 
190
        conn = self.get_connection(b'', smtp_factory=factory)
190
191
        conn._create_connection()
191
192
        self.assertEqual([('connect', 'localhost'),
192
193
                          ('ehlo',),
196
197
    def test_create_connection_ehlo_helo_fails(self):
197
198
        # Check that we raise an exception if both EHLO and HELO fail.
198
199
        factory = StubSMTPFactory(fail_on=['ehlo', 'helo'])
199
 
        conn = self.get_connection('', smtp_factory=factory)
200
 
        self.assertRaises(errors.SMTPError, conn._create_connection)
 
200
        conn = self.get_connection(b'', smtp_factory=factory)
 
201
        self.assertRaises(smtp_connection.SMTPError, conn._create_connection)
201
202
        self.assertEqual([('connect', 'localhost'),
202
203
                          ('ehlo',),
203
204
                          ('helo',)], factory._calls)
206
207
        # Check that STARTTLS plus a second EHLO are called if the
207
208
        # server says it supports the feature.
208
209
        factory = StubSMTPFactory(smtp_features=['starttls'])
209
 
        conn = self.get_connection('', smtp_factory=factory)
 
210
        conn = self.get_connection(b'', smtp_factory=factory)
210
211
        conn._create_connection()
211
212
        self.assertEqual([('connect', 'localhost'),
212
213
                          ('ehlo',),
219
220
        # support STARTTLS, but then fails when we try to activate it.
220
221
        factory = StubSMTPFactory(fail_on=['starttls'],
221
222
                                  smtp_features=['starttls'])
222
 
        conn = self.get_connection('', smtp_factory=factory)
223
 
        self.assertRaises(errors.SMTPError, conn._create_connection)
 
223
        conn = self.get_connection(b'', smtp_factory=factory)
 
224
        self.assertRaises(smtp_connection.SMTPError, conn._create_connection)
224
225
        self.assertEqual([('connect', 'localhost'),
225
226
                          ('ehlo',),
226
227
                          ('has_extn', 'starttls'),
241
242
        from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
242
243
        self.assertEqual('jrandom@example.com', from_)
243
244
        self.assertEqual(sorted(['john@doe.com', 'jane@doe.com',
244
 
            'pperez@ejemplo.com', 'user@localhost']), sorted(to))
 
245
                                 'pperez@ejemplo.com', 'user@localhost']), sorted(to))
245
246
 
246
 
        # now with bzrlib's EmailMessage
 
247
        # now with breezy's EmailMessage
247
248
        msg = email_message.EmailMessage(
248
249
            '"J. Random Developer" <jrandom@example.com>',
249
250
            ['John Doe <john@doe.com>', 'Jane Doe <jane@doe.com>',
250
 
             u'Pepe P\xe9rez <pperez@ejemplo.com>', 'user@localhost' ],
 
251
             u'Pepe P\xe9rez <pperez@ejemplo.com>', 'user@localhost'],
251
252
            'subject')
252
253
 
253
254
        from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
254
255
        self.assertEqual('jrandom@example.com', from_)
255
256
        self.assertEqual(sorted(['john@doe.com', 'jane@doe.com',
256
 
            'pperez@ejemplo.com', 'user@localhost']), sorted(to))
 
257
                                 'pperez@ejemplo.com', 'user@localhost']), sorted(to))
257
258
 
258
259
    def test_destination_address_required(self):
259
 
        class FakeConfig:
260
 
            def get_user_option(self, option):
261
 
                return None
262
 
 
263
260
        msg = Message()
264
261
        msg['From'] = '"J. Random Developer" <jrandom@example.com>'
265
262
        self.assertRaises(
266
 
            errors.NoDestinationAddress,
267
 
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)
 
263
            smtp_connection.NoDestinationAddress,
 
264
            smtp_connection.SMTPConnection(config.MemoryStack(b"")
 
265
                                           ).send_email, msg)
268
266
 
269
267
        msg = email_message.EmailMessage('from@from.com', '', 'subject')
270
268
        self.assertRaises(
271
 
            errors.NoDestinationAddress,
272
 
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)
 
269
            smtp_connection.NoDestinationAddress,
 
270
            smtp_connection.SMTPConnection(config.MemoryStack(b"")
 
271
                                           ).send_email, msg)
273
272
 
274
273
        msg = email_message.EmailMessage('from@from.com', [], 'subject')
275
274
        self.assertRaises(
276
 
            errors.NoDestinationAddress,
277
 
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)
 
275
            smtp_connection.NoDestinationAddress,
 
276
            smtp_connection.SMTPConnection(config.MemoryStack(b"")
 
277
                                           ).send_email, msg)