1
# Copyright (C) 2004, 2005, 2006 Canonical Ltd
 
 
3
# This program is free software; you can redistribute it and/or modify
 
 
4
# it under the terms of the GNU General Public License as published by
 
 
5
# the Free Software Foundation; either version 2 of the License, or
 
 
6
# (at your option) any later version.
 
 
8
# This program is distributed in the hope that it will be useful,
 
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
 
11
# GNU General Public License for more details.
 
 
13
# You should have received a copy of the GNU General Public License
 
 
14
# along with this program; if not, write to the Free Software
 
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 
21
from cStringIO import StringIO
 
 
24
from bzrlib import urlutils
 
 
25
from bzrlib.errors import (NoSuchFile, FileExists,
 
 
31
from bzrlib.tests import TestCase, TestCaseInTempDir
 
 
32
from bzrlib.transport import (_CoalescedOffset,
 
 
33
                              _get_protocol_handlers,
 
 
34
                              _get_transport_modules,
 
 
36
                              register_lazy_transport,
 
 
37
                              _set_protocol_handlers,
 
 
40
from bzrlib.transport.memory import MemoryTransport
 
 
41
from bzrlib.transport.local import LocalTransport
 
 
44
# TODO: Should possibly split transport-specific tests into their own files.
 
 
47
class TestTransport(TestCase):
 
 
48
    """Test the non transport-concrete class functionality."""
 
 
50
    def test__get_set_protocol_handlers(self):
 
 
51
        handlers = _get_protocol_handlers()
 
 
52
        self.assertNotEqual({}, handlers)
 
 
54
            _set_protocol_handlers({})
 
 
55
            self.assertEqual({}, _get_protocol_handlers())
 
 
57
            _set_protocol_handlers(handlers)
 
 
59
    def test_get_transport_modules(self):
 
 
60
        handlers = _get_protocol_handlers()
 
 
61
        class SampleHandler(object):
 
 
62
            """I exist, isnt that enough?"""
 
 
65
            _set_protocol_handlers(my_handlers)
 
 
66
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
 
67
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
 
68
            self.assertEqual([SampleHandler.__module__],
 
 
69
                             _get_transport_modules())
 
 
71
            _set_protocol_handlers(handlers)
 
 
73
    def test_transport_dependency(self):
 
 
74
        """Transport with missing dependency causes no error"""
 
 
75
        saved_handlers = _get_protocol_handlers()
 
 
77
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
 
78
                    'BadTransportHandler')
 
 
80
                get_transport('foo://fooserver/foo')
 
 
81
            except UnsupportedProtocol, e:
 
 
83
                self.assertEquals('Unsupported protocol'
 
 
84
                                  ' for url "foo://fooserver/foo":'
 
 
85
                                  ' Unable to import library "some_lib":'
 
 
86
                                  ' testing missing dependency', str(e))
 
 
88
                self.fail('Did not raise UnsupportedProtocol')
 
 
90
            # restore original values
 
 
91
            _set_protocol_handlers(saved_handlers)
 
 
93
    def test_transport_fallback(self):
 
 
94
        """Transport with missing dependency causes no error"""
 
 
95
        saved_handlers = _get_protocol_handlers()
 
 
97
            _set_protocol_handlers({})
 
 
98
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
 
99
                    'BackupTransportHandler')
 
 
100
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
 
101
                    'BadTransportHandler')
 
 
102
            t = get_transport('foo://fooserver/foo')
 
 
103
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
 
105
            _set_protocol_handlers(saved_handlers)
 
 
107
    def test__combine_paths(self):
 
 
109
        self.assertEqual('/home/sarah/project/foo',
 
 
110
                         t._combine_paths('/home/sarah', 'project/foo'))
 
 
111
        self.assertEqual('/etc',
 
 
112
                         t._combine_paths('/home/sarah', '../../etc'))
 
 
113
        self.assertEqual('/etc',
 
 
114
                         t._combine_paths('/home/sarah', '../../../etc'))
 
 
115
        self.assertEqual('/etc',
 
 
116
                         t._combine_paths('/home/sarah', '/etc'))
 
 
119
class TestCoalesceOffsets(TestCase):
 
 
121
    def check(self, expected, offsets, limit=0, fudge=0):
 
 
122
        coalesce = Transport._coalesce_offsets
 
 
123
        exp = [_CoalescedOffset(*x) for x in expected]
 
 
124
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
 
 
125
        self.assertEqual(exp, out)
 
 
127
    def test_coalesce_empty(self):
 
 
130
    def test_coalesce_simple(self):
 
 
131
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
 
133
    def test_coalesce_unrelated(self):
 
 
134
        self.check([(0, 10, [(0, 10)]),
 
 
136
                   ], [(0, 10), (20, 10)])
 
 
138
    def test_coalesce_unsorted(self):
 
 
139
        self.check([(20, 10, [(0, 10)]),
 
 
141
                   ], [(20, 10), (0, 10)])
 
 
143
    def test_coalesce_nearby(self):
 
 
144
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
 
147
    def test_coalesce_overlapped(self):
 
 
148
        self.check([(0, 15, [(0, 10), (5, 10)])],
 
 
151
    def test_coalesce_limit(self):
 
 
152
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
 
153
                              (30, 10), (40, 10)]),
 
 
154
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
 
155
                              (30, 10), (40, 10)]),
 
 
156
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
 
157
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
 
158
                       (90, 10), (100, 10)],
 
 
161
    def test_coalesce_no_limit(self):
 
 
162
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
 
163
                               (30, 10), (40, 10), (50, 10),
 
 
164
                               (60, 10), (70, 10), (80, 10),
 
 
166
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
 
167
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
 
168
                       (90, 10), (100, 10)])
 
 
170
    def test_coalesce_fudge(self):
 
 
171
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
 
172
                    (100, 10, [(0, 10),]),
 
 
173
                   ], [(10, 10), (30, 10), (100, 10)],
 
 
178
class TestMemoryTransport(TestCase):
 
 
180
    def test_get_transport(self):
 
 
183
    def test_clone(self):
 
 
184
        transport = MemoryTransport()
 
 
185
        self.assertTrue(isinstance(transport, MemoryTransport))
 
 
186
        self.assertEqual("memory:///", transport.clone("/").base)
 
 
188
    def test_abspath(self):
 
 
189
        transport = MemoryTransport()
 
 
190
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
 
192
    def test_abspath_of_root(self):
 
 
193
        transport = MemoryTransport()
 
 
194
        self.assertEqual("memory:///", transport.base)
 
 
195
        self.assertEqual("memory:///", transport.abspath('/'))
 
 
197
    def test_abspath_of_relpath_starting_at_root(self):
 
 
198
        transport = MemoryTransport()
 
 
199
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
 
201
    def test_append_and_get(self):
 
 
202
        transport = MemoryTransport()
 
 
203
        transport.append_bytes('path', 'content')
 
 
204
        self.assertEqual(transport.get('path').read(), 'content')
 
 
205
        transport.append_file('path', StringIO('content'))
 
 
206
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
 
208
    def test_put_and_get(self):
 
 
209
        transport = MemoryTransport()
 
 
210
        transport.put_file('path', StringIO('content'))
 
 
211
        self.assertEqual(transport.get('path').read(), 'content')
 
 
212
        transport.put_bytes('path', 'content')
 
 
213
        self.assertEqual(transport.get('path').read(), 'content')
 
 
215
    def test_append_without_dir_fails(self):
 
 
216
        transport = MemoryTransport()
 
 
217
        self.assertRaises(NoSuchFile,
 
 
218
                          transport.append_bytes, 'dir/path', 'content')
 
 
220
    def test_put_without_dir_fails(self):
 
 
221
        transport = MemoryTransport()
 
 
222
        self.assertRaises(NoSuchFile,
 
 
223
                          transport.put_file, 'dir/path', StringIO('content'))
 
 
225
    def test_get_missing(self):
 
 
226
        transport = MemoryTransport()
 
 
227
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
 
229
    def test_has_missing(self):
 
 
230
        transport = MemoryTransport()
 
 
231
        self.assertEquals(False, transport.has('foo'))
 
 
233
    def test_has_present(self):
 
 
234
        transport = MemoryTransport()
 
 
235
        transport.append_bytes('foo', 'content')
 
 
236
        self.assertEquals(True, transport.has('foo'))
 
 
238
    def test_mkdir(self):
 
 
239
        transport = MemoryTransport()
 
 
240
        transport.mkdir('dir')
 
 
241
        transport.append_bytes('dir/path', 'content')
 
 
242
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
 
244
    def test_mkdir_missing_parent(self):
 
 
245
        transport = MemoryTransport()
 
 
246
        self.assertRaises(NoSuchFile,
 
 
247
                          transport.mkdir, 'dir/dir')
 
 
249
    def test_mkdir_twice(self):
 
 
250
        transport = MemoryTransport()
 
 
251
        transport.mkdir('dir')
 
 
252
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
 
254
    def test_parameters(self):
 
 
255
        transport = MemoryTransport()
 
 
256
        self.assertEqual(True, transport.listable())
 
 
257
        self.assertEqual(False, transport.should_cache())
 
 
258
        self.assertEqual(False, transport.is_readonly())
 
 
260
    def test_iter_files_recursive(self):
 
 
261
        transport = MemoryTransport()
 
 
262
        transport.mkdir('dir')
 
 
263
        transport.put_bytes('dir/foo', 'content')
 
 
264
        transport.put_bytes('dir/bar', 'content')
 
 
265
        transport.put_bytes('bar', 'content')
 
 
266
        paths = set(transport.iter_files_recursive())
 
 
267
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
 
270
        transport = MemoryTransport()
 
 
271
        transport.put_bytes('foo', 'content')
 
 
272
        transport.put_bytes('bar', 'phowar')
 
 
273
        self.assertEqual(7, transport.stat('foo').st_size)
 
 
274
        self.assertEqual(6, transport.stat('bar').st_size)
 
 
277
class ReadonlyDecoratorTransportTest(TestCase):
 
 
278
    """Readonly decoration specific tests."""
 
 
280
    def test_local_parameters(self):
 
 
281
        import bzrlib.transport.readonly as readonly
 
 
282
        # connect to . in readonly mode
 
 
283
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
 
284
        self.assertEqual(True, transport.listable())
 
 
285
        self.assertEqual(False, transport.should_cache())
 
 
286
        self.assertEqual(True, transport.is_readonly())
 
 
288
    def test_http_parameters(self):
 
 
289
        import bzrlib.transport.readonly as readonly
 
 
290
        from bzrlib.transport.http import HttpServer
 
 
291
        # connect to . via http which is not listable
 
 
292
        server = HttpServer()
 
 
295
            transport = get_transport('readonly+' + server.get_url())
 
 
296
            self.failUnless(isinstance(transport,
 
 
297
                                       readonly.ReadonlyTransportDecorator))
 
 
298
            self.assertEqual(False, transport.listable())
 
 
299
            self.assertEqual(True, transport.should_cache())
 
 
300
            self.assertEqual(True, transport.is_readonly())
 
 
305
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
 
306
    """NFS decorator specific tests."""
 
 
308
    def get_nfs_transport(self, url):
 
 
309
        import bzrlib.transport.fakenfs as fakenfs
 
 
310
        # connect to url with nfs decoration
 
 
311
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
 
313
    def test_local_parameters(self):
 
 
314
        # the listable, should_cache and is_readonly parameters
 
 
315
        # are not changed by the fakenfs decorator
 
 
316
        transport = self.get_nfs_transport('.')
 
 
317
        self.assertEqual(True, transport.listable())
 
 
318
        self.assertEqual(False, transport.should_cache())
 
 
319
        self.assertEqual(False, transport.is_readonly())
 
 
321
    def test_http_parameters(self):
 
 
322
        # the listable, should_cache and is_readonly parameters
 
 
323
        # are not changed by the fakenfs decorator
 
 
324
        from bzrlib.transport.http import HttpServer
 
 
325
        # connect to . via http which is not listable
 
 
326
        server = HttpServer()
 
 
329
            transport = self.get_nfs_transport(server.get_url())
 
 
330
            self.assertIsInstance(
 
 
331
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
 
332
            self.assertEqual(False, transport.listable())
 
 
333
            self.assertEqual(True, transport.should_cache())
 
 
334
            self.assertEqual(True, transport.is_readonly())
 
 
338
    def test_fakenfs_server_default(self):
 
 
339
        # a FakeNFSServer() should bring up a local relpath server for itself
 
 
340
        import bzrlib.transport.fakenfs as fakenfs
 
 
341
        server = fakenfs.FakeNFSServer()
 
 
344
            # the url should be decorated appropriately
 
 
345
            self.assertStartsWith(server.get_url(), 'fakenfs+')
 
 
346
            # and we should be able to get a transport for it
 
 
347
            transport = get_transport(server.get_url())
 
 
348
            # which must be a FakeNFSTransportDecorator instance.
 
 
349
            self.assertIsInstance(
 
 
350
                transport, fakenfs.FakeNFSTransportDecorator)
 
 
354
    def test_fakenfs_rename_semantics(self):
 
 
355
        # a FakeNFS transport must mangle the way rename errors occur to
 
 
356
        # look like NFS problems.
 
 
357
        transport = self.get_nfs_transport('.')
 
 
358
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
 
360
        self.assertRaises(bzrlib.errors.ResourceBusy,
 
 
361
                          transport.rename, 'from', 'to')
 
 
364
class FakeVFATDecoratorTests(TestCaseInTempDir):
    """Tests for the transport decorator that simulates VFAT restrictions."""

    def get_vfat_transport(self, url):
        """Return a FakeVFATTransportDecorator built for ``url``.

        :param url: The URL to decorate; 'vfat+' is prepended to it.
        """
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        decorated_url = 'vfat+' + url
        return FakeVFATTransportDecorator(decorated_url)

    def test_transport_creation(self):
        """The helper returns a FakeVFATTransportDecorator instance."""
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        vfat = self.get_vfat_transport('.')
        self.assertIsInstance(vfat, FakeVFATTransportDecorator)

    def test_transport_mkdir(self):
        """A directory made as 'HELLO' is found under other casings."""
        vfat = self.get_vfat_transport('.')
        vfat.mkdir('HELLO')
        lower_found = vfat.has('hello')
        self.assertTrue(lower_found)
        mixed_found = vfat.has('Hello')
        self.assertTrue(mixed_found)

    def test_forbidden_chars(self):
        """has() raises ValueError for the name "<NU>"."""
        vfat = self.get_vfat_transport('.')
        probe = vfat.has
        self.assertRaises(ValueError, probe, "<NU>")
 
 
388
class BadTransportHandler(Transport):
    """Transport whose constructor always fails with DependencyNotPresent."""

    def __init__(self, base_url):
        """Raise DependencyNotPresent unconditionally.

        :param base_url: Accepted to match the constructor signature; unused.
        :raises DependencyNotPresent: Always, naming 'some_lib'.
        """
        library = 'some_lib'
        reason = 'testing missing dependency'
        raise DependencyNotPresent(library, reason)
 
 
393
class BackupTransportHandler(Transport):
 
 
394
    """Test transport that works as a backup for the BadTransportHandler"""
 
 
398
class TestTransportImplementation(TestCaseInTempDir):
 
 
399
    """Implementation verification for transports.
 
 
401
    To verify a transport we need a server factory, which is a callable
 
 
402
    that accepts no parameters and returns an implementation of
 
 
403
    bzrlib.transport.Server.
 
 
405
    That Server is then used to construct transport instances and test
 
 
406
    the transport via loopback activity.
 
 
408
    Currently this assumes that the Transport object is connected to the 
 
 
409
    current working directory.  So that whatever is done 
 
 
410
    through the transport, should show up in the working 
 
 
411
    directory, and vice-versa. This is a bug, because it's possible to have
 
 
412
    URL schemes which provide access to something that may not
 
 
413
    result in storage on the local disk, i.e. due to file system limits, or 
 
 
414
    due to it being a database or some other non-filesystem tool.
 
 
416
    This also tests to make sure that the functions work with both
 
 
417
    generators and lists (assuming iter(list) is effectively a generator)
 
 
421
        super(TestTransportImplementation, self).setUp()
 
 
422
        self._server = self.transport_server()
 
 
426
        super(TestTransportImplementation, self).tearDown()
 
 
427
        self._server.tearDown()
 
 
429
    def get_transport(self):
 
 
430
        """Return a connected transport to the local directory."""
 
 
431
        base_url = self._server.get_url()
 
 
432
        # try getting the transport via the regular interface:
 
 
433
        t = get_transport(base_url)
 
 
434
        if not isinstance(t, self.transport_class):
 
 
435
            # we did not get the correct transport class type. Override the
 
 
436
            # regular connection behaviour by direct construction.
 
 
437
            t = self.transport_class(base_url)
 
 
441
class TestLocalTransports(TestCase):
 
 
443
    def test_get_transport_from_abspath(self):
 
 
444
        here = os.path.abspath('.')
 
 
445
        t = get_transport(here)
 
 
446
        self.assertIsInstance(t, LocalTransport)
 
 
447
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
 
449
    def test_get_transport_from_relpath(self):
 
 
450
        here = os.path.abspath('.')
 
 
451
        t = get_transport('.')
 
 
452
        self.assertIsInstance(t, LocalTransport)
 
 
453
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
 
455
    def test_get_transport_from_local_url(self):
 
 
456
        here = os.path.abspath('.')
 
 
457
        here_url = urlutils.local_path_to_url(here) + '/'
 
 
458
        t = get_transport(here_url)
 
 
459
        self.assertIsInstance(t, LocalTransport)
 
 
460
        self.assertEquals(t.base, here_url)