/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smtp_connection.py

  • Committer: John Arbash Meinel
  • Date: 2009-12-22 16:28:47 UTC
  • mto: This revision was merged to the branch mainline in revision 4922.
  • Revision ID: john@arbash-meinel.com-20091222162847-tvnsc69to4l4uf5r
Implement a permute_for_extension helper.

Use it for all of the 'simple' extension permutations.
It basically permutes all tests in the current module by setting TestCase.module,
which works well for most of our extension tests. Some had more advanced
handling of permutations (extra permutations, custom vars, etc.).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2007, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
from cStringIO import StringIO
 
18
from email.Message import Message
 
19
import errno
 
20
import smtplib
 
21
import socket
 
22
 
 
23
from bzrlib import (
 
24
    config,
 
25
    email_message,
 
26
    errors,
 
27
    smtp_connection,
 
28
    tests,
 
29
    ui,
 
30
    )
 
31
 
 
32
 
 
33
def connection_refuser():
    """Return an smtplib.SMTP object whose connect() always fails.

    The returned object is a freshly constructed, unconnected
    ``smtplib.SMTP`` instance whose ``connect`` attribute has been replaced
    by a function that raises ``socket.error`` with ``errno.ECONNREFUSED``.
    The function itself takes no arguments, so it can be passed wherever a
    zero-argument SMTP factory is expected.
    """
    def connect(server):
        # Stand-in for SMTP.connect: always report a refused connection.
        raise socket.error(errno.ECONNREFUSED, 'Connection Refused')

    smtp = smtplib.SMTP()
    # Shadow the bound method on this one instance only.
    smtp.connect = connect
    return smtp
 
39
 
 
40
 
 
41
class StubSMTPFactory(object):
    """A fake SMTP connection to test the connection setup.

    Calling an instance returns the instance itself, so one object serves
    both as the factory and as the connection it produces.  Every method
    invoked on it is appended to ``_calls`` as a tuple of the method name
    followed by its arguments, which lets tests assert on the exact
    sequence of calls.
    """

    def __init__(self, fail_on=None, smtp_features=None):
        """Create the stub.

        :param fail_on: Names of commands ('helo', 'ehlo', 'starttls')
            that should answer with a 500 failure reply.  Defaults to
            none.
        :param smtp_features: Extension names that has_extn() reports as
            supported once an EHLO has succeeded.  Defaults to none.
        """
        self._fail_on = fail_on or []
        self._calls = []
        self._smtp_features = smtp_features or []
        self._ehlo_called = False

    def __call__(self):
        # The factory pretends to be a connection
        return self

    def connect(self, server):
        """Record the connection attempt; no network activity happens."""
        self._calls.append(('connect', server))

    def helo(self):
        """Record a HELO and return a (code, message) reply."""
        self._calls.append(('helo',))
        if 'helo' in self._fail_on:
            return 500, 'helo failure'
        return 200, 'helo success'

    def ehlo(self):
        """Record an EHLO and return a (code, message) reply.

        A successful EHLO is remembered so that has_extn() can start
        reporting the configured features.
        """
        self._calls.append(('ehlo',))
        if 'ehlo' in self._fail_on:
            return 500, 'ehlo failure'
        self._ehlo_called = True
        return 200, 'ehlo success'

    def has_extn(self, extension):
        """Record the query and tell whether extension is available.

        Nothing is reported as supported until _ehlo_called has been set.
        """
        self._calls.append(('has_extn', extension))
        return self._ehlo_called and extension in self._smtp_features

    def starttls(self):
        """Record a STARTTLS and return a (code, message) reply."""
        self._calls.append(('starttls',))
        if 'starttls' in self._fail_on:
            return 500, 'starttls failure'
        # A successful STARTTLS also sets the EHLO flag, exactly as the
        # success branch of ehlo() does.
        self._ehlo_called = True
        return 200, 'starttls success'
 
82
 
 
83
 
 
84
class WideOpenSMTPFactory(StubSMTPFactory):
    """A fake smtp server that implements login by accepting anybody."""

    def login(self, user, password):
        # No credential check is made: the attempt is only recorded so
        # tests can inspect which user and password were sent.
        self._calls.append(('login', user, password))
 
89
 
 
90
 
 
91
class TestSMTPConnection(tests.TestCaseInTempDir):
    """Tests for smtp_connection.SMTPConnection using stubbed SMTP objects."""

    def get_connection(self, text, smtp_factory=None):
        """Build an SMTPConnection from an in-memory configuration.

        :param text: Content of the configuration, in config file syntax.
        :param smtp_factory: Optional callable handed to SMTPConnection as
            its ``_smtp_factory``.
        :return: The new SMTPConnection.
        """
        my_config = config.GlobalConfig()
        config_file = StringIO(text)
        # Feed the parser from the string rather than from a file on disk.
        my_config._get_parser(config_file)
        return smtp_connection.SMTPConnection(my_config,
                                              _smtp_factory=smtp_factory)

    def test_defaults(self):
        # An empty configuration gives 'localhost' and no credentials.
        conn = self.get_connection('')
        self.assertEqual('localhost', conn._smtp_server)
        self.assertEqual(None, conn._smtp_username)
        self.assertEqual(None, conn._smtp_password)

    def test_smtp_server(self):
        # The smtp_server option is used as given, port included.
        conn = self.get_connection('[DEFAULT]\nsmtp_server=host:10\n')
        self.assertEqual('host:10', conn._smtp_server)

    def test_missing_server(self):
        # A refused connection raises a different error depending on
        # whether the server was left at its default or set explicitly.
        conn = self.get_connection('', smtp_factory=connection_refuser)
        self.assertRaises(errors.DefaultSMTPConnectionRefused, conn._connect)
        conn = self.get_connection('[DEFAULT]\nsmtp_server=smtp.example.com\n',
                                   smtp_factory=connection_refuser)
        self.assertRaises(errors.SMTPConnectionRefused, conn._connect)

    def test_smtp_username(self):
        # No username unless one is configured.
        conn = self.get_connection('')
        self.assertIs(None, conn._smtp_username)

        conn = self.get_connection('[DEFAULT]\nsmtp_username=joebody\n')
        self.assertEqual(u'joebody', conn._smtp_username)

    def test_smtp_password_from_config(self):
        # No password unless one is configured.
        conn = self.get_connection('')
        self.assertIs(None, conn._smtp_password)

        conn = self.get_connection('[DEFAULT]\nsmtp_password=mypass\n')
        self.assertEqual(u'mypass', conn._smtp_password)

    def test_smtp_password_from_user(self):
        # With a username but no password configured, connecting picks up
        # the password supplied through the UI factory.
        user = 'joe'
        password = 'hispass'
        factory = WideOpenSMTPFactory()
        conn = self.get_connection('[DEFAULT]\nsmtp_username=%s\n' % user,
                                   smtp_factory=factory)
        self.assertIs(None, conn._smtp_password)

        # Canned input stands in for the user typing the password.
        ui.ui_factory = ui.CannedInputUIFactory([password])
        conn._connect()
        self.assertEqual(password, conn._smtp_password)

    def test_smtp_password_from_auth_config(self):
        # With a username but no password configured, connecting picks up
        # the password stored in the authentication configuration.
        user = 'joe'
        password = 'hispass'
        factory = WideOpenSMTPFactory()
        conn = self.get_connection('[DEFAULT]\nsmtp_username=%s\n' % user,
                                   smtp_factory=factory)
        self.assertEqual(user, conn._smtp_username)
        self.assertIs(None, conn._smtp_password)
        # Create a config file with the right password
        conf = config.AuthenticationConfig()
        conf._get_config().update({'smtptest':
                                       {'scheme': 'smtp', 'user': user,
                                        'password': password}})
        conf._save()

        conn._connect()
        self.assertEqual(password, conn._smtp_password)

    def test_authenticate_with_byte_strings(self):
        # The password is written to the config as UTF-8 bytes and read
        # back as unicode, but login() must be called with byte strings.
        user = 'joe'
        password = 'h\xC3\xACspass'
        factory = WideOpenSMTPFactory()
        conn = self.get_connection(
            '[DEFAULT]\nsmtp_username=%s\nsmtp_password=%s\n'
            % (user, password), smtp_factory=factory)
        self.assertEqual(u'h\xECspass', conn._smtp_password)
        conn._connect()
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('has_extn', 'starttls'),
                          ('login', user, password)], factory._calls)
        # The last recorded call is the login; check its argument types.
        smtp_username, smtp_password = factory._calls[-1][1:]
        self.assertIsInstance(smtp_username, str)
        self.assertIsInstance(smtp_password, str)

    def test_create_connection(self):
        # Check the plain sequence: connect, EHLO, then the STARTTLS probe.
        factory = StubSMTPFactory()
        conn = self.get_connection('', smtp_factory=factory)
        conn._create_connection()
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('has_extn', 'starttls')], factory._calls)

    def test_create_connection_ehlo_fails(self):
        # Check that we call HELO if EHLO failed.
        factory = StubSMTPFactory(fail_on=['ehlo'])
        conn = self.get_connection('', smtp_factory=factory)
        conn._create_connection()
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('helo',),
                          ('has_extn', 'starttls')], factory._calls)

    def test_create_connection_ehlo_helo_fails(self):
        # Check that we raise an exception if both EHLO and HELO fail.
        factory = StubSMTPFactory(fail_on=['ehlo', 'helo'])
        conn = self.get_connection('', smtp_factory=factory)
        self.assertRaises(errors.SMTPError, conn._create_connection)
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('helo',)], factory._calls)

    def test_create_connection_starttls(self):
        # Check that STARTTLS plus a second EHLO are called if the
        # server says it supports the feature.
        factory = StubSMTPFactory(smtp_features=['starttls'])
        conn = self.get_connection('', smtp_factory=factory)
        conn._create_connection()
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('has_extn', 'starttls'),
                          ('starttls',),
                          ('ehlo',)], factory._calls)

    def test_create_connection_starttls_fails(self):
        # Check that we raise an exception if the server claims to
        # support STARTTLS, but then fails when we try to activate it.
        factory = StubSMTPFactory(fail_on=['starttls'],
                                  smtp_features=['starttls'])
        conn = self.get_connection('', smtp_factory=factory)
        self.assertRaises(errors.SMTPError, conn._create_connection)
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('has_extn', 'starttls'),
                          ('starttls',)], factory._calls)

    def test_get_message_addresses(self):
        # A message without headers yields an empty sender and no
        # recipients.
        msg = Message()

        from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
        self.assertEqual('', from_)
        self.assertEqual([], to)

        # Recipients are gathered from To, CC and Bcc; only the bare
        # addresses are returned, without display names.
        msg['From'] = '"J. Random Developer" <jrandom@example.com>'
        msg['To'] = 'John Doe <john@doe.com>, Jane Doe <jane@doe.com>'
        msg['CC'] = u'Pepe P\xe9rez <pperez@ejemplo.com>'
        msg['Bcc'] = 'user@localhost'

        from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
        self.assertEqual('jrandom@example.com', from_)
        # Sort both sides: the order of the recipients is not under test.
        self.assertEqual(sorted(['john@doe.com', 'jane@doe.com',
            'pperez@ejemplo.com', 'user@localhost']), sorted(to))

        # now with bzrlib's EmailMessage
        msg = email_message.EmailMessage(
            '"J. Random Developer" <jrandom@example.com>',
            ['John Doe <john@doe.com>', 'Jane Doe <jane@doe.com>',
             u'Pepe P\xe9rez <pperez@ejemplo.com>', 'user@localhost'],
            'subject')

        from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
        self.assertEqual('jrandom@example.com', from_)
        self.assertEqual(sorted(['john@doe.com', 'jane@doe.com',
            'pperez@ejemplo.com', 'user@localhost']), sorted(to))

    def test_destination_address_required(self):
        # send_email must raise NoDestinationAddress when the message has
        # no recipient at all.
        class FakeConfig(object):
            # Minimal config stand-in: every option is unset.
            def get_user_option(self, option):
                return None

        # A plain Message with only a From header.
        msg = Message()
        msg['From'] = '"J. Random Developer" <jrandom@example.com>'
        self.assertRaises(
            errors.NoDestinationAddress,
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)

        # An EmailMessage whose recipient is an empty string.
        msg = email_message.EmailMessage('from@from.com', '', 'subject')
        self.assertRaises(
            errors.NoDestinationAddress,
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)

        # An EmailMessage whose recipient list is empty.
        msg = email_message.EmailMessage('from@from.com', [], 'subject')
        self.assertRaises(
            errors.NoDestinationAddress,
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)