1
# Copyright (C) 2004, 2005, 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21
from cStringIO import StringIO
24
from bzrlib import urlutils
25
from bzrlib.errors import (ConnectionError,
34
from bzrlib.tests import TestCase, TestCaseInTempDir
35
from bzrlib.transport import (_CoalescedOffset,
36
_get_protocol_handlers,
37
_set_protocol_handlers,
38
_get_transport_modules,
40
register_lazy_transport,
41
register_transport_proto,
42
_clear_protocol_handlers,
45
from bzrlib.transport.chroot import ChrootServer
46
from bzrlib.transport.memory import MemoryTransport
47
from bzrlib.transport.local import (LocalTransport,
48
EmulatedWin32LocalTransport)
51
# TODO: Should possibly split transport-specific tests into their own files.
54
class TestTransport(TestCase):
55
"""Test the non transport-concrete class functionality."""
57
def test__get_set_protocol_handlers(self):
58
handlers = _get_protocol_handlers()
59
self.assertNotEqual([], handlers.keys( ))
61
_clear_protocol_handlers()
62
self.assertEqual([], _get_protocol_handlers().keys())
64
_set_protocol_handlers(handlers)
66
def test_get_transport_modules(self):
67
handlers = _get_protocol_handlers()
68
class SampleHandler(object):
69
"""I exist, isnt that enough?"""
71
_clear_protocol_handlers()
72
register_transport_proto('foo')
73
register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
74
register_transport_proto('bar')
75
register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
76
self.assertEqual([SampleHandler.__module__, 'bzrlib.transport.chroot'],
77
_get_transport_modules())
79
_set_protocol_handlers(handlers)
81
def test_transport_dependency(self):
82
"""Transport with missing dependency causes no error"""
83
saved_handlers = _get_protocol_handlers()
85
register_transport_proto('foo')
86
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
87
'BadTransportHandler')
89
get_transport('foo://fooserver/foo')
90
except UnsupportedProtocol, e:
92
self.assertEquals('Unsupported protocol'
93
' for url "foo://fooserver/foo":'
94
' Unable to import library "some_lib":'
95
' testing missing dependency', str(e))
97
self.fail('Did not raise UnsupportedProtocol')
99
# restore original values
100
_set_protocol_handlers(saved_handlers)
102
def test_transport_fallback(self):
103
"""Transport with missing dependency causes no error"""
104
saved_handlers = _get_protocol_handlers()
106
_clear_protocol_handlers()
107
register_transport_proto('foo')
108
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
109
'BackupTransportHandler')
110
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
111
'BadTransportHandler')
112
t = get_transport('foo://fooserver/foo')
113
self.assertTrue(isinstance(t, BackupTransportHandler))
115
_set_protocol_handlers(saved_handlers)
117
def test__combine_paths(self):
119
self.assertEqual('/home/sarah/project/foo',
120
t._combine_paths('/home/sarah', 'project/foo'))
121
self.assertEqual('/etc',
122
t._combine_paths('/home/sarah', '../../etc'))
123
self.assertEqual('/etc',
124
t._combine_paths('/home/sarah', '../../../etc'))
125
self.assertEqual('/etc',
126
t._combine_paths('/home/sarah', '/etc'))
129
class TestCoalesceOffsets(TestCase):
131
def check(self, expected, offsets, limit=0, fudge=0):
132
coalesce = Transport._coalesce_offsets
133
exp = [_CoalescedOffset(*x) for x in expected]
134
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
135
self.assertEqual(exp, out)
137
def test_coalesce_empty(self):
140
def test_coalesce_simple(self):
141
self.check([(0, 10, [(0, 10)])], [(0, 10)])
143
def test_coalesce_unrelated(self):
144
self.check([(0, 10, [(0, 10)]),
146
], [(0, 10), (20, 10)])
148
def test_coalesce_unsorted(self):
149
self.check([(20, 10, [(0, 10)]),
151
], [(20, 10), (0, 10)])
153
def test_coalesce_nearby(self):
154
self.check([(0, 20, [(0, 10), (10, 10)])],
157
def test_coalesce_overlapped(self):
158
self.check([(0, 15, [(0, 10), (5, 10)])],
161
def test_coalesce_limit(self):
162
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
163
(30, 10), (40, 10)]),
164
(60, 50, [(0, 10), (10, 10), (20, 10),
165
(30, 10), (40, 10)]),
166
], [(10, 10), (20, 10), (30, 10), (40, 10),
167
(50, 10), (60, 10), (70, 10), (80, 10),
168
(90, 10), (100, 10)],
171
def test_coalesce_no_limit(self):
172
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
173
(30, 10), (40, 10), (50, 10),
174
(60, 10), (70, 10), (80, 10),
176
], [(10, 10), (20, 10), (30, 10), (40, 10),
177
(50, 10), (60, 10), (70, 10), (80, 10),
178
(90, 10), (100, 10)])
180
def test_coalesce_fudge(self):
181
self.check([(10, 30, [(0, 10), (20, 10)]),
182
(100, 10, [(0, 10),]),
183
], [(10, 10), (30, 10), (100, 10)],
188
class TestMemoryTransport(TestCase):
190
def test_get_transport(self):
193
def test_clone(self):
194
transport = MemoryTransport()
195
self.assertTrue(isinstance(transport, MemoryTransport))
196
self.assertEqual("memory:///", transport.clone("/").base)
198
def test_abspath(self):
199
transport = MemoryTransport()
200
self.assertEqual("memory:///relpath", transport.abspath('relpath'))
202
def test_abspath_of_root(self):
203
transport = MemoryTransport()
204
self.assertEqual("memory:///", transport.base)
205
self.assertEqual("memory:///", transport.abspath('/'))
207
def test_abspath_of_relpath_starting_at_root(self):
208
transport = MemoryTransport()
209
self.assertEqual("memory:///foo", transport.abspath('/foo'))
211
def test_append_and_get(self):
212
transport = MemoryTransport()
213
transport.append_bytes('path', 'content')
214
self.assertEqual(transport.get('path').read(), 'content')
215
transport.append_file('path', StringIO('content'))
216
self.assertEqual(transport.get('path').read(), 'contentcontent')
218
def test_put_and_get(self):
219
transport = MemoryTransport()
220
transport.put_file('path', StringIO('content'))
221
self.assertEqual(transport.get('path').read(), 'content')
222
transport.put_bytes('path', 'content')
223
self.assertEqual(transport.get('path').read(), 'content')
225
def test_append_without_dir_fails(self):
226
transport = MemoryTransport()
227
self.assertRaises(NoSuchFile,
228
transport.append_bytes, 'dir/path', 'content')
230
def test_put_without_dir_fails(self):
231
transport = MemoryTransport()
232
self.assertRaises(NoSuchFile,
233
transport.put_file, 'dir/path', StringIO('content'))
235
def test_get_missing(self):
236
transport = MemoryTransport()
237
self.assertRaises(NoSuchFile, transport.get, 'foo')
239
def test_has_missing(self):
240
transport = MemoryTransport()
241
self.assertEquals(False, transport.has('foo'))
243
def test_has_present(self):
244
transport = MemoryTransport()
245
transport.append_bytes('foo', 'content')
246
self.assertEquals(True, transport.has('foo'))
248
def test_list_dir(self):
249
transport = MemoryTransport()
250
transport.put_bytes('foo', 'content')
251
transport.mkdir('dir')
252
transport.put_bytes('dir/subfoo', 'content')
253
transport.put_bytes('dirlike', 'content')
255
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
256
self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
258
def test_mkdir(self):
259
transport = MemoryTransport()
260
transport.mkdir('dir')
261
transport.append_bytes('dir/path', 'content')
262
self.assertEqual(transport.get('dir/path').read(), 'content')
264
def test_mkdir_missing_parent(self):
265
transport = MemoryTransport()
266
self.assertRaises(NoSuchFile,
267
transport.mkdir, 'dir/dir')
269
def test_mkdir_twice(self):
270
transport = MemoryTransport()
271
transport.mkdir('dir')
272
self.assertRaises(FileExists, transport.mkdir, 'dir')
274
def test_parameters(self):
275
transport = MemoryTransport()
276
self.assertEqual(True, transport.listable())
277
self.assertEqual(False, transport.should_cache())
278
self.assertEqual(False, transport.is_readonly())
280
def test_iter_files_recursive(self):
281
transport = MemoryTransport()
282
transport.mkdir('dir')
283
transport.put_bytes('dir/foo', 'content')
284
transport.put_bytes('dir/bar', 'content')
285
transport.put_bytes('bar', 'content')
286
paths = set(transport.iter_files_recursive())
287
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
290
transport = MemoryTransport()
291
transport.put_bytes('foo', 'content')
292
transport.put_bytes('bar', 'phowar')
293
self.assertEqual(7, transport.stat('foo').st_size)
294
self.assertEqual(6, transport.stat('bar').st_size)
297
class ChrootDecoratorTransportTest(TestCase):
298
"""Chroot decoration specific tests."""
300
def test_abspath(self):
301
# The abspath is always relative to the chroot_url.
302
server = ChrootServer(get_transport('memory:///foo/bar/'))
304
transport = get_transport(server.get_url())
305
self.assertEqual(server.get_url(), transport.abspath('/'))
307
subdir_transport = transport.clone('subdir')
308
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
311
def test_clone(self):
312
server = ChrootServer(get_transport('memory:///foo/bar/'))
314
transport = get_transport(server.get_url())
315
# relpath from root and root path are the same
316
relpath_cloned = transport.clone('foo')
317
abspath_cloned = transport.clone('/foo')
318
self.assertEqual(server, relpath_cloned.server)
319
self.assertEqual(server, abspath_cloned.server)
322
def test_chroot_url_preserves_chroot(self):
323
"""Calling get_transport on a chroot transport's base should produce a
324
transport with exactly the same behaviour as the original chroot
327
This is so that it is not possible to escape a chroot by doing::
328
url = chroot_transport.base
329
parent_url = urlutils.join(url, '..')
330
new_transport = get_transport(parent_url)
332
server = ChrootServer(get_transport('memory:///path/subpath'))
334
transport = get_transport(server.get_url())
335
new_transport = get_transport(transport.base)
336
self.assertEqual(transport.server, new_transport.server)
337
self.assertEqual(transport.base, new_transport.base)
340
def test_urljoin_preserves_chroot(self):
341
"""Using urlutils.join(url, '..') on a chroot URL should not produce a
342
URL that escapes the intended chroot.
344
This is so that it is not possible to escape a chroot by doing::
345
url = chroot_transport.base
346
parent_url = urlutils.join(url, '..')
347
new_transport = get_transport(parent_url)
349
server = ChrootServer(get_transport('memory:///path/'))
351
transport = get_transport(server.get_url())
353
InvalidURLJoin, urlutils.join, transport.base, '..')
357
class ChrootServerTest(TestCase):
359
def test_construct(self):
360
backing_transport = MemoryTransport()
361
server = ChrootServer(backing_transport)
362
self.assertEqual(backing_transport, server.backing_transport)
364
def test_setUp(self):
365
backing_transport = MemoryTransport()
366
server = ChrootServer(backing_transport)
368
self.assertTrue(server.scheme in _get_protocol_handlers().keys())
370
def test_tearDown(self):
371
backing_transport = MemoryTransport()
372
server = ChrootServer(backing_transport)
375
self.assertFalse(server.scheme in _get_protocol_handlers().keys())
377
def test_get_url(self):
378
backing_transport = MemoryTransport()
379
server = ChrootServer(backing_transport)
381
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
385
class ReadonlyDecoratorTransportTest(TestCase):
386
"""Readonly decoration specific tests."""
388
def test_local_parameters(self):
389
import bzrlib.transport.readonly as readonly
390
# connect to . in readonly mode
391
transport = readonly.ReadonlyTransportDecorator('readonly+.')
392
self.assertEqual(True, transport.listable())
393
self.assertEqual(False, transport.should_cache())
394
self.assertEqual(True, transport.is_readonly())
396
def test_http_parameters(self):
397
from bzrlib.tests.HttpServer import HttpServer
398
import bzrlib.transport.readonly as readonly
399
# connect to . via http which is not listable
400
server = HttpServer()
403
transport = get_transport('readonly+' + server.get_url())
404
self.failUnless(isinstance(transport,
405
readonly.ReadonlyTransportDecorator))
406
self.assertEqual(False, transport.listable())
407
self.assertEqual(True, transport.should_cache())
408
self.assertEqual(True, transport.is_readonly())
413
class FakeNFSDecoratorTests(TestCaseInTempDir):
414
"""NFS decorator specific tests."""
416
def get_nfs_transport(self, url):
417
import bzrlib.transport.fakenfs as fakenfs
418
# connect to url with nfs decoration
419
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
421
def test_local_parameters(self):
422
# the listable, should_cache and is_readonly parameters
423
# are not changed by the fakenfs decorator
424
transport = self.get_nfs_transport('.')
425
self.assertEqual(True, transport.listable())
426
self.assertEqual(False, transport.should_cache())
427
self.assertEqual(False, transport.is_readonly())
429
def test_http_parameters(self):
430
# the listable, should_cache and is_readonly parameters
431
# are not changed by the fakenfs decorator
432
from bzrlib.tests.HttpServer import HttpServer
433
# connect to . via http which is not listable
434
server = HttpServer()
437
transport = self.get_nfs_transport(server.get_url())
438
self.assertIsInstance(
439
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
440
self.assertEqual(False, transport.listable())
441
self.assertEqual(True, transport.should_cache())
442
self.assertEqual(True, transport.is_readonly())
446
def test_fakenfs_server_default(self):
447
# a FakeNFSServer() should bring up a local relpath server for itself
448
import bzrlib.transport.fakenfs as fakenfs
449
server = fakenfs.FakeNFSServer()
452
# the url should be decorated appropriately
453
self.assertStartsWith(server.get_url(), 'fakenfs+')
454
# and we should be able to get a transport for it
455
transport = get_transport(server.get_url())
456
# which must be a FakeNFSTransportDecorator instance.
457
self.assertIsInstance(
458
transport, fakenfs.FakeNFSTransportDecorator)
462
def test_fakenfs_rename_semantics(self):
463
# a FakeNFS transport must mangle the way rename errors occur to
464
# look like NFS problems.
465
transport = self.get_nfs_transport('.')
466
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
468
self.assertRaises(bzrlib.errors.ResourceBusy,
469
transport.rename, 'from', 'to')
472
class FakeVFATDecoratorTests(TestCaseInTempDir):
473
"""Tests for simulation of VFAT restrictions"""
475
def get_vfat_transport(self, url):
476
"""Return vfat-backed transport for test directory"""
477
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
478
return FakeVFATTransportDecorator('vfat+' + url)
480
def test_transport_creation(self):
481
from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
482
transport = self.get_vfat_transport('.')
483
self.assertIsInstance(transport, FakeVFATTransportDecorator)
485
def test_transport_mkdir(self):
486
transport = self.get_vfat_transport('.')
487
transport.mkdir('HELLO')
488
self.assertTrue(transport.has('hello'))
489
self.assertTrue(transport.has('Hello'))
491
def test_forbidden_chars(self):
492
transport = self.get_vfat_transport('.')
493
self.assertRaises(ValueError, transport.has, "<NU>")
496
class BadTransportHandler(Transport):
497
def __init__(self, base_url):
498
raise DependencyNotPresent('some_lib', 'testing missing dependency')
501
class BackupTransportHandler(Transport):
502
"""Test transport that works as a backup for the BadTransportHandler"""
506
class TestTransportImplementation(TestCaseInTempDir):
507
"""Implementation verification for transports.
509
To verify a transport we need a server factory, which is a callable
510
that accepts no parameters and returns an implementation of
511
bzrlib.transport.Server.
513
That Server is then used to construct transport instances and test
514
the transport via loopback activity.
516
Currently this assumes that the Transport object is connected to the
517
current working directory. So that whatever is done
518
through the transport, should show up in the working
519
directory, and vice-versa. This is a bug, because its possible to have
520
URL schemes which provide access to something that may not be
521
result in storage on the local disk, i.e. due to file system limits, or
522
due to it being a database or some other non-filesystem tool.
524
This also tests to make sure that the functions work with both
525
generators and lists (assuming iter(list) is effectively a generator)
529
super(TestTransportImplementation, self).setUp()
530
self._server = self.transport_server()
532
self.addCleanup(self._server.tearDown)
534
def get_transport(self):
535
"""Return a connected transport to the local directory."""
536
base_url = self._server.get_url()
537
# try getting the transport via the regular interface:
538
t = get_transport(base_url)
539
if not isinstance(t, self.transport_class):
540
# we did not get the correct transport class type. Override the
541
# regular connection behaviour by direct construction.
542
t = self.transport_class(base_url)
546
class TestLocalTransports(TestCase):
548
def test_get_transport_from_abspath(self):
549
here = os.path.abspath('.')
550
t = get_transport(here)
551
self.assertIsInstance(t, LocalTransport)
552
self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
554
def test_get_transport_from_relpath(self):
555
here = os.path.abspath('.')
556
t = get_transport('.')
557
self.assertIsInstance(t, LocalTransport)
558
self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
560
def test_get_transport_from_local_url(self):
561
here = os.path.abspath('.')
562
here_url = urlutils.local_path_to_url(here) + '/'
563
t = get_transport(here_url)
564
self.assertIsInstance(t, LocalTransport)
565
self.assertEquals(t.base, here_url)
568
class TestWin32LocalTransport(TestCase):
570
def test_unc_clone_to_root(self):
571
# Win32 UNC path like \\HOST\path
572
# clone to root should stop at least at \\HOST part
574
t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
577
self.assertEquals(t.base, 'file://HOST/')
578
# make sure we reach the root
580
self.assertEquals(t.base, 'file://HOST/')