/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smtp_connection.py

  • Committer: Martin Pool
  • Date: 2005-08-24 08:59:32 UTC
  • Revision ID: mbp@sourcefrog.net-20050824085932-c61f1f1f1c930e13
- Add a simple UIFactory 

  The idea of this is to let a client of bzrlib set some 
  policy about how output is displayed.

  In this revision all that's done is that progress bars
  are constructed by a policy established by the application
  rather than being randomly constructed in the library 
  or passed down the calls.  This avoids progress bars
  popping up while running the test suite and cleans up
  some code.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2007, 2009 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
from cStringIO import StringIO
18
 
from email.Message import Message
19
 
import errno
20
 
import smtplib
21
 
import socket
22
 
 
23
 
from bzrlib import (
24
 
    config,
25
 
    email_message,
26
 
    errors,
27
 
    smtp_connection,
28
 
    tests,
29
 
    ui,
30
 
    )
31
 
 
32
 
 
33
 
def connection_refuser():
    """Return an ``smtplib.SMTP`` object whose ``connect`` always fails.

    The ``connect`` attribute of a freshly created ``smtplib.SMTP`` instance
    is replaced by a function that raises ``socket.error`` carrying
    ``errno.ECONNREFUSED``, whatever server it is given.
    """
    def refuse(server):
        raise socket.error(errno.ECONNREFUSED, 'Connection Refused')

    refusing_smtp = smtplib.SMTP()
    # Shadow the bound method on this instance only.
    refusing_smtp.connect = refuse
    return refusing_smtp
39
 
 
40
 
 
41
 
class StubSMTPFactory(object):
    """Fake SMTP factory and connection used to test connection setup.

    Calling the factory returns the factory itself, so one object plays both
    roles.  Every method appends a tuple describing the call to ``_calls``.
    ``helo``, ``ehlo`` and ``starttls`` answer with a 500 code when their name
    is listed in ``fail_on`` and with a 200 code otherwise.
    """

    def __init__(self, fail_on=None, smtp_features=None):
        # Record of the calls received, in order.
        self._calls = []
        # Set once ehlo (or starttls) has succeeded; has_extn depends on it.
        self._ehlo_called = False
        self._fail_on = fail_on if fail_on else []
        self._smtp_features = smtp_features if smtp_features else []

    def __call__(self):
        # The factory doubles as the connection it "creates".
        return self

    def connect(self, server):
        self._calls.append(('connect', server))

    def helo(self):
        self._calls.append(('helo',))
        if 'helo' not in self._fail_on:
            return 200, 'helo success'
        return 500, 'helo failure'

    def ehlo(self):
        self._calls.append(('ehlo',))
        if 'ehlo' in self._fail_on:
            return 500, 'ehlo failure'
        self._ehlo_called = True
        return 200, 'ehlo success'

    def has_extn(self, extension):
        self._calls.append(('has_extn', extension))
        # No extension is reported before a successful ehlo/starttls.
        if not self._ehlo_called:
            return False
        return extension in self._smtp_features

    def starttls(self):
        self._calls.append(('starttls',))
        if 'starttls' in self._fail_on:
            return 500, 'starttls failure'
        self._ehlo_called = True
        return 200, 'starttls success'
82
 
 
83
 
 
84
 
class WideOpenSMTPFactory(StubSMTPFactory):
    """Stub SMTP server whose ``login`` accepts any user and password."""

    def login(self, user, password):
        # Nothing is checked: the attempt is only recorded.
        attempt = ('login', user, password)
        self._calls.append(attempt)
89
 
 
90
 
 
91
 
class TestSMTPConnection(tests.TestCaseInTempDir):
    """Tests for ``smtp_connection.SMTPConnection`` driven by stub factories."""

    def get_connection(self, text, smtp_factory=None):
        """Return an SMTPConnection built from the config content ``text``.

        A ``config.GlobalConfig`` has its parser fed from ``text`` and is
        handed to ``SMTPConnection`` together with ``smtp_factory``.
        """
        my_config = config.GlobalConfig()
        my_config._get_parser(StringIO(text))
        return smtp_connection.SMTPConnection(
            my_config, _smtp_factory=smtp_factory)

    def test_defaults(self):
        # An empty configuration gives localhost and no credentials.
        connection = self.get_connection('')
        self.assertEqual('localhost', connection._smtp_server)
        self.assertEqual(None, connection._smtp_username)
        self.assertEqual(None, connection._smtp_password)

    def test_smtp_server(self):
        # The smtp_server option is expected back unchanged, port included.
        config_text = '[DEFAULT]\nsmtp_server=host:10\n'
        connection = self.get_connection(config_text)
        self.assertEqual('host:10', connection._smtp_server)

    def test_missing_server(self):
        # No server configured: DefaultSMTPConnectionRefused is expected.
        unconfigured = self.get_connection(
            '', smtp_factory=connection_refuser)
        self.assertRaises(errors.DefaultSMTPConnectionRefused,
                          unconfigured._connect)
        # Explicit server configured: SMTPConnectionRefused is expected.
        configured = self.get_connection(
            '[DEFAULT]\nsmtp_server=smtp.example.com\n',
            smtp_factory=connection_refuser)
        self.assertRaises(errors.SMTPConnectionRefused, configured._connect)

    def test_smtp_username(self):
        anonymous = self.get_connection('')
        self.assertIs(None, anonymous._smtp_username)

        named = self.get_connection('[DEFAULT]\nsmtp_username=joebody\n')
        self.assertEqual(u'joebody', named._smtp_username)

    def test_smtp_password_from_config(self):
        without_password = self.get_connection('')
        self.assertIs(None, without_password._smtp_password)

        with_password = self.get_connection(
            '[DEFAULT]\nsmtp_password=mypass\n')
        self.assertEqual(u'mypass', with_password._smtp_password)

    def test_smtp_password_from_user(self):
        # Only the username is configured; the password is supplied through
        # the canned UI factory and should be set after connecting.
        user = 'joe'
        password = 'hispass'
        wide_open = WideOpenSMTPFactory()
        config_text = '[DEFAULT]\nsmtp_username=%s\n' % user
        connection = self.get_connection(config_text,
                                         smtp_factory=wide_open)
        self.assertIs(None, connection._smtp_password)

        ui.ui_factory = ui.CannedInputUIFactory([password])
        connection._connect()
        self.assertEqual(password, connection._smtp_password)

    def test_smtp_password_from_auth_config(self):
        user = 'joe'
        password = 'hispass'
        wide_open = WideOpenSMTPFactory()
        config_text = '[DEFAULT]\nsmtp_username=%s\n' % user
        connection = self.get_connection(config_text,
                                         smtp_factory=wide_open)
        self.assertEqual(user, connection._smtp_username)
        self.assertIs(None, connection._smtp_password)
        # Create a config file with the right password
        credentials = {'scheme': 'smtp', 'user': user, 'password': password}
        auth_conf = config.AuthenticationConfig()
        auth_conf._get_config().update({'smtptest': credentials})
        auth_conf._save()

        connection._connect()
        self.assertEqual(password, connection._smtp_password)

    def test_authenticate_with_byte_strings(self):
        # The password is given to the config as a byte string, is expected
        # to compare equal to the unicode form on the connection, and must
        # reach login() as str objects.
        user = 'joe'
        password = 'h\xC3\xACspass'
        wide_open = WideOpenSMTPFactory()
        config_text = ('[DEFAULT]\nsmtp_username=%s\nsmtp_password=%s\n'
                       % (user, password))
        connection = self.get_connection(config_text,
                                         smtp_factory=wide_open)
        self.assertEqual(u'h\xECspass', connection._smtp_password)
        connection._connect()
        expected_calls = [
            ('connect', 'localhost'),
            ('ehlo',),
            ('has_extn', 'starttls'),
            ('login', user, password),
            ]
        self.assertEqual(expected_calls, wide_open._calls)
        _, smtp_username, smtp_password = wide_open._calls[-1]
        self.assertIsInstance(smtp_username, str)
        self.assertIsInstance(smtp_password, str)

    def test_create_connection(self):
        stub = StubSMTPFactory()
        connection = self.get_connection('', smtp_factory=stub)
        connection._create_connection()
        expected_calls = [
            ('connect', 'localhost'),
            ('ehlo',),
            ('has_extn', 'starttls'),
            ]
        self.assertEqual(expected_calls, stub._calls)

    def test_create_connection_ehlo_fails(self):
        # Check that we call HELO if EHLO failed.
        stub = StubSMTPFactory(fail_on=['ehlo'])
        connection = self.get_connection('', smtp_factory=stub)
        connection._create_connection()
        expected_calls = [
            ('connect', 'localhost'),
            ('ehlo',),
            ('helo',),
            ('has_extn', 'starttls'),
            ]
        self.assertEqual(expected_calls, stub._calls)

    def test_create_connection_ehlo_helo_fails(self):
        # Check that we raise an exception if both EHLO and HELO fail.
        stub = StubSMTPFactory(fail_on=['ehlo', 'helo'])
        connection = self.get_connection('', smtp_factory=stub)
        self.assertRaises(errors.SMTPError, connection._create_connection)
        expected_calls = [
            ('connect', 'localhost'),
            ('ehlo',),
            ('helo',),
            ]
        self.assertEqual(expected_calls, stub._calls)

    def test_create_connection_starttls(self):
        # Check that STARTTLS plus a second EHLO are called if the
        # server says it supports the feature.
        stub = StubSMTPFactory(smtp_features=['starttls'])
        connection = self.get_connection('', smtp_factory=stub)
        connection._create_connection()
        expected_calls = [
            ('connect', 'localhost'),
            ('ehlo',),
            ('has_extn', 'starttls'),
            ('starttls',),
            ('ehlo',),
            ]
        self.assertEqual(expected_calls, stub._calls)

    def test_create_connection_starttls_fails(self):
        # Check that we raise an exception if the server claims to
        # support STARTTLS, but then fails when we try to activate it.
        stub = StubSMTPFactory(fail_on=['starttls'],
                               smtp_features=['starttls'])
        connection = self.get_connection('', smtp_factory=stub)
        self.assertRaises(errors.SMTPError, connection._create_connection)
        expected_calls = [
            ('connect', 'localhost'),
            ('ehlo',),
            ('has_extn', 'starttls'),
            ('starttls',),
            ]
        self.assertEqual(expected_calls, stub._calls)

    def test_get_message_addresses(self):
        get_addresses = smtp_connection.SMTPConnection.get_message_addresses
        expected_sender = 'jrandom@example.com'
        expected_recipients = sorted(['john@doe.com', 'jane@doe.com',
            'pperez@ejemplo.com', 'user@localhost'])

        # A message without any header yields no addresses.
        msg = Message()
        sender, recipients = get_addresses(msg)
        self.assertEqual('', sender)
        self.assertEqual([], recipients)

        # To, CC and Bcc are all expected among the destinations.
        msg['From'] = '"J. Random Developer" <jrandom@example.com>'
        msg['To'] = 'John Doe <john@doe.com>, Jane Doe <jane@doe.com>'
        msg['CC'] = u'Pepe P\xe9rez <pperez@ejemplo.com>'
        msg['Bcc'] = 'user@localhost'

        sender, recipients = get_addresses(msg)
        self.assertEqual(expected_sender, sender)
        self.assertEqual(expected_recipients, sorted(recipients))

        # now with bzrlib's EmailMessage
        to_addresses = [
            'John Doe <john@doe.com>',
            'Jane Doe <jane@doe.com>',
            u'Pepe P\xe9rez <pperez@ejemplo.com>',
            'user@localhost',
            ]
        msg = email_message.EmailMessage(
            '"J. Random Developer" <jrandom@example.com>',
            to_addresses,
            'subject')

        sender, recipients = get_addresses(msg)
        self.assertEqual(expected_sender, sender)
        self.assertEqual(expected_recipients, sorted(recipients))

    def test_destination_address_required(self):
        class FakeConfig:
            def get_user_option(self, option):
                return None

        def assert_no_destination(message):
            # A fresh connection is built for every message checked.
            send_email = smtp_connection.SMTPConnection(
                FakeConfig()).send_email
            self.assertRaises(errors.NoDestinationAddress,
                              send_email, message)

        # Plain Message carrying only a From header.
        msg = Message()
        msg['From'] = '"J. Random Developer" <jrandom@example.com>'
        assert_no_destination(msg)

        # EmailMessage with an empty string as destination.
        msg = email_message.EmailMessage('from@from.com', '', 'subject')
        assert_no_destination(msg)

        # EmailMessage with an empty list as destination.
        msg = email_message.EmailMessage('from@from.com', [], 'subject')
        assert_no_destination(msg)