/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-09-19 01:33:10 UTC
  • mfrom: (4695.4.1 430529-extension-warnings)
  • Revision ID: pqm@pqm.ubuntu.com-20090919013310-4lds9snxescbsxed
(mbp) better messages about missing extensions

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from cStringIO import StringIO
 
19
 
 
20
import bzrlib
 
21
from bzrlib import (
 
22
    errors,
 
23
    osutils,
 
24
    urlutils,
 
25
    )
 
26
from bzrlib.errors import (DependencyNotPresent,
 
27
                           FileExists,
 
28
                           InvalidURLJoin,
 
29
                           NoSuchFile,
 
30
                           PathNotChild,
 
31
                           ReadError,
 
32
                           UnsupportedProtocol,
 
33
                           )
 
34
from bzrlib.tests import TestCase, TestCaseInTempDir
 
35
from bzrlib.transport import (_clear_protocol_handlers,
 
36
                              _CoalescedOffset,
 
37
                              ConnectedTransport,
 
38
                              _get_protocol_handlers,
 
39
                              _set_protocol_handlers,
 
40
                              _get_transport_modules,
 
41
                              get_transport,
 
42
                              LateReadError,
 
43
                              register_lazy_transport,
 
44
                              register_transport_proto,
 
45
                              Transport,
 
46
                              )
 
47
from bzrlib.transport.chroot import ChrootServer
 
48
from bzrlib.transport.memory import MemoryTransport
 
49
from bzrlib.transport.local import (LocalTransport,
 
50
                                    EmulatedWin32LocalTransport)
 
51
from bzrlib.transport.pathfilter import PathFilteringServer
 
52
 
 
53
 
 
54
# TODO: Should possibly split transport-specific tests into their own files.
 
55
 
 
56
 
 
57
class TestTransport(TestCase):
 
58
    """Test the non transport-concrete class functionality."""
 
59
 
 
60
    def test__get_set_protocol_handlers(self):
 
61
        handlers = _get_protocol_handlers()
 
62
        self.assertNotEqual([], handlers.keys( ))
 
63
        try:
 
64
            _clear_protocol_handlers()
 
65
            self.assertEqual([], _get_protocol_handlers().keys())
 
66
        finally:
 
67
            _set_protocol_handlers(handlers)
 
68
 
 
69
    def test_get_transport_modules(self):
 
70
        handlers = _get_protocol_handlers()
 
71
        # don't pollute the current handlers
 
72
        _clear_protocol_handlers()
 
73
        class SampleHandler(object):
 
74
            """I exist, isnt that enough?"""
 
75
        try:
 
76
            _clear_protocol_handlers()
 
77
            register_transport_proto('foo')
 
78
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
79
                                    'TestTransport.SampleHandler')
 
80
            register_transport_proto('bar')
 
81
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
82
                                    'TestTransport.SampleHandler')
 
83
            self.assertEqual([SampleHandler.__module__,
 
84
                              'bzrlib.transport.chroot',
 
85
                              'bzrlib.transport.pathfilter'],
 
86
                             _get_transport_modules())
 
87
        finally:
 
88
            _set_protocol_handlers(handlers)
 
89
 
 
90
    def test_transport_dependency(self):
 
91
        """Transport with missing dependency causes no error"""
 
92
        saved_handlers = _get_protocol_handlers()
 
93
        # don't pollute the current handlers
 
94
        _clear_protocol_handlers()
 
95
        try:
 
96
            register_transport_proto('foo')
 
97
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
98
                    'BadTransportHandler')
 
99
            try:
 
100
                get_transport('foo://fooserver/foo')
 
101
            except UnsupportedProtocol, e:
 
102
                e_str = str(e)
 
103
                self.assertEquals('Unsupported protocol'
 
104
                                  ' for url "foo://fooserver/foo":'
 
105
                                  ' Unable to import library "some_lib":'
 
106
                                  ' testing missing dependency', str(e))
 
107
            else:
 
108
                self.fail('Did not raise UnsupportedProtocol')
 
109
        finally:
 
110
            # restore original values
 
111
            _set_protocol_handlers(saved_handlers)
 
112
 
 
113
    def test_transport_fallback(self):
 
114
        """Transport with missing dependency causes no error"""
 
115
        saved_handlers = _get_protocol_handlers()
 
116
        try:
 
117
            _clear_protocol_handlers()
 
118
            register_transport_proto('foo')
 
119
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
120
                    'BackupTransportHandler')
 
121
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
122
                    'BadTransportHandler')
 
123
            t = get_transport('foo://fooserver/foo')
 
124
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
125
        finally:
 
126
            _set_protocol_handlers(saved_handlers)
 
127
 
 
128
    def test_ssh_hints(self):
 
129
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
130
        try:
 
131
            get_transport('ssh://fooserver/foo')
 
132
        except UnsupportedProtocol, e:
 
133
            e_str = str(e)
 
134
            self.assertEquals('Unsupported protocol'
 
135
                              ' for url "ssh://fooserver/foo":'
 
136
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
 
137
                              str(e))
 
138
        else:
 
139
            self.fail('Did not raise UnsupportedProtocol')
 
140
 
 
141
    def test_LateReadError(self):
 
142
        """The LateReadError helper should raise on read()."""
 
143
        a_file = LateReadError('a path')
 
144
        try:
 
145
            a_file.read()
 
146
        except ReadError, error:
 
147
            self.assertEqual('a path', error.path)
 
148
        self.assertRaises(ReadError, a_file.read, 40)
 
149
        a_file.close()
 
150
 
 
151
    def test__combine_paths(self):
 
152
        t = Transport('/')
 
153
        self.assertEqual('/home/sarah/project/foo',
 
154
                         t._combine_paths('/home/sarah', 'project/foo'))
 
155
        self.assertEqual('/etc',
 
156
                         t._combine_paths('/home/sarah', '../../etc'))
 
157
        self.assertEqual('/etc',
 
158
                         t._combine_paths('/home/sarah', '../../../etc'))
 
159
        self.assertEqual('/etc',
 
160
                         t._combine_paths('/home/sarah', '/etc'))
 
161
 
 
162
    def test_local_abspath_non_local_transport(self):
 
163
        # the base implementation should throw
 
164
        t = MemoryTransport()
 
165
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
166
        self.assertEqual('memory:///t is not a local path.', str(e))
 
167
 
 
168
 
 
169
class TestCoalesceOffsets(TestCase):
 
170
 
 
171
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
172
        coalesce = Transport._coalesce_offsets
 
173
        exp = [_CoalescedOffset(*x) for x in expected]
 
174
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
175
                            max_size=max_size))
 
176
        self.assertEqual(exp, out)
 
177
 
 
178
    def test_coalesce_empty(self):
 
179
        self.check([], [])
 
180
 
 
181
    def test_coalesce_simple(self):
 
182
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
183
 
 
184
    def test_coalesce_unrelated(self):
 
185
        self.check([(0, 10, [(0, 10)]),
 
186
                    (20, 10, [(0, 10)]),
 
187
                   ], [(0, 10), (20, 10)])
 
188
 
 
189
    def test_coalesce_unsorted(self):
 
190
        self.check([(20, 10, [(0, 10)]),
 
191
                    (0, 10, [(0, 10)]),
 
192
                   ], [(20, 10), (0, 10)])
 
193
 
 
194
    def test_coalesce_nearby(self):
 
195
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
196
                   [(0, 10), (10, 10)])
 
197
 
 
198
    def test_coalesce_overlapped(self):
 
199
        self.assertRaises(ValueError,
 
200
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
201
                        [(0, 10), (5, 10)])
 
202
 
 
203
    def test_coalesce_limit(self):
 
204
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
205
                              (30, 10), (40, 10)]),
 
206
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
207
                              (30, 10), (40, 10)]),
 
208
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
209
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
210
                       (90, 10), (100, 10)],
 
211
                    limit=5)
 
212
 
 
213
    def test_coalesce_no_limit(self):
 
214
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
215
                               (30, 10), (40, 10), (50, 10),
 
216
                               (60, 10), (70, 10), (80, 10),
 
217
                               (90, 10)]),
 
218
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
219
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
220
                       (90, 10), (100, 10)])
 
221
 
 
222
    def test_coalesce_fudge(self):
 
223
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
224
                    (100, 10, [(0, 10),]),
 
225
                   ], [(10, 10), (30, 10), (100, 10)],
 
226
                   fudge=10
 
227
                  )
 
228
    def test_coalesce_max_size(self):
 
229
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
230
                    (30, 50, [(0, 50)]),
 
231
                    # If one range is above max_size, it gets its own coalesced
 
232
                    # offset
 
233
                    (100, 80, [(0, 80),]),],
 
234
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
235
                   max_size=50
 
236
                  )
 
237
 
 
238
    def test_coalesce_no_max_size(self):
 
239
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
240
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
 
241
                  )
 
242
 
 
243
    def test_coalesce_default_limit(self):
 
244
        # By default we use a 100MB max size.
 
245
        ten_mb = 10*1024*1024
 
246
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
247
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
248
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
249
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
250
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
251
                   max_size=1*1024*1024*1024)
 
252
 
 
253
 
 
254
class TestMemoryTransport(TestCase):
 
255
 
 
256
    def test_get_transport(self):
 
257
        MemoryTransport()
 
258
 
 
259
    def test_clone(self):
 
260
        transport = MemoryTransport()
 
261
        self.assertTrue(isinstance(transport, MemoryTransport))
 
262
        self.assertEqual("memory:///", transport.clone("/").base)
 
263
 
 
264
    def test_abspath(self):
 
265
        transport = MemoryTransport()
 
266
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
267
 
 
268
    def test_abspath_of_root(self):
 
269
        transport = MemoryTransport()
 
270
        self.assertEqual("memory:///", transport.base)
 
271
        self.assertEqual("memory:///", transport.abspath('/'))
 
272
 
 
273
    def test_abspath_of_relpath_starting_at_root(self):
 
274
        transport = MemoryTransport()
 
275
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
276
 
 
277
    def test_append_and_get(self):
 
278
        transport = MemoryTransport()
 
279
        transport.append_bytes('path', 'content')
 
280
        self.assertEqual(transport.get('path').read(), 'content')
 
281
        transport.append_file('path', StringIO('content'))
 
282
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
283
 
 
284
    def test_put_and_get(self):
 
285
        transport = MemoryTransport()
 
286
        transport.put_file('path', StringIO('content'))
 
287
        self.assertEqual(transport.get('path').read(), 'content')
 
288
        transport.put_bytes('path', 'content')
 
289
        self.assertEqual(transport.get('path').read(), 'content')
 
290
 
 
291
    def test_append_without_dir_fails(self):
 
292
        transport = MemoryTransport()
 
293
        self.assertRaises(NoSuchFile,
 
294
                          transport.append_bytes, 'dir/path', 'content')
 
295
 
 
296
    def test_put_without_dir_fails(self):
 
297
        transport = MemoryTransport()
 
298
        self.assertRaises(NoSuchFile,
 
299
                          transport.put_file, 'dir/path', StringIO('content'))
 
300
 
 
301
    def test_get_missing(self):
 
302
        transport = MemoryTransport()
 
303
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
304
 
 
305
    def test_has_missing(self):
 
306
        transport = MemoryTransport()
 
307
        self.assertEquals(False, transport.has('foo'))
 
308
 
 
309
    def test_has_present(self):
 
310
        transport = MemoryTransport()
 
311
        transport.append_bytes('foo', 'content')
 
312
        self.assertEquals(True, transport.has('foo'))
 
313
 
 
314
    def test_list_dir(self):
 
315
        transport = MemoryTransport()
 
316
        transport.put_bytes('foo', 'content')
 
317
        transport.mkdir('dir')
 
318
        transport.put_bytes('dir/subfoo', 'content')
 
319
        transport.put_bytes('dirlike', 'content')
 
320
 
 
321
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
322
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
323
 
 
324
    def test_mkdir(self):
 
325
        transport = MemoryTransport()
 
326
        transport.mkdir('dir')
 
327
        transport.append_bytes('dir/path', 'content')
 
328
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
329
 
 
330
    def test_mkdir_missing_parent(self):
 
331
        transport = MemoryTransport()
 
332
        self.assertRaises(NoSuchFile,
 
333
                          transport.mkdir, 'dir/dir')
 
334
 
 
335
    def test_mkdir_twice(self):
 
336
        transport = MemoryTransport()
 
337
        transport.mkdir('dir')
 
338
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
339
 
 
340
    def test_parameters(self):
 
341
        transport = MemoryTransport()
 
342
        self.assertEqual(True, transport.listable())
 
343
        self.assertEqual(False, transport.is_readonly())
 
344
 
 
345
    def test_iter_files_recursive(self):
 
346
        transport = MemoryTransport()
 
347
        transport.mkdir('dir')
 
348
        transport.put_bytes('dir/foo', 'content')
 
349
        transport.put_bytes('dir/bar', 'content')
 
350
        transport.put_bytes('bar', 'content')
 
351
        paths = set(transport.iter_files_recursive())
 
352
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
353
 
 
354
    def test_stat(self):
 
355
        transport = MemoryTransport()
 
356
        transport.put_bytes('foo', 'content')
 
357
        transport.put_bytes('bar', 'phowar')
 
358
        self.assertEqual(7, transport.stat('foo').st_size)
 
359
        self.assertEqual(6, transport.stat('bar').st_size)
 
360
 
 
361
 
 
362
class ChrootDecoratorTransportTest(TestCase):
 
363
    """Chroot decoration specific tests."""
 
364
 
 
365
    def test_abspath(self):
 
366
        # The abspath is always relative to the chroot_url.
 
367
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
368
        self.start_server(server)
 
369
        transport = get_transport(server.get_url())
 
370
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
371
 
 
372
        subdir_transport = transport.clone('subdir')
 
373
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
374
 
 
375
    def test_clone(self):
 
376
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
377
        self.start_server(server)
 
378
        transport = get_transport(server.get_url())
 
379
        # relpath from root and root path are the same
 
380
        relpath_cloned = transport.clone('foo')
 
381
        abspath_cloned = transport.clone('/foo')
 
382
        self.assertEqual(server, relpath_cloned.server)
 
383
        self.assertEqual(server, abspath_cloned.server)
 
384
 
 
385
    def test_chroot_url_preserves_chroot(self):
 
386
        """Calling get_transport on a chroot transport's base should produce a
 
387
        transport with exactly the same behaviour as the original chroot
 
388
        transport.
 
389
 
 
390
        This is so that it is not possible to escape a chroot by doing::
 
391
            url = chroot_transport.base
 
392
            parent_url = urlutils.join(url, '..')
 
393
            new_transport = get_transport(parent_url)
 
394
        """
 
395
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
396
        self.start_server(server)
 
397
        transport = get_transport(server.get_url())
 
398
        new_transport = get_transport(transport.base)
 
399
        self.assertEqual(transport.server, new_transport.server)
 
400
        self.assertEqual(transport.base, new_transport.base)
 
401
 
 
402
    def test_urljoin_preserves_chroot(self):
 
403
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
404
        URL that escapes the intended chroot.
 
405
 
 
406
        This is so that it is not possible to escape a chroot by doing::
 
407
            url = chroot_transport.base
 
408
            parent_url = urlutils.join(url, '..')
 
409
            new_transport = get_transport(parent_url)
 
410
        """
 
411
        server = ChrootServer(get_transport('memory:///path/'))
 
412
        self.start_server(server)
 
413
        transport = get_transport(server.get_url())
 
414
        self.assertRaises(
 
415
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
416
 
 
417
 
 
418
class ChrootServerTest(TestCase):
 
419
 
 
420
    def test_construct(self):
 
421
        backing_transport = MemoryTransport()
 
422
        server = ChrootServer(backing_transport)
 
423
        self.assertEqual(backing_transport, server.backing_transport)
 
424
 
 
425
    def test_setUp(self):
 
426
        backing_transport = MemoryTransport()
 
427
        server = ChrootServer(backing_transport)
 
428
        server.setUp()
 
429
        try:
 
430
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
431
        finally:
 
432
            server.tearDown()
 
433
 
 
434
    def test_tearDown(self):
 
435
        backing_transport = MemoryTransport()
 
436
        server = ChrootServer(backing_transport)
 
437
        server.setUp()
 
438
        server.tearDown()
 
439
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
440
 
 
441
    def test_get_url(self):
 
442
        backing_transport = MemoryTransport()
 
443
        server = ChrootServer(backing_transport)
 
444
        server.setUp()
 
445
        try:
 
446
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
447
        finally:
 
448
            server.tearDown()
 
449
 
 
450
 
 
451
class PathFilteringDecoratorTransportTest(TestCase):
 
452
    """Pathfilter decoration specific tests."""
 
453
 
 
454
    def test_abspath(self):
 
455
        # The abspath is always relative to the base of the backing transport.
 
456
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
 
457
            lambda x: x)
 
458
        server.setUp()
 
459
        transport = get_transport(server.get_url())
 
460
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
461
 
 
462
        subdir_transport = transport.clone('subdir')
 
463
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
464
        server.tearDown()
 
465
 
 
466
    def make_pf_transport(self, filter_func=None):
 
467
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
468
        
 
469
        :param filter_func: by default this will be a no-op function.  Use this
 
470
            parameter to override it."""
 
471
        if filter_func is None:
 
472
            filter_func = lambda x: x
 
473
        server = PathFilteringServer(
 
474
            get_transport('memory:///foo/bar/'), filter_func)
 
475
        server.setUp()
 
476
        self.addCleanup(server.tearDown)
 
477
        return get_transport(server.get_url())
 
478
 
 
479
    def test__filter(self):
 
480
        # _filter (with an identity func as filter_func) always returns
 
481
        # paths relative to the base of the backing transport.
 
482
        transport = self.make_pf_transport()
 
483
        self.assertEqual('foo', transport._filter('foo'))
 
484
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
485
        self.assertEqual('', transport._filter('..'))
 
486
        self.assertEqual('', transport._filter('/'))
 
487
        # The base of the pathfiltering transport is taken into account too.
 
488
        transport = transport.clone('subdir1/subdir2')
 
489
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
490
        self.assertEqual(
 
491
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
492
        self.assertEqual('subdir1', transport._filter('..'))
 
493
        self.assertEqual('', transport._filter('/'))
 
494
 
 
495
    def test_filter_invocation(self):
 
496
        filter_log = []
 
497
        def filter(path):
 
498
            filter_log.append(path)
 
499
            return path
 
500
        transport = self.make_pf_transport(filter)
 
501
        transport.has('abc')
 
502
        self.assertEqual(['abc'], filter_log)
 
503
        del filter_log[:]
 
504
        transport.clone('abc').has('xyz')
 
505
        self.assertEqual(['abc/xyz'], filter_log)
 
506
        del filter_log[:]
 
507
        transport.has('/abc')
 
508
        self.assertEqual(['abc'], filter_log)
 
509
 
 
510
    def test_clone(self):
 
511
        transport = self.make_pf_transport()
 
512
        # relpath from root and root path are the same
 
513
        relpath_cloned = transport.clone('foo')
 
514
        abspath_cloned = transport.clone('/foo')
 
515
        self.assertEqual(transport.server, relpath_cloned.server)
 
516
        self.assertEqual(transport.server, abspath_cloned.server)
 
517
 
 
518
    def test_url_preserves_pathfiltering(self):
 
519
        """Calling get_transport on a pathfiltered transport's base should
 
520
        produce a transport with exactly the same behaviour as the original
 
521
        pathfiltered transport.
 
522
 
 
523
        This is so that it is not possible to escape (accidentally or
 
524
        otherwise) the filtering by doing::
 
525
            url = filtered_transport.base
 
526
            parent_url = urlutils.join(url, '..')
 
527
            new_transport = get_transport(parent_url)
 
528
        """
 
529
        transport = self.make_pf_transport()
 
530
        new_transport = get_transport(transport.base)
 
531
        self.assertEqual(transport.server, new_transport.server)
 
532
        self.assertEqual(transport.base, new_transport.base)
 
533
 
 
534
 
 
535
class ReadonlyDecoratorTransportTest(TestCase):
 
536
    """Readonly decoration specific tests."""
 
537
 
 
538
    def test_local_parameters(self):
 
539
        import bzrlib.transport.readonly as readonly
 
540
        # connect to . in readonly mode
 
541
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
542
        self.assertEqual(True, transport.listable())
 
543
        self.assertEqual(True, transport.is_readonly())
 
544
 
 
545
    def test_http_parameters(self):
 
546
        from bzrlib.tests.http_server import HttpServer
 
547
        import bzrlib.transport.readonly as readonly
 
548
        # connect to '.' via http which is not listable
 
549
        server = HttpServer()
 
550
        self.start_server(server)
 
551
        transport = get_transport('readonly+' + server.get_url())
 
552
        self.failUnless(isinstance(transport,
 
553
                                   readonly.ReadonlyTransportDecorator))
 
554
        self.assertEqual(False, transport.listable())
 
555
        self.assertEqual(True, transport.is_readonly())
 
556
 
 
557
 
 
558
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
559
    """NFS decorator specific tests."""
 
560
 
 
561
    def get_nfs_transport(self, url):
 
562
        import bzrlib.transport.fakenfs as fakenfs
 
563
        # connect to url with nfs decoration
 
564
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
565
 
 
566
    def test_local_parameters(self):
 
567
        # the listable and is_readonly parameters
 
568
        # are not changed by the fakenfs decorator
 
569
        transport = self.get_nfs_transport('.')
 
570
        self.assertEqual(True, transport.listable())
 
571
        self.assertEqual(False, transport.is_readonly())
 
572
 
 
573
    def test_http_parameters(self):
 
574
        # the listable and is_readonly parameters
 
575
        # are not changed by the fakenfs decorator
 
576
        from bzrlib.tests.http_server import HttpServer
 
577
        # connect to '.' via http which is not listable
 
578
        server = HttpServer()
 
579
        self.start_server(server)
 
580
        transport = self.get_nfs_transport(server.get_url())
 
581
        self.assertIsInstance(
 
582
            transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
583
        self.assertEqual(False, transport.listable())
 
584
        self.assertEqual(True, transport.is_readonly())
 
585
 
 
586
    def test_fakenfs_server_default(self):
 
587
        # a FakeNFSServer() should bring up a local relpath server for itself
 
588
        import bzrlib.transport.fakenfs as fakenfs
 
589
        server = fakenfs.FakeNFSServer()
 
590
        self.start_server(server)
 
591
        # the url should be decorated appropriately
 
592
        self.assertStartsWith(server.get_url(), 'fakenfs+')
 
593
        # and we should be able to get a transport for it
 
594
        transport = get_transport(server.get_url())
 
595
        # which must be a FakeNFSTransportDecorator instance.
 
596
        self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
 
597
 
 
598
    def test_fakenfs_rename_semantics(self):
 
599
        # a FakeNFS transport must mangle the way rename errors occur to
 
600
        # look like NFS problems.
 
601
        transport = self.get_nfs_transport('.')
 
602
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
603
                        transport=transport)
 
604
        self.assertRaises(errors.ResourceBusy,
 
605
                          transport.rename, 'from', 'to')
 
606
 
 
607
 
 
608
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
609
    """Tests for simulation of VFAT restrictions"""
 
610
 
 
611
    def get_vfat_transport(self, url):
 
612
        """Return vfat-backed transport for test directory"""
 
613
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
614
        return FakeVFATTransportDecorator('vfat+' + url)
 
615
 
 
616
    def test_transport_creation(self):
 
617
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
618
        transport = self.get_vfat_transport('.')
 
619
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
620
 
 
621
    def test_transport_mkdir(self):
 
622
        transport = self.get_vfat_transport('.')
 
623
        transport.mkdir('HELLO')
 
624
        self.assertTrue(transport.has('hello'))
 
625
        self.assertTrue(transport.has('Hello'))
 
626
 
 
627
    def test_forbidden_chars(self):
 
628
        transport = self.get_vfat_transport('.')
 
629
        self.assertRaises(ValueError, transport.has, "<NU>")
 
630
 
 
631
 
 
632
class BadTransportHandler(Transport):
 
633
    def __init__(self, base_url):
 
634
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
635
 
 
636
 
 
637
class BackupTransportHandler(Transport):
 
638
    """Test transport that works as a backup for the BadTransportHandler"""
 
639
    pass
 
640
 
 
641
 
 
642
class TestTransportImplementation(TestCaseInTempDir):
 
643
    """Implementation verification for transports.
 
644
 
 
645
    To verify a transport we need a server factory, which is a callable
 
646
    that accepts no parameters and returns an implementation of
 
647
    bzrlib.transport.Server.
 
648
 
 
649
    That Server is then used to construct transport instances and test
 
650
    the transport via loopback activity.
 
651
 
 
652
    Currently this assumes that the Transport object is connected to the
 
653
    current working directory.  So that whatever is done
 
654
    through the transport, should show up in the working
 
655
    directory, and vice-versa. This is a bug, because its possible to have
 
656
    URL schemes which provide access to something that may not be
 
657
    result in storage on the local disk, i.e. due to file system limits, or
 
658
    due to it being a database or some other non-filesystem tool.
 
659
 
 
660
    This also tests to make sure that the functions work with both
 
661
    generators and lists (assuming iter(list) is effectively a generator)
 
662
    """
 
663
 
 
664
    def setUp(self):
 
665
        super(TestTransportImplementation, self).setUp()
 
666
        self._server = self.transport_server()
 
667
        self.start_server(self._server)
 
668
 
 
669
    def get_transport(self, relpath=None):
 
670
        """Return a connected transport to the local directory.
 
671
 
 
672
        :param relpath: a path relative to the base url.
 
673
        """
 
674
        base_url = self._server.get_url()
 
675
        url = self._adjust_url(base_url, relpath)
 
676
        # try getting the transport via the regular interface:
 
677
        t = get_transport(url)
 
678
        # vila--20070607 if the following are commented out the test suite
 
679
        # still pass. Is this really still needed or was it a forgotten
 
680
        # temporary fix ?
 
681
        if not isinstance(t, self.transport_class):
 
682
            # we did not get the correct transport class type. Override the
 
683
            # regular connection behaviour by direct construction.
 
684
            t = self.transport_class(url)
 
685
        return t
 
686
 
 
687
 
 
688
class TestLocalTransports(TestCase):
 
689
 
 
690
    def test_get_transport_from_abspath(self):
 
691
        here = osutils.abspath('.')
 
692
        t = get_transport(here)
 
693
        self.assertIsInstance(t, LocalTransport)
 
694
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
695
 
 
696
    def test_get_transport_from_relpath(self):
 
697
        here = osutils.abspath('.')
 
698
        t = get_transport('.')
 
699
        self.assertIsInstance(t, LocalTransport)
 
700
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
701
 
 
702
    def test_get_transport_from_local_url(self):
 
703
        here = osutils.abspath('.')
 
704
        here_url = urlutils.local_path_to_url(here) + '/'
 
705
        t = get_transport(here_url)
 
706
        self.assertIsInstance(t, LocalTransport)
 
707
        self.assertEquals(t.base, here_url)
 
708
 
 
709
    def test_local_abspath(self):
 
710
        here = osutils.abspath('.')
 
711
        t = get_transport(here)
 
712
        self.assertEquals(t.local_abspath(''), here)
 
713
 
 
714
 
 
715
class TestWin32LocalTransport(TestCase):
 
716
 
 
717
    def test_unc_clone_to_root(self):
 
718
        # Win32 UNC path like \\HOST\path
 
719
        # clone to root should stop at least at \\HOST part
 
720
        # not on \\
 
721
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
722
        for i in xrange(4):
 
723
            t = t.clone('..')
 
724
        self.assertEquals(t.base, 'file://HOST/')
 
725
        # make sure we reach the root
 
726
        t = t.clone('..')
 
727
        self.assertEquals(t.base, 'file://HOST/')
 
728
 
 
729
 
 
730
class TestConnectedTransport(TestCase):
 
731
    """Tests for connected to remote server transports"""
 
732
 
 
733
    def test_parse_url(self):
 
734
        t = ConnectedTransport('http://simple.example.com/home/source')
 
735
        self.assertEquals(t._host, 'simple.example.com')
 
736
        self.assertEquals(t._port, None)
 
737
        self.assertEquals(t._path, '/home/source/')
 
738
        self.failUnless(t._user is None)
 
739
        self.failUnless(t._password is None)
 
740
 
 
741
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
742
 
 
743
    def test_parse_url_with_at_in_user(self):
 
744
        # Bug 228058
 
745
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
746
        self.assertEquals(t._user, 'user@host.com')
 
747
 
 
748
    def test_parse_quoted_url(self):
 
749
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
750
        self.assertEquals(t._host, 'exAmple.com')
 
751
        self.assertEquals(t._port, 2222)
 
752
        self.assertEquals(t._user, 'robey')
 
753
        self.assertEquals(t._password, 'h@t')
 
754
        self.assertEquals(t._path, '/path/')
 
755
 
 
756
        # Base should not keep track of the password
 
757
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
758
 
 
759
    def test_parse_invalid_url(self):
 
760
        self.assertRaises(errors.InvalidURL,
 
761
                          ConnectedTransport,
 
762
                          'sftp://lily.org:~janneke/public/bzr/gub')
 
763
 
 
764
    def test_relpath(self):
 
765
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
766
 
 
767
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
768
        self.assertRaises(errors.PathNotChild, t.relpath,
 
769
                          'http://user@host.com/abs/path/sub')
 
770
        self.assertRaises(errors.PathNotChild, t.relpath,
 
771
                          'sftp://user2@host.com/abs/path/sub')
 
772
        self.assertRaises(errors.PathNotChild, t.relpath,
 
773
                          'sftp://user@otherhost.com/abs/path/sub')
 
774
        self.assertRaises(errors.PathNotChild, t.relpath,
 
775
                          'sftp://user@host.com:33/abs/path/sub')
 
776
        # Make sure it works when we don't supply a username
 
777
        t = ConnectedTransport('sftp://host.com/abs/path')
 
778
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
779
 
 
780
        # Make sure it works when parts of the path will be url encoded
 
781
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
782
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
783
 
 
784
    def test_connection_sharing_propagate_credentials(self):
 
785
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
786
        self.assertEquals('user', t._user)
 
787
        self.assertEquals('host.com', t._host)
 
788
        self.assertIs(None, t._get_connection())
 
789
        self.assertIs(None, t._password)
 
790
        c = t.clone('subdir')
 
791
        self.assertIs(None, c._get_connection())
 
792
        self.assertIs(None, t._password)
 
793
 
 
794
        # Simulate the user entering a password
 
795
        password = 'secret'
 
796
        connection = object()
 
797
        t._set_connection(connection, password)
 
798
        self.assertIs(connection, t._get_connection())
 
799
        self.assertIs(password, t._get_credentials())
 
800
        self.assertIs(connection, c._get_connection())
 
801
        self.assertIs(password, c._get_credentials())
 
802
 
 
803
        # credentials can be updated
 
804
        new_password = 'even more secret'
 
805
        c._update_credentials(new_password)
 
806
        self.assertIs(connection, t._get_connection())
 
807
        self.assertIs(new_password, t._get_credentials())
 
808
        self.assertIs(connection, c._get_connection())
 
809
        self.assertIs(new_password, c._get_credentials())
 
810
 
 
811
 
 
812
class TestReusedTransports(TestCase):
 
813
    """Tests for transport reuse"""
 
814
 
 
815
    def test_reuse_same_transport(self):
 
816
        possible_transports = []
 
817
        t1 = get_transport('http://foo/',
 
818
                           possible_transports=possible_transports)
 
819
        self.assertEqual([t1], possible_transports)
 
820
        t2 = get_transport('http://foo/', possible_transports=[t1])
 
821
        self.assertIs(t1, t2)
 
822
 
 
823
        # Also check that final '/' are handled correctly
 
824
        t3 = get_transport('http://foo/path/')
 
825
        t4 = get_transport('http://foo/path', possible_transports=[t3])
 
826
        self.assertIs(t3, t4)
 
827
 
 
828
        t5 = get_transport('http://foo/path')
 
829
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
 
830
        self.assertIs(t5, t6)
 
831
 
 
832
    def test_don_t_reuse_different_transport(self):
 
833
        t1 = get_transport('http://foo/path')
 
834
        t2 = get_transport('http://bar/path', possible_transports=[t1])
 
835
        self.assertIsNot(t1, t2)
 
836
 
 
837
 
 
838
class TestTransportTrace(TestCase):
 
839
 
 
840
    def test_get(self):
 
841
        transport = get_transport('trace+memory://')
 
842
        self.assertIsInstance(
 
843
            transport, bzrlib.transport.trace.TransportTraceDecorator)
 
844
 
 
845
    def test_clone_preserves_activity(self):
 
846
        transport = get_transport('trace+memory://')
 
847
        transport2 = transport.clone('.')
 
848
        self.assertTrue(transport is not transport2)
 
849
        self.assertTrue(transport._activity is transport2._activity)
 
850
 
 
851
    # the following specific tests are for the operations that have made use of
 
852
    # logging in tests; we could test every single operation but doing that
 
853
    # still won't cause a test failure when the top level Transport API
 
854
    # changes; so there is little return doing that.
 
855
    def test_get(self):
 
856
        transport = get_transport('trace+memory:///')
 
857
        transport.put_bytes('foo', 'barish')
 
858
        transport.get('foo')
 
859
        expected_result = []
 
860
        # put_bytes records the bytes, not the content to avoid memory
 
861
        # pressure.
 
862
        expected_result.append(('put_bytes', 'foo', 6, None))
 
863
        # get records the file name only.
 
864
        expected_result.append(('get', 'foo'))
 
865
        self.assertEqual(expected_result, transport._activity)
 
866
 
 
867
    def test_readv(self):
 
868
        transport = get_transport('trace+memory:///')
 
869
        transport.put_bytes('foo', 'barish')
 
870
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
871
            upper_limit=6))
 
872
        expected_result = []
 
873
        # put_bytes records the bytes, not the content to avoid memory
 
874
        # pressure.
 
875
        expected_result.append(('put_bytes', 'foo', 6, None))
 
876
        # readv records the supplied offset request
 
877
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
 
878
        self.assertEqual(expected_result, transport._activity)