/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

Merge bzr.dev resolve conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
25
25
    errors,
26
26
    osutils,
27
27
    tests,
28
 
    transport,
 
28
    transport as _mod_transport,
29
29
    urlutils,
30
30
    )
31
31
from bzrlib.transport import (
32
 
    chroot,
33
 
    fakenfs,
34
 
    local,
35
32
    memory,
36
 
    pathfilter,
37
 
    readonly,
38
 
    )
39
 
from bzrlib.tests import (
40
 
    features,
41
 
    test_server,
42
 
    )
 
33
    )
 
34
from bzrlib.errors import (DependencyNotPresent,
 
35
                           FileExists,
 
36
                           InvalidURLJoin,
 
37
                           NoSuchFile,
 
38
                           PathNotChild,
 
39
                           ReadError,
 
40
                           UnsupportedProtocol,
 
41
                           )
 
42
from bzrlib.tests import features, TestCase, TestCaseInTempDir
 
43
from bzrlib.transport import (_clear_protocol_handlers,
 
44
                              _CoalescedOffset,
 
45
                              ConnectedTransport,
 
46
                              _get_protocol_handlers,
 
47
                              _set_protocol_handlers,
 
48
                              _get_transport_modules,
 
49
                              get_transport,
 
50
                              LateReadError,
 
51
                              register_lazy_transport,
 
52
                              register_transport_proto,
 
53
                              Transport,
 
54
                              )
 
55
from bzrlib.transport.chroot import ChrootServer
 
56
from bzrlib.transport.local import (LocalTransport,
 
57
                                    EmulatedWin32LocalTransport)
 
58
from bzrlib.transport.pathfilter import PathFilteringServer
43
59
 
44
60
 
45
61
# TODO: Should possibly split transport-specific tests into their own files.
46
62
 
47
63
 
48
 
class TestTransport(tests.TestCase):
 
64
class TestTransport(TestCase):
49
65
    """Test the non transport-concrete class functionality."""
50
66
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
67
    def test__get_set_protocol_handlers(self):
55
 
        handlers = transport._get_protocol_handlers()
 
68
        handlers = _get_protocol_handlers()
56
69
        self.assertNotEqual([], handlers.keys( ))
57
70
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
 
71
            _clear_protocol_handlers()
 
72
            self.assertEqual([], _get_protocol_handlers().keys())
60
73
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
74
            _set_protocol_handlers(handlers)
62
75
 
63
76
    def test_get_transport_modules(self):
64
 
        handlers = transport._get_protocol_handlers()
 
77
        handlers = _get_protocol_handlers()
65
78
        # don't pollute the current handlers
66
 
        transport._clear_protocol_handlers()
 
79
        _clear_protocol_handlers()
67
80
        class SampleHandler(object):
68
81
            """I exist, isnt that enough?"""
69
82
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
 
83
            _clear_protocol_handlers()
 
84
            register_transport_proto('foo')
 
85
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
86
                                    'TestTransport.SampleHandler')
 
87
            register_transport_proto('bar')
 
88
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
89
                                    'TestTransport.SampleHandler')
79
90
            self.assertEqual([SampleHandler.__module__,
80
91
                              'bzrlib.transport.chroot',
81
92
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
 
93
                             _get_transport_modules())
83
94
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
95
            _set_protocol_handlers(handlers)
85
96
 
86
97
    def test_transport_dependency(self):
87
98
        """Transport with missing dependency causes no error"""
88
 
        saved_handlers = transport._get_protocol_handlers()
 
99
        saved_handlers = _get_protocol_handlers()
89
100
        # don't pollute the current handlers
90
 
        transport._clear_protocol_handlers()
 
101
        _clear_protocol_handlers()
91
102
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
103
            register_transport_proto('foo')
 
104
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
105
                    'BadTransportHandler')
95
106
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
 
107
                get_transport('foo://fooserver/foo')
 
108
            except UnsupportedProtocol, e:
98
109
                e_str = str(e)
99
110
                self.assertEquals('Unsupported protocol'
100
111
                                  ' for url "foo://fooserver/foo":'
104
115
                self.fail('Did not raise UnsupportedProtocol')
105
116
        finally:
106
117
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
118
            _set_protocol_handlers(saved_handlers)
108
119
 
109
120
    def test_transport_fallback(self):
110
121
        """Transport with missing dependency causes no error"""
111
 
        saved_handlers = transport._get_protocol_handlers()
 
122
        saved_handlers = _get_protocol_handlers()
112
123
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
 
124
            _clear_protocol_handlers()
 
125
            register_transport_proto('foo')
 
126
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
127
                    'BackupTransportHandler')
 
128
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
129
                    'BadTransportHandler')
 
130
            t = get_transport('foo://fooserver/foo')
120
131
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
132
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
133
            _set_protocol_handlers(saved_handlers)
123
134
 
124
135
    def test_ssh_hints(self):
125
136
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
126
137
        try:
127
 
            transport.get_transport('ssh://fooserver/foo')
128
 
        except errors.UnsupportedProtocol, e:
 
138
            get_transport('ssh://fooserver/foo')
 
139
        except UnsupportedProtocol, e:
129
140
            e_str = str(e)
130
141
            self.assertEquals('Unsupported protocol'
131
142
                              ' for url "ssh://fooserver/foo":'
132
 
                              ' bzr supports bzr+ssh to operate over ssh,'
133
 
                              ' use "bzr+ssh://fooserver/foo".',
 
143
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
134
144
                              str(e))
135
145
        else:
136
146
            self.fail('Did not raise UnsupportedProtocol')
137
147
 
138
148
    def test_LateReadError(self):
139
149
        """The LateReadError helper should raise on read()."""
140
 
        a_file = transport.LateReadError('a path')
 
150
        a_file = LateReadError('a path')
141
151
        try:
142
152
            a_file.read()
143
 
        except errors.ReadError, error:
 
153
        except ReadError, error:
144
154
            self.assertEqual('a path', error.path)
145
 
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
155
        self.assertRaises(ReadError, a_file.read, 40)
146
156
        a_file.close()
147
157
 
148
158
    def test__combine_paths(self):
149
 
        t = transport.Transport('/')
 
159
        t = Transport('/')
150
160
        self.assertEqual('/home/sarah/project/foo',
151
161
                         t._combine_paths('/home/sarah', 'project/foo'))
152
162
        self.assertEqual('/etc',
163
173
        self.assertEqual('memory:///t is not a local path.', str(e))
164
174
 
165
175
 
166
 
class TestCoalesceOffsets(tests.TestCase):
 
176
class TestCoalesceOffsets(TestCase):
167
177
 
168
178
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
169
 
        coalesce = transport.Transport._coalesce_offsets
170
 
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
179
        coalesce = Transport._coalesce_offsets
 
180
        exp = [_CoalescedOffset(*x) for x in expected]
171
181
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
172
182
                            max_size=max_size))
173
183
        self.assertEqual(exp, out)
248
258
                   max_size=1*1024*1024*1024)
249
259
 
250
260
 
251
 
class TestMemoryServer(tests.TestCase):
 
261
class TestMemoryServer(TestCase):
252
262
 
253
263
    def test_create_server(self):
254
264
        server = memory.MemoryServer()
255
 
        server.start_server()
 
265
        server.setUp()
256
266
        url = server.get_url()
257
 
        self.assertTrue(url in transport.transport_list_registry)
258
 
        t = transport.get_transport(url)
 
267
        self.assertTrue(url in _mod_transport.transport_list_registry)
 
268
        t = _mod_transport.get_transport(url)
259
269
        del t
260
 
        server.stop_server()
261
 
        self.assertFalse(url in transport.transport_list_registry)
 
270
        server.tearDown()
 
271
        self.assertFalse(url in _mod_transport.transport_list_registry)
262
272
        self.assertRaises(errors.UnsupportedProtocol,
263
 
                          transport.get_transport, url)
264
 
 
265
 
 
266
 
class TestMemoryTransport(tests.TestCase):
 
273
                          _mod_transport.get_transport, url)
 
274
 
 
275
 
 
276
class TestMemoryTransport(TestCase):
267
277
 
268
278
    def test_get_transport(self):
269
279
        memory.MemoryTransport()
270
280
 
271
281
    def test_clone(self):
272
 
        t = memory.MemoryTransport()
273
 
        self.assertTrue(isinstance(t, memory.MemoryTransport))
274
 
        self.assertEqual("memory:///", t.clone("/").base)
 
282
        transport = memory.MemoryTransport()
 
283
        self.assertTrue(isinstance(transport, memory.MemoryTransport))
 
284
        self.assertEqual("memory:///", transport.clone("/").base)
275
285
 
276
286
    def test_abspath(self):
277
 
        t = memory.MemoryTransport()
278
 
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
287
        transport = memory.MemoryTransport()
 
288
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
279
289
 
280
290
    def test_abspath_of_root(self):
281
 
        t = memory.MemoryTransport()
282
 
        self.assertEqual("memory:///", t.base)
283
 
        self.assertEqual("memory:///", t.abspath('/'))
 
291
        transport = memory.MemoryTransport()
 
292
        self.assertEqual("memory:///", transport.base)
 
293
        self.assertEqual("memory:///", transport.abspath('/'))
284
294
 
285
295
    def test_abspath_of_relpath_starting_at_root(self):
286
 
        t = memory.MemoryTransport()
287
 
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
296
        transport = memory.MemoryTransport()
 
297
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
288
298
 
289
299
    def test_append_and_get(self):
290
 
        t = memory.MemoryTransport()
291
 
        t.append_bytes('path', 'content')
292
 
        self.assertEqual(t.get('path').read(), 'content')
293
 
        t.append_file('path', StringIO('content'))
294
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
300
        transport = memory.MemoryTransport()
 
301
        transport.append_bytes('path', 'content')
 
302
        self.assertEqual(transport.get('path').read(), 'content')
 
303
        transport.append_file('path', StringIO('content'))
 
304
        self.assertEqual(transport.get('path').read(), 'contentcontent')
295
305
 
296
306
    def test_put_and_get(self):
297
 
        t = memory.MemoryTransport()
298
 
        t.put_file('path', StringIO('content'))
299
 
        self.assertEqual(t.get('path').read(), 'content')
300
 
        t.put_bytes('path', 'content')
301
 
        self.assertEqual(t.get('path').read(), 'content')
 
307
        transport = memory.MemoryTransport()
 
308
        transport.put_file('path', StringIO('content'))
 
309
        self.assertEqual(transport.get('path').read(), 'content')
 
310
        transport.put_bytes('path', 'content')
 
311
        self.assertEqual(transport.get('path').read(), 'content')
302
312
 
303
313
    def test_append_without_dir_fails(self):
304
 
        t = memory.MemoryTransport()
305
 
        self.assertRaises(errors.NoSuchFile,
306
 
                          t.append_bytes, 'dir/path', 'content')
 
314
        transport = memory.MemoryTransport()
 
315
        self.assertRaises(NoSuchFile,
 
316
                          transport.append_bytes, 'dir/path', 'content')
307
317
 
308
318
    def test_put_without_dir_fails(self):
309
 
        t = memory.MemoryTransport()
310
 
        self.assertRaises(errors.NoSuchFile,
311
 
                          t.put_file, 'dir/path', StringIO('content'))
 
319
        transport = memory.MemoryTransport()
 
320
        self.assertRaises(NoSuchFile,
 
321
                          transport.put_file, 'dir/path', StringIO('content'))
312
322
 
313
323
    def test_get_missing(self):
314
324
        transport = memory.MemoryTransport()
315
 
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
325
        self.assertRaises(NoSuchFile, transport.get, 'foo')
316
326
 
317
327
    def test_has_missing(self):
318
 
        t = memory.MemoryTransport()
319
 
        self.assertEquals(False, t.has('foo'))
 
328
        transport = memory.MemoryTransport()
 
329
        self.assertEquals(False, transport.has('foo'))
320
330
 
321
331
    def test_has_present(self):
322
 
        t = memory.MemoryTransport()
323
 
        t.append_bytes('foo', 'content')
324
 
        self.assertEquals(True, t.has('foo'))
 
332
        transport = memory.MemoryTransport()
 
333
        transport.append_bytes('foo', 'content')
 
334
        self.assertEquals(True, transport.has('foo'))
325
335
 
326
336
    def test_list_dir(self):
327
 
        t = memory.MemoryTransport()
328
 
        t.put_bytes('foo', 'content')
329
 
        t.mkdir('dir')
330
 
        t.put_bytes('dir/subfoo', 'content')
331
 
        t.put_bytes('dirlike', 'content')
 
337
        transport = memory.MemoryTransport()
 
338
        transport.put_bytes('foo', 'content')
 
339
        transport.mkdir('dir')
 
340
        transport.put_bytes('dir/subfoo', 'content')
 
341
        transport.put_bytes('dirlike', 'content')
332
342
 
333
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
334
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
343
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
344
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
335
345
 
336
346
    def test_mkdir(self):
337
 
        t = memory.MemoryTransport()
338
 
        t.mkdir('dir')
339
 
        t.append_bytes('dir/path', 'content')
340
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
347
        transport = memory.MemoryTransport()
 
348
        transport.mkdir('dir')
 
349
        transport.append_bytes('dir/path', 'content')
 
350
        self.assertEqual(transport.get('dir/path').read(), 'content')
341
351
 
342
352
    def test_mkdir_missing_parent(self):
343
 
        t = memory.MemoryTransport()
344
 
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
353
        transport = memory.MemoryTransport()
 
354
        self.assertRaises(NoSuchFile,
 
355
                          transport.mkdir, 'dir/dir')
345
356
 
346
357
    def test_mkdir_twice(self):
347
 
        t = memory.MemoryTransport()
348
 
        t.mkdir('dir')
349
 
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
358
        transport = memory.MemoryTransport()
 
359
        transport.mkdir('dir')
 
360
        self.assertRaises(FileExists, transport.mkdir, 'dir')
350
361
 
351
362
    def test_parameters(self):
352
 
        t = memory.MemoryTransport()
353
 
        self.assertEqual(True, t.listable())
354
 
        self.assertEqual(False, t.is_readonly())
 
363
        transport = memory.MemoryTransport()
 
364
        self.assertEqual(True, transport.listable())
 
365
        self.assertEqual(False, transport.is_readonly())
355
366
 
356
367
    def test_iter_files_recursive(self):
357
 
        t = memory.MemoryTransport()
358
 
        t.mkdir('dir')
359
 
        t.put_bytes('dir/foo', 'content')
360
 
        t.put_bytes('dir/bar', 'content')
361
 
        t.put_bytes('bar', 'content')
362
 
        paths = set(t.iter_files_recursive())
 
368
        transport = memory.MemoryTransport()
 
369
        transport.mkdir('dir')
 
370
        transport.put_bytes('dir/foo', 'content')
 
371
        transport.put_bytes('dir/bar', 'content')
 
372
        transport.put_bytes('bar', 'content')
 
373
        paths = set(transport.iter_files_recursive())
363
374
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
364
375
 
365
376
    def test_stat(self):
366
 
        t = memory.MemoryTransport()
367
 
        t.put_bytes('foo', 'content')
368
 
        t.put_bytes('bar', 'phowar')
369
 
        self.assertEqual(7, t.stat('foo').st_size)
370
 
        self.assertEqual(6, t.stat('bar').st_size)
371
 
 
372
 
 
373
 
class ChrootDecoratorTransportTest(tests.TestCase):
 
377
        transport = memory.MemoryTransport()
 
378
        transport.put_bytes('foo', 'content')
 
379
        transport.put_bytes('bar', 'phowar')
 
380
        self.assertEqual(7, transport.stat('foo').st_size)
 
381
        self.assertEqual(6, transport.stat('bar').st_size)
 
382
 
 
383
 
 
384
class ChrootDecoratorTransportTest(TestCase):
374
385
    """Chroot decoration specific tests."""
375
386
 
376
387
    def test_abspath(self):
377
388
        # The abspath is always relative to the chroot_url.
378
 
        server = chroot.ChrootServer(
379
 
            transport.get_transport('memory:///foo/bar/'))
 
389
        server = ChrootServer(get_transport('memory:///foo/bar/'))
380
390
        self.start_server(server)
381
 
        t = transport.get_transport(server.get_url())
382
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
391
        transport = get_transport(server.get_url())
 
392
        self.assertEqual(server.get_url(), transport.abspath('/'))
383
393
 
384
 
        subdir_t = t.clone('subdir')
385
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
394
        subdir_transport = transport.clone('subdir')
 
395
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
386
396
 
387
397
    def test_clone(self):
388
 
        server = chroot.ChrootServer(
389
 
            transport.get_transport('memory:///foo/bar/'))
 
398
        server = ChrootServer(get_transport('memory:///foo/bar/'))
390
399
        self.start_server(server)
391
 
        t = transport.get_transport(server.get_url())
 
400
        transport = get_transport(server.get_url())
392
401
        # relpath from root and root path are the same
393
 
        relpath_cloned = t.clone('foo')
394
 
        abspath_cloned = t.clone('/foo')
 
402
        relpath_cloned = transport.clone('foo')
 
403
        abspath_cloned = transport.clone('/foo')
395
404
        self.assertEqual(server, relpath_cloned.server)
396
405
        self.assertEqual(server, abspath_cloned.server)
397
406
 
403
412
        This is so that it is not possible to escape a chroot by doing::
404
413
            url = chroot_transport.base
405
414
            parent_url = urlutils.join(url, '..')
406
 
            new_t = transport.get_transport(parent_url)
 
415
            new_transport = get_transport(parent_url)
407
416
        """
408
 
        server = chroot.ChrootServer(
409
 
            transport.get_transport('memory:///path/subpath'))
 
417
        server = ChrootServer(get_transport('memory:///path/subpath'))
410
418
        self.start_server(server)
411
 
        t = transport.get_transport(server.get_url())
412
 
        new_t = transport.get_transport(t.base)
413
 
        self.assertEqual(t.server, new_t.server)
414
 
        self.assertEqual(t.base, new_t.base)
 
419
        transport = get_transport(server.get_url())
 
420
        new_transport = get_transport(transport.base)
 
421
        self.assertEqual(transport.server, new_transport.server)
 
422
        self.assertEqual(transport.base, new_transport.base)
415
423
 
416
424
    def test_urljoin_preserves_chroot(self):
417
425
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
420
428
        This is so that it is not possible to escape a chroot by doing::
421
429
            url = chroot_transport.base
422
430
            parent_url = urlutils.join(url, '..')
423
 
            new_t = transport.get_transport(parent_url)
 
431
            new_transport = get_transport(parent_url)
424
432
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
433
        server = ChrootServer(get_transport('memory:///path/'))
426
434
        self.start_server(server)
427
 
        t = transport.get_transport(server.get_url())
 
435
        transport = get_transport(server.get_url())
428
436
        self.assertRaises(
429
 
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
430
 
 
431
 
 
432
 
class TestChrootServer(tests.TestCase):
 
437
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
438
 
 
439
 
 
440
class ChrootServerTest(TestCase):
433
441
 
434
442
    def test_construct(self):
435
443
        backing_transport = memory.MemoryTransport()
436
 
        server = chroot.ChrootServer(backing_transport)
 
444
        server = ChrootServer(backing_transport)
437
445
        self.assertEqual(backing_transport, server.backing_transport)
438
446
 
439
447
    def test_setUp(self):
440
448
        backing_transport = memory.MemoryTransport()
441
 
        server = chroot.ChrootServer(backing_transport)
 
449
        server = ChrootServer(backing_transport)
442
450
        server.start_server()
443
451
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
 
452
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
446
453
        finally:
447
454
            server.stop_server()
448
455
 
449
456
    def test_stop_server(self):
450
457
        backing_transport = memory.MemoryTransport()
451
 
        server = chroot.ChrootServer(backing_transport)
 
458
        server = ChrootServer(backing_transport)
452
459
        server.start_server()
453
460
        server.stop_server()
454
 
        self.assertFalse(server.scheme
455
 
                         in transport._get_protocol_handlers().keys())
 
461
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
456
462
 
457
463
    def test_get_url(self):
458
464
        backing_transport = memory.MemoryTransport()
459
 
        server = chroot.ChrootServer(backing_transport)
 
465
        server = ChrootServer(backing_transport)
460
466
        server.start_server()
461
467
        try:
462
468
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
464
470
            server.stop_server()
465
471
 
466
472
 
467
 
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
473
class PathFilteringDecoratorTransportTest(TestCase):
468
474
    """Pathfilter decoration specific tests."""
469
475
 
470
476
    def test_abspath(self):
471
477
        # The abspath is always relative to the base of the backing transport.
472
 
        server = pathfilter.PathFilteringServer(
473
 
            transport.get_transport('memory:///foo/bar/'),
 
478
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
474
479
            lambda x: x)
475
480
        server.start_server()
476
 
        t = transport.get_transport(server.get_url())
477
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
481
        transport = get_transport(server.get_url())
 
482
        self.assertEqual(server.get_url(), transport.abspath('/'))
478
483
 
479
 
        subdir_t = t.clone('subdir')
480
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
484
        subdir_transport = transport.clone('subdir')
 
485
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
481
486
        server.stop_server()
482
487
 
483
488
    def make_pf_transport(self, filter_func=None):
487
492
            parameter to override it."""
488
493
        if filter_func is None:
489
494
            filter_func = lambda x: x
490
 
        server = pathfilter.PathFilteringServer(
491
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
495
        server = PathFilteringServer(
 
496
            get_transport('memory:///foo/bar/'), filter_func)
492
497
        server.start_server()
493
498
        self.addCleanup(server.stop_server)
494
 
        return transport.get_transport(server.get_url())
 
499
        return get_transport(server.get_url())
495
500
 
496
501
    def test__filter(self):
497
502
        # _filter (with an identity func as filter_func) always returns
498
503
        # paths relative to the base of the backing transport.
499
 
        t = self.make_pf_transport()
500
 
        self.assertEqual('foo', t._filter('foo'))
501
 
        self.assertEqual('foo/bar', t._filter('foo/bar'))
502
 
        self.assertEqual('', t._filter('..'))
503
 
        self.assertEqual('', t._filter('/'))
 
504
        transport = self.make_pf_transport()
 
505
        self.assertEqual('foo', transport._filter('foo'))
 
506
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
507
        self.assertEqual('', transport._filter('..'))
 
508
        self.assertEqual('', transport._filter('/'))
504
509
        # The base of the pathfiltering transport is taken into account too.
505
 
        t = t.clone('subdir1/subdir2')
506
 
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
507
 
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
508
 
        self.assertEqual('subdir1', t._filter('..'))
509
 
        self.assertEqual('', t._filter('/'))
 
510
        transport = transport.clone('subdir1/subdir2')
 
511
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
512
        self.assertEqual(
 
513
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
514
        self.assertEqual('subdir1', transport._filter('..'))
 
515
        self.assertEqual('', transport._filter('/'))
510
516
 
511
517
    def test_filter_invocation(self):
512
518
        filter_log = []
513
519
        def filter(path):
514
520
            filter_log.append(path)
515
521
            return path
516
 
        t = self.make_pf_transport(filter)
517
 
        t.has('abc')
 
522
        transport = self.make_pf_transport(filter)
 
523
        transport.has('abc')
518
524
        self.assertEqual(['abc'], filter_log)
519
525
        del filter_log[:]
520
 
        t.clone('abc').has('xyz')
 
526
        transport.clone('abc').has('xyz')
521
527
        self.assertEqual(['abc/xyz'], filter_log)
522
528
        del filter_log[:]
523
 
        t.has('/abc')
 
529
        transport.has('/abc')
524
530
        self.assertEqual(['abc'], filter_log)
525
531
 
526
532
    def test_clone(self):
527
 
        t = self.make_pf_transport()
 
533
        transport = self.make_pf_transport()
528
534
        # relpath from root and root path are the same
529
 
        relpath_cloned = t.clone('foo')
530
 
        abspath_cloned = t.clone('/foo')
531
 
        self.assertEqual(t.server, relpath_cloned.server)
532
 
        self.assertEqual(t.server, abspath_cloned.server)
 
535
        relpath_cloned = transport.clone('foo')
 
536
        abspath_cloned = transport.clone('/foo')
 
537
        self.assertEqual(transport.server, relpath_cloned.server)
 
538
        self.assertEqual(transport.server, abspath_cloned.server)
533
539
 
534
540
    def test_url_preserves_pathfiltering(self):
535
541
        """Calling get_transport on a pathfiltered transport's base should
540
546
        otherwise) the filtering by doing::
541
547
            url = filtered_transport.base
542
548
            parent_url = urlutils.join(url, '..')
543
 
            new_t = transport.get_transport(parent_url)
 
549
            new_transport = get_transport(parent_url)
544
550
        """
545
 
        t = self.make_pf_transport()
546
 
        new_t = transport.get_transport(t.base)
547
 
        self.assertEqual(t.server, new_t.server)
548
 
        self.assertEqual(t.base, new_t.base)
549
 
 
550
 
 
551
 
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
551
        transport = self.make_pf_transport()
 
552
        new_transport = get_transport(transport.base)
 
553
        self.assertEqual(transport.server, new_transport.server)
 
554
        self.assertEqual(transport.base, new_transport.base)
 
555
 
 
556
 
 
557
class ReadonlyDecoratorTransportTest(TestCase):
552
558
    """Readonly decoration specific tests."""
553
559
 
554
560
    def test_local_parameters(self):
 
561
        import bzrlib.transport.readonly as readonly
555
562
        # connect to . in readonly mode
556
 
        t = readonly.ReadonlyTransportDecorator('readonly+.')
557
 
        self.assertEqual(True, t.listable())
558
 
        self.assertEqual(True, t.is_readonly())
 
563
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
564
        self.assertEqual(True, transport.listable())
 
565
        self.assertEqual(True, transport.is_readonly())
559
566
 
560
567
    def test_http_parameters(self):
561
568
        from bzrlib.tests.http_server import HttpServer
 
569
        import bzrlib.transport.readonly as readonly
562
570
        # connect to '.' via http which is not listable
563
571
        server = HttpServer()
564
572
        self.start_server(server)
565
 
        t = transport.get_transport('readonly+' + server.get_url())
566
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
567
 
        self.assertEqual(False, t.listable())
568
 
        self.assertEqual(True, t.is_readonly())
569
 
 
570
 
 
571
 
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
573
        transport = get_transport('readonly+' + server.get_url())
 
574
        self.failUnless(isinstance(transport,
 
575
                                   readonly.ReadonlyTransportDecorator))
 
576
        self.assertEqual(False, transport.listable())
 
577
        self.assertEqual(True, transport.is_readonly())
 
578
 
 
579
 
 
580
class FakeNFSDecoratorTests(TestCaseInTempDir):
572
581
    """NFS decorator specific tests."""
573
582
 
574
583
    def get_nfs_transport(self, url):
 
584
        import bzrlib.transport.fakenfs as fakenfs
575
585
        # connect to url with nfs decoration
576
586
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
577
587
 
578
588
    def test_local_parameters(self):
579
589
        # the listable and is_readonly parameters
580
590
        # are not changed by the fakenfs decorator
581
 
        t = self.get_nfs_transport('.')
582
 
        self.assertEqual(True, t.listable())
583
 
        self.assertEqual(False, t.is_readonly())
 
591
        transport = self.get_nfs_transport('.')
 
592
        self.assertEqual(True, transport.listable())
 
593
        self.assertEqual(False, transport.is_readonly())
584
594
 
585
595
    def test_http_parameters(self):
586
596
        # the listable and is_readonly parameters
589
599
        # connect to '.' via http which is not listable
590
600
        server = HttpServer()
591
601
        self.start_server(server)
592
 
        t = self.get_nfs_transport(server.get_url())
593
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
594
 
        self.assertEqual(False, t.listable())
595
 
        self.assertEqual(True, t.is_readonly())
 
602
        transport = self.get_nfs_transport(server.get_url())
 
603
        self.assertIsInstance(
 
604
            transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
605
        self.assertEqual(False, transport.listable())
 
606
        self.assertEqual(True, transport.is_readonly())
596
607
 
597
608
    def test_fakenfs_server_default(self):
598
609
        # a FakeNFSServer() should bring up a local relpath server for itself
599
 
        server = test_server.FakeNFSServer()
 
610
        import bzrlib.transport.fakenfs as fakenfs
 
611
        server = fakenfs.FakeNFSServer()
600
612
        self.start_server(server)
601
613
        # the url should be decorated appropriately
602
614
        self.assertStartsWith(server.get_url(), 'fakenfs+')
603
615
        # and we should be able to get a transport for it
604
 
        t = transport.get_transport(server.get_url())
 
616
        transport = get_transport(server.get_url())
605
617
        # which must be a FakeNFSTransportDecorator instance.
606
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
618
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
607
619
 
608
620
    def test_fakenfs_rename_semantics(self):
609
621
        # a FakeNFS transport must mangle the way rename errors occur to
610
622
        # look like NFS problems.
611
 
        t = self.get_nfs_transport('.')
 
623
        transport = self.get_nfs_transport('.')
612
624
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
613
 
                        transport=t)
614
 
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
615
 
 
616
 
 
617
 
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
625
                        transport=transport)
 
626
        self.assertRaises(errors.ResourceBusy,
 
627
                          transport.rename, 'from', 'to')
 
628
 
 
629
 
 
630
class FakeVFATDecoratorTests(TestCaseInTempDir):
618
631
    """Tests for simulation of VFAT restrictions"""
619
632
 
620
633
    def get_vfat_transport(self, url):
624
637
 
625
638
    def test_transport_creation(self):
626
639
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
627
 
        t = self.get_vfat_transport('.')
628
 
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
640
        transport = self.get_vfat_transport('.')
 
641
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
629
642
 
630
643
    def test_transport_mkdir(self):
631
 
        t = self.get_vfat_transport('.')
632
 
        t.mkdir('HELLO')
633
 
        self.assertTrue(t.has('hello'))
634
 
        self.assertTrue(t.has('Hello'))
 
644
        transport = self.get_vfat_transport('.')
 
645
        transport.mkdir('HELLO')
 
646
        self.assertTrue(transport.has('hello'))
 
647
        self.assertTrue(transport.has('Hello'))
635
648
 
636
649
    def test_forbidden_chars(self):
637
 
        t = self.get_vfat_transport('.')
638
 
        self.assertRaises(ValueError, t.has, "<NU>")
639
 
 
640
 
 
641
 
class BadTransportHandler(transport.Transport):
 
650
        transport = self.get_vfat_transport('.')
 
651
        self.assertRaises(ValueError, transport.has, "<NU>")
 
652
 
 
653
 
 
654
class BadTransportHandler(Transport):
642
655
    def __init__(self, base_url):
643
 
        raise errors.DependencyNotPresent('some_lib',
644
 
                                          'testing missing dependency')
645
 
 
646
 
 
647
 
class BackupTransportHandler(transport.Transport):
 
656
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
657
 
 
658
 
 
659
class BackupTransportHandler(Transport):
648
660
    """Test transport that works as a backup for the BadTransportHandler"""
649
661
    pass
650
662
 
651
663
 
652
 
class TestTransportImplementation(tests.TestCaseInTempDir):
 
664
class TestTransportImplementation(TestCaseInTempDir):
653
665
    """Implementation verification for transports.
654
666
 
655
667
    To verify a transport we need a server factory, which is a callable
684
696
        base_url = self._server.get_url()
685
697
        url = self._adjust_url(base_url, relpath)
686
698
        # try getting the transport via the regular interface:
687
 
        t = transport.get_transport(url)
 
699
        t = get_transport(url)
688
700
        # vila--20070607 if the following are commented out the test suite
689
701
        # still pass. Is this really still needed or was it a forgotten
690
702
        # temporary fix ?
695
707
        return t
696
708
 
697
709
 
698
 
class TestLocalTransports(tests.TestCase):
 
710
class TestLocalTransports(TestCase):
699
711
 
700
712
    def test_get_transport_from_abspath(self):
701
713
        here = osutils.abspath('.')
702
 
        t = transport.get_transport(here)
703
 
        self.assertIsInstance(t, local.LocalTransport)
 
714
        t = get_transport(here)
 
715
        self.assertIsInstance(t, LocalTransport)
704
716
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
705
717
 
706
718
    def test_get_transport_from_relpath(self):
707
719
        here = osutils.abspath('.')
708
 
        t = transport.get_transport('.')
709
 
        self.assertIsInstance(t, local.LocalTransport)
 
720
        t = get_transport('.')
 
721
        self.assertIsInstance(t, LocalTransport)
710
722
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
711
723
 
712
724
    def test_get_transport_from_local_url(self):
713
725
        here = osutils.abspath('.')
714
726
        here_url = urlutils.local_path_to_url(here) + '/'
715
 
        t = transport.get_transport(here_url)
716
 
        self.assertIsInstance(t, local.LocalTransport)
 
727
        t = get_transport(here_url)
 
728
        self.assertIsInstance(t, LocalTransport)
717
729
        self.assertEquals(t.base, here_url)
718
730
 
719
731
    def test_local_abspath(self):
720
732
        here = osutils.abspath('.')
721
 
        t = transport.get_transport(here)
 
733
        t = get_transport(here)
722
734
        self.assertEquals(t.local_abspath(''), here)
723
735
 
724
736
 
725
 
class TestWin32LocalTransport(tests.TestCase):
 
737
class TestWin32LocalTransport(TestCase):
726
738
 
727
739
    def test_unc_clone_to_root(self):
728
740
        # Win32 UNC path like \\HOST\path
729
741
        # clone to root should stop at least at \\HOST part
730
742
        # not on \\
731
 
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
743
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
732
744
        for i in xrange(4):
733
745
            t = t.clone('..')
734
746
        self.assertEquals(t.base, 'file://HOST/')
737
749
        self.assertEquals(t.base, 'file://HOST/')
738
750
 
739
751
 
740
 
class TestConnectedTransport(tests.TestCase):
 
752
class TestConnectedTransport(TestCase):
741
753
    """Tests for connected to remote server transports"""
742
754
 
743
755
    def test_parse_url(self):
744
 
        t = transport.ConnectedTransport(
745
 
            'http://simple.example.com/home/source')
 
756
        t = ConnectedTransport('http://simple.example.com/home/source')
746
757
        self.assertEquals(t._host, 'simple.example.com')
747
758
        self.assertEquals(t._port, None)
748
759
        self.assertEquals(t._path, '/home/source/')
753
764
 
754
765
    def test_parse_url_with_at_in_user(self):
755
766
        # Bug 228058
756
 
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
 
767
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
757
768
        self.assertEquals(t._user, 'user@host.com')
758
769
 
759
770
    def test_parse_quoted_url(self):
760
 
        t = transport.ConnectedTransport(
761
 
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
771
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
762
772
        self.assertEquals(t._host, 'exAmple.com')
763
773
        self.assertEquals(t._port, 2222)
764
774
        self.assertEquals(t._user, 'robey')
770
780
 
771
781
    def test_parse_invalid_url(self):
772
782
        self.assertRaises(errors.InvalidURL,
773
 
                          transport.ConnectedTransport,
 
783
                          ConnectedTransport,
774
784
                          'sftp://lily.org:~janneke/public/bzr/gub')
775
785
 
776
786
    def test_relpath(self):
777
 
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
 
787
        t = ConnectedTransport('sftp://user@host.com/abs/path')
778
788
 
779
789
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
780
790
        self.assertRaises(errors.PathNotChild, t.relpath,
786
796
        self.assertRaises(errors.PathNotChild, t.relpath,
787
797
                          'sftp://user@host.com:33/abs/path/sub')
788
798
        # Make sure it works when we don't supply a username
789
 
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
 
799
        t = ConnectedTransport('sftp://host.com/abs/path')
790
800
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
791
801
 
792
802
        # Make sure it works when parts of the path will be url encoded
793
 
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
 
803
        t = ConnectedTransport('sftp://host.com/dev/%path')
794
804
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
795
805
 
796
806
    def test_connection_sharing_propagate_credentials(self):
797
 
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
 
807
        t = ConnectedTransport('ftp://user@host.com/abs/path')
798
808
        self.assertEquals('user', t._user)
799
809
        self.assertEquals('host.com', t._host)
800
810
        self.assertIs(None, t._get_connection())
821
831
        self.assertIs(new_password, c._get_credentials())
822
832
 
823
833
 
824
 
class TestReusedTransports(tests.TestCase):
 
834
class TestReusedTransports(TestCase):
825
835
    """Tests for transport reuse"""
826
836
 
827
837
    def test_reuse_same_transport(self):
828
838
        possible_transports = []
829
 
        t1 = transport.get_transport('http://foo/',
830
 
                                     possible_transports=possible_transports)
 
839
        t1 = get_transport('http://foo/',
 
840
                           possible_transports=possible_transports)
831
841
        self.assertEqual([t1], possible_transports)
832
 
        t2 = transport.get_transport('http://foo/',
833
 
                                     possible_transports=[t1])
 
842
        t2 = get_transport('http://foo/', possible_transports=[t1])
834
843
        self.assertIs(t1, t2)
835
844
 
836
845
        # Also check that final '/' are handled correctly
837
 
        t3 = transport.get_transport('http://foo/path/')
838
 
        t4 = transport.get_transport('http://foo/path',
839
 
                                     possible_transports=[t3])
 
846
        t3 = get_transport('http://foo/path/')
 
847
        t4 = get_transport('http://foo/path', possible_transports=[t3])
840
848
        self.assertIs(t3, t4)
841
849
 
842
 
        t5 = transport.get_transport('http://foo/path')
843
 
        t6 = transport.get_transport('http://foo/path/',
844
 
                                     possible_transports=[t5])
 
850
        t5 = get_transport('http://foo/path')
 
851
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
845
852
        self.assertIs(t5, t6)
846
853
 
847
854
    def test_don_t_reuse_different_transport(self):
848
 
        t1 = transport.get_transport('http://foo/path')
849
 
        t2 = transport.get_transport('http://bar/path',
850
 
                                     possible_transports=[t1])
 
855
        t1 = get_transport('http://foo/path')
 
856
        t2 = get_transport('http://bar/path', possible_transports=[t1])
851
857
        self.assertIsNot(t1, t2)
852
858
 
853
859
 
854
 
class TestTransportTrace(tests.TestCase):
 
860
class TestTransportTrace(TestCase):
855
861
 
856
862
    def test_get(self):
857
 
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
863
        transport = get_transport('trace+memory://')
 
864
        self.assertIsInstance(
 
865
            transport, bzrlib.transport.trace.TransportTraceDecorator)
859
866
 
860
867
    def test_clone_preserves_activity(self):
861
 
        t = transport.get_transport('trace+memory://')
862
 
        t2 = t.clone('.')
863
 
        self.assertTrue(t is not t2)
864
 
        self.assertTrue(t._activity is t2._activity)
 
868
        transport = get_transport('trace+memory://')
 
869
        transport2 = transport.clone('.')
 
870
        self.assertTrue(transport is not transport2)
 
871
        self.assertTrue(transport._activity is transport2._activity)
865
872
 
866
873
    # the following specific tests are for the operations that have made use of
867
874
    # logging in tests; we could test every single operation but doing that
868
875
    # still won't cause a test failure when the top level Transport API
869
876
    # changes; so there is little return doing that.
870
877
    def test_get(self):
871
 
        t = transport.get_transport('trace+memory:///')
872
 
        t.put_bytes('foo', 'barish')
873
 
        t.get('foo')
 
878
        transport = get_transport('trace+memory:///')
 
879
        transport.put_bytes('foo', 'barish')
 
880
        transport.get('foo')
874
881
        expected_result = []
875
882
        # put_bytes records the bytes, not the content to avoid memory
876
883
        # pressure.
877
884
        expected_result.append(('put_bytes', 'foo', 6, None))
878
885
        # get records the file name only.
879
886
        expected_result.append(('get', 'foo'))
880
 
        self.assertEqual(expected_result, t._activity)
 
887
        self.assertEqual(expected_result, transport._activity)
881
888
 
882
889
    def test_readv(self):
883
 
        t = transport.get_transport('trace+memory:///')
884
 
        t.put_bytes('foo', 'barish')
885
 
        list(t.readv('foo', [(0, 1), (3, 2)],
886
 
                     adjust_for_latency=True, upper_limit=6))
 
890
        transport = get_transport('trace+memory:///')
 
891
        transport.put_bytes('foo', 'barish')
 
892
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
893
            upper_limit=6))
887
894
        expected_result = []
888
895
        # put_bytes records the bytes, not the content to avoid memory
889
896
        # pressure.
890
897
        expected_result.append(('put_bytes', 'foo', 6, None))
891
898
        # readv records the supplied offset request
892
899
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
893
 
        self.assertEqual(expected_result, t._activity)
 
900
        self.assertEqual(expected_result, transport._activity)
894
901
 
895
902
 
896
903
class TestSSHConnections(tests.TestCaseWithTransport):
909
916
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
910
917
        # override the interface (doesn't change self._vendor).
911
918
        # Note that this does encryption, so can be slow.
912
 
        from bzrlib.tests import stub_sftp
 
919
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
920
        from bzrlib.tests.stub_sftp import StubServer
913
921
 
914
922
        # Start an SSH server
915
923
        self.command_executed = []
918
926
        # SSH channel ourselves.  Surely this has already been implemented
919
927
        # elsewhere?
920
928
        started = []
921
 
        class StubSSHServer(stub_sftp.StubServer):
 
929
        class StubSSHServer(StubServer):
922
930
 
923
931
            test = self
924
932
 
952
960
 
953
961
                return True
954
962
 
955
 
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
 
963
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
956
964
        # We *don't* want to override the default SSH vendor: the detected one
957
965
        # is the one to use.
958
966
        self.start_server(ssh_server)
972
980
            # prefix a '/' to get the right path.
973
981
            path_to_branch = '/' + path_to_branch
974
982
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
975
 
        t = transport.get_transport(url)
 
983
        t = get_transport(url)
976
984
        self.permit_url(t.base)
977
985
        t.mkdir('foo')
978
986