/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Vincent Ladeuil
  • Date: 2010-01-25 15:55:48 UTC
  • mto: (4985.1.4 add-attr-cleanup)
  • mto: This revision was merged to the branch mainline in revision 4988.
  • Revision ID: v.ladeuil+lp@free.fr-20100125155548-0l352pujvt5bzl5e
Deploy addAttrCleanup on the whole test suite.

Several use case worth mentioning:

- setting a module or any other object attribute is the majority
by far. In some cases the setting itself is deferred but most of
the time we want to set at the same time we add the cleanup.

- there multiple occurrences of protecting hooks or ui factory
which are now useless (the test framework takes care of that now),

- there was some lambda uses that can now be avoided.

That first cleanup already simplifies things a lot.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
25
25
    errors,
26
26
    osutils,
27
27
    tests,
28
 
    transport,
 
28
    transport as _mod_transport,
29
29
    urlutils,
30
30
    )
31
31
from bzrlib.transport import (
32
 
    chroot,
33
32
    fakenfs,
34
 
    local,
35
33
    memory,
36
 
    pathfilter,
37
34
    readonly,
38
35
    )
39
 
from bzrlib.tests import (
40
 
    features,
41
 
    test_server,
42
 
    )
 
36
from bzrlib.errors import (DependencyNotPresent,
 
37
                           FileExists,
 
38
                           InvalidURLJoin,
 
39
                           NoSuchFile,
 
40
                           PathNotChild,
 
41
                           ReadError,
 
42
                           UnsupportedProtocol,
 
43
                           )
 
44
from bzrlib.tests import features, TestCase, TestCaseInTempDir
 
45
from bzrlib.transport import (_clear_protocol_handlers,
 
46
                              _CoalescedOffset,
 
47
                              ConnectedTransport,
 
48
                              _get_protocol_handlers,
 
49
                              _set_protocol_handlers,
 
50
                              _get_transport_modules,
 
51
                              get_transport,
 
52
                              LateReadError,
 
53
                              register_lazy_transport,
 
54
                              register_transport_proto,
 
55
                              Transport,
 
56
                              )
 
57
from bzrlib.transport.chroot import ChrootServer
 
58
from bzrlib.transport.local import (LocalTransport,
 
59
                                    EmulatedWin32LocalTransport)
 
60
from bzrlib.transport.pathfilter import PathFilteringServer
43
61
 
44
62
 
45
63
# TODO: Should possibly split transport-specific tests into their own files.
46
64
 
47
65
 
48
 
class TestTransport(tests.TestCase):
 
66
class TestTransport(TestCase):
49
67
    """Test the non transport-concrete class functionality."""
50
68
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
69
    def test__get_set_protocol_handlers(self):
55
 
        handlers = transport._get_protocol_handlers()
 
70
        handlers = _get_protocol_handlers()
56
71
        self.assertNotEqual([], handlers.keys( ))
57
72
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
 
73
            _clear_protocol_handlers()
 
74
            self.assertEqual([], _get_protocol_handlers().keys())
60
75
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
76
            _set_protocol_handlers(handlers)
62
77
 
63
78
    def test_get_transport_modules(self):
64
 
        handlers = transport._get_protocol_handlers()
 
79
        handlers = _get_protocol_handlers()
65
80
        # don't pollute the current handlers
66
 
        transport._clear_protocol_handlers()
 
81
        _clear_protocol_handlers()
67
82
        class SampleHandler(object):
68
83
            """I exist, isnt that enough?"""
69
84
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
 
85
            _clear_protocol_handlers()
 
86
            register_transport_proto('foo')
 
87
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
88
                                    'TestTransport.SampleHandler')
 
89
            register_transport_proto('bar')
 
90
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
91
                                    'TestTransport.SampleHandler')
79
92
            self.assertEqual([SampleHandler.__module__,
80
93
                              'bzrlib.transport.chroot',
81
94
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
 
95
                             _get_transport_modules())
83
96
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
97
            _set_protocol_handlers(handlers)
85
98
 
86
99
    def test_transport_dependency(self):
87
100
        """Transport with missing dependency causes no error"""
88
 
        saved_handlers = transport._get_protocol_handlers()
 
101
        saved_handlers = _get_protocol_handlers()
89
102
        # don't pollute the current handlers
90
 
        transport._clear_protocol_handlers()
 
103
        _clear_protocol_handlers()
91
104
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
 
105
            register_transport_proto('foo')
 
106
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
107
                    'BadTransportHandler')
95
108
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
 
109
                get_transport('foo://fooserver/foo')
 
110
            except UnsupportedProtocol, e:
98
111
                e_str = str(e)
99
112
                self.assertEquals('Unsupported protocol'
100
113
                                  ' for url "foo://fooserver/foo":'
104
117
                self.fail('Did not raise UnsupportedProtocol')
105
118
        finally:
106
119
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
120
            _set_protocol_handlers(saved_handlers)
108
121
 
109
122
    def test_transport_fallback(self):
110
123
        """Transport with missing dependency causes no error"""
111
 
        saved_handlers = transport._get_protocol_handlers()
 
124
        saved_handlers = _get_protocol_handlers()
112
125
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
 
126
            _clear_protocol_handlers()
 
127
            register_transport_proto('foo')
 
128
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
129
                    'BackupTransportHandler')
 
130
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
131
                    'BadTransportHandler')
 
132
            t = get_transport('foo://fooserver/foo')
120
133
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
134
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
135
            _set_protocol_handlers(saved_handlers)
123
136
 
124
137
    def test_ssh_hints(self):
125
138
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
126
139
        try:
127
 
            transport.get_transport('ssh://fooserver/foo')
128
 
        except errors.UnsupportedProtocol, e:
 
140
            get_transport('ssh://fooserver/foo')
 
141
        except UnsupportedProtocol, e:
129
142
            e_str = str(e)
130
143
            self.assertEquals('Unsupported protocol'
131
144
                              ' for url "ssh://fooserver/foo":'
132
 
                              ' bzr supports bzr+ssh to operate over ssh,'
133
 
                              ' use "bzr+ssh://fooserver/foo".',
 
145
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
134
146
                              str(e))
135
147
        else:
136
148
            self.fail('Did not raise UnsupportedProtocol')
137
149
 
138
150
    def test_LateReadError(self):
139
151
        """The LateReadError helper should raise on read()."""
140
 
        a_file = transport.LateReadError('a path')
 
152
        a_file = LateReadError('a path')
141
153
        try:
142
154
            a_file.read()
143
 
        except errors.ReadError, error:
 
155
        except ReadError, error:
144
156
            self.assertEqual('a path', error.path)
145
 
        self.assertRaises(errors.ReadError, a_file.read, 40)
 
157
        self.assertRaises(ReadError, a_file.read, 40)
146
158
        a_file.close()
147
159
 
148
160
    def test__combine_paths(self):
149
 
        t = transport.Transport('/')
 
161
        t = Transport('/')
150
162
        self.assertEqual('/home/sarah/project/foo',
151
163
                         t._combine_paths('/home/sarah', 'project/foo'))
152
164
        self.assertEqual('/etc',
163
175
        self.assertEqual('memory:///t is not a local path.', str(e))
164
176
 
165
177
 
166
 
class TestCoalesceOffsets(tests.TestCase):
 
178
class TestCoalesceOffsets(TestCase):
167
179
 
168
180
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
169
 
        coalesce = transport.Transport._coalesce_offsets
170
 
        exp = [transport._CoalescedOffset(*x) for x in expected]
 
181
        coalesce = Transport._coalesce_offsets
 
182
        exp = [_CoalescedOffset(*x) for x in expected]
171
183
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
172
184
                            max_size=max_size))
173
185
        self.assertEqual(exp, out)
248
260
                   max_size=1*1024*1024*1024)
249
261
 
250
262
 
251
 
class TestMemoryServer(tests.TestCase):
 
263
class TestMemoryServer(TestCase):
252
264
 
253
265
    def test_create_server(self):
254
266
        server = memory.MemoryServer()
255
267
        server.start_server()
256
268
        url = server.get_url()
257
 
        self.assertTrue(url in transport.transport_list_registry)
258
 
        t = transport.get_transport(url)
 
269
        self.assertTrue(url in _mod_transport.transport_list_registry)
 
270
        t = _mod_transport.get_transport(url)
259
271
        del t
260
272
        server.stop_server()
261
 
        self.assertFalse(url in transport.transport_list_registry)
 
273
        self.assertFalse(url in _mod_transport.transport_list_registry)
262
274
        self.assertRaises(errors.UnsupportedProtocol,
263
 
                          transport.get_transport, url)
264
 
 
265
 
 
266
 
class TestMemoryTransport(tests.TestCase):
 
275
                          _mod_transport.get_transport, url)
 
276
 
 
277
 
 
278
class TestMemoryTransport(TestCase):
267
279
 
268
280
    def test_get_transport(self):
269
281
        memory.MemoryTransport()
270
282
 
271
283
    def test_clone(self):
272
 
        t = memory.MemoryTransport()
273
 
        self.assertTrue(isinstance(t, memory.MemoryTransport))
274
 
        self.assertEqual("memory:///", t.clone("/").base)
 
284
        transport = memory.MemoryTransport()
 
285
        self.assertTrue(isinstance(transport, memory.MemoryTransport))
 
286
        self.assertEqual("memory:///", transport.clone("/").base)
275
287
 
276
288
    def test_abspath(self):
277
 
        t = memory.MemoryTransport()
278
 
        self.assertEqual("memory:///relpath", t.abspath('relpath'))
 
289
        transport = memory.MemoryTransport()
 
290
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
279
291
 
280
292
    def test_abspath_of_root(self):
281
 
        t = memory.MemoryTransport()
282
 
        self.assertEqual("memory:///", t.base)
283
 
        self.assertEqual("memory:///", t.abspath('/'))
 
293
        transport = memory.MemoryTransport()
 
294
        self.assertEqual("memory:///", transport.base)
 
295
        self.assertEqual("memory:///", transport.abspath('/'))
284
296
 
285
297
    def test_abspath_of_relpath_starting_at_root(self):
286
 
        t = memory.MemoryTransport()
287
 
        self.assertEqual("memory:///foo", t.abspath('/foo'))
 
298
        transport = memory.MemoryTransport()
 
299
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
288
300
 
289
301
    def test_append_and_get(self):
290
 
        t = memory.MemoryTransport()
291
 
        t.append_bytes('path', 'content')
292
 
        self.assertEqual(t.get('path').read(), 'content')
293
 
        t.append_file('path', StringIO('content'))
294
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
302
        transport = memory.MemoryTransport()
 
303
        transport.append_bytes('path', 'content')
 
304
        self.assertEqual(transport.get('path').read(), 'content')
 
305
        transport.append_file('path', StringIO('content'))
 
306
        self.assertEqual(transport.get('path').read(), 'contentcontent')
295
307
 
296
308
    def test_put_and_get(self):
297
 
        t = memory.MemoryTransport()
298
 
        t.put_file('path', StringIO('content'))
299
 
        self.assertEqual(t.get('path').read(), 'content')
300
 
        t.put_bytes('path', 'content')
301
 
        self.assertEqual(t.get('path').read(), 'content')
 
309
        transport = memory.MemoryTransport()
 
310
        transport.put_file('path', StringIO('content'))
 
311
        self.assertEqual(transport.get('path').read(), 'content')
 
312
        transport.put_bytes('path', 'content')
 
313
        self.assertEqual(transport.get('path').read(), 'content')
302
314
 
303
315
    def test_append_without_dir_fails(self):
304
 
        t = memory.MemoryTransport()
305
 
        self.assertRaises(errors.NoSuchFile,
306
 
                          t.append_bytes, 'dir/path', 'content')
 
316
        transport = memory.MemoryTransport()
 
317
        self.assertRaises(NoSuchFile,
 
318
                          transport.append_bytes, 'dir/path', 'content')
307
319
 
308
320
    def test_put_without_dir_fails(self):
309
 
        t = memory.MemoryTransport()
310
 
        self.assertRaises(errors.NoSuchFile,
311
 
                          t.put_file, 'dir/path', StringIO('content'))
 
321
        transport = memory.MemoryTransport()
 
322
        self.assertRaises(NoSuchFile,
 
323
                          transport.put_file, 'dir/path', StringIO('content'))
312
324
 
313
325
    def test_get_missing(self):
314
326
        transport = memory.MemoryTransport()
315
 
        self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
 
327
        self.assertRaises(NoSuchFile, transport.get, 'foo')
316
328
 
317
329
    def test_has_missing(self):
318
 
        t = memory.MemoryTransport()
319
 
        self.assertEquals(False, t.has('foo'))
 
330
        transport = memory.MemoryTransport()
 
331
        self.assertEquals(False, transport.has('foo'))
320
332
 
321
333
    def test_has_present(self):
322
 
        t = memory.MemoryTransport()
323
 
        t.append_bytes('foo', 'content')
324
 
        self.assertEquals(True, t.has('foo'))
 
334
        transport = memory.MemoryTransport()
 
335
        transport.append_bytes('foo', 'content')
 
336
        self.assertEquals(True, transport.has('foo'))
325
337
 
326
338
    def test_list_dir(self):
327
 
        t = memory.MemoryTransport()
328
 
        t.put_bytes('foo', 'content')
329
 
        t.mkdir('dir')
330
 
        t.put_bytes('dir/subfoo', 'content')
331
 
        t.put_bytes('dirlike', 'content')
 
339
        transport = memory.MemoryTransport()
 
340
        transport.put_bytes('foo', 'content')
 
341
        transport.mkdir('dir')
 
342
        transport.put_bytes('dir/subfoo', 'content')
 
343
        transport.put_bytes('dirlike', 'content')
332
344
 
333
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
334
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
345
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
346
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
335
347
 
336
348
    def test_mkdir(self):
337
 
        t = memory.MemoryTransport()
338
 
        t.mkdir('dir')
339
 
        t.append_bytes('dir/path', 'content')
340
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
349
        transport = memory.MemoryTransport()
 
350
        transport.mkdir('dir')
 
351
        transport.append_bytes('dir/path', 'content')
 
352
        self.assertEqual(transport.get('dir/path').read(), 'content')
341
353
 
342
354
    def test_mkdir_missing_parent(self):
343
 
        t = memory.MemoryTransport()
344
 
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
 
355
        transport = memory.MemoryTransport()
 
356
        self.assertRaises(NoSuchFile,
 
357
                          transport.mkdir, 'dir/dir')
345
358
 
346
359
    def test_mkdir_twice(self):
347
 
        t = memory.MemoryTransport()
348
 
        t.mkdir('dir')
349
 
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')
 
360
        transport = memory.MemoryTransport()
 
361
        transport.mkdir('dir')
 
362
        self.assertRaises(FileExists, transport.mkdir, 'dir')
350
363
 
351
364
    def test_parameters(self):
352
 
        t = memory.MemoryTransport()
353
 
        self.assertEqual(True, t.listable())
354
 
        self.assertEqual(False, t.is_readonly())
 
365
        transport = memory.MemoryTransport()
 
366
        self.assertEqual(True, transport.listable())
 
367
        self.assertEqual(False, transport.is_readonly())
355
368
 
356
369
    def test_iter_files_recursive(self):
357
 
        t = memory.MemoryTransport()
358
 
        t.mkdir('dir')
359
 
        t.put_bytes('dir/foo', 'content')
360
 
        t.put_bytes('dir/bar', 'content')
361
 
        t.put_bytes('bar', 'content')
362
 
        paths = set(t.iter_files_recursive())
 
370
        transport = memory.MemoryTransport()
 
371
        transport.mkdir('dir')
 
372
        transport.put_bytes('dir/foo', 'content')
 
373
        transport.put_bytes('dir/bar', 'content')
 
374
        transport.put_bytes('bar', 'content')
 
375
        paths = set(transport.iter_files_recursive())
363
376
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
364
377
 
365
378
    def test_stat(self):
366
 
        t = memory.MemoryTransport()
367
 
        t.put_bytes('foo', 'content')
368
 
        t.put_bytes('bar', 'phowar')
369
 
        self.assertEqual(7, t.stat('foo').st_size)
370
 
        self.assertEqual(6, t.stat('bar').st_size)
371
 
 
372
 
 
373
 
class ChrootDecoratorTransportTest(tests.TestCase):
 
379
        transport = memory.MemoryTransport()
 
380
        transport.put_bytes('foo', 'content')
 
381
        transport.put_bytes('bar', 'phowar')
 
382
        self.assertEqual(7, transport.stat('foo').st_size)
 
383
        self.assertEqual(6, transport.stat('bar').st_size)
 
384
 
 
385
 
 
386
class ChrootDecoratorTransportTest(TestCase):
374
387
    """Chroot decoration specific tests."""
375
388
 
376
389
    def test_abspath(self):
377
390
        # The abspath is always relative to the chroot_url.
378
 
        server = chroot.ChrootServer(
379
 
            transport.get_transport('memory:///foo/bar/'))
 
391
        server = ChrootServer(get_transport('memory:///foo/bar/'))
380
392
        self.start_server(server)
381
 
        t = transport.get_transport(server.get_url())
382
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
393
        transport = get_transport(server.get_url())
 
394
        self.assertEqual(server.get_url(), transport.abspath('/'))
383
395
 
384
 
        subdir_t = t.clone('subdir')
385
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
396
        subdir_transport = transport.clone('subdir')
 
397
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
386
398
 
387
399
    def test_clone(self):
388
 
        server = chroot.ChrootServer(
389
 
            transport.get_transport('memory:///foo/bar/'))
 
400
        server = ChrootServer(get_transport('memory:///foo/bar/'))
390
401
        self.start_server(server)
391
 
        t = transport.get_transport(server.get_url())
 
402
        transport = get_transport(server.get_url())
392
403
        # relpath from root and root path are the same
393
 
        relpath_cloned = t.clone('foo')
394
 
        abspath_cloned = t.clone('/foo')
 
404
        relpath_cloned = transport.clone('foo')
 
405
        abspath_cloned = transport.clone('/foo')
395
406
        self.assertEqual(server, relpath_cloned.server)
396
407
        self.assertEqual(server, abspath_cloned.server)
397
408
 
403
414
        This is so that it is not possible to escape a chroot by doing::
404
415
            url = chroot_transport.base
405
416
            parent_url = urlutils.join(url, '..')
406
 
            new_t = transport.get_transport(parent_url)
 
417
            new_transport = get_transport(parent_url)
407
418
        """
408
 
        server = chroot.ChrootServer(
409
 
            transport.get_transport('memory:///path/subpath'))
 
419
        server = ChrootServer(get_transport('memory:///path/subpath'))
410
420
        self.start_server(server)
411
 
        t = transport.get_transport(server.get_url())
412
 
        new_t = transport.get_transport(t.base)
413
 
        self.assertEqual(t.server, new_t.server)
414
 
        self.assertEqual(t.base, new_t.base)
 
421
        transport = get_transport(server.get_url())
 
422
        new_transport = get_transport(transport.base)
 
423
        self.assertEqual(transport.server, new_transport.server)
 
424
        self.assertEqual(transport.base, new_transport.base)
415
425
 
416
426
    def test_urljoin_preserves_chroot(self):
417
427
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
420
430
        This is so that it is not possible to escape a chroot by doing::
421
431
            url = chroot_transport.base
422
432
            parent_url = urlutils.join(url, '..')
423
 
            new_t = transport.get_transport(parent_url)
 
433
            new_transport = get_transport(parent_url)
424
434
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
435
        server = ChrootServer(get_transport('memory:///path/'))
426
436
        self.start_server(server)
427
 
        t = transport.get_transport(server.get_url())
 
437
        transport = get_transport(server.get_url())
428
438
        self.assertRaises(
429
 
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
430
 
 
431
 
 
432
 
class TestChrootServer(tests.TestCase):
 
439
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
440
 
 
441
 
 
442
class ChrootServerTest(TestCase):
433
443
 
434
444
    def test_construct(self):
435
445
        backing_transport = memory.MemoryTransport()
436
 
        server = chroot.ChrootServer(backing_transport)
 
446
        server = ChrootServer(backing_transport)
437
447
        self.assertEqual(backing_transport, server.backing_transport)
438
448
 
439
449
    def test_setUp(self):
440
450
        backing_transport = memory.MemoryTransport()
441
 
        server = chroot.ChrootServer(backing_transport)
 
451
        server = ChrootServer(backing_transport)
442
452
        server.start_server()
443
453
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
 
454
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
446
455
        finally:
447
456
            server.stop_server()
448
457
 
449
458
    def test_stop_server(self):
450
459
        backing_transport = memory.MemoryTransport()
451
 
        server = chroot.ChrootServer(backing_transport)
 
460
        server = ChrootServer(backing_transport)
452
461
        server.start_server()
453
462
        server.stop_server()
454
 
        self.assertFalse(server.scheme
455
 
                         in transport._get_protocol_handlers().keys())
 
463
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
456
464
 
457
465
    def test_get_url(self):
458
466
        backing_transport = memory.MemoryTransport()
459
 
        server = chroot.ChrootServer(backing_transport)
 
467
        server = ChrootServer(backing_transport)
460
468
        server.start_server()
461
469
        try:
462
470
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
464
472
            server.stop_server()
465
473
 
466
474
 
467
 
class PathFilteringDecoratorTransportTest(tests.TestCase):
 
475
class PathFilteringDecoratorTransportTest(TestCase):
468
476
    """Pathfilter decoration specific tests."""
469
477
 
470
478
    def test_abspath(self):
471
479
        # The abspath is always relative to the base of the backing transport.
472
 
        server = pathfilter.PathFilteringServer(
473
 
            transport.get_transport('memory:///foo/bar/'),
 
480
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
474
481
            lambda x: x)
475
482
        server.start_server()
476
 
        t = transport.get_transport(server.get_url())
477
 
        self.assertEqual(server.get_url(), t.abspath('/'))
 
483
        transport = get_transport(server.get_url())
 
484
        self.assertEqual(server.get_url(), transport.abspath('/'))
478
485
 
479
 
        subdir_t = t.clone('subdir')
480
 
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))
 
486
        subdir_transport = transport.clone('subdir')
 
487
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
481
488
        server.stop_server()
482
489
 
483
490
    def make_pf_transport(self, filter_func=None):
487
494
            parameter to override it."""
488
495
        if filter_func is None:
489
496
            filter_func = lambda x: x
490
 
        server = pathfilter.PathFilteringServer(
491
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
497
        server = PathFilteringServer(
 
498
            get_transport('memory:///foo/bar/'), filter_func)
492
499
        server.start_server()
493
500
        self.addCleanup(server.stop_server)
494
 
        return transport.get_transport(server.get_url())
 
501
        return get_transport(server.get_url())
495
502
 
496
503
    def test__filter(self):
497
504
        # _filter (with an identity func as filter_func) always returns
498
505
        # paths relative to the base of the backing transport.
499
 
        t = self.make_pf_transport()
500
 
        self.assertEqual('foo', t._filter('foo'))
501
 
        self.assertEqual('foo/bar', t._filter('foo/bar'))
502
 
        self.assertEqual('', t._filter('..'))
503
 
        self.assertEqual('', t._filter('/'))
 
506
        transport = self.make_pf_transport()
 
507
        self.assertEqual('foo', transport._filter('foo'))
 
508
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
509
        self.assertEqual('', transport._filter('..'))
 
510
        self.assertEqual('', transport._filter('/'))
504
511
        # The base of the pathfiltering transport is taken into account too.
505
 
        t = t.clone('subdir1/subdir2')
506
 
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
507
 
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
508
 
        self.assertEqual('subdir1', t._filter('..'))
509
 
        self.assertEqual('', t._filter('/'))
 
512
        transport = transport.clone('subdir1/subdir2')
 
513
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
514
        self.assertEqual(
 
515
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
516
        self.assertEqual('subdir1', transport._filter('..'))
 
517
        self.assertEqual('', transport._filter('/'))
510
518
 
511
519
    def test_filter_invocation(self):
512
520
        filter_log = []
513
521
        def filter(path):
514
522
            filter_log.append(path)
515
523
            return path
516
 
        t = self.make_pf_transport(filter)
517
 
        t.has('abc')
 
524
        transport = self.make_pf_transport(filter)
 
525
        transport.has('abc')
518
526
        self.assertEqual(['abc'], filter_log)
519
527
        del filter_log[:]
520
 
        t.clone('abc').has('xyz')
 
528
        transport.clone('abc').has('xyz')
521
529
        self.assertEqual(['abc/xyz'], filter_log)
522
530
        del filter_log[:]
523
 
        t.has('/abc')
 
531
        transport.has('/abc')
524
532
        self.assertEqual(['abc'], filter_log)
525
533
 
526
534
    def test_clone(self):
527
 
        t = self.make_pf_transport()
 
535
        transport = self.make_pf_transport()
528
536
        # relpath from root and root path are the same
529
 
        relpath_cloned = t.clone('foo')
530
 
        abspath_cloned = t.clone('/foo')
531
 
        self.assertEqual(t.server, relpath_cloned.server)
532
 
        self.assertEqual(t.server, abspath_cloned.server)
 
537
        relpath_cloned = transport.clone('foo')
 
538
        abspath_cloned = transport.clone('/foo')
 
539
        self.assertEqual(transport.server, relpath_cloned.server)
 
540
        self.assertEqual(transport.server, abspath_cloned.server)
533
541
 
534
542
    def test_url_preserves_pathfiltering(self):
535
543
        """Calling get_transport on a pathfiltered transport's base should
540
548
        otherwise) the filtering by doing::
541
549
            url = filtered_transport.base
542
550
            parent_url = urlutils.join(url, '..')
543
 
            new_t = transport.get_transport(parent_url)
 
551
            new_transport = get_transport(parent_url)
544
552
        """
545
 
        t = self.make_pf_transport()
546
 
        new_t = transport.get_transport(t.base)
547
 
        self.assertEqual(t.server, new_t.server)
548
 
        self.assertEqual(t.base, new_t.base)
549
 
 
550
 
 
551
 
class ReadonlyDecoratorTransportTest(tests.TestCase):
 
553
        transport = self.make_pf_transport()
 
554
        new_transport = get_transport(transport.base)
 
555
        self.assertEqual(transport.server, new_transport.server)
 
556
        self.assertEqual(transport.base, new_transport.base)
 
557
 
 
558
 
 
559
class ReadonlyDecoratorTransportTest(TestCase):
552
560
    """Readonly decoration specific tests."""
553
561
 
554
562
    def test_local_parameters(self):
555
563
        # connect to . in readonly mode
556
 
        t = readonly.ReadonlyTransportDecorator('readonly+.')
557
 
        self.assertEqual(True, t.listable())
558
 
        self.assertEqual(True, t.is_readonly())
 
564
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
565
        self.assertEqual(True, transport.listable())
 
566
        self.assertEqual(True, transport.is_readonly())
559
567
 
560
568
    def test_http_parameters(self):
561
569
        from bzrlib.tests.http_server import HttpServer
562
570
        # connect to '.' via http which is not listable
563
571
        server = HttpServer()
564
572
        self.start_server(server)
565
 
        t = transport.get_transport('readonly+' + server.get_url())
566
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
567
 
        self.assertEqual(False, t.listable())
568
 
        self.assertEqual(True, t.is_readonly())
569
 
 
570
 
 
571
 
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
 
573
        transport = get_transport('readonly+' + server.get_url())
 
574
        self.failUnless(isinstance(transport,
 
575
                                   readonly.ReadonlyTransportDecorator))
 
576
        self.assertEqual(False, transport.listable())
 
577
        self.assertEqual(True, transport.is_readonly())
 
578
 
 
579
 
 
580
class FakeNFSDecoratorTests(TestCaseInTempDir):
572
581
    """NFS decorator specific tests."""
573
582
 
574
583
    def get_nfs_transport(self, url):
578
587
    def test_local_parameters(self):
579
588
        # the listable and is_readonly parameters
580
589
        # are not changed by the fakenfs decorator
581
 
        t = self.get_nfs_transport('.')
582
 
        self.assertEqual(True, t.listable())
583
 
        self.assertEqual(False, t.is_readonly())
 
590
        transport = self.get_nfs_transport('.')
 
591
        self.assertEqual(True, transport.listable())
 
592
        self.assertEqual(False, transport.is_readonly())
584
593
 
585
594
    def test_http_parameters(self):
586
595
        # the listable and is_readonly parameters
589
598
        # connect to '.' via http which is not listable
590
599
        server = HttpServer()
591
600
        self.start_server(server)
592
 
        t = self.get_nfs_transport(server.get_url())
593
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
594
 
        self.assertEqual(False, t.listable())
595
 
        self.assertEqual(True, t.is_readonly())
 
601
        transport = self.get_nfs_transport(server.get_url())
 
602
        self.assertIsInstance(
 
603
            transport, fakenfs.FakeNFSTransportDecorator)
 
604
        self.assertEqual(False, transport.listable())
 
605
        self.assertEqual(True, transport.is_readonly())
596
606
 
597
607
    def test_fakenfs_server_default(self):
598
608
        # a FakeNFSServer() should bring up a local relpath server for itself
599
 
        server = test_server.FakeNFSServer()
 
609
        server = fakenfs.FakeNFSServer()
600
610
        self.start_server(server)
601
611
        # the url should be decorated appropriately
602
612
        self.assertStartsWith(server.get_url(), 'fakenfs+')
603
613
        # and we should be able to get a transport for it
604
 
        t = transport.get_transport(server.get_url())
 
614
        transport = get_transport(server.get_url())
605
615
        # which must be a FakeNFSTransportDecorator instance.
606
 
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
 
616
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
607
617
 
608
618
    def test_fakenfs_rename_semantics(self):
609
619
        # a FakeNFS transport must mangle the way rename errors occur to
610
620
        # look like NFS problems.
611
 
        t = self.get_nfs_transport('.')
 
621
        transport = self.get_nfs_transport('.')
612
622
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
613
 
                        transport=t)
614
 
        self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
615
 
 
616
 
 
617
 
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
 
623
                        transport=transport)
 
624
        self.assertRaises(errors.ResourceBusy,
 
625
                          transport.rename, 'from', 'to')
 
626
 
 
627
 
 
628
class FakeVFATDecoratorTests(TestCaseInTempDir):
618
629
    """Tests for simulation of VFAT restrictions"""
619
630
 
620
631
    def get_vfat_transport(self, url):
624
635
 
625
636
    def test_transport_creation(self):
626
637
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
627
 
        t = self.get_vfat_transport('.')
628
 
        self.assertIsInstance(t, FakeVFATTransportDecorator)
 
638
        transport = self.get_vfat_transport('.')
 
639
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
629
640
 
630
641
    def test_transport_mkdir(self):
631
 
        t = self.get_vfat_transport('.')
632
 
        t.mkdir('HELLO')
633
 
        self.assertTrue(t.has('hello'))
634
 
        self.assertTrue(t.has('Hello'))
 
642
        transport = self.get_vfat_transport('.')
 
643
        transport.mkdir('HELLO')
 
644
        self.assertTrue(transport.has('hello'))
 
645
        self.assertTrue(transport.has('Hello'))
635
646
 
636
647
    def test_forbidden_chars(self):
637
 
        t = self.get_vfat_transport('.')
638
 
        self.assertRaises(ValueError, t.has, "<NU>")
639
 
 
640
 
 
641
 
class BadTransportHandler(transport.Transport):
 
648
        transport = self.get_vfat_transport('.')
 
649
        self.assertRaises(ValueError, transport.has, "<NU>")
 
650
 
 
651
 
 
652
class BadTransportHandler(Transport):
642
653
    def __init__(self, base_url):
643
 
        raise errors.DependencyNotPresent('some_lib',
644
 
                                          'testing missing dependency')
645
 
 
646
 
 
647
 
class BackupTransportHandler(transport.Transport):
 
654
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
655
 
 
656
 
 
657
class BackupTransportHandler(Transport):
648
658
    """Test transport that works as a backup for the BadTransportHandler"""
649
659
    pass
650
660
 
651
661
 
652
 
class TestTransportImplementation(tests.TestCaseInTempDir):
 
662
class TestTransportImplementation(TestCaseInTempDir):
653
663
    """Implementation verification for transports.
654
664
 
655
665
    To verify a transport we need a server factory, which is a callable
684
694
        base_url = self._server.get_url()
685
695
        url = self._adjust_url(base_url, relpath)
686
696
        # try getting the transport via the regular interface:
687
 
        t = transport.get_transport(url)
 
697
        t = get_transport(url)
688
698
        # vila--20070607 if the following are commented out the test suite
689
699
        # still pass. Is this really still needed or was it a forgotten
690
700
        # temporary fix ?
695
705
        return t
696
706
 
697
707
 
698
 
class TestLocalTransports(tests.TestCase):
 
708
class TestLocalTransports(TestCase):
699
709
 
700
710
    def test_get_transport_from_abspath(self):
701
711
        here = osutils.abspath('.')
702
 
        t = transport.get_transport(here)
703
 
        self.assertIsInstance(t, local.LocalTransport)
 
712
        t = get_transport(here)
 
713
        self.assertIsInstance(t, LocalTransport)
704
714
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
705
715
 
706
716
    def test_get_transport_from_relpath(self):
707
717
        here = osutils.abspath('.')
708
 
        t = transport.get_transport('.')
709
 
        self.assertIsInstance(t, local.LocalTransport)
 
718
        t = get_transport('.')
 
719
        self.assertIsInstance(t, LocalTransport)
710
720
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
711
721
 
712
722
    def test_get_transport_from_local_url(self):
713
723
        here = osutils.abspath('.')
714
724
        here_url = urlutils.local_path_to_url(here) + '/'
715
 
        t = transport.get_transport(here_url)
716
 
        self.assertIsInstance(t, local.LocalTransport)
 
725
        t = get_transport(here_url)
 
726
        self.assertIsInstance(t, LocalTransport)
717
727
        self.assertEquals(t.base, here_url)
718
728
 
719
729
    def test_local_abspath(self):
720
730
        here = osutils.abspath('.')
721
 
        t = transport.get_transport(here)
 
731
        t = get_transport(here)
722
732
        self.assertEquals(t.local_abspath(''), here)
723
733
 
724
734
 
725
 
class TestWin32LocalTransport(tests.TestCase):
 
735
class TestWin32LocalTransport(TestCase):
726
736
 
727
737
    def test_unc_clone_to_root(self):
728
738
        # Win32 UNC path like \\HOST\path
729
739
        # clone to root should stop at least at \\HOST part
730
740
        # not on \\
731
 
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
741
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
732
742
        for i in xrange(4):
733
743
            t = t.clone('..')
734
744
        self.assertEquals(t.base, 'file://HOST/')
737
747
        self.assertEquals(t.base, 'file://HOST/')
738
748
 
739
749
 
740
 
class TestConnectedTransport(tests.TestCase):
 
750
class TestConnectedTransport(TestCase):
741
751
    """Tests for connected to remote server transports"""
742
752
 
743
753
    def test_parse_url(self):
744
 
        t = transport.ConnectedTransport(
745
 
            'http://simple.example.com/home/source')
 
754
        t = ConnectedTransport('http://simple.example.com/home/source')
746
755
        self.assertEquals(t._host, 'simple.example.com')
747
756
        self.assertEquals(t._port, None)
748
757
        self.assertEquals(t._path, '/home/source/')
753
762
 
754
763
    def test_parse_url_with_at_in_user(self):
755
764
        # Bug 228058
756
 
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
 
765
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
757
766
        self.assertEquals(t._user, 'user@host.com')
758
767
 
759
768
    def test_parse_quoted_url(self):
760
 
        t = transport.ConnectedTransport(
761
 
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
769
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
762
770
        self.assertEquals(t._host, 'exAmple.com')
763
771
        self.assertEquals(t._port, 2222)
764
772
        self.assertEquals(t._user, 'robey')
770
778
 
771
779
    def test_parse_invalid_url(self):
772
780
        self.assertRaises(errors.InvalidURL,
773
 
                          transport.ConnectedTransport,
 
781
                          ConnectedTransport,
774
782
                          'sftp://lily.org:~janneke/public/bzr/gub')
775
783
 
776
784
    def test_relpath(self):
777
 
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
 
785
        t = ConnectedTransport('sftp://user@host.com/abs/path')
778
786
 
779
787
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
780
788
        self.assertRaises(errors.PathNotChild, t.relpath,
786
794
        self.assertRaises(errors.PathNotChild, t.relpath,
787
795
                          'sftp://user@host.com:33/abs/path/sub')
788
796
        # Make sure it works when we don't supply a username
789
 
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
 
797
        t = ConnectedTransport('sftp://host.com/abs/path')
790
798
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
791
799
 
792
800
        # Make sure it works when parts of the path will be url encoded
793
 
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
 
801
        t = ConnectedTransport('sftp://host.com/dev/%path')
794
802
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
795
803
 
796
804
    def test_connection_sharing_propagate_credentials(self):
797
 
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
 
805
        t = ConnectedTransport('ftp://user@host.com/abs/path')
798
806
        self.assertEquals('user', t._user)
799
807
        self.assertEquals('host.com', t._host)
800
808
        self.assertIs(None, t._get_connection())
821
829
        self.assertIs(new_password, c._get_credentials())
822
830
 
823
831
 
824
 
class TestReusedTransports(tests.TestCase):
 
832
class TestReusedTransports(TestCase):
825
833
    """Tests for transport reuse"""
826
834
 
827
835
    def test_reuse_same_transport(self):
828
836
        possible_transports = []
829
 
        t1 = transport.get_transport('http://foo/',
830
 
                                     possible_transports=possible_transports)
 
837
        t1 = get_transport('http://foo/',
 
838
                           possible_transports=possible_transports)
831
839
        self.assertEqual([t1], possible_transports)
832
 
        t2 = transport.get_transport('http://foo/',
833
 
                                     possible_transports=[t1])
 
840
        t2 = get_transport('http://foo/', possible_transports=[t1])
834
841
        self.assertIs(t1, t2)
835
842
 
836
843
        # Also check that final '/' are handled correctly
837
 
        t3 = transport.get_transport('http://foo/path/')
838
 
        t4 = transport.get_transport('http://foo/path',
839
 
                                     possible_transports=[t3])
 
844
        t3 = get_transport('http://foo/path/')
 
845
        t4 = get_transport('http://foo/path', possible_transports=[t3])
840
846
        self.assertIs(t3, t4)
841
847
 
842
 
        t5 = transport.get_transport('http://foo/path')
843
 
        t6 = transport.get_transport('http://foo/path/',
844
 
                                     possible_transports=[t5])
 
848
        t5 = get_transport('http://foo/path')
 
849
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
845
850
        self.assertIs(t5, t6)
846
851
 
847
852
    def test_don_t_reuse_different_transport(self):
848
 
        t1 = transport.get_transport('http://foo/path')
849
 
        t2 = transport.get_transport('http://bar/path',
850
 
                                     possible_transports=[t1])
 
853
        t1 = get_transport('http://foo/path')
 
854
        t2 = get_transport('http://bar/path', possible_transports=[t1])
851
855
        self.assertIsNot(t1, t2)
852
856
 
853
857
 
854
 
class TestTransportTrace(tests.TestCase):
 
858
class TestTransportTrace(TestCase):
855
859
 
856
860
    def test_get(self):
857
 
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
861
        transport = get_transport('trace+memory://')
 
862
        self.assertIsInstance(
 
863
            transport, bzrlib.transport.trace.TransportTraceDecorator)
859
864
 
860
865
    def test_clone_preserves_activity(self):
861
 
        t = transport.get_transport('trace+memory://')
862
 
        t2 = t.clone('.')
863
 
        self.assertTrue(t is not t2)
864
 
        self.assertTrue(t._activity is t2._activity)
 
866
        transport = get_transport('trace+memory://')
 
867
        transport2 = transport.clone('.')
 
868
        self.assertTrue(transport is not transport2)
 
869
        self.assertTrue(transport._activity is transport2._activity)
865
870
 
866
871
    # the following specific tests are for the operations that have made use of
867
872
    # logging in tests; we could test every single operation but doing that
868
873
    # still won't cause a test failure when the top level Transport API
869
874
    # changes; so there is little return doing that.
870
875
    def test_get(self):
871
 
        t = transport.get_transport('trace+memory:///')
872
 
        t.put_bytes('foo', 'barish')
873
 
        t.get('foo')
 
876
        transport = get_transport('trace+memory:///')
 
877
        transport.put_bytes('foo', 'barish')
 
878
        transport.get('foo')
874
879
        expected_result = []
875
880
        # put_bytes records the bytes, not the content to avoid memory
876
881
        # pressure.
877
882
        expected_result.append(('put_bytes', 'foo', 6, None))
878
883
        # get records the file name only.
879
884
        expected_result.append(('get', 'foo'))
880
 
        self.assertEqual(expected_result, t._activity)
 
885
        self.assertEqual(expected_result, transport._activity)
881
886
 
882
887
    def test_readv(self):
883
 
        t = transport.get_transport('trace+memory:///')
884
 
        t.put_bytes('foo', 'barish')
885
 
        list(t.readv('foo', [(0, 1), (3, 2)],
886
 
                     adjust_for_latency=True, upper_limit=6))
 
888
        transport = get_transport('trace+memory:///')
 
889
        transport.put_bytes('foo', 'barish')
 
890
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
891
            upper_limit=6))
887
892
        expected_result = []
888
893
        # put_bytes records the bytes, not the content to avoid memory
889
894
        # pressure.
890
895
        expected_result.append(('put_bytes', 'foo', 6, None))
891
896
        # readv records the supplied offset request
892
897
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
893
 
        self.assertEqual(expected_result, t._activity)
 
898
        self.assertEqual(expected_result, transport._activity)
894
899
 
895
900
 
896
901
class TestSSHConnections(tests.TestCaseWithTransport):
909
914
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
910
915
        # override the interface (doesn't change self._vendor).
911
916
        # Note that this does encryption, so can be slow.
912
 
        from bzrlib.tests import stub_sftp
 
917
        from bzrlib.transport.sftp import SFTPFullAbsoluteServer
 
918
        from bzrlib.tests.stub_sftp import StubServer
913
919
 
914
920
        # Start an SSH server
915
921
        self.command_executed = []
918
924
        # SSH channel ourselves.  Surely this has already been implemented
919
925
        # elsewhere?
920
926
        started = []
921
 
        class StubSSHServer(stub_sftp.StubServer):
 
927
        class StubSSHServer(StubServer):
922
928
 
923
929
            test = self
924
930
 
952
958
 
953
959
                return True
954
960
 
955
 
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
 
961
        ssh_server = SFTPFullAbsoluteServer(StubSSHServer)
956
962
        # We *don't* want to override the default SSH vendor: the detected one
957
963
        # is the one to use.
958
964
        self.start_server(ssh_server)
972
978
            # prefix a '/' to get the right path.
973
979
            path_to_branch = '/' + path_to_branch
974
980
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
975
 
        t = transport.get_transport(url)
 
981
        t = get_transport(url)
976
982
        self.permit_url(t.base)
977
983
        t.mkdir('foo')
978
984