/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smtp_connection.py

  • Committer: John Arbash Meinel
  • Date: 2009-06-18 18:18:36 UTC
  • mto: This revision was merged to the branch mainline in revision 4461.
  • Revision ID: john@arbash-meinel.com-20090618181836-biodfkat9a8eyzjz
The new add_inventory_by_delta returns a CHKInventory when mapping from NULL,
which is completely valid, but it 'broke' one of the tests.
To fix it, the test was changed to use CHKInventories on both sides, and an __eq__
member was added. The nice thing is that CHKInventory.__eq__ is fairly cheap, since it only
has to check the root keys.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
from cStringIO import StringIO
 
18
from email.Message import Message
 
19
import errno
 
20
import smtplib
 
21
import socket
 
22
 
 
23
from bzrlib import (
 
24
    config,
 
25
    email_message,
 
26
    errors,
 
27
    smtp_connection,
 
28
    tests,
 
29
    ui,
 
30
    )
 
31
 
 
32
 
 
33
def connection_refuser():
    """Return an ``smtplib.SMTP`` object whose ``connect`` always fails.

    The ``connect`` attribute of the returned object is replaced by a
    function that raises ``socket.error`` with ``errno.ECONNREFUSED``, so
    callers can exercise their connection-refused handling without any
    network access.
    """
    def connect(server):
        # Installed as an instance attribute below, so it is invoked as a
        # plain function and takes no ``self``.
        raise socket.error(errno.ECONNREFUSED, 'Connection Refused')

    smtp = smtplib.SMTP()
    smtp.connect = connect
    return smtp
 
39
 
 
40
 
 
41
class StubSMTPFactory(object):
    """A fake SMTP connection to test the connection setup.

    Every method appends a tuple describing the call to ``self._calls`` so
    that tests can assert on the exact sequence of SMTP commands issued.

    :param fail_on: names of the commands ('helo', 'ehlo', 'starttls') that
        should answer with a 500 status instead of 200.
    :param smtp_features: extension names that ``has_extn`` reports as
        supported once a greeting has succeeded.
    """

    def __init__(self, fail_on=None, smtp_features=None):
        self._fail_on = fail_on or []
        self._calls = []
        self._smtp_features = smtp_features or []
        self._ehlo_called = False

    def __call__(self):
        # The factory pretends to be a connection
        return self

    def connect(self, server):
        """Record the connection attempt; never fails."""
        self._calls.append(('connect', server))

    def helo(self):
        """Record a HELO and return a (code, message) pair."""
        self._calls.append(('helo',))
        if 'helo' in self._fail_on:
            return 500, 'helo failure'
        else:
            return 200, 'helo success'

    def ehlo(self):
        """Record an EHLO and return a (code, message) pair.

        A successful EHLO sets ``_ehlo_called``, which ``has_extn`` checks
        before reporting any feature as supported.
        """
        self._calls.append(('ehlo',))
        if 'ehlo' in self._fail_on:
            return 500, 'ehlo failure'
        else:
            self._ehlo_called = True
            return 200, 'ehlo success'

    def has_extn(self, extension):
        """Record the query and tell whether ``extension`` is available.

        The result is falsy until ``_ehlo_called`` has been set.
        """
        self._calls.append(('has_extn', extension))
        return self._ehlo_called and extension in self._smtp_features

    def starttls(self):
        """Record a STARTTLS and return a (code, message) pair."""
        self._calls.append(('starttls',))
        if 'starttls' in self._fail_on:
            return 500, 'starttls failure'
        else:
            self._ehlo_called = True
            return 200, 'starttls success'
 
82
 
 
83
 
 
84
class WideOpenSMTPFactory(StubSMTPFactory):
    """A fake smtp server that implements login by accepting anybody."""

    def login(self, user, password):
        """Record the credentials in ``_calls``; never rejects them."""
        self._calls.append(('login', user, password))
 
89
 
 
90
 
 
91
class TestSMTPConnection(tests.TestCaseInTempDir):
    """Tests for ``smtp_connection.SMTPConnection`` using fake SMTP objects."""

    def get_connection(self, text, smtp_factory=None):
        """Build an SMTPConnection configured from ``text``.

        :param text: configuration file content, fed to a ``GlobalConfig``
            through ``_get_parser``.
        :param smtp_factory: passed through as ``_smtp_factory``.
        """
        my_config = config.GlobalConfig()
        config_file = StringIO(text)
        my_config._get_parser(config_file)
        return smtp_connection.SMTPConnection(my_config,
                                              _smtp_factory=smtp_factory)

    def test_defaults(self):
        """An empty config means 'localhost' and no credentials."""
        conn = self.get_connection('')
        self.assertEqual('localhost', conn._smtp_server)
        self.assertEqual(None, conn._smtp_username)
        self.assertEqual(None, conn._smtp_password)

    def test_smtp_server(self):
        """The smtp_server option is exposed verbatim, port included."""
        conn = self.get_connection('[DEFAULT]\nsmtp_server=host:10\n')
        self.assertEqual('host:10', conn._smtp_server)

    def test_missing_server(self):
        """A refused connection raises a different error for the default
        server than for an explicitly configured one.
        """
        conn = self.get_connection('', smtp_factory=connection_refuser)
        self.assertRaises(errors.DefaultSMTPConnectionRefused, conn._connect)
        conn = self.get_connection('[DEFAULT]\nsmtp_server=smtp.example.com\n',
                                   smtp_factory=connection_refuser)
        self.assertRaises(errors.SMTPConnectionRefused, conn._connect)

    def test_smtp_username(self):
        """The username is None unless smtp_username is configured."""
        conn = self.get_connection('')
        self.assertIs(None, conn._smtp_username)

        conn = self.get_connection('[DEFAULT]\nsmtp_username=joebody\n')
        self.assertEqual(u'joebody', conn._smtp_username)

    def test_smtp_password_from_config(self):
        """The password is None unless smtp_password is configured."""
        conn = self.get_connection('')
        self.assertIs(None, conn._smtp_password)

        conn = self.get_connection('[DEFAULT]\nsmtp_password=mypass\n')
        self.assertEqual(u'mypass', conn._smtp_password)

    def test_smtp_password_from_user(self):
        """With a username but no password, connecting reads the password
        from the UI factory's stdin.
        """
        user = 'joe'
        password = 'hispass'
        factory = WideOpenSMTPFactory()
        conn = self.get_connection('[DEFAULT]\nsmtp_username=%s\n' % user,
                                   smtp_factory=factory)
        self.assertIs(None, conn._smtp_password)

        ui.ui_factory = tests.TestUIFactory(stdin=password + '\n',
                                            stdout=tests.StringIOWrapper())
        conn._connect()
        self.assertEqual(password, conn._smtp_password)
        # stdin should be empty (the provided password has been consumed)
        self.assertEqual('', ui.ui_factory.stdin.readline())

    def test_smtp_password_from_auth_config(self):
        """With a username but no password, connecting picks the password
        up from a saved AuthenticationConfig entry.
        """
        user = 'joe'
        password = 'hispass'
        factory = WideOpenSMTPFactory()
        conn = self.get_connection('[DEFAULT]\nsmtp_username=%s\n' % user,
                                   smtp_factory=factory)
        self.assertEqual(user, conn._smtp_username)
        self.assertIs(None, conn._smtp_password)
        # Create a config file with the right password
        conf = config.AuthenticationConfig()
        conf._get_config().update({'smtptest':
                                       {'scheme': 'smtp', 'user':user,
                                        'password': password}})
        conf._save()

        conn._connect()
        self.assertEqual(password, conn._smtp_password)

    def test_authenticate_with_byte_strings(self):
        """Credentials are held as unicode but handed to login() as str."""
        user = 'joe'
        # UTF-8 encoded bytes for u'h\xECspass'
        password = 'h\xC3\xACspass'
        factory = WideOpenSMTPFactory()
        conn = self.get_connection(
            '[DEFAULT]\nsmtp_username=%s\nsmtp_password=%s\n'
            % (user, password), smtp_factory=factory)
        self.assertEqual(u'h\xECspass', conn._smtp_password)
        conn._connect()
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('has_extn', 'starttls'),
                          ('login', user, password)], factory._calls)
        smtp_username, smtp_password = factory._calls[-1][1:]
        self.assertIsInstance(smtp_username, str)
        self.assertIsInstance(smtp_password, str)

    def test_create_connection(self):
        """A plain connection issues connect, EHLO and a STARTTLS probe."""
        factory = StubSMTPFactory()
        conn = self.get_connection('', smtp_factory=factory)
        conn._create_connection()
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('has_extn', 'starttls')], factory._calls)

    def test_create_connection_ehlo_fails(self):
        # Check that we call HELO if EHLO failed.
        factory = StubSMTPFactory(fail_on=['ehlo'])
        conn = self.get_connection('', smtp_factory=factory)
        conn._create_connection()
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('helo',),
                          ('has_extn', 'starttls')], factory._calls)

    def test_create_connection_ehlo_helo_fails(self):
        # Check that we raise an exception if both EHLO and HELO fail.
        factory = StubSMTPFactory(fail_on=['ehlo', 'helo'])
        conn = self.get_connection('', smtp_factory=factory)
        self.assertRaises(errors.SMTPError, conn._create_connection)
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('helo',)], factory._calls)

    def test_create_connection_starttls(self):
        # Check that STARTTLS plus a second EHLO are called if the
        # server says it supports the feature.
        factory = StubSMTPFactory(smtp_features=['starttls'])
        conn = self.get_connection('', smtp_factory=factory)
        conn._create_connection()
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('has_extn', 'starttls'),
                          ('starttls',),
                          ('ehlo',)], factory._calls)

    def test_create_connection_starttls_fails(self):
        # Check that we raise an exception if the server claims to
        # support STARTTLS, but then fails when we try to activate it.
        factory = StubSMTPFactory(fail_on=['starttls'],
                                  smtp_features=['starttls'])
        conn = self.get_connection('', smtp_factory=factory)
        self.assertRaises(errors.SMTPError, conn._create_connection)
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('has_extn', 'starttls'),
                          ('starttls',)], factory._calls)

    def test_get_message_addresses(self):
        """get_message_addresses extracts the bare sender address and the
        To/CC/Bcc recipient addresses, for both Message and EmailMessage.
        """
        msg = Message()

        from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
        self.assertEqual('', from_)
        self.assertEqual([], to)

        msg['From'] = '"J. Random Developer" <jrandom@example.com>'
        msg['To'] = 'John Doe <john@doe.com>, Jane Doe <jane@doe.com>'
        msg['CC'] = u'Pepe P\xe9rez <pperez@ejemplo.com>'
        msg['Bcc'] = 'user@localhost'

        from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
        self.assertEqual('jrandom@example.com', from_)
        self.assertEqual(sorted(['john@doe.com', 'jane@doe.com',
            'pperez@ejemplo.com', 'user@localhost']), sorted(to))

        # now with bzrlib's EmailMessage
        msg = email_message.EmailMessage(
            '"J. Random Developer" <jrandom@example.com>',
            ['John Doe <john@doe.com>', 'Jane Doe <jane@doe.com>',
             u'Pepe P\xe9rez <pperez@ejemplo.com>', 'user@localhost' ],
            'subject')

        from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
        self.assertEqual('jrandom@example.com', from_)
        self.assertEqual(sorted(['john@doe.com', 'jane@doe.com',
            'pperez@ejemplo.com', 'user@localhost']), sorted(to))

    def test_destination_address_required(self):
        """send_email raises NoDestinationAddress when the message has no
        recipient header, an empty-string recipient or an empty list.
        """
        class FakeConfig:
            # Minimal config stand-in: every option is unset.
            def get_user_option(self, option):
                return None

        msg = Message()
        msg['From'] = '"J. Random Developer" <jrandom@example.com>'
        self.assertRaises(
            errors.NoDestinationAddress,
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)

        msg = email_message.EmailMessage('from@from.com', '', 'subject')
        self.assertRaises(
            errors.NoDestinationAddress,
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)

        msg = email_message.EmailMessage('from@from.com', [], 'subject')
        self.assertRaises(
            errors.NoDestinationAddress,
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)