/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Alexander Belchenko
  • Date: 2007-10-04 05:50:44 UTC
  • mfrom: (2881 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2884.
  • Revision ID: bialix@ukr.net-20071004055044-pb88kgkfayawro8n
merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
import os
 
19
import sys
 
20
import stat
 
21
from cStringIO import StringIO
 
22
 
 
23
import bzrlib
 
24
from bzrlib import (
 
25
    errors,
 
26
    osutils,
 
27
    urlutils,
 
28
    )
 
29
from bzrlib.errors import (ConnectionError,
 
30
                           DependencyNotPresent,
 
31
                           FileExists,
 
32
                           InvalidURLJoin,
 
33
                           NoSuchFile,
 
34
                           PathNotChild,
 
35
                           TransportNotPossible,
 
36
                           ConnectionError,
 
37
                           DependencyNotPresent,
 
38
                           ReadError,
 
39
                           UnsupportedProtocol,
 
40
                           )
 
41
from bzrlib.tests import TestCase, TestCaseInTempDir
 
42
from bzrlib.transport import (_CoalescedOffset,
 
43
                              ConnectedTransport,
 
44
                              _get_protocol_handlers,
 
45
                              _set_protocol_handlers,
 
46
                              _get_transport_modules,
 
47
                              get_transport,
 
48
                              LateReadError,
 
49
                              register_lazy_transport,
 
50
                              register_transport_proto,
 
51
                              _clear_protocol_handlers,
 
52
                              Transport,
 
53
                              )
 
54
from bzrlib.transport.chroot import ChrootServer
 
55
from bzrlib.transport.memory import MemoryTransport
 
56
from bzrlib.transport.local import (LocalTransport,
 
57
                                    EmulatedWin32LocalTransport)
 
58
from bzrlib.transport.remote import (
 
59
    BZR_DEFAULT_PORT,
 
60
    RemoteTCPTransport
 
61
    )
 
62
 
 
63
 
 
64
# TODO: Should possibly split transport-specific tests into their own files.
 
65
 
 
66
 
 
67
class TestTransport(TestCase):
    """Test the non transport-concrete class functionality."""

    def test__get_set_protocol_handlers(self):
        """The handler registry is non-empty, and empty once cleared."""
        handlers = _get_protocol_handlers()
        self.assertNotEqual([], handlers.keys())
        try:
            _clear_protocol_handlers()
            self.assertEqual([], _get_protocol_handlers().keys())
        finally:
            # Put the saved registry back whatever happened above.
            _set_protocol_handlers(handlers)

    def test_get_transport_modules(self):
        """_get_transport_modules() lists the modules of registered handlers."""
        handlers = _get_protocol_handlers()
        class SampleHandler(object):
            """I exist, isnt that enough?"""
        try:
            _clear_protocol_handlers()
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'TestTransport.SampleHandler')
            register_transport_proto('bar')
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
                                    'TestTransport.SampleHandler')
            self.assertEqual(
                [SampleHandler.__module__, 'bzrlib.transport.chroot'],
                _get_transport_modules())
        finally:
            _set_protocol_handlers(handlers)

    def test_transport_dependency(self):
        """A handler with a missing dependency yields UnsupportedProtocol."""
        saved_handlers = _get_protocol_handlers()
        try:
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                    'BadTransportHandler')
            # assertRaises hands back the caught exception (the same idiom
            # is used in test_local_abspath_non_local_transport below), so
            # no hand-written try/except/else is needed.
            e = self.assertRaises(UnsupportedProtocol,
                                  get_transport, 'foo://fooserver/foo')
            self.assertEquals('Unsupported protocol'
                              ' for url "foo://fooserver/foo":'
                              ' Unable to import library "some_lib":'
                              ' testing missing dependency', str(e))
        finally:
            # restore original values
            _set_protocol_handlers(saved_handlers)

    def test_transport_fallback(self):
        """A working handler is used when another one for the scheme fails.

        Both BackupTransportHandler and BadTransportHandler are registered
        for 'foo'; the transport obtained must be the backup one.
        """
        saved_handlers = _get_protocol_handlers()
        try:
            _clear_protocol_handlers()
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                    'BackupTransportHandler')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                    'BadTransportHandler')
            t = get_transport('foo://fooserver/foo')
            self.assertTrue(isinstance(t, BackupTransportHandler))
        finally:
            _set_protocol_handlers(saved_handlers)

    def test_LateReadError(self):
        """The LateReadError helper should raise on read()."""
        a_file = LateReadError('a path')
        # Using assertRaises makes the test fail if read() does NOT raise;
        # a bare try/except would silently skip the path check in that case.
        error = self.assertRaises(ReadError, a_file.read)
        self.assertEqual('a path', error.path)
        self.assertRaises(ReadError, a_file.read, 40)
        a_file.close()

    def test__combine_paths(self):
        """_combine_paths joins, normalises '..' and honours absolute paths."""
        t = Transport('/')
        self.assertEqual('/home/sarah/project/foo',
                         t._combine_paths('/home/sarah', 'project/foo'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '../../etc'))
        # Going above the root is clamped at the root.
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '../../../etc'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '/etc'))

    def test_local_abspath_non_local_transport(self):
        """local_abspath on a non-local transport raises NotLocalUrl."""
        # the base implementation should throw
        t = MemoryTransport()
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
        self.assertEqual('memory:///t is not a local path.', str(e))
 
156
 
 
157
 
 
158
class TestCoalesceOffsets(TestCase):
    """Checks for Transport._coalesce_offsets."""

    def check(self, expected, offsets, limit=0, fudge=0):
        """Coalesce `offsets` and compare against `expected`.

        :param expected: list of argument tuples for _CoalescedOffset.
        :param offsets: list of (start, length) pairs to coalesce.
        :param limit: passed through as the `limit` keyword.
        :param fudge: passed through as the `fudge_factor` keyword.
        """
        coalesce = Transport._coalesce_offsets
        wanted = [_CoalescedOffset(*args) for args in expected]
        actual = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
        self.assertEqual(wanted, actual)

    def test_coalesce_empty(self):
        """No offsets in, no coalesced offsets out."""
        self.check([], [])

    def test_coalesce_simple(self):
        """A single range maps to a single coalesced range."""
        single = [(0, 10)]
        self.check([(0, 10, [(0, 10)])], single)

    def test_coalesce_unrelated(self):
        """Ranges separated by a gap are not merged."""
        wanted = [
            (0, 10, [(0, 10)]),
            (20, 10, [(0, 10)]),
            ]
        self.check(wanted, [(0, 10), (20, 10)])

    def test_coalesce_unsorted(self):
        """Input order is kept for ranges that are not merged."""
        wanted = [
            (20, 10, [(0, 10)]),
            (0, 10, [(0, 10)]),
            ]
        self.check(wanted, [(20, 10), (0, 10)])

    def test_coalesce_nearby(self):
        """Adjacent ranges are merged into one."""
        wanted = [(0, 20, [(0, 10), (10, 10)])]
        self.check(wanted, [(0, 10), (10, 10)])

    def test_coalesce_overlapped(self):
        """Overlapping ranges are merged into one."""
        wanted = [(0, 15, [(0, 10), (5, 10)])]
        self.check(wanted, [(0, 10), (5, 10)])

    def test_coalesce_limit(self):
        """With limit=5, ten adjacent ranges come back as two groups."""
        # Ten contiguous 10-byte ranges starting at offset 10.
        requested = [(start, 10) for start in range(10, 110, 10)]
        wanted = [
            (10, 50, [(start, 10) for start in range(0, 50, 10)]),
            (60, 50, [(start, 10) for start in range(0, 50, 10)]),
            ]
        self.check(wanted, requested, limit=5)

    def test_coalesce_no_limit(self):
        """Without a limit, ten adjacent ranges come back as one group."""
        requested = [(start, 10) for start in range(10, 110, 10)]
        wanted = [
            (10, 100, [(start, 10) for start in range(0, 100, 10)]),
            ]
        self.check(wanted, requested)

    def test_coalesce_fudge(self):
        """With fudge=10, ranges 10 bytes apart are merged."""
        wanted = [
            (10, 30, [(0, 10), (20, 10)]),
            (100, 10, [(0, 10),]),
            ]
        self.check(wanted, [(10, 10), (30, 10), (100, 10)], fudge=10)
 
215
 
 
216
 
 
217
class TestMemoryTransport(TestCase):
    """Behaviour of the in-memory transport."""

    def test_get_transport(self):
        """A MemoryTransport can be constructed without arguments."""
        MemoryTransport()

    def test_clone(self):
        """Cloning to '/' gives a transport based at the memory root."""
        mem = MemoryTransport()
        self.assertTrue(isinstance(mem, MemoryTransport))
        cloned = mem.clone("/")
        self.assertEqual("memory:///", cloned.base)

    def test_abspath(self):
        """abspath of a relative path is rooted at memory:///."""
        mem = MemoryTransport()
        self.assertEqual("memory:///relpath", mem.abspath('relpath'))

    def test_abspath_of_root(self):
        """The base and abspath('/') are both the memory root URL."""
        mem = MemoryTransport()
        self.assertEqual("memory:///", mem.base)
        self.assertEqual("memory:///", mem.abspath('/'))

    def test_abspath_of_relpath_starting_at_root(self):
        """A path with a leading slash resolves from the memory root."""
        mem = MemoryTransport()
        self.assertEqual("memory:///foo", mem.abspath('/foo'))

    def test_append_and_get(self):
        """append_bytes and append_file both extend the stored content."""
        mem = MemoryTransport()
        mem.append_bytes('path', 'content')
        after_first = mem.get('path').read()
        self.assertEqual(after_first, 'content')
        mem.append_file('path', StringIO('content'))
        after_second = mem.get('path').read()
        self.assertEqual(after_second, 'contentcontent')

    def test_put_and_get(self):
        """put_file and put_bytes both replace the stored content."""
        mem = MemoryTransport()
        mem.put_file('path', StringIO('content'))
        self.assertEqual(mem.get('path').read(), 'content')
        mem.put_bytes('path', 'content')
        self.assertEqual(mem.get('path').read(), 'content')

    def test_append_without_dir_fails(self):
        """Appending below a missing directory raises NoSuchFile."""
        mem = MemoryTransport()
        self.assertRaises(NoSuchFile,
                          mem.append_bytes, 'dir/path', 'content')

    def test_put_without_dir_fails(self):
        """Putting below a missing directory raises NoSuchFile."""
        mem = MemoryTransport()
        self.assertRaises(NoSuchFile,
                          mem.put_file, 'dir/path', StringIO('content'))

    def test_get_missing(self):
        """Getting a missing path raises NoSuchFile."""
        mem = MemoryTransport()
        self.assertRaises(NoSuchFile, mem.get, 'foo')

    def test_has_missing(self):
        """has() is False for a path that was never written."""
        mem = MemoryTransport()
        self.assertEquals(False, mem.has('foo'))

    def test_has_present(self):
        """has() is True for a path that was written."""
        mem = MemoryTransport()
        mem.append_bytes('foo', 'content')
        self.assertEquals(True, mem.has('foo'))

    def test_list_dir(self):
        """list_dir returns only the direct children of a directory."""
        mem = MemoryTransport()
        mem.put_bytes('foo', 'content')
        mem.mkdir('dir')
        mem.put_bytes('dir/subfoo', 'content')
        # 'dirlike' shares a prefix with 'dir' but is a sibling file.
        mem.put_bytes('dirlike', 'content')

        top_level = sorted(mem.list_dir('.'))
        self.assertEquals(['dir', 'dirlike', 'foo'], top_level)
        nested = sorted(mem.list_dir('dir'))
        self.assertEquals(['subfoo'], nested)

    def test_mkdir(self):
        """Files can be written below a directory created with mkdir."""
        mem = MemoryTransport()
        mem.mkdir('dir')
        mem.append_bytes('dir/path', 'content')
        self.assertEqual(mem.get('dir/path').read(), 'content')

    def test_mkdir_missing_parent(self):
        """mkdir below a missing parent raises NoSuchFile."""
        mem = MemoryTransport()
        self.assertRaises(NoSuchFile, mem.mkdir, 'dir/dir')

    def test_mkdir_twice(self):
        """mkdir of an existing directory raises FileExists."""
        mem = MemoryTransport()
        mem.mkdir('dir')
        self.assertRaises(FileExists, mem.mkdir, 'dir')

    def test_parameters(self):
        """The memory transport is listable and writable."""
        mem = MemoryTransport()
        self.assertEqual(True, mem.listable())
        self.assertEqual(False, mem.is_readonly())

    def test_iter_files_recursive(self):
        """iter_files_recursive yields files at every depth."""
        mem = MemoryTransport()
        mem.mkdir('dir')
        for relpath in ('dir/foo', 'dir/bar', 'bar'):
            mem.put_bytes(relpath, 'content')
        found = set(mem.iter_files_recursive())
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), found)

    def test_stat(self):
        """stat().st_size is the length of the stored bytes."""
        mem = MemoryTransport()
        mem.put_bytes('foo', 'content')
        mem.put_bytes('bar', 'phowar')
        self.assertEqual(7, mem.stat('foo').st_size)
        self.assertEqual(6, mem.stat('bar').st_size)
 
323
 
 
324
 
 
325
class ChrootDecoratorTransportTest(TestCase):
    """Chroot decoration specific tests.

    Every test starts a ChrootServer and tears it down in a ``finally``
    clause, so a failing assertion cannot leave the server set up for
    later tests.
    """

    def test_abspath(self):
        """abspath('/') is the chroot URL, even from a cloned subdir."""
        # The abspath is always relative to the chroot_url.
        server = ChrootServer(get_transport('memory:///foo/bar/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            self.assertEqual(server.get_url(), transport.abspath('/'))

            subdir_transport = transport.clone('subdir')
            self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
        finally:
            server.tearDown()

    def test_clone(self):
        """Clones made by relative and absolute path share the server."""
        server = ChrootServer(get_transport('memory:///foo/bar/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            # relpath from root and root path are the same
            relpath_cloned = transport.clone('foo')
            abspath_cloned = transport.clone('/foo')
            self.assertEqual(server, relpath_cloned.server)
            self.assertEqual(server, abspath_cloned.server)
        finally:
            server.tearDown()

    def test_chroot_url_preserves_chroot(self):
        """Calling get_transport on a chroot transport's base should produce a
        transport with exactly the same behaviour as the original chroot
        transport.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        server = ChrootServer(get_transport('memory:///path/subpath'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            new_transport = get_transport(transport.base)
            self.assertEqual(transport.server, new_transport.server)
            self.assertEqual(transport.base, new_transport.base)
        finally:
            server.tearDown()

    def test_urljoin_preserves_chroot(self):
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
        URL that escapes the intended chroot.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        server = ChrootServer(get_transport('memory:///path/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            self.assertRaises(
                InvalidURLJoin, urlutils.join, transport.base, '..')
        finally:
            server.tearDown()
 
383
 
 
384
 
 
385
class ChrootServerTest(TestCase):
    """Tests for the ChrootServer life cycle."""

    def test_construct(self):
        """The server keeps the backing transport it was given."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        self.assertEqual(backing_transport, server.backing_transport)

    def test_setUp(self):
        """setUp() registers the server's scheme as a protocol handler."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        try:
            self.assertTrue(server.scheme in _get_protocol_handlers().keys())
        finally:
            # Undo setUp() so the registration made here does not outlive
            # this test.
            server.tearDown()

    def test_tearDown(self):
        """tearDown() removes the server's scheme from the handlers."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        server.tearDown()
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())

    def test_get_url(self):
        """The server URL is 'chroot-<id(server)>:///'."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        try:
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
        finally:
            server.tearDown()
 
411
 
 
412
 
 
413
class ReadonlyDecoratorTransportTest(TestCase):
    """Readonly decoration specific tests."""

    def test_local_parameters(self):
        """A readonly-decorated '.' is listable and read-only."""
        import bzrlib.transport.readonly as readonly
        # connect to . in readonly mode
        decorated = readonly.ReadonlyTransportDecorator('readonly+.')
        self.assertEqual(True, decorated.listable())
        self.assertEqual(True, decorated.is_readonly())

    def test_http_parameters(self):
        """A readonly-decorated http URL is read-only and not listable."""
        from bzrlib.tests.HttpServer import HttpServer
        import bzrlib.transport.readonly as readonly
        # connect to . via http which is not listable
        http_server = HttpServer()
        http_server.setUp()
        try:
            decorated_url = 'readonly+' + http_server.get_url()
            decorated = get_transport(decorated_url)
            self.failUnless(
                isinstance(decorated, readonly.ReadonlyTransportDecorator))
            self.assertEqual(False, decorated.listable())
            self.assertEqual(True, decorated.is_readonly())
        finally:
            http_server.tearDown()
 
437
 
 
438
 
 
439
class FakeNFSDecoratorTests(TestCaseInTempDir):
    """NFS decorator specific tests."""

    def get_nfs_transport(self, url):
        """Return a FakeNFSTransportDecorator for 'fakenfs+' + url."""
        import bzrlib.transport.fakenfs as fakenfs
        # connect to url with nfs decoration
        decorated_url = 'fakenfs+' + url
        return fakenfs.FakeNFSTransportDecorator(decorated_url)

    def test_local_parameters(self):
        """Decorating '.' leaves it listable and writable."""
        # the listable and is_readonly parameters
        # are not changed by the fakenfs decorator
        nfs = self.get_nfs_transport('.')
        self.assertEqual(True, nfs.listable())
        self.assertEqual(False, nfs.is_readonly())

    def test_http_parameters(self):
        """Decorating an http URL leaves it unlistable and read-only."""
        # the listable and is_readonly parameters
        # are not changed by the fakenfs decorator
        from bzrlib.tests.HttpServer import HttpServer
        # connect to . via http which is not listable
        http_server = HttpServer()
        http_server.setUp()
        try:
            nfs = self.get_nfs_transport(http_server.get_url())
            self.assertIsInstance(
                nfs, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
            self.assertEqual(False, nfs.listable())
            self.assertEqual(True, nfs.is_readonly())
        finally:
            http_server.tearDown()

    def test_fakenfs_server_default(self):
        """FakeNFSServer serves a 'fakenfs+' URL usable by get_transport."""
        # a FakeNFSServer() should bring up a local relpath server for itself
        import bzrlib.transport.fakenfs as fakenfs
        nfs_server = fakenfs.FakeNFSServer()
        nfs_server.setUp()
        try:
            # the url should be decorated appropriately
            self.assertStartsWith(nfs_server.get_url(), 'fakenfs+')
            # and we should be able to get a transport for it
            nfs = get_transport(nfs_server.get_url())
            # which must be a FakeNFSTransportDecorator instance.
            self.assertIsInstance(nfs, fakenfs.FakeNFSTransportDecorator)
        finally:
            nfs_server.tearDown()

    def test_fakenfs_rename_semantics(self):
        """Renaming a directory onto a non-empty one raises ResourceBusy."""
        # a FakeNFS transport must mangle the way rename errors occur to
        # look like NFS problems.
        nfs = self.get_nfs_transport('.')
        tree_shape = ['from/', 'from/foo', 'to/', 'to/bar']
        self.build_tree(tree_shape, transport=nfs)
        self.assertRaises(errors.ResourceBusy, nfs.rename, 'from', 'to')
 
494
 
 
495
 
 
496
class FakeVFATDecoratorTests(TestCaseInTempDir):
    """Tests for simulation of VFAT restrictions"""

    def get_vfat_transport(self, url):
        """Return vfat-backed transport for test directory"""
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        decorated_url = 'vfat+' + url
        return FakeVFATTransportDecorator(decorated_url)

    def test_transport_creation(self):
        """The helper returns a FakeVFATTransportDecorator."""
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        vfat = self.get_vfat_transport('.')
        self.assertIsInstance(vfat, FakeVFATTransportDecorator)

    def test_transport_mkdir(self):
        """A directory made as 'HELLO' is found under other casings."""
        vfat = self.get_vfat_transport('.')
        vfat.mkdir('HELLO')
        for spelling in ('hello', 'Hello'):
            self.assertTrue(vfat.has(spelling))

    def test_forbidden_chars(self):
        """has() raises ValueError for the name '<NU>'."""
        vfat = self.get_vfat_transport('.')
        self.assertRaises(ValueError, vfat.has, "<NU>")
 
518
 
 
519
 
 
520
class BadTransportHandler(Transport):
    """Test transport whose construction always fails.

    Instantiating it raises DependencyNotPresent for the library 'some_lib'.
    """

    def __init__(self, base_url):
        # Fail unconditionally; base_url is never used.
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
523
 
 
524
 
 
525
class BackupTransportHandler(Transport):
    """Test transport that works as a backup for the BadTransportHandler"""
 
528
 
 
529
 
 
530
class TestTransportImplementation(TestCaseInTempDir):
    """Implementation verification for transports.

    To verify a transport we need a server factory, which is a callable
    that accepts no parameters and returns an implementation of
    bzrlib.transport.Server.

    That Server is then used to construct transport instances and test
    the transport via loopback activity.

    Currently this assumes that the Transport object is connected to the
    current working directory, so that whatever is done through the
    transport should show up in the working directory, and vice-versa.
    This is a bug, because it is possible to have URL schemes which provide
    access to something that may not result in storage on the local disk,
    e.g. due to file system limits, or due to it being a database or some
    other non-filesystem tool.

    This also tests to make sure that the functions work with both
    generators and lists (assuming iter(list) is effectively a generator)

    NOTE(review): transport_server, transport_class and _adjust_url are
    used below but not defined in this class; presumably they are supplied
    by subclasses or test parameterisation -- confirm.
    """

    def setUp(self):
        """Start the server under test and schedule its tearDown."""
        super(TestTransportImplementation, self).setUp()
        server = self.transport_server()
        self._server = server
        server.setUp()
        self.addCleanup(server.tearDown)

    def get_transport(self, relpath=None):
        """Return a connected transport to the local directory.

        :param relpath: a path relative to the base url.
        """
        url = self._adjust_url(self._server.get_url(), relpath)
        # try getting the transport via the regular interface:
        candidate = get_transport(url)
        # vila--20070607 if the following are commented out the test suite
        # still pass. Is this really still needed or was it a forgotten
        # temporary fix ?
        if isinstance(candidate, self.transport_class):
            return candidate
        # we did not get the correct transport class type. Override the
        # regular connection behaviour by direct construction.
        return self.transport_class(url)
 
575
 
 
576
 
 
577
class TestLocalTransports(TestCase):
    """get_transport() on local paths and file URLs gives LocalTransport."""

    def test_get_transport_from_abspath(self):
        """An absolute path yields a LocalTransport based at its URL."""
        here = osutils.abspath('.')
        t = get_transport(here)
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')

    def test_get_transport_from_relpath(self):
        """A relative path yields a LocalTransport based at its URL."""
        # The absolute path is not needed here: the expected base is
        # computed from '.' directly.
        t = get_transport('.')
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')

    def test_get_transport_from_local_url(self):
        """A local URL yields a LocalTransport with that URL as base."""
        here = osutils.abspath('.')
        here_url = urlutils.local_path_to_url(here) + '/'
        t = get_transport(here_url)
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, here_url)

    def test_local_abspath(self):
        """local_abspath('') is the path the transport was opened on."""
        here = osutils.abspath('.')
        t = get_transport(here)
        self.assertEquals(t.local_abspath(''), here)
 
602
 
 
603
 
 
604
class TestWin32LocalTransport(TestCase):
    """Tests using the emulated Win32 local transport."""

    def test_unc_clone_to_root(self):
        """Cloning '..' from a UNC URL stops at file://HOST/."""
        # Win32 UNC path like \\HOST\path
        # clone to root should stop at least at \\HOST part
        # not on \\
        unc = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
        # One '..' per path component: path, to, some, dir.
        for _step in xrange(4):
            unc = unc.clone('..')
        self.assertEquals(unc.base, 'file://HOST/')
        # make sure we reach the root
        above_root = unc.clone('..')
        self.assertEquals(above_root.base, 'file://HOST/')
 
617
 
 
618
 
 
619
class TestConnectedTransport(TestCase):
    """Tests for connected to remote server transports"""

    def test_parse_url(self):
        """A plain sftp URL is split into host, port and path."""
        t = ConnectedTransport('sftp://simple.example.com/home/source')
        self.assertEquals(t._host, 'simple.example.com')
        self.assertEquals(t._port, 22)
        self.assertEquals(t._path, '/home/source/')
        self.failUnless(t._user is None)
        self.failUnless(t._password is None)

        self.assertEquals(t.base, 'sftp://simple.example.com/home/source/')

    def test_parse_quoted_url(self):
        """Percent-escapes in user, password and host are decoded."""
        t = ConnectedTransport('http://ro%62ey:h%40t@ex%41mple.com:2222/path')
        self.assertEquals(t._host, 'exAmple.com')
        self.assertEquals(t._port, 2222)
        self.assertEquals(t._user, 'robey')
        self.assertEquals(t._password, 'h@t')
        self.assertEquals(t._path, '/path/')

        # Base should not keep track of the password
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')

    def test_parse_invalid_url(self):
        """A URL with a non-numeric port part raises InvalidURL."""
        self.assertRaises(errors.InvalidURL,
                          ConnectedTransport,
                          'sftp://lily.org:~janneke/public/bzr/gub')

    def test_relpath(self):
        """relpath() accepts only URLs on the same scheme/user/host/port."""
        t = ConnectedTransport('sftp://user@host.com/abs/path')

        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
        # A different scheme, user, host or port is not a child.
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'http://user@host.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user2@host.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user@otherhost.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user@host.com:33/abs/path/sub')
        # Make sure it works when we don't supply a username
        t = ConnectedTransport('sftp://host.com/abs/path')
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')

        # Make sure it works when parts of the path will be url encoded
        t = ConnectedTransport('sftp://host.com/dev/%path')
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')

    def test_connection_sharing_propagate_credentials(self):
        """A transport and its clone share connection and credentials."""
        t = ConnectedTransport('foo://user@host.com/abs/path')
        self.assertIs(None, t._get_connection())
        self.assertIs(None, t._password)
        c = t.clone('subdir')
        self.assertIs(None, c._get_connection())
        # Cloning must not invent a password on either side. The original
        # test only re-checked `t` here; the clone is now checked as well.
        self.assertIs(None, t._password)
        self.assertIs(None, c._password)

        # Simulate the user entering a password
        password = 'secret'
        connection = object()
        t._set_connection(connection, password)
        self.assertIs(connection, t._get_connection())
        self.assertIs(password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(password, c._get_credentials())

        # credentials can be updated
        new_password = 'even more secret'
        c._update_credentials(new_password)
        self.assertIs(connection, t._get_connection())
        self.assertIs(new_password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(new_password, c._get_credentials())
 
692
 
 
693
 
 
694
class TestReusedTransports(TestCase):
    """Tests for transport reuse"""

    def test_reuse_same_transport(self):
        """get_transport returns a matching entry of possible_transports."""
        candidates = []
        first = get_transport('http://foo/', possible_transports=candidates)
        # The new transport was appended to the list that was passed in.
        self.assertEqual([first], candidates)
        again = get_transport('http://foo/', possible_transports=[first])
        self.assertIs(first, again)

        # Also check that final '/' are handled correctly
        with_slash = get_transport('http://foo/path/')
        reused = get_transport('http://foo/path',
                               possible_transports=[with_slash])
        self.assertIs(with_slash, reused)

        without_slash = get_transport('http://foo/path')
        reused = get_transport('http://foo/path/',
                               possible_transports=[without_slash])
        self.assertIs(without_slash, reused)

    def test_don_t_reuse_different_transport(self):
        """A transport for another host is not reused."""
        foo_transport = get_transport('http://foo/path')
        bar_transport = get_transport('http://bar/path',
                                      possible_transports=[foo_transport])
        self.assertIsNot(foo_transport, bar_transport)
 
718
 
 
719
 
 
720
class TestRemoteTCPTransport(TestCase):
    """Tests for bzr:// transport (RemoteTCPTransport)."""

    def test_relpath_with_implicit_port(self):
        """Connected transports with the same URL are the same, even if the
        port is implicit.

        So t.relpath(url) should always be '' if t.base is the same as url, or
        if the only difference is that one explicitly specifies the default
        port and the other doesn't specify a port.
        """
        # First the transport without a port, then the one naming port 4155;
        # each is checked against both spellings of the URL.
        for base_url in ('bzr://host.com/', 'bzr://host.com:4155/'):
            remote = RemoteTCPTransport(base_url)
            self.assertEquals('', remote.relpath('bzr://host.com/'))
            self.assertEquals('', remote.relpath('bzr://host.com:4155/'))

    def test_construct_uses_default_port(self):
        """If no port is specified, then RemoteTCPTransport uses
        BZR_DEFAULT_PORT.
        """
        remote = get_transport('bzr://host.com/')
        self.assertEquals(BZR_DEFAULT_PORT, remote._port)

    def test_url_omits_default_port(self):
        """If a RemoteTCPTransport uses the default port, then its base URL
        will omit the port.

        This is like how ":80" is omitted from "http://example.com/".
        """
        remote = get_transport('bzr://host.com:4155/')
        self.assertEquals('bzr://host.com/', remote.base)

    def test_url_includes_non_default_port(self):
        """Non-default ports are included in the transport's URL.

        Contrast this to `test_url_omits_default_port`.
        """
        remote = get_transport('bzr://host.com:666/')
        self.assertEquals('bzr://host.com:666/', remote.base)
 
761
 
 
762
 
 
763
 
 
764
def get_test_permutations():
    """Return transport permutations to be used in testing.

    The transports registered by this module exist only to exercise
    registration itself, so none of them should be run through the full
    transport test suite; the list of permutations is therefore empty.
    """
    no_permutations = []
    return no_permutations