/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-10 21:59:15 UTC
  • mto: This revision was merged to the branch mainline in revision 6690.
  • Revision ID: jelmer@jelmer.uk-20170610215915-zcpu0in3r1irx3ml
Move serializer to bzr.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
 
from cStringIO import StringIO
 
18
import errno
19
19
import os
20
20
import subprocess
21
21
import sys
22
22
import threading
23
23
 
24
 
from bzrlib import (
 
24
from .. import (
25
25
    errors,
26
26
    osutils,
27
27
    tests,
28
28
    transport,
29
29
    urlutils,
30
30
    )
31
 
from bzrlib.transport import (
 
31
from ..directory_service import directories
 
32
from ..sixish import (
 
33
    BytesIO,
 
34
    )
 
35
from ..transport import (
32
36
    chroot,
33
37
    fakenfs,
 
38
    http,
34
39
    local,
 
40
    location_to_url,
35
41
    memory,
36
42
    pathfilter,
37
43
    readonly,
38
44
    )
39
 
from bzrlib.tests import (
 
45
import breezy.transport.trace
 
46
from . import (
40
47
    features,
41
48
    test_server,
42
49
    )
48
55
class TestTransport(tests.TestCase):
49
56
    """Test the non transport-concrete class functionality."""
50
57
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
58
    def test__get_set_protocol_handlers(self):
55
59
        handlers = transport._get_protocol_handlers()
56
 
        self.assertNotEqual([], handlers.keys( ))
57
 
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
60
 
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
60
        self.assertNotEqual([], handlers.keys())
 
61
        transport._clear_protocol_handlers()
 
62
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
63
        self.assertEqual([], transport._get_protocol_handlers().keys())
62
64
 
63
65
    def test_get_transport_modules(self):
64
66
        handlers = transport._get_protocol_handlers()
 
67
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
68
        # don't pollute the current handlers
66
69
        transport._clear_protocol_handlers()
 
70
 
67
71
        class SampleHandler(object):
68
72
            """I exist, isnt that enough?"""
69
 
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
79
 
            self.assertEqual([SampleHandler.__module__,
80
 
                              'bzrlib.transport.chroot',
81
 
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
83
 
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
73
        transport._clear_protocol_handlers()
 
74
        transport.register_transport_proto('foo')
 
75
        transport.register_lazy_transport('foo',
 
76
                                            'breezy.tests.test_transport',
 
77
                                            'TestTransport.SampleHandler')
 
78
        transport.register_transport_proto('bar')
 
79
        transport.register_lazy_transport('bar',
 
80
                                            'breezy.tests.test_transport',
 
81
                                            'TestTransport.SampleHandler')
 
82
        self.assertEqual([SampleHandler.__module__,
 
83
                            'breezy.transport.chroot',
 
84
                            'breezy.transport.pathfilter'],
 
85
                            transport._get_transport_modules())
85
86
 
86
87
    def test_transport_dependency(self):
87
88
        """Transport with missing dependency causes no error"""
88
89
        saved_handlers = transport._get_protocol_handlers()
 
90
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
91
        # don't pollute the current handlers
90
92
        transport._clear_protocol_handlers()
 
93
        transport.register_transport_proto('foo')
 
94
        transport.register_lazy_transport(
 
95
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
91
96
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
95
 
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
98
 
                e_str = str(e)
99
 
                self.assertEquals('Unsupported protocol'
100
 
                                  ' for url "foo://fooserver/foo":'
101
 
                                  ' Unable to import library "some_lib":'
102
 
                                  ' testing missing dependency', str(e))
103
 
            else:
104
 
                self.fail('Did not raise UnsupportedProtocol')
105
 
        finally:
106
 
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
97
            transport.get_transport_from_url('foo://fooserver/foo')
 
98
        except errors.UnsupportedProtocol as e:
 
99
            e_str = str(e)
 
100
            self.assertEqual('Unsupported protocol'
 
101
                                ' for url "foo://fooserver/foo":'
 
102
                                ' Unable to import library "some_lib":'
 
103
                                ' testing missing dependency', str(e))
 
104
        else:
 
105
            self.fail('Did not raise UnsupportedProtocol')
108
106
 
109
107
    def test_transport_fallback(self):
110
108
        """Transport with missing dependency causes no error"""
111
109
        saved_handlers = transport._get_protocol_handlers()
112
 
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
120
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
 
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
110
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
111
        transport._clear_protocol_handlers()
 
112
        transport.register_transport_proto('foo')
 
113
        transport.register_lazy_transport(
 
114
            'foo', 'breezy.tests.test_transport', 'BackupTransportHandler')
 
115
        transport.register_lazy_transport(
 
116
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
 
117
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
118
        self.assertTrue(isinstance(t, BackupTransportHandler))
123
119
 
124
120
    def test_ssh_hints(self):
125
121
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
126
122
        try:
127
 
            transport.get_transport('ssh://fooserver/foo')
128
 
        except errors.UnsupportedProtocol, e:
 
123
            transport.get_transport_from_url('ssh://fooserver/foo')
 
124
        except errors.UnsupportedProtocol as e:
129
125
            e_str = str(e)
130
 
            self.assertEquals('Unsupported protocol'
 
126
            self.assertEqual('Unsupported protocol'
131
127
                              ' for url "ssh://fooserver/foo":'
132
128
                              ' bzr supports bzr+ssh to operate over ssh,'
133
129
                              ' use "bzr+ssh://fooserver/foo".',
140
136
        a_file = transport.LateReadError('a path')
141
137
        try:
142
138
            a_file.read()
143
 
        except errors.ReadError, error:
 
139
        except errors.ReadError as error:
144
140
            self.assertEqual('a path', error.path)
145
141
        self.assertRaises(errors.ReadError, a_file.read, 40)
146
142
        a_file.close()
147
143
 
148
 
    def test__combine_paths(self):
149
 
        t = transport.Transport('/')
150
 
        self.assertEqual('/home/sarah/project/foo',
151
 
                         t._combine_paths('/home/sarah', 'project/foo'))
152
 
        self.assertEqual('/etc',
153
 
                         t._combine_paths('/home/sarah', '../../etc'))
154
 
        self.assertEqual('/etc',
155
 
                         t._combine_paths('/home/sarah', '../../../etc'))
156
 
        self.assertEqual('/etc',
157
 
                         t._combine_paths('/home/sarah', '/etc'))
158
 
 
159
144
    def test_local_abspath_non_local_transport(self):
160
145
        # the base implementation should throw
161
146
        t = memory.MemoryTransport()
218
203
 
219
204
    def test_coalesce_fudge(self):
220
205
        self.check([(10, 30, [(0, 10), (20, 10)]),
221
 
                    (100, 10, [(0, 10),]),
 
206
                    (100, 10, [(0, 10)]),
222
207
                   ], [(10, 10), (30, 10), (100, 10)],
223
 
                   fudge=10
224
 
                  )
 
208
                   fudge=10)
 
209
 
225
210
    def test_coalesce_max_size(self):
226
211
        self.check([(10, 20, [(0, 10), (10, 10)]),
227
212
                    (30, 50, [(0, 50)]),
228
213
                    # If one range is above max_size, it gets its own coalesced
229
214
                    # offset
230
 
                    (100, 80, [(0, 80),]),],
 
215
                    (100, 80, [(0, 80)]),],
231
216
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
232
 
                   max_size=50
233
 
                  )
 
217
                   max_size=50)
234
218
 
235
219
    def test_coalesce_no_max_size(self):
236
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
220
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
237
221
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
238
222
                  )
239
223
 
240
224
    def test_coalesce_default_limit(self):
241
225
        # By default we use a 100MB max size.
242
 
        ten_mb = 10*1024*1024
243
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
226
        ten_mb = 10 * 1024 * 1024
 
227
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
244
228
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
245
229
                   [(i*ten_mb, ten_mb) for i in range(11)])
246
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
247
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
230
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
231
                   [(i * ten_mb, ten_mb) for i in range(11)],
248
232
                   max_size=1*1024*1024*1024)
249
233
 
250
234
 
255
239
        server.start_server()
256
240
        url = server.get_url()
257
241
        self.assertTrue(url in transport.transport_list_registry)
258
 
        t = transport.get_transport(url)
 
242
        t = transport.get_transport_from_url(url)
259
243
        del t
260
244
        server.stop_server()
261
245
        self.assertFalse(url in transport.transport_list_registry)
290
274
        t = memory.MemoryTransport()
291
275
        t.append_bytes('path', 'content')
292
276
        self.assertEqual(t.get('path').read(), 'content')
293
 
        t.append_file('path', StringIO('content'))
 
277
        t.append_file('path', BytesIO(b'content'))
294
278
        self.assertEqual(t.get('path').read(), 'contentcontent')
295
279
 
296
280
    def test_put_and_get(self):
297
281
        t = memory.MemoryTransport()
298
 
        t.put_file('path', StringIO('content'))
 
282
        t.put_file('path', BytesIO(b'content'))
299
283
        self.assertEqual(t.get('path').read(), 'content')
300
284
        t.put_bytes('path', 'content')
301
285
        self.assertEqual(t.get('path').read(), 'content')
308
292
    def test_put_without_dir_fails(self):
309
293
        t = memory.MemoryTransport()
310
294
        self.assertRaises(errors.NoSuchFile,
311
 
                          t.put_file, 'dir/path', StringIO('content'))
 
295
                          t.put_file, 'dir/path', BytesIO(b'content'))
312
296
 
313
297
    def test_get_missing(self):
314
298
        transport = memory.MemoryTransport()
316
300
 
317
301
    def test_has_missing(self):
318
302
        t = memory.MemoryTransport()
319
 
        self.assertEquals(False, t.has('foo'))
 
303
        self.assertEqual(False, t.has('foo'))
320
304
 
321
305
    def test_has_present(self):
322
306
        t = memory.MemoryTransport()
323
307
        t.append_bytes('foo', 'content')
324
 
        self.assertEquals(True, t.has('foo'))
 
308
        self.assertEqual(True, t.has('foo'))
325
309
 
326
310
    def test_list_dir(self):
327
311
        t = memory.MemoryTransport()
330
314
        t.put_bytes('dir/subfoo', 'content')
331
315
        t.put_bytes('dirlike', 'content')
332
316
 
333
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
334
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
317
        self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
318
        self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
335
319
 
336
320
    def test_mkdir(self):
337
321
        t = memory.MemoryTransport()
360
344
        t.put_bytes('dir/bar', 'content')
361
345
        t.put_bytes('bar', 'content')
362
346
        paths = set(t.iter_files_recursive())
363
 
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
347
        self.assertEqual({'dir/foo', 'dir/bar', 'bar'}, paths)
364
348
 
365
349
    def test_stat(self):
366
350
        t = memory.MemoryTransport()
376
360
    def test_abspath(self):
377
361
        # The abspath is always relative to the chroot_url.
378
362
        server = chroot.ChrootServer(
379
 
            transport.get_transport('memory:///foo/bar/'))
 
363
            transport.get_transport_from_url('memory:///foo/bar/'))
380
364
        self.start_server(server)
381
 
        t = transport.get_transport(server.get_url())
 
365
        t = transport.get_transport_from_url(server.get_url())
382
366
        self.assertEqual(server.get_url(), t.abspath('/'))
383
367
 
384
368
        subdir_t = t.clone('subdir')
386
370
 
387
371
    def test_clone(self):
388
372
        server = chroot.ChrootServer(
389
 
            transport.get_transport('memory:///foo/bar/'))
 
373
            transport.get_transport_from_url('memory:///foo/bar/'))
390
374
        self.start_server(server)
391
 
        t = transport.get_transport(server.get_url())
 
375
        t = transport.get_transport_from_url(server.get_url())
392
376
        # relpath from root and root path are the same
393
377
        relpath_cloned = t.clone('foo')
394
378
        abspath_cloned = t.clone('/foo')
403
387
        This is so that it is not possible to escape a chroot by doing::
404
388
            url = chroot_transport.base
405
389
            parent_url = urlutils.join(url, '..')
406
 
            new_t = transport.get_transport(parent_url)
 
390
            new_t = transport.get_transport_from_url(parent_url)
407
391
        """
408
392
        server = chroot.ChrootServer(
409
 
            transport.get_transport('memory:///path/subpath'))
 
393
            transport.get_transport_from_url('memory:///path/subpath'))
410
394
        self.start_server(server)
411
 
        t = transport.get_transport(server.get_url())
412
 
        new_t = transport.get_transport(t.base)
 
395
        t = transport.get_transport_from_url(server.get_url())
 
396
        new_t = transport.get_transport_from_url(t.base)
413
397
        self.assertEqual(t.server, new_t.server)
414
398
        self.assertEqual(t.base, new_t.base)
415
399
 
420
404
        This is so that it is not possible to escape a chroot by doing::
421
405
            url = chroot_transport.base
422
406
            parent_url = urlutils.join(url, '..')
423
 
            new_t = transport.get_transport(parent_url)
 
407
            new_t = transport.get_transport_from_url(parent_url)
424
408
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
409
        server = chroot.ChrootServer(
 
410
            transport.get_transport_from_url('memory:///path/'))
426
411
        self.start_server(server)
427
 
        t = transport.get_transport(server.get_url())
 
412
        t = transport.get_transport_from_url(server.get_url())
428
413
        self.assertRaises(
429
414
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
430
415
 
440
425
        backing_transport = memory.MemoryTransport()
441
426
        server = chroot.ChrootServer(backing_transport)
442
427
        server.start_server()
443
 
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
446
 
        finally:
447
 
            server.stop_server()
 
428
        self.addCleanup(server.stop_server)
 
429
        self.assertTrue(server.scheme
 
430
                        in transport._get_protocol_handlers().keys())
448
431
 
449
432
    def test_stop_server(self):
450
433
        backing_transport = memory.MemoryTransport()
458
441
        backing_transport = memory.MemoryTransport()
459
442
        server = chroot.ChrootServer(backing_transport)
460
443
        server.start_server()
461
 
        try:
462
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
463
 
        finally:
464
 
            server.stop_server()
 
444
        self.addCleanup(server.stop_server)
 
445
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
446
 
 
447
 
 
448
class TestHooks(tests.TestCase):
 
449
    """Basic tests for transport hooks"""
 
450
 
 
451
    def _get_connected_transport(self):
 
452
        return transport.ConnectedTransport("bogus:nowhere")
 
453
 
 
454
    def test_transporthooks_initialisation(self):
 
455
        """Check all expected transport hook points are set up"""
 
456
        hookpoint = transport.TransportHooks()
 
457
        self.assertTrue("post_connect" in hookpoint,
 
458
            "post_connect not in %s" % (hookpoint,))
 
459
 
 
460
    def test_post_connect(self):
 
461
        """Ensure the post_connect hook is called when _set_transport is"""
 
462
        calls = []
 
463
        transport.Transport.hooks.install_named_hook("post_connect",
 
464
            calls.append, None)
 
465
        t = self._get_connected_transport()
 
466
        self.assertLength(0, calls)
 
467
        t._set_connection("connection", "auth")
 
468
        self.assertEqual(calls, [t])
465
469
 
466
470
 
467
471
class PathFilteringDecoratorTransportTest(tests.TestCase):
470
474
    def test_abspath(self):
471
475
        # The abspath is always relative to the base of the backing transport.
472
476
        server = pathfilter.PathFilteringServer(
473
 
            transport.get_transport('memory:///foo/bar/'),
 
477
            transport.get_transport_from_url('memory:///foo/bar/'),
474
478
            lambda x: x)
475
479
        server.start_server()
476
 
        t = transport.get_transport(server.get_url())
 
480
        t = transport.get_transport_from_url(server.get_url())
477
481
        self.assertEqual(server.get_url(), t.abspath('/'))
478
482
 
479
483
        subdir_t = t.clone('subdir')
482
486
 
483
487
    def make_pf_transport(self, filter_func=None):
484
488
        """Make a PathFilteringTransport backed by a MemoryTransport.
485
 
        
 
489
 
486
490
        :param filter_func: by default this will be a no-op function.  Use this
487
491
            parameter to override it."""
488
492
        if filter_func is None:
489
493
            filter_func = lambda x: x
490
494
        server = pathfilter.PathFilteringServer(
491
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
495
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
492
496
        server.start_server()
493
497
        self.addCleanup(server.stop_server)
494
 
        return transport.get_transport(server.get_url())
 
498
        return transport.get_transport_from_url(server.get_url())
495
499
 
496
500
    def test__filter(self):
497
501
        # _filter (with an identity func as filter_func) always returns
510
514
 
511
515
    def test_filter_invocation(self):
512
516
        filter_log = []
 
517
 
513
518
        def filter(path):
514
519
            filter_log.append(path)
515
520
            return path
540
545
        otherwise) the filtering by doing::
541
546
            url = filtered_transport.base
542
547
            parent_url = urlutils.join(url, '..')
543
 
            new_t = transport.get_transport(parent_url)
 
548
            new_t = transport.get_transport_from_url(parent_url)
544
549
        """
545
550
        t = self.make_pf_transport()
546
 
        new_t = transport.get_transport(t.base)
 
551
        new_t = transport.get_transport_from_url(t.base)
547
552
        self.assertEqual(t.server, new_t.server)
548
553
        self.assertEqual(t.base, new_t.base)
549
554
 
558
563
        self.assertEqual(True, t.is_readonly())
559
564
 
560
565
    def test_http_parameters(self):
561
 
        from bzrlib.tests.http_server import HttpServer
 
566
        from breezy.tests.http_server import HttpServer
562
567
        # connect to '.' via http which is not listable
563
568
        server = HttpServer()
564
569
        self.start_server(server)
565
 
        t = transport.get_transport('readonly+' + server.get_url())
566
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
570
        t = transport.get_transport_from_url('readonly+' + server.get_url())
 
571
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
567
572
        self.assertEqual(False, t.listable())
568
573
        self.assertEqual(True, t.is_readonly())
569
574
 
585
590
    def test_http_parameters(self):
586
591
        # the listable and is_readonly parameters
587
592
        # are not changed by the fakenfs decorator
588
 
        from bzrlib.tests.http_server import HttpServer
 
593
        from breezy.tests.http_server import HttpServer
589
594
        # connect to '.' via http which is not listable
590
595
        server = HttpServer()
591
596
        self.start_server(server)
601
606
        # the url should be decorated appropriately
602
607
        self.assertStartsWith(server.get_url(), 'fakenfs+')
603
608
        # and we should be able to get a transport for it
604
 
        t = transport.get_transport(server.get_url())
 
609
        t = transport.get_transport_from_url(server.get_url())
605
610
        # which must be a FakeNFSTransportDecorator instance.
606
611
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
607
612
 
619
624
 
620
625
    def get_vfat_transport(self, url):
621
626
        """Return vfat-backed transport for test directory"""
622
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
627
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
623
628
        return FakeVFATTransportDecorator('vfat+' + url)
624
629
 
625
630
    def test_transport_creation(self):
626
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
631
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
627
632
        t = self.get_vfat_transport('.')
628
633
        self.assertIsInstance(t, FakeVFATTransportDecorator)
629
634
 
654
659
 
655
660
    To verify a transport we need a server factory, which is a callable
656
661
    that accepts no parameters and returns an implementation of
657
 
    bzrlib.transport.Server.
 
662
    breezy.transport.Server.
658
663
 
659
664
    That Server is then used to construct transport instances and test
660
665
    the transport via loopback activity.
684
689
        base_url = self._server.get_url()
685
690
        url = self._adjust_url(base_url, relpath)
686
691
        # try getting the transport via the regular interface:
687
 
        t = transport.get_transport(url)
 
692
        t = transport.get_transport_from_url(url)
688
693
        # vila--20070607 if the following are commented out the test suite
689
694
        # still pass. Is this really still needed or was it a forgotten
690
695
        # temporary fix ?
695
700
        return t
696
701
 
697
702
 
 
703
class TestTransportFromPath(tests.TestCaseInTempDir):
 
704
 
 
705
    def test_with_path(self):
 
706
        t = transport.get_transport_from_path(self.test_dir)
 
707
        self.assertIsInstance(t, local.LocalTransport)
 
708
        self.assertEqual(t.base.rstrip("/"),
 
709
            urlutils.local_path_to_url(self.test_dir))
 
710
 
 
711
    def test_with_url(self):
 
712
        t = transport.get_transport_from_path("file:")
 
713
        self.assertIsInstance(t, local.LocalTransport)
 
714
        self.assertEqual(t.base.rstrip("/"),
 
715
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
716
 
 
717
 
 
718
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
719
 
 
720
    def test_with_path(self):
 
721
        self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
 
722
            self.test_dir)
 
723
 
 
724
    def test_with_url(self):
 
725
        url = urlutils.local_path_to_url(self.test_dir)
 
726
        t = transport.get_transport_from_url(url)
 
727
        self.assertIsInstance(t, local.LocalTransport)
 
728
        self.assertEqual(t.base.rstrip("/"), url)
 
729
 
 
730
    def test_with_url_and_segment_parameters(self):
 
731
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
 
732
        t = transport.get_transport_from_url(url)
 
733
        self.assertIsInstance(t, local.LocalTransport)
 
734
        self.assertEqual(t.base.rstrip("/"), url)
 
735
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
 
736
            f.write("data")
 
737
        self.assertTrue(t.has("afile"))
 
738
 
 
739
 
698
740
class TestLocalTransports(tests.TestCase):
699
741
 
700
742
    def test_get_transport_from_abspath(self):
701
743
        here = osutils.abspath('.')
702
744
        t = transport.get_transport(here)
703
745
        self.assertIsInstance(t, local.LocalTransport)
704
 
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
746
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
705
747
 
706
748
    def test_get_transport_from_relpath(self):
707
749
        here = osutils.abspath('.')
708
750
        t = transport.get_transport('.')
709
751
        self.assertIsInstance(t, local.LocalTransport)
710
 
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
752
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
711
753
 
712
754
    def test_get_transport_from_local_url(self):
713
755
        here = osutils.abspath('.')
714
756
        here_url = urlutils.local_path_to_url(here) + '/'
715
757
        t = transport.get_transport(here_url)
716
758
        self.assertIsInstance(t, local.LocalTransport)
717
 
        self.assertEquals(t.base, here_url)
 
759
        self.assertEqual(t.base, here_url)
718
760
 
719
761
    def test_local_abspath(self):
720
762
        here = osutils.abspath('.')
721
763
        t = transport.get_transport(here)
722
 
        self.assertEquals(t.local_abspath(''), here)
 
764
        self.assertEqual(t.local_abspath(''), here)
 
765
 
 
766
 
 
767
class TestLocalTransportMutation(tests.TestCaseInTempDir):
 
768
 
 
769
    def test_local_transport_mkdir(self):
 
770
        here = osutils.abspath('.')
 
771
        t = transport.get_transport(here)
 
772
        t.mkdir('test')
 
773
        self.assertTrue(os.path.exists('test'))
 
774
 
 
775
    def test_local_transport_mkdir_permission_denied(self):
 
776
        # See https://bugs.launchpad.net/bzr/+bug/606537
 
777
        here = osutils.abspath('.')
 
778
        t = transport.get_transport(here)
 
779
        def fake_chmod(path, mode):
 
780
            e = OSError('permission denied')
 
781
            e.errno = errno.EPERM
 
782
            raise e
 
783
        self.overrideAttr(os, 'chmod', fake_chmod)
 
784
        t.mkdir('test')
 
785
        t.mkdir('test2', mode=0o707)
 
786
        self.assertTrue(os.path.exists('test'))
 
787
        self.assertTrue(os.path.exists('test2'))
 
788
 
 
789
 
 
790
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
 
791
 
 
792
    def test_local_fdatasync_calls_fdatasync(self):
 
793
        """Check fdatasync on a stream tries to flush the data to the OS.
 
794
        
 
795
        We can't easily observe the external effect but we can at least see
 
796
        it's called.
 
797
        """
 
798
        sentinel = object()
 
799
        fdatasync = getattr(os, 'fdatasync', sentinel)
 
800
        if fdatasync is sentinel:
 
801
            raise tests.TestNotApplicable('fdatasync not supported')
 
802
        t = self.get_transport('.')
 
803
        calls = self.recordCalls(os, 'fdatasync')
 
804
        w = t.open_write_stream('out')
 
805
        w.write('foo')
 
806
        w.fdatasync()
 
807
        with open('out', 'rb') as f:
 
808
            # Should have been flushed.
 
809
            self.assertEqual(f.read(), 'foo')
 
810
        self.assertEqual(len(calls), 1, calls)
 
811
 
 
812
    def test_missing_directory(self):
 
813
        t = self.get_transport('.')
 
814
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
723
815
 
724
816
 
725
817
class TestWin32LocalTransport(tests.TestCase):
726
818
 
727
819
    def test_unc_clone_to_root(self):
 
820
        self.requireFeature(features.win32_feature)
728
821
        # Win32 UNC path like \\HOST\path
729
822
        # clone to root should stop at least at \\HOST part
730
823
        # not on \\
731
824
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
732
 
        for i in xrange(4):
 
825
        for i in range(4):
733
826
            t = t.clone('..')
734
 
        self.assertEquals(t.base, 'file://HOST/')
 
827
        self.assertEqual(t.base, 'file://HOST/')
735
828
        # make sure we reach the root
736
829
        t = t.clone('..')
737
 
        self.assertEquals(t.base, 'file://HOST/')
 
830
        self.assertEqual(t.base, 'file://HOST/')
738
831
 
739
832
 
740
833
class TestConnectedTransport(tests.TestCase):
743
836
    def test_parse_url(self):
744
837
        t = transport.ConnectedTransport(
745
838
            'http://simple.example.com/home/source')
746
 
        self.assertEquals(t._host, 'simple.example.com')
747
 
        self.assertEquals(t._port, None)
748
 
        self.assertEquals(t._path, '/home/source/')
749
 
        self.failUnless(t._user is None)
750
 
        self.failUnless(t._password is None)
 
839
        self.assertEqual(t._parsed_url.host, 'simple.example.com')
 
840
        self.assertEqual(t._parsed_url.port, None)
 
841
        self.assertEqual(t._parsed_url.path, '/home/source/')
 
842
        self.assertTrue(t._parsed_url.user is None)
 
843
        self.assertTrue(t._parsed_url.password is None)
751
844
 
752
 
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
845
        self.assertEqual(t.base, 'http://simple.example.com/home/source/')
753
846
 
754
847
    def test_parse_url_with_at_in_user(self):
755
848
        # Bug 228058
756
849
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
757
 
        self.assertEquals(t._user, 'user@host.com')
 
850
        self.assertEqual(t._parsed_url.user, 'user@host.com')
758
851
 
759
852
    def test_parse_quoted_url(self):
760
853
        t = transport.ConnectedTransport(
761
854
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
762
 
        self.assertEquals(t._host, 'exAmple.com')
763
 
        self.assertEquals(t._port, 2222)
764
 
        self.assertEquals(t._user, 'robey')
765
 
        self.assertEquals(t._password, 'h@t')
766
 
        self.assertEquals(t._path, '/path/')
 
855
        self.assertEqual(t._parsed_url.host, 'exAmple.com')
 
856
        self.assertEqual(t._parsed_url.port, 2222)
 
857
        self.assertEqual(t._parsed_url.user, 'robey')
 
858
        self.assertEqual(t._parsed_url.password, 'h@t')
 
859
        self.assertEqual(t._parsed_url.path, '/path/')
767
860
 
768
861
        # Base should not keep track of the password
769
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
862
        self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
770
863
 
771
864
    def test_parse_invalid_url(self):
772
865
        self.assertRaises(errors.InvalidURL,
776
869
    def test_relpath(self):
777
870
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
778
871
 
779
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
872
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
 
873
            'sub')
780
874
        self.assertRaises(errors.PathNotChild, t.relpath,
781
875
                          'http://user@host.com/abs/path/sub')
782
876
        self.assertRaises(errors.PathNotChild, t.relpath,
787
881
                          'sftp://user@host.com:33/abs/path/sub')
788
882
        # Make sure it works when we don't supply a username
789
883
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
790
 
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
884
        self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
791
885
 
792
886
        # Make sure it works when parts of the path will be url encoded
793
887
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
794
 
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
888
        self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
795
889
 
796
890
    def test_connection_sharing_propagate_credentials(self):
797
891
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
798
 
        self.assertEquals('user', t._user)
799
 
        self.assertEquals('host.com', t._host)
 
892
        self.assertEqual('user', t._parsed_url.user)
 
893
        self.assertEqual('host.com', t._parsed_url.host)
800
894
        self.assertIs(None, t._get_connection())
801
 
        self.assertIs(None, t._password)
 
895
        self.assertIs(None, t._parsed_url.password)
802
896
        c = t.clone('subdir')
803
897
        self.assertIs(None, c._get_connection())
804
 
        self.assertIs(None, t._password)
 
898
        self.assertIs(None, t._parsed_url.password)
805
899
 
806
900
        # Simulate the user entering a password
807
901
        password = 'secret'
826
920
 
827
921
    def test_reuse_same_transport(self):
828
922
        possible_transports = []
829
 
        t1 = transport.get_transport('http://foo/',
 
923
        t1 = transport.get_transport_from_url('http://foo/',
830
924
                                     possible_transports=possible_transports)
831
925
        self.assertEqual([t1], possible_transports)
832
 
        t2 = transport.get_transport('http://foo/',
 
926
        t2 = transport.get_transport_from_url('http://foo/',
833
927
                                     possible_transports=[t1])
834
928
        self.assertIs(t1, t2)
835
929
 
836
930
        # Also check that final '/' are handled correctly
837
 
        t3 = transport.get_transport('http://foo/path/')
838
 
        t4 = transport.get_transport('http://foo/path',
 
931
        t3 = transport.get_transport_from_url('http://foo/path/')
 
932
        t4 = transport.get_transport_from_url('http://foo/path',
839
933
                                     possible_transports=[t3])
840
934
        self.assertIs(t3, t4)
841
935
 
842
 
        t5 = transport.get_transport('http://foo/path')
843
 
        t6 = transport.get_transport('http://foo/path/',
 
936
        t5 = transport.get_transport_from_url('http://foo/path')
 
937
        t6 = transport.get_transport_from_url('http://foo/path/',
844
938
                                     possible_transports=[t5])
845
939
        self.assertIs(t5, t6)
846
940
 
847
941
    def test_don_t_reuse_different_transport(self):
848
 
        t1 = transport.get_transport('http://foo/path')
849
 
        t2 = transport.get_transport('http://bar/path',
 
942
        t1 = transport.get_transport_from_url('http://foo/path')
 
943
        t2 = transport.get_transport_from_url('http://bar/path',
850
944
                                     possible_transports=[t1])
851
945
        self.assertIsNot(t1, t2)
852
946
 
853
947
 
854
948
class TestTransportTrace(tests.TestCase):
855
949
 
856
 
    def test_get(self):
857
 
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
950
    def test_decorator(self):
 
951
        t = transport.get_transport_from_url('trace+memory://')
 
952
        self.assertIsInstance(
 
953
            t, breezy.transport.trace.TransportTraceDecorator)
859
954
 
860
955
    def test_clone_preserves_activity(self):
861
 
        t = transport.get_transport('trace+memory://')
 
956
        t = transport.get_transport_from_url('trace+memory://')
862
957
        t2 = t.clone('.')
863
958
        self.assertTrue(t is not t2)
864
959
        self.assertTrue(t._activity is t2._activity)
868
963
    # still won't cause a test failure when the top level Transport API
869
964
    # changes; so there is little return doing that.
870
965
    def test_get(self):
871
 
        t = transport.get_transport('trace+memory:///')
 
966
        t = transport.get_transport_from_url('trace+memory:///')
872
967
        t.put_bytes('foo', 'barish')
873
968
        t.get('foo')
874
969
        expected_result = []
880
975
        self.assertEqual(expected_result, t._activity)
881
976
 
882
977
    def test_readv(self):
883
 
        t = transport.get_transport('trace+memory:///')
 
978
        t = transport.get_transport_from_url('trace+memory:///')
884
979
        t.put_bytes('foo', 'barish')
885
980
        list(t.readv('foo', [(0, 1), (3, 2)],
886
981
                     adjust_for_latency=True, upper_limit=6))
896
991
class TestSSHConnections(tests.TestCaseWithTransport):
897
992
 
898
993
    def test_bzr_connect_to_bzr_ssh(self):
899
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
994
        """get_transport of a bzr+ssh:// behaves correctly.
900
995
 
901
996
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
902
997
        """
909
1004
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
910
1005
        # override the interface (doesn't change self._vendor).
911
1006
        # Note that this does encryption, so can be slow.
912
 
        from bzrlib.tests import stub_sftp
 
1007
        from breezy.tests import stub_sftp
913
1008
 
914
1009
        # Start an SSH server
915
1010
        self.command_executed = []
918
1013
        # SSH channel ourselves.  Surely this has already been implemented
919
1014
        # elsewhere?
920
1015
        started = []
 
1016
 
921
1017
        class StubSSHServer(stub_sftp.StubServer):
922
1018
 
923
1019
            test = self
929
1025
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
930
1026
 
931
1027
                # XXX: horribly inefficient, not to mention ugly.
932
 
                # Start a thread for each of stdin/out/err, and relay bytes from
933
 
                # the subprocess to channel and vice versa.
 
1028
                # Start a thread for each of stdin/out/err, and relay bytes
 
1029
                # from the subprocess to channel and vice versa.
934
1030
                def ferry_bytes(read, write, close):
935
1031
                    while True:
936
1032
                        bytes = read(1)
955
1051
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
956
1052
        # We *don't* want to override the default SSH vendor: the detected one
957
1053
        # is the one to use.
 
1054
 
 
1055
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
 
1056
        # inherits from SFTPServer which forces the SSH vendor to
 
1057
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
958
1058
        self.start_server(ssh_server)
959
 
        port = ssh_server._listener.port
 
1059
        port = ssh_server.port
960
1060
 
961
1061
        if sys.platform == 'win32':
962
 
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
1062
            bzr_remote_path = sys.executable + ' ' + self.get_brz_path()
963
1063
        else:
964
 
            bzr_remote_path = self.get_bzr_path()
965
 
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
1064
            bzr_remote_path = self.get_brz_path()
 
1065
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
966
1066
 
967
1067
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
968
1068
        # variable is used to tell bzr what command to run on the remote end.
989
1089
        # And the rest are threads
990
1090
        for t in started[1:]:
991
1091
            t.join()
 
1092
 
 
1093
 
 
1094
class TestUnhtml(tests.TestCase):
    """Checks for http.unhtml_roughly."""

    def test_truncation(self):
        """The result is 1000 characters long and starts with the text."""
        paragraph = "<p>something!\n"
        markup = paragraph * 1000
        stripped = http.unhtml_roughly(markup)
        self.assertEqual(len(stripped), 1000)
        self.assertStartsWith(stripped, " something!")
 
1103
 
 
1104
 
 
1105
class SomeDirectory(object):
    """Stub directory service whose lookups always give the same URL."""

    # The URL handed back for every lookup, whatever was asked for.
    _FIXED_URL = "http://bar"

    def look_up(self, name, url):
        """Return the fixed URL; ``name`` and ``url`` are ignored."""
        return self._FIXED_URL
 
1109
 
 
1110
 
 
1111
class TestLocationToUrl(tests.TestCase):
    """Tests for location_to_url."""

    def get_base_location(self):
        """Return an absolute local path and the file URL expected for it.

        :return: A ``(path, url)`` tuple built from the absolute form of
            ``/foo/bar``.
        """
        path = osutils.abspath('/foo/bar')
        if path.startswith('/'):
            url = 'file://%s' % (path,)
        else:
            # On Windows, abspaths start with the drive letter, so we have to
            # add in the extra '/'
            url = 'file:///%s' % (path,)
        return path, url

    def test_regular_url(self):
        """A location that is already a URL is returned unchanged."""
        self.assertEqual("file://foo", location_to_url("file://foo"))

    def test_directory(self):
        """A registered directory prefix is resolved through its look_up."""
        directories.register("bar:", SomeDirectory, "Dummy directory")
        self.addCleanup(directories.remove, "bar:")
        self.assertEqual("http://bar", location_to_url("bar:"))

    def test_unicode_url(self):
        """A URL containing a non-ASCII character raises InvalidURL."""
        # Decode from a bytes literal: native str has no decode() on
        # Python 3, while this spelling works on Python 2 as well.
        self.assertRaises(errors.InvalidURL, location_to_url,
                          b"http://fo/\xc3\xaf".decode("utf-8"))

    def test_unicode_path(self):
        """A non-ASCII character in a path is percent-encoded as UTF-8."""
        path, url = self.get_base_location()
        # Bytes literal for the same Python 2/3 reason as in
        # test_unicode_url.
        location = path + b"\xc3\xaf".decode("utf-8")
        url += '%C3%AF'
        self.assertEqual(url, location_to_url(location))

    def test_path(self):
        """An absolute local path is converted to the matching file URL."""
        path, url = self.get_base_location()
        self.assertEqual(url, location_to_url(path))

    def test_relative_file_url(self):
        """A relative ``file:`` URL is resolved against the current dir."""
        self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
                         location_to_url("file:bar"))

    def test_absolute_file_url(self):
        """A ``file:/path`` URL is normalised to the ``file:///`` form."""
        self.assertEqual("file:///bar", location_to_url("file:/bar"))