/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2010-03-21 21:39:33 UTC
  • mfrom: (5102 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5143.
  • Revision ID: jelmer@samba.org-20100321213933-fexeh9zcoz8oaju2
merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
21
21
import sys
22
22
import threading
23
23
 
24
 
import bzrlib
25
24
from bzrlib import (
26
25
    errors,
27
26
    osutils,
28
27
    tests,
 
28
    transport,
29
29
    urlutils,
30
30
    )
31
 
from bzrlib.errors import (DependencyNotPresent,
32
 
                           FileExists,
33
 
                           InvalidURLJoin,
34
 
                           NoSuchFile,
35
 
                           PathNotChild,
36
 
                           ReadError,
37
 
                           UnsupportedProtocol,
38
 
                           )
39
 
from bzrlib.tests import ParamikoFeature, TestCase, TestCaseInTempDir
40
 
from bzrlib.transport import (_clear_protocol_handlers,
41
 
                              _CoalescedOffset,
42
 
                              ConnectedTransport,
43
 
                              _get_protocol_handlers,
44
 
                              _set_protocol_handlers,
45
 
                              _get_transport_modules,
46
 
                              get_transport,
47
 
                              LateReadError,
48
 
                              register_lazy_transport,
49
 
                              register_transport_proto,
50
 
                              Transport,
51
 
                              )
52
 
from bzrlib.transport.chroot import ChrootServer
53
 
from bzrlib.transport.memory import MemoryTransport
54
 
from bzrlib.transport.local import (LocalTransport,
55
 
                                    EmulatedWin32LocalTransport)
56
 
from bzrlib.transport.pathfilter import PathFilteringServer
 
31
from bzrlib.transport import (
 
32
    chroot,
 
33
    fakenfs,
 
34
    local,
 
35
    memory,
 
36
    pathfilter,
 
37
    readonly,
 
38
    )
 
39
from bzrlib.tests import (
 
40
    features,
 
41
    test_server,
 
42
    )
57
43
 
58
44
 
59
45
# TODO: Should possibly split transport-specific tests into their own files.
60
46
 
61
47
 
62
 
class TestTransport(TestCase):
 
48
class TestTransport(tests.TestCase):
63
49
    """Test the non transport-concrete class functionality."""
64
50
 
 
51
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
 
52
    # of try/finally -- vila 20100205
 
53
 
65
54
    def test__get_set_protocol_handlers(self):
66
 
        handlers = _get_protocol_handlers()
 
55
        handlers = transport._get_protocol_handlers()
67
56
        self.assertNotEqual([], handlers.keys( ))
68
57
        try:
69
 
            _clear_protocol_handlers()
70
 
            self.assertEqual([], _get_protocol_handlers().keys())
 
58
            transport._clear_protocol_handlers()
 
59
            self.assertEqual([], transport._get_protocol_handlers().keys())
71
60
        finally:
72
 
            _set_protocol_handlers(handlers)
 
61
            transport._set_protocol_handlers(handlers)
73
62
 
74
63
    def test_get_transport_modules(self):
75
 
        handlers = _get_protocol_handlers()
 
64
        handlers = transport._get_protocol_handlers()
76
65
        # don't pollute the current handlers
77
 
        _clear_protocol_handlers()
 
66
        transport._clear_protocol_handlers()
78
67
        class SampleHandler(object):
79
68
            """I exist, isnt that enough?"""
80
69
        try:
81
 
            _clear_protocol_handlers()
82
 
            register_transport_proto('foo')
83
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
84
 
                                    'TestTransport.SampleHandler')
85
 
            register_transport_proto('bar')
86
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
87
 
                                    'TestTransport.SampleHandler')
 
70
            transport._clear_protocol_handlers()
 
71
            transport.register_transport_proto('foo')
 
72
            transport.register_lazy_transport('foo',
 
73
                                              'bzrlib.tests.test_transport',
 
74
                                              'TestTransport.SampleHandler')
 
75
            transport.register_transport_proto('bar')
 
76
            transport.register_lazy_transport('bar',
 
77
                                              'bzrlib.tests.test_transport',
 
78
                                              'TestTransport.SampleHandler')
88
79
            self.assertEqual([SampleHandler.__module__,
89
80
                              'bzrlib.transport.chroot',
90
81
                              'bzrlib.transport.pathfilter'],
91
 
                             _get_transport_modules())
 
82
                             transport._get_transport_modules())
92
83
        finally:
93
 
            _set_protocol_handlers(handlers)
 
84
            transport._set_protocol_handlers(handlers)
94
85
 
95
86
    def test_transport_dependency(self):
96
87
        """Transport with missing dependency causes no error"""
97
 
        saved_handlers = _get_protocol_handlers()
 
88
        saved_handlers = transport._get_protocol_handlers()
98
89
        # don't pollute the current handlers
99
 
        _clear_protocol_handlers()
 
90
        transport._clear_protocol_handlers()
100
91
        try:
101
 
            register_transport_proto('foo')
102
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
103
 
                    'BadTransportHandler')
 
92
            transport.register_transport_proto('foo')
 
93
            transport.register_lazy_transport(
 
94
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
104
95
            try:
105
 
                get_transport('foo://fooserver/foo')
106
 
            except UnsupportedProtocol, e:
 
96
                transport.get_transport('foo://fooserver/foo')
 
97
            except errors.UnsupportedProtocol, e:
107
98
                e_str = str(e)
108
99
                self.assertEquals('Unsupported protocol'
109
100
                                  ' for url "foo://fooserver/foo":'
113
104
                self.fail('Did not raise UnsupportedProtocol')
114
105
        finally:
115
106
            # restore original values
116
 
            _set_protocol_handlers(saved_handlers)
 
107
            transport._set_protocol_handlers(saved_handlers)
117
108
 
118
109
    def test_transport_fallback(self):
119
110
        """Transport with missing dependency causes no error"""
120
 
        saved_handlers = _get_protocol_handlers()
 
111
        saved_handlers = transport._get_protocol_handlers()
121
112
        try:
122
 
            _clear_protocol_handlers()
123
 
            register_transport_proto('foo')
124
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
125
 
                    'BackupTransportHandler')
126
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
127
 
                    'BadTransportHandler')
128
 
            t = get_transport('foo://fooserver/foo')
 
113
            transport._clear_protocol_handlers()
 
114
            transport.register_transport_proto('foo')
 
115
            transport.register_lazy_transport(
 
116
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
 
117
            transport.register_lazy_transport(
 
118
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
119
            t = transport.get_transport('foo://fooserver/foo')
129
120
            self.assertTrue(isinstance(t, BackupTransportHandler))
130
121
        finally:
131
 
            _set_protocol_handlers(saved_handlers)
 
122
            transport._set_protocol_handlers(saved_handlers)
132
123
 
133
124
    def test_ssh_hints(self):
134
125
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
135
126
        try:
136
 
            get_transport('ssh://fooserver/foo')
137
 
        except UnsupportedProtocol, e:
 
127
            transport.get_transport('ssh://fooserver/foo')
 
128
        except errors.UnsupportedProtocol, e:
138
129
            e_str = str(e)
139
130
            self.assertEquals('Unsupported protocol'
140
131
                              ' for url "ssh://fooserver/foo":'
141
 
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
 
132
                              ' bzr supports bzr+ssh to operate over ssh,'
 
133
                              ' use "bzr+ssh://fooserver/foo".',
142
134
                              str(e))
143
135
        else:
144
136
            self.fail('Did not raise UnsupportedProtocol')
145
137
 
146
138
    def test_LateReadError(self):
147
139
        """The LateReadError helper should raise on read()."""
148
 
        a_file = LateReadError('a path')
 
140
        a_file = transport.LateReadError('a path')
149
141
        try:
150
142
            a_file.read()
151
 
        except ReadError, error:
 
143
        except errors.ReadError, error:
152
144
            self.assertEqual('a path', error.path)
153
 
        self.assertRaises(ReadError, a_file.read, 40)
 
145
        self.assertRaises(errors.ReadError, a_file.read, 40)
154
146
        a_file.close()
155
147
 
156
148
    def test__combine_paths(self):
157
 
        t = Transport('/')
 
149
        t = transport.Transport('/')
158
150
        self.assertEqual('/home/sarah/project/foo',
159
151
                         t._combine_paths('/home/sarah', 'project/foo'))
160
152
        self.assertEqual('/etc',
166
158
 
167
159
    def test_local_abspath_non_local_transport(self):
168
160
        # the base implementation should throw
169
 
        t = MemoryTransport()
 
161
        t = memory.MemoryTransport()
170
162
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
171
163
        self.assertEqual('memory:///t is not a local path.', str(e))
172
164
 
173
165
 
174
 
class TestCoalesceOffsets(TestCase):
 
166
class TestCoalesceOffsets(tests.TestCase):
175
167
 
176
168
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
177
 
        coalesce = Transport._coalesce_offsets
178
 
        exp = [_CoalescedOffset(*x) for x in expected]
 
169
        coalesce = transport.Transport._coalesce_offsets
 
170
        exp = [transport._CoalescedOffset(*x) for x in expected]
179
171
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
180
172
                            max_size=max_size))
181
173
        self.assertEqual(exp, out)
256
248
                   max_size=1*1024*1024*1024)
257
249
 
258
250
 
259
 
class TestMemoryTransport(TestCase):
 
251
class TestMemoryServer(tests.TestCase):
 
252
 
 
253
    def test_create_server(self):
 
254
        server = memory.MemoryServer()
 
255
        server.start_server()
 
256
        url = server.get_url()
 
257
        self.assertTrue(url in transport.transport_list_registry)
 
258
        t = transport.get_transport(url)
 
259
        del t
 
260
        server.stop_server()
 
261
        self.assertFalse(url in transport.transport_list_registry)
 
262
        self.assertRaises(errors.UnsupportedProtocol,
 
263
                          transport.get_transport, url)
 
264
 
 
265
 
 
266
class TestMemoryTransport(tests.TestCase):
260
267
 
261
268
    def test_get_transport(self):
262
 
        MemoryTransport()
 
269
        memory.MemoryTransport()
263
270
 
264
271
    def test_clone(self):
265
 
        transport = MemoryTransport()
266
 
        self.assertTrue(isinstance(transport, MemoryTransport))
267
 
        self.assertEqual("memory:///", transport.clone("/").base)
 
272
        t = memory.MemoryTransport()
 
273
        self.assertTrue(isinstance(t, memory.MemoryTransport))
 
274
        self.assertEqual("memory:///", t.clone("/").base)
268
275
 
269
276
    def test_abspath(self):
270
 
        transport = MemoryTransport()
271
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
277
        t = memory.MemoryTransport()
 
278
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
272
279
 
273
280
    def test_abspath_of_root(self):
274
 
        transport = MemoryTransport()
275
 
        self.assertEqual("memory:///", transport.base)
276
 
        self.assertEqual("memory:///", transport.abspath('/'))
 
281
        t = memory.MemoryTransport()
 
282
        self.assertEqual("memory:///", t.base)
 
283
        self.assertEqual("memory:///", t.abspath('/'))
277
284
 
278
285
    def test_abspath_of_relpath_starting_at_root(self):
279
 
        transport = MemoryTransport()
280
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
286
        t = memory.MemoryTransport()
 
287
        self.assertEqual("memory:///foo", t.abspath('/foo'))
281
288
 
282
289
    def test_append_and_get(self):
283
 
        transport = MemoryTransport()
284
 
        transport.append_bytes('path', 'content')
285
 
        self.assertEqual(transport.get('path').read(), 'content')
286
 
        transport.append_file('path', StringIO('content'))
287
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
290
        t = memory.MemoryTransport()
 
291
        t.append_bytes('path', 'content')
 
292
        self.assertEqual(t.get('path').read(), 'content')
 
293
        t.append_file('path', StringIO('content'))
 
294
        self.assertEqual(t.get('path').read(), 'contentcontent')
288
295
 
289
296
    def test_put_and_get(self):
290
 
        transport = MemoryTransport()
291
 
        transport.put_file('path', StringIO('content'))
292
 
        self.assertEqual(transport.get('path').read(), 'content')
293
 
        transport.put_bytes('path', 'content')
294
 
        self.assertEqual(transport.get('path').read(), 'content')
 
297
        t = memory.MemoryTransport()
 
298
        t.put_file('path', StringIO('content'))
 
299
        self.assertEqual(t.get('path').read(), 'content')
 
300
        t.put_bytes('path', 'content')
 
301
        self.assertEqual(t.get('path').read(), 'content')
295
302
 
296
303
    def test_append_without_dir_fails(self):
297
 
        transport = MemoryTransport()
298
 
        self.assertRaises(NoSuchFile,
299
 
                          transport.append_bytes, 'dir/path', 'content')
 
304
        t = memory.MemoryTransport()
 
305
        self.assertRaises(errors.NoSuchFile,
 
306
                          t.append_bytes, 'dir/path', 'content')
300
307
 
301
308
    def test_put_without_dir_fails(self):
302
 
        transport = MemoryTransport()
303
 
        self.assertRaises(NoSuchFile,
304
 
                          transport.put_file, 'dir/path', StringIO('content'))
 
309
        t = memory.MemoryTransport()
 
310
        self.assertRaises(errors.NoSuchFile,
 
311
                          t.put_file, 'dir/path', StringIO('content'))
305
312
 
306
313
    def test_get_missing(self):
307
 
        transport = MemoryTransport()
308
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
314
        transport = memory.MemoryTransport()
 
315
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
309
316
 
310
317
    def test_has_missing(self):
311
 
        transport = MemoryTransport()
312
 
        self.assertEquals(False, transport.has('foo'))
 
318
        t = memory.MemoryTransport()
 
319
        self.assertEquals(False, t.has('foo'))
313
320
 
314
321
    def test_has_present(self):
315
 
        transport = MemoryTransport()
316
 
        transport.append_bytes('foo', 'content')
317
 
        self.assertEquals(True, transport.has('foo'))
 
322
        t = memory.MemoryTransport()
 
323
        t.append_bytes('foo', 'content')
 
324
        self.assertEquals(True, t.has('foo'))
318
325
 
319
326
    def test_list_dir(self):
320
 
        transport = MemoryTransport()
321
 
        transport.put_bytes('foo', 'content')
322
 
        transport.mkdir('dir')
323
 
        transport.put_bytes('dir/subfoo', 'content')
324
 
        transport.put_bytes('dirlike', 'content')
 
327
        t = memory.MemoryTransport()
 
328
        t.put_bytes('foo', 'content')
 
329
        t.mkdir('dir')
 
330
        t.put_bytes('dir/subfoo', 'content')
 
331
        t.put_bytes('dirlike', 'content')
325
332
 
326
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
327
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
333
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
334
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
328
335
 
329
336
    def test_mkdir(self):
330
 
        transport = MemoryTransport()
331
 
        transport.mkdir('dir')
332
 
        transport.append_bytes('dir/path', 'content')
333
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
337
        t = memory.MemoryTransport()
 
338
        t.mkdir('dir')
 
339
        t.append_bytes('dir/path', 'content')
 
340
        self.assertEqual(t.get('dir/path').read(), 'content')
334
341
 
335
342
    def test_mkdir_missing_parent(self):
336
 
        transport = MemoryTransport()
337
 
        self.assertRaises(NoSuchFile,
338
 
                          transport.mkdir, 'dir/dir')
 
343
        t = memory.MemoryTransport()
 
344
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
339
345
 
340
346
    def test_mkdir_twice(self):
341
 
        transport = MemoryTransport()
342
 
        transport.mkdir('dir')
343
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
347
        t = memory.MemoryTransport()
 
348
        t.mkdir('dir')
 
349
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
344
350
 
345
351
    def test_parameters(self):
346
 
        transport = MemoryTransport()
347
 
        self.assertEqual(True, transport.listable())
348
 
        self.assertEqual(False, transport.is_readonly())
 
352
        t = memory.MemoryTransport()
 
353
        self.assertEqual(True, t.listable())
 
354
        self.assertEqual(False, t.is_readonly())
349
355
 
350
356
    def test_iter_files_recursive(self):
351
 
        transport = MemoryTransport()
352
 
        transport.mkdir('dir')
353
 
        transport.put_bytes('dir/foo', 'content')
354
 
        transport.put_bytes('dir/bar', 'content')
355
 
        transport.put_bytes('bar', 'content')
356
 
        paths = set(transport.iter_files_recursive())
 
357
        t = memory.MemoryTransport()
 
358
        t.mkdir('dir')
 
359
        t.put_bytes('dir/foo', 'content')
 
360
        t.put_bytes('dir/bar', 'content')
 
361
        t.put_bytes('bar', 'content')
 
362
        paths = set(t.iter_files_recursive())
357
363
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
358
364
 
359
365
    def test_stat(self):
360
 
        transport = MemoryTransport()
361
 
        transport.put_bytes('foo', 'content')
362
 
        transport.put_bytes('bar', 'phowar')
363
 
        self.assertEqual(7, transport.stat('foo').st_size)
364
 
        self.assertEqual(6, transport.stat('bar').st_size)
365
 
 
366
 
 
367
 
class ChrootDecoratorTransportTest(TestCase):
 
366
        t = memory.MemoryTransport()
 
367
        t.put_bytes('foo', 'content')
 
368
        t.put_bytes('bar', 'phowar')
 
369
        self.assertEqual(7, t.stat('foo').st_size)
 
370
        self.assertEqual(6, t.stat('bar').st_size)
 
371
 
 
372
 
 
373
class ChrootDecoratorTransportTest(tests.TestCase):
368
374
    """Chroot decoration specific tests."""
369
375
 
370
376
    def test_abspath(self):
371
377
        # The abspath is always relative to the chroot_url.
372
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
378
        server = chroot.ChrootServer(
 
379
            transport.get_transport('memory:///foo/bar/'))
373
380
        self.start_server(server)
374
 
        transport = get_transport(server.get_url())
375
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
381
        t = transport.get_transport(server.get_url())
 
382
        self.assertEqual(server.get_url(), t.abspath('/'))
376
383
 
377
 
        subdir_transport = transport.clone('subdir')
378
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
384
        subdir_t = t.clone('subdir')
 
385
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
379
386
 
380
387
    def test_clone(self):
381
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
388
        server = chroot.ChrootServer(
 
389
            transport.get_transport('memory:///foo/bar/'))
382
390
        self.start_server(server)
383
 
        transport = get_transport(server.get_url())
 
391
        t = transport.get_transport(server.get_url())
384
392
        # relpath from root and root path are the same
385
 
        relpath_cloned = transport.clone('foo')
386
 
        abspath_cloned = transport.clone('/foo')
 
393
        relpath_cloned = t.clone('foo')
 
394
        abspath_cloned = t.clone('/foo')
387
395
        self.assertEqual(server, relpath_cloned.server)
388
396
        self.assertEqual(server, abspath_cloned.server)
389
397
 
395
403
        This is so that it is not possible to escape a chroot by doing::
396
404
            url = chroot_transport.base
397
405
            parent_url = urlutils.join(url, '..')
398
 
            new_transport = get_transport(parent_url)
 
406
            new_t = transport.get_transport(parent_url)
399
407
        """
400
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
408
        server = chroot.ChrootServer(
 
409
            transport.get_transport('memory:///path/subpath'))
401
410
        self.start_server(server)
402
 
        transport = get_transport(server.get_url())
403
 
        new_transport = get_transport(transport.base)
404
 
        self.assertEqual(transport.server, new_transport.server)
405
 
        self.assertEqual(transport.base, new_transport.base)
 
411
        t = transport.get_transport(server.get_url())
 
412
        new_t = transport.get_transport(t.base)
 
413
        self.assertEqual(t.server, new_t.server)
 
414
        self.assertEqual(t.base, new_t.base)
406
415
 
407
416
    def test_urljoin_preserves_chroot(self):
408
417
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
411
420
        This is so that it is not possible to escape a chroot by doing::
412
421
            url = chroot_transport.base
413
422
            parent_url = urlutils.join(url, '..')
414
 
            new_transport = get_transport(parent_url)
 
423
            new_t = transport.get_transport(parent_url)
415
424
        """
416
 
        server = ChrootServer(get_transport('memory:///path/'))
 
425
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
417
426
        self.start_server(server)
418
 
        transport = get_transport(server.get_url())
 
427
        t = transport.get_transport(server.get_url())
419
428
        self.assertRaises(
420
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
421
 
 
422
 
 
423
 
class ChrootServerTest(TestCase):
 
429
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
430
 
 
431
 
 
432
class TestChrootServer(tests.TestCase):
424
433
 
425
434
    def test_construct(self):
426
 
        backing_transport = MemoryTransport()
427
 
        server = ChrootServer(backing_transport)
 
435
        backing_transport = memory.MemoryTransport()
 
436
        server = chroot.ChrootServer(backing_transport)
428
437
        self.assertEqual(backing_transport, server.backing_transport)
429
438
 
430
439
    def test_setUp(self):
431
 
        backing_transport = MemoryTransport()
432
 
        server = ChrootServer(backing_transport)
433
 
        server.setUp()
 
440
        backing_transport = memory.MemoryTransport()
 
441
        server = chroot.ChrootServer(backing_transport)
 
442
        server.start_server()
434
443
        try:
435
 
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
444
            self.assertTrue(server.scheme
 
445
                            in transport._get_protocol_handlers().keys())
436
446
        finally:
437
 
            server.tearDown()
 
447
            server.stop_server()
438
448
 
439
 
    def test_tearDown(self):
440
 
        backing_transport = MemoryTransport()
441
 
        server = ChrootServer(backing_transport)
442
 
        server.setUp()
443
 
        server.tearDown()
444
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
449
    def test_stop_server(self):
 
450
        backing_transport = memory.MemoryTransport()
 
451
        server = chroot.ChrootServer(backing_transport)
 
452
        server.start_server()
 
453
        server.stop_server()
 
454
        self.assertFalse(server.scheme
 
455
                         in transport._get_protocol_handlers().keys())
445
456
 
446
457
    def test_get_url(self):
447
 
        backing_transport = MemoryTransport()
448
 
        server = ChrootServer(backing_transport)
449
 
        server.setUp()
 
458
        backing_transport = memory.MemoryTransport()
 
459
        server = chroot.ChrootServer(backing_transport)
 
460
        server.start_server()
450
461
        try:
451
462
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
452
463
        finally:
453
 
            server.tearDown()
454
 
 
455
 
 
456
 
class PathFilteringDecoratorTransportTest(TestCase):
 
464
            server.stop_server()
 
465
 
 
466
 
 
467
class PathFilteringDecoratorTransportTest(tests.TestCase):
457
468
    """Pathfilter decoration specific tests."""
458
469
 
459
470
    def test_abspath(self):
460
471
        # The abspath is always relative to the base of the backing transport.
461
 
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
 
472
        server = pathfilter.PathFilteringServer(
 
473
            transport.get_transport('memory:///foo/bar/'),
462
474
            lambda x: x)
463
 
        server.setUp()
464
 
        transport = get_transport(server.get_url())
465
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
475
        server.start_server()
 
476
        t = transport.get_transport(server.get_url())
 
477
        self.assertEqual(server.get_url(), t.abspath('/'))
466
478
 
467
 
        subdir_transport = transport.clone('subdir')
468
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
469
 
        server.tearDown()
 
479
        subdir_t = t.clone('subdir')
 
480
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
481
        server.stop_server()
470
482
 
471
483
    def make_pf_transport(self, filter_func=None):
472
484
        """Make a PathFilteringTransport backed by a MemoryTransport.
475
487
            parameter to override it."""
476
488
        if filter_func is None:
477
489
            filter_func = lambda x: x
478
 
        server = PathFilteringServer(
479
 
            get_transport('memory:///foo/bar/'), filter_func)
480
 
        server.setUp()
481
 
        self.addCleanup(server.tearDown)
482
 
        return get_transport(server.get_url())
 
490
        server = pathfilter.PathFilteringServer(
 
491
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
492
        server.start_server()
 
493
        self.addCleanup(server.stop_server)
 
494
        return transport.get_transport(server.get_url())
483
495
 
484
496
    def test__filter(self):
485
497
        # _filter (with an identity func as filter_func) always returns
486
498
        # paths relative to the base of the backing transport.
487
 
        transport = self.make_pf_transport()
488
 
        self.assertEqual('foo', transport._filter('foo'))
489
 
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
490
 
        self.assertEqual('', transport._filter('..'))
491
 
        self.assertEqual('', transport._filter('/'))
 
499
        t = self.make_pf_transport()
 
500
        self.assertEqual('foo', t._filter('foo'))
 
501
        self.assertEqual('foo/bar', t._filter('foo/bar'))
 
502
        self.assertEqual('', t._filter('..'))
 
503
        self.assertEqual('', t._filter('/'))
492
504
        # The base of the pathfiltering transport is taken into account too.
493
 
        transport = transport.clone('subdir1/subdir2')
494
 
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
495
 
        self.assertEqual(
496
 
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
497
 
        self.assertEqual('subdir1', transport._filter('..'))
498
 
        self.assertEqual('', transport._filter('/'))
 
505
        t = t.clone('subdir1/subdir2')
 
506
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
 
507
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
 
508
        self.assertEqual('subdir1', t._filter('..'))
 
509
        self.assertEqual('', t._filter('/'))
499
510
 
500
511
    def test_filter_invocation(self):
501
512
        filter_log = []
502
513
        def filter(path):
503
514
            filter_log.append(path)
504
515
            return path
505
 
        transport = self.make_pf_transport(filter)
506
 
        transport.has('abc')
 
516
        t = self.make_pf_transport(filter)
 
517
        t.has('abc')
507
518
        self.assertEqual(['abc'], filter_log)
508
519
        del filter_log[:]
509
 
        transport.clone('abc').has('xyz')
 
520
        t.clone('abc').has('xyz')
510
521
        self.assertEqual(['abc/xyz'], filter_log)
511
522
        del filter_log[:]
512
 
        transport.has('/abc')
 
523
        t.has('/abc')
513
524
        self.assertEqual(['abc'], filter_log)
514
525
 
515
526
    def test_clone(self):
516
 
        transport = self.make_pf_transport()
 
527
        t = self.make_pf_transport()
517
528
        # relpath from root and root path are the same
518
 
        relpath_cloned = transport.clone('foo')
519
 
        abspath_cloned = transport.clone('/foo')
520
 
        self.assertEqual(transport.server, relpath_cloned.server)
521
 
        self.assertEqual(transport.server, abspath_cloned.server)
 
529
        relpath_cloned = t.clone('foo')
 
530
        abspath_cloned = t.clone('/foo')
 
531
        self.assertEqual(t.server, relpath_cloned.server)
 
532
        self.assertEqual(t.server, abspath_cloned.server)
522
533
 
523
534
    def test_url_preserves_pathfiltering(self):
524
535
        """Calling get_transport on a pathfiltered transport's base should
529
540
        otherwise) the filtering by doing::
530
541
            url = filtered_transport.base
531
542
            parent_url = urlutils.join(url, '..')
532
 
            new_transport = get_transport(parent_url)
 
543
            new_t = transport.get_transport(parent_url)
533
544
        """
534
 
        transport = self.make_pf_transport()
535
 
        new_transport = get_transport(transport.base)
536
 
        self.assertEqual(transport.server, new_transport.server)
537
 
        self.assertEqual(transport.base, new_transport.base)
538
 
 
539
 
 
540
 
class ReadonlyDecoratorTransportTest(TestCase):
 
545
        t = self.make_pf_transport()
 
546
        new_t = transport.get_transport(t.base)
 
547
        self.assertEqual(t.server, new_t.server)
 
548
        self.assertEqual(t.base, new_t.base)
 
549
 
 
550
 
 
551
class ReadonlyDecoratorTransportTest(tests.TestCase):
541
552
    """Readonly decoration specific tests."""
542
553
 
543
554
    def test_local_parameters(self):
544
 
        import bzrlib.transport.readonly as readonly
545
555
        # connect to . in readonly mode
546
 
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
547
 
        self.assertEqual(True, transport.listable())
548
 
        self.assertEqual(True, transport.is_readonly())
 
556
        t = readonly.ReadonlyTransportDecorator('readonly+.')
 
557
        self.assertEqual(True, t.listable())
 
558
        self.assertEqual(True, t.is_readonly())
549
559
 
550
560
    def test_http_parameters(self):
551
561
        from bzrlib.tests.http_server import HttpServer
552
 
        import bzrlib.transport.readonly as readonly
553
562
        # connect to '.' via http which is not listable
554
563
        server = HttpServer()
555
564
        self.start_server(server)
556
 
        transport = get_transport('readonly+' + server.get_url())
557
 
        self.failUnless(isinstance(transport,
558
 
                                   readonly.ReadonlyTransportDecorator))
559
 
        self.assertEqual(False, transport.listable())
560
 
        self.assertEqual(True, transport.is_readonly())
561
 
 
562
 
 
563
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
565
        t = transport.get_transport('readonly+' + server.get_url())
 
566
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
567
        self.assertEqual(False, t.listable())
 
568
        self.assertEqual(True, t.is_readonly())
 
569
 
 
570
 
 
571
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
564
572
    """NFS decorator specific tests."""
565
573
 
566
574
    def get_nfs_transport(self, url):
567
 
        import bzrlib.transport.fakenfs as fakenfs
568
575
        # connect to url with nfs decoration
569
576
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
570
577
 
571
578
    def test_local_parameters(self):
572
579
        # the listable and is_readonly parameters
573
580
        # are not changed by the fakenfs decorator
574
 
        transport = self.get_nfs_transport('.')
575
 
        self.assertEqual(True, transport.listable())
576
 
        self.assertEqual(False, transport.is_readonly())
 
581
        t = self.get_nfs_transport('.')
 
582
        self.assertEqual(True, t.listable())
 
583
        self.assertEqual(False, t.is_readonly())
577
584
 
578
585
    def test_http_parameters(self):
579
586
        # the listable and is_readonly parameters
582
589
        # connect to '.' via http which is not listable
583
590
        server = HttpServer()
584
591
        self.start_server(server)
585
 
        transport = self.get_nfs_transport(server.get_url())
586
 
        self.assertIsInstance(
587
 
            transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
588
 
        self.assertEqual(False, transport.listable())
589
 
        self.assertEqual(True, transport.is_readonly())
 
592
        t = self.get_nfs_transport(server.get_url())
 
593
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
594
        self.assertEqual(False, t.listable())
 
595
        self.assertEqual(True, t.is_readonly())
590
596
 
591
597
    def test_fakenfs_server_default(self):
592
598
        # a FakeNFSServer() should bring up a local relpath server for itself
593
 
        import bzrlib.transport.fakenfs as fakenfs
594
 
        server = fakenfs.FakeNFSServer()
 
599
        server = test_server.FakeNFSServer()
595
600
        self.start_server(server)
596
601
        # the url should be decorated appropriately
597
602
        self.assertStartsWith(server.get_url(), 'fakenfs+')
598
603
        # and we should be able to get a transport for it
599
 
        transport = get_transport(server.get_url())
 
604
        t = transport.get_transport(server.get_url())
600
605
        # which must be a FakeNFSTransportDecorator instance.
601
 
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
 
606
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
602
607
 
603
608
    def test_fakenfs_rename_semantics(self):
604
609
        # a FakeNFS transport must mangle the way rename errors occur to
605
610
        # look like NFS problems.
606
 
        transport = self.get_nfs_transport('.')
 
611
        t = self.get_nfs_transport('.')
607
612
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
608
 
                        transport=transport)
609
 
        self.assertRaises(errors.ResourceBusy,
610
 
                          transport.rename, 'from', 'to')
611
 
 
612
 
 
613
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
613
                        transport=t)
 
614
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
 
615
 
 
616
 
 
617
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
614
618
    """Tests for simulation of VFAT restrictions"""
615
619
 
616
620
    def get_vfat_transport(self, url):
620
624
 
621
625
    def test_transport_creation(self):
622
626
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
623
 
        transport = self.get_vfat_transport('.')
624
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
627
        t = self.get_vfat_transport('.')
 
628
        self.assertIsInstance(t, FakeVFATTransportDecorator)
625
629
 
626
630
    def test_transport_mkdir(self):
627
 
        transport = self.get_vfat_transport('.')
628
 
        transport.mkdir('HELLO')
629
 
        self.assertTrue(transport.has('hello'))
630
 
        self.assertTrue(transport.has('Hello'))
 
631
        t = self.get_vfat_transport('.')
 
632
        t.mkdir('HELLO')
 
633
        self.assertTrue(t.has('hello'))
 
634
        self.assertTrue(t.has('Hello'))
631
635
 
632
636
    def test_forbidden_chars(self):
633
 
        transport = self.get_vfat_transport('.')
634
 
        self.assertRaises(ValueError, transport.has, "<NU>")
635
 
 
636
 
 
637
 
class BadTransportHandler(Transport):
 
637
        t = self.get_vfat_transport('.')
 
638
        self.assertRaises(ValueError, t.has, "<NU>")
 
639
 
 
640
 
 
641
class BadTransportHandler(transport.Transport):
638
642
    def __init__(self, base_url):
639
 
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
640
 
 
641
 
 
642
 
class BackupTransportHandler(Transport):
 
643
        raise errors.DependencyNotPresent('some_lib',
 
644
                                          'testing missing dependency')
 
645
 
 
646
 
 
647
class BackupTransportHandler(transport.Transport):
643
648
    """Test transport that works as a backup for the BadTransportHandler"""
644
649
    pass
645
650
 
646
651
 
647
 
class TestTransportImplementation(TestCaseInTempDir):
 
652
class TestTransportImplementation(tests.TestCaseInTempDir):
648
653
    """Implementation verification for transports.
649
654
 
650
655
    To verify a transport we need a server factory, which is a callable
679
684
        base_url = self._server.get_url()
680
685
        url = self._adjust_url(base_url, relpath)
681
686
        # try getting the transport via the regular interface:
682
 
        t = get_transport(url)
 
687
        t = transport.get_transport(url)
683
688
        # vila--20070607 if the following are commented out the test suite
684
689
        # still pass. Is this really still needed or was it a forgotten
685
690
        # temporary fix ?
690
695
        return t
691
696
 
692
697
 
693
 
class TestLocalTransports(TestCase):
 
698
class TestLocalTransports(tests.TestCase):
694
699
 
695
700
    def test_get_transport_from_abspath(self):
696
701
        here = osutils.abspath('.')
697
 
        t = get_transport(here)
698
 
        self.assertIsInstance(t, LocalTransport)
 
702
        t = transport.get_transport(here)
 
703
        self.assertIsInstance(t, local.LocalTransport)
699
704
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
700
705
 
701
706
    def test_get_transport_from_relpath(self):
702
707
        here = osutils.abspath('.')
703
 
        t = get_transport('.')
704
 
        self.assertIsInstance(t, LocalTransport)
 
708
        t = transport.get_transport('.')
 
709
        self.assertIsInstance(t, local.LocalTransport)
705
710
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
706
711
 
707
712
    def test_get_transport_from_local_url(self):
708
713
        here = osutils.abspath('.')
709
714
        here_url = urlutils.local_path_to_url(here) + '/'
710
 
        t = get_transport(here_url)
711
 
        self.assertIsInstance(t, LocalTransport)
 
715
        t = transport.get_transport(here_url)
 
716
        self.assertIsInstance(t, local.LocalTransport)
712
717
        self.assertEquals(t.base, here_url)
713
718
 
714
719
    def test_local_abspath(self):
715
720
        here = osutils.abspath('.')
716
 
        t = get_transport(here)
 
721
        t = transport.get_transport(here)
717
722
        self.assertEquals(t.local_abspath(''), here)
718
723
 
719
724
 
720
 
class TestWin32LocalTransport(TestCase):
 
725
class TestWin32LocalTransport(tests.TestCase):
721
726
 
722
727
    def test_unc_clone_to_root(self):
723
728
        # Win32 UNC path like \\HOST\path
724
729
        # clone to root should stop at least at \\HOST part
725
730
        # not on \\
726
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
731
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
727
732
        for i in xrange(4):
728
733
            t = t.clone('..')
729
734
        self.assertEquals(t.base, 'file://HOST/')
732
737
        self.assertEquals(t.base, 'file://HOST/')
733
738
 
734
739
 
735
 
class TestConnectedTransport(TestCase):
 
740
class TestConnectedTransport(tests.TestCase):
736
741
    """Tests for connected to remote server transports"""
737
742
 
738
743
    def test_parse_url(self):
739
 
        t = ConnectedTransport('http://simple.example.com/home/source')
 
744
        t = transport.ConnectedTransport(
 
745
            'http://simple.example.com/home/source')
740
746
        self.assertEquals(t._host, 'simple.example.com')
741
747
        self.assertEquals(t._port, None)
742
748
        self.assertEquals(t._path, '/home/source/')
747
753
 
748
754
    def test_parse_url_with_at_in_user(self):
749
755
        # Bug 228058
750
 
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
756
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
751
757
        self.assertEquals(t._user, 'user@host.com')
752
758
 
753
759
    def test_parse_quoted_url(self):
754
 
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
760
        t = transport.ConnectedTransport(
 
761
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
755
762
        self.assertEquals(t._host, 'exAmple.com')
756
763
        self.assertEquals(t._port, 2222)
757
764
        self.assertEquals(t._user, 'robey')
763
770
 
764
771
    def test_parse_invalid_url(self):
765
772
        self.assertRaises(errors.InvalidURL,
766
 
                          ConnectedTransport,
 
773
                          transport.ConnectedTransport,
767
774
                          'sftp://lily.org:~janneke/public/bzr/gub')
768
775
 
769
776
    def test_relpath(self):
770
 
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
777
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
771
778
 
772
779
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
773
780
        self.assertRaises(errors.PathNotChild, t.relpath,
779
786
        self.assertRaises(errors.PathNotChild, t.relpath,
780
787
                          'sftp://user@host.com:33/abs/path/sub')
781
788
        # Make sure it works when we don't supply a username
782
 
        t = ConnectedTransport('sftp://host.com/abs/path')
 
789
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
783
790
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
784
791
 
785
792
        # Make sure it works when parts of the path will be url encoded
786
 
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
793
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
787
794
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
788
795
 
789
796
    def test_connection_sharing_propagate_credentials(self):
790
 
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
797
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
791
798
        self.assertEquals('user', t._user)
792
799
        self.assertEquals('host.com', t._host)
793
800
        self.assertIs(None, t._get_connection())
814
821
        self.assertIs(new_password, c._get_credentials())
815
822
 
816
823
 
817
 
class TestReusedTransports(TestCase):
 
824
class TestReusedTransports(tests.TestCase):
818
825
    """Tests for transport reuse"""
819
826
 
820
827
    def test_reuse_same_transport(self):
821
828
        possible_transports = []
822
 
        t1 = get_transport('http://foo/',
823
 
                           possible_transports=possible_transports)
 
829
        t1 = transport.get_transport('http://foo/',
 
830
                                     possible_transports=possible_transports)
824
831
        self.assertEqual([t1], possible_transports)
825
 
        t2 = get_transport('http://foo/', possible_transports=[t1])
 
832
        t2 = transport.get_transport('http://foo/',
 
833
                                     possible_transports=[t1])
826
834
        self.assertIs(t1, t2)
827
835
 
828
836
        # Also check that final '/' are handled correctly
829
 
        t3 = get_transport('http://foo/path/')
830
 
        t4 = get_transport('http://foo/path', possible_transports=[t3])
 
837
        t3 = transport.get_transport('http://foo/path/')
 
838
        t4 = transport.get_transport('http://foo/path',
 
839
                                     possible_transports=[t3])
831
840
        self.assertIs(t3, t4)
832
841
 
833
 
        t5 = get_transport('http://foo/path')
834
 
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
 
842
        t5 = transport.get_transport('http://foo/path')
 
843
        t6 = transport.get_transport('http://foo/path/',
 
844
                                     possible_transports=[t5])
835
845
        self.assertIs(t5, t6)
836
846
 
837
847
    def test_don_t_reuse_different_transport(self):
838
 
        t1 = get_transport('http://foo/path')
839
 
        t2 = get_transport('http://bar/path', possible_transports=[t1])
 
848
        t1 = transport.get_transport('http://foo/path')
 
849
        t2 = transport.get_transport('http://bar/path',
 
850
                                     possible_transports=[t1])
840
851
        self.assertIsNot(t1, t2)
841
852
 
842
853
 
843
 
class TestTransportTrace(TestCase):
 
854
class TestTransportTrace(tests.TestCase):
844
855
 
845
856
    def test_get(self):
846
 
        transport = get_transport('trace+memory://')
847
 
        self.assertIsInstance(
848
 
            transport, bzrlib.transport.trace.TransportTraceDecorator)
 
857
        t = transport.get_transport('trace+memory://')
 
858
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
849
859
 
850
860
    def test_clone_preserves_activity(self):
851
 
        transport = get_transport('trace+memory://')
852
 
        transport2 = transport.clone('.')
853
 
        self.assertTrue(transport is not transport2)
854
 
        self.assertTrue(transport._activity is transport2._activity)
 
861
        t = transport.get_transport('trace+memory://')
 
862
        t2 = t.clone('.')
 
863
        self.assertTrue(t is not t2)
 
864
        self.assertTrue(t._activity is t2._activity)
855
865
 
856
866
    # the following specific tests are for the operations that have made use of
857
867
    # logging in tests; we could test every single operation but doing that
858
868
    # still won't cause a test failure when the top level Transport API
859
869
    # changes; so there is little return doing that.
860
870
    def test_get(self):
861
 
        transport = get_transport('trace+memory:///')
862
 
        transport.put_bytes('foo', 'barish')
863
 
        transport.get('foo')
 
871
        t = transport.get_transport('trace+memory:///')
 
872
        t.put_bytes('foo', 'barish')
 
873
        t.get('foo')
864
874
        expected_result = []
865
875
        # put_bytes records the bytes, not the content to avoid memory
866
876
        # pressure.
867
877
        expected_result.append(('put_bytes', 'foo', 6, None))
868
878
        # get records the file name only.
869
879
        expected_result.append(('get', 'foo'))
870
 
        self.assertEqual(expected_result, transport._activity)
 
880
        self.assertEqual(expected_result, t._activity)
871
881
 
872
882
    def test_readv(self):
873
 
        transport = get_transport('trace+memory:///')
874
 
        transport.put_bytes('foo', 'barish')
875
 
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
876
 
            upper_limit=6))
 
883
        t = transport.get_transport('trace+memory:///')
 
884
        t.put_bytes('foo', 'barish')
 
885
        list(t.readv('foo', [(0, 1), (3, 2)],
 
886
                     adjust_for_latency=True, upper_limit=6))
877
887
        expected_result = []
878
888
        # put_bytes records the bytes, not the content to avoid memory
879
889
        # pressure.
880
890
        expected_result.append(('put_bytes', 'foo', 6, None))
881
891
        # readv records the supplied offset request
882
892
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
883
 
        self.assertEqual(expected_result, transport._activity)
 
893
        self.assertEqual(expected_result, t._activity)
884
894
 
885
895
 
886
896
class TestSSHConnections(tests.TestCaseWithTransport):
895
905
        # A reasonable evolution for this would be to simply check inside
896
906
        # check_channel_exec_request that the command is appropriate, and then
897
907
        # satisfy requests in-process.
898
 
        self.requireFeature(ParamikoFeature)
 
908
        self.requireFeature(features.paramiko)
899
909
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
900
910
        # override the interface (doesn't change self._vendor).
901
911
        # Note that this does encryption, so can be slow.
902
 
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
903
 
        from bzrlib.tests.stub_sftp import StubServer
 
912
        from bzrlib.tests import stub_sftp
904
913
 
905
914
        # Start an SSH server
906
915
        self.command_executed = []
909
918
        # SSH channel ourselves.  Surely this has already been implemented
910
919
        # elsewhere?
911
920
        started = []
912
 
        class StubSSHServer(StubServer):
 
921
        class StubSSHServer(stub_sftp.StubServer):
913
922
 
914
923
            test = self
915
924
 
943
952
 
944
953
                return True
945
954
 
946
 
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
 
955
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
947
956
        # We *don't* want to override the default SSH vendor: the detected one
948
957
        # is the one to use.
949
958
        self.start_server(ssh_server)
963
972
            # prefix a '/' to get the right path.
964
973
            path_to_branch = '/' + path_to_branch
965
974
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
966
 
        t = get_transport(url)
 
975
        t = transport.get_transport(url)
967
976
        self.permit_url(t.base)
968
977
        t.mkdir('foo')
969
978