1
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
 
 
3
# This program is free software; you can redistribute it and/or modify
 
 
4
# it under the terms of the GNU General Public License as published by
 
 
5
# the Free Software Foundation; either version 2 of the License, or
 
 
6
# (at your option) any later version.
 
 
8
# This program is distributed in the hope that it will be useful,
 
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
 
11
# GNU General Public License for more details.
 
 
13
# You should have received a copy of the GNU General Public License
 
 
14
# along with this program; if not, write to the Free Software
 
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 
21
from cStringIO import StringIO
 
 
24
from bzrlib.errors import (NoSuchFile, FileExists,
 
 
30
from bzrlib.tests import TestCase, TestCaseInTempDir
 
 
31
from bzrlib.transport import (_CoalescedOffset,
 
 
32
                              _get_protocol_handlers,
 
 
33
                              _get_transport_modules,
 
 
35
                              register_lazy_transport,
 
 
36
                              _set_protocol_handlers,
 
 
39
from bzrlib.transport.memory import MemoryTransport
 
 
40
from bzrlib.transport.local import LocalTransport
 
 
43
class TestTransport(TestCase):
 
 
44
    """Test the non transport-concrete class functionality."""
 
 
46
    def test__get_set_protocol_handlers(self):
 
 
47
        handlers = _get_protocol_handlers()
 
 
48
        self.assertNotEqual({}, handlers)
 
 
50
            _set_protocol_handlers({})
 
 
51
            self.assertEqual({}, _get_protocol_handlers())
 
 
53
            _set_protocol_handlers(handlers)
 
 
55
    def test_get_transport_modules(self):
 
 
56
        handlers = _get_protocol_handlers()
 
 
57
        class SampleHandler(object):
 
 
58
            """I exist, isnt that enough?"""
 
 
61
            _set_protocol_handlers(my_handlers)
 
 
62
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
 
63
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
 
64
            self.assertEqual([SampleHandler.__module__],
 
 
65
                             _get_transport_modules())
 
 
67
            _set_protocol_handlers(handlers)
 
 
69
    def test_transport_dependency(self):
 
 
70
        """Transport with missing dependency causes no error"""
 
 
71
        saved_handlers = _get_protocol_handlers()
 
 
73
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
 
74
                    'BadTransportHandler')
 
 
76
                get_transport('foo://fooserver/foo')
 
 
77
            except UnsupportedProtocol, e:
 
 
79
                self.assertEquals('Unsupported protocol'
 
 
80
                                  ' for url "foo://fooserver/foo":'
 
 
81
                                  ' Unable to import library "some_lib":'
 
 
82
                                  ' testing missing dependency', str(e))
 
 
84
                self.fail('Did not raise UnsupportedProtocol')
 
 
86
            # restore original values
 
 
87
            _set_protocol_handlers(saved_handlers)
 
 
89
    def test_transport_fallback(self):
 
 
90
        """Transport with missing dependency causes no error"""
 
 
91
        saved_handlers = _get_protocol_handlers()
 
 
93
            _set_protocol_handlers({})
 
 
94
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
 
95
                    'BackupTransportHandler')
 
 
96
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
 
97
                    'BadTransportHandler')
 
 
98
            t = get_transport('foo://fooserver/foo')
 
 
99
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
 
101
            _set_protocol_handlers(saved_handlers)
 
 
104
class TestCoalesceOffsets(TestCase):
 
 
106
    def check(self, expected, offsets, limit=0, fudge=0):
 
 
107
        coalesce = Transport._coalesce_offsets
 
 
108
        exp = [_CoalescedOffset(*x) for x in expected]
 
 
109
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
 
 
110
        self.assertEqual(exp, out)
 
 
112
    def test_coalesce_empty(self):
 
 
115
    def test_coalesce_simple(self):
 
 
116
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
 
118
    def test_coalesce_unrelated(self):
 
 
119
        self.check([(0, 10, [(0, 10)]),
 
 
121
                   ], [(0, 10), (20, 10)])
 
 
123
    def test_coalesce_unsorted(self):
 
 
124
        self.check([(20, 10, [(0, 10)]),
 
 
126
                   ], [(20, 10), (0, 10)])
 
 
128
    def test_coalesce_nearby(self):
 
 
129
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
 
132
    def test_coalesce_overlapped(self):
 
 
133
        self.check([(0, 15, [(0, 10), (5, 10)])],
 
 
136
    def test_coalesce_limit(self):
 
 
137
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
 
138
                              (30, 10), (40, 10)]),
 
 
139
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
 
140
                              (30, 10), (40, 10)]),
 
 
141
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
 
142
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
 
143
                       (90, 10), (100, 10)],
 
 
146
    def test_coalesce_no_limit(self):
 
 
147
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
 
148
                               (30, 10), (40, 10), (50, 10),
 
 
149
                               (60, 10), (70, 10), (80, 10),
 
 
151
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
 
152
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
 
153
                       (90, 10), (100, 10)])
 
 
155
    def test_coalesce_fudge(self):
 
 
156
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
 
157
                    (100, 10, [(0, 10),]),
 
 
158
                   ], [(10, 10), (30, 10), (100, 10)],
 
 
163
class TestMemoryTransport(TestCase):
 
 
165
    def test_get_transport(self):
 
 
168
    def test_clone(self):
 
 
169
        transport = MemoryTransport()
 
 
170
        self.assertTrue(isinstance(transport, MemoryTransport))
 
 
172
    def test_abspath(self):
 
 
173
        transport = MemoryTransport()
 
 
174
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
 
176
    def test_relpath(self):
 
 
177
        transport = MemoryTransport()
 
 
179
    def test_append_and_get(self):
 
 
180
        transport = MemoryTransport()
 
 
181
        transport.append('path', StringIO('content'))
 
 
182
        self.assertEqual(transport.get('path').read(), 'content')
 
 
183
        transport.append('path', StringIO('content'))
 
 
184
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
 
186
    def test_put_and_get(self):
 
 
187
        transport = MemoryTransport()
 
 
188
        transport.put('path', StringIO('content'))
 
 
189
        self.assertEqual(transport.get('path').read(), 'content')
 
 
190
        transport.put('path', StringIO('content'))
 
 
191
        self.assertEqual(transport.get('path').read(), 'content')
 
 
193
    def test_append_without_dir_fails(self):
 
 
194
        transport = MemoryTransport()
 
 
195
        self.assertRaises(NoSuchFile,
 
 
196
                          transport.append, 'dir/path', StringIO('content'))
 
 
198
    def test_put_without_dir_fails(self):
 
 
199
        transport = MemoryTransport()
 
 
200
        self.assertRaises(NoSuchFile,
 
 
201
                          transport.put, 'dir/path', StringIO('content'))
 
 
203
    def test_get_missing(self):
 
 
204
        transport = MemoryTransport()
 
 
205
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
 
207
    def test_has_missing(self):
 
 
208
        transport = MemoryTransport()
 
 
209
        self.assertEquals(False, transport.has('foo'))
 
 
211
    def test_has_present(self):
 
 
212
        transport = MemoryTransport()
 
 
213
        transport.append('foo', StringIO('content'))
 
 
214
        self.assertEquals(True, transport.has('foo'))
 
 
216
    def test_mkdir(self):
 
 
217
        transport = MemoryTransport()
 
 
218
        transport.mkdir('dir')
 
 
219
        transport.append('dir/path', StringIO('content'))
 
 
220
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
 
222
    def test_mkdir_missing_parent(self):
 
 
223
        transport = MemoryTransport()
 
 
224
        self.assertRaises(NoSuchFile,
 
 
225
                          transport.mkdir, 'dir/dir')
 
 
227
    def test_mkdir_twice(self):
 
 
228
        transport = MemoryTransport()
 
 
229
        transport.mkdir('dir')
 
 
230
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
 
232
    def test_parameters(self):
 
 
233
        transport = MemoryTransport()
 
 
234
        self.assertEqual(True, transport.listable())
 
 
235
        self.assertEqual(False, transport.should_cache())
 
 
236
        self.assertEqual(False, transport.is_readonly())
 
 
238
    def test_iter_files_recursive(self):
 
 
239
        transport = MemoryTransport()
 
 
240
        transport.mkdir('dir')
 
 
241
        transport.put('dir/foo', StringIO('content'))
 
 
242
        transport.put('dir/bar', StringIO('content'))
 
 
243
        transport.put('bar', StringIO('content'))
 
 
244
        paths = set(transport.iter_files_recursive())
 
 
245
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
 
248
        transport = MemoryTransport()
 
 
249
        transport.put('foo', StringIO('content'))
 
 
250
        transport.put('bar', StringIO('phowar'))
 
 
251
        self.assertEqual(7, transport.stat('foo').st_size)
 
 
252
        self.assertEqual(6, transport.stat('bar').st_size)
 
 
255
class ReadonlyDecoratorTransportTest(TestCase):
 
 
256
    """Readonly decoration specific tests."""
 
 
258
    def test_local_parameters(self):
 
 
259
        import bzrlib.transport.readonly as readonly
 
 
260
        # connect to . in readonly mode
 
 
261
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
 
262
        self.assertEqual(True, transport.listable())
 
 
263
        self.assertEqual(False, transport.should_cache())
 
 
264
        self.assertEqual(True, transport.is_readonly())
 
 
266
    def test_http_parameters(self):
 
 
267
        import bzrlib.transport.readonly as readonly
 
 
268
        from bzrlib.transport.http import HttpServer
 
 
269
        # connect to . via http which is not listable
 
 
270
        server = HttpServer()
 
 
273
            transport = get_transport('readonly+' + server.get_url())
 
 
274
            self.failUnless(isinstance(transport,
 
 
275
                                       readonly.ReadonlyTransportDecorator))
 
 
276
            self.assertEqual(False, transport.listable())
 
 
277
            self.assertEqual(True, transport.should_cache())
 
 
278
            self.assertEqual(True, transport.is_readonly())
 
 
283
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
 
284
    """NFS decorator specific tests."""
 
 
286
    def get_nfs_transport(self, url):
 
 
287
        import bzrlib.transport.fakenfs as fakenfs
 
 
288
        # connect to url with nfs decoration
 
 
289
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
 
291
    def test_local_parameters(self):
 
 
292
        # the listable, should_cache and is_readonly parameters
 
 
293
        # are not changed by the fakenfs decorator
 
 
294
        transport = self.get_nfs_transport('.')
 
 
295
        self.assertEqual(True, transport.listable())
 
 
296
        self.assertEqual(False, transport.should_cache())
 
 
297
        self.assertEqual(False, transport.is_readonly())
 
 
299
    def test_http_parameters(self):
 
 
300
        # the listable, should_cache and is_readonly parameters
 
 
301
        # are not changed by the fakenfs decorator
 
 
302
        from bzrlib.transport.http import HttpServer
 
 
303
        # connect to . via http which is not listable
 
 
304
        server = HttpServer()
 
 
307
            transport = self.get_nfs_transport(server.get_url())
 
 
308
            self.assertIsInstance(
 
 
309
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
 
310
            self.assertEqual(False, transport.listable())
 
 
311
            self.assertEqual(True, transport.should_cache())
 
 
312
            self.assertEqual(True, transport.is_readonly())
 
 
316
    def test_fakenfs_server_default(self):
 
 
317
        # a FakeNFSServer() should bring up a local relpath server for itself
 
 
318
        import bzrlib.transport.fakenfs as fakenfs
 
 
319
        server = fakenfs.FakeNFSServer()
 
 
322
            # the server should be a relpath localhost server
 
 
323
            self.assertEqual(server.get_url(), 'fakenfs+.')
 
 
324
            # and we should be able to get a transport for it
 
 
325
            transport = get_transport(server.get_url())
 
 
326
            # which must be a FakeNFSTransportDecorator instance.
 
 
327
            self.assertIsInstance(
 
 
328
                transport, fakenfs.FakeNFSTransportDecorator)
 
 
332
    def test_fakenfs_rename_semantics(self):
 
 
333
        # a FakeNFS transport must mangle the way rename errors occur to
 
 
334
        # look like NFS problems.
 
 
335
        transport = self.get_nfs_transport('.')
 
 
336
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
 
338
        self.assertRaises(bzrlib.errors.ResourceBusy,
 
 
339
                          transport.rename, 'from', 'to')
 
 
342
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
 
343
    """Tests for simulation of VFAT restrictions"""
 
 
345
    def get_vfat_transport(self, url):
 
 
346
        """Return vfat-backed transport for test directory"""
 
 
347
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
 
348
        return FakeVFATTransportDecorator('vfat+' + url)
 
 
350
    def test_transport_creation(self):
 
 
351
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
 
352
        transport = self.get_vfat_transport('.')
 
 
353
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
 
355
    def test_transport_mkdir(self):
 
 
356
        transport = self.get_vfat_transport('.')
 
 
357
        transport.mkdir('HELLO')
 
 
358
        self.assertTrue(transport.has('hello'))
 
 
359
        self.assertTrue(transport.has('Hello'))
 
 
361
    def test_forbidden_chars(self):
 
 
362
        transport = self.get_vfat_transport('.')
 
 
363
        self.assertRaises(ValueError, transport.has, "<NU>")
 
 
366
class BadTransportHandler(Transport):
 
 
367
    def __init__(self, base_url):
 
 
368
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
 
371
class BackupTransportHandler(Transport):
 
 
372
    """Test transport that works as a backup for the BadTransportHandler"""
 
 
376
class TestTransportImplementation(TestCaseInTempDir):
 
 
377
    """Implementation verification for transports.
 
 
379
    To verify a transport we need a server factory, which is a callable
 
 
380
    that accepts no parameters and returns an implementation of
 
 
381
    bzrlib.transport.Server.
 
 
383
    That Server is then used to construct transport instances and test
 
 
384
    the transport via loopback activity.
 
 
386
    Currently this assumes that the Transport object is connected to the 
 
 
387
    current working directory.  So that whatever is done 
 
 
388
    through the transport, should show up in the working 
 
 
389
    directory, and vice-versa. This is a bug, because its possible to have
 
 
390
    URL schemes which provide access to something that may not be 
 
 
391
    result in storage on the local disk, i.e. due to file system limits, or 
 
 
392
    due to it being a database or some other non-filesystem tool.
 
 
394
    This also tests to make sure that the functions work with both
 
 
395
    generators and lists (assuming iter(list) is effectively a generator)
 
 
399
        super(TestTransportImplementation, self).setUp()
 
 
400
        self._server = self.transport_server()
 
 
404
        super(TestTransportImplementation, self).tearDown()
 
 
405
        self._server.tearDown()
 
 
407
    def get_transport(self):
 
 
408
        """Return a connected transport to the local directory."""
 
 
409
        base_url = self._server.get_url()
 
 
410
        # try getting the transport via the regular interface:
 
 
411
        t = get_transport(base_url)
 
 
412
        if not isinstance(t, self.transport_class): 
 
 
413
            # we did not get the correct transport class type. Override the
 
 
414
            # regular connection behaviour by direct construction.
 
 
415
            t = self.transport_class(base_url)