/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_transport.py

  • Committer: Martin
  • Date: 2018-11-16 16:38:22 UTC
  • mto: This revision was merged to the branch mainline in revision 7172.
  • Revision ID: gzlist@googlemail.com-20181116163822-yg1h1cdng6w7w9kn
Make --profile-imports work on Python 3

Also tweak heading to line up correctly.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
 
from cStringIO import StringIO
 
18
import errno
19
19
import os
20
20
import subprocess
21
21
import sys
22
22
import threading
23
23
 
24
 
from bzrlib import (
 
24
from .. import (
25
25
    errors,
26
26
    osutils,
27
27
    tests,
28
28
    transport,
29
29
    urlutils,
30
30
    )
31
 
from bzrlib.transport import (
 
31
from ..directory_service import directories
 
32
from ..sixish import (
 
33
    BytesIO,
 
34
    )
 
35
from ..transport import (
32
36
    chroot,
33
37
    fakenfs,
 
38
    http,
34
39
    local,
 
40
    location_to_url,
35
41
    memory,
36
42
    pathfilter,
37
43
    readonly,
38
44
    )
39
 
from bzrlib.tests import (
 
45
import breezy.transport.trace
 
46
from . import (
40
47
    features,
41
48
    test_server,
42
49
    )
48
55
class TestTransport(tests.TestCase):
49
56
    """Test the non transport-concrete class functionality."""
50
57
 
51
 
    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
 
    # of try/finally -- vila 20100205
53
 
 
54
58
    def test__get_set_protocol_handlers(self):
55
59
        handlers = transport._get_protocol_handlers()
56
 
        self.assertNotEqual([], handlers.keys( ))
57
 
        try:
58
 
            transport._clear_protocol_handlers()
59
 
            self.assertEqual([], transport._get_protocol_handlers().keys())
60
 
        finally:
61
 
            transport._set_protocol_handlers(handlers)
 
60
        self.assertNotEqual([], handlers.keys())
 
61
        transport._clear_protocol_handlers()
 
62
        self.addCleanup(transport._set_protocol_handlers, handlers)
 
63
        self.assertEqual([], transport._get_protocol_handlers().keys())
62
64
 
63
65
    def test_get_transport_modules(self):
64
66
        handlers = transport._get_protocol_handlers()
 
67
        self.addCleanup(transport._set_protocol_handlers, handlers)
65
68
        # don't pollute the current handlers
66
69
        transport._clear_protocol_handlers()
 
70
 
67
71
        class SampleHandler(object):
68
72
            """I exist, isnt that enough?"""
69
 
        try:
70
 
            transport._clear_protocol_handlers()
71
 
            transport.register_transport_proto('foo')
72
 
            transport.register_lazy_transport('foo',
73
 
                                              'bzrlib.tests.test_transport',
74
 
                                              'TestTransport.SampleHandler')
75
 
            transport.register_transport_proto('bar')
76
 
            transport.register_lazy_transport('bar',
77
 
                                              'bzrlib.tests.test_transport',
78
 
                                              'TestTransport.SampleHandler')
79
 
            self.assertEqual([SampleHandler.__module__,
80
 
                              'bzrlib.transport.chroot',
81
 
                              'bzrlib.transport.pathfilter'],
82
 
                             transport._get_transport_modules())
83
 
        finally:
84
 
            transport._set_protocol_handlers(handlers)
 
73
        transport._clear_protocol_handlers()
 
74
        transport.register_transport_proto('foo')
 
75
        transport.register_lazy_transport('foo',
 
76
                                            'breezy.tests.test_transport',
 
77
                                            'TestTransport.SampleHandler')
 
78
        transport.register_transport_proto('bar')
 
79
        transport.register_lazy_transport('bar',
 
80
                                            'breezy.tests.test_transport',
 
81
                                            'TestTransport.SampleHandler')
 
82
        self.assertEqual([SampleHandler.__module__,
 
83
                            'breezy.transport.chroot',
 
84
                            'breezy.transport.pathfilter'],
 
85
                            transport._get_transport_modules())
85
86
 
86
87
    def test_transport_dependency(self):
87
88
        """Transport with missing dependency causes no error"""
88
89
        saved_handlers = transport._get_protocol_handlers()
 
90
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
89
91
        # don't pollute the current handlers
90
92
        transport._clear_protocol_handlers()
 
93
        transport.register_transport_proto('foo')
 
94
        transport.register_lazy_transport(
 
95
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
91
96
        try:
92
 
            transport.register_transport_proto('foo')
93
 
            transport.register_lazy_transport(
94
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
95
 
            try:
96
 
                transport.get_transport('foo://fooserver/foo')
97
 
            except errors.UnsupportedProtocol, e:
98
 
                e_str = str(e)
99
 
                self.assertEquals('Unsupported protocol'
100
 
                                  ' for url "foo://fooserver/foo":'
101
 
                                  ' Unable to import library "some_lib":'
102
 
                                  ' testing missing dependency', str(e))
103
 
            else:
104
 
                self.fail('Did not raise UnsupportedProtocol')
105
 
        finally:
106
 
            # restore original values
107
 
            transport._set_protocol_handlers(saved_handlers)
 
97
            transport.get_transport_from_url('foo://fooserver/foo')
 
98
        except errors.UnsupportedProtocol as e:
 
99
            e_str = str(e)
 
100
            self.assertEqual('Unsupported protocol'
 
101
                                ' for url "foo://fooserver/foo":'
 
102
                                ' Unable to import library "some_lib":'
 
103
                                ' testing missing dependency', str(e))
 
104
        else:
 
105
            self.fail('Did not raise UnsupportedProtocol')
108
106
 
109
107
    def test_transport_fallback(self):
110
108
        """Transport with missing dependency causes no error"""
111
109
        saved_handlers = transport._get_protocol_handlers()
112
 
        try:
113
 
            transport._clear_protocol_handlers()
114
 
            transport.register_transport_proto('foo')
115
 
            transport.register_lazy_transport(
116
 
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
 
            transport.register_lazy_transport(
118
 
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
 
            t = transport.get_transport('foo://fooserver/foo')
120
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
121
 
        finally:
122
 
            transport._set_protocol_handlers(saved_handlers)
 
110
        self.addCleanup(transport._set_protocol_handlers, saved_handlers)
 
111
        transport._clear_protocol_handlers()
 
112
        transport.register_transport_proto('foo')
 
113
        transport.register_lazy_transport(
 
114
            'foo', 'breezy.tests.test_transport', 'BackupTransportHandler')
 
115
        transport.register_lazy_transport(
 
116
            'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
 
117
        t = transport.get_transport_from_url('foo://fooserver/foo')
 
118
        self.assertTrue(isinstance(t, BackupTransportHandler))
123
119
 
124
120
    def test_ssh_hints(self):
125
121
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
126
122
        try:
127
 
            transport.get_transport('ssh://fooserver/foo')
128
 
        except errors.UnsupportedProtocol, e:
 
123
            transport.get_transport_from_url('ssh://fooserver/foo')
 
124
        except errors.UnsupportedProtocol as e:
129
125
            e_str = str(e)
130
 
            self.assertEquals('Unsupported protocol'
 
126
            self.assertEqual('Unsupported protocol'
131
127
                              ' for url "ssh://fooserver/foo":'
132
128
                              ' bzr supports bzr+ssh to operate over ssh,'
133
129
                              ' use "bzr+ssh://fooserver/foo".',
140
136
        a_file = transport.LateReadError('a path')
141
137
        try:
142
138
            a_file.read()
143
 
        except errors.ReadError, error:
 
139
        except errors.ReadError as error:
144
140
            self.assertEqual('a path', error.path)
145
141
        self.assertRaises(errors.ReadError, a_file.read, 40)
146
142
        a_file.close()
147
143
 
148
 
    def test__combine_paths(self):
149
 
        t = transport.Transport('/')
150
 
        self.assertEqual('/home/sarah/project/foo',
151
 
                         t._combine_paths('/home/sarah', 'project/foo'))
152
 
        self.assertEqual('/etc',
153
 
                         t._combine_paths('/home/sarah', '../../etc'))
154
 
        self.assertEqual('/etc',
155
 
                         t._combine_paths('/home/sarah', '../../../etc'))
156
 
        self.assertEqual('/etc',
157
 
                         t._combine_paths('/home/sarah', '/etc'))
158
 
 
159
144
    def test_local_abspath_non_local_transport(self):
160
145
        # the base implementation should throw
161
146
        t = memory.MemoryTransport()
218
203
 
219
204
    def test_coalesce_fudge(self):
220
205
        self.check([(10, 30, [(0, 10), (20, 10)]),
221
 
                    (100, 10, [(0, 10),]),
 
206
                    (100, 10, [(0, 10)]),
222
207
                   ], [(10, 10), (30, 10), (100, 10)],
223
 
                   fudge=10
224
 
                  )
 
208
                   fudge=10)
 
209
 
225
210
    def test_coalesce_max_size(self):
226
211
        self.check([(10, 20, [(0, 10), (10, 10)]),
227
212
                    (30, 50, [(0, 50)]),
228
213
                    # If one range is above max_size, it gets its own coalesced
229
214
                    # offset
230
 
                    (100, 80, [(0, 80),]),],
 
215
                    (100, 80, [(0, 80)]),],
231
216
                   [(10, 10), (20, 10), (30, 50), (100, 80)],
232
 
                   max_size=50
233
 
                  )
 
217
                   max_size=50)
234
218
 
235
219
    def test_coalesce_no_max_size(self):
236
 
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)]),],
 
220
        self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
237
221
                   [(10, 10), (20, 10), (30, 50), (80, 100)],
238
222
                  )
239
223
 
240
224
    def test_coalesce_default_limit(self):
241
225
        # By default we use a 100MB max size.
242
 
        ten_mb = 10*1024*1024
243
 
        self.check([(0, 10*ten_mb, [(i*ten_mb, ten_mb) for i in range(10)]),
 
226
        ten_mb = 10 * 1024 * 1024
 
227
        self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
244
228
                    (10*ten_mb, ten_mb, [(0, ten_mb)])],
245
229
                   [(i*ten_mb, ten_mb) for i in range(11)])
246
 
        self.check([(0, 11*ten_mb, [(i*ten_mb, ten_mb) for i in range(11)]),],
247
 
                   [(i*ten_mb, ten_mb) for i in range(11)],
 
230
        self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
 
231
                   [(i * ten_mb, ten_mb) for i in range(11)],
248
232
                   max_size=1*1024*1024*1024)
249
233
 
250
234
 
255
239
        server.start_server()
256
240
        url = server.get_url()
257
241
        self.assertTrue(url in transport.transport_list_registry)
258
 
        t = transport.get_transport(url)
 
242
        t = transport.get_transport_from_url(url)
259
243
        del t
260
244
        server.stop_server()
261
245
        self.assertFalse(url in transport.transport_list_registry)
288
272
 
289
273
    def test_append_and_get(self):
290
274
        t = memory.MemoryTransport()
291
 
        t.append_bytes('path', 'content')
292
 
        self.assertEqual(t.get('path').read(), 'content')
293
 
        t.append_file('path', StringIO('content'))
294
 
        self.assertEqual(t.get('path').read(), 'contentcontent')
 
275
        t.append_bytes('path', b'content')
 
276
        self.assertEqual(t.get('path').read(), b'content')
 
277
        t.append_file('path', BytesIO(b'content'))
 
278
        with t.get('path') as f:
 
279
            self.assertEqual(f.read(), b'contentcontent')
295
280
 
296
281
    def test_put_and_get(self):
297
282
        t = memory.MemoryTransport()
298
 
        t.put_file('path', StringIO('content'))
299
 
        self.assertEqual(t.get('path').read(), 'content')
300
 
        t.put_bytes('path', 'content')
301
 
        self.assertEqual(t.get('path').read(), 'content')
 
283
        t.put_file('path', BytesIO(b'content'))
 
284
        self.assertEqual(t.get('path').read(), b'content')
 
285
        t.put_bytes('path', b'content')
 
286
        self.assertEqual(t.get('path').read(), b'content')
302
287
 
303
288
    def test_append_without_dir_fails(self):
304
289
        t = memory.MemoryTransport()
305
290
        self.assertRaises(errors.NoSuchFile,
306
 
                          t.append_bytes, 'dir/path', 'content')
 
291
                          t.append_bytes, 'dir/path', b'content')
307
292
 
308
293
    def test_put_without_dir_fails(self):
309
294
        t = memory.MemoryTransport()
310
295
        self.assertRaises(errors.NoSuchFile,
311
 
                          t.put_file, 'dir/path', StringIO('content'))
 
296
                          t.put_file, 'dir/path', BytesIO(b'content'))
312
297
 
313
298
    def test_get_missing(self):
314
299
        transport = memory.MemoryTransport()
316
301
 
317
302
    def test_has_missing(self):
318
303
        t = memory.MemoryTransport()
319
 
        self.assertEquals(False, t.has('foo'))
 
304
        self.assertEqual(False, t.has('foo'))
320
305
 
321
306
    def test_has_present(self):
322
307
        t = memory.MemoryTransport()
323
 
        t.append_bytes('foo', 'content')
324
 
        self.assertEquals(True, t.has('foo'))
 
308
        t.append_bytes('foo', b'content')
 
309
        self.assertEqual(True, t.has('foo'))
325
310
 
326
311
    def test_list_dir(self):
327
312
        t = memory.MemoryTransport()
328
 
        t.put_bytes('foo', 'content')
 
313
        t.put_bytes('foo', b'content')
329
314
        t.mkdir('dir')
330
 
        t.put_bytes('dir/subfoo', 'content')
331
 
        t.put_bytes('dirlike', 'content')
 
315
        t.put_bytes('dir/subfoo', b'content')
 
316
        t.put_bytes('dirlike', b'content')
332
317
 
333
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
334
 
        self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
 
318
        self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
 
319
        self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
335
320
 
336
321
    def test_mkdir(self):
337
322
        t = memory.MemoryTransport()
338
323
        t.mkdir('dir')
339
 
        t.append_bytes('dir/path', 'content')
340
 
        self.assertEqual(t.get('dir/path').read(), 'content')
 
324
        t.append_bytes('dir/path', b'content')
 
325
        with t.get('dir/path') as f:
 
326
            self.assertEqual(f.read(), b'content')
341
327
 
342
328
    def test_mkdir_missing_parent(self):
343
329
        t = memory.MemoryTransport()
356
342
    def test_iter_files_recursive(self):
357
343
        t = memory.MemoryTransport()
358
344
        t.mkdir('dir')
359
 
        t.put_bytes('dir/foo', 'content')
360
 
        t.put_bytes('dir/bar', 'content')
361
 
        t.put_bytes('bar', 'content')
 
345
        t.put_bytes('dir/foo', b'content')
 
346
        t.put_bytes('dir/bar', b'content')
 
347
        t.put_bytes('bar', b'content')
362
348
        paths = set(t.iter_files_recursive())
363
 
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
349
        self.assertEqual({'dir/foo', 'dir/bar', 'bar'}, paths)
364
350
 
365
351
    def test_stat(self):
366
352
        t = memory.MemoryTransport()
367
 
        t.put_bytes('foo', 'content')
368
 
        t.put_bytes('bar', 'phowar')
 
353
        t.put_bytes('foo', b'content')
 
354
        t.put_bytes('bar', b'phowar')
369
355
        self.assertEqual(7, t.stat('foo').st_size)
370
356
        self.assertEqual(6, t.stat('bar').st_size)
371
357
 
376
362
    def test_abspath(self):
377
363
        # The abspath is always relative to the chroot_url.
378
364
        server = chroot.ChrootServer(
379
 
            transport.get_transport('memory:///foo/bar/'))
 
365
            transport.get_transport_from_url('memory:///foo/bar/'))
380
366
        self.start_server(server)
381
 
        t = transport.get_transport(server.get_url())
 
367
        t = transport.get_transport_from_url(server.get_url())
382
368
        self.assertEqual(server.get_url(), t.abspath('/'))
383
369
 
384
370
        subdir_t = t.clone('subdir')
386
372
 
387
373
    def test_clone(self):
388
374
        server = chroot.ChrootServer(
389
 
            transport.get_transport('memory:///foo/bar/'))
 
375
            transport.get_transport_from_url('memory:///foo/bar/'))
390
376
        self.start_server(server)
391
 
        t = transport.get_transport(server.get_url())
 
377
        t = transport.get_transport_from_url(server.get_url())
392
378
        # relpath from root and root path are the same
393
379
        relpath_cloned = t.clone('foo')
394
380
        abspath_cloned = t.clone('/foo')
403
389
        This is so that it is not possible to escape a chroot by doing::
404
390
            url = chroot_transport.base
405
391
            parent_url = urlutils.join(url, '..')
406
 
            new_t = transport.get_transport(parent_url)
 
392
            new_t = transport.get_transport_from_url(parent_url)
407
393
        """
408
394
        server = chroot.ChrootServer(
409
 
            transport.get_transport('memory:///path/subpath'))
 
395
            transport.get_transport_from_url('memory:///path/subpath'))
410
396
        self.start_server(server)
411
 
        t = transport.get_transport(server.get_url())
412
 
        new_t = transport.get_transport(t.base)
 
397
        t = transport.get_transport_from_url(server.get_url())
 
398
        new_t = transport.get_transport_from_url(t.base)
413
399
        self.assertEqual(t.server, new_t.server)
414
400
        self.assertEqual(t.base, new_t.base)
415
401
 
420
406
        This is so that it is not possible to escape a chroot by doing::
421
407
            url = chroot_transport.base
422
408
            parent_url = urlutils.join(url, '..')
423
 
            new_t = transport.get_transport(parent_url)
 
409
            new_t = transport.get_transport_from_url(parent_url)
424
410
        """
425
 
        server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
 
411
        server = chroot.ChrootServer(
 
412
            transport.get_transport_from_url('memory:///path/'))
426
413
        self.start_server(server)
427
 
        t = transport.get_transport(server.get_url())
 
414
        t = transport.get_transport_from_url(server.get_url())
428
415
        self.assertRaises(
429
 
            errors.InvalidURLJoin, urlutils.join, t.base, '..')
 
416
            urlutils.InvalidURLJoin, urlutils.join, t.base, '..')
430
417
 
431
418
 
432
419
class TestChrootServer(tests.TestCase):
440
427
        backing_transport = memory.MemoryTransport()
441
428
        server = chroot.ChrootServer(backing_transport)
442
429
        server.start_server()
443
 
        try:
444
 
            self.assertTrue(server.scheme
445
 
                            in transport._get_protocol_handlers().keys())
446
 
        finally:
447
 
            server.stop_server()
 
430
        self.addCleanup(server.stop_server)
 
431
        self.assertTrue(server.scheme
 
432
                        in transport._get_protocol_handlers().keys())
448
433
 
449
434
    def test_stop_server(self):
450
435
        backing_transport = memory.MemoryTransport()
458
443
        backing_transport = memory.MemoryTransport()
459
444
        server = chroot.ChrootServer(backing_transport)
460
445
        server.start_server()
461
 
        try:
462
 
            self.assertEqual('chroot-%d:///' % id(server), server.get_url())
463
 
        finally:
464
 
            server.stop_server()
 
446
        self.addCleanup(server.stop_server)
 
447
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
 
448
 
 
449
 
 
450
class TestHooks(tests.TestCase):
 
451
    """Basic tests for transport hooks"""
 
452
 
 
453
    def _get_connected_transport(self):
 
454
        return transport.ConnectedTransport("bogus:nowhere")
 
455
 
 
456
    def test_transporthooks_initialisation(self):
 
457
        """Check all expected transport hook points are set up"""
 
458
        hookpoint = transport.TransportHooks()
 
459
        self.assertTrue("post_connect" in hookpoint,
 
460
            "post_connect not in %s" % (hookpoint,))
 
461
 
 
462
    def test_post_connect(self):
 
463
        """Ensure the post_connect hook is called when _set_transport is"""
 
464
        calls = []
 
465
        transport.Transport.hooks.install_named_hook("post_connect",
 
466
            calls.append, None)
 
467
        t = self._get_connected_transport()
 
468
        self.assertLength(0, calls)
 
469
        t._set_connection("connection", "auth")
 
470
        self.assertEqual(calls, [t])
465
471
 
466
472
 
467
473
class PathFilteringDecoratorTransportTest(tests.TestCase):
470
476
    def test_abspath(self):
471
477
        # The abspath is always relative to the base of the backing transport.
472
478
        server = pathfilter.PathFilteringServer(
473
 
            transport.get_transport('memory:///foo/bar/'),
 
479
            transport.get_transport_from_url('memory:///foo/bar/'),
474
480
            lambda x: x)
475
481
        server.start_server()
476
 
        t = transport.get_transport(server.get_url())
 
482
        t = transport.get_transport_from_url(server.get_url())
477
483
        self.assertEqual(server.get_url(), t.abspath('/'))
478
484
 
479
485
        subdir_t = t.clone('subdir')
482
488
 
483
489
    def make_pf_transport(self, filter_func=None):
484
490
        """Make a PathFilteringTransport backed by a MemoryTransport.
485
 
        
 
491
 
486
492
        :param filter_func: by default this will be a no-op function.  Use this
487
493
            parameter to override it."""
488
494
        if filter_func is None:
489
495
            filter_func = lambda x: x
490
496
        server = pathfilter.PathFilteringServer(
491
 
            transport.get_transport('memory:///foo/bar/'), filter_func)
 
497
            transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
492
498
        server.start_server()
493
499
        self.addCleanup(server.stop_server)
494
 
        return transport.get_transport(server.get_url())
 
500
        return transport.get_transport_from_url(server.get_url())
495
501
 
496
502
    def test__filter(self):
497
503
        # _filter (with an identity func as filter_func) always returns
510
516
 
511
517
    def test_filter_invocation(self):
512
518
        filter_log = []
 
519
 
513
520
        def filter(path):
514
521
            filter_log.append(path)
515
522
            return path
540
547
        otherwise) the filtering by doing::
541
548
            url = filtered_transport.base
542
549
            parent_url = urlutils.join(url, '..')
543
 
            new_t = transport.get_transport(parent_url)
 
550
            new_t = transport.get_transport_from_url(parent_url)
544
551
        """
545
552
        t = self.make_pf_transport()
546
 
        new_t = transport.get_transport(t.base)
 
553
        new_t = transport.get_transport_from_url(t.base)
547
554
        self.assertEqual(t.server, new_t.server)
548
555
        self.assertEqual(t.base, new_t.base)
549
556
 
558
565
        self.assertEqual(True, t.is_readonly())
559
566
 
560
567
    def test_http_parameters(self):
561
 
        from bzrlib.tests.http_server import HttpServer
 
568
        from breezy.tests.http_server import HttpServer
562
569
        # connect to '.' via http which is not listable
563
570
        server = HttpServer()
564
571
        self.start_server(server)
565
 
        t = transport.get_transport('readonly+' + server.get_url())
566
 
        self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
 
572
        t = transport.get_transport_from_url('readonly+' + server.get_url())
 
573
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
567
574
        self.assertEqual(False, t.listable())
568
575
        self.assertEqual(True, t.is_readonly())
569
576
 
585
592
    def test_http_parameters(self):
586
593
        # the listable and is_readonly parameters
587
594
        # are not changed by the fakenfs decorator
588
 
        from bzrlib.tests.http_server import HttpServer
 
595
        from breezy.tests.http_server import HttpServer
589
596
        # connect to '.' via http which is not listable
590
597
        server = HttpServer()
591
598
        self.start_server(server)
601
608
        # the url should be decorated appropriately
602
609
        self.assertStartsWith(server.get_url(), 'fakenfs+')
603
610
        # and we should be able to get a transport for it
604
 
        t = transport.get_transport(server.get_url())
 
611
        t = transport.get_transport_from_url(server.get_url())
605
612
        # which must be a FakeNFSTransportDecorator instance.
606
613
        self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
607
614
 
619
626
 
620
627
    def get_vfat_transport(self, url):
621
628
        """Return vfat-backed transport for test directory"""
622
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
629
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
623
630
        return FakeVFATTransportDecorator('vfat+' + url)
624
631
 
625
632
    def test_transport_creation(self):
626
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
633
        from breezy.transport.fakevfat import FakeVFATTransportDecorator
627
634
        t = self.get_vfat_transport('.')
628
635
        self.assertIsInstance(t, FakeVFATTransportDecorator)
629
636
 
654
661
 
655
662
    To verify a transport we need a server factory, which is a callable
656
663
    that accepts no parameters and returns an implementation of
657
 
    bzrlib.transport.Server.
 
664
    breezy.transport.Server.
658
665
 
659
666
    That Server is then used to construct transport instances and test
660
667
    the transport via loopback activity.
684
691
        base_url = self._server.get_url()
685
692
        url = self._adjust_url(base_url, relpath)
686
693
        # try getting the transport via the regular interface:
687
 
        t = transport.get_transport(url)
 
694
        t = transport.get_transport_from_url(url)
688
695
        # vila--20070607 if the following are commented out the test suite
689
696
        # still pass. Is this really still needed or was it a forgotten
690
697
        # temporary fix ?
695
702
        return t
696
703
 
697
704
 
 
705
class TestTransportFromPath(tests.TestCaseInTempDir):
 
706
 
 
707
    def test_with_path(self):
 
708
        t = transport.get_transport_from_path(self.test_dir)
 
709
        self.assertIsInstance(t, local.LocalTransport)
 
710
        self.assertEqual(t.base.rstrip("/"),
 
711
            urlutils.local_path_to_url(self.test_dir))
 
712
 
 
713
    def test_with_url(self):
 
714
        t = transport.get_transport_from_path("file:")
 
715
        self.assertIsInstance(t, local.LocalTransport)
 
716
        self.assertEqual(t.base.rstrip("/"),
 
717
            urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
 
718
 
 
719
 
 
720
class TestTransportFromUrl(tests.TestCaseInTempDir):
 
721
 
 
722
    def test_with_path(self):
 
723
        self.assertRaises(urlutils.InvalidURL, transport.get_transport_from_url,
 
724
            self.test_dir)
 
725
 
 
726
    def test_with_url(self):
 
727
        url = urlutils.local_path_to_url(self.test_dir)
 
728
        t = transport.get_transport_from_url(url)
 
729
        self.assertIsInstance(t, local.LocalTransport)
 
730
        self.assertEqual(t.base.rstrip("/"), url)
 
731
 
 
732
    def test_with_url_and_segment_parameters(self):
 
733
        url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
 
734
        t = transport.get_transport_from_url(url)
 
735
        self.assertIsInstance(t, local.LocalTransport)
 
736
        self.assertEqual(t.base.rstrip("/"), url)
 
737
        with open(os.path.join(self.test_dir, "afile"), 'w') as f:
 
738
            f.write("data")
 
739
        self.assertTrue(t.has("afile"))
 
740
 
 
741
 
698
742
class TestLocalTransports(tests.TestCase):
699
743
 
700
744
    def test_get_transport_from_abspath(self):
701
745
        here = osutils.abspath('.')
702
746
        t = transport.get_transport(here)
703
747
        self.assertIsInstance(t, local.LocalTransport)
704
 
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
 
748
        self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
705
749
 
706
750
    def test_get_transport_from_relpath(self):
707
751
        here = osutils.abspath('.')
708
752
        t = transport.get_transport('.')
709
753
        self.assertIsInstance(t, local.LocalTransport)
710
 
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
 
754
        self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
711
755
 
712
756
    def test_get_transport_from_local_url(self):
713
757
        here = osutils.abspath('.')
714
758
        here_url = urlutils.local_path_to_url(here) + '/'
715
759
        t = transport.get_transport(here_url)
716
760
        self.assertIsInstance(t, local.LocalTransport)
717
 
        self.assertEquals(t.base, here_url)
 
761
        self.assertEqual(t.base, here_url)
718
762
 
719
763
    def test_local_abspath(self):
720
764
        here = osutils.abspath('.')
721
765
        t = transport.get_transport(here)
722
 
        self.assertEquals(t.local_abspath(''), here)
 
766
        self.assertEqual(t.local_abspath(''), here)
 
767
 
 
768
 
 
769
class TestLocalTransportMutation(tests.TestCaseInTempDir):
    """Tests that create directories through a transport on the cwd."""

    def test_local_transport_mkdir(self):
        """mkdir('test') makes 'test' exist on disk."""
        here = osutils.abspath('.')
        t = transport.get_transport(here)
        t.mkdir('test')
        self.assertTrue(os.path.exists('test'))

    def test_local_transport_mkdir_permission_denied(self):
        """mkdir still creates the directory when os.chmod raises EPERM.

        os.chmod is replaced for the duration of the test by a stub that
        always raises OSError with errno EPERM; both a default-mode and an
        explicit-mode mkdir are then expected to leave the directory in place.
        """
        # See https://bugs.launchpad.net/bzr/+bug/606537
        here = osutils.abspath('.')
        t = transport.get_transport(here)

        def fake_chmod(path, mode):
            e = OSError('permission denied')
            e.errno = errno.EPERM
            raise e

        self.overrideAttr(os, 'chmod', fake_chmod)
        t.mkdir('test')
        t.mkdir('test2', mode=0o707)
        self.assertTrue(os.path.exists('test'))
        self.assertTrue(os.path.exists('test2'))
 
790
 
 
791
 
 
792
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
    """Tests for streams returned by open_write_stream on the test transport."""

    def test_local_fdatasync_calls_fdatasync(self):
        """Check fdatasync on a stream tries to flush the data to the OS.

        We can't easily observe the external effect but we can at least see
        it's called.
        """
        # Platforms without os.fdatasync cannot run this check at all.
        if not hasattr(os, 'fdatasync'):
            raise tests.TestNotApplicable('fdatasync not supported')
        t = self.get_transport('.')
        calls = self.recordCalls(os, 'fdatasync')
        w = t.open_write_stream('out')
        w.write(b'foo')
        w.fdatasync()
        with open('out', 'rb') as f:
            # Should have been flushed.
            self.assertEqual(f.read(), b'foo')
        # Exactly one os.fdatasync call must have been recorded.
        self.assertEqual(len(calls), 1, calls)

    def test_missing_directory(self):
        """Opening a write stream at 'dir/foo' is expected to raise NoSuchFile."""
        t = self.get_transport('.')
        self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
723
817
 
724
818
 
725
819
class TestWin32LocalTransport(tests.TestCase):
    """Tests for EmulatedWin32LocalTransport handling of UNC-style URLs."""

    def test_unc_clone_to_root(self):
        """Cloning '..' repeatedly stops at 'file://HOST/' and stays there."""
        self.requireFeature(features.win32_feature)
        # Win32 UNC path like \\HOST\path
        # clone to root should stop at least at \\HOST part
        # not on \\
        t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
        # Four path segments below the host, so four clones reach the root.
        for _ in range(4):
            t = t.clone('..')
        self.assertEqual(t.base, 'file://HOST/')
        # make sure we reach the root
        t = t.clone('..')
        self.assertEqual(t.base, 'file://HOST/')
738
833
 
739
834
 
740
835
class TestConnectedTransport(tests.TestCase):
743
838
    def test_parse_url(self):
744
839
        t = transport.ConnectedTransport(
745
840
            'http://simple.example.com/home/source')
746
 
        self.assertEquals(t._host, 'simple.example.com')
747
 
        self.assertEquals(t._port, None)
748
 
        self.assertEquals(t._path, '/home/source/')
749
 
        self.failUnless(t._user is None)
750
 
        self.failUnless(t._password is None)
 
841
        self.assertEqual(t._parsed_url.host, 'simple.example.com')
 
842
        self.assertEqual(t._parsed_url.port, None)
 
843
        self.assertEqual(t._parsed_url.path, '/home/source/')
 
844
        self.assertTrue(t._parsed_url.user is None)
 
845
        self.assertTrue(t._parsed_url.password is None)
751
846
 
752
 
        self.assertEquals(t.base, 'http://simple.example.com/home/source/')
 
847
        self.assertEqual(t.base, 'http://simple.example.com/home/source/')
753
848
 
754
849
    def test_parse_url_with_at_in_user(self):
755
850
        # Bug 228058
756
851
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
757
 
        self.assertEquals(t._user, 'user@host.com')
 
852
        self.assertEqual(t._parsed_url.user, 'user@host.com')
758
853
 
759
854
    def test_parse_quoted_url(self):
760
855
        t = transport.ConnectedTransport(
761
856
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
762
 
        self.assertEquals(t._host, 'exAmple.com')
763
 
        self.assertEquals(t._port, 2222)
764
 
        self.assertEquals(t._user, 'robey')
765
 
        self.assertEquals(t._password, 'h@t')
766
 
        self.assertEquals(t._path, '/path/')
 
857
        self.assertEqual(t._parsed_url.host, 'exAmple.com')
 
858
        self.assertEqual(t._parsed_url.port, 2222)
 
859
        self.assertEqual(t._parsed_url.user, 'robey')
 
860
        self.assertEqual(t._parsed_url.password, 'h@t')
 
861
        self.assertEqual(t._parsed_url.path, '/path/')
767
862
 
768
863
        # Base should not keep track of the password
769
 
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')
 
864
        self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
770
865
 
771
866
    def test_parse_invalid_url(self):
772
 
        self.assertRaises(errors.InvalidURL,
 
867
        self.assertRaises(urlutils.InvalidURL,
773
868
                          transport.ConnectedTransport,
774
869
                          'sftp://lily.org:~janneke/public/bzr/gub')
775
870
 
776
871
    def test_relpath(self):
777
872
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
778
873
 
779
 
        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
 
874
        self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
 
875
            'sub')
780
876
        self.assertRaises(errors.PathNotChild, t.relpath,
781
877
                          'http://user@host.com/abs/path/sub')
782
878
        self.assertRaises(errors.PathNotChild, t.relpath,
787
883
                          'sftp://user@host.com:33/abs/path/sub')
788
884
        # Make sure it works when we don't supply a username
789
885
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
790
 
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
 
886
        self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
791
887
 
792
888
        # Make sure it works when parts of the path will be url encoded
793
889
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
794
 
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
 
890
        self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
795
891
 
796
892
    def test_connection_sharing_propagate_credentials(self):
797
893
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
798
 
        self.assertEquals('user', t._user)
799
 
        self.assertEquals('host.com', t._host)
 
894
        self.assertEqual('user', t._parsed_url.user)
 
895
        self.assertEqual('host.com', t._parsed_url.host)
800
896
        self.assertIs(None, t._get_connection())
801
 
        self.assertIs(None, t._password)
 
897
        self.assertIs(None, t._parsed_url.password)
802
898
        c = t.clone('subdir')
803
899
        self.assertIs(None, c._get_connection())
804
 
        self.assertIs(None, t._password)
 
900
        self.assertIs(None, t._parsed_url.password)
805
901
 
806
902
        # Simulate the user entering a password
807
903
        password = 'secret'
826
922
 
827
923
    def test_reuse_same_transport(self):
828
924
        possible_transports = []
829
 
        t1 = transport.get_transport('http://foo/',
 
925
        t1 = transport.get_transport_from_url('http://foo/',
830
926
                                     possible_transports=possible_transports)
831
927
        self.assertEqual([t1], possible_transports)
832
 
        t2 = transport.get_transport('http://foo/',
 
928
        t2 = transport.get_transport_from_url('http://foo/',
833
929
                                     possible_transports=[t1])
834
930
        self.assertIs(t1, t2)
835
931
 
836
932
        # Also check that final '/' are handled correctly
837
 
        t3 = transport.get_transport('http://foo/path/')
838
 
        t4 = transport.get_transport('http://foo/path',
 
933
        t3 = transport.get_transport_from_url('http://foo/path/')
 
934
        t4 = transport.get_transport_from_url('http://foo/path',
839
935
                                     possible_transports=[t3])
840
936
        self.assertIs(t3, t4)
841
937
 
842
 
        t5 = transport.get_transport('http://foo/path')
843
 
        t6 = transport.get_transport('http://foo/path/',
 
938
        t5 = transport.get_transport_from_url('http://foo/path')
 
939
        t6 = transport.get_transport_from_url('http://foo/path/',
844
940
                                     possible_transports=[t5])
845
941
        self.assertIs(t5, t6)
846
942
 
847
943
    def test_don_t_reuse_different_transport(self):
848
 
        t1 = transport.get_transport('http://foo/path')
849
 
        t2 = transport.get_transport('http://bar/path',
 
944
        t1 = transport.get_transport_from_url('http://foo/path')
 
945
        t2 = transport.get_transport_from_url('http://bar/path',
850
946
                                     possible_transports=[t1])
851
947
        self.assertIsNot(t1, t2)
852
948
 
853
949
 
854
950
class TestTransportTrace(tests.TestCase):
855
951
 
856
 
    def test_get(self):
857
 
        t = transport.get_transport('trace+memory://')
858
 
        self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
 
952
    def test_decorator(self):
 
953
        t = transport.get_transport_from_url('trace+memory://')
 
954
        self.assertIsInstance(
 
955
            t, breezy.transport.trace.TransportTraceDecorator)
859
956
 
860
957
    def test_clone_preserves_activity(self):
861
 
        t = transport.get_transport('trace+memory://')
 
958
        t = transport.get_transport_from_url('trace+memory://')
862
959
        t2 = t.clone('.')
863
960
        self.assertTrue(t is not t2)
864
961
        self.assertTrue(t._activity is t2._activity)
868
965
    # still won't cause a test failure when the top level Transport API
869
966
    # changes; so there is little return doing that.
870
967
    def test_get(self):
871
 
        t = transport.get_transport('trace+memory:///')
872
 
        t.put_bytes('foo', 'barish')
 
968
        t = transport.get_transport_from_url('trace+memory:///')
 
969
        t.put_bytes('foo', b'barish')
873
970
        t.get('foo')
874
971
        expected_result = []
875
972
        # put_bytes records the bytes, not the content to avoid memory
880
977
        self.assertEqual(expected_result, t._activity)
881
978
 
882
979
    def test_readv(self):
883
 
        t = transport.get_transport('trace+memory:///')
884
 
        t.put_bytes('foo', 'barish')
 
980
        t = transport.get_transport_from_url('trace+memory:///')
 
981
        t.put_bytes('foo', b'barish')
885
982
        list(t.readv('foo', [(0, 1), (3, 2)],
886
983
                     adjust_for_latency=True, upper_limit=6))
887
984
        expected_result = []
896
993
class TestSSHConnections(tests.TestCaseWithTransport):
897
994
 
898
995
    def test_bzr_connect_to_bzr_ssh(self):
899
 
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.
 
996
        """get_transport of a bzr+ssh:// behaves correctly.
900
997
 
901
998
        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
902
999
        """
909
1006
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
910
1007
        # override the interface (doesn't change self._vendor).
911
1008
        # Note that this does encryption, so can be slow.
912
 
        from bzrlib.tests import stub_sftp
 
1009
        from breezy.tests import stub_sftp
913
1010
 
914
1011
        # Start an SSH server
915
1012
        self.command_executed = []
918
1015
        # SSH channel ourselves.  Surely this has already been implemented
919
1016
        # elsewhere?
920
1017
        started = []
 
1018
 
921
1019
        class StubSSHServer(stub_sftp.StubServer):
922
1020
 
923
1021
            test = self
926
1024
                self.test.command_executed.append(command)
927
1025
                proc = subprocess.Popen(
928
1026
                    command, shell=True, stdin=subprocess.PIPE,
929
 
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
1027
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
 
1028
                    bufsize=0)
930
1029
 
931
1030
                # XXX: horribly inefficient, not to mention ugly.
932
 
                # Start a thread for each of stdin/out/err, and relay bytes from
933
 
                # the subprocess to channel and vice versa.
 
1031
                # Start a thread for each of stdin/out/err, and relay bytes
 
1032
                # from the subprocess to channel and vice versa.
934
1033
                def ferry_bytes(read, write, close):
935
1034
                    while True:
936
1035
                        bytes = read(1)
937
 
                        if bytes == '':
 
1036
                        if bytes == b'':
938
1037
                            close()
939
1038
                            break
940
1039
                        write(bytes)
955
1054
        ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
956
1055
        # We *don't* want to override the default SSH vendor: the detected one
957
1056
        # is the one to use.
 
1057
 
 
1058
        # FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
 
1059
        # inherits from SFTPServer which forces the SSH vendor to
 
1060
        # ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
958
1061
        self.start_server(ssh_server)
959
 
        port = ssh_server._listener.port
 
1062
        port = ssh_server.port
960
1063
 
961
1064
        if sys.platform == 'win32':
962
 
            bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
 
1065
            bzr_remote_path = sys.executable + ' ' + self.get_brz_path()
963
1066
        else:
964
 
            bzr_remote_path = self.get_bzr_path()
965
 
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
 
1067
            bzr_remote_path = self.get_brz_path()
 
1068
        self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
966
1069
 
967
1070
        # Access the branch via a bzr+ssh URL.  The BZR_REMOTE_PATH environment
968
1071
        # variable is used to tell bzr what command to run on the remote end.
977
1080
        t.mkdir('foo')
978
1081
 
979
1082
        self.assertEqual(
980
 
            ['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
 
1083
            [b'%s serve --inet --directory=/ --allow-writes' % bzr_remote_path.encode()],
981
1084
            self.command_executed)
982
1085
        # Make sure to disconnect, so that the remote process can stop, and we
983
1086
        # can cleanup. Then pause the test until everything is shutdown
989
1092
        # And the rest are threads
990
1093
        for t in started[1:]:
991
1094
            t.join()
 
1095
 
 
1096
 
 
1097
class TestUnhtml(tests.TestCase):

    """Tests for unhtml_roughly"""

    def test_truncation(self):
        """Long input is cut to 1000 characters and the '<p>' tag is dropped."""
        # 14 characters repeated 1000 times: far longer than the expected
        # 1000-character result.
        fake_html = "<p>something!\n" * 1000
        result = http.unhtml_roughly(fake_html)
        self.assertEqual(len(result), 1000)
        self.assertStartsWith(result, " something!")
 
1106
 
 
1107
 
 
1108
class SomeDirectory(object):
    """Stub directory service whose lookups always give the same URL."""

    def look_up(self, name, url):
        """Return "http://bar" regardless of *name* and *url*."""
        return "http://bar"
 
1112
 
 
1113
 
 
1114
class TestLocationToUrl(tests.TestCase):
    """Tests for location_to_url on URLs, directory names and local paths."""

    def get_base_location(self):
        """Return (path, url) for the absolute form of '/foo/bar'.

        The URL is built by hand from the path so the tests have an expected
        value that does not itself come from location_to_url.
        """
        path = osutils.abspath('/foo/bar')
        if path.startswith('/'):
            url = 'file://%s' % (path,)
        else:
            # On Windows, abspaths start with the drive letter, so we have to
            # add in the extra '/'
            url = 'file:///%s' % (path,)
        return path, url

    def test_regular_url(self):
        """A string that is already a URL is returned unchanged."""
        self.assertEqual("file://foo", location_to_url("file://foo"))

    def test_directory(self):
        """A registered directory prefix is resolved through its look_up."""
        directories.register("bar:", SomeDirectory, "Dummy directory")
        # Unregister afterwards so the dummy entry does not outlive the test.
        self.addCleanup(directories.remove, "bar:")
        self.assertEqual("http://bar", location_to_url("bar:"))

    def test_unicode_url(self):
        """A URL containing a non-ASCII character raises InvalidURL."""
        self.assertRaises(urlutils.InvalidURL, location_to_url,
                          b"http://fo/\xc3\xaf".decode("utf-8"))

    def test_unicode_path(self):
        """A non-ASCII character in a local path is percent-encoded as UTF-8."""
        path, url = self.get_base_location()
        location = path + b"\xc3\xaf".decode("utf-8")
        url += '%C3%AF'
        self.assertEqual(url, location_to_url(location))

    def test_path(self):
        """An absolute local path is converted to the matching file URL."""
        path, url = self.get_base_location()
        self.assertEqual(url, location_to_url(path))

    def test_relative_file_url(self):
        """'file:bar' is resolved against the URL of the current directory."""
        self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
                         location_to_url("file:bar"))

    def test_absolute_file_url(self):
        """'file:/bar' is normalised to 'file:///bar'."""
        self.assertEqual("file:///bar", location_to_url("file:/bar"))