/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Robert Collins
  • Date: 2007-10-04 05:30:08 UTC
  • mfrom: (2881.1.1 knits)
  • mto: This revision was merged to the branch mainline in revision 2885.
  • Revision ID: robertc@robertcollins.net-20071004053008-aquje8fyntc8q69h
Fix knit test fallout from final readv api change.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
import os
 
19
import sys
 
20
import stat
 
21
from cStringIO import StringIO
 
22
 
 
23
import bzrlib
 
24
from bzrlib import (
 
25
    errors,
 
26
    osutils,
 
27
    urlutils,
 
28
    )
 
29
from bzrlib.errors import (ConnectionError,
 
30
                           DependencyNotPresent,
 
31
                           FileExists,
 
32
                           InvalidURLJoin,
 
33
                           NoSuchFile,
 
34
                           PathNotChild,
 
35
                           TransportNotPossible,
 
36
                           ConnectionError,
 
37
                           DependencyNotPresent,
 
38
                           ReadError,
 
39
                           UnsupportedProtocol,
 
40
                           )
 
41
from bzrlib.tests import TestCase, TestCaseInTempDir
 
42
from bzrlib.transport import (_clear_protocol_handlers,
 
43
                              _CoalescedOffset,
 
44
                              ConnectedTransport,
 
45
                              _get_protocol_handlers,
 
46
                              _set_protocol_handlers,
 
47
                              _get_transport_modules,
 
48
                              get_transport,
 
49
                              LateReadError,
 
50
                              register_lazy_transport,
 
51
                              register_transport_proto,
 
52
                              _clear_protocol_handlers,
 
53
                              Transport,
 
54
                              )
 
55
from bzrlib.transport.chroot import ChrootServer
 
56
from bzrlib.transport.memory import MemoryTransport
 
57
from bzrlib.transport.local import (LocalTransport,
 
58
                                    EmulatedWin32LocalTransport)
 
59
from bzrlib.transport.remote import (
 
60
    BZR_DEFAULT_PORT,
 
61
    RemoteTCPTransport
 
62
    )
 
63
 
 
64
 
 
65
# TODO: Should possibly split transport-specific tests into their own files.
 
66
 
 
67
 
 
68
class TestTransport(TestCase):
    """Test the non transport-concrete class functionality."""

    def test__get_set_protocol_handlers(self):
        """The handler registry can be cleared and then put back."""
        handlers = _get_protocol_handlers()
        self.assertNotEqual([], handlers.keys())
        try:
            _clear_protocol_handlers()
            self.assertEqual([], _get_protocol_handlers().keys())
        finally:
            # restore the real handlers whatever the assertions did
            _set_protocol_handlers(handlers)

    def test_get_transport_modules(self):
        """Lazily registered handlers are reported by module name."""
        handlers = _get_protocol_handlers()

        class SampleHandler(object):
            """I exist, isnt that enough?"""

        try:
            # don't pollute the current handlers; clearing inside the try
            # guarantees the finally clause restores them.
            _clear_protocol_handlers()
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'TestTransport.SampleHandler')
            register_transport_proto('bar')
            register_lazy_transport('bar', 'bzrlib.tests.test_transport',
                                    'TestTransport.SampleHandler')
            self.assertEqual(
                [SampleHandler.__module__, 'bzrlib.transport.chroot'],
                _get_transport_modules())
        finally:
            _set_protocol_handlers(handlers)

    def test_transport_dependency(self):
        """Transport with missing dependency causes no error"""
        saved_handlers = _get_protocol_handlers()
        try:
            # don't pollute the current handlers
            _clear_protocol_handlers()
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'BadTransportHandler')
            # assertRaises fails the test if nothing is raised and hands
            # back the exception so its message can be checked.
            e = self.assertRaises(UnsupportedProtocol,
                                  get_transport, 'foo://fooserver/foo')
            self.assertEquals('Unsupported protocol'
                              ' for url "foo://fooserver/foo":'
                              ' Unable to import library "some_lib":'
                              ' testing missing dependency', str(e))
        finally:
            # restore original values
            _set_protocol_handlers(saved_handlers)

    def test_transport_fallback(self):
        """A working handler is used when another one cannot be built.

        BackupTransportHandler and BadTransportHandler are both registered
        for 'foo'; the transport obtained must be the backup one.
        """
        saved_handlers = _get_protocol_handlers()
        try:
            _clear_protocol_handlers()
            register_transport_proto('foo')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'BackupTransportHandler')
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
                                    'BadTransportHandler')
            t = get_transport('foo://fooserver/foo')
            self.assertTrue(isinstance(t, BackupTransportHandler))
        finally:
            _set_protocol_handlers(saved_handlers)

    def test_LateReadError(self):
        """The LateReadError helper should raise on read()."""
        a_file = LateReadError('a path')
        # A plain try/except would pass silently if read() did not raise;
        # assertRaises makes the missing exception a failure.
        error = self.assertRaises(ReadError, a_file.read)
        self.assertEqual('a path', error.path)
        self.assertRaises(ReadError, a_file.read, 40)
        a_file.close()

    def test__combine_paths(self):
        """_combine_paths joins relative paths and never climbs above '/'."""
        t = Transport('/')
        self.assertEqual('/home/sarah/project/foo',
                         t._combine_paths('/home/sarah', 'project/foo'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '../../etc'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '../../../etc'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '/etc'))

    def test_local_abspath_non_local_transport(self):
        """local_abspath on a non-local transport raises NotLocalUrl."""
        # the base implementation should throw
        t = MemoryTransport()
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
        self.assertEqual('memory:///t is not a local path.', str(e))
 
161
 
 
162
 
 
163
class TestCoalesceOffsets(TestCase):
    """Exercise Transport._coalesce_offsets through the check() helper."""

    def check(self, expected, offsets, limit=0, fudge=0):
        """Coalesce ``offsets`` and compare with the expected result.

        :param expected: tuples of _CoalescedOffset constructor arguments.
        :param offsets: the offset pairs handed to the coalescer.
        :param limit: forwarded as ``limit``.
        :param fudge: forwarded as ``fudge_factor``.
        """
        wanted = [_CoalescedOffset(*args) for args in expected]
        actual = list(Transport._coalesce_offsets(
            offsets, limit=limit, fudge_factor=fudge))
        self.assertEqual(wanted, actual)

    def test_coalesce_empty(self):
        self.check([], [])

    def test_coalesce_simple(self):
        single = (0, 10)
        self.check([(0, 10, [single])], [single])

    def test_coalesce_unrelated(self):
        requested = [(0, 10), (20, 10)]
        wanted = [
            (0, 10, [(0, 10)]),
            (20, 10, [(0, 10)]),
            ]
        self.check(wanted, requested)

    def test_coalesce_unsorted(self):
        requested = [(20, 10), (0, 10)]
        wanted = [
            (20, 10, [(0, 10)]),
            (0, 10, [(0, 10)]),
            ]
        self.check(wanted, requested)

    def test_coalesce_nearby(self):
        requested = [(0, 10), (10, 10)]
        self.check([(0, 20, [(0, 10), (10, 10)])], requested)

    def test_coalesce_overlapped(self):
        requested = [(0, 10), (5, 10)]
        self.check([(0, 15, [(0, 10), (5, 10)])], requested)

    def test_coalesce_limit(self):
        # ten adjacent 10-byte reads starting at offset 10
        requested = [(start, 10) for start in range(10, 101, 10)]
        # with limit=5 each coalesced range holds five of them
        five_parts = [(off, 10) for off in range(0, 50, 10)]
        wanted = [
            (10, 50, list(five_parts)),
            (60, 50, list(five_parts)),
            ]
        self.check(wanted, requested, limit=5)

    def test_coalesce_no_limit(self):
        requested = [(start, 10) for start in range(10, 101, 10)]
        ten_parts = [(off, 10) for off in range(0, 100, 10)]
        self.check([(10, 100, ten_parts)], requested)

    def test_coalesce_fudge(self):
        requested = [(10, 10), (30, 10), (100, 10)]
        wanted = [
            (10, 30, [(0, 10), (20, 10)]),
            (100, 10, [(0, 10)]),
            ]
        self.check(wanted, requested, fudge=10)
 
220
 
 
221
 
 
222
class TestMemoryTransport(TestCase):
    """Behaviour checks for MemoryTransport."""

    def test_get_transport(self):
        # construction alone must not raise
        MemoryTransport()

    def test_clone(self):
        t = MemoryTransport()
        self.assertTrue(isinstance(t, MemoryTransport))
        cloned = t.clone("/")
        self.assertEqual("memory:///", cloned.base)

    def test_abspath(self):
        t = MemoryTransport()
        self.assertEqual("memory:///relpath", t.abspath('relpath'))

    def test_abspath_of_root(self):
        t = MemoryTransport()
        self.assertEqual("memory:///", t.base)
        self.assertEqual("memory:///", t.abspath('/'))

    def test_abspath_of_relpath_starting_at_root(self):
        t = MemoryTransport()
        self.assertEqual("memory:///foo", t.abspath('/foo'))

    def test_append_and_get(self):
        t = MemoryTransport()
        t.append_bytes('path', 'content')
        self.assertEqual(t.get('path').read(), 'content')
        # a second append, this time from a file object, extends the data
        t.append_file('path', StringIO('content'))
        self.assertEqual(t.get('path').read(), 'contentcontent')

    def test_put_and_get(self):
        t = MemoryTransport()
        t.put_file('path', StringIO('content'))
        self.assertEqual(t.get('path').read(), 'content')
        t.put_bytes('path', 'content')
        self.assertEqual(t.get('path').read(), 'content')

    def test_append_without_dir_fails(self):
        t = MemoryTransport()
        self.assertRaises(NoSuchFile, t.append_bytes, 'dir/path', 'content')

    def test_put_without_dir_fails(self):
        t = MemoryTransport()
        source = StringIO('content')
        self.assertRaises(NoSuchFile, t.put_file, 'dir/path', source)

    def test_get_missing(self):
        t = MemoryTransport()
        self.assertRaises(NoSuchFile, t.get, 'foo')

    def test_has_missing(self):
        t = MemoryTransport()
        self.assertEquals(False, t.has('foo'))

    def test_has_present(self):
        t = MemoryTransport()
        t.append_bytes('foo', 'content')
        self.assertEquals(True, t.has('foo'))

    def test_list_dir(self):
        t = MemoryTransport()
        t.put_bytes('foo', 'content')
        t.mkdir('dir')
        t.put_bytes('dir/subfoo', 'content')
        # 'dirlike' shares a prefix with 'dir' but is a sibling file
        t.put_bytes('dirlike', 'content')

        top_level = sorted(t.list_dir('.'))
        self.assertEquals(['dir', 'dirlike', 'foo'], top_level)
        in_dir = sorted(t.list_dir('dir'))
        self.assertEquals(['subfoo'], in_dir)

    def test_mkdir(self):
        t = MemoryTransport()
        t.mkdir('dir')
        t.append_bytes('dir/path', 'content')
        self.assertEqual(t.get('dir/path').read(), 'content')

    def test_mkdir_missing_parent(self):
        t = MemoryTransport()
        self.assertRaises(NoSuchFile, t.mkdir, 'dir/dir')

    def test_mkdir_twice(self):
        t = MemoryTransport()
        t.mkdir('dir')
        self.assertRaises(FileExists, t.mkdir, 'dir')

    def test_parameters(self):
        t = MemoryTransport()
        self.assertEqual(True, t.listable())
        self.assertEqual(False, t.is_readonly())

    def test_iter_files_recursive(self):
        t = MemoryTransport()
        t.mkdir('dir')
        for relpath in ('dir/foo', 'dir/bar', 'bar'):
            t.put_bytes(relpath, 'content')
        found = set(t.iter_files_recursive())
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), found)

    def test_stat(self):
        t = MemoryTransport()
        t.put_bytes('foo', 'content')
        t.put_bytes('bar', 'phowar')
        # st_size is the number of bytes stored
        self.assertEqual(7, t.stat('foo').st_size)
        self.assertEqual(6, t.stat('bar').st_size)
 
328
 
 
329
 
 
330
class ChrootDecoratorTransportTest(TestCase):
    """Chroot decoration specific tests.

    Every test starts a ChrootServer and tears it down in a ``finally``
    clause so a failing assertion cannot leave the server set up.
    """

    def test_abspath(self):
        # The abspath is always relative to the chroot_url.
        server = ChrootServer(get_transport('memory:///foo/bar/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            self.assertEqual(server.get_url(), transport.abspath('/'))

            subdir_transport = transport.clone('subdir')
            self.assertEqual(server.get_url(),
                             subdir_transport.abspath('/'))
        finally:
            server.tearDown()

    def test_clone(self):
        server = ChrootServer(get_transport('memory:///foo/bar/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            # relpath from root and root path are the same
            relpath_cloned = transport.clone('foo')
            abspath_cloned = transport.clone('/foo')
            self.assertEqual(server, relpath_cloned.server)
            self.assertEqual(server, abspath_cloned.server)
        finally:
            server.tearDown()

    def test_chroot_url_preserves_chroot(self):
        """Calling get_transport on a chroot transport's base should produce a
        transport with exactly the same behaviour as the original chroot
        transport.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        server = ChrootServer(get_transport('memory:///path/subpath'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            new_transport = get_transport(transport.base)
            self.assertEqual(transport.server, new_transport.server)
            self.assertEqual(transport.base, new_transport.base)
        finally:
            server.tearDown()

    def test_urljoin_preserves_chroot(self):
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
        URL that escapes the intended chroot.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_transport = get_transport(parent_url)
        """
        server = ChrootServer(get_transport('memory:///path/'))
        server.setUp()
        try:
            transport = get_transport(server.get_url())
            self.assertRaises(
                InvalidURLJoin, urlutils.join, transport.base, '..')
        finally:
            server.tearDown()
 
388
 
 
389
 
 
390
class ChrootServerTest(TestCase):
    """Tests for the ChrootServer life cycle."""

    def test_construct(self):
        """The server remembers the transport it was built around."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        self.assertEqual(backing_transport, server.backing_transport)

    def test_setUp(self):
        """setUp registers the server's scheme as a protocol handler."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        try:
            self.assertTrue(
                server.scheme in _get_protocol_handlers().keys())
        finally:
            # do not leave the scheme registered for later tests
            server.tearDown()

    def test_tearDown(self):
        """tearDown removes the server's scheme from the handlers."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        server.tearDown()
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())

    def test_get_url(self):
        """The server URL is 'chroot-<id(server)>:///'."""
        backing_transport = MemoryTransport()
        server = ChrootServer(backing_transport)
        server.setUp()
        try:
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
        finally:
            server.tearDown()
 
416
 
 
417
 
 
418
class ReadonlyDecoratorTransportTest(TestCase):
    """Readonly decoration specific tests."""

    def test_local_parameters(self):
        """A readonly-decorated local transport is listable and readonly."""
        from bzrlib.transport import readonly
        # connect to . in readonly mode
        decorated = readonly.ReadonlyTransportDecorator('readonly+.')
        self.assertEqual(True, decorated.listable())
        self.assertEqual(True, decorated.is_readonly())

    def test_http_parameters(self):
        """A readonly-decorated http transport stays non-listable."""
        from bzrlib.tests.HttpServer import HttpServer
        from bzrlib.transport import readonly
        # connect to . via http which is not listable
        http_server = HttpServer()
        http_server.setUp()
        try:
            url = 'readonly+' + http_server.get_url()
            decorated = get_transport(url)
            self.failUnless(isinstance(decorated,
                                       readonly.ReadonlyTransportDecorator))
            self.assertEqual(False, decorated.listable())
            self.assertEqual(True, decorated.is_readonly())
        finally:
            http_server.tearDown()
 
442
 
 
443
 
 
444
class FakeNFSDecoratorTests(TestCaseInTempDir):
    """NFS decorator specific tests."""

    def get_nfs_transport(self, url):
        """Return a FakeNFSTransportDecorator for 'fakenfs+' + url."""
        import bzrlib.transport.fakenfs as fakenfs
        # connect to url with nfs decoration
        decorated_url = 'fakenfs+' + url
        return fakenfs.FakeNFSTransportDecorator(decorated_url)

    def test_local_parameters(self):
        # the listable and is_readonly parameters
        # are not changed by the fakenfs decorator
        nfs = self.get_nfs_transport('.')
        self.assertEqual(True, nfs.listable())
        self.assertEqual(False, nfs.is_readonly())

    def test_http_parameters(self):
        # the listable and is_readonly parameters
        # are not changed by the fakenfs decorator
        from bzrlib.tests.HttpServer import HttpServer
        # connect to . via http which is not listable
        http_server = HttpServer()
        http_server.setUp()
        try:
            nfs = self.get_nfs_transport(http_server.get_url())
            self.assertIsInstance(
                nfs, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
            self.assertEqual(False, nfs.listable())
            self.assertEqual(True, nfs.is_readonly())
        finally:
            http_server.tearDown()

    def test_fakenfs_server_default(self):
        # a FakeNFSServer() should bring up a local relpath server for itself
        import bzrlib.transport.fakenfs as fakenfs
        nfs_server = fakenfs.FakeNFSServer()
        nfs_server.setUp()
        try:
            # the url should be decorated appropriately
            self.assertStartsWith(nfs_server.get_url(), 'fakenfs+')
            # and we should be able to get a transport for it
            nfs = get_transport(nfs_server.get_url())
            # which must be a FakeNFSTransportDecorator instance.
            self.assertIsInstance(nfs, fakenfs.FakeNFSTransportDecorator)
        finally:
            nfs_server.tearDown()

    def test_fakenfs_rename_semantics(self):
        # a FakeNFS transport must mangle the way rename errors occur to
        # look like NFS problems.
        nfs = self.get_nfs_transport('.')
        tree_shape = ['from/', 'from/foo', 'to/', 'to/bar']
        self.build_tree(tree_shape, transport=nfs)
        self.assertRaises(errors.ResourceBusy, nfs.rename, 'from', 'to')
 
499
 
 
500
 
 
501
class FakeVFATDecoratorTests(TestCaseInTempDir):
    """Tests for simulation of VFAT restrictions"""

    def get_vfat_transport(self, url):
        """Return vfat-backed transport for test directory"""
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        decorated_url = 'vfat+' + url
        return FakeVFATTransportDecorator(decorated_url)

    def test_transport_creation(self):
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
        vfat = self.get_vfat_transport('.')
        self.assertIsInstance(vfat, FakeVFATTransportDecorator)

    def test_transport_mkdir(self):
        vfat = self.get_vfat_transport('.')
        vfat.mkdir('HELLO')
        # the directory is found whatever the case used to ask for it
        self.assertTrue(vfat.has('hello'))
        self.assertTrue(vfat.has('Hello'))

    def test_forbidden_chars(self):
        vfat = self.get_vfat_transport('.')
        self.assertRaises(ValueError, vfat.has, "<NU>")
 
523
 
 
524
 
 
525
class BadTransportHandler(Transport):
    """Test transport whose construction always fails.

    Building one raises DependencyNotPresent for the library 'some_lib'.
    """

    def __init__(self, base_url):
        raise DependencyNotPresent(
            'some_lib', 'testing missing dependency')
 
528
 
 
529
 
 
530
class BackupTransportHandler(Transport):
    """Test transport that works as a backup for the BadTransportHandler"""
 
533
 
 
534
 
 
535
class TestTransportImplementation(TestCaseInTempDir):
    """Implementation verification for transports.

    To verify a transport we need a server factory, which is a callable
    that accepts no parameters and returns an implementation of
    bzrlib.transport.Server.

    That Server is then used to construct transport instances and test
    the transport via loopback activity.

    Currently this assumes that the Transport object is connected to the
    current working directory, so that whatever is done through the
    transport shows up in the working directory, and vice-versa. This is
    a bug, because it is possible to have URL schemes which provide access
    to something that may not result in storage on the local disk, e.g.
    due to file system limits, or due to it being a database or some other
    non-filesystem tool.

    This also tests to make sure that the functions work with both
    generators and lists (assuming iter(list) is effectively a generator)
    """

    def setUp(self):
        """Start the server under test and schedule its tearDown."""
        super(TestTransportImplementation, self).setUp()
        server = self.transport_server()
        self._server = server
        server.setUp()
        self.addCleanup(server.tearDown)

    def get_transport(self, relpath=None):
        """Return a connected transport to the local directory.

        :param relpath: a path relative to the base url.
        """
        url = self._adjust_url(self._server.get_url(), relpath)
        # try getting the transport via the regular interface:
        transport = get_transport(url)
        # vila--20070607 if the following are commented out the test suite
        # still pass. Is this really still needed or was it a forgotten
        # temporary fix ?
        if isinstance(transport, self.transport_class):
            return transport
        # we did not get the correct transport class type. Override the
        # regular connection behaviour by direct construction.
        return self.transport_class(url)
 
580
 
 
581
 
 
582
class TestLocalTransports(TestCase):
    """get_transport() on local paths and URLs yields a LocalTransport."""

    def test_get_transport_from_abspath(self):
        """An absolute path gives a LocalTransport based at its URL."""
        here = osutils.abspath('.')
        t = get_transport(here)
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')

    def test_get_transport_from_relpath(self):
        """A relative path gives a LocalTransport based at its URL."""
        t = get_transport('.')
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')

    def test_get_transport_from_local_url(self):
        """A local URL is kept unchanged as the transport base."""
        here = osutils.abspath('.')
        here_url = urlutils.local_path_to_url(here) + '/'
        t = get_transport(here_url)
        self.assertIsInstance(t, LocalTransport)
        self.assertEquals(t.base, here_url)

    def test_local_abspath(self):
        """local_abspath('') gives back the path the transport was built on."""
        here = osutils.abspath('.')
        t = get_transport(here)
        self.assertEquals(t.local_abspath(''), here)
 
607
 
 
608
 
 
609
class TestWin32LocalTransport(TestCase):
    """Tests for the emulated win32 local transport."""

    def test_unc_clone_to_root(self):
        # Win32 UNC path like \\HOST\path
        # clone to root should stop at least at \\HOST part
        # not on \\
        unc_root = 'file://HOST/'
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
        # four path components sit below the host
        for _ in xrange(4):
            t = t.clone('..')
        self.assertEquals(t.base, unc_root)
        # make sure we reach the root
        t = t.clone('..')
        self.assertEquals(t.base, unc_root)
 
622
 
 
623
 
 
624
class TestConnectedTransport(TestCase):
    """Tests for connected to remote server transports"""

    def test_parse_url(self):
        t = ConnectedTransport('sftp://simple.example.com/home/source')
        self.assertEquals(t._host, 'simple.example.com')
        self.assertEquals(t._port, 22)
        self.assertEquals(t._path, '/home/source/')
        # neither a user nor a password appears in the URL
        self.failUnless(t._user is None)
        self.failUnless(t._password is None)

        self.assertEquals(t.base, 'sftp://simple.example.com/home/source/')

    def test_parse_quoted_url(self):
        quoted_url = 'http://ro%62ey:h%40t@ex%41mple.com:2222/path'
        t = ConnectedTransport(quoted_url)
        self.assertEquals(t._host, 'exAmple.com')
        self.assertEquals(t._port, 2222)
        self.assertEquals(t._user, 'robey')
        self.assertEquals(t._password, 'h@t')
        self.assertEquals(t._path, '/path/')

        # Base should not keep track of the password
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')

    def test_parse_invalid_url(self):
        bad_url = 'sftp://lily.org:~janneke/public/bzr/gub'
        self.assertRaises(errors.InvalidURL, ConnectedTransport, bad_url)

    def test_relpath(self):
        t = ConnectedTransport('sftp://user@host.com/abs/path')

        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
        # a different scheme, user, host or port is never a child
        foreign_urls = (
            'http://user@host.com/abs/path/sub',
            'sftp://user2@host.com/abs/path/sub',
            'sftp://user@otherhost.com/abs/path/sub',
            'sftp://user@host.com:33/abs/path/sub',
            )
        for foreign in foreign_urls:
            self.assertRaises(errors.PathNotChild, t.relpath, foreign)
        # Make sure it works when we don't supply a username
        t = ConnectedTransport('sftp://host.com/abs/path')
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')

        # Make sure it works when parts of the path will be url encoded
        t = ConnectedTransport('sftp://host.com/dev/%path')
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')

    def test_connection_sharing_propagate_credentials(self):
        t = ConnectedTransport('foo://user@host.com/abs/path')
        self.assertIs(None, t._get_connection())
        self.assertIs(None, t._password)
        c = t.clone('subdir')
        self.assertEquals(None, c._get_connection())
        # NOTE(review): this re-checks the original transport; the clone
        # ``c`` may have been the intended subject -- confirm.
        self.assertIs(None, t._password)

        # Simulate the user entering a password
        password = 'secret'
        connection = object()
        t._set_connection(connection, password)
        # both the original and its clone see the connection and credentials
        self.assertIs(connection, t._get_connection())
        self.assertIs(password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(password, c._get_credentials())

        # credentials can be updated
        new_password = 'even more secret'
        c._update_credentials(new_password)
        self.assertIs(connection, t._get_connection())
        self.assertIs(new_password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(new_password, c._get_credentials())
 
697
 
 
698
 
 
699
class TestReusedTransports(TestCase):
    """Check when get_transport() hands back a transport it was offered."""

    def test_reuse_same_transport(self):
        """A matching URL returns the transport from possible_transports."""
        candidates = []
        created = get_transport('http://foo/', possible_transports=candidates)
        # The freshly created transport is expected to be recorded in the
        # list that was passed in.
        self.assertEqual([created], candidates)
        reused = get_transport('http://foo/', possible_transports=[created])
        self.assertIs(created, reused)

        # A trailing '/' must not matter, whichever of the two URLs has it.
        url_pairs = (
            ('http://foo/path/', 'http://foo/path'),
            ('http://foo/path', 'http://foo/path/'),
            )
        for first_url, second_url in url_pairs:
            original = get_transport(first_url)
            again = get_transport(second_url, possible_transports=[original])
            self.assertIs(original, again)

    def test_don_t_reuse_different_transport(self):
        """A transport for another host is not reused."""
        foo_transport = get_transport('http://foo/path')
        bar_transport = get_transport('http://bar/path',
                                      possible_transports=[foo_transport])
        self.assertIsNot(foo_transport, bar_transport)
 
723
 
 
724
 
 
725
class TestRemoteTCPTransport(TestCase):
    """Port handling of the bzr:// transport (RemoteTCPTransport)."""

    def test_relpath_with_implicit_port(self):
        """An omitted port and the explicit default port name the same URL.

        Whether the transport was built with or without ':4155', relpath()
        of its own URL, spelled either way, must be the empty string.
        """
        spellings = ('bzr://host.com/', 'bzr://host.com:4155/')
        for base in spellings:
            remote = RemoteTCPTransport(base)
            for url in spellings:
                self.assertEquals('', remote.relpath(url))

    def test_construct_uses_default_port(self):
        """Without a port in the URL, _port is BZR_DEFAULT_PORT."""
        remote = get_transport('bzr://host.com/')
        self.assertEquals(BZR_DEFAULT_PORT, remote._port)

    def test_url_omits_default_port(self):
        """The default port is dropped from the transport's base URL.

        This mirrors the way ":80" is left out of "http://example.com/".
        """
        remote = get_transport('bzr://host.com:4155/')
        self.assertEquals('bzr://host.com/', remote.base)

    def test_url_includes_non_default_port(self):
        """A port other than the default is kept in the base URL.

        Counterpart of `test_url_omits_default_port`.
        """
        remote = get_transport('bzr://host.com:666/')
        self.assertEquals('bzr://host.com:666/', remote.base)
 
766
 
 
767
 
 
768
class TestTransportTrace(TestCase):
    """Tests for the 'trace+' transport decorator and its activity log."""

    def test_decorator(self):
        """A 'trace+' URL yields a TransportTraceDecorator.

        This test used to be named test_get, the same as a later method in
        this class; the later definition replaced it, so it never ran.
        """
        transport = get_transport('trace+memory://')
        self.assertIsInstance(
            transport, bzrlib.transport.trace.TransportTraceDecorator)

    def test_clone_preserves_activity(self):
        """A clone is a distinct object sharing the same _activity list."""
        transport = get_transport('trace+memory://')
        transport2 = transport.clone('.')
        self.assertTrue(transport is not transport2)
        self.assertTrue(transport._activity is transport2._activity)

    # the following specific tests are for the operations that have made use of
    # logging in tests; we could test every single operation but doing that
    # still won't cause a test failure when the top level Transport API
    # changes; so there is little return doing that.
    def test_get(self):
        """put_bytes and get are each recorded in _activity, in call order."""
        transport = get_transport('trace+memory:///')
        transport.put_bytes('foo', 'barish')
        transport.get('foo')
        expected_result = []
        # put_bytes records the number of bytes written (6 for 'barish'), not
        # the content itself, to avoid memory pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # get records the file name only.
        expected_result.append(('get', 'foo'))
        self.assertEqual(expected_result, transport._activity)

    def test_readv(self):
        """readv is recorded with its offsets and the extra arguments."""
        transport = get_transport('trace+memory:///')
        transport.put_bytes('foo', 'barish')
        # list() consumes the result in case readv returns a lazy iterator.
        list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
            upper_limit=6))
        expected_result = []
        # put_bytes records the number of bytes written (6 for 'barish'), not
        # the content itself, to avoid memory pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # readv records the supplied offset request, followed by the
        # adjust_for_latency and upper_limit values.
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
        self.assertEqual(expected_result, transport._activity)