/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

Upgraded to the latest bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005, 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
import os
 
19
import sys
 
20
import stat
 
21
from cStringIO import StringIO
 
22
 
 
23
import bzrlib
 
24
from bzrlib import urlutils
 
25
from bzrlib.errors import (ConnectionError,
 
26
                           DependencyNotPresent,
 
27
                           FileExists,
 
28
                           InvalidURLJoin,
 
29
                           NoSuchFile,
 
30
                           PathNotChild,
 
31
                           TransportNotPossible,
 
32
                           UnsupportedProtocol,
 
33
                           )
 
34
from bzrlib.tests import TestCase, TestCaseInTempDir
 
35
from bzrlib.transport import (_CoalescedOffset,
 
36
                              _get_protocol_handlers,
 
37
                              _set_protocol_handlers,
 
38
                              _get_transport_modules,
 
39
                              get_transport,
 
40
                              register_lazy_transport,
 
41
                              register_transport_proto,
 
42
                              _clear_protocol_handlers,
 
43
                              Transport,
 
44
                              )
 
45
from bzrlib.transport.chroot import ChrootServer
 
46
from bzrlib.transport.memory import MemoryTransport
 
47
from bzrlib.transport.local import (LocalTransport,
 
48
                                    EmulatedWin32LocalTransport)
 
49
 
 
50
 
 
51
# TODO: Should possibly split transport-specific tests into their own files.
 
52
 
 
53
 
 
54
class TestTransport(TestCase):
 
55
    """Test the non transport-concrete class functionality."""
 
56
 
 
57
    def test__get_set_protocol_handlers(self):
 
58
        handlers = _get_protocol_handlers()
 
59
        self.assertNotEqual([], handlers.keys( ))
 
60
        try:
 
61
            _clear_protocol_handlers()
 
62
            self.assertEqual([], _get_protocol_handlers().keys())
 
63
        finally:
 
64
            _set_protocol_handlers(handlers)
 
65
 
 
66
    def test_get_transport_modules(self):
 
67
        handlers = _get_protocol_handlers()
 
68
        class SampleHandler(object):
 
69
            """I exist, isnt that enough?"""
 
70
        try:
 
71
            _clear_protocol_handlers()
 
72
            register_transport_proto('foo')
 
73
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
74
            register_transport_proto('bar')
 
75
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
76
            self.assertEqual([SampleHandler.__module__, 'bzrlib.transport.chroot'],
 
77
                             _get_transport_modules())
 
78
        finally:
 
79
            _set_protocol_handlers(handlers)
 
80
 
 
81
    def test_transport_dependency(self):
 
82
        """Transport with missing dependency causes no error"""
 
83
        saved_handlers = _get_protocol_handlers()
 
84
        try:
 
85
            register_transport_proto('foo')
 
86
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
87
                    'BadTransportHandler')
 
88
            try:
 
89
                get_transport('foo://fooserver/foo')
 
90
            except UnsupportedProtocol, e:
 
91
                e_str = str(e)
 
92
                self.assertEquals('Unsupported protocol'
 
93
                                  ' for url "foo://fooserver/foo":'
 
94
                                  ' Unable to import library "some_lib":'
 
95
                                  ' testing missing dependency', str(e))
 
96
            else:
 
97
                self.fail('Did not raise UnsupportedProtocol')
 
98
        finally:
 
99
            # restore original values
 
100
            _set_protocol_handlers(saved_handlers)
 
101
            
 
102
    def test_transport_fallback(self):
 
103
        """Transport with missing dependency causes no error"""
 
104
        saved_handlers = _get_protocol_handlers()
 
105
        try:
 
106
            _clear_protocol_handlers()
 
107
            register_transport_proto('foo')
 
108
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
109
                    'BackupTransportHandler')
 
110
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
111
                    'BadTransportHandler')
 
112
            t = get_transport('foo://fooserver/foo')
 
113
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
114
        finally:
 
115
            _set_protocol_handlers(saved_handlers)
 
116
 
 
117
    def test__combine_paths(self):
 
118
        t = Transport('/')
 
119
        self.assertEqual('/home/sarah/project/foo',
 
120
                         t._combine_paths('/home/sarah', 'project/foo'))
 
121
        self.assertEqual('/etc',
 
122
                         t._combine_paths('/home/sarah', '../../etc'))
 
123
        self.assertEqual('/etc',
 
124
                         t._combine_paths('/home/sarah', '../../../etc'))
 
125
        self.assertEqual('/etc',
 
126
                         t._combine_paths('/home/sarah', '/etc'))
 
127
 
 
128
 
 
129
class TestCoalesceOffsets(TestCase):
    """Tests for Transport._coalesce_offsets."""

    def check(self, expected, offsets, limit=0, fudge=0):
        """Coalesce ``offsets`` and compare the result with ``expected``.

        :param expected: sequence of argument tuples; each one is expanded
            into a _CoalescedOffset.
        :param offsets: the offsets handed to Transport._coalesce_offsets.
        :param limit: forwarded as the ``limit`` keyword.
        :param fudge: forwarded as the ``fudge_factor`` keyword.
        """
        wanted = [_CoalescedOffset(*args) for args in expected]
        actual = list(Transport._coalesce_offsets(offsets, limit=limit,
                                                  fudge_factor=fudge))
        self.assertEqual(wanted, actual)

    def test_coalesce_empty(self):
        """No offsets in, no coalesced offsets out."""
        self.check([], [])

    def test_coalesce_simple(self):
        """A single range comes back as a single coalesced range."""
        requested = [(0, 10)]
        self.check([(0, 10, [(0, 10)])], requested)

    def test_coalesce_unrelated(self):
        """Ranges separated by a gap are not merged."""
        requested = [(0, 10), (20, 10)]
        wanted = [
            (0, 10, [(0, 10)]),
            (20, 10, [(0, 10)]),
        ]
        self.check(wanted, requested)

    def test_coalesce_unsorted(self):
        """Out-of-order ranges keep the order in which they were given."""
        requested = [(20, 10), (0, 10)]
        wanted = [
            (20, 10, [(0, 10)]),
            (0, 10, [(0, 10)]),
        ]
        self.check(wanted, requested)

    def test_coalesce_nearby(self):
        """Adjacent ranges are merged into one."""
        requested = [(0, 10), (10, 10)]
        self.check([(0, 20, [(0, 10), (10, 10)])], requested)

    def test_coalesce_overlapped(self):
        """Overlapping ranges are merged into one."""
        requested = [(0, 10), (5, 10)]
        self.check([(0, 15, [(0, 10), (5, 10)])], requested)

    def test_coalesce_limit(self):
        """With limit=5, ten adjacent ranges are split into two groups."""
        requested = [(start, 10) for start in range(10, 101, 10)]
        first_group = [(rel, 10) for rel in range(0, 50, 10)]
        second_group = [(rel, 10) for rel in range(0, 50, 10)]
        wanted = [
            (10, 50, first_group),
            (60, 50, second_group),
        ]
        self.check(wanted, requested, limit=5)

    def test_coalesce_no_limit(self):
        """Without a limit, ten adjacent ranges become one group."""
        requested = [(start, 10) for start in range(10, 101, 10)]
        pieces = [(rel, 10) for rel in range(0, 100, 10)]
        self.check([(10, 100, pieces)], requested)

    def test_coalesce_fudge(self):
        """With fudge=10, ranges separated by a gap of 10 are merged."""
        requested = [(10, 10), (30, 10), (100, 10)]
        wanted = [
            (10, 30, [(0, 10), (20, 10)]),
            (100, 10, [(0, 10)]),
        ]
        self.check(wanted, requested, fudge=10)
 
186
 
 
187
 
 
188
class TestMemoryTransport(TestCase):
    """Tests specific to the in-memory transport."""

    def test_get_transport(self):
        """A MemoryTransport can be constructed without arguments."""
        MemoryTransport()

    def test_clone(self):
        """Cloning to '/' yields a transport based at memory:///."""
        t = MemoryTransport()
        self.assertTrue(isinstance(t, MemoryTransport))
        cloned = t.clone("/")
        self.assertEqual("memory:///", cloned.base)

    def test_abspath(self):
        """A relative path is resolved against memory:///."""
        t = MemoryTransport()
        self.assertEqual("memory:///relpath", t.abspath('relpath'))

    def test_abspath_of_root(self):
        """Both the base and abspath('/') are memory:///."""
        t = MemoryTransport()
        self.assertEqual("memory:///", t.base)
        self.assertEqual("memory:///", t.abspath('/'))

    def test_abspath_of_relpath_starting_at_root(self):
        """A path with a leading slash is resolved from the root."""
        t = MemoryTransport()
        self.assertEqual("memory:///foo", t.abspath('/foo'))

    def test_append_and_get(self):
        """append_bytes and append_file both add to the same file."""
        t = MemoryTransport()
        t.append_bytes('path', 'content')
        self.assertEqual(t.get('path').read(), 'content')
        t.append_file('path', StringIO('content'))
        self.assertEqual(t.get('path').read(), 'contentcontent')

    def test_put_and_get(self):
        """put_file and put_bytes store content that get() returns."""
        t = MemoryTransport()
        t.put_file('path', StringIO('content'))
        self.assertEqual(t.get('path').read(), 'content')
        t.put_bytes('path', 'content')
        self.assertEqual(t.get('path').read(), 'content')

    def test_append_without_dir_fails(self):
        """Appending below a missing directory raises NoSuchFile."""
        t = MemoryTransport()
        self.assertRaises(NoSuchFile,
                          t.append_bytes, 'dir/path', 'content')

    def test_put_without_dir_fails(self):
        """Putting below a missing directory raises NoSuchFile."""
        t = MemoryTransport()
        self.assertRaises(NoSuchFile,
                          t.put_file, 'dir/path', StringIO('content'))

    def test_get_missing(self):
        """Getting a file that was never written raises NoSuchFile."""
        t = MemoryTransport()
        self.assertRaises(NoSuchFile, t.get, 'foo')

    def test_has_missing(self):
        """has() is False for a path that was never written."""
        t = MemoryTransport()
        self.assertEquals(False, t.has('foo'))

    def test_has_present(self):
        """has() is True once the path has been written."""
        t = MemoryTransport()
        t.append_bytes('foo', 'content')
        self.assertEquals(True, t.has('foo'))

    def test_list_dir(self):
        """list_dir returns only the direct children of a directory."""
        t = MemoryTransport()
        t.put_bytes('foo', 'content')
        t.mkdir('dir')
        t.put_bytes('dir/subfoo', 'content')
        # 'dirlike' shares a prefix with 'dir' but is a sibling, not a child.
        t.put_bytes('dirlike', 'content')

        root_entries = sorted(t.list_dir('.'))
        self.assertEquals(['dir', 'dirlike', 'foo'], root_entries)
        dir_entries = sorted(t.list_dir('dir'))
        self.assertEquals(['subfoo'], dir_entries)

    def test_mkdir(self):
        """Files can be written below a directory created with mkdir."""
        t = MemoryTransport()
        t.mkdir('dir')
        t.append_bytes('dir/path', 'content')
        self.assertEqual(t.get('dir/path').read(), 'content')

    def test_mkdir_missing_parent(self):
        """mkdir below a missing parent raises NoSuchFile."""
        t = MemoryTransport()
        self.assertRaises(NoSuchFile,
                          t.mkdir, 'dir/dir')

    def test_mkdir_twice(self):
        """Creating the same directory twice raises FileExists."""
        t = MemoryTransport()
        t.mkdir('dir')
        self.assertRaises(FileExists, t.mkdir, 'dir')

    def test_parameters(self):
        """The memory transport is listable, uncached and writable."""
        t = MemoryTransport()
        self.assertEqual(True, t.listable())
        self.assertEqual(False, t.should_cache())
        self.assertEqual(False, t.is_readonly())

    def test_iter_files_recursive(self):
        """iter_files_recursive yields files at every depth."""
        t = MemoryTransport()
        t.mkdir('dir')
        t.put_bytes('dir/foo', 'content')
        t.put_bytes('dir/bar', 'content')
        t.put_bytes('bar', 'content')
        found = set(t.iter_files_recursive())
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), found)

    def test_stat(self):
        """stat().st_size is the number of bytes stored."""
        t = MemoryTransport()
        t.put_bytes('foo', 'content')
        t.put_bytes('bar', 'phowar')
        self.assertEqual(7, t.stat('foo').st_size)
        self.assertEqual(6, t.stat('bar').st_size)
 
295
 
 
296
 
 
297
class ChrootDecoratorTransportTest(TestCase):
    """Chroot decoration specific tests.

    Every test starts a ChrootServer and tears it down in a ``finally``
    block, so that a failing assertion does not leave the server set up
    for the tests that run afterwards.
    """

    def test_abspath(self):
        """abspath('/') is the chroot URL, also for a cloned transport."""
        # The abspath is always relative to the chroot_url.
        server = ChrootServer(get_transport('memory:///foo/bar/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            self.assertEqual(server.get_url(), transport.abspath('/'))

            subdir_transport = transport.clone('subdir')
            self.assertEqual(server.get_url(),
                             subdir_transport.abspath('/'))
        finally:
            server.tearDown()

    def test_clone(self):
        """Clones made by relative and absolute path share the server."""
        server = ChrootServer(get_transport('memory:///foo/bar/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            # relpath from root and root path are the same
            relpath_cloned = transport.clone('foo')
            abspath_cloned = transport.clone('/foo')
            self.assertEqual(server, relpath_cloned.server)
            self.assertEqual(server, abspath_cloned.server)
        finally:
            server.tearDown()

    def test_chroot_url_preserves_chroot(self):
        """Calling get_transport on a chroot transport's base should produce a
        transport with exactly the same behaviour as the original chroot
        transport.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        server = ChrootServer(get_transport('memory:///path/subpath'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            new_transport = get_transport(transport.base)
            self.assertEqual(transport.server, new_transport.server)
            self.assertEqual(transport.base, new_transport.base)
        finally:
            server.tearDown()

    def test_urljoin_preserves_chroot(self):
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
        URL that escapes the intended chroot.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        server = ChrootServer(get_transport('memory:///path/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            self.assertRaises(
                InvalidURLJoin, urlutils.join, transport.base, '..')
        finally:
            server.tearDown()
 
355
 
 
356
 
 
357
class ChrootServerTest(TestCase):
    """Tests for the ChrootServer life cycle."""

    def test_construct(self):
        """The server remembers the backing transport it was given."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        self.assertEqual(backing_transport, server.backing_transport)

    def test_setUp(self):
        """setUp registers the server's scheme as a protocol handler."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        try:
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
        finally:
            # Undo the registration so it cannot leak into other tests.
            server.tearDown()

    def test_tearDown(self):
        """tearDown removes the server's scheme from the handlers."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        server.tearDown()
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())

    def test_get_url(self):
        """The server URL is 'chroot-<id(server)>:///'."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        try:
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
        finally:
            server.tearDown()
 
383
 
 
384
 
 
385
class ReadonlyDecoratorTransportTest(TestCase):
    """Readonly decoration specific tests."""

    def test_local_parameters(self):
        """A readonly-decorated local transport only changes is_readonly."""
        import bzrlib.transport.readonly as readonly
        # connect to . in readonly mode
        decorated = readonly.ReadonlyTransportDecorator('readonly+.')
        self.assertEqual(True, decorated.listable())
        self.assertEqual(False, decorated.should_cache())
        self.assertEqual(True, decorated.is_readonly())

    def test_http_parameters(self):
        """A readonly-decorated http transport reports http's parameters."""
        from bzrlib.tests.HttpServer import HttpServer
        import bzrlib.transport.readonly as readonly
        # connect to . via http which is not listable
        http_server = HttpServer()
        http_server.setUp()
        try:
            decorated = get_transport('readonly+' + http_server.get_url())
            self.failUnless(
                isinstance(decorated, readonly.ReadonlyTransportDecorator))
            self.assertEqual(False, decorated.listable())
            self.assertEqual(True, decorated.should_cache())
            self.assertEqual(True, decorated.is_readonly())
        finally:
            http_server.tearDown()
 
411
 
 
412
 
 
413
class FakeNFSDecoratorTests(TestCaseInTempDir):
    """NFS decorator specific tests."""

    def get_nfs_transport(self, url):
        """Return a FakeNFSTransportDecorator for 'fakenfs+' + url."""
        import bzrlib.transport.fakenfs as fakenfs
        # connect to url with nfs decoration
        decorated_url = 'fakenfs+' + url
        return fakenfs.FakeNFSTransportDecorator(decorated_url)

    def test_local_parameters(self):
        """Decorating a local transport leaves its parameters alone."""
        # the listable, should_cache and is_readonly parameters
        # are not changed by the fakenfs decorator
        nfs = self.get_nfs_transport('.')
        self.assertEqual(True, nfs.listable())
        self.assertEqual(False, nfs.should_cache())
        self.assertEqual(False, nfs.is_readonly())

    def test_http_parameters(self):
        """Decorating an http transport leaves its parameters alone."""
        # the listable, should_cache and is_readonly parameters
        # are not changed by the fakenfs decorator
        from bzrlib.tests.HttpServer import HttpServer
        # connect to . via http which is not listable
        http_server = HttpServer()
        http_server.setUp()
        try:
            nfs = self.get_nfs_transport(http_server.get_url())
            self.assertIsInstance(
                nfs, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
            self.assertEqual(False, nfs.listable())
            self.assertEqual(True, nfs.should_cache())
            self.assertEqual(True, nfs.is_readonly())
        finally:
            http_server.tearDown()

    def test_fakenfs_server_default(self):
        """FakeNFSServer serves a 'fakenfs+' URL usable by get_transport."""
        # a FakeNFSServer() should bring up a local relpath server for itself
        import bzrlib.transport.fakenfs as fakenfs
        nfs_server = fakenfs.FakeNFSServer()
        nfs_server.setUp()
        try:
            # the url should be decorated appropriately
            self.assertStartsWith(nfs_server.get_url(), 'fakenfs+')
            # and we should be able to get a transport for it
            nfs = get_transport(nfs_server.get_url())
            # which must be a FakeNFSTransportDecorator instance.
            self.assertIsInstance(nfs, fakenfs.FakeNFSTransportDecorator)
        finally:
            nfs_server.tearDown()

    def test_fakenfs_rename_semantics(self):
        """Renaming onto a non-empty directory raises ResourceBusy."""
        # a FakeNFS transport must mangle the way rename errors occur to
        # look like NFS problems.
        nfs = self.get_nfs_transport('.')
        tree_shape = ['from/', 'from/foo', 'to/', 'to/bar']
        self.build_tree(tree_shape, transport=nfs)
        self.assertRaises(bzrlib.errors.ResourceBusy,
                          nfs.rename, 'from', 'to')
 
470
 
 
471
 
 
472
class FakeVFATDecoratorTests(TestCaseInTempDir):
    """Tests for simulation of VFAT restrictions"""

    def get_vfat_transport(self, url):
        """Return vfat-backed transport for test directory"""
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        decorated_url = 'vfat+' + url
        return FakeVFATTransportDecorator(decorated_url)

    def test_transport_creation(self):
        """The helper hands back a FakeVFATTransportDecorator."""
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        vfat = self.get_vfat_transport('.')
        self.assertIsInstance(vfat, FakeVFATTransportDecorator)

    def test_transport_mkdir(self):
        """A directory made as 'HELLO' is found under other casings."""
        vfat = self.get_vfat_transport('.')
        vfat.mkdir('HELLO')
        self.assertTrue(vfat.has('hello'))
        self.assertTrue(vfat.has('Hello'))

    def test_forbidden_chars(self):
        """A name containing '<' and '>' is rejected with ValueError."""
        vfat = self.get_vfat_transport('.')
        self.assertRaises(ValueError, vfat.has, "<NU>")
 
494
 
 
495
 
 
496
class BadTransportHandler(Transport):
    """Test transport whose construction always fails.

    Instantiating it raises DependencyNotPresent for the library
    'some_lib', imitating a transport with a missing dependency.
    """

    def __init__(self, base_url):
        missing_library = 'some_lib'
        reason = 'testing missing dependency'
        raise DependencyNotPresent(missing_library, reason)
 
499
 
 
500
 
 
501
class BackupTransportHandler(Transport):
    """Test transport that works as a backup for the BadTransportHandler

    It adds nothing to Transport; only its distinct type matters.
    """
 
504
 
 
505
 
 
506
class TestTransportImplementation(TestCaseInTempDir):
    """Implementation verification for transports.

    To verify a transport we need a server factory, which is a callable
    that accepts no parameters and returns an implementation of
    bzrlib.transport.Server.

    That Server is then used to construct transport instances and test
    the transport via loopback activity.

    Currently this assumes that the Transport object is connected to the
    current working directory, so that whatever is done through the
    transport shows up in the working directory, and vice versa.  This is
    a bug, because it is possible to have URL schemes which provide access
    to something that does not result in storage on the local disk, e.g.
    due to file system limits, or due to it being a database or some other
    non-filesystem tool.

    This also tests to make sure that the functions work with both
    generators and lists (assuming iter(list) is effectively a generator).

    NOTE(review): ``transport_server`` and ``transport_class`` are read
    from ``self`` but not defined in this class; presumably they are
    supplied by whatever parameterizes the tests.
    """

    def setUp(self):
        """Start the server under test and schedule its tearDown."""
        super(TestTransportImplementation, self).setUp()
        server = self.transport_server()
        self._server = server
        server.setUp()
        self.addCleanup(server.tearDown)

    def get_transport(self):
        """Return a connected transport to the local directory."""
        base_url = self._server.get_url()
        # try getting the transport via the regular interface:
        candidate = get_transport(base_url)
        if isinstance(candidate, self.transport_class):
            return candidate
        # we did not get the correct transport class type. Override the
        # regular connection behaviour by direct construction.
        return self.transport_class(base_url)
 
544
 
 
545
 
 
546
class TestLocalTransports(TestCase):
    """get_transport returns a LocalTransport for local locations."""

    def test_get_transport_from_abspath(self):
        """An absolute filesystem path yields a LocalTransport."""
        here = os.path.abspath('.')
        t = get_transport(here)
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')

    def test_get_transport_from_relpath(self):
        """A relative filesystem path yields a LocalTransport."""
        t = get_transport('.')
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')

    def test_get_transport_from_local_url(self):
        """A local URL yields a LocalTransport with that URL as its base."""
        here = os.path.abspath('.')
        here_url = urlutils.local_path_to_url(here) + '/'
        t = get_transport(here_url)
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, here_url)
 
566
 
 
567
 
 
568
class TestWin32LocalTransport(TestCase):
    """Tests for the emulated win32 local transport."""

    def test_unc_clone_to_root(self):
        """Cloning upwards from a UNC location stops at the host."""
        # Win32 UNC path like \\HOST\path
        # clone to root should stop at least at \\HOST part
        # not on \\
        transport = EmulatedWin32LocalTransport(
            'file://HOST/path/to/some/dir/')
        # Four levels up: dir -> some -> to -> path -> host root.
        for _ in range(4):
            transport = transport.clone('..')
        self.assertEquals(transport.base, 'file://HOST/')
        # make sure we reach the root
        transport = transport.clone('..')
        self.assertEquals(transport.base, 'file://HOST/')