/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Andrew Bennetts
  • Date: 2009-09-17 03:16:05 UTC
  • mto: This revision was merged to the branch mainline in revision 4702.
  • Revision ID: andrew.bennetts@canonical.com-20090917031605-xilizo5jfq4scbw0
Update documentation.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from cStringIO import StringIO
 
19
 
 
20
import bzrlib
 
21
from bzrlib import (
 
22
    errors,
 
23
    osutils,
 
24
    urlutils,
 
25
    )
 
26
from bzrlib.errors import (DependencyNotPresent,
 
27
                           FileExists,
 
28
                           InvalidURLJoin,
 
29
                           NoSuchFile,
 
30
                           PathNotChild,
 
31
                           ReadError,
 
32
                           UnsupportedProtocol,
 
33
                           )
 
34
from bzrlib.tests import TestCase, TestCaseInTempDir
 
35
from bzrlib.transport import (_clear_protocol_handlers,
 
36
                              _CoalescedOffset,
 
37
                              ConnectedTransport,
 
38
                              _get_protocol_handlers,
 
39
                              _set_protocol_handlers,
 
40
                              _get_transport_modules,
 
41
                              get_transport,
 
42
                              LateReadError,
 
43
                              register_lazy_transport,
 
44
                              register_transport_proto,
 
45
                              Transport,
 
46
                              )
 
47
from bzrlib.transport.chroot import ChrootServer
 
48
from bzrlib.transport.memory import MemoryTransport
 
49
from bzrlib.transport.local import (LocalTransport,
 
50
                                    EmulatedWin32LocalTransport)
 
51
from bzrlib.transport.pathfilter import PathFilteringServer
 
52
 
 
53
 
 
54
# TODO: Should possibly split transport-specific tests into their own files.
 
55
 
 
56
 
 
57
class TestTransport(TestCase):
 
58
    """Test the non transport-concrete class functionality."""
 
59
 
 
60
    def test__get_set_protocol_handlers(self):
 
61
        handlers = _get_protocol_handlers()
 
62
        self.assertNotEqual([], handlers.keys( ))
 
63
        try:
 
64
            _clear_protocol_handlers()
 
65
            self.assertEqual([], _get_protocol_handlers().keys())
 
66
        finally:
 
67
            _set_protocol_handlers(handlers)
 
68
 
 
69
    def test_get_transport_modules(self):
 
70
        handlers = _get_protocol_handlers()
 
71
        # don't pollute the current handlers
 
72
        _clear_protocol_handlers()
 
73
        class SampleHandler(object):
 
74
            """I exist, isnt that enough?"""
 
75
        try:
 
76
            _clear_protocol_handlers()
 
77
            register_transport_proto('foo')
 
78
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
79
                                    'TestTransport.SampleHandler')
 
80
            register_transport_proto('bar')
 
81
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
 
82
                                    'TestTransport.SampleHandler')
 
83
            self.assertEqual([SampleHandler.__module__,
 
84
                              'bzrlib.transport.chroot'],
 
85
                             _get_transport_modules())
 
86
        finally:
 
87
            _set_protocol_handlers(handlers)
 
88
 
 
89
    def test_transport_dependency(self):
 
90
        """Transport with missing dependency causes no error"""
 
91
        saved_handlers = _get_protocol_handlers()
 
92
        # don't pollute the current handlers
 
93
        _clear_protocol_handlers()
 
94
        try:
 
95
            register_transport_proto('foo')
 
96
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
97
                    'BadTransportHandler')
 
98
            try:
 
99
                get_transport('foo://fooserver/foo')
 
100
            except UnsupportedProtocol, e:
 
101
                e_str = str(e)
 
102
                self.assertEquals('Unsupported protocol'
 
103
                                  ' for url "foo://fooserver/foo":'
 
104
                                  ' Unable to import library "some_lib":'
 
105
                                  ' testing missing dependency', str(e))
 
106
            else:
 
107
                self.fail('Did not raise UnsupportedProtocol')
 
108
        finally:
 
109
            # restore original values
 
110
            _set_protocol_handlers(saved_handlers)
 
111
 
 
112
    def test_transport_fallback(self):
 
113
        """Transport with missing dependency causes no error"""
 
114
        saved_handlers = _get_protocol_handlers()
 
115
        try:
 
116
            _clear_protocol_handlers()
 
117
            register_transport_proto('foo')
 
118
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
119
                    'BackupTransportHandler')
 
120
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
121
                    'BadTransportHandler')
 
122
            t = get_transport('foo://fooserver/foo')
 
123
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
124
        finally:
 
125
            _set_protocol_handlers(saved_handlers)
 
126
 
 
127
    def test_ssh_hints(self):
 
128
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
 
129
        try:
 
130
            get_transport('ssh://fooserver/foo')
 
131
        except UnsupportedProtocol, e:
 
132
            e_str = str(e)
 
133
            self.assertEquals('Unsupported protocol'
 
134
                              ' for url "ssh://fooserver/foo":'
 
135
                              ' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
 
136
                              str(e))
 
137
        else:
 
138
            self.fail('Did not raise UnsupportedProtocol')
 
139
 
 
140
    def test_LateReadError(self):
 
141
        """The LateReadError helper should raise on read()."""
 
142
        a_file = LateReadError('a path')
 
143
        try:
 
144
            a_file.read()
 
145
        except ReadError, error:
 
146
            self.assertEqual('a path', error.path)
 
147
        self.assertRaises(ReadError, a_file.read, 40)
 
148
        a_file.close()
 
149
 
 
150
    def test__combine_paths(self):
 
151
        t = Transport('/')
 
152
        self.assertEqual('/home/sarah/project/foo',
 
153
                         t._combine_paths('/home/sarah', 'project/foo'))
 
154
        self.assertEqual('/etc',
 
155
                         t._combine_paths('/home/sarah', '../../etc'))
 
156
        self.assertEqual('/etc',
 
157
                         t._combine_paths('/home/sarah', '../../../etc'))
 
158
        self.assertEqual('/etc',
 
159
                         t._combine_paths('/home/sarah', '/etc'))
 
160
 
 
161
    def test_local_abspath_non_local_transport(self):
 
162
        # the base implementation should throw
 
163
        t = MemoryTransport()
 
164
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
 
165
        self.assertEqual('memory:///t is not a local path.', str(e))
 
166
 
 
167
 
 
168
class TestCoalesceOffsets(TestCase):
 
169
 
 
170
    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
 
171
        coalesce = Transport._coalesce_offsets
 
172
        exp = [_CoalescedOffset(*x) for x in expected]
 
173
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
 
174
                            max_size=max_size))
 
175
        self.assertEqual(exp, out)
 
176
 
 
177
    def test_coalesce_empty(self):
 
178
        self.check([], [])
 
179
 
 
180
    def test_coalesce_simple(self):
 
181
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
 
182
 
 
183
    def test_coalesce_unrelated(self):
 
184
        self.check([(0, 10, [(0, 10)]),
 
185
                    (20, 10, [(0, 10)]),
 
186
                   ], [(0, 10), (20, 10)])
 
187
 
 
188
    def test_coalesce_unsorted(self):
 
189
        self.check([(20, 10, [(0, 10)]),
 
190
                    (0, 10, [(0, 10)]),
 
191
                   ], [(20, 10), (0, 10)])
 
192
 
 
193
    def test_coalesce_nearby(self):
 
194
        self.check([(0, 20, [(0, 10), (10, 10)])],
 
195
                   [(0, 10), (10, 10)])
 
196
 
 
197
    def test_coalesce_overlapped(self):
 
198
        self.assertRaises(ValueError,
 
199
            self.check, [(0, 15, [(0, 10), (5, 10)])],
 
200
                        [(0, 10), (5, 10)])
 
201
 
 
202
    def test_coalesce_limit(self):
 
203
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
 
204
                              (30, 10), (40, 10)]),
 
205
                    (60, 50, [(0, 10), (10, 10), (20, 10),
 
206
                              (30, 10), (40, 10)]),
 
207
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
208
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
209
                       (90, 10), (100, 10)],
 
210
                    limit=5)
 
211
 
 
212
    def test_coalesce_no_limit(self):
 
213
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
 
214
                               (30, 10), (40, 10), (50, 10),
 
215
                               (60, 10), (70, 10), (80, 10),
 
216
                               (90, 10)]),
 
217
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
 
218
                       (50, 10), (60, 10), (70, 10), (80, 10),
 
219
                       (90, 10), (100, 10)])
 
220
 
 
221
    def test_coalesce_fudge(self):
 
222
        self.check([(10, 30, [(0, 10), (20, 10)]),
 
223
                    (100, 10, [(0, 10),]),
 
224
                   ], [(10, 10), (30, 10), (100, 10)],
 
225
                   fudge=10
 
226
                  )
 
227
    def test_coalesce_max_size(self):
 
228
        self.check([(10, 20, [(0, 10), (10, 10)]),
 
229
                    (30, 50, [(0, 50)]),
 
230
                    # If one range is above max_size, it gets its own coalesced
 
231
                    # offset
 
232
                    (100, 80, [(0, 80),]),],
 
233
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
 
234
                   max_size=50
 
235
                  )
 
236
 
 
237
    def test_coalesce_no_max_size(self):
 
238
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
239
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
 
240
                  )
 
241
 
 
242
    def test_coalesce_default_limit(self):
 
243
        # By default we use a 100MB max size.
 
244
        ten_mb = 10*1024*1024
 
245
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
246
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
 
247
                   [(i*ten_mb, ten_mb) for i in range(11)])
 
248
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
 
249
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
250
                   max_size=1*1024*1024*1024)
 
251
 
 
252
 
 
253
class TestMemoryTransport(TestCase):
 
254
 
 
255
    def test_get_transport(self):
 
256
        MemoryTransport()
 
257
 
 
258
    def test_clone(self):
 
259
        transport = MemoryTransport()
 
260
        self.assertTrue(isinstance(transport, MemoryTransport))
 
261
        self.assertEqual("memory:///", transport.clone("/").base)
 
262
 
 
263
    def test_abspath(self):
 
264
        transport = MemoryTransport()
 
265
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
266
 
 
267
    def test_abspath_of_root(self):
 
268
        transport = MemoryTransport()
 
269
        self.assertEqual("memory:///", transport.base)
 
270
        self.assertEqual("memory:///", transport.abspath('/'))
 
271
 
 
272
    def test_abspath_of_relpath_starting_at_root(self):
 
273
        transport = MemoryTransport()
 
274
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
275
 
 
276
    def test_append_and_get(self):
 
277
        transport = MemoryTransport()
 
278
        transport.append_bytes('path', 'content')
 
279
        self.assertEqual(transport.get('path').read(), 'content')
 
280
        transport.append_file('path', StringIO('content'))
 
281
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
282
 
 
283
    def test_put_and_get(self):
 
284
        transport = MemoryTransport()
 
285
        transport.put_file('path', StringIO('content'))
 
286
        self.assertEqual(transport.get('path').read(), 'content')
 
287
        transport.put_bytes('path', 'content')
 
288
        self.assertEqual(transport.get('path').read(), 'content')
 
289
 
 
290
    def test_append_without_dir_fails(self):
 
291
        transport = MemoryTransport()
 
292
        self.assertRaises(NoSuchFile,
 
293
                          transport.append_bytes, 'dir/path', 'content')
 
294
 
 
295
    def test_put_without_dir_fails(self):
 
296
        transport = MemoryTransport()
 
297
        self.assertRaises(NoSuchFile,
 
298
                          transport.put_file, 'dir/path', StringIO('content'))
 
299
 
 
300
    def test_get_missing(self):
 
301
        transport = MemoryTransport()
 
302
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
303
 
 
304
    def test_has_missing(self):
 
305
        transport = MemoryTransport()
 
306
        self.assertEquals(False, transport.has('foo'))
 
307
 
 
308
    def test_has_present(self):
 
309
        transport = MemoryTransport()
 
310
        transport.append_bytes('foo', 'content')
 
311
        self.assertEquals(True, transport.has('foo'))
 
312
 
 
313
    def test_list_dir(self):
 
314
        transport = MemoryTransport()
 
315
        transport.put_bytes('foo', 'content')
 
316
        transport.mkdir('dir')
 
317
        transport.put_bytes('dir/subfoo', 'content')
 
318
        transport.put_bytes('dirlike', 'content')
 
319
 
 
320
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
 
321
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
 
322
 
 
323
    def test_mkdir(self):
 
324
        transport = MemoryTransport()
 
325
        transport.mkdir('dir')
 
326
        transport.append_bytes('dir/path', 'content')
 
327
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
328
 
 
329
    def test_mkdir_missing_parent(self):
 
330
        transport = MemoryTransport()
 
331
        self.assertRaises(NoSuchFile,
 
332
                          transport.mkdir, 'dir/dir')
 
333
 
 
334
    def test_mkdir_twice(self):
 
335
        transport = MemoryTransport()
 
336
        transport.mkdir('dir')
 
337
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
338
 
 
339
    def test_parameters(self):
 
340
        transport = MemoryTransport()
 
341
        self.assertEqual(True, transport.listable())
 
342
        self.assertEqual(False, transport.is_readonly())
 
343
 
 
344
    def test_iter_files_recursive(self):
 
345
        transport = MemoryTransport()
 
346
        transport.mkdir('dir')
 
347
        transport.put_bytes('dir/foo', 'content')
 
348
        transport.put_bytes('dir/bar', 'content')
 
349
        transport.put_bytes('bar', 'content')
 
350
        paths = set(transport.iter_files_recursive())
 
351
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
352
 
 
353
    def test_stat(self):
 
354
        transport = MemoryTransport()
 
355
        transport.put_bytes('foo', 'content')
 
356
        transport.put_bytes('bar', 'phowar')
 
357
        self.assertEqual(7, transport.stat('foo').st_size)
 
358
        self.assertEqual(6, transport.stat('bar').st_size)
 
359
 
 
360
 
 
361
class ChrootDecoratorTransportTest(TestCase):
 
362
    """Chroot decoration specific tests."""
 
363
 
 
364
    def test_abspath(self):
 
365
        # The abspath is always relative to the chroot_url.
 
366
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
367
        server.setUp()
 
368
        transport = get_transport(server.get_url())
 
369
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
370
 
 
371
        subdir_transport = transport.clone('subdir')
 
372
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
373
        server.tearDown()
 
374
 
 
375
    def test_clone(self):
 
376
        server = ChrootServer(get_transport('memory:///foo/bar/'))
 
377
        server.setUp()
 
378
        transport = get_transport(server.get_url())
 
379
        # relpath from root and root path are the same
 
380
        relpath_cloned = transport.clone('foo')
 
381
        abspath_cloned = transport.clone('/foo')
 
382
        self.assertEqual(server, relpath_cloned.server)
 
383
        self.assertEqual(server, abspath_cloned.server)
 
384
        server.tearDown()
 
385
 
 
386
    def test_chroot_url_preserves_chroot(self):
 
387
        """Calling get_transport on a chroot transport's base should produce a
 
388
        transport with exactly the same behaviour as the original chroot
 
389
        transport.
 
390
 
 
391
        This is so that it is not possible to escape a chroot by doing::
 
392
            url = chroot_transport.base
 
393
            parent_url = urlutils.join(url, '..')
 
394
            new_transport = get_transport(parent_url)
 
395
        """
 
396
        server = ChrootServer(get_transport('memory:///path/subpath'))
 
397
        server.setUp()
 
398
        transport = get_transport(server.get_url())
 
399
        new_transport = get_transport(transport.base)
 
400
        self.assertEqual(transport.server, new_transport.server)
 
401
        self.assertEqual(transport.base, new_transport.base)
 
402
        server.tearDown()
 
403
 
 
404
    def test_urljoin_preserves_chroot(self):
 
405
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
 
406
        URL that escapes the intended chroot.
 
407
 
 
408
        This is so that it is not possible to escape a chroot by doing::
 
409
            url = chroot_transport.base
 
410
            parent_url = urlutils.join(url, '..')
 
411
            new_transport = get_transport(parent_url)
 
412
        """
 
413
        server = ChrootServer(get_transport('memory:///path/'))
 
414
        server.setUp()
 
415
        transport = get_transport(server.get_url())
 
416
        self.assertRaises(
 
417
            InvalidURLJoin, urlutils.join, transport.base, '..')
 
418
        server.tearDown()
 
419
 
 
420
 
 
421
class ChrootServerTest(TestCase):
 
422
 
 
423
    def test_construct(self):
 
424
        backing_transport = MemoryTransport()
 
425
        server = ChrootServer(backing_transport)
 
426
        self.assertEqual(backing_transport, server.backing_transport)
 
427
 
 
428
    def test_setUp(self):
 
429
        backing_transport = MemoryTransport()
 
430
        server = ChrootServer(backing_transport)
 
431
        server.setUp()
 
432
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
 
433
 
 
434
    def test_tearDown(self):
 
435
        backing_transport = MemoryTransport()
 
436
        server = ChrootServer(backing_transport)
 
437
        server.setUp()
 
438
        server.tearDown()
 
439
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
 
440
 
 
441
    def test_get_url(self):
 
442
        backing_transport = MemoryTransport()
 
443
        server = ChrootServer(backing_transport)
 
444
        server.setUp()
 
445
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
446
        server.tearDown()
 
447
 
 
448
 
 
449
class PathFilteringDecoratorTransportTest(TestCase):
 
450
    """Pathfilter decoration specific tests."""
 
451
 
 
452
    def test_abspath(self):
 
453
        # The abspath is always relative to the base of the backing transport.
 
454
        server = PathFilteringServer(get_transport('memory:///foo/bar/'),
 
455
            lambda x: x)
 
456
        server.setUp()
 
457
        transport = get_transport(server.get_url())
 
458
        self.assertEqual(server.get_url(), transport.abspath('/'))
 
459
 
 
460
        subdir_transport = transport.clone('subdir')
 
461
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
 
462
        server.tearDown()
 
463
 
 
464
    def make_pf_transport(self, filter_func=None):
 
465
        """Make a PathFilteringTransport backed by a MemoryTransport.
 
466
        
 
467
        :param filter_func: by default this will be a no-op function.  Use this
 
468
            parameter to override it."""
 
469
        if filter_func is None:
 
470
            filter_func = lambda x: x
 
471
        server = PathFilteringServer(
 
472
            get_transport('memory:///foo/bar/'), filter_func)
 
473
        server.setUp()
 
474
        self.addCleanup(server.tearDown)
 
475
        return get_transport(server.get_url())
 
476
 
 
477
    def test__filter(self):
 
478
        # _filter (with an identity func as filter_func) always returns
 
479
        # paths relative to the base of the backing transport.
 
480
        transport = self.make_pf_transport()
 
481
        self.assertEqual('foo', transport._filter('foo'))
 
482
        self.assertEqual('foo/bar', transport._filter('foo/bar'))
 
483
        self.assertEqual('', transport._filter('..'))
 
484
        self.assertEqual('', transport._filter('/'))
 
485
        # The base of the pathfiltering transport is taken into account too.
 
486
        transport = transport.clone('subdir1/subdir2')
 
487
        self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
 
488
        self.assertEqual(
 
489
            'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
 
490
        self.assertEqual('subdir1', transport._filter('..'))
 
491
        self.assertEqual('', transport._filter('/'))
 
492
 
 
493
    def test_filter_invocation(self):
 
494
        filter_log = []
 
495
        def filter(path):
 
496
            filter_log.append(path)
 
497
            return path
 
498
        transport = self.make_pf_transport(filter)
 
499
        transport.has('abc')
 
500
        self.assertEqual(['abc'], filter_log)
 
501
        del filter_log[:]
 
502
        transport.clone('abc').has('xyz')
 
503
        self.assertEqual(['abc/xyz'], filter_log)
 
504
        del filter_log[:]
 
505
        transport.has('/abc')
 
506
        self.assertEqual(['abc'], filter_log)
 
507
 
 
508
    def test_clone(self):
 
509
        transport = self.make_pf_transport()
 
510
        # relpath from root and root path are the same
 
511
        relpath_cloned = transport.clone('foo')
 
512
        abspath_cloned = transport.clone('/foo')
 
513
        self.assertEqual(transport.server, relpath_cloned.server)
 
514
        self.assertEqual(transport.server, abspath_cloned.server)
 
515
 
 
516
    def test_url_preserves_pathfiltering(self):
 
517
        """Calling get_transport on a pathfiltered transport's base should
 
518
        produce a transport with exactly the same behaviour as the original
 
519
        pathfiltered transport.
 
520
 
 
521
        This is so that it is not possible to escape (accidentally or
 
522
        otherwise) the filtering by doing::
 
523
            url = filtered_transport.base
 
524
            parent_url = urlutils.join(url, '..')
 
525
            new_transport = get_transport(parent_url)
 
526
        """
 
527
        transport = self.make_pf_transport()
 
528
        new_transport = get_transport(transport.base)
 
529
        self.assertEqual(transport.server, new_transport.server)
 
530
        self.assertEqual(transport.base, new_transport.base)
 
531
 
 
532
 
 
533
class ReadonlyDecoratorTransportTest(TestCase):
 
534
    """Readonly decoration specific tests."""
 
535
 
 
536
    def test_local_parameters(self):
 
537
        import bzrlib.transport.readonly as readonly
 
538
        # connect to . in readonly mode
 
539
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
540
        self.assertEqual(True, transport.listable())
 
541
        self.assertEqual(True, transport.is_readonly())
 
542
 
 
543
    def test_http_parameters(self):
 
544
        from bzrlib.tests.http_server import HttpServer
 
545
        import bzrlib.transport.readonly as readonly
 
546
        # connect to '.' via http which is not listable
 
547
        server = HttpServer()
 
548
        server.setUp()
 
549
        try:
 
550
            transport = get_transport('readonly+' + server.get_url())
 
551
            self.failUnless(isinstance(transport,
 
552
                                       readonly.ReadonlyTransportDecorator))
 
553
            self.assertEqual(False, transport.listable())
 
554
            self.assertEqual(True, transport.is_readonly())
 
555
        finally:
 
556
            server.tearDown()
 
557
 
 
558
 
 
559
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
560
    """NFS decorator specific tests."""
 
561
 
 
562
    def get_nfs_transport(self, url):
 
563
        import bzrlib.transport.fakenfs as fakenfs
 
564
        # connect to url with nfs decoration
 
565
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
566
 
 
567
    def test_local_parameters(self):
 
568
        # the listable and is_readonly parameters
 
569
        # are not changed by the fakenfs decorator
 
570
        transport = self.get_nfs_transport('.')
 
571
        self.assertEqual(True, transport.listable())
 
572
        self.assertEqual(False, transport.is_readonly())
 
573
 
 
574
    def test_http_parameters(self):
 
575
        # the listable and is_readonly parameters
 
576
        # are not changed by the fakenfs decorator
 
577
        from bzrlib.tests.http_server import HttpServer
 
578
        # connect to '.' via http which is not listable
 
579
        server = HttpServer()
 
580
        server.setUp()
 
581
        try:
 
582
            transport = self.get_nfs_transport(server.get_url())
 
583
            self.assertIsInstance(
 
584
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
585
            self.assertEqual(False, transport.listable())
 
586
            self.assertEqual(True, transport.is_readonly())
 
587
        finally:
 
588
            server.tearDown()
 
589
 
 
590
    def test_fakenfs_server_default(self):
 
591
        # a FakeNFSServer() should bring up a local relpath server for itself
 
592
        import bzrlib.transport.fakenfs as fakenfs
 
593
        server = fakenfs.FakeNFSServer()
 
594
        server.setUp()
 
595
        try:
 
596
            # the url should be decorated appropriately
 
597
            self.assertStartsWith(server.get_url(), 'fakenfs+')
 
598
            # and we should be able to get a transport for it
 
599
            transport = get_transport(server.get_url())
 
600
            # which must be a FakeNFSTransportDecorator instance.
 
601
            self.assertIsInstance(
 
602
                transport, fakenfs.FakeNFSTransportDecorator)
 
603
        finally:
 
604
            server.tearDown()
 
605
 
 
606
    def test_fakenfs_rename_semantics(self):
 
607
        # a FakeNFS transport must mangle the way rename errors occur to
 
608
        # look like NFS problems.
 
609
        transport = self.get_nfs_transport('.')
 
610
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
611
                        transport=transport)
 
612
        self.assertRaises(errors.ResourceBusy,
 
613
                          transport.rename, 'from', 'to')
 
614
 
 
615
 
 
616
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
617
    """Tests for simulation of VFAT restrictions"""
 
618
 
 
619
    def get_vfat_transport(self, url):
 
620
        """Return vfat-backed transport for test directory"""
 
621
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
622
        return FakeVFATTransportDecorator('vfat+' + url)
 
623
 
 
624
    def test_transport_creation(self):
 
625
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
626
        transport = self.get_vfat_transport('.')
 
627
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
628
 
 
629
    def test_transport_mkdir(self):
 
630
        transport = self.get_vfat_transport('.')
 
631
        transport.mkdir('HELLO')
 
632
        self.assertTrue(transport.has('hello'))
 
633
        self.assertTrue(transport.has('Hello'))
 
634
 
 
635
    def test_forbidden_chars(self):
 
636
        transport = self.get_vfat_transport('.')
 
637
        self.assertRaises(ValueError, transport.has, "<NU>")
 
638
 
 
639
 
 
640
class BadTransportHandler(Transport):
 
641
    def __init__(self, base_url):
 
642
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
643
 
 
644
 
 
645
class BackupTransportHandler(Transport):
 
646
    """Test transport that works as a backup for the BadTransportHandler"""
 
647
    pass
 
648
 
 
649
 
 
650
class TestTransportImplementation(TestCaseInTempDir):
 
651
    """Implementation verification for transports.
 
652
 
 
653
    To verify a transport we need a server factory, which is a callable
 
654
    that accepts no parameters and returns an implementation of
 
655
    bzrlib.transport.Server.
 
656
 
 
657
    That Server is then used to construct transport instances and test
 
658
    the transport via loopback activity.
 
659
 
 
660
    Currently this assumes that the Transport object is connected to the
 
661
    current working directory.  So that whatever is done
 
662
    through the transport, should show up in the working
 
663
    directory, and vice-versa. This is a bug, because its possible to have
 
664
    URL schemes which provide access to something that may not be
 
665
    result in storage on the local disk, i.e. due to file system limits, or
 
666
    due to it being a database or some other non-filesystem tool.
 
667
 
 
668
    This also tests to make sure that the functions work with both
 
669
    generators and lists (assuming iter(list) is effectively a generator)
 
670
    """
 
671
 
 
672
    def setUp(self):
 
673
        super(TestTransportImplementation, self).setUp()
 
674
        self._server = self.transport_server()
 
675
        self._server.setUp()
 
676
        self.addCleanup(self._server.tearDown)
 
677
 
 
678
    def get_transport(self, relpath=None):
 
679
        """Return a connected transport to the local directory.
 
680
 
 
681
        :param relpath: a path relative to the base url.
 
682
        """
 
683
        base_url = self._server.get_url()
 
684
        url = self._adjust_url(base_url, relpath)
 
685
        # try getting the transport via the regular interface:
 
686
        t = get_transport(url)
 
687
        # vila--20070607 if the following are commented out the test suite
 
688
        # still pass. Is this really still needed or was it a forgotten
 
689
        # temporary fix ?
 
690
        if not isinstance(t, self.transport_class):
 
691
            # we did not get the correct transport class type. Override the
 
692
            # regular connection behaviour by direct construction.
 
693
            t = self.transport_class(url)
 
694
        return t
 
695
 
 
696
 
 
697
class TestLocalTransports(TestCase):
 
698
 
 
699
    def test_get_transport_from_abspath(self):
 
700
        here = osutils.abspath('.')
 
701
        t = get_transport(here)
 
702
        self.assertIsInstance(t, LocalTransport)
 
703
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
704
 
 
705
    def test_get_transport_from_relpath(self):
 
706
        here = osutils.abspath('.')
 
707
        t = get_transport('.')
 
708
        self.assertIsInstance(t, LocalTransport)
 
709
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
710
 
 
711
    def test_get_transport_from_local_url(self):
 
712
        here = osutils.abspath('.')
 
713
        here_url = urlutils.local_path_to_url(here) + '/'
 
714
        t = get_transport(here_url)
 
715
        self.assertIsInstance(t, LocalTransport)
 
716
        self.assertEquals(t.base, here_url)
 
717
 
 
718
    def test_local_abspath(self):
 
719
        here = osutils.abspath('.')
 
720
        t = get_transport(here)
 
721
        self.assertEquals(t.local_abspath(''), here)
 
722
 
 
723
 
 
724
class TestWin32LocalTransport(TestCase):
 
725
 
 
726
    def test_unc_clone_to_root(self):
 
727
        # Win32 UNC path like \\HOST\path
 
728
        # clone to root should stop at least at \\HOST part
 
729
        # not on \\
 
730
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
 
731
        for i in xrange(4):
 
732
            t = t.clone('..')
 
733
        self.assertEquals(t.base, 'file://HOST/')
 
734
        # make sure we reach the root
 
735
        t = t.clone('..')
 
736
        self.assertEquals(t.base, 'file://HOST/')
 
737
 
 
738
 
 
739
class TestConnectedTransport(TestCase):
 
740
    """Tests for connected to remote server transports"""
 
741
 
 
742
    def test_parse_url(self):
 
743
        t = ConnectedTransport('http://simple.example.com/home/source')
 
744
        self.assertEquals(t._host, 'simple.example.com')
 
745
        self.assertEquals(t._port, None)
 
746
        self.assertEquals(t._path, '/home/source/')
 
747
        self.failUnless(t._user is None)
 
748
        self.failUnless(t._password is None)
 
749
 
 
750
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
751
 
 
752
    def test_parse_url_with_at_in_user(self):
 
753
        # Bug 228058
 
754
        t = ConnectedTransport('ftp://user@host.com@www.host.com/')
 
755
        self.assertEquals(t._user, 'user@host.com')
 
756
 
 
757
    def test_parse_quoted_url(self):
 
758
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
 
759
        self.assertEquals(t._host, 'exAmple.com')
 
760
        self.assertEquals(t._port, 2222)
 
761
        self.assertEquals(t._user, 'robey')
 
762
        self.assertEquals(t._password, 'h@t')
 
763
        self.assertEquals(t._path, '/path/')
 
764
 
 
765
        # Base should not keep track of the password
 
766
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
767
 
 
768
    def test_parse_invalid_url(self):
 
769
        self.assertRaises(errors.InvalidURL,
 
770
                          ConnectedTransport,
 
771
                          'sftp://lily.org:~janneke/public/bzr/gub')
 
772
 
 
773
    def test_relpath(self):
 
774
        t = ConnectedTransport('sftp://user@host.com/abs/path')
 
775
 
 
776
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
777
        self.assertRaises(errors.PathNotChild, t.relpath,
 
778
                          'http://user@host.com/abs/path/sub')
 
779
        self.assertRaises(errors.PathNotChild, t.relpath,
 
780
                          'sftp://user2@host.com/abs/path/sub')
 
781
        self.assertRaises(errors.PathNotChild, t.relpath,
 
782
                          'sftp://user@otherhost.com/abs/path/sub')
 
783
        self.assertRaises(errors.PathNotChild, t.relpath,
 
784
                          'sftp://user@host.com:33/abs/path/sub')
 
785
        # Make sure it works when we don't supply a username
 
786
        t = ConnectedTransport('sftp://host.com/abs/path')
 
787
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
788
 
 
789
        # Make sure it works when parts of the path will be url encoded
 
790
        t = ConnectedTransport('sftp://host.com/dev/%path')
 
791
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
792
 
 
793
    def test_connection_sharing_propagate_credentials(self):
 
794
        t = ConnectedTransport('ftp://user@host.com/abs/path')
 
795
        self.assertEquals('user', t._user)
 
796
        self.assertEquals('host.com', t._host)
 
797
        self.assertIs(None, t._get_connection())
 
798
        self.assertIs(None, t._password)
 
799
        c = t.clone('subdir')
 
800
        self.assertIs(None, c._get_connection())
 
801
        self.assertIs(None, t._password)
 
802
 
 
803
        # Simulate the user entering a password
 
804
        password = 'secret'
 
805
        connection = object()
 
806
        t._set_connection(connection, password)
 
807
        self.assertIs(connection, t._get_connection())
 
808
        self.assertIs(password, t._get_credentials())
 
809
        self.assertIs(connection, c._get_connection())
 
810
        self.assertIs(password, c._get_credentials())
 
811
 
 
812
        # credentials can be updated
 
813
        new_password = 'even more secret'
 
814
        c._update_credentials(new_password)
 
815
        self.assertIs(connection, t._get_connection())
 
816
        self.assertIs(new_password, t._get_credentials())
 
817
        self.assertIs(connection, c._get_connection())
 
818
        self.assertIs(new_password, c._get_credentials())
 
819
 
 
820
 
 
821
class TestReusedTransports(TestCase):
 
822
    """Tests for transport reuse"""
 
823
 
 
824
    def test_reuse_same_transport(self):
 
825
        possible_transports = []
 
826
        t1 = get_transport('http://foo/',
 
827
                           possible_transports=possible_transports)
 
828
        self.assertEqual([t1], possible_transports)
 
829
        t2 = get_transport('http://foo/', possible_transports=[t1])
 
830
        self.assertIs(t1, t2)
 
831
 
 
832
        # Also check that final '/' are handled correctly
 
833
        t3 = get_transport('http://foo/path/')
 
834
        t4 = get_transport('http://foo/path', possible_transports=[t3])
 
835
        self.assertIs(t3, t4)
 
836
 
 
837
        t5 = get_transport('http://foo/path')
 
838
        t6 = get_transport('http://foo/path/', possible_transports=[t5])
 
839
        self.assertIs(t5, t6)
 
840
 
 
841
    def test_don_t_reuse_different_transport(self):
 
842
        t1 = get_transport('http://foo/path')
 
843
        t2 = get_transport('http://bar/path', possible_transports=[t1])
 
844
        self.assertIsNot(t1, t2)
 
845
 
 
846
 
 
847
class TestTransportTrace(TestCase):
 
848
 
 
849
    def test_get(self):
 
850
        transport = get_transport('trace+memory://')
 
851
        self.assertIsInstance(
 
852
            transport, bzrlib.transport.trace.TransportTraceDecorator)
 
853
 
 
854
    def test_clone_preserves_activity(self):
 
855
        transport = get_transport('trace+memory://')
 
856
        transport2 = transport.clone('.')
 
857
        self.assertTrue(transport is not transport2)
 
858
        self.assertTrue(transport._activity is transport2._activity)
 
859
 
 
860
    # the following specific tests are for the operations that have made use of
 
861
    # logging in tests; we could test every single operation but doing that
 
862
    # still won't cause a test failure when the top level Transport API
 
863
    # changes; so there is little return doing that.
 
864
    def test_get(self):
 
865
        transport = get_transport('trace+memory:///')
 
866
        transport.put_bytes('foo', 'barish')
 
867
        transport.get('foo')
 
868
        expected_result = []
 
869
        # put_bytes records the bytes, not the content to avoid memory
 
870
        # pressure.
 
871
        expected_result.append(('put_bytes', 'foo', 6, None))
 
872
        # get records the file name only.
 
873
        expected_result.append(('get', 'foo'))
 
874
        self.assertEqual(expected_result, transport._activity)
 
875
 
 
876
    def test_readv(self):
 
877
        transport = get_transport('trace+memory:///')
 
878
        transport.put_bytes('foo', 'barish')
 
879
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
 
880
            upper_limit=6))
 
881
        expected_result = []
 
882
        # put_bytes records the bytes, not the content to avoid memory
 
883
        # pressure.
 
884
        expected_result.append(('put_bytes', 'foo', 6, None))
 
885
        # readv records the supplied offset request
 
886
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
 
887
        self.assertEqual(expected_result, transport._activity)