/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testtransport.py

  • Committer: Robert Collins
  • Date: 2005-10-19 10:11:57 UTC
  • mfrom: (1185.16.78)
  • mto: This revision was merged to the branch mainline in revision 1470.
  • Revision ID: robertc@robertcollins.net-20051019101157-17438d311e746b4f
mergeĀ fromĀ upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2004, 2005 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
18
 
import errno
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
19
18
import os
20
 
import subprocess
21
 
import sys
22
 
import threading
23
 
 
24
 
from .. import (
25
 
    errors,
26
 
    osutils,
27
 
    tests,
28
 
    transport,
29
 
    urlutils,
30
 
    )
31
 
from ..sixish import (
32
 
    BytesIO,
33
 
    )
34
 
from ..transport import (
35
 
    chroot,
36
 
    fakenfs,
37
 
    http,
38
 
    local,
39
 
    memory,
40
 
    pathfilter,
41
 
    readonly,
42
 
    )
43
 
import breezy.transport.trace
44
 
from . import (
45
 
    features,
46
 
    test_server,
47
 
    )
48
 
 
49
 
 
50
 
# TODO: Should possibly split transport-specific tests into their own files.
51
 
 
52
 
 
53
 
class TestTransport(tests.TestCase):
54
 
    """Test the non transport-concrete class functionality."""
55
 
 
56
 
    def test__get_set_protocol_handlers(self):
57
 
        handlers = transport._get_protocol_handlers()
58
 
        self.assertNotEqual([], handlers.keys())
59
 
        transport._clear_protocol_handlers()
60
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
61
 
        self.assertEqual([], transport._get_protocol_handlers().keys())
62
 
 
63
 
    def test_get_transport_modules(self):
64
 
        handlers = transport._get_protocol_handlers()
65
 
        self.addCleanup(transport._set_protocol_handlers, handlers)
66
 
        # don't pollute the current handlers
67
 
        transport._clear_protocol_handlers()
68
 
 
69
 
        class SampleHandler(object):
70
 
            """I exist, isnt that enough?"""
71
 
        transport._clear_protocol_handlers()
72
 
        transport.register_transport_proto('foo')
73
 
        transport.register_lazy_transport('foo',
74
 
                                          'breezy.tests.test_transport',
75
 
                                          'TestTransport.SampleHandler')
76
 
        transport.register_transport_proto('bar')
77
 
        transport.register_lazy_transport('bar',
78
 
                                          'breezy.tests.test_transport',
79
 
                                          'TestTransport.SampleHandler')
80
 
        self.assertEqual([SampleHandler.__module__,
81
 
                          'breezy.transport.chroot',
82
 
                          'breezy.transport.pathfilter'],
83
 
                         transport._get_transport_modules())
84
 
 
85
 
    def test_transport_dependency(self):
86
 
        """Transport with missing dependency causes no error"""
87
 
        saved_handlers = transport._get_protocol_handlers()
88
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
 
        # don't pollute the current handlers
90
 
        transport._clear_protocol_handlers()
91
 
        transport.register_transport_proto('foo')
92
 
        transport.register_lazy_transport(
93
 
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
94
 
        try:
95
 
            transport.get_transport_from_url('foo://fooserver/foo')
96
 
        except errors.UnsupportedProtocol as e:
97
 
            self.assertEqual('Unsupported protocol'
98
 
                             ' for url "foo://fooserver/foo":'
99
 
                             ' Unable to import library "some_lib":'
100
 
                             ' testing missing dependency', str(e))
101
 
        else:
102
 
            self.fail('Did not raise UnsupportedProtocol')
103
 
 
104
 
    def test_transport_fallback(self):
105
 
        """Transport with missing dependency causes no error"""
106
 
        saved_handlers = transport._get_protocol_handlers()
107
 
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
108
 
        transport._clear_protocol_handlers()
109
 
        transport.register_transport_proto('foo')
110
 
        transport.register_lazy_transport(
111
 
            'foo', 'breezy.tests.test_transport', 'BackupTransportHandler')
112
 
        transport.register_lazy_transport(
113
 
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
114
 
        t = transport.get_transport_from_url('foo://fooserver/foo')
115
 
        self.assertTrue(isinstance(t, BackupTransportHandler))
116
 
 
117
 
    def test_ssh_hints(self):
118
 
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
119
 
        try:
120
 
            transport.get_transport_from_url('ssh://fooserver/foo')
121
 
        except errors.UnsupportedProtocol as e:
122
 
            self.assertEqual(
123
 
                'Unsupported protocol'
124
 
                ' for url "ssh://fooserver/foo":'
125
 
                ' Use bzr+ssh for Bazaar operations over SSH, '
126
 
                'e.g. "bzr+ssh://fooserver/foo". Use git+ssh '
127
 
                'for Git operations over SSH, e.g. "git+ssh://fooserver/foo".',
128
 
                str(e))
129
 
        else:
130
 
            self.fail('Did not raise UnsupportedProtocol')
131
 
 
132
 
    def test_LateReadError(self):
133
 
        """The LateReadError helper should raise on read()."""
134
 
        a_file = transport.LateReadError('a path')
135
 
        try:
136
 
            a_file.read()
137
 
        except errors.ReadError as error:
138
 
            self.assertEqual('a path', error.path)
139
 
        self.assertRaises(errors.ReadError, a_file.read, 40)
140
 
        a_file.close()
141
 
 
142
 
    def test_local_abspath_non_local_transport(self):
143
 
        # the base implementation should throw
144
 
        t = memory.MemoryTransport()
145
 
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
146
 
        self.assertEqual('memory:///t is not a local path.', str(e))
147
 
 
148
 
 
149
 
class TestCoalesceOffsets(tests.TestCase):
150
 
 
151
 
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
152
 
        coalesce = transport.Transport._coalesce_offsets
153
 
        exp = [transport._CoalescedOffset(*x) for x in expected]
154
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
155
 
                            max_size=max_size))
156
 
        self.assertEqual(exp, out)
157
 
 
158
 
    def test_coalesce_empty(self):
159
 
        self.check([], [])
160
 
 
161
 
    def test_coalesce_simple(self):
162
 
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
163
 
 
164
 
    def test_coalesce_unrelated(self):
165
 
        self.check([(0, 10, [(0, 10)]),
166
 
                    (20, 10, [(0, 10)]),
167
 
                    ], [(0, 10), (20, 10)])
168
 
 
169
 
    def test_coalesce_unsorted(self):
170
 
        self.check([(20, 10, [(0, 10)]),
171
 
                    (0, 10, [(0, 10)]),
172
 
                    ], [(20, 10), (0, 10)])
173
 
 
174
 
    def test_coalesce_nearby(self):
175
 
        self.check([(0, 20, [(0, 10), (10, 10)])],
176
 
                   [(0, 10), (10, 10)])
177
 
 
178
 
    def test_coalesce_overlapped(self):
179
 
        self.assertRaises(ValueError,
180
 
                          self.check, [(0, 15, [(0, 10), (5, 10)])],
181
 
                          [(0, 10), (5, 10)])
182
 
 
183
 
    def test_coalesce_limit(self):
184
 
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
185
 
                              (30, 10), (40, 10)]),
186
 
                    (60, 50, [(0, 10), (10, 10), (20, 10),
187
 
                              (30, 10), (40, 10)]),
188
 
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
189
 
                        (50, 10), (60, 10), (70, 10), (80, 10),
190
 
                        (90, 10), (100, 10)],
191
 
                   limit=5)
192
 
 
193
 
    def test_coalesce_no_limit(self):
194
 
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
195
 
                               (30, 10), (40, 10), (50, 10),
196
 
                               (60, 10), (70, 10), (80, 10),
197
 
                               (90, 10)]),
198
 
                    ], [(10, 10), (20, 10), (30, 10), (40, 10),
199
 
                        (50, 10), (60, 10), (70, 10), (80, 10),
200
 
                        (90, 10), (100, 10)])
201
 
 
202
 
    def test_coalesce_fudge(self):
203
 
        self.check([(10, 30, [(0, 10), (20, 10)]),
204
 
                    (100, 10, [(0, 10)]),
205
 
                    ], [(10, 10), (30, 10), (100, 10)],
206
 
                   fudge=10)
207
 
 
208
 
    def test_coalesce_max_size(self):
209
 
        self.check([(10, 20, [(0, 10), (10, 10)]),
210
 
                    (30, 50, [(0, 50)]),
211
 
                    # If one range is above max_size, it gets its own coalesced
212
 
                    # offset
213
 
                    (100, 80, [(0, 80)]), ],
214
 
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
215
 
                   max_size=50)
216
 
 
217
 
    def test_coalesce_no_max_size(self):
218
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
219
 
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
220
 
                   )
221
 
 
222
 
    def test_coalesce_default_limit(self):
223
 
        # By default we use a 100MB max size.
224
 
        ten_mb = 10 * 1024 * 1024
225
 
        self.check(
226
 
            [(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
227
 
             (10 * ten_mb, ten_mb, [(0, ten_mb)])],
228
 
            [(i * ten_mb, ten_mb) for i in range(11)])
229
 
        self.check(
230
 
            [(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
231
 
            [(i * ten_mb, ten_mb) for i in range(11)],
232
 
            max_size=1 * 1024 * 1024 * 1024)
233
 
 
234
 
 
235
 
class TestMemoryServer(tests.TestCase):
236
 
 
237
 
    def test_create_server(self):
238
 
        server = memory.MemoryServer()
239
 
        server.start_server()
240
 
        url = server.get_url()
241
 
        self.assertTrue(url in transport.transport_list_registry)
242
 
        t = transport.get_transport_from_url(url)
243
 
        del t
244
 
        server.stop_server()
245
 
        self.assertFalse(url in transport.transport_list_registry)
246
 
        self.assertRaises(errors.UnsupportedProtocol,
247
 
                          transport.get_transport, url)
248
 
 
249
 
 
250
 
class TestMemoryTransport(tests.TestCase):
 
19
from cStringIO import StringIO
 
20
 
 
21
from bzrlib.errors import NoSuchFile, FileExists, TransportNotPossible
 
22
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
23
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
 
24
from bzrlib.transport import memory
 
25
 
 
26
 
 
27
def _append(fn, txt):
 
28
    """Append the given text (file-like object) to the supplied filename."""
 
29
    f = open(fn, 'ab')
 
30
    f.write(txt)
 
31
    f.flush()
 
32
    f.close()
 
33
    del f
 
34
 
 
35
class TestTransportMixIn(object):
 
36
    """Subclass this, and it will provide a series of tests for a Transport.
 
37
    It assumes that the Transport object is connected to the 
 
38
    current working directory.  So that whatever is done 
 
39
    through the transport, should show up in the working 
 
40
    directory, and vice-versa.
 
41
 
 
42
    This also tests to make sure that the functions work with both
 
43
    generators and lists (assuming iter(list) is effectively a generator)
 
44
    """
 
45
    readonly = False
 
46
    def get_transport(self):
 
47
        """Children should override this to return the Transport object.
 
48
        """
 
49
        raise NotImplementedError
 
50
 
 
51
    def test_has(self):
 
52
        t = self.get_transport()
 
53
 
 
54
        files = ['a', 'b', 'e', 'g']
 
55
        self.build_tree(files)
 
56
        self.assertEqual(t.has('a'), True)
 
57
        self.assertEqual(t.has('c'), False)
 
58
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
 
59
                [True, True, False, False, True, False, True, False])
 
60
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
 
61
                [True, True, False, False, True, False, True, False])
 
62
 
 
63
    def test_get(self):
 
64
        t = self.get_transport()
 
65
 
 
66
        files = ['a', 'b', 'e', 'g']
 
67
        self.build_tree(files)
 
68
        self.assertEqual(t.get('a').read(), open('a').read())
 
69
        content_f = t.get_multi(files)
 
70
        for path,f in zip(files, content_f):
 
71
            self.assertEqual(open(path).read(), f.read())
 
72
 
 
73
        content_f = t.get_multi(iter(files))
 
74
        for path,f in zip(files, content_f):
 
75
            self.assertEqual(open(path).read(), f.read())
 
76
 
 
77
        self.assertRaises(NoSuchFile, t.get, 'c')
 
78
        try:
 
79
            files = list(t.get_multi(['a', 'b', 'c']))
 
80
        except NoSuchFile:
 
81
            pass
 
82
        else:
 
83
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
84
        try:
 
85
            files = list(t.get_multi(iter(['a', 'b', 'c', 'e'])))
 
86
        except NoSuchFile:
 
87
            pass
 
88
        else:
 
89
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
90
 
 
91
    def test_put(self):
 
92
        t = self.get_transport()
 
93
 
 
94
        if self.readonly:
 
95
            self.assertRaises(TransportNotPossible,
 
96
                    t.put, 'a', 'some text for a\n')
 
97
            open('a', 'wb').write('some text for a\n')
 
98
        else:
 
99
            t.put('a', 'some text for a\n')
 
100
        self.assert_(os.path.exists('a'))
 
101
        self.check_file_contents('a', 'some text for a\n')
 
102
        self.assertEqual(t.get('a').read(), 'some text for a\n')
 
103
        # Make sure 'has' is updated
 
104
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
 
105
                [True, False, False, False, False])
 
106
        if self.readonly:
 
107
            self.assertRaises(TransportNotPossible,
 
108
                    t.put_multi,
 
109
                    [('a', 'new\ncontents for\na\n'),
 
110
                        ('d', 'contents\nfor d\n')])
 
111
            open('a', 'wb').write('new\ncontents for\na\n')
 
112
            open('d', 'wb').write('contents\nfor d\n')
 
113
        else:
 
114
            # Put also replaces contents
 
115
            self.assertEqual(t.put_multi([('a', 'new\ncontents for\na\n'),
 
116
                                          ('d', 'contents\nfor d\n')]),
 
117
                             2)
 
118
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
 
119
                [True, False, False, True, False])
 
120
        self.check_file_contents('a', 'new\ncontents for\na\n')
 
121
        self.check_file_contents('d', 'contents\nfor d\n')
 
122
 
 
123
        if self.readonly:
 
124
            self.assertRaises(TransportNotPossible,
 
125
                t.put_multi, iter([('a', 'diff\ncontents for\na\n'),
 
126
                                  ('d', 'another contents\nfor d\n')]))
 
127
            open('a', 'wb').write('diff\ncontents for\na\n')
 
128
            open('d', 'wb').write('another contents\nfor d\n')
 
129
        else:
 
130
            self.assertEqual(
 
131
                t.put_multi(iter([('a', 'diff\ncontents for\na\n'),
 
132
                                  ('d', 'another contents\nfor d\n')]))
 
133
                             , 2)
 
134
        self.check_file_contents('a', 'diff\ncontents for\na\n')
 
135
        self.check_file_contents('d', 'another contents\nfor d\n')
 
136
 
 
137
        if self.readonly:
 
138
            self.assertRaises(TransportNotPossible,
 
139
                    t.put, 'path/doesnt/exist/c', 'contents')
 
140
        else:
 
141
            self.assertRaises(NoSuchFile,
 
142
                    t.put, 'path/doesnt/exist/c', 'contents')
 
143
 
 
144
    def test_put_file(self):
 
145
        t = self.get_transport()
 
146
 
 
147
        # Test that StringIO can be used as a file-like object with put
 
148
        f1 = StringIO('this is a string\nand some more stuff\n')
 
149
        if self.readonly:
 
150
            open('f1', 'wb').write(f1.read())
 
151
        else:
 
152
            t.put('f1', f1)
 
153
 
 
154
        del f1
 
155
 
 
156
        self.check_file_contents('f1', 
 
157
                'this is a string\nand some more stuff\n')
 
158
 
 
159
        f2 = StringIO('here is some text\nand a bit more\n')
 
160
        f3 = StringIO('some text for the\nthird file created\n')
 
161
 
 
162
        if self.readonly:
 
163
            open('f2', 'wb').write(f2.read())
 
164
            open('f3', 'wb').write(f3.read())
 
165
        else:
 
166
            t.put_multi([('f2', f2), ('f3', f3)])
 
167
 
 
168
        del f2, f3
 
169
 
 
170
        self.check_file_contents('f2', 'here is some text\nand a bit more\n')
 
171
        self.check_file_contents('f3', 'some text for the\nthird file created\n')
 
172
 
 
173
        # Test that an actual file object can be used with put
 
174
        f4 = open('f1', 'rb')
 
175
        if self.readonly:
 
176
            open('f4', 'wb').write(f4.read())
 
177
        else:
 
178
            t.put('f4', f4)
 
179
 
 
180
        del f4
 
181
 
 
182
        self.check_file_contents('f4', 
 
183
                'this is a string\nand some more stuff\n')
 
184
 
 
185
        f5 = open('f2', 'rb')
 
186
        f6 = open('f3', 'rb')
 
187
        if self.readonly:
 
188
            open('f5', 'wb').write(f5.read())
 
189
            open('f6', 'wb').write(f6.read())
 
190
        else:
 
191
            t.put_multi([('f5', f5), ('f6', f6)])
 
192
 
 
193
        del f5, f6
 
194
 
 
195
        self.check_file_contents('f5', 'here is some text\nand a bit more\n')
 
196
        self.check_file_contents('f6', 'some text for the\nthird file created\n')
 
197
 
 
198
 
 
199
 
 
200
    def test_mkdir(self):
 
201
        t = self.get_transport()
 
202
 
 
203
        # Test mkdir
 
204
        os.mkdir('dir_a')
 
205
        self.assertEqual(t.has('dir_a'), True)
 
206
        self.assertEqual(t.has('dir_b'), False)
 
207
 
 
208
        if self.readonly:
 
209
            self.assertRaises(TransportNotPossible,
 
210
                    t.mkdir, 'dir_b')
 
211
            os.mkdir('dir_b')
 
212
        else:
 
213
            t.mkdir('dir_b')
 
214
        self.assertEqual(t.has('dir_b'), True)
 
215
        self.assert_(os.path.isdir('dir_b'))
 
216
 
 
217
        if self.readonly:
 
218
            self.assertRaises(TransportNotPossible,
 
219
                    t.mkdir_multi, ['dir_c', 'dir_d'])
 
220
            os.mkdir('dir_c')
 
221
            os.mkdir('dir_d')
 
222
        else:
 
223
            t.mkdir_multi(['dir_c', 'dir_d'])
 
224
 
 
225
        if self.readonly:
 
226
            self.assertRaises(TransportNotPossible,
 
227
                    t.mkdir_multi, iter(['dir_e', 'dir_f']))
 
228
            os.mkdir('dir_e')
 
229
            os.mkdir('dir_f')
 
230
        else:
 
231
            t.mkdir_multi(iter(['dir_e', 'dir_f']))
 
232
        self.assertEqual(list(t.has_multi(
 
233
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
 
234
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
 
235
            [True, True, True, False,
 
236
             True, True, True, True])
 
237
        for d in ['dir_a', 'dir_b', 'dir_c', 'dir_d', 'dir_e', 'dir_f']:
 
238
            self.assert_(os.path.isdir(d))
 
239
 
 
240
        if not self.readonly:
 
241
            self.assertRaises(NoSuchFile, t.mkdir, 'path/doesnt/exist')
 
242
            self.assertRaises(FileExists, t.mkdir, 'dir_a') # Creating a directory again should fail
 
243
 
 
244
        # Make sure the transport recognizes when a
 
245
        # directory is created by other means
 
246
        # Caching Transports will fail, because dir_e was already seen not
 
247
        # to exist. So instead, we will search for a new directory
 
248
        #os.mkdir('dir_e')
 
249
        #if not self.readonly:
 
250
        #    self.assertRaises(FileExists, t.mkdir, 'dir_e')
 
251
 
 
252
        os.mkdir('dir_g')
 
253
        if not self.readonly:
 
254
            self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
255
 
 
256
        # Test get/put in sub-directories
 
257
        if self.readonly:
 
258
            open('dir_a/a', 'wb').write('contents of dir_a/a')
 
259
            open('dir_b/b', 'wb').write('contents of dir_b/b')
 
260
        else:
 
261
            self.assertEqual(
 
262
                t.put_multi([('dir_a/a', 'contents of dir_a/a'),
 
263
                             ('dir_b/b', 'contents of dir_b/b')])
 
264
                          , 2)
 
265
        for f in ('dir_a/a', 'dir_b/b'):
 
266
            self.assertEqual(t.get(f).read(), open(f).read())
 
267
 
 
268
    def test_copy_to(self):
 
269
        import tempfile
 
270
        from bzrlib.transport.local import LocalTransport
 
271
 
 
272
        t = self.get_transport()
 
273
 
 
274
        files = ['a', 'b', 'c', 'd']
 
275
        self.build_tree(files)
 
276
 
 
277
        dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
 
278
        dtmp_base = os.path.basename(dtmp)
 
279
        local_t = LocalTransport(dtmp)
 
280
 
 
281
        t.copy_to(files, local_t)
 
282
        for f in files:
 
283
            self.assertEquals(open(f).read(),
 
284
                    open(os.path.join(dtmp_base, f)).read())
 
285
 
 
286
        del dtmp, dtmp_base, local_t
 
287
 
 
288
        dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
 
289
        dtmp_base = os.path.basename(dtmp)
 
290
        local_t = LocalTransport(dtmp)
 
291
 
 
292
        files = ['a', 'b', 'c', 'd']
 
293
        t.copy_to(iter(files), local_t)
 
294
        for f in files:
 
295
            self.assertEquals(open(f).read(),
 
296
                    open(os.path.join(dtmp_base, f)).read())
 
297
 
 
298
        del dtmp, dtmp_base, local_t
 
299
 
 
300
    def test_append(self):
 
301
        t = self.get_transport()
 
302
 
 
303
        if self.readonly:
 
304
            open('a', 'wb').write('diff\ncontents for\na\n')
 
305
            open('b', 'wb').write('contents\nfor b\n')
 
306
        else:
 
307
            t.put_multi([
 
308
                    ('a', 'diff\ncontents for\na\n'),
 
309
                    ('b', 'contents\nfor b\n')
 
310
                    ])
 
311
 
 
312
        if self.readonly:
 
313
            self.assertRaises(TransportNotPossible,
 
314
                    t.append, 'a', 'add\nsome\nmore\ncontents\n')
 
315
            _append('a', 'add\nsome\nmore\ncontents\n')
 
316
        else:
 
317
            t.append('a', 'add\nsome\nmore\ncontents\n')
 
318
 
 
319
        self.check_file_contents('a', 
 
320
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n')
 
321
 
 
322
        if self.readonly:
 
323
            self.assertRaises(TransportNotPossible,
 
324
                    t.append_multi,
 
325
                        [('a', 'and\nthen\nsome\nmore\n'),
 
326
                         ('b', 'some\nmore\nfor\nb\n')])
 
327
            _append('a', 'and\nthen\nsome\nmore\n')
 
328
            _append('b', 'some\nmore\nfor\nb\n')
 
329
        else:
 
330
            t.append_multi([('a', 'and\nthen\nsome\nmore\n'),
 
331
                    ('b', 'some\nmore\nfor\nb\n')])
 
332
        self.check_file_contents('a', 
 
333
            'diff\ncontents for\na\n'
 
334
            'add\nsome\nmore\ncontents\n'
 
335
            'and\nthen\nsome\nmore\n')
 
336
        self.check_file_contents('b', 
 
337
                'contents\nfor b\n'
 
338
                'some\nmore\nfor\nb\n')
 
339
 
 
340
        if self.readonly:
 
341
            _append('a', 'a little bit more\n')
 
342
            _append('b', 'from an iterator\n')
 
343
        else:
 
344
            t.append_multi(iter([('a', 'a little bit more\n'),
 
345
                    ('b', 'from an iterator\n')]))
 
346
        self.check_file_contents('a', 
 
347
            'diff\ncontents for\na\n'
 
348
            'add\nsome\nmore\ncontents\n'
 
349
            'and\nthen\nsome\nmore\n'
 
350
            'a little bit more\n')
 
351
        self.check_file_contents('b', 
 
352
                'contents\nfor b\n'
 
353
                'some\nmore\nfor\nb\n'
 
354
                'from an iterator\n')
 
355
 
 
356
    def test_append_file(self):
 
357
        t = self.get_transport()
 
358
 
 
359
        contents = [
 
360
            ('f1', 'this is a string\nand some more stuff\n'),
 
361
            ('f2', 'here is some text\nand a bit more\n'),
 
362
            ('f3', 'some text for the\nthird file created\n'),
 
363
            ('f4', 'this is a string\nand some more stuff\n'),
 
364
            ('f5', 'here is some text\nand a bit more\n'),
 
365
            ('f6', 'some text for the\nthird file created\n')
 
366
        ]
 
367
        
 
368
        if self.readonly:
 
369
            for f, val in contents:
 
370
                open(f, 'wb').write(val)
 
371
        else:
 
372
            t.put_multi(contents)
 
373
 
 
374
        a1 = StringIO('appending to\none\n')
 
375
        if self.readonly:
 
376
            _append('f1', a1.read())
 
377
        else:
 
378
            t.append('f1', a1)
 
379
 
 
380
        del a1
 
381
 
 
382
        self.check_file_contents('f1', 
 
383
                'this is a string\nand some more stuff\n'
 
384
                'appending to\none\n')
 
385
 
 
386
        a2 = StringIO('adding more\ntext to two\n')
 
387
        a3 = StringIO('some garbage\nto put in three\n')
 
388
 
 
389
        if self.readonly:
 
390
            _append('f2', a2.read())
 
391
            _append('f3', a3.read())
 
392
        else:
 
393
            t.append_multi([('f2', a2), ('f3', a3)])
 
394
 
 
395
        del a2, a3
 
396
 
 
397
        self.check_file_contents('f2',
 
398
                'here is some text\nand a bit more\n'
 
399
                'adding more\ntext to two\n')
 
400
        self.check_file_contents('f3', 
 
401
                'some text for the\nthird file created\n'
 
402
                'some garbage\nto put in three\n')
 
403
 
 
404
        # Test that an actual file object can be used with put
 
405
        a4 = open('f1', 'rb')
 
406
        if self.readonly:
 
407
            _append('f4', a4.read())
 
408
        else:
 
409
            t.append('f4', a4)
 
410
 
 
411
        del a4
 
412
 
 
413
        self.check_file_contents('f4', 
 
414
                'this is a string\nand some more stuff\n'
 
415
                'this is a string\nand some more stuff\n'
 
416
                'appending to\none\n')
 
417
 
 
418
        a5 = open('f2', 'rb')
 
419
        a6 = open('f3', 'rb')
 
420
        if self.readonly:
 
421
            _append('f5', a5.read())
 
422
            _append('f6', a6.read())
 
423
        else:
 
424
            t.append_multi([('f5', a5), ('f6', a6)])
 
425
 
 
426
        del a5, a6
 
427
 
 
428
        self.check_file_contents('f5',
 
429
                'here is some text\nand a bit more\n'
 
430
                'here is some text\nand a bit more\n'
 
431
                'adding more\ntext to two\n')
 
432
        self.check_file_contents('f6',
 
433
                'some text for the\nthird file created\n'
 
434
                'some text for the\nthird file created\n'
 
435
                'some garbage\nto put in three\n')
 
436
 
 
437
    def test_delete(self):
 
438
        # TODO: Test Transport.delete
 
439
        pass
 
440
 
 
441
    def test_move(self):
 
442
        # TODO: Test Transport.move
 
443
        pass
 
444
 
 
445
 
 
446
class LocalTransportTest(TestCaseInTempDir, TestTransportMixIn):
 
447
    def get_transport(self):
 
448
        from bzrlib.transport.local import LocalTransport
 
449
        return LocalTransport('.')
 
450
 
 
451
 
 
452
class HttpTransportTest(TestCaseWithWebserver, TestTransportMixIn):
 
453
 
 
454
    readonly = True
 
455
 
 
456
    def get_transport(self):
 
457
        from bzrlib.transport.http import HttpTransport
 
458
        url = self.get_remote_url('.')
 
459
        return HttpTransport(url)
 
460
 
 
461
 
 
462
class TestMemoryTransport(TestCase):
251
463
 
252
464
    def test_get_transport(self):
253
465
        memory.MemoryTransport()
254
466
 
255
467
    def test_clone(self):
256
 
        t = memory.MemoryTransport()
257
 
        self.assertTrue(isinstance(t, memory.MemoryTransport))
258
 
        self.assertEqual("memory:///", t.clone("/").base)
 
468
        transport = memory.MemoryTransport()
 
469
        self.failUnless(transport.clone() is transport)
259
470
 
260
471
    def test_abspath(self):
261
 
        t = memory.MemoryTransport()
262
 
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
263
 
 
264
 
    def test_abspath_of_root(self):
265
 
        t = memory.MemoryTransport()
266
 
        self.assertEqual("memory:///", t.base)
267
 
        self.assertEqual("memory:///", t.abspath('/'))
268
 
 
269
 
    def test_abspath_of_relpath_starting_at_root(self):
270
 
        t = memory.MemoryTransport()
271
 
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
472
        transport = memory.MemoryTransport()
 
473
        self.assertEqual("in-memory:relpath", transport.abspath('relpath'))
 
474
 
 
475
    def test_relpath(self):
 
476
        transport = memory.MemoryTransport()
272
477
 
273
478
    def test_append_and_get(self):
274
 
        t = memory.MemoryTransport()
275
 
        t.append_bytes('path', b'content')
276
 
        self.assertEqual(t.get('path').read(), b'content')
277
 
        t.append_file('path', BytesIO(b'content'))
278
 
        with t.get('path') as f:
279
 
            self.assertEqual(f.read(), b'contentcontent')
 
479
        transport = memory.MemoryTransport()
 
480
        transport.append('path', StringIO('content'))
 
481
        self.assertEqual(transport.get('path').read(), 'content')
 
482
        transport.append('path', StringIO('content'))
 
483
        self.assertEqual(transport.get('path').read(), 'contentcontent')
280
484
 
281
485
    def test_put_and_get(self):
282
 
        t = memory.MemoryTransport()
283
 
        t.put_file('path', BytesIO(b'content'))
284
 
        self.assertEqual(t.get('path').read(), b'content')
285
 
        t.put_bytes('path', b'content')
286
 
        self.assertEqual(t.get('path').read(), b'content')
 
486
        transport = memory.MemoryTransport()
 
487
        transport.put('path', StringIO('content'))
 
488
        self.assertEqual(transport.get('path').read(), 'content')
 
489
        transport.put('path', StringIO('content'))
 
490
        self.assertEqual(transport.get('path').read(), 'content')
287
491
 
288
492
    def test_append_without_dir_fails(self):
289
 
        t = memory.MemoryTransport()
290
 
        self.assertRaises(errors.NoSuchFile,
291
 
                          t.append_bytes, 'dir/path', b'content')
 
493
        transport = memory.MemoryTransport()
 
494
        self.assertRaises(NoSuchFile,
 
495
                          transport.append, 'dir/path', StringIO('content'))
292
496
 
293
497
    def test_put_without_dir_fails(self):
294
 
        t = memory.MemoryTransport()
295
 
        self.assertRaises(errors.NoSuchFile,
296
 
                          t.put_file, 'dir/path', BytesIO(b'content'))
 
498
        transport = memory.MemoryTransport()
 
499
        self.assertRaises(NoSuchFile,
 
500
                          transport.put, 'dir/path', StringIO('content'))
297
501
 
298
502
    def test_get_missing(self):
299
503
        transport = memory.MemoryTransport()
300
 
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
504
        self.assertRaises(NoSuchFile, transport.get, 'foo')
301
505
 
302
506
    def test_has_missing(self):
303
 
        t = memory.MemoryTransport()
304
 
        self.assertEqual(False, t.has('foo'))
 
507
        transport = memory.MemoryTransport()
 
508
        self.assertEquals(False, transport.has('foo'))
305
509
 
306
510
    def test_has_present(self):
307
 
        t = memory.MemoryTransport()
308
 
        t.append_bytes('foo', b'content')
309
 
        self.assertEqual(True, t.has('foo'))
310
 
 
311
 
    def test_list_dir(self):
312
 
        t = memory.MemoryTransport()
313
 
        t.put_bytes('foo', b'content')
314
 
        t.mkdir('dir')
315
 
        t.put_bytes('dir/subfoo', b'content')
316
 
        t.put_bytes('dirlike', b'content')
317
 
 
318
 
        self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
319
 
        self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
 
511
        transport = memory.MemoryTransport()
 
512
        transport.append('foo', StringIO('content'))
 
513
        self.assertEquals(True, transport.has('foo'))
320
514
 
321
515
    def test_mkdir(self):
322
 
        t = memory.MemoryTransport()
323
 
        t.mkdir('dir')
324
 
        t.append_bytes('dir/path', b'content')
325
 
        with t.get('dir/path') as f:
326
 
            self.assertEqual(f.read(), b'content')
 
516
        transport = memory.MemoryTransport()
 
517
        transport.mkdir('dir')
 
518
        transport.append('dir/path', StringIO('content'))
 
519
        self.assertEqual(transport.get('dir/path').read(), 'content')
327
520
 
328
521
    def test_mkdir_missing_parent(self):
329
 
        t = memory.MemoryTransport()
330
 
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
522
        transport = memory.MemoryTransport()
 
523
        self.assertRaises(NoSuchFile,
 
524
                          transport.mkdir, 'dir/dir')
331
525
 
332
526
    def test_mkdir_twice(self):
333
 
        t = memory.MemoryTransport()
334
 
        t.mkdir('dir')
335
 
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
527
        transport = memory.MemoryTransport()
 
528
        transport.mkdir('dir')
 
529
        self.assertRaises(FileExists, transport.mkdir, 'dir')
336
530
 
337
531
    def test_parameters(self):
338
 
        t = memory.MemoryTransport()
339
 
        self.assertEqual(True, t.listable())
340
 
        self.assertEqual(False, t.is_readonly())
 
532
        transport = memory.MemoryTransport()
 
533
        self.assertEqual(True, transport.listable())
 
534
        self.assertEqual(False, transport.should_cache())
341
535
 
342
536
    def test_iter_files_recursive(self):
343
 
        t = memory.MemoryTransport()
344
 
        t.mkdir('dir')
345
 
        t.put_bytes('dir/foo', b'content')
346
 
        t.put_bytes('dir/bar', b'content')
347
 
        t.put_bytes('bar', b'content')
348
 
        paths = set(t.iter_files_recursive())
349
 
        self.assertEqual({'dir/foo', 'dir/bar', 'bar'}, paths)
 
537
        transport = memory.MemoryTransport()
 
538
        transport.mkdir('dir')
 
539
        transport.put('dir/foo', StringIO('content'))
 
540
        transport.put('dir/bar', StringIO('content'))
 
541
        transport.put('bar', StringIO('content'))
 
542
        paths = set(transport.iter_files_recursive())
 
543
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
350
544
 
351
545
    def test_stat(self):
352
 
        t = memory.MemoryTransport()
353
 
        t.put_bytes('foo', b'content')
354
 
        t.put_bytes('bar', b'phowar')
355
 
        self.assertEqual(7, t.stat('foo').st_size)
356
 
        self.assertEqual(6, t.stat('bar').st_size)
357
 
 
358
 
 
359
 
class ChrootDecoratorTransportTest(tests.TestCase):
360
 
    """Chroot decoration specific tests."""
361
 
 
362
 
    def test_abspath(self):
363
 
        # The abspath is always relative to the chroot_url.
364
 
        server = chroot.ChrootServer(
365
 
            transport.get_transport_from_url('memory:///foo/bar/'))
366
 
        self.start_server(server)
367
 
        t = transport.get_transport_from_url(server.get_url())
368
 
        self.assertEqual(server.get_url(), t.abspath('/'))
369
 
 
370
 
        subdir_t = t.clone('subdir')
371
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
372
 
 
373
 
    def test_clone(self):
374
 
        server = chroot.ChrootServer(
375
 
            transport.get_transport_from_url('memory:///foo/bar/'))
376
 
        self.start_server(server)
377
 
        t = transport.get_transport_from_url(server.get_url())
378
 
        # relpath from root and root path are the same
379
 
        relpath_cloned = t.clone('foo')
380
 
        abspath_cloned = t.clone('/foo')
381
 
        self.assertEqual(server, relpath_cloned.server)
382
 
        self.assertEqual(server, abspath_cloned.server)
383
 
 
384
 
    def test_chroot_url_preserves_chroot(self):
385
 
        """Calling get_transport on a chroot transport's base should produce a
386
 
        transport with exactly the same behaviour as the original chroot
387
 
        transport.
388
 
 
389
 
        This is so that it is not possible to escape a chroot by doing::
390
 
            url = chroot_transport.base
391
 
            parent_url = urlutils.join(url, '..')
392
 
            new_t = transport.get_transport_from_url(parent_url)
393
 
        """
394
 
        server = chroot.ChrootServer(
395
 
            transport.get_transport_from_url('memory:///path/subpath'))
396
 
        self.start_server(server)
397
 
        t = transport.get_transport_from_url(server.get_url())
398
 
        new_t = transport.get_transport_from_url(t.base)
399
 
        self.assertEqual(t.server, new_t.server)
400
 
        self.assertEqual(t.base, new_t.base)
401
 
 
402
 
    def test_urljoin_preserves_chroot(self):
403
 
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
404
 
        URL that escapes the intended chroot.
405
 
 
406
 
        This is so that it is not possible to escape a chroot by doing::
407
 
            url = chroot_transport.base
408
 
            parent_url = urlutils.join(url, '..')
409
 
            new_t = transport.get_transport_from_url(parent_url)
410
 
        """
411
 
        server = chroot.ChrootServer(
412
 
            transport.get_transport_from_url('memory:///path/'))
413
 
        self.start_server(server)
414
 
        t = transport.get_transport_from_url(server.get_url())
415
 
        self.assertRaises(
416
 
            urlutils.InvalidURLJoin, urlutils.join, t.base, '..')
417
 
 
418
 
 
419
 
class TestChrootServer(tests.TestCase):
420
 
 
421
 
    def test_construct(self):
422
 
        backing_transport = memory.MemoryTransport()
423
 
        server = chroot.ChrootServer(backing_transport)
424
 
        self.assertEqual(backing_transport, server.backing_transport)
425
 
 
426
 
    def test_setUp(self):
427
 
        backing_transport = memory.MemoryTransport()
428
 
        server = chroot.ChrootServer(backing_transport)
429
 
        server.start_server()
430
 
        self.addCleanup(server.stop_server)
431
 
        self.assertTrue(server.scheme
432
 
                        in transport._get_protocol_handlers().keys())
433
 
 
434
 
    def test_stop_server(self):
435
 
        backing_transport = memory.MemoryTransport()
436
 
        server = chroot.ChrootServer(backing_transport)
437
 
        server.start_server()
438
 
        server.stop_server()
439
 
        self.assertFalse(server.scheme
440
 
                         in transport._get_protocol_handlers().keys())
441
 
 
442
 
    def test_get_url(self):
443
 
        backing_transport = memory.MemoryTransport()
444
 
        server = chroot.ChrootServer(backing_transport)
445
 
        server.start_server()
446
 
        self.addCleanup(server.stop_server)
447
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
448
 
 
449
 
 
450
 
class TestHooks(tests.TestCase):
451
 
    """Basic tests for transport hooks"""
452
 
 
453
 
    def _get_connected_transport(self):
454
 
        return transport.ConnectedTransport("bogus:nowhere")
455
 
 
456
 
    def test_transporthooks_initialisation(self):
457
 
        """Check all expected transport hook points are set up"""
458
 
        hookpoint = transport.TransportHooks()
459
 
        self.assertTrue("post_connect" in hookpoint,
460
 
                        "post_connect not in %s" % (hookpoint,))
461
 
 
462
 
    def test_post_connect(self):
463
 
        """Ensure the post_connect hook is called when _set_transport is"""
464
 
        calls = []
465
 
        transport.Transport.hooks.install_named_hook("post_connect",
466
 
                                                     calls.append, None)
467
 
        t = self._get_connected_transport()
468
 
        self.assertLength(0, calls)
469
 
        t._set_connection("connection", "auth")
470
 
        self.assertEqual(calls, [t])
471
 
 
472
 
 
473
 
class PathFilteringDecoratorTransportTest(tests.TestCase):
474
 
    """Pathfilter decoration specific tests."""
475
 
 
476
 
    def test_abspath(self):
477
 
        # The abspath is always relative to the base of the backing transport.
478
 
        server = pathfilter.PathFilteringServer(
479
 
            transport.get_transport_from_url('memory:///foo/bar/'),
480
 
            lambda x: x)
481
 
        server.start_server()
482
 
        t = transport.get_transport_from_url(server.get_url())
483
 
        self.assertEqual(server.get_url(), t.abspath('/'))
484
 
 
485
 
        subdir_t = t.clone('subdir')
486
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
487
 
        server.stop_server()
488
 
 
489
 
    def make_pf_transport(self, filter_func=None):
490
 
        """Make a PathFilteringTransport backed by a MemoryTransport.
491
 
 
492
 
        :param filter_func: by default this will be a no-op function.  Use this
493
 
            parameter to override it."""
494
 
        if filter_func is None:
495
 
            def filter_func(x):
496
 
                return x
497
 
        server = pathfilter.PathFilteringServer(
498
 
            transport.get_transport_from_url('memory:///foo/bar/'),
499
 
            filter_func)
500
 
        server.start_server()
501
 
        self.addCleanup(server.stop_server)
502
 
        return transport.get_transport_from_url(server.get_url())
503
 
 
504
 
    def test__filter(self):
505
 
        # _filter (with an identity func as filter_func) always returns
506
 
        # paths relative to the base of the backing transport.
507
 
        t = self.make_pf_transport()
508
 
        self.assertEqual('foo', t._filter('foo'))
509
 
        self.assertEqual('foo/bar', t._filter('foo/bar'))
510
 
        self.assertEqual('', t._filter('..'))
511
 
        self.assertEqual('', t._filter('/'))
512
 
        # The base of the pathfiltering transport is taken into account too.
513
 
        t = t.clone('subdir1/subdir2')
514
 
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
515
 
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
516
 
        self.assertEqual('subdir1', t._filter('..'))
517
 
        self.assertEqual('', t._filter('/'))
518
 
 
519
 
    def test_filter_invocation(self):
520
 
        filter_log = []
521
 
 
522
 
        def filter(path):
523
 
            filter_log.append(path)
524
 
            return path
525
 
        t = self.make_pf_transport(filter)
526
 
        t.has('abc')
527
 
        self.assertEqual(['abc'], filter_log)
528
 
        del filter_log[:]
529
 
        t.clone('abc').has('xyz')
530
 
        self.assertEqual(['abc/xyz'], filter_log)
531
 
        del filter_log[:]
532
 
        t.has('/abc')
533
 
        self.assertEqual(['abc'], filter_log)
534
 
 
535
 
    def test_clone(self):
536
 
        t = self.make_pf_transport()
537
 
        # relpath from root and root path are the same
538
 
        relpath_cloned = t.clone('foo')
539
 
        abspath_cloned = t.clone('/foo')
540
 
        self.assertEqual(t.server, relpath_cloned.server)
541
 
        self.assertEqual(t.server, abspath_cloned.server)
542
 
 
543
 
    def test_url_preserves_pathfiltering(self):
544
 
        """Calling get_transport on a pathfiltered transport's base should
545
 
        produce a transport with exactly the same behaviour as the original
546
 
        pathfiltered transport.
547
 
 
548
 
        This is so that it is not possible to escape (accidentally or
549
 
        otherwise) the filtering by doing::
550
 
            url = filtered_transport.base
551
 
            parent_url = urlutils.join(url, '..')
552
 
            new_t = transport.get_transport_from_url(parent_url)
553
 
        """
554
 
        t = self.make_pf_transport()
555
 
        new_t = transport.get_transport_from_url(t.base)
556
 
        self.assertEqual(t.server, new_t.server)
557
 
        self.assertEqual(t.base, new_t.base)
558
 
 
559
 
 
560
 
class ReadonlyDecoratorTransportTest(tests.TestCase):
561
 
    """Readonly decoration specific tests."""
562
 
 
563
 
    def test_local_parameters(self):
564
 
        # connect to . in readonly mode
565
 
        t = readonly.ReadonlyTransportDecorator('readonly+.')
566
 
        self.assertEqual(True, t.listable())
567
 
        self.assertEqual(True, t.is_readonly())
568
 
 
569
 
    def test_http_parameters(self):
570
 
        from breezy.tests.http_server import HttpServer
571
 
        # connect to '.' via http which is not listable
572
 
        server = HttpServer()
573
 
        self.start_server(server)
574
 
        t = transport.get_transport_from_url('readonly+' + server.get_url())
575
 
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
576
 
        self.assertEqual(False, t.listable())
577
 
        self.assertEqual(True, t.is_readonly())
578
 
 
579
 
 
580
 
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
581
 
    """NFS decorator specific tests."""
582
 
 
583
 
    def get_nfs_transport(self, url):
584
 
        # connect to url with nfs decoration
585
 
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
586
 
 
587
 
    def test_local_parameters(self):
588
 
        # the listable and is_readonly parameters
589
 
        # are not changed by the fakenfs decorator
590
 
        t = self.get_nfs_transport('.')
591
 
        self.assertEqual(True, t.listable())
592
 
        self.assertEqual(False, t.is_readonly())
593
 
 
594
 
    def test_http_parameters(self):
595
 
        # the listable and is_readonly parameters
596
 
        # are not changed by the fakenfs decorator
597
 
        from breezy.tests.http_server import HttpServer
598
 
        # connect to '.' via http which is not listable
599
 
        server = HttpServer()
600
 
        self.start_server(server)
601
 
        t = self.get_nfs_transport(server.get_url())
602
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
603
 
        self.assertEqual(False, t.listable())
604
 
        self.assertEqual(True, t.is_readonly())
605
 
 
606
 
    def test_fakenfs_server_default(self):
607
 
        # a FakeNFSServer() should bring up a local relpath server for itself
608
 
        server = test_server.FakeNFSServer()
609
 
        self.start_server(server)
610
 
        # the url should be decorated appropriately
611
 
        self.assertStartsWith(server.get_url(), 'fakenfs+')
612
 
        # and we should be able to get a transport for it
613
 
        t = transport.get_transport_from_url(server.get_url())
614
 
        # which must be a FakeNFSTransportDecorator instance.
615
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
616
 
 
617
 
    def test_fakenfs_rename_semantics(self):
618
 
        # a FakeNFS transport must mangle the way rename errors occur to
619
 
        # look like NFS problems.
620
 
        t = self.get_nfs_transport('.')
621
 
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
622
 
                        transport=t)
623
 
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
624
 
 
625
 
 
626
 
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
627
 
    """Tests for simulation of VFAT restrictions"""
628
 
 
629
 
    def get_vfat_transport(self, url):
630
 
        """Return vfat-backed transport for test directory"""
631
 
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
632
 
        return FakeVFATTransportDecorator('vfat+' + url)
633
 
 
634
 
    def test_transport_creation(self):
635
 
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
636
 
        t = self.get_vfat_transport('.')
637
 
        self.assertIsInstance(t, FakeVFATTransportDecorator)
638
 
 
639
 
    def test_transport_mkdir(self):
640
 
        t = self.get_vfat_transport('.')
641
 
        t.mkdir('HELLO')
642
 
        self.assertTrue(t.has('hello'))
643
 
        self.assertTrue(t.has('Hello'))
644
 
 
645
 
    def test_forbidden_chars(self):
646
 
        t = self.get_vfat_transport('.')
647
 
        self.assertRaises(ValueError, t.has, "<NU>")
648
 
 
649
 
 
650
 
class BadTransportHandler(transport.Transport):
651
 
    def __init__(self, base_url):
652
 
        raise errors.DependencyNotPresent('some_lib',
653
 
                                          'testing missing dependency')
654
 
 
655
 
 
656
 
class BackupTransportHandler(transport.Transport):
657
 
    """Test transport that works as a backup for the BadTransportHandler"""
658
 
    pass
659
 
 
660
 
 
661
 
class TestTransportImplementation(tests.TestCaseInTempDir):
662
 
    """Implementation verification for transports.
663
 
 
664
 
    To verify a transport we need a server factory, which is a callable
665
 
    that accepts no parameters and returns an implementation of
666
 
    breezy.transport.Server.
667
 
 
668
 
    That Server is then used to construct transport instances and test
669
 
    the transport via loopback activity.
670
 
 
671
 
    Currently this assumes that the Transport object is connected to the
672
 
    current working directory.  So that whatever is done
673
 
    through the transport, should show up in the working
674
 
    directory, and vice-versa. This is a bug, because its possible to have
675
 
    URL schemes which provide access to something that may not be
676
 
    result in storage on the local disk, i.e. due to file system limits, or
677
 
    due to it being a database or some other non-filesystem tool.
678
 
 
679
 
    This also tests to make sure that the functions work with both
680
 
    generators and lists (assuming iter(list) is effectively a generator)
681
 
    """
682
 
 
683
 
    def setUp(self):
684
 
        super(TestTransportImplementation, self).setUp()
685
 
        self._server = self.transport_server()
686
 
        self.start_server(self._server)
687
 
 
688
 
    def get_transport(self, relpath=None):
689
 
        """Return a connected transport to the local directory.
690
 
 
691
 
        :param relpath: a path relative to the base url.
692
 
        """
693
 
        base_url = self._server.get_url()
694
 
        url = self._adjust_url(base_url, relpath)
695
 
        # try getting the transport via the regular interface:
696
 
        t = transport.get_transport_from_url(url)
697
 
        # vila--20070607 if the following are commented out the test suite
698
 
        # still pass. Is this really still needed or was it a forgotten
699
 
        # temporary fix ?
700
 
        if not isinstance(t, self.transport_class):
701
 
            # we did not get the correct transport class type. Override the
702
 
            # regular connection behaviour by direct construction.
703
 
            t = self.transport_class(url)
704
 
        return t
705
 
 
706
 
 
707
 
class TestTransportFromPath(tests.TestCaseInTempDir):
708
 
 
709
 
    def test_with_path(self):
710
 
        t = transport.get_transport_from_path(self.test_dir)
711
 
        self.assertIsInstance(t, local.LocalTransport)
712
 
        self.assertEqual(t.base.rstrip("/"),
713
 
                         urlutils.local_path_to_url(self.test_dir))
714
 
 
715
 
    def test_with_url(self):
716
 
        t = transport.get_transport_from_path("file:")
717
 
        self.assertIsInstance(t, local.LocalTransport)
718
 
        self.assertEqual(
719
 
            t.base.rstrip("/"),
720
 
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
721
 
 
722
 
 
723
 
class TestTransportFromUrl(tests.TestCaseInTempDir):
724
 
 
725
 
    def test_with_path(self):
726
 
        self.assertRaises(urlutils.InvalidURL, transport.get_transport_from_url,
727
 
                          self.test_dir)
728
 
 
729
 
    def test_with_url(self):
730
 
        url = urlutils.local_path_to_url(self.test_dir)
731
 
        t = transport.get_transport_from_url(url)
732
 
        self.assertIsInstance(t, local.LocalTransport)
733
 
        self.assertEqual(t.base.rstrip("/"), url)
734
 
 
735
 
    def test_with_url_and_segment_parameters(self):
736
 
        url = urlutils.local_path_to_url(self.test_dir) + ",branch=foo"
737
 
        t = transport.get_transport_from_url(url)
738
 
        self.assertIsInstance(t, local.LocalTransport)
739
 
        self.assertEqual(t.base.rstrip("/"), url)
740
 
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
741
 
            f.write("data")
742
 
        self.assertTrue(t.has("afile"))
743
 
 
744
 
 
745
 
class TestLocalTransports(tests.TestCase):
746
 
 
747
 
    def test_get_transport_from_abspath(self):
748
 
        here = osutils.abspath('.')
749
 
        t = transport.get_transport(here)
750
 
        self.assertIsInstance(t, local.LocalTransport)
751
 
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
752
 
 
753
 
    def test_get_transport_from_relpath(self):
754
 
        t = transport.get_transport('.')
755
 
        self.assertIsInstance(t, local.LocalTransport)
756
 
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
757
 
 
758
 
    def test_get_transport_from_local_url(self):
759
 
        here = osutils.abspath('.')
760
 
        here_url = urlutils.local_path_to_url(here) + '/'
761
 
        t = transport.get_transport(here_url)
762
 
        self.assertIsInstance(t, local.LocalTransport)
763
 
        self.assertEqual(t.base, here_url)
764
 
 
765
 
    def test_local_abspath(self):
766
 
        here = osutils.abspath('.')
767
 
        t = transport.get_transport(here)
768
 
        self.assertEqual(t.local_abspath(''), here)
769
 
 
770
 
 
771
 
class TestLocalTransportMutation(tests.TestCaseInTempDir):
772
 
 
773
 
    def test_local_transport_mkdir(self):
774
 
        here = osutils.abspath('.')
775
 
        t = transport.get_transport(here)
776
 
        t.mkdir('test')
777
 
        self.assertTrue(os.path.exists('test'))
778
 
 
779
 
    def test_local_transport_mkdir_permission_denied(self):
780
 
        # See https://bugs.launchpad.net/bzr/+bug/606537
781
 
        here = osutils.abspath('.')
782
 
        t = transport.get_transport(here)
783
 
 
784
 
        def fake_chmod(path, mode):
785
 
            e = OSError('permission denied')
786
 
            e.errno = errno.EPERM
787
 
            raise e
788
 
        self.overrideAttr(os, 'chmod', fake_chmod)
789
 
        t.mkdir('test')
790
 
        t.mkdir('test2', mode=0o707)
791
 
        self.assertTrue(os.path.exists('test'))
792
 
        self.assertTrue(os.path.exists('test2'))
793
 
 
794
 
 
795
 
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
796
 
 
797
 
    def test_local_fdatasync_calls_fdatasync(self):
798
 
        """Check fdatasync on a stream tries to flush the data to the OS.
799
 
 
800
 
        We can't easily observe the external effect but we can at least see
801
 
        it's called.
802
 
        """
803
 
        sentinel = object()
804
 
        fdatasync = getattr(os, 'fdatasync', sentinel)
805
 
        if fdatasync is sentinel:
806
 
            raise tests.TestNotApplicable('fdatasync not supported')
807
 
        t = self.get_transport('.')
808
 
        calls = self.recordCalls(os, 'fdatasync')
809
 
        w = t.open_write_stream('out')
810
 
        w.write(b'foo')
811
 
        w.fdatasync()
812
 
        with open('out', 'rb') as f:
813
 
            # Should have been flushed.
814
 
            self.assertEqual(f.read(), b'foo')
815
 
        self.assertEqual(len(calls), 1, calls)
816
 
 
817
 
    def test_missing_directory(self):
818
 
        t = self.get_transport('.')
819
 
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
820
 
 
821
 
 
822
 
class TestWin32LocalTransport(tests.TestCase):
823
 
 
824
 
    def test_unc_clone_to_root(self):
825
 
        self.requireFeature(features.win32_feature)
826
 
        # Win32 UNC path like \\HOST\path
827
 
        # clone to root should stop at least at \\HOST part
828
 
        # not on \\
829
 
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
830
 
        for i in range(4):
831
 
            t = t.clone('..')
832
 
        self.assertEqual(t.base, 'file://HOST/')
833
 
        # make sure we reach the root
834
 
        t = t.clone('..')
835
 
        self.assertEqual(t.base, 'file://HOST/')
836
 
 
837
 
 
838
 
class TestConnectedTransport(tests.TestCase):
839
 
    """Tests for connected to remote server transports"""
840
 
 
841
 
    def test_parse_url(self):
842
 
        t = transport.ConnectedTransport(
843
 
            'http://simple.example.com/home/source')
844
 
        self.assertEqual(t._parsed_url.host, 'simple.example.com')
845
 
        self.assertEqual(t._parsed_url.port, None)
846
 
        self.assertEqual(t._parsed_url.path, '/home/source/')
847
 
        self.assertTrue(t._parsed_url.user is None)
848
 
        self.assertTrue(t._parsed_url.password is None)
849
 
 
850
 
        self.assertEqual(t.base, 'http://simple.example.com/home/source/')
851
 
 
852
 
    def test_parse_url_with_at_in_user(self):
853
 
        # Bug 228058
854
 
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
855
 
        self.assertEqual(t._parsed_url.user, 'user@host.com')
856
 
 
857
 
    def test_parse_quoted_url(self):
858
 
        t = transport.ConnectedTransport(
859
 
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
860
 
        self.assertEqual(t._parsed_url.host, 'exAmple.com')
861
 
        self.assertEqual(t._parsed_url.port, 2222)
862
 
        self.assertEqual(t._parsed_url.user, 'robey')
863
 
        self.assertEqual(t._parsed_url.password, 'h@t')
864
 
        self.assertEqual(t._parsed_url.path, '/path/')
865
 
 
866
 
        # Base should not keep track of the password
867
 
        self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
868
 
 
869
 
    def test_parse_invalid_url(self):
870
 
        self.assertRaises(urlutils.InvalidURL,
871
 
                          transport.ConnectedTransport,
872
 
                          'sftp://lily.org:~janneke/public/bzr/gub')
873
 
 
874
 
    def test_relpath(self):
875
 
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
876
 
 
877
 
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
878
 
                         'sub')
879
 
        self.assertRaises(errors.PathNotChild, t.relpath,
880
 
                          'http://user@host.com/abs/path/sub')
881
 
        self.assertRaises(errors.PathNotChild, t.relpath,
882
 
                          'sftp://user2@host.com/abs/path/sub')
883
 
        self.assertRaises(errors.PathNotChild, t.relpath,
884
 
                          'sftp://user@otherhost.com/abs/path/sub')
885
 
        self.assertRaises(errors.PathNotChild, t.relpath,
886
 
                          'sftp://user@host.com:33/abs/path/sub')
887
 
        # Make sure it works when we don't supply a username
888
 
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
889
 
        self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
890
 
 
891
 
        # Make sure it works when parts of the path will be url encoded
892
 
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
893
 
        self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
894
 
 
895
 
    def test_connection_sharing_propagate_credentials(self):
896
 
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
897
 
        self.assertEqual('user', t._parsed_url.user)
898
 
        self.assertEqual('host.com', t._parsed_url.host)
899
 
        self.assertIs(None, t._get_connection())
900
 
        self.assertIs(None, t._parsed_url.password)
901
 
        c = t.clone('subdir')
902
 
        self.assertIs(None, c._get_connection())
903
 
        self.assertIs(None, t._parsed_url.password)
904
 
 
905
 
        # Simulate the user entering a password
906
 
        password = 'secret'
907
 
        connection = object()
908
 
        t._set_connection(connection, password)
909
 
        self.assertIs(connection, t._get_connection())
910
 
        self.assertIs(password, t._get_credentials())
911
 
        self.assertIs(connection, c._get_connection())
912
 
        self.assertIs(password, c._get_credentials())
913
 
 
914
 
        # credentials can be updated
915
 
        new_password = 'even more secret'
916
 
        c._update_credentials(new_password)
917
 
        self.assertIs(connection, t._get_connection())
918
 
        self.assertIs(new_password, t._get_credentials())
919
 
        self.assertIs(connection, c._get_connection())
920
 
        self.assertIs(new_password, c._get_credentials())
921
 
 
922
 
 
923
 
class TestReusedTransports(tests.TestCase):
924
 
    """Tests for transport reuse"""
925
 
 
926
 
    def test_reuse_same_transport(self):
927
 
        possible_transports = []
928
 
        t1 = transport.get_transport_from_url(
929
 
            'http://foo/', possible_transports=possible_transports)
930
 
        self.assertEqual([t1], possible_transports)
931
 
        t2 = transport.get_transport_from_url('http://foo/',
932
 
                                              possible_transports=[t1])
933
 
        self.assertIs(t1, t2)
934
 
 
935
 
        # Also check that final '/' are handled correctly
936
 
        t3 = transport.get_transport_from_url('http://foo/path/')
937
 
        t4 = transport.get_transport_from_url('http://foo/path',
938
 
                                              possible_transports=[t3])
939
 
        self.assertIs(t3, t4)
940
 
 
941
 
        t5 = transport.get_transport_from_url('http://foo/path')
942
 
        t6 = transport.get_transport_from_url('http://foo/path/',
943
 
                                              possible_transports=[t5])
944
 
        self.assertIs(t5, t6)
945
 
 
946
 
    def test_don_t_reuse_different_transport(self):
947
 
        t1 = transport.get_transport_from_url('http://foo/path')
948
 
        t2 = transport.get_transport_from_url('http://bar/path',
949
 
                                              possible_transports=[t1])
950
 
        self.assertIsNot(t1, t2)
951
 
 
952
 
 
953
 
class TestTransportTrace(tests.TestCase):
954
 
 
955
 
    def test_decorator(self):
956
 
        t = transport.get_transport_from_url('trace+memory://')
957
 
        self.assertIsInstance(
958
 
            t, breezy.transport.trace.TransportTraceDecorator)
959
 
 
960
 
    def test_clone_preserves_activity(self):
961
 
        t = transport.get_transport_from_url('trace+memory://')
962
 
        t2 = t.clone('.')
963
 
        self.assertTrue(t is not t2)
964
 
        self.assertTrue(t._activity is t2._activity)
965
 
 
966
 
    # the following specific tests are for the operations that have made use of
967
 
    # logging in tests; we could test every single operation but doing that
968
 
    # still won't cause a test failure when the top level Transport API
969
 
    # changes; so there is little return doing that.
970
 
    def test_get(self):
971
 
        t = transport.get_transport_from_url('trace+memory:///')
972
 
        t.put_bytes('foo', b'barish')
973
 
        t.get('foo')
974
 
        expected_result = []
975
 
        # put_bytes records the bytes, not the content to avoid memory
976
 
        # pressure.
977
 
        expected_result.append(('put_bytes', 'foo', 6, None))
978
 
        # get records the file name only.
979
 
        expected_result.append(('get', 'foo'))
980
 
        self.assertEqual(expected_result, t._activity)
981
 
 
982
 
    def test_readv(self):
983
 
        t = transport.get_transport_from_url('trace+memory:///')
984
 
        t.put_bytes('foo', b'barish')
985
 
        list(t.readv('foo', [(0, 1), (3, 2)],
986
 
                     adjust_for_latency=True, upper_limit=6))
987
 
        expected_result = []
988
 
        # put_bytes records the bytes, not the content to avoid memory
989
 
        # pressure.
990
 
        expected_result.append(('put_bytes', 'foo', 6, None))
991
 
        # readv records the supplied offset request
992
 
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
993
 
        self.assertEqual(expected_result, t._activity)
994
 
 
995
 
 
996
 
class TestSSHConnections(tests.TestCaseWithTransport):
997
 
 
998
 
    def test_bzr_connect_to_bzr_ssh(self):
999
 
        """get_transport of a bzr+ssh:// behaves correctly.
1000
 
 
1001
 
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
1002
 
        """
1003
 
        # This test actually causes a bzr instance to be invoked, which is very
1004
 
        # expensive: it should be the only such test in the test suite.
1005
 
        # A reasonable evolution for this would be to simply check inside
1006
 
        # check_channel_exec_request that the command is appropriate, and then
1007
 
        # satisfy requests in-process.
1008
 
        self.requireFeature(features.paramiko)
1009
 
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
1010
 
        # override the interface (doesn't change self._vendor).
1011
 
        # Note that this does encryption, so can be slow.
1012
 
        from breezy.tests import stub_sftp
1013
 
 
1014
 
        # Start an SSH server
1015
 
        self.command_executed = []
1016
 
        # XXX: This is horrible -- we define a really dumb SSH server that
1017
 
        # executes commands, and manage the hooking up of stdin/out/err to the
1018
 
        # SSH channel ourselves.  Surely this has already been implemented
1019
 
        # elsewhere?
1020
 
        started = []
1021
 
 
1022
 
        class StubSSHServer(stub_sftp.StubServer):
1023
 
 
1024
 
            test = self
1025
 
 
1026
 
            def check_channel_exec_request(self, channel, command):
1027
 
                self.test.command_executed.append(command)
1028
 
                proc = subprocess.Popen(
1029
 
                    command, shell=True, stdin=subprocess.PIPE,
1030
 
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
1031
 
                    bufsize=0)
1032
 
 
1033
 
                # XXX: horribly inefficient, not to mention ugly.
1034
 
                # Start a thread for each of stdin/out/err, and relay bytes
1035
 
                # from the subprocess to channel and vice versa.
1036
 
                def ferry_bytes(read, write, close):
1037
 
                    while True:
1038
 
                        bytes = read(1)
1039
 
                        if bytes == b'':
1040
 
                            close()
1041
 
                            break
1042
 
                        write(bytes)
1043
 
 
1044
 
                file_functions = [
1045
 
                    (channel.recv, proc.stdin.write, proc.stdin.close),
1046
 
                    (proc.stdout.read, channel.sendall, channel.close),
1047
 
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
1048
 
                started.append(proc)
1049
 
                for read, write, close in file_functions:
1050
 
                    t = threading.Thread(
1051
 
                        target=ferry_bytes, args=(read, write, close))
1052
 
                    t.start()
1053
 
                    started.append(t)
1054
 
 
1055
 
                return True
1056
 
 
1057
 
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
1058
 
        # We *don't* want to override the default SSH vendor: the detected one
1059
 
        # is the one to use.
1060
 
 
1061
 
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
1062
 
        # inherits from SFTPServer which forces the SSH vendor to
1063
 
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
1064
 
        self.start_server(ssh_server)
1065
 
        port = ssh_server.port
1066
 
 
1067
 
        if sys.platform == 'win32':
1068
 
            bzr_remote_path = sys.executable + ' ' + self.get_brz_path()
1069
 
        else:
1070
 
            bzr_remote_path = self.get_brz_path()
1071
 
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
1072
 
 
1073
 
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
1074
 
        # variable is used to tell bzr what command to run on the remote end.
1075
 
        path_to_branch = osutils.abspath('.')
1076
 
        if sys.platform == 'win32':
1077
 
            # On Windows, we export all drives as '/C:/, etc. So we need to
1078
 
            # prefix a '/' to get the right path.
1079
 
            path_to_branch = '/' + path_to_branch
1080
 
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
1081
 
        t = transport.get_transport(url)
1082
 
        self.permit_url(t.base)
1083
 
        t.mkdir('foo')
1084
 
 
1085
 
        self.assertEqual(
1086
 
            [b'%s serve --inet --directory=/ --allow-writes' %
1087
 
                bzr_remote_path.encode()],
1088
 
            self.command_executed)
1089
 
        # Make sure to disconnect, so that the remote process can stop, and we
1090
 
        # can cleanup. Then pause the test until everything is shutdown
1091
 
        t._client._medium.disconnect()
1092
 
        if not started:
1093
 
            return
1094
 
        # First wait for the subprocess
1095
 
        started[0].wait()
1096
 
        # And the rest are threads
1097
 
        for t in started[1:]:
1098
 
            t.join()
1099
 
 
1100
 
 
1101
 
class TestUnhtml(tests.TestCase):
1102
 
 
1103
 
    """Tests for unhtml_roughly"""
1104
 
 
1105
 
    def test_truncation(self):
1106
 
        fake_html = "<p>something!\n" * 1000
1107
 
        result = http.unhtml_roughly(fake_html)
1108
 
        self.assertEqual(len(result), 1000)
1109
 
        self.assertStartsWith(result, " something!")
 
546
        transport = memory.MemoryTransport()
 
547
        transport.put('foo', StringIO('content'))
 
548
        transport.put('bar', StringIO('phowar'))
 
549
        self.assertEqual(7, transport.stat('foo').st_size)
 
550
        self.assertEqual(6, transport.stat('bar').st_size)
 
551