1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
31
from ..directory_service import directories
32
from ..sixish import (
35
from ..transport import (
45
import breezy.transport.trace
52
# TODO: Should possibly split transport-specific tests into their own files.
55
class TestTransport(tests.TestCase):
56
"""Test the non transport-concrete class functionality."""
58
def test__get_set_protocol_handlers(self):
59
handlers = transport._get_protocol_handlers()
60
self.assertNotEqual([], handlers.keys())
61
transport._clear_protocol_handlers()
62
self.addCleanup(transport._set_protocol_handlers, handlers)
63
self.assertEqual([], transport._get_protocol_handlers().keys())
65
def test_get_transport_modules(self):
66
handlers = transport._get_protocol_handlers()
67
self.addCleanup(transport._set_protocol_handlers, handlers)
68
# don't pollute the current handlers
69
transport._clear_protocol_handlers()
71
class SampleHandler(object):
72
"""I exist, isnt that enough?"""
73
transport._clear_protocol_handlers()
74
transport.register_transport_proto('foo')
75
transport.register_lazy_transport('foo',
76
'breezy.tests.test_transport',
77
'TestTransport.SampleHandler')
78
transport.register_transport_proto('bar')
79
transport.register_lazy_transport('bar',
80
'breezy.tests.test_transport',
81
'TestTransport.SampleHandler')
82
self.assertEqual([SampleHandler.__module__,
83
'breezy.transport.chroot',
84
'breezy.transport.pathfilter'],
85
transport._get_transport_modules())
87
def test_transport_dependency(self):
88
"""Transport with missing dependency causes no error"""
89
saved_handlers = transport._get_protocol_handlers()
90
self.addCleanup(transport._set_protocol_handlers, saved_handlers)
91
# don't pollute the current handlers
92
transport._clear_protocol_handlers()
93
transport.register_transport_proto('foo')
94
transport.register_lazy_transport(
95
'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
97
transport.get_transport_from_url('foo://fooserver/foo')
98
except errors.UnsupportedProtocol as e:
100
self.assertEqual('Unsupported protocol'
101
' for url "foo://fooserver/foo":'
102
' Unable to import library "some_lib":'
103
' testing missing dependency', str(e))
105
self.fail('Did not raise UnsupportedProtocol')
107
def test_transport_fallback(self):
108
"""Transport with missing dependency causes no error"""
109
saved_handlers = transport._get_protocol_handlers()
110
self.addCleanup(transport._set_protocol_handlers, saved_handlers)
111
transport._clear_protocol_handlers()
112
transport.register_transport_proto('foo')
113
transport.register_lazy_transport(
114
'foo', 'breezy.tests.test_transport', 'BackupTransportHandler')
115
transport.register_lazy_transport(
116
'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
117
t = transport.get_transport_from_url('foo://fooserver/foo')
118
self.assertTrue(isinstance(t, BackupTransportHandler))
120
def test_ssh_hints(self):
121
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
123
transport.get_transport_from_url('ssh://fooserver/foo')
124
except errors.UnsupportedProtocol as e:
126
self.assertEqual('Unsupported protocol'
127
' for url "ssh://fooserver/foo":'
128
' bzr supports bzr+ssh to operate over ssh,'
129
' use "bzr+ssh://fooserver/foo".',
132
self.fail('Did not raise UnsupportedProtocol')
134
def test_LateReadError(self):
135
"""The LateReadError helper should raise on read()."""
136
a_file = transport.LateReadError('a path')
139
except errors.ReadError as error:
140
self.assertEqual('a path', error.path)
141
self.assertRaises(errors.ReadError, a_file.read, 40)
144
def test_local_abspath_non_local_transport(self):
145
# the base implementation should throw
146
t = memory.MemoryTransport()
147
e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
148
self.assertEqual('memory:///t is not a local path.', str(e))
151
class TestCoalesceOffsets(tests.TestCase):
153
def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
154
coalesce = transport.Transport._coalesce_offsets
155
exp = [transport._CoalescedOffset(*x) for x in expected]
156
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
158
self.assertEqual(exp, out)
160
def test_coalesce_empty(self):
163
def test_coalesce_simple(self):
164
self.check([(0, 10, [(0, 10)])], [(0, 10)])
166
def test_coalesce_unrelated(self):
167
self.check([(0, 10, [(0, 10)]),
169
], [(0, 10), (20, 10)])
171
def test_coalesce_unsorted(self):
172
self.check([(20, 10, [(0, 10)]),
174
], [(20, 10), (0, 10)])
176
def test_coalesce_nearby(self):
177
self.check([(0, 20, [(0, 10), (10, 10)])],
180
def test_coalesce_overlapped(self):
181
self.assertRaises(ValueError,
182
self.check, [(0, 15, [(0, 10), (5, 10)])],
185
def test_coalesce_limit(self):
186
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
187
(30, 10), (40, 10)]),
188
(60, 50, [(0, 10), (10, 10), (20, 10),
189
(30, 10), (40, 10)]),
190
], [(10, 10), (20, 10), (30, 10), (40, 10),
191
(50, 10), (60, 10), (70, 10), (80, 10),
192
(90, 10), (100, 10)],
195
def test_coalesce_no_limit(self):
196
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
197
(30, 10), (40, 10), (50, 10),
198
(60, 10), (70, 10), (80, 10),
200
], [(10, 10), (20, 10), (30, 10), (40, 10),
201
(50, 10), (60, 10), (70, 10), (80, 10),
202
(90, 10), (100, 10)])
204
def test_coalesce_fudge(self):
205
self.check([(10, 30, [(0, 10), (20, 10)]),
206
(100, 10, [(0, 10)]),
207
], [(10, 10), (30, 10), (100, 10)],
210
def test_coalesce_max_size(self):
211
self.check([(10, 20, [(0, 10), (10, 10)]),
213
# If one range is above max_size, it gets its own coalesced
215
(100, 80, [(0, 80)]),],
216
[(10, 10), (20, 10), (30, 50), (100, 80)],
219
def test_coalesce_no_max_size(self):
220
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
221
[(10, 10), (20, 10), (30, 50), (80, 100)],
224
def test_coalesce_default_limit(self):
225
# By default we use a 100MB max size.
226
ten_mb = 10 * 1024 * 1024
227
self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
228
(10*ten_mb, ten_mb, [(0, ten_mb)])],
229
[(i*ten_mb, ten_mb) for i in range(11)])
230
self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
231
[(i * ten_mb, ten_mb) for i in range(11)],
232
max_size=1*1024*1024*1024)
235
class TestMemoryServer(tests.TestCase):
237
def test_create_server(self):
238
server = memory.MemoryServer()
239
server.start_server()
240
url = server.get_url()
241
self.assertTrue(url in transport.transport_list_registry)
242
t = transport.get_transport_from_url(url)
245
self.assertFalse(url in transport.transport_list_registry)
246
self.assertRaises(errors.UnsupportedProtocol,
247
transport.get_transport, url)
250
class TestMemoryTransport(tests.TestCase):
252
def test_get_transport(self):
253
memory.MemoryTransport()
255
def test_clone(self):
256
t = memory.MemoryTransport()
257
self.assertTrue(isinstance(t, memory.MemoryTransport))
258
self.assertEqual("memory:///", t.clone("/").base)
260
def test_abspath(self):
261
t = memory.MemoryTransport()
262
self.assertEqual("memory:///relpath", t.abspath('relpath'))
264
def test_abspath_of_root(self):
265
t = memory.MemoryTransport()
266
self.assertEqual("memory:///", t.base)
267
self.assertEqual("memory:///", t.abspath('/'))
269
def test_abspath_of_relpath_starting_at_root(self):
270
t = memory.MemoryTransport()
271
self.assertEqual("memory:///foo", t.abspath('/foo'))
273
def test_append_and_get(self):
274
t = memory.MemoryTransport()
275
t.append_bytes('path', b'content')
276
self.assertEqual(t.get('path').read(), b'content')
277
t.append_file('path', BytesIO(b'content'))
278
with t.get('path') as f:
279
self.assertEqual(f.read(), b'contentcontent')
281
def test_put_and_get(self):
282
t = memory.MemoryTransport()
283
t.put_file('path', BytesIO(b'content'))
284
self.assertEqual(t.get('path').read(), b'content')
285
t.put_bytes('path', b'content')
286
self.assertEqual(t.get('path').read(), b'content')
288
def test_append_without_dir_fails(self):
289
t = memory.MemoryTransport()
290
self.assertRaises(errors.NoSuchFile,
291
t.append_bytes, 'dir/path', b'content')
293
def test_put_without_dir_fails(self):
294
t = memory.MemoryTransport()
295
self.assertRaises(errors.NoSuchFile,
296
t.put_file, 'dir/path', BytesIO(b'content'))
298
def test_get_missing(self):
299
transport = memory.MemoryTransport()
300
self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
302
def test_has_missing(self):
303
t = memory.MemoryTransport()
304
self.assertEqual(False, t.has('foo'))
306
def test_has_present(self):
307
t = memory.MemoryTransport()
308
t.append_bytes('foo', b'content')
309
self.assertEqual(True, t.has('foo'))
311
def test_list_dir(self):
312
t = memory.MemoryTransport()
313
t.put_bytes('foo', b'content')
315
t.put_bytes('dir/subfoo', b'content')
316
t.put_bytes('dirlike', b'content')
318
self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
319
self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
321
def test_mkdir(self):
322
t = memory.MemoryTransport()
324
t.append_bytes('dir/path', b'content')
325
with t.get('dir/path') as f:
326
self.assertEqual(f.read(), b'content')
328
def test_mkdir_missing_parent(self):
329
t = memory.MemoryTransport()
330
self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
332
def test_mkdir_twice(self):
333
t = memory.MemoryTransport()
335
self.assertRaises(errors.FileExists, t.mkdir, 'dir')
337
def test_parameters(self):
338
t = memory.MemoryTransport()
339
self.assertEqual(True, t.listable())
340
self.assertEqual(False, t.is_readonly())
342
def test_iter_files_recursive(self):
343
t = memory.MemoryTransport()
345
t.put_bytes('dir/foo', b'content')
346
t.put_bytes('dir/bar', b'content')
347
t.put_bytes('bar', b'content')
348
paths = set(t.iter_files_recursive())
349
self.assertEqual({'dir/foo', 'dir/bar', 'bar'}, paths)
352
t = memory.MemoryTransport()
353
t.put_bytes('foo', b'content')
354
t.put_bytes('bar', b'phowar')
355
self.assertEqual(7, t.stat('foo').st_size)
356
self.assertEqual(6, t.stat('bar').st_size)
359
class ChrootDecoratorTransportTest(tests.TestCase):
360
"""Chroot decoration specific tests."""
362
def test_abspath(self):
363
# The abspath is always relative to the chroot_url.
364
server = chroot.ChrootServer(
365
transport.get_transport_from_url('memory:///foo/bar/'))
366
self.start_server(server)
367
t = transport.get_transport_from_url(server.get_url())
368
self.assertEqual(server.get_url(), t.abspath('/'))
370
subdir_t = t.clone('subdir')
371
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
373
def test_clone(self):
374
server = chroot.ChrootServer(
375
transport.get_transport_from_url('memory:///foo/bar/'))
376
self.start_server(server)
377
t = transport.get_transport_from_url(server.get_url())
378
# relpath from root and root path are the same
379
relpath_cloned = t.clone('foo')
380
abspath_cloned = t.clone('/foo')
381
self.assertEqual(server, relpath_cloned.server)
382
self.assertEqual(server, abspath_cloned.server)
384
def test_chroot_url_preserves_chroot(self):
385
"""Calling get_transport on a chroot transport's base should produce a
386
transport with exactly the same behaviour as the original chroot
389
This is so that it is not possible to escape a chroot by doing::
390
url = chroot_transport.base
391
parent_url = urlutils.join(url, '..')
392
new_t = transport.get_transport_from_url(parent_url)
394
server = chroot.ChrootServer(
395
transport.get_transport_from_url('memory:///path/subpath'))
396
self.start_server(server)
397
t = transport.get_transport_from_url(server.get_url())
398
new_t = transport.get_transport_from_url(t.base)
399
self.assertEqual(t.server, new_t.server)
400
self.assertEqual(t.base, new_t.base)
402
def test_urljoin_preserves_chroot(self):
403
"""Using urlutils.join(url, '..') on a chroot URL should not produce a
404
URL that escapes the intended chroot.
406
This is so that it is not possible to escape a chroot by doing::
407
url = chroot_transport.base
408
parent_url = urlutils.join(url, '..')
409
new_t = transport.get_transport_from_url(parent_url)
411
server = chroot.ChrootServer(
412
transport.get_transport_from_url('memory:///path/'))
413
self.start_server(server)
414
t = transport.get_transport_from_url(server.get_url())
416
urlutils.InvalidURLJoin, urlutils.join, t.base, '..')
419
class TestChrootServer(tests.TestCase):
421
def test_construct(self):
422
backing_transport = memory.MemoryTransport()
423
server = chroot.ChrootServer(backing_transport)
424
self.assertEqual(backing_transport, server.backing_transport)
426
def test_setUp(self):
427
backing_transport = memory.MemoryTransport()
428
server = chroot.ChrootServer(backing_transport)
429
server.start_server()
430
self.addCleanup(server.stop_server)
431
self.assertTrue(server.scheme
432
in transport._get_protocol_handlers().keys())
434
def test_stop_server(self):
435
backing_transport = memory.MemoryTransport()
436
server = chroot.ChrootServer(backing_transport)
437
server.start_server()
439
self.assertFalse(server.scheme
440
in transport._get_protocol_handlers().keys())
442
def test_get_url(self):
443
backing_transport = memory.MemoryTransport()
444
server = chroot.ChrootServer(backing_transport)
445
server.start_server()
446
self.addCleanup(server.stop_server)
447
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
450
class TestHooks(tests.TestCase):
451
"""Basic tests for transport hooks"""
453
def _get_connected_transport(self):
454
return transport.ConnectedTransport("bogus:nowhere")
456
def test_transporthooks_initialisation(self):
457
"""Check all expected transport hook points are set up"""
458
hookpoint = transport.TransportHooks()
459
self.assertTrue("post_connect" in hookpoint,
460
"post_connect not in %s" % (hookpoint,))
462
def test_post_connect(self):
463
"""Ensure the post_connect hook is called when _set_transport is"""
465
transport.Transport.hooks.install_named_hook("post_connect",
467
t = self._get_connected_transport()
468
self.assertLength(0, calls)
469
t._set_connection("connection", "auth")
470
self.assertEqual(calls, [t])
473
class PathFilteringDecoratorTransportTest(tests.TestCase):
474
"""Pathfilter decoration specific tests."""
476
def test_abspath(self):
477
# The abspath is always relative to the base of the backing transport.
478
server = pathfilter.PathFilteringServer(
479
transport.get_transport_from_url('memory:///foo/bar/'),
481
server.start_server()
482
t = transport.get_transport_from_url(server.get_url())
483
self.assertEqual(server.get_url(), t.abspath('/'))
485
subdir_t = t.clone('subdir')
486
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
489
def make_pf_transport(self, filter_func=None):
490
"""Make a PathFilteringTransport backed by a MemoryTransport.
492
:param filter_func: by default this will be a no-op function. Use this
493
parameter to override it."""
494
if filter_func is None:
495
filter_func = lambda x: x
496
server = pathfilter.PathFilteringServer(
497
transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
498
server.start_server()
499
self.addCleanup(server.stop_server)
500
return transport.get_transport_from_url(server.get_url())
502
def test__filter(self):
503
# _filter (with an identity func as filter_func) always returns
504
# paths relative to the base of the backing transport.
505
t = self.make_pf_transport()
506
self.assertEqual('foo', t._filter('foo'))
507
self.assertEqual('foo/bar', t._filter('foo/bar'))
508
self.assertEqual('', t._filter('..'))
509
self.assertEqual('', t._filter('/'))
510
# The base of the pathfiltering transport is taken into account too.
511
t = t.clone('subdir1/subdir2')
512
self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
513
self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
514
self.assertEqual('subdir1', t._filter('..'))
515
self.assertEqual('', t._filter('/'))
517
def test_filter_invocation(self):
521
filter_log.append(path)
523
t = self.make_pf_transport(filter)
525
self.assertEqual(['abc'], filter_log)
527
t.clone('abc').has('xyz')
528
self.assertEqual(['abc/xyz'], filter_log)
531
self.assertEqual(['abc'], filter_log)
533
def test_clone(self):
534
t = self.make_pf_transport()
535
# relpath from root and root path are the same
536
relpath_cloned = t.clone('foo')
537
abspath_cloned = t.clone('/foo')
538
self.assertEqual(t.server, relpath_cloned.server)
539
self.assertEqual(t.server, abspath_cloned.server)
541
def test_url_preserves_pathfiltering(self):
542
"""Calling get_transport on a pathfiltered transport's base should
543
produce a transport with exactly the same behaviour as the original
544
pathfiltered transport.
546
This is so that it is not possible to escape (accidentally or
547
otherwise) the filtering by doing::
548
url = filtered_transport.base
549
parent_url = urlutils.join(url, '..')
550
new_t = transport.get_transport_from_url(parent_url)
552
t = self.make_pf_transport()
553
new_t = transport.get_transport_from_url(t.base)
554
self.assertEqual(t.server, new_t.server)
555
self.assertEqual(t.base, new_t.base)
558
class ReadonlyDecoratorTransportTest(tests.TestCase):
559
"""Readonly decoration specific tests."""
561
def test_local_parameters(self):
562
# connect to . in readonly mode
563
t = readonly.ReadonlyTransportDecorator('readonly+.')
564
self.assertEqual(True, t.listable())
565
self.assertEqual(True, t.is_readonly())
567
def test_http_parameters(self):
568
from breezy.tests.http_server import HttpServer
569
# connect to '.' via http which is not listable
570
server = HttpServer()
571
self.start_server(server)
572
t = transport.get_transport_from_url('readonly+' + server.get_url())
573
self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
574
self.assertEqual(False, t.listable())
575
self.assertEqual(True, t.is_readonly())
578
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
579
"""NFS decorator specific tests."""
581
def get_nfs_transport(self, url):
582
# connect to url with nfs decoration
583
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
585
def test_local_parameters(self):
586
# the listable and is_readonly parameters
587
# are not changed by the fakenfs decorator
588
t = self.get_nfs_transport('.')
589
self.assertEqual(True, t.listable())
590
self.assertEqual(False, t.is_readonly())
592
def test_http_parameters(self):
593
# the listable and is_readonly parameters
594
# are not changed by the fakenfs decorator
595
from breezy.tests.http_server import HttpServer
596
# connect to '.' via http which is not listable
597
server = HttpServer()
598
self.start_server(server)
599
t = self.get_nfs_transport(server.get_url())
600
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
601
self.assertEqual(False, t.listable())
602
self.assertEqual(True, t.is_readonly())
604
def test_fakenfs_server_default(self):
605
# a FakeNFSServer() should bring up a local relpath server for itself
606
server = test_server.FakeNFSServer()
607
self.start_server(server)
608
# the url should be decorated appropriately
609
self.assertStartsWith(server.get_url(), 'fakenfs+')
610
# and we should be able to get a transport for it
611
t = transport.get_transport_from_url(server.get_url())
612
# which must be a FakeNFSTransportDecorator instance.
613
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
615
def test_fakenfs_rename_semantics(self):
616
# a FakeNFS transport must mangle the way rename errors occur to
617
# look like NFS problems.
618
t = self.get_nfs_transport('.')
619
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
621
self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
624
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
625
"""Tests for simulation of VFAT restrictions"""
627
def get_vfat_transport(self, url):
628
"""Return vfat-backed transport for test directory"""
629
from breezy.transport.fakevfat import FakeVFATTransportDecorator
630
return FakeVFATTransportDecorator('vfat+' + url)
632
def test_transport_creation(self):
633
from breezy.transport.fakevfat import FakeVFATTransportDecorator
634
t = self.get_vfat_transport('.')
635
self.assertIsInstance(t, FakeVFATTransportDecorator)
637
def test_transport_mkdir(self):
638
t = self.get_vfat_transport('.')
640
self.assertTrue(t.has('hello'))
641
self.assertTrue(t.has('Hello'))
643
def test_forbidden_chars(self):
644
t = self.get_vfat_transport('.')
645
self.assertRaises(ValueError, t.has, "<NU>")
648
class BadTransportHandler(transport.Transport):
649
def __init__(self, base_url):
650
raise errors.DependencyNotPresent('some_lib',
651
'testing missing dependency')
654
class BackupTransportHandler(transport.Transport):
655
"""Test transport that works as a backup for the BadTransportHandler"""
659
class TestTransportImplementation(tests.TestCaseInTempDir):
660
"""Implementation verification for transports.
662
To verify a transport we need a server factory, which is a callable
663
that accepts no parameters and returns an implementation of
664
breezy.transport.Server.
666
That Server is then used to construct transport instances and test
667
the transport via loopback activity.
669
Currently this assumes that the Transport object is connected to the
670
current working directory. So that whatever is done
671
through the transport, should show up in the working
672
directory, and vice-versa. This is a bug, because its possible to have
673
URL schemes which provide access to something that may not be
674
result in storage on the local disk, i.e. due to file system limits, or
675
due to it being a database or some other non-filesystem tool.
677
This also tests to make sure that the functions work with both
678
generators and lists (assuming iter(list) is effectively a generator)
682
super(TestTransportImplementation, self).setUp()
683
self._server = self.transport_server()
684
self.start_server(self._server)
686
def get_transport(self, relpath=None):
687
"""Return a connected transport to the local directory.
689
:param relpath: a path relative to the base url.
691
base_url = self._server.get_url()
692
url = self._adjust_url(base_url, relpath)
693
# try getting the transport via the regular interface:
694
t = transport.get_transport_from_url(url)
695
# vila--20070607 if the following are commented out the test suite
696
# still pass. Is this really still needed or was it a forgotten
698
if not isinstance(t, self.transport_class):
699
# we did not get the correct transport class type. Override the
700
# regular connection behaviour by direct construction.
701
t = self.transport_class(url)
705
class TestTransportFromPath(tests.TestCaseInTempDir):
707
def test_with_path(self):
708
t = transport.get_transport_from_path(self.test_dir)
709
self.assertIsInstance(t, local.LocalTransport)
710
self.assertEqual(t.base.rstrip("/"),
711
urlutils.local_path_to_url(self.test_dir))
713
def test_with_url(self):
714
t = transport.get_transport_from_path("file:")
715
self.assertIsInstance(t, local.LocalTransport)
716
self.assertEqual(t.base.rstrip("/"),
717
urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
720
class TestTransportFromUrl(tests.TestCaseInTempDir):
722
def test_with_path(self):
723
self.assertRaises(urlutils.InvalidURL, transport.get_transport_from_url,
726
def test_with_url(self):
727
url = urlutils.local_path_to_url(self.test_dir)
728
t = transport.get_transport_from_url(url)
729
self.assertIsInstance(t, local.LocalTransport)
730
self.assertEqual(t.base.rstrip("/"), url)
732
def test_with_url_and_segment_parameters(self):
733
url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
734
t = transport.get_transport_from_url(url)
735
self.assertIsInstance(t, local.LocalTransport)
736
self.assertEqual(t.base.rstrip("/"), url)
737
with open(os.path.join(self.test_dir, "afile"), 'w') as f:
739
self.assertTrue(t.has("afile"))
742
class TestLocalTransports(tests.TestCase):
744
def test_get_transport_from_abspath(self):
745
here = osutils.abspath('.')
746
t = transport.get_transport(here)
747
self.assertIsInstance(t, local.LocalTransport)
748
self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
750
def test_get_transport_from_relpath(self):
751
here = osutils.abspath('.')
752
t = transport.get_transport('.')
753
self.assertIsInstance(t, local.LocalTransport)
754
self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
756
def test_get_transport_from_local_url(self):
757
here = osutils.abspath('.')
758
here_url = urlutils.local_path_to_url(here) + '/'
759
t = transport.get_transport(here_url)
760
self.assertIsInstance(t, local.LocalTransport)
761
self.assertEqual(t.base, here_url)
763
def test_local_abspath(self):
764
here = osutils.abspath('.')
765
t = transport.get_transport(here)
766
self.assertEqual(t.local_abspath(''), here)
769
class TestLocalTransportMutation(tests.TestCaseInTempDir):
771
def test_local_transport_mkdir(self):
772
here = osutils.abspath('.')
773
t = transport.get_transport(here)
775
self.assertTrue(os.path.exists('test'))
777
def test_local_transport_mkdir_permission_denied(self):
778
# See https://bugs.launchpad.net/bzr/+bug/606537
779
here = osutils.abspath('.')
780
t = transport.get_transport(here)
781
def fake_chmod(path, mode):
782
e = OSError('permission denied')
783
e.errno = errno.EPERM
785
self.overrideAttr(os, 'chmod', fake_chmod)
787
t.mkdir('test2', mode=0o707)
788
self.assertTrue(os.path.exists('test'))
789
self.assertTrue(os.path.exists('test2'))
792
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
794
def test_local_fdatasync_calls_fdatasync(self):
795
"""Check fdatasync on a stream tries to flush the data to the OS.
797
We can't easily observe the external effect but we can at least see
801
fdatasync = getattr(os, 'fdatasync', sentinel)
802
if fdatasync is sentinel:
803
raise tests.TestNotApplicable('fdatasync not supported')
804
t = self.get_transport('.')
805
calls = self.recordCalls(os, 'fdatasync')
806
w = t.open_write_stream('out')
809
with open('out', 'rb') as f:
810
# Should have been flushed.
811
self.assertEqual(f.read(), b'foo')
812
self.assertEqual(len(calls), 1, calls)
814
def test_missing_directory(self):
815
t = self.get_transport('.')
816
self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
819
class TestWin32LocalTransport(tests.TestCase):
821
def test_unc_clone_to_root(self):
822
self.requireFeature(features.win32_feature)
823
# Win32 UNC path like \\HOST\path
824
# clone to root should stop at least at \\HOST part
826
t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
829
self.assertEqual(t.base, 'file://HOST/')
830
# make sure we reach the root
832
self.assertEqual(t.base, 'file://HOST/')
835
class TestConnectedTransport(tests.TestCase):
836
"""Tests for connected to remote server transports"""
838
def test_parse_url(self):
839
t = transport.ConnectedTransport(
840
'http://simple.example.com/home/source')
841
self.assertEqual(t._parsed_url.host, 'simple.example.com')
842
self.assertEqual(t._parsed_url.port, None)
843
self.assertEqual(t._parsed_url.path, '/home/source/')
844
self.assertTrue(t._parsed_url.user is None)
845
self.assertTrue(t._parsed_url.password is None)
847
self.assertEqual(t.base, 'http://simple.example.com/home/source/')
849
def test_parse_url_with_at_in_user(self):
851
t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
852
self.assertEqual(t._parsed_url.user, 'user@host.com')
854
def test_parse_quoted_url(self):
855
t = transport.ConnectedTransport(
856
'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
857
self.assertEqual(t._parsed_url.host, 'exAmple.com')
858
self.assertEqual(t._parsed_url.port, 2222)
859
self.assertEqual(t._parsed_url.user, 'robey')
860
self.assertEqual(t._parsed_url.password, 'h@t')
861
self.assertEqual(t._parsed_url.path, '/path/')
863
# Base should not keep track of the password
864
self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
866
def test_parse_invalid_url(self):
867
self.assertRaises(urlutils.InvalidURL,
868
transport.ConnectedTransport,
869
'sftp://lily.org:~janneke/public/bzr/gub')
871
def test_relpath(self):
872
t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
874
self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
876
self.assertRaises(errors.PathNotChild, t.relpath,
877
'http://user@host.com/abs/path/sub')
878
self.assertRaises(errors.PathNotChild, t.relpath,
879
'sftp://user2@host.com/abs/path/sub')
880
self.assertRaises(errors.PathNotChild, t.relpath,
881
'sftp://user@otherhost.com/abs/path/sub')
882
self.assertRaises(errors.PathNotChild, t.relpath,
883
'sftp://user@host.com:33/abs/path/sub')
884
# Make sure it works when we don't supply a username
885
t = transport.ConnectedTransport('sftp://host.com/abs/path')
886
self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
888
# Make sure it works when parts of the path will be url encoded
889
t = transport.ConnectedTransport('sftp://host.com/dev/%path')
890
self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
892
def test_connection_sharing_propagate_credentials(self):
893
t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
894
self.assertEqual('user', t._parsed_url.user)
895
self.assertEqual('host.com', t._parsed_url.host)
896
self.assertIs(None, t._get_connection())
897
self.assertIs(None, t._parsed_url.password)
898
c = t.clone('subdir')
899
self.assertIs(None, c._get_connection())
900
self.assertIs(None, t._parsed_url.password)
902
# Simulate the user entering a password
904
connection = object()
905
t._set_connection(connection, password)
906
self.assertIs(connection, t._get_connection())
907
self.assertIs(password, t._get_credentials())
908
self.assertIs(connection, c._get_connection())
909
self.assertIs(password, c._get_credentials())
911
# credentials can be updated
912
new_password = 'even more secret'
913
c._update_credentials(new_password)
914
self.assertIs(connection, t._get_connection())
915
self.assertIs(new_password, t._get_credentials())
916
self.assertIs(connection, c._get_connection())
917
self.assertIs(new_password, c._get_credentials())
920
class TestReusedTransports(tests.TestCase):
921
"""Tests for transport reuse"""
923
def test_reuse_same_transport(self):
924
possible_transports = []
925
t1 = transport.get_transport_from_url('http://foo/',
926
possible_transports=possible_transports)
927
self.assertEqual([t1], possible_transports)
928
t2 = transport.get_transport_from_url('http://foo/',
929
possible_transports=[t1])
930
self.assertIs(t1, t2)
932
# Also check that final '/' are handled correctly
933
t3 = transport.get_transport_from_url('http://foo/path/')
934
t4 = transport.get_transport_from_url('http://foo/path',
935
possible_transports=[t3])
936
self.assertIs(t3, t4)
938
t5 = transport.get_transport_from_url('http://foo/path')
939
t6 = transport.get_transport_from_url('http://foo/path/',
940
possible_transports=[t5])
941
self.assertIs(t5, t6)
943
def test_don_t_reuse_different_transport(self):
944
t1 = transport.get_transport_from_url('http://foo/path')
945
t2 = transport.get_transport_from_url('http://bar/path',
946
possible_transports=[t1])
947
self.assertIsNot(t1, t2)
950
class TestTransportTrace(tests.TestCase):
952
def test_decorator(self):
953
t = transport.get_transport_from_url('trace+memory://')
954
self.assertIsInstance(
955
t, breezy.transport.trace.TransportTraceDecorator)
957
def test_clone_preserves_activity(self):
958
t = transport.get_transport_from_url('trace+memory://')
960
self.assertTrue(t is not t2)
961
self.assertTrue(t._activity is t2._activity)
963
# the following specific tests are for the operations that have made use of
964
# logging in tests; we could test every single operation but doing that
965
# still won't cause a test failure when the top level Transport API
966
# changes; so there is little return doing that.
968
t = transport.get_transport_from_url('trace+memory:///')
969
t.put_bytes('foo', b'barish')
972
# put_bytes records the bytes, not the content to avoid memory
974
expected_result.append(('put_bytes', 'foo', 6, None))
975
# get records the file name only.
976
expected_result.append(('get', 'foo'))
977
self.assertEqual(expected_result, t._activity)
979
def test_readv(self):
980
t = transport.get_transport_from_url('trace+memory:///')
981
t.put_bytes('foo', b'barish')
982
list(t.readv('foo', [(0, 1), (3, 2)],
983
adjust_for_latency=True, upper_limit=6))
985
# put_bytes records the bytes, not the content to avoid memory
987
expected_result.append(('put_bytes', 'foo', 6, None))
988
# readv records the supplied offset request
989
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
990
self.assertEqual(expected_result, t._activity)
993
class TestSSHConnections(tests.TestCaseWithTransport):
995
def test_bzr_connect_to_bzr_ssh(self):
996
"""get_transport of a bzr+ssh:// behaves correctly.
998
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
1000
# This test actually causes a bzr instance to be invoked, which is very
1001
# expensive: it should be the only such test in the test suite.
1002
# A reasonable evolution for this would be to simply check inside
1003
# check_channel_exec_request that the command is appropriate, and then
1004
# satisfy requests in-process.
1005
self.requireFeature(features.paramiko)
1006
# SFTPFullAbsoluteServer has a get_url method, and doesn't
1007
# override the interface (doesn't change self._vendor).
1008
# Note that this does encryption, so can be slow.
1009
from breezy.tests import stub_sftp
1011
# Start an SSH server
1012
self.command_executed = []
1013
# XXX: This is horrible -- we define a really dumb SSH server that
1014
# executes commands, and manage the hooking up of stdin/out/err to the
1015
# SSH channel ourselves. Surely this has already been implemented
1019
class StubSSHServer(stub_sftp.StubServer):
1023
def check_channel_exec_request(self, channel, command):
1024
self.test.command_executed.append(command)
1025
proc = subprocess.Popen(
1026
command, shell=True, stdin=subprocess.PIPE,
1027
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
1030
# XXX: horribly inefficient, not to mention ugly.
1031
# Start a thread for each of stdin/out/err, and relay bytes
1032
# from the subprocess to channel and vice versa.
1033
def ferry_bytes(read, write, close):
1042
(channel.recv, proc.stdin.write, proc.stdin.close),
1043
(proc.stdout.read, channel.sendall, channel.close),
1044
(proc.stderr.read, channel.sendall_stderr, channel.close)]
1045
started.append(proc)
1046
for read, write, close in file_functions:
1047
t = threading.Thread(
1048
target=ferry_bytes, args=(read, write, close))
1054
ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
1055
# We *don't* want to override the default SSH vendor: the detected one
1056
# is the one to use.
1058
# FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
1059
# inherits from SFTPServer which forces the SSH vendor to
1060
# ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
1061
self.start_server(ssh_server)
1062
port = ssh_server.port
1064
if sys.platform == 'win32':
1065
bzr_remote_path = sys.executable + ' ' + self.get_brz_path()
1067
bzr_remote_path = self.get_brz_path()
1068
self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
1070
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
1071
# variable is used to tell bzr what command to run on the remote end.
1072
path_to_branch = osutils.abspath('.')
1073
if sys.platform == 'win32':
1074
# On Windows, we export all drives as '/C:/, etc. So we need to
1075
# prefix a '/' to get the right path.
1076
path_to_branch = '/' + path_to_branch
1077
url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
1078
t = transport.get_transport(url)
1079
self.permit_url(t.base)
1083
[b'%s serve --inet --directory=/ --allow-writes' % bzr_remote_path.encode()],
1084
self.command_executed)
1085
# Make sure to disconnect, so that the remote process can stop, and we
1086
# can cleanup. Then pause the test until everything is shutdown
1087
t._client._medium.disconnect()
1090
# First wait for the subprocess
1092
# And the rest are threads
1093
for t in started[1:]:
1097
class TestUnhtml(tests.TestCase):
1099
"""Tests for unhtml_roughly"""
1101
def test_truncation(self):
1102
fake_html = "<p>something!\n" * 1000
1103
result = http.unhtml_roughly(fake_html)
1104
self.assertEqual(len(result), 1000)
1105
self.assertStartsWith(result, " something!")
1108
class SomeDirectory(object):
1110
def look_up(self, name, url):
1114
class TestLocationToUrl(tests.TestCase):
1116
def get_base_location(self):
1117
path = osutils.abspath('/foo/bar')
1118
if path.startswith('/'):
1119
url = 'file://%s' % (path,)
1121
# On Windows, abspaths start with the drive letter, so we have to
1122
# add in the extra '/'
1123
url = 'file:///%s' % (path,)
1126
def test_regular_url(self):
1127
self.assertEqual("file://foo", location_to_url("file://foo"))
1129
def test_directory(self):
1130
directories.register("bar:", SomeDirectory, "Dummy directory")
1131
self.addCleanup(directories.remove, "bar:")
1132
self.assertEqual("http://bar", location_to_url("bar:"))
1134
def test_unicode_url(self):
1135
self.assertRaises(urlutils.InvalidURL, location_to_url,
1136
"http://fo/\xc3\xaf".decode("utf-8"))
1138
def test_unicode_path(self):
1139
path, url = self.get_base_location()
1140
location = path + "\xc3\xaf".decode("utf-8")
1142
self.assertEqual(url, location_to_url(location))
1144
def test_path(self):
1145
path, url = self.get_base_location()
1146
self.assertEqual(url, location_to_url(path))
1148
def test_relative_file_url(self):
1149
self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
1150
location_to_url("file:bar"))
1152
def test_absolute_file_url(self):
1153
self.assertEqual("file:///bar", location_to_url("file:/bar"))