1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
from cStringIO import StringIO
26
from bzrlib.errors import (DependencyNotPresent,
34
from bzrlib.tests import TestCase, TestCaseInTempDir
35
from bzrlib.transport import (_clear_protocol_handlers,
38
_get_protocol_handlers,
39
_set_protocol_handlers,
40
_get_transport_modules,
43
register_lazy_transport,
44
register_transport_proto,
47
from bzrlib.transport.chroot import ChrootServer
48
from bzrlib.transport.memory import MemoryTransport
49
from bzrlib.transport.local import (LocalTransport,
50
EmulatedWin32LocalTransport)
51
from bzrlib.transport.pathfilter import PathFilteringServer
54
# TODO: Should possibly split transport-specific tests into their own files.
57
class TestTransport(TestCase):
58
"""Test the non transport-concrete class functionality."""
60
def test__get_set_protocol_handlers(self):
61
handlers = _get_protocol_handlers()
62
self.assertNotEqual([], handlers.keys( ))
64
_clear_protocol_handlers()
65
self.assertEqual([], _get_protocol_handlers().keys())
67
_set_protocol_handlers(handlers)
69
def test_get_transport_modules(self):
70
handlers = _get_protocol_handlers()
71
# don't pollute the current handlers
72
_clear_protocol_handlers()
73
class SampleHandler(object):
74
"""I exist, isnt that enough?"""
76
_clear_protocol_handlers()
77
register_transport_proto('foo')
78
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
79
'TestTransport.SampleHandler')
80
register_transport_proto('bar')
81
register_lazy_transport('bar', 'bzrlib.tests.test_transport',
82
'TestTransport.SampleHandler')
83
self.assertEqual([SampleHandler.__module__,
84
'bzrlib.transport.chroot'],
85
_get_transport_modules())
87
_set_protocol_handlers(handlers)
89
def test_transport_dependency(self):
90
"""Transport with missing dependency causes no error"""
91
saved_handlers = _get_protocol_handlers()
92
# don't pollute the current handlers
93
_clear_protocol_handlers()
95
register_transport_proto('foo')
96
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
97
'BadTransportHandler')
99
get_transport('foo://fooserver/foo')
100
except UnsupportedProtocol, e:
102
self.assertEquals('Unsupported protocol'
103
' for url "foo://fooserver/foo":'
104
' Unable to import library "some_lib":'
105
' testing missing dependency', str(e))
107
self.fail('Did not raise UnsupportedProtocol')
109
# restore original values
110
_set_protocol_handlers(saved_handlers)
112
def test_transport_fallback(self):
113
"""Transport with missing dependency causes no error"""
114
saved_handlers = _get_protocol_handlers()
116
_clear_protocol_handlers()
117
register_transport_proto('foo')
118
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
119
'BackupTransportHandler')
120
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
121
'BadTransportHandler')
122
t = get_transport('foo://fooserver/foo')
123
self.assertTrue(isinstance(t, BackupTransportHandler))
125
_set_protocol_handlers(saved_handlers)
127
def test_ssh_hints(self):
128
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
130
get_transport('ssh://fooserver/foo')
131
except UnsupportedProtocol, e:
133
self.assertEquals('Unsupported protocol'
134
' for url "ssh://fooserver/foo":'
135
' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
138
self.fail('Did not raise UnsupportedProtocol')
140
def test_LateReadError(self):
141
"""The LateReadError helper should raise on read()."""
142
a_file = LateReadError('a path')
145
except ReadError, error:
146
self.assertEqual('a path', error.path)
147
self.assertRaises(ReadError, a_file.read, 40)
150
def test__combine_paths(self):
152
self.assertEqual('/home/sarah/project/foo',
153
t._combine_paths('/home/sarah', 'project/foo'))
154
self.assertEqual('/etc',
155
t._combine_paths('/home/sarah', '../../etc'))
156
self.assertEqual('/etc',
157
t._combine_paths('/home/sarah', '../../../etc'))
158
self.assertEqual('/etc',
159
t._combine_paths('/home/sarah', '/etc'))
161
def test_local_abspath_non_local_transport(self):
    # The base implementation of local_abspath is expected to raise
    # NotLocalUrl; MemoryTransport is used here to exercise it.
    memory_transport = MemoryTransport()
    raised = self.assertRaises(
        errors.NotLocalUrl, memory_transport.local_abspath, 't')
    message = str(raised)
    self.assertEqual('memory:///t is not a local path.', message)
168
class TestCoalesceOffsets(TestCase):
170
def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
171
coalesce = Transport._coalesce_offsets
172
exp = [_CoalescedOffset(*x) for x in expected]
173
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
175
self.assertEqual(exp, out)
177
def test_coalesce_empty(self):
180
def test_coalesce_simple(self):
    # A single requested offset is expected back as a single entry.
    requested = [(0, 10)]
    expected = [(0, 10, [(0, 10)])]
    self.check(expected, requested)
183
def test_coalesce_unrelated(self):
184
self.check([(0, 10, [(0, 10)]),
186
], [(0, 10), (20, 10)])
188
def test_coalesce_unsorted(self):
189
self.check([(20, 10, [(0, 10)]),
191
], [(20, 10), (0, 10)])
193
def test_coalesce_nearby(self):
194
self.check([(0, 20, [(0, 10), (10, 10)])],
197
def test_coalesce_overlapped(self):
198
self.assertRaises(ValueError,
199
self.check, [(0, 15, [(0, 10), (5, 10)])],
202
def test_coalesce_limit(self):
203
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
204
(30, 10), (40, 10)]),
205
(60, 50, [(0, 10), (10, 10), (20, 10),
206
(30, 10), (40, 10)]),
207
], [(10, 10), (20, 10), (30, 10), (40, 10),
208
(50, 10), (60, 10), (70, 10), (80, 10),
209
(90, 10), (100, 10)],
212
def test_coalesce_no_limit(self):
213
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
214
(30, 10), (40, 10), (50, 10),
215
(60, 10), (70, 10), (80, 10),
217
], [(10, 10), (20, 10), (30, 10), (40, 10),
218
(50, 10), (60, 10), (70, 10), (80, 10),
219
(90, 10), (100, 10)])
221
def test_coalesce_fudge(self):
222
self.check([(10, 30, [(0, 10), (20, 10)]),
223
(100, 10, [(0, 10),]),
224
], [(10, 10), (30, 10), (100, 10)],
227
def test_coalesce_max_size(self):
228
self.check([(10, 20, [(0, 10), (10, 10)]),
230
# If one range is above max_size, it gets its own coalesced
232
(100, 80, [(0, 80),]),],
233
[(10, 10), (20, 10), (30, 50), (100, 80)],
237
def test_coalesce_no_max_size(self):
238
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
239
[(10, 10), (20, 10), (30, 50), (80, 100)],
242
def test_coalesce_default_limit(self):
    # By default we use a 100MB max size.
    chunk = 10 * 1024 * 1024

    def spans(count):
        # Build `count` back-to-back (offset, length) pairs of one chunk
        # each; a fresh list is produced on every call.
        return [(index * chunk, chunk) for index in range(count)]

    # Without an explicit max_size, eleven chunks are expected to come
    # back as a group of ten followed by a group of one.
    expected_default = [
        (0, 10 * chunk, spans(10)),
        (10 * chunk, chunk, [(0, chunk)]),
    ]
    self.check(expected_default, spans(11))

    # With max_size raised to 1GB all eleven are expected in one group.
    expected_raised = [(0, 11 * chunk, spans(11))]
    self.check(expected_raised, spans(11),
               max_size=1 * 1024 * 1024 * 1024)
253
class TestMemoryTransport(TestCase):
255
def test_get_transport(self):
258
def test_clone(self):
    # Cloning to "/" is expected to give a transport based at memory:///.
    original = MemoryTransport()
    self.assertTrue(isinstance(original, MemoryTransport))
    cloned = original.clone("/")
    self.assertEqual("memory:///", cloned.base)
263
def test_abspath(self):
    # A relative path is expected to resolve under the memory:/// base.
    mem = MemoryTransport()
    resolved = mem.abspath('relpath')
    self.assertEqual("memory:///relpath", resolved)
267
def test_abspath_of_root(self):
    # Both the transport base and abspath('/') are expected to be the
    # root memory URL.
    mem = MemoryTransport()
    root_url = "memory:///"
    self.assertEqual(root_url, mem.base)
    self.assertEqual(root_url, mem.abspath('/'))
272
def test_abspath_of_relpath_starting_at_root(self):
    # A path with a leading slash is expected to resolve from the root.
    mem = MemoryTransport()
    resolved = mem.abspath('/foo')
    self.assertEqual("memory:///foo", resolved)
276
def test_append_and_get(self):
    # append_bytes then append_file on the same path should accumulate.
    mem = MemoryTransport()
    mem.append_bytes('path', 'content')
    after_first = mem.get('path').read()
    self.assertEqual(after_first, 'content')
    mem.append_file('path', StringIO('content'))
    after_second = mem.get('path').read()
    self.assertEqual(after_second, 'contentcontent')
283
def test_put_and_get(self):
    # Data stored with put_file or put_bytes should be readable via get.
    mem = MemoryTransport()
    mem.put_file('path', StringIO('content'))
    stored = mem.get('path').read()
    self.assertEqual(stored, 'content')
    mem.put_bytes('path', 'content')
    stored = mem.get('path').read()
    self.assertEqual(stored, 'content')
290
def test_append_without_dir_fails(self):
    # Appending under a directory that was never created should raise
    # NoSuchFile.
    mem = MemoryTransport()
    appender = mem.append_bytes
    self.assertRaises(NoSuchFile, appender, 'dir/path', 'content')
295
def test_put_without_dir_fails(self):
    # Putting a file under a directory that was never created should
    # raise NoSuchFile.
    mem = MemoryTransport()
    payload = StringIO('content')
    self.assertRaises(NoSuchFile, mem.put_file, 'dir/path', payload)
300
def test_get_missing(self):
    # Fetching a path that was never written should raise NoSuchFile.
    mem = MemoryTransport()
    getter = mem.get
    self.assertRaises(NoSuchFile, getter, 'foo')
304
def test_has_missing(self):
    # has() on a path that was never written is expected to be False.
    # Uses assertEqual rather than the deprecated assertEquals alias, in
    # line with the other tests in this file.
    transport = MemoryTransport()
    self.assertEqual(False, transport.has('foo'))
308
def test_has_present(self):
    # has() is expected to be True once bytes have been appended to the
    # path. Uses assertEqual rather than the deprecated assertEquals alias.
    transport = MemoryTransport()
    transport.append_bytes('foo', 'content')
    self.assertEqual(True, transport.has('foo'))
313
def test_list_dir(self):
    # list_dir is expected to report only the direct children of the
    # requested directory. Results are sorted before comparison, so the
    # test does not depend on the order list_dir returns.
    transport = MemoryTransport()
    transport.put_bytes('foo', 'content')
    transport.mkdir('dir')
    transport.put_bytes('dir/subfoo', 'content')
    transport.put_bytes('dirlike', 'content')

    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(['dir', 'dirlike', 'foo'],
                     sorted(transport.list_dir('.')))
    self.assertEqual(['subfoo'], sorted(transport.list_dir('dir')))
323
def test_mkdir(self):
    # After mkdir, files can be written to and read from the directory.
    mem = MemoryTransport()
    mem.mkdir('dir')
    mem.append_bytes('dir/path', 'content')
    stored = mem.get('dir/path').read()
    self.assertEqual(stored, 'content')
329
def test_mkdir_missing_parent(self):
    # Creating a directory whose parent does not exist should raise
    # NoSuchFile.
    mem = MemoryTransport()
    make_directory = mem.mkdir
    self.assertRaises(NoSuchFile, make_directory, 'dir/dir')
334
def test_mkdir_twice(self):
    # A second mkdir of the same name should raise FileExists.
    mem = MemoryTransport()
    make_directory = mem.mkdir
    make_directory('dir')
    self.assertRaises(FileExists, make_directory, 'dir')
339
def test_parameters(self):
    # A MemoryTransport is expected to be listable and writable.
    mem = MemoryTransport()
    listable = mem.listable()
    self.assertEqual(True, listable)
    read_only = mem.is_readonly()
    self.assertEqual(False, read_only)
344
def test_iter_files_recursive(self):
    # iter_files_recursive is expected to yield every file, including
    # those inside subdirectories, as paths relative to the transport.
    mem = MemoryTransport()
    mem.mkdir('dir')
    for relpath in ('dir/foo', 'dir/bar', 'bar'):
        mem.put_bytes(relpath, 'content')
    found = set(mem.iter_files_recursive())
    wanted = set(['dir/foo', 'dir/bar', 'bar'])
    self.assertEqual(wanted, found)
354
transport = MemoryTransport()
355
transport.put_bytes('foo', 'content')
356
transport.put_bytes('bar', 'phowar')
357
self.assertEqual(7, transport.stat('foo').st_size)
358
self.assertEqual(6, transport.stat('bar').st_size)
361
class ChrootDecoratorTransportTest(TestCase):
362
"""Chroot decoration specific tests."""
364
def test_abspath(self):
365
# The abspath is always relative to the chroot_url.
366
server = ChrootServer(get_transport('memory:///foo/bar/'))
368
transport = get_transport(server.get_url())
369
self.assertEqual(server.get_url(), transport.abspath('/'))
371
subdir_transport = transport.clone('subdir')
372
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
375
def test_clone(self):
376
server = ChrootServer(get_transport('memory:///foo/bar/'))
378
transport = get_transport(server.get_url())
379
# relpath from root and root path are the same
380
relpath_cloned = transport.clone('foo')
381
abspath_cloned = transport.clone('/foo')
382
self.assertEqual(server, relpath_cloned.server)
383
self.assertEqual(server, abspath_cloned.server)
386
def test_chroot_url_preserves_chroot(self):
387
"""Calling get_transport on a chroot transport's base should produce a
388
transport with exactly the same behaviour as the original chroot
391
This is so that it is not possible to escape a chroot by doing::
392
url = chroot_transport.base
393
parent_url = urlutils.join(url, '..')
394
new_transport = get_transport(parent_url)
396
server = ChrootServer(get_transport('memory:///path/subpath'))
398
transport = get_transport(server.get_url())
399
new_transport = get_transport(transport.base)
400
self.assertEqual(transport.server, new_transport.server)
401
self.assertEqual(transport.base, new_transport.base)
404
def test_urljoin_preserves_chroot(self):
405
"""Using urlutils.join(url, '..') on a chroot URL should not produce a
406
URL that escapes the intended chroot.
408
This is so that it is not possible to escape a chroot by doing::
409
url = chroot_transport.base
410
parent_url = urlutils.join(url, '..')
411
new_transport = get_transport(parent_url)
413
server = ChrootServer(get_transport('memory:///path/'))
415
transport = get_transport(server.get_url())
417
InvalidURLJoin, urlutils.join, transport.base, '..')
421
class ChrootServerTest(TestCase):
423
def test_construct(self):
    # The server should expose the transport it was constructed with
    # as its backing_transport attribute.
    backing = MemoryTransport()
    chroot_server = ChrootServer(backing)
    self.assertEqual(backing, chroot_server.backing_transport)
428
def test_setUp(self):
429
backing_transport = MemoryTransport()
430
server = ChrootServer(backing_transport)
432
self.assertTrue(server.scheme in _get_protocol_handlers().keys())
434
def test_tearDown(self):
435
backing_transport = MemoryTransport()
436
server = ChrootServer(backing_transport)
439
self.assertFalse(server.scheme in _get_protocol_handlers().keys())
441
def test_get_url(self):
442
backing_transport = MemoryTransport()
443
server = ChrootServer(backing_transport)
445
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
449
class PathFilteringDecoratorTransportTest(TestCase):
450
"""Pathfilter decoration specific tests."""
452
def test_abspath(self):
453
# The abspath is always relative to the base of the backing transport.
454
server = PathFilteringServer(get_transport('memory:///foo/bar/'),
457
transport = get_transport(server.get_url())
458
self.assertEqual(server.get_url(), transport.abspath('/'))
460
subdir_transport = transport.clone('subdir')
461
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
464
def make_pf_transport(self, filter_func=None):
465
"""Make a PathFilteringTransport backed by a MemoryTransport.
467
:param filter_func: by default this will be a no-op function. Use this
468
parameter to override it."""
469
if filter_func is None:
470
filter_func = lambda x: x
471
server = PathFilteringServer(
472
get_transport('memory:///foo/bar/'), filter_func)
474
self.addCleanup(server.tearDown)
475
return get_transport(server.get_url())
477
def test__filter(self):
478
# _filter (with an identity func as filter_func) always returns
479
# paths relative to the base of the backing transport.
480
transport = self.make_pf_transport()
481
self.assertEqual('foo', transport._filter('foo'))
482
self.assertEqual('foo/bar', transport._filter('foo/bar'))
483
self.assertEqual('', transport._filter('..'))
484
self.assertEqual('', transport._filter('/'))
485
# The base of the pathfiltering transport is taken into account too.
486
transport = transport.clone('subdir1/subdir2')
487
self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
489
'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
490
self.assertEqual('subdir1', transport._filter('..'))
491
self.assertEqual('', transport._filter('/'))
493
def test_filter_invocation(self):
496
filter_log.append(path)
498
transport = self.make_pf_transport(filter)
500
self.assertEqual(['abc'], filter_log)
502
transport.clone('abc').has('xyz')
503
self.assertEqual(['abc/xyz'], filter_log)
505
transport.has('/abc')
506
self.assertEqual(['abc'], filter_log)
508
def test_clone(self):
    root = self.make_pf_transport()
    # relpath from root and root path are the same
    clones = [root.clone(target) for target in ('foo', '/foo')]
    # Each clone is expected to share the original transport's server.
    for cloned in clones:
        self.assertEqual(root.server, cloned.server)
516
def test_url_preserves_pathfiltering(self):
517
"""Calling get_transport on a pathfiltered transport's base should
518
produce a transport with exactly the same behaviour as the original
519
pathfiltered transport.
521
This is so that it is not possible to escape (accidentally or
522
otherwise) the filtering by doing::
523
url = filtered_transport.base
524
parent_url = urlutils.join(url, '..')
525
new_transport = get_transport(parent_url)
527
transport = self.make_pf_transport()
528
new_transport = get_transport(transport.base)
529
self.assertEqual(transport.server, new_transport.server)
530
self.assertEqual(transport.base, new_transport.base)
533
class ReadonlyDecoratorTransportTest(TestCase):
534
"""Readonly decoration specific tests."""
536
def test_local_parameters(self):
    from bzrlib.transport.readonly import ReadonlyTransportDecorator
    # connect to . in readonly mode
    decorated = ReadonlyTransportDecorator('readonly+.')
    listable = decorated.listable()
    self.assertEqual(True, listable)
    read_only = decorated.is_readonly()
    self.assertEqual(True, read_only)
543
def test_http_parameters(self):
544
from bzrlib.tests.http_server import HttpServer
545
import bzrlib.transport.readonly as readonly
546
# connect to '.' via http which is not listable
547
server = HttpServer()
550
transport = get_transport('readonly+' + server.get_url())
551
self.failUnless(isinstance(transport,
552
readonly.ReadonlyTransportDecorator))
553
self.assertEqual(False, transport.listable())
554
self.assertEqual(True, transport.is_readonly())
559
class FakeNFSDecoratorTests(TestCaseInTempDir):
560
"""NFS decorator specific tests."""
562
def get_nfs_transport(self, url):
    # Build a FakeNFSTransportDecorator for `url` by prefixing it with
    # the 'fakenfs+' scheme decoration.
    from bzrlib.transport.fakenfs import FakeNFSTransportDecorator
    decorated_url = 'fakenfs+' + url
    return FakeNFSTransportDecorator(decorated_url)
567
def test_local_parameters(self):
    # the listable and is_readonly parameters
    # are not changed by the fakenfs decorator
    decorated = self.get_nfs_transport('.')
    listable = decorated.listable()
    self.assertEqual(True, listable)
    read_only = decorated.is_readonly()
    self.assertEqual(False, read_only)
574
def test_http_parameters(self):
575
# the listable and is_readonly parameters
576
# are not changed by the fakenfs decorator
577
from bzrlib.tests.http_server import HttpServer
578
# connect to '.' via http which is not listable
579
server = HttpServer()
582
transport = self.get_nfs_transport(server.get_url())
583
self.assertIsInstance(
584
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
585
self.assertEqual(False, transport.listable())
586
self.assertEqual(True, transport.is_readonly())
590
def test_fakenfs_server_default(self):
591
# a FakeNFSServer() should bring up a local relpath server for itself
592
import bzrlib.transport.fakenfs as fakenfs
593
server = fakenfs.FakeNFSServer()
596
# the url should be decorated appropriately
597
self.assertStartsWith(server.get_url(), 'fakenfs+')
598
# and we should be able to get a transport for it
599
transport = get_transport(server.get_url())
600
# which must be a FakeNFSTransportDecorator instance.
601
self.assertIsInstance(
602
transport, fakenfs.FakeNFSTransportDecorator)
606
def test_fakenfs_rename_semantics(self):
607
# a FakeNFS transport must mangle the way rename errors occur to
608
# look like NFS problems.
609
transport = self.get_nfs_transport('.')
610
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
612
self.assertRaises(errors.ResourceBusy,
613
transport.rename, 'from', 'to')
616
class FakeVFATDecoratorTests(TestCaseInTempDir):
    """Tests for the decorator that simulates VFAT restrictions."""

    def get_vfat_transport(self, url):
        """Return a vfat-decorated transport for ``url``.

        :param url: the url to decorate; 'vfat+' is prepended to it.
        """
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        decorated_url = 'vfat+' + url
        return FakeVFATTransportDecorator(decorated_url)

    def test_transport_creation(self):
        # The helper should hand back a FakeVFATTransportDecorator.
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        vfat = self.get_vfat_transport('.')
        self.assertIsInstance(vfat, FakeVFATTransportDecorator)

    def test_transport_mkdir(self):
        # A directory made with an upper-case name should be found under
        # other casings of the same name.
        vfat = self.get_vfat_transport('.')
        vfat.mkdir('HELLO')
        for spelling in ('hello', 'Hello'):
            self.assertTrue(vfat.has(spelling))

    def test_forbidden_chars(self):
        # A name containing '<' and '>' should be rejected with ValueError.
        vfat = self.get_vfat_transport('.')
        checker = vfat.has
        self.assertRaises(ValueError, checker, "<NU>")
640
class BadTransportHandler(Transport):
    """Transport whose construction always raises DependencyNotPresent."""

    def __init__(self, base_url):
        # Never builds an instance; base_url is ignored.
        library, reason = 'some_lib', 'testing missing dependency'
        raise DependencyNotPresent(library, reason)
645
class BackupTransportHandler(Transport):
646
"""Test transport that works as a backup for the BadTransportHandler"""
650
class TestTransportImplementation(TestCaseInTempDir):
651
"""Implementation verification for transports.
653
To verify a transport we need a server factory, which is a callable
654
that accepts no parameters and returns an implementation of
655
bzrlib.transport.Server.
657
That Server is then used to construct transport instances and test
658
the transport via loopback activity.
660
Currently this assumes that the Transport object is connected to the
661
current working directory. So that whatever is done
662
through the transport, should show up in the working
663
directory, and vice-versa. This is a bug, because its possible to have
664
URL schemes which provide access to something that may not be
665
result in storage on the local disk, i.e. due to file system limits, or
666
due to it being a database or some other non-filesystem tool.
668
This also tests to make sure that the functions work with both
669
generators and lists (assuming iter(list) is effectively a generator)
673
super(TestTransportImplementation, self).setUp()
674
self._server = self.transport_server()
676
self.addCleanup(self._server.tearDown)
678
def get_transport(self, relpath=None):
679
"""Return a connected transport to the local directory.
681
:param relpath: a path relative to the base url.
683
base_url = self._server.get_url()
684
url = self._adjust_url(base_url, relpath)
685
# try getting the transport via the regular interface:
686
t = get_transport(url)
687
# vila--20070607 if the following are commented out the test suite
688
# still pass. Is this really still needed or was it a forgotten
690
if not isinstance(t, self.transport_class):
691
# we did not get the correct transport class type. Override the
692
# regular connection behaviour by direct construction.
693
t = self.transport_class(url)
697
class TestLocalTransports(TestCase):
    """Check how get_transport handles local paths and local URLs.

    All assertions use assertEqual rather than the deprecated assertEquals
    alias, in line with the rest of this file.
    """

    def test_get_transport_from_abspath(self):
        # An absolute filesystem path should give a LocalTransport whose
        # base is the path's URL form plus a trailing slash.
        here = osutils.abspath('.')
        t = get_transport(here)
        self.assertIsInstance(t, LocalTransport)
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')

    def test_get_transport_from_relpath(self):
        # A relative path should give a LocalTransport whose base matches
        # the URL form of that relative path plus a trailing slash.
        t = get_transport('.')
        self.assertIsInstance(t, LocalTransport)
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')

    def test_get_transport_from_local_url(self):
        # A local URL should give a LocalTransport with that URL as base.
        here = osutils.abspath('.')
        here_url = urlutils.local_path_to_url(here) + '/'
        t = get_transport(here_url)
        self.assertIsInstance(t, LocalTransport)
        self.assertEqual(t.base, here_url)

    def test_local_abspath(self):
        # local_abspath('') should give back the path the transport was
        # opened on.
        here = osutils.abspath('.')
        t = get_transport(here)
        self.assertEqual(t.local_abspath(''), here)
724
class TestWin32LocalTransport(TestCase):
726
def test_unc_clone_to_root(self):
727
# Win32 UNC path like \\HOST\path
728
# clone to root should stop at least at \\HOST part
730
t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
733
self.assertEquals(t.base, 'file://HOST/')
734
# make sure we reach the root
736
self.assertEquals(t.base, 'file://HOST/')
739
class TestConnectedTransport(TestCase):
740
"""Tests for connected to remote server transports"""
742
def test_parse_url(self):
    # A plain URL with no credentials or port should parse into host
    # and path only, with a trailing slash added to the path and base.
    # Deprecated assertEquals/failUnless aliases are replaced with
    # assertEqual/assertIs, both already used elsewhere in this file.
    t = ConnectedTransport('http://simple.example.com/home/source')
    self.assertEqual(t._host, 'simple.example.com')
    self.assertEqual(t._port, None)
    self.assertEqual(t._path, '/home/source/')
    self.assertIs(None, t._user)
    self.assertIs(None, t._password)

    self.assertEqual(t.base, 'http://simple.example.com/home/source/')
752
def test_parse_url_with_at_in_user(self):
754
t = ConnectedTransport('ftp://user@host.com@www.host.com/')
755
self.assertEquals(t._user, 'user@host.com')
757
def test_parse_quoted_url(self):
    # Percent-escapes in user, password and host should be decoded
    # (%62 -> 'b', %40 -> '@', %41 -> 'A') and the port parsed as an int.
    # assertEqual replaces the deprecated assertEquals alias.
    t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
    self.assertEqual(t._host, 'exAmple.com')
    self.assertEqual(t._port, 2222)
    self.assertEqual(t._user, 'robey')
    self.assertEqual(t._password, 'h@t')
    self.assertEqual(t._path, '/path/')

    # Base should not keep track of the password
    self.assertEqual(t.base, 'http://robey@exAmple.com:2222/path/')
768
def test_parse_invalid_url(self):
769
self.assertRaises(errors.InvalidURL,
771
'sftp://lily.org:~janneke/public/bzr/gub')
773
def test_relpath(self):
774
t = ConnectedTransport('sftp://user@host.com/abs/path')
776
self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
777
self.assertRaises(errors.PathNotChild, t.relpath,
778
'http://user@host.com/abs/path/sub')
779
self.assertRaises(errors.PathNotChild, t.relpath,
780
'sftp://user2@host.com/abs/path/sub')
781
self.assertRaises(errors.PathNotChild, t.relpath,
782
'sftp://user@otherhost.com/abs/path/sub')
783
self.assertRaises(errors.PathNotChild, t.relpath,
784
'sftp://user@host.com:33/abs/path/sub')
785
# Make sure it works when we don't supply a username
786
t = ConnectedTransport('sftp://host.com/abs/path')
787
self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
789
# Make sure it works when parts of the path will be url encoded
790
t = ConnectedTransport('sftp://host.com/dev/%path')
791
self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
793
def test_connection_sharing_propagate_credentials(self):
794
t = ConnectedTransport('ftp://user@host.com/abs/path')
795
self.assertEquals('user', t._user)
796
self.assertEquals('host.com', t._host)
797
self.assertIs(None, t._get_connection())
798
self.assertIs(None, t._password)
799
c = t.clone('subdir')
800
self.assertIs(None, c._get_connection())
801
self.assertIs(None, t._password)
803
# Simulate the user entering a password
805
connection = object()
806
t._set_connection(connection, password)
807
self.assertIs(connection, t._get_connection())
808
self.assertIs(password, t._get_credentials())
809
self.assertIs(connection, c._get_connection())
810
self.assertIs(password, c._get_credentials())
812
# credentials can be updated
813
new_password = 'even more secret'
814
c._update_credentials(new_password)
815
self.assertIs(connection, t._get_connection())
816
self.assertIs(new_password, t._get_credentials())
817
self.assertIs(connection, c._get_connection())
818
self.assertIs(new_password, c._get_credentials())
821
class TestReusedTransports(TestCase):
    """Tests for reuse of transports passed via possible_transports."""

    def test_reuse_same_transport(self):
        # A newly created transport should be recorded in the list the
        # caller supplied.
        candidates = []
        first = get_transport('http://foo/', possible_transports=candidates)
        self.assertEqual([first], candidates)
        # Asking again for the same URL should hand back that object.
        again = get_transport('http://foo/', possible_transports=[first])
        self.assertIs(first, again)

        # Also check that final '/' are handled correctly
        url_pairs = (
            ('http://foo/path/', 'http://foo/path'),
            ('http://foo/path', 'http://foo/path/'),
        )
        for created_url, requested_url in url_pairs:
            created = get_transport(created_url)
            reused = get_transport(requested_url,
                                   possible_transports=[created])
            self.assertIs(created, reused)

    def test_don_t_reuse_different_transport(self):
        # A candidate for a different host must not be reused.
        foo_transport = get_transport('http://foo/path')
        bar_transport = get_transport('http://bar/path',
                                      possible_transports=[foo_transport])
        self.assertIsNot(foo_transport, bar_transport)
847
class TestTransportTrace(TestCase):
850
transport = get_transport('trace+memory://')
851
self.assertIsInstance(
852
transport, bzrlib.transport.trace.TransportTraceDecorator)
854
def test_clone_preserves_activity(self):
855
transport = get_transport('trace+memory://')
856
transport2 = transport.clone('.')
857
self.assertTrue(transport is not transport2)
858
self.assertTrue(transport._activity is transport2._activity)
860
# the following specific tests are for the operations that have made use of
861
# logging in tests; we could test every single operation but doing that
862
# still won't cause a test failure when the top level Transport API
863
# changes; so there is little return doing that.
865
transport = get_transport('trace+memory:///')
866
transport.put_bytes('foo', 'barish')
869
# put_bytes records the bytes, not the content to avoid memory
871
expected_result.append(('put_bytes', 'foo', 6, None))
872
# get records the file name only.
873
expected_result.append(('get', 'foo'))
874
self.assertEqual(expected_result, transport._activity)
876
def test_readv(self):
877
transport = get_transport('trace+memory:///')
878
transport.put_bytes('foo', 'barish')
879
list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
882
# put_bytes records the bytes, not the content to avoid memory
884
expected_result.append(('put_bytes', 'foo', 6, None))
885
# readv records the supplied offset request
886
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
887
self.assertEqual(expected_result, transport._activity)