1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
from cStringIO import StringIO
26
from bzrlib.errors import (DependencyNotPresent,
34
from bzrlib.tests import TestCase, TestCaseInTempDir
35
from bzrlib.transport import (_clear_protocol_handlers,
38
_get_protocol_handlers,
39
_set_protocol_handlers,
40
_get_transport_modules,
43
register_lazy_transport,
44
register_transport_proto,
47
from bzrlib.transport.chroot import ChrootServer
48
from bzrlib.transport.memory import MemoryTransport
49
from bzrlib.transport.local import (LocalTransport,
50
EmulatedWin32LocalTransport)
51
from bzrlib.transport.pathfilter import PathFilteringServer
54
# TODO: Should possibly split transport-specific tests into their own files.
57
class TestTransport(TestCase):
    """Test the non transport-concrete class functionality."""

    def test__get_set_protocol_handlers(self):
        """The handler registry can be cleared and then put back."""
        handlers = _get_protocol_handlers()
        self.assertNotEqual([], handlers.keys())
        try:
            _clear_protocol_handlers()
            self.assertEqual([], _get_protocol_handlers().keys())
        finally:
            # Restore even when an assertion above fails, so that a failure
            # here does not leave later tests without any handlers.
            _set_protocol_handlers(handlers)

    def test_get_transport_modules(self):
        """_get_transport_modules reports the modules of lazy handlers."""
        handlers = _get_protocol_handlers()
        # don't pollute the current handlers
        _clear_protocol_handlers()

        class SampleHandler(object):
            """I exist, isnt that enough?"""

        try:
            _clear_protocol_handlers()
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'TestTransport.SampleHandler')
            register_transport_proto('bar')
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
                                    'TestTransport.SampleHandler')
            self.assertEqual([SampleHandler.__module__,
                              'bzrlib.transport.chroot',
                              'bzrlib.transport.pathfilter'],
                             _get_transport_modules())
        finally:
            _set_protocol_handlers(handlers)

    def test_transport_dependency(self):
        """Transport with missing dependency causes no error"""
        saved_handlers = _get_protocol_handlers()
        # don't pollute the current handlers
        _clear_protocol_handlers()
        try:
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'BadTransportHandler')
            # assertRaises fails the test if nothing is raised and hands back
            # the exception (same usage as in
            # test_local_abspath_non_local_transport below).
            e = self.assertRaises(UnsupportedProtocol,
                                  get_transport, 'foo://fooserver/foo')
            self.assertEquals('Unsupported protocol'
                              ' for url "foo://fooserver/foo":'
                              ' Unable to import library "some_lib":'
                              ' testing missing dependency', str(e))
        finally:
            # restore original values
            _set_protocol_handlers(saved_handlers)

    def test_transport_fallback(self):
        """A scheme with one unusable handler falls back to another one.

        BadTransportHandler always raises DependencyNotPresent, so the
        transport returned for 'foo://' must be a BackupTransportHandler.
        """
        saved_handlers = _get_protocol_handlers()
        try:
            _clear_protocol_handlers()
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'BackupTransportHandler')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'BadTransportHandler')
            t = get_transport('foo://fooserver/foo')
            self.assertTrue(isinstance(t, BackupTransportHandler))
        finally:
            _set_protocol_handlers(saved_handlers)

    def test_ssh_hints(self):
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
        e = self.assertRaises(UnsupportedProtocol,
                              get_transport, 'ssh://fooserver/foo')
        self.assertEquals('Unsupported protocol'
                          ' for url "ssh://fooserver/foo":'
                          ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
                          str(e))

    def test_LateReadError(self):
        """The LateReadError helper should raise on read()."""
        a_file = LateReadError('a path')
        # NOTE(review): the call inside the lost try-block is reconstructed
        # from the docstring as a plain read().
        error = self.assertRaises(ReadError, a_file.read)
        self.assertEqual('a path', error.path)
        self.assertRaises(ReadError, a_file.read, 40)

    def test__combine_paths(self):
        """_combine_paths resolves relative, parent and absolute paths."""
        # NOTE(review): the line creating ``t`` was lost; a base Transport
        # rooted at '/' is assumed - confirm against the original file.
        t = Transport('/')
        self.assertEqual('/home/sarah/project/foo',
                         t._combine_paths('/home/sarah', 'project/foo'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '../../etc'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '../../../etc'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '/etc'))

    def test_local_abspath_non_local_transport(self):
        # the base implementation should throw
        t = MemoryTransport()
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
        self.assertEqual('memory:///t is not a local path.', str(e))
169
class TestCoalesceOffsets(TestCase):
    """Tests for Transport._coalesce_offsets."""

    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
        """Coalesce ``offsets`` and compare with ``expected``.

        :param expected: tuples used to build the expected _CoalescedOffset
            objects (each tuple is splatted into the constructor).
        :param offsets: the offsets handed to Transport._coalesce_offsets.
        :param limit: forwarded as ``limit``.
        :param max_size: forwarded as ``max_size``.
        :param fudge: forwarded as ``fudge_factor``.
        """
        coalesce = Transport._coalesce_offsets
        exp = [_CoalescedOffset(*x) for x in expected]
        # max_size has to be forwarded, otherwise the max_size tests below
        # would silently exercise the default.
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
                            max_size=max_size))
        self.assertEqual(exp, out)

    def test_coalesce_empty(self):
        self.check([], [])

    def test_coalesce_simple(self):
        self.check([(0, 10, [(0, 10)])], [(0, 10)])

    def test_coalesce_unrelated(self):
        self.check([(0, 10, [(0, 10)]),
                    (20, 10, [(0, 10)]),
                   ], [(0, 10), (20, 10)])

    def test_coalesce_unsorted(self):
        self.check([(20, 10, [(0, 10)]),
                    (0, 10, [(0, 10)]),
                   ], [(20, 10), (0, 10)])

    def test_coalesce_nearby(self):
        self.check([(0, 20, [(0, 10), (10, 10)])],
                   [(0, 10), (10, 10)])

    def test_coalesce_overlapped(self):
        self.assertRaises(ValueError,
                          self.check, [(0, 15, [(0, 10), (5, 10)])],
                          [(0, 10), (5, 10)])

    def test_coalesce_limit(self):
        # Each expected group holds five ranges, hence limit=5.
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
                              (30, 10), (40, 10)]),
                    (60, 50, [(0, 10), (10, 10), (20, 10),
                              (30, 10), (40, 10)]),
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
                       (50, 10), (60, 10), (70, 10), (80, 10),
                       (90, 10), (100, 10)],
                   limit=5)

    def test_coalesce_no_limit(self):
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
                               (30, 10), (40, 10), (50, 10),
                               (60, 10), (70, 10), (80, 10),
                               (90, 10)]),
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
                       (50, 10), (60, 10), (70, 10), (80, 10),
                       (90, 10), (100, 10)])

    def test_coalesce_fudge(self):
        # NOTE(review): the fudge value was lost; 10 is the smallest value
        # that bridges the 10-byte gap between the first two ranges while
        # leaving (100, 10) separate - confirm.
        self.check([(10, 30, [(0, 10), (20, 10)]),
                    (100, 10, [(0, 10),]),
                   ], [(10, 10), (30, 10), (100, 10)],
                   fudge=10)

    def test_coalesce_max_size(self):
        # NOTE(review): the middle expected entry and the max_size value were
        # lost; both are reconstructed to be consistent with the surviving
        # expectations - confirm.
        self.check([(10, 20, [(0, 10), (10, 10)]),
                    (30, 50, [(0, 50)]),
                    # If one range is above max_size, it gets its own coalesced
                    # offset
                    (100, 80, [(0, 80),]),],
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
                   max_size=50)

    def test_coalesce_no_max_size(self):
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
                   [(10, 10), (20, 10), (30, 50), (80, 100)])

    def test_coalesce_default_limit(self):
        # By default we use a 100MB max size.
        ten_mb = 10*1024*1024
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
                   [(i*ten_mb, ten_mb) for i in range(11)])
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
                   [(i*ten_mb, ten_mb) for i in range(11)],
                   max_size=1*1024*1024*1024)
254
class TestMemoryTransport(TestCase):
    """Tests for the in-memory transport."""

    def test_get_transport(self):
        # NOTE(review): the body of this test was lost; constructing a
        # transport is the minimal reconstruction - confirm.
        MemoryTransport()

    def test_clone(self):
        transport = MemoryTransport()
        self.assertTrue(isinstance(transport, MemoryTransport))
        self.assertEqual("memory:///", transport.clone("/").base)

    def test_abspath(self):
        transport = MemoryTransport()
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))

    def test_abspath_of_root(self):
        transport = MemoryTransport()
        self.assertEqual("memory:///", transport.base)
        self.assertEqual("memory:///", transport.abspath('/'))

    def test_abspath_of_relpath_starting_at_root(self):
        transport = MemoryTransport()
        self.assertEqual("memory:///foo", transport.abspath('/foo'))

    def test_append_and_get(self):
        transport = MemoryTransport()
        transport.append_bytes('path', 'content')
        self.assertEqual(transport.get('path').read(), 'content')
        transport.append_file('path', StringIO('content'))
        self.assertEqual(transport.get('path').read(), 'contentcontent')

    def test_put_and_get(self):
        transport = MemoryTransport()
        transport.put_file('path', StringIO('content'))
        self.assertEqual(transport.get('path').read(), 'content')
        transport.put_bytes('path', 'content')
        self.assertEqual(transport.get('path').read(), 'content')

    def test_append_without_dir_fails(self):
        transport = MemoryTransport()
        self.assertRaises(NoSuchFile,
                          transport.append_bytes, 'dir/path', 'content')

    def test_put_without_dir_fails(self):
        transport = MemoryTransport()
        self.assertRaises(NoSuchFile,
                          transport.put_file, 'dir/path', StringIO('content'))

    def test_get_missing(self):
        transport = MemoryTransport()
        self.assertRaises(NoSuchFile, transport.get, 'foo')

    def test_has_missing(self):
        transport = MemoryTransport()
        self.assertEquals(False, transport.has('foo'))

    def test_has_present(self):
        transport = MemoryTransport()
        transport.append_bytes('foo', 'content')
        self.assertEquals(True, transport.has('foo'))

    def test_list_dir(self):
        transport = MemoryTransport()
        transport.put_bytes('foo', 'content')
        transport.mkdir('dir')
        transport.put_bytes('dir/subfoo', 'content')
        # 'dirlike' shares a prefix with 'dir' but is a plain file.
        transport.put_bytes('dirlike', 'content')

        self.assertEquals(['dir', 'dirlike', 'foo'],
                          sorted(transport.list_dir('.')))
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))

    def test_mkdir(self):
        transport = MemoryTransport()
        transport.mkdir('dir')
        transport.append_bytes('dir/path', 'content')
        self.assertEqual(transport.get('dir/path').read(), 'content')

    def test_mkdir_missing_parent(self):
        transport = MemoryTransport()
        self.assertRaises(NoSuchFile,
                          transport.mkdir, 'dir/dir')

    def test_mkdir_twice(self):
        transport = MemoryTransport()
        transport.mkdir('dir')
        self.assertRaises(FileExists, transport.mkdir, 'dir')

    def test_parameters(self):
        transport = MemoryTransport()
        self.assertEqual(True, transport.listable())
        self.assertEqual(False, transport.is_readonly())

    def test_iter_files_recursive(self):
        transport = MemoryTransport()
        transport.mkdir('dir')
        transport.put_bytes('dir/foo', 'content')
        transport.put_bytes('dir/bar', 'content')
        transport.put_bytes('bar', 'content')
        paths = set(transport.iter_files_recursive())
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)

    def test_stat(self):
        # These statements had lost their ``def`` line and were running as
        # the tail of test_iter_files_recursive; they form their own test.
        transport = MemoryTransport()
        transport.put_bytes('foo', 'content')
        transport.put_bytes('bar', 'phowar')
        self.assertEqual(7, transport.stat('foo').st_size)
        self.assertEqual(6, transport.stat('bar').st_size)
362
class ChrootDecoratorTransportTest(TestCase):
    """Chroot decoration specific tests."""

    def test_abspath(self):
        # The abspath is always relative to the chroot_url.
        server = ChrootServer(get_transport('memory:///foo/bar/'))
        self.start_server(server)
        transport = get_transport(server.get_url())
        self.assertEqual(server.get_url(), transport.abspath('/'))

        subdir_transport = transport.clone('subdir')
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))

    def test_clone(self):
        server = ChrootServer(get_transport('memory:///foo/bar/'))
        self.start_server(server)
        transport = get_transport(server.get_url())
        # relpath from root and root path are the same
        relpath_cloned = transport.clone('foo')
        abspath_cloned = transport.clone('/foo')
        self.assertEqual(server, relpath_cloned.server)
        self.assertEqual(server, abspath_cloned.server)

    def test_chroot_url_preserves_chroot(self):
        """Calling get_transport on a chroot transport's base should produce a
        transport with exactly the same behaviour as the original chroot
        transport.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        server = ChrootServer(get_transport('memory:///path/subpath'))
        self.start_server(server)
        transport = get_transport(server.get_url())
        new_transport = get_transport(transport.base)
        self.assertEqual(transport.server, new_transport.server)
        self.assertEqual(transport.base, new_transport.base)

    def test_urljoin_preserves_chroot(self):
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
        URL that escapes the intended chroot.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        server = ChrootServer(get_transport('memory:///path/'))
        self.start_server(server)
        transport = get_transport(server.get_url())
        # Joining '..' onto the chroot base must be rejected outright.
        self.assertRaises(
            InvalidURLJoin, urlutils.join, transport.base, '..')
418
class ChrootServerTest(TestCase):
    """Tests for the ChrootServer object itself."""

    # NOTE(review): the lines that started and stopped the server were lost.
    # The setUp()/tearDown() calls below are reconstructed from the test
    # names and from the ``server.tearDown`` usage elsewhere in this file -
    # confirm against the original.

    def test_construct(self):
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        self.assertEqual(backing_transport, server.backing_transport)

    def test_setUp(self):
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        try:
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
        finally:
            # Stop the server even if the assertion fails.
            server.tearDown()

    def test_tearDown(self):
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        server.tearDown()
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())

    def test_get_url(self):
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        try:
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
        finally:
            server.tearDown()
451
class PathFilteringDecoratorTransportTest(TestCase):
    """Pathfilter decoration specific tests."""

    def test_abspath(self):
        # The abspath is always relative to the base of the backing transport.
        # NOTE(review): the filter argument and the server start-up were lost;
        # an identity filter and setUp()/tearDown() are assumed, mirroring
        # make_pf_transport below - confirm.
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
                                     lambda x: x)
        server.setUp()
        self.addCleanup(server.tearDown)
        transport = get_transport(server.get_url())
        self.assertEqual(server.get_url(), transport.abspath('/'))

        subdir_transport = transport.clone('subdir')
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))

    def make_pf_transport(self, filter_func=None):
        """Make a PathFilteringTransport backed by a MemoryTransport.

        :param filter_func: by default this will be a no-op function.  Use this
            parameter to override it."""
        if filter_func is None:
            filter_func = lambda x: x
        server = PathFilteringServer(
            get_transport('memory:///foo/bar/'), filter_func)
        # NOTE(review): the start-up call was lost; setUp() is assumed as the
        # counterpart of the tearDown registered on the next line.
        server.setUp()
        self.addCleanup(server.tearDown)
        return get_transport(server.get_url())

    def test__filter(self):
        # _filter (with an identity func as filter_func) always returns
        # paths relative to the base of the backing transport.
        transport = self.make_pf_transport()
        self.assertEqual('foo', transport._filter('foo'))
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
        self.assertEqual('', transport._filter('..'))
        self.assertEqual('', transport._filter('/'))
        # The base of the pathfiltering transport is taken into account too.
        transport = transport.clone('subdir1/subdir2')
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
        self.assertEqual(
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
        self.assertEqual('subdir1', transport._filter('..'))
        self.assertEqual('', transport._filter('/'))

    def test_filter_invocation(self):
        # NOTE(review): the log setup, the filter's def/return lines, the
        # first has() call and the log resets were lost; they are
        # reconstructed so that each assertion sees only its own call.
        filter_log = []

        def filter(path):
            filter_log.append(path)
            return path

        transport = self.make_pf_transport(filter)
        transport.has('abc')
        self.assertEqual(['abc'], filter_log)
        del filter_log[:]
        transport.clone('abc').has('xyz')
        self.assertEqual(['abc/xyz'], filter_log)
        del filter_log[:]
        transport.has('/abc')
        self.assertEqual(['abc'], filter_log)

    def test_clone(self):
        transport = self.make_pf_transport()
        # relpath from root and root path are the same
        relpath_cloned = transport.clone('foo')
        abspath_cloned = transport.clone('/foo')
        self.assertEqual(transport.server, relpath_cloned.server)
        self.assertEqual(transport.server, abspath_cloned.server)

    def test_url_preserves_pathfiltering(self):
        """Calling get_transport on a pathfiltered transport's base should
        produce a transport with exactly the same behaviour as the original
        pathfiltered transport.

        This is so that it is not possible to escape (accidentally or
        otherwise) the filtering by doing::
            url = filtered_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        transport = self.make_pf_transport()
        new_transport = get_transport(transport.base)
        self.assertEqual(transport.server, new_transport.server)
        self.assertEqual(transport.base, new_transport.base)
535
class ReadonlyDecoratorTransportTest(TestCase):
    """Readonly decoration specific tests."""

    def test_local_parameters(self):
        """A readonly-decorated local transport is listable and readonly."""
        import bzrlib.transport.readonly as readonly
        # connect to . in readonly mode
        decorated = readonly.ReadonlyTransportDecorator('readonly+.')
        self.assertEqual(True, decorated.listable())
        self.assertEqual(True, decorated.is_readonly())

    def test_http_parameters(self):
        """A readonly-decorated http transport is readonly, not listable."""
        from bzrlib.tests.http_server import HttpServer
        import bzrlib.transport.readonly as readonly
        # connect to '.' via http which is not listable
        http_server = HttpServer()
        self.start_server(http_server)
        decorated_url = 'readonly+' + http_server.get_url()
        decorated = get_transport(decorated_url)
        is_decorator = isinstance(decorated,
                                  readonly.ReadonlyTransportDecorator)
        self.failUnless(is_decorator)
        self.assertEqual(False, decorated.listable())
        self.assertEqual(True, decorated.is_readonly())
558
class FakeNFSDecoratorTests(TestCaseInTempDir):
    """NFS decorator specific tests."""

    def get_nfs_transport(self, url):
        """Return ``url`` wrapped in a FakeNFSTransportDecorator."""
        import bzrlib.transport.fakenfs as fakenfs
        # connect to url with nfs decoration
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)

    def test_local_parameters(self):
        # the listable and is_readonly parameters
        # are not changed by the fakenfs decorator
        transport = self.get_nfs_transport('.')
        self.assertEqual(True, transport.listable())
        self.assertEqual(False, transport.is_readonly())

    def test_http_parameters(self):
        # the listable and is_readonly parameters
        # are not changed by the fakenfs decorator
        from bzrlib.tests.http_server import HttpServer
        # Import locally, like the other tests in this class, instead of
        # relying on a module-level ``bzrlib`` binding.
        import bzrlib.transport.fakenfs as fakenfs
        # connect to '.' via http which is not listable
        server = HttpServer()
        self.start_server(server)
        transport = self.get_nfs_transport(server.get_url())
        self.assertIsInstance(
            transport, fakenfs.FakeNFSTransportDecorator)
        self.assertEqual(False, transport.listable())
        self.assertEqual(True, transport.is_readonly())

    def test_fakenfs_server_default(self):
        # a FakeNFSServer() should bring up a local relpath server for itself
        import bzrlib.transport.fakenfs as fakenfs
        server = fakenfs.FakeNFSServer()
        self.start_server(server)
        # the url should be decorated appropriately
        self.assertStartsWith(server.get_url(), 'fakenfs+')
        # and we should be able to get a transport for it
        transport = get_transport(server.get_url())
        # which must be a FakeNFSTransportDecorator instance.
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)

    def test_fakenfs_rename_semantics(self):
        # a FakeNFS transport must mangle the way rename errors occur to
        # look like NFS problems.
        transport = self.get_nfs_transport('.')
        # NOTE(review): the tail of this call was lost (it may have passed a
        # ``transport=`` keyword); building in the test directory is assumed
        # to be equivalent because the transport is rooted at '.' - confirm.
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'])
        self.assertRaises(errors.ResourceBusy,
                          transport.rename, 'from', 'to')
608
class FakeVFATDecoratorTests(TestCaseInTempDir):
    """Tests for simulation of VFAT restrictions"""

    def get_vfat_transport(self, url):
        """Return vfat-backed transport for test directory"""
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        decorated_url = 'vfat+' + url
        return FakeVFATTransportDecorator(decorated_url)

    def test_transport_creation(self):
        """get_vfat_transport hands back a FakeVFATTransportDecorator."""
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        vfat = self.get_vfat_transport('.')
        self.assertIsInstance(vfat, FakeVFATTransportDecorator)

    def test_transport_mkdir(self):
        """A directory made as 'HELLO' is found under other casings."""
        vfat = self.get_vfat_transport('.')
        vfat.mkdir('HELLO')
        for spelling in ('hello', 'Hello'):
            self.assertTrue(vfat.has(spelling))

    def test_forbidden_chars(self):
        """has() rejects the name "<NU>" with ValueError."""
        vfat = self.get_vfat_transport('.')
        self.assertRaises(ValueError, vfat.has, "<NU>")
632
class BadTransportHandler(Transport):
    """Test transport whose construction always fails.

    Instantiating it raises DependencyNotPresent for the library 'some_lib'.
    """

    def __init__(self, base_url):
        missing_library = 'some_lib'
        reason = 'testing missing dependency'
        raise DependencyNotPresent(missing_library, reason)
637
class BackupTransportHandler(Transport):
    """Test transport that works as a backup for the BadTransportHandler"""
    # Intentionally empty: everything is inherited from Transport.
642
class TestTransportImplementation(TestCaseInTempDir):
    """Implementation verification for transports.

    To verify a transport we need a server factory, which is a callable
    that accepts no parameters and returns an implementation of
    bzrlib.transport.Server.

    That Server is then used to construct transport instances and test
    the transport via loopback activity.

    Currently this assumes that the Transport object is connected to the
    current working directory.  So that whatever is done
    through the transport, should show up in the working
    directory, and vice-versa. This is a bug, because it's possible to have
    URL schemes which provide access to something that may not
    result in storage on the local disk, i.e. due to file system limits, or
    due to it being a database or some other non-filesystem tool.

    This also tests to make sure that the functions work with both
    generators and lists (assuming iter(list) is effectively a generator)
    """

    def setUp(self):
        """Create the server from ``transport_server`` and start it."""
        super(TestTransportImplementation, self).setUp()
        self._server = self.transport_server()
        self.start_server(self._server)

    def get_transport(self, relpath=None):
        """Return a connected transport to the local directory.

        :param relpath: a path relative to the base url.
        """
        base_url = self._server.get_url()
        url = self._adjust_url(base_url, relpath)
        # try getting the transport via the regular interface:
        t = get_transport(url)
        # vila--20070607 if the following are commented out the test suite
        # still pass. Is this really still needed or was it a forgotten
        # workaround?
        if not isinstance(t, self.transport_class):
            # we did not get the correct transport class type. Override the
            # regular connection behaviour by direct construction.
            t = self.transport_class(url)
        return t
688
class TestLocalTransports(TestCase):
    """Tests for obtaining LocalTransport objects from paths and URLs."""

    def test_get_transport_from_abspath(self):
        """An absolute filesystem path yields a LocalTransport."""
        cwd = osutils.abspath('.')
        local = get_transport(cwd)
        self.assertIsInstance(local, LocalTransport)
        expected_base = urlutils.local_path_to_url(cwd) + '/'
        self.assertEquals(local.base, expected_base)

    def test_get_transport_from_relpath(self):
        """A relative filesystem path yields a LocalTransport."""
        here = osutils.abspath('.')
        local = get_transport('.')
        self.assertIsInstance(local, LocalTransport)
        expected_base = urlutils.local_path_to_url('.') + '/'
        self.assertEquals(local.base, expected_base)

    def test_get_transport_from_local_url(self):
        """A local URL yields a LocalTransport with that URL as base."""
        cwd = osutils.abspath('.')
        cwd_url = urlutils.local_path_to_url(cwd) + '/'
        local = get_transport(cwd_url)
        self.assertIsInstance(local, LocalTransport)
        self.assertEquals(local.base, cwd_url)

    def test_local_abspath(self):
        """local_abspath('') gives back the path the transport was made from."""
        cwd = osutils.abspath('.')
        local = get_transport(cwd)
        self.assertEquals(local.local_abspath(''), cwd)
715
class TestWin32LocalTransport(TestCase):
    """Tests for the emulated win32 local transport."""

    def test_unc_clone_to_root(self):
        # Win32 UNC path like \\HOST\path
        # clone to root should stop at least at \\HOST part
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
        # NOTE(review): the clone steps were lost.  The path has four
        # components, so four clone('..') calls are assumed to reach the
        # host root - confirm.
        for _ in range(4):
            t = t.clone('..')
        self.assertEquals(t.base, 'file://HOST/')
        # make sure we reach the root
        t = t.clone('..')
        self.assertEquals(t.base, 'file://HOST/')
730
class TestConnectedTransport(TestCase):
    """Tests for connected to remote server transports"""

    def test_parse_url(self):
        t = ConnectedTransport('http://simple.example.com/home/source')
        self.assertEquals(t._host, 'simple.example.com')
        self.assertEquals(t._port, None)
        self.assertEquals(t._path, '/home/source/')
        self.failUnless(t._user is None)
        self.failUnless(t._password is None)

        self.assertEquals(t.base, 'http://simple.example.com/home/source/')

    def test_parse_url_with_at_in_user(self):
        # The user part itself contains an '@'.
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
        self.assertEquals(t._user, 'user@host.com')

    def test_parse_quoted_url(self):
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
        self.assertEquals(t._host, 'exAmple.com')
        self.assertEquals(t._port, 2222)
        self.assertEquals(t._user, 'robey')
        self.assertEquals(t._password, 'h@t')
        self.assertEquals(t._path, '/path/')

        # Base should not keep track of the password
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')

    def test_parse_invalid_url(self):
        # The callable under test was lost from this call; constructing a
        # ConnectedTransport is what every other test in this class does.
        self.assertRaises(errors.InvalidURL,
                          ConnectedTransport,
                          'sftp://lily.org:~janneke/public/bzr/gub')

    def test_relpath(self):
        t = ConnectedTransport('sftp://user@host.com/abs/path')

        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
        # A different scheme, user, host or port is not a child.
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'http://user@host.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user2@host.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user@otherhost.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user@host.com:33/abs/path/sub')
        # Make sure it works when we don't supply a username
        t = ConnectedTransport('sftp://host.com/abs/path')
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')

        # Make sure it works when parts of the path will be url encoded
        t = ConnectedTransport('sftp://host.com/dev/%path')
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')

    def test_connection_sharing_propagate_credentials(self):
        t = ConnectedTransport('ftp://user@host.com/abs/path')
        self.assertEquals('user', t._user)
        self.assertEquals('host.com', t._host)
        self.assertIs(None, t._get_connection())
        self.assertIs(None, t._password)
        c = t.clone('subdir')
        self.assertIs(None, c._get_connection())
        self.assertIs(None, t._password)

        # Simulate the user entering a password
        # NOTE(review): the assignment of ``password`` was lost; any string
        # object works because only identity is asserted below.
        password = 'secret'
        connection = object()
        t._set_connection(connection, password)
        self.assertIs(connection, t._get_connection())
        self.assertIs(password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(password, c._get_credentials())

        # credentials can be updated
        new_password = 'even more secret'
        c._update_credentials(new_password)
        self.assertIs(connection, t._get_connection())
        self.assertIs(new_password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(new_password, c._get_credentials())
812
class TestReusedTransports(TestCase):
    """Tests for transport reuse"""

    def test_reuse_same_transport(self):
        """get_transport hands back a matching entry of possible_transports."""
        candidates = []
        first = get_transport('http://foo/',
                              possible_transports=candidates)
        self.assertEqual([first], candidates)
        second = get_transport('http://foo/', possible_transports=[first])
        self.assertIs(first, second)

        # Also check that final '/' are handled correctly
        with_slash = get_transport('http://foo/path/')
        reused = get_transport('http://foo/path',
                               possible_transports=[with_slash])
        self.assertIs(with_slash, reused)

        without_slash = get_transport('http://foo/path')
        reused = get_transport('http://foo/path/',
                               possible_transports=[without_slash])
        self.assertIs(without_slash, reused)

    def test_don_t_reuse_different_transport(self):
        """A transport for another host is not reused."""
        foo_transport = get_transport('http://foo/path')
        bar_transport = get_transport('http://bar/path',
                                      possible_transports=[foo_transport])
        self.assertIsNot(foo_transport, bar_transport)
838
class TestTransportTrace(TestCase):
    """Tests for the trace+ transport decorator."""

    # NOTE(review): two ``def`` lines in this class were lost, so the names
    # test_get_trace_transport and test_get_activity are reconstructions.
    # They are deliberately distinct so neither test shadows the other.

    def test_get_trace_transport(self):
        transport = get_transport('trace+memory://')
        self.assertIsInstance(
            transport, bzrlib.transport.trace.TransportTraceDecorator)

    def test_clone_preserves_activity(self):
        transport = get_transport('trace+memory://')
        transport2 = transport.clone('.')
        self.assertTrue(transport is not transport2)
        self.assertTrue(transport._activity is transport2._activity)

    # the following specific tests are for the operations that have made use of
    # logging in tests; we could test every single operation but doing that
    # still won't cause a test failure when the top level Transport API
    # changes; so there is little return doing that.

    def test_get_activity(self):
        transport = get_transport('trace+memory:///')
        transport.put_bytes('foo', 'barish')
        # NOTE(review): the get() call and the list initialisation were
        # lost; both are implied by the expectations below.
        transport.get('foo')
        expected_result = []
        # put_bytes records the bytes, not the content to avoid memory
        # pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # get records the file name only.
        expected_result.append(('get', 'foo'))
        self.assertEqual(expected_result, transport._activity)

    def test_readv(self):
        transport = get_transport('trace+memory:///')
        transport.put_bytes('foo', 'barish')
        # NOTE(review): the end of this call was lost; ``upper_limit=6`` is
        # assumed from the trailing 6 in the expected readv record - confirm
        # the keyword name against Transport.readv.
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
                             upper_limit=6))
        expected_result = []
        # put_bytes records the bytes, not the content to avoid memory
        # pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # readv records the supplied offset request
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
        self.assertEqual(expected_result, transport._activity)