1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
31
from ..directory_service import directories
32
from ..sixish import (
35
from ..transport import (
45
import breezy.transport.trace
52
# TODO: Should possibly split transport-specific tests into their own files.
55
class TestTransport(tests.TestCase):
56
"""Test the non transport-concrete class functionality."""
58
def test__get_set_protocol_handlers(self):
59
handlers = transport._get_protocol_handlers()
60
self.assertNotEqual([], handlers.keys())
61
transport._clear_protocol_handlers()
62
self.addCleanup(transport._set_protocol_handlers, handlers)
63
self.assertEqual([], transport._get_protocol_handlers().keys())
65
def test_get_transport_modules(self):
66
handlers = transport._get_protocol_handlers()
67
self.addCleanup(transport._set_protocol_handlers, handlers)
68
# don't pollute the current handlers
69
transport._clear_protocol_handlers()
71
class SampleHandler(object):
72
"""I exist, isnt that enough?"""
73
transport._clear_protocol_handlers()
74
transport.register_transport_proto('foo')
75
transport.register_lazy_transport('foo',
76
'breezy.tests.test_transport',
77
'TestTransport.SampleHandler')
78
transport.register_transport_proto('bar')
79
transport.register_lazy_transport('bar',
80
'breezy.tests.test_transport',
81
'TestTransport.SampleHandler')
82
self.assertEqual([SampleHandler.__module__,
83
'breezy.transport.chroot',
84
'breezy.transport.pathfilter'],
85
transport._get_transport_modules())
87
def test_transport_dependency(self):
88
"""Transport with missing dependency causes no error"""
89
saved_handlers = transport._get_protocol_handlers()
90
self.addCleanup(transport._set_protocol_handlers, saved_handlers)
91
# don't pollute the current handlers
92
transport._clear_protocol_handlers()
93
transport.register_transport_proto('foo')
94
transport.register_lazy_transport(
95
'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
97
transport.get_transport_from_url('foo://fooserver/foo')
98
except errors.UnsupportedProtocol as e:
100
self.assertEqual('Unsupported protocol'
101
' for url "foo://fooserver/foo":'
102
' Unable to import library "some_lib":'
103
' testing missing dependency', str(e))
105
self.fail('Did not raise UnsupportedProtocol')
107
def test_transport_fallback(self):
108
"""Transport with missing dependency causes no error"""
109
saved_handlers = transport._get_protocol_handlers()
110
self.addCleanup(transport._set_protocol_handlers, saved_handlers)
111
transport._clear_protocol_handlers()
112
transport.register_transport_proto('foo')
113
transport.register_lazy_transport(
114
'foo', 'breezy.tests.test_transport', 'BackupTransportHandler')
115
transport.register_lazy_transport(
116
'foo', 'breezy.tests.test_transport', 'BadTransportHandler')
117
t = transport.get_transport_from_url('foo://fooserver/foo')
118
self.assertTrue(isinstance(t, BackupTransportHandler))
120
def test_ssh_hints(self):
121
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
123
transport.get_transport_from_url('ssh://fooserver/foo')
124
except errors.UnsupportedProtocol as e:
126
self.assertEqual('Unsupported protocol'
127
' for url "ssh://fooserver/foo":'
128
' bzr supports bzr+ssh to operate over ssh,'
129
' use "bzr+ssh://fooserver/foo".',
132
self.fail('Did not raise UnsupportedProtocol')
134
def test_LateReadError(self):
135
"""The LateReadError helper should raise on read()."""
136
a_file = transport.LateReadError('a path')
139
except errors.ReadError as error:
140
self.assertEqual('a path', error.path)
141
self.assertRaises(errors.ReadError, a_file.read, 40)
144
def test_local_abspath_non_local_transport(self):
145
# the base implementation should throw
146
t = memory.MemoryTransport()
147
e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
148
self.assertEqual('memory:///t is not a local path.', str(e))
151
class TestCoalesceOffsets(tests.TestCase):
153
def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
154
coalesce = transport.Transport._coalesce_offsets
155
exp = [transport._CoalescedOffset(*x) for x in expected]
156
out = list(coalesce(offsets, limit=limit, fudge_factor=fudge,
158
self.assertEqual(exp, out)
160
def test_coalesce_empty(self):
163
def test_coalesce_simple(self):
164
self.check([(0, 10, [(0, 10)])], [(0, 10)])
166
def test_coalesce_unrelated(self):
167
self.check([(0, 10, [(0, 10)]),
169
], [(0, 10), (20, 10)])
171
def test_coalesce_unsorted(self):
172
self.check([(20, 10, [(0, 10)]),
174
], [(20, 10), (0, 10)])
176
def test_coalesce_nearby(self):
177
self.check([(0, 20, [(0, 10), (10, 10)])],
180
def test_coalesce_overlapped(self):
181
self.assertRaises(ValueError,
182
self.check, [(0, 15, [(0, 10), (5, 10)])],
185
def test_coalesce_limit(self):
186
self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
187
(30, 10), (40, 10)]),
188
(60, 50, [(0, 10), (10, 10), (20, 10),
189
(30, 10), (40, 10)]),
190
], [(10, 10), (20, 10), (30, 10), (40, 10),
191
(50, 10), (60, 10), (70, 10), (80, 10),
192
(90, 10), (100, 10)],
195
def test_coalesce_no_limit(self):
196
self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
197
(30, 10), (40, 10), (50, 10),
198
(60, 10), (70, 10), (80, 10),
200
], [(10, 10), (20, 10), (30, 10), (40, 10),
201
(50, 10), (60, 10), (70, 10), (80, 10),
202
(90, 10), (100, 10)])
204
def test_coalesce_fudge(self):
205
self.check([(10, 30, [(0, 10), (20, 10)]),
206
(100, 10, [(0, 10)]),
207
], [(10, 10), (30, 10), (100, 10)],
210
def test_coalesce_max_size(self):
211
self.check([(10, 20, [(0, 10), (10, 10)]),
213
# If one range is above max_size, it gets its own coalesced
215
(100, 80, [(0, 80)]),],
216
[(10, 10), (20, 10), (30, 50), (100, 80)],
219
def test_coalesce_no_max_size(self):
220
self.check([(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])],
221
[(10, 10), (20, 10), (30, 50), (80, 100)],
224
def test_coalesce_default_limit(self):
225
# By default we use a 100MB max size.
226
ten_mb = 10 * 1024 * 1024
227
self.check([(0, 10 * ten_mb, [(i * ten_mb, ten_mb) for i in range(10)]),
228
(10*ten_mb, ten_mb, [(0, ten_mb)])],
229
[(i*ten_mb, ten_mb) for i in range(11)])
230
self.check([(0, 11 * ten_mb, [(i * ten_mb, ten_mb) for i in range(11)])],
231
[(i * ten_mb, ten_mb) for i in range(11)],
232
max_size=1*1024*1024*1024)
235
class TestMemoryServer(tests.TestCase):
237
def test_create_server(self):
238
server = memory.MemoryServer()
239
server.start_server()
240
url = server.get_url()
241
self.assertTrue(url in transport.transport_list_registry)
242
t = transport.get_transport_from_url(url)
245
self.assertFalse(url in transport.transport_list_registry)
246
self.assertRaises(errors.UnsupportedProtocol,
247
transport.get_transport, url)
250
class TestMemoryTransport(tests.TestCase):
252
def test_get_transport(self):
253
memory.MemoryTransport()
255
def test_clone(self):
256
t = memory.MemoryTransport()
257
self.assertTrue(isinstance(t, memory.MemoryTransport))
258
self.assertEqual("memory:///", t.clone("/").base)
260
def test_abspath(self):
261
t = memory.MemoryTransport()
262
self.assertEqual("memory:///relpath", t.abspath('relpath'))
264
def test_abspath_of_root(self):
265
t = memory.MemoryTransport()
266
self.assertEqual("memory:///", t.base)
267
self.assertEqual("memory:///", t.abspath('/'))
269
def test_abspath_of_relpath_starting_at_root(self):
270
t = memory.MemoryTransport()
271
self.assertEqual("memory:///foo", t.abspath('/foo'))
273
def test_append_and_get(self):
274
t = memory.MemoryTransport()
275
t.append_bytes('path', 'content')
276
self.assertEqual(t.get('path').read(), 'content')
277
t.append_file('path', BytesIO(b'content'))
278
self.assertEqual(t.get('path').read(), 'contentcontent')
280
def test_put_and_get(self):
281
t = memory.MemoryTransport()
282
t.put_file('path', BytesIO(b'content'))
283
self.assertEqual(t.get('path').read(), 'content')
284
t.put_bytes('path', 'content')
285
self.assertEqual(t.get('path').read(), 'content')
287
def test_append_without_dir_fails(self):
288
t = memory.MemoryTransport()
289
self.assertRaises(errors.NoSuchFile,
290
t.append_bytes, 'dir/path', 'content')
292
def test_put_without_dir_fails(self):
293
t = memory.MemoryTransport()
294
self.assertRaises(errors.NoSuchFile,
295
t.put_file, 'dir/path', BytesIO(b'content'))
297
def test_get_missing(self):
298
transport = memory.MemoryTransport()
299
self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
301
def test_has_missing(self):
302
t = memory.MemoryTransport()
303
self.assertEqual(False, t.has('foo'))
305
def test_has_present(self):
306
t = memory.MemoryTransport()
307
t.append_bytes('foo', 'content')
308
self.assertEqual(True, t.has('foo'))
310
def test_list_dir(self):
311
t = memory.MemoryTransport()
312
t.put_bytes('foo', 'content')
314
t.put_bytes('dir/subfoo', 'content')
315
t.put_bytes('dirlike', 'content')
317
self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
318
self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))
320
def test_mkdir(self):
321
t = memory.MemoryTransport()
323
t.append_bytes('dir/path', 'content')
324
self.assertEqual(t.get('dir/path').read(), 'content')
326
def test_mkdir_missing_parent(self):
327
t = memory.MemoryTransport()
328
self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
330
def test_mkdir_twice(self):
331
t = memory.MemoryTransport()
333
self.assertRaises(errors.FileExists, t.mkdir, 'dir')
335
def test_parameters(self):
336
t = memory.MemoryTransport()
337
self.assertEqual(True, t.listable())
338
self.assertEqual(False, t.is_readonly())
340
def test_iter_files_recursive(self):
341
t = memory.MemoryTransport()
343
t.put_bytes('dir/foo', 'content')
344
t.put_bytes('dir/bar', 'content')
345
t.put_bytes('bar', 'content')
346
paths = set(t.iter_files_recursive())
347
self.assertEqual({'dir/foo', 'dir/bar', 'bar'}, paths)
350
t = memory.MemoryTransport()
351
t.put_bytes('foo', 'content')
352
t.put_bytes('bar', 'phowar')
353
self.assertEqual(7, t.stat('foo').st_size)
354
self.assertEqual(6, t.stat('bar').st_size)
357
class ChrootDecoratorTransportTest(tests.TestCase):
358
"""Chroot decoration specific tests."""
360
def test_abspath(self):
361
# The abspath is always relative to the chroot_url.
362
server = chroot.ChrootServer(
363
transport.get_transport_from_url('memory:///foo/bar/'))
364
self.start_server(server)
365
t = transport.get_transport_from_url(server.get_url())
366
self.assertEqual(server.get_url(), t.abspath('/'))
368
subdir_t = t.clone('subdir')
369
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
371
def test_clone(self):
372
server = chroot.ChrootServer(
373
transport.get_transport_from_url('memory:///foo/bar/'))
374
self.start_server(server)
375
t = transport.get_transport_from_url(server.get_url())
376
# relpath from root and root path are the same
377
relpath_cloned = t.clone('foo')
378
abspath_cloned = t.clone('/foo')
379
self.assertEqual(server, relpath_cloned.server)
380
self.assertEqual(server, abspath_cloned.server)
382
def test_chroot_url_preserves_chroot(self):
383
"""Calling get_transport on a chroot transport's base should produce a
384
transport with exactly the same behaviour as the original chroot
387
This is so that it is not possible to escape a chroot by doing::
388
url = chroot_transport.base
389
parent_url = urlutils.join(url, '..')
390
new_t = transport.get_transport_from_url(parent_url)
392
server = chroot.ChrootServer(
393
transport.get_transport_from_url('memory:///path/subpath'))
394
self.start_server(server)
395
t = transport.get_transport_from_url(server.get_url())
396
new_t = transport.get_transport_from_url(t.base)
397
self.assertEqual(t.server, new_t.server)
398
self.assertEqual(t.base, new_t.base)
400
def test_urljoin_preserves_chroot(self):
401
"""Using urlutils.join(url, '..') on a chroot URL should not produce a
402
URL that escapes the intended chroot.
404
This is so that it is not possible to escape a chroot by doing::
405
url = chroot_transport.base
406
parent_url = urlutils.join(url, '..')
407
new_t = transport.get_transport_from_url(parent_url)
409
server = chroot.ChrootServer(
410
transport.get_transport_from_url('memory:///path/'))
411
self.start_server(server)
412
t = transport.get_transport_from_url(server.get_url())
414
errors.InvalidURLJoin, urlutils.join, t.base, '..')
417
class TestChrootServer(tests.TestCase):
419
def test_construct(self):
420
backing_transport = memory.MemoryTransport()
421
server = chroot.ChrootServer(backing_transport)
422
self.assertEqual(backing_transport, server.backing_transport)
424
def test_setUp(self):
425
backing_transport = memory.MemoryTransport()
426
server = chroot.ChrootServer(backing_transport)
427
server.start_server()
428
self.addCleanup(server.stop_server)
429
self.assertTrue(server.scheme
430
in transport._get_protocol_handlers().keys())
432
def test_stop_server(self):
433
backing_transport = memory.MemoryTransport()
434
server = chroot.ChrootServer(backing_transport)
435
server.start_server()
437
self.assertFalse(server.scheme
438
in transport._get_protocol_handlers().keys())
440
def test_get_url(self):
441
backing_transport = memory.MemoryTransport()
442
server = chroot.ChrootServer(backing_transport)
443
server.start_server()
444
self.addCleanup(server.stop_server)
445
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
448
class TestHooks(tests.TestCase):
449
"""Basic tests for transport hooks"""
451
def _get_connected_transport(self):
452
return transport.ConnectedTransport("bogus:nowhere")
454
def test_transporthooks_initialisation(self):
455
"""Check all expected transport hook points are set up"""
456
hookpoint = transport.TransportHooks()
457
self.assertTrue("post_connect" in hookpoint,
458
"post_connect not in %s" % (hookpoint,))
460
def test_post_connect(self):
461
"""Ensure the post_connect hook is called when _set_transport is"""
463
transport.Transport.hooks.install_named_hook("post_connect",
465
t = self._get_connected_transport()
466
self.assertLength(0, calls)
467
t._set_connection("connection", "auth")
468
self.assertEqual(calls, [t])
471
class PathFilteringDecoratorTransportTest(tests.TestCase):
472
"""Pathfilter decoration specific tests."""
474
def test_abspath(self):
475
# The abspath is always relative to the base of the backing transport.
476
server = pathfilter.PathFilteringServer(
477
transport.get_transport_from_url('memory:///foo/bar/'),
479
server.start_server()
480
t = transport.get_transport_from_url(server.get_url())
481
self.assertEqual(server.get_url(), t.abspath('/'))
483
subdir_t = t.clone('subdir')
484
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
487
def make_pf_transport(self, filter_func=None):
488
"""Make a PathFilteringTransport backed by a MemoryTransport.
490
:param filter_func: by default this will be a no-op function. Use this
491
parameter to override it."""
492
if filter_func is None:
493
filter_func = lambda x: x
494
server = pathfilter.PathFilteringServer(
495
transport.get_transport_from_url('memory:///foo/bar/'), filter_func)
496
server.start_server()
497
self.addCleanup(server.stop_server)
498
return transport.get_transport_from_url(server.get_url())
500
def test__filter(self):
501
# _filter (with an identity func as filter_func) always returns
502
# paths relative to the base of the backing transport.
503
t = self.make_pf_transport()
504
self.assertEqual('foo', t._filter('foo'))
505
self.assertEqual('foo/bar', t._filter('foo/bar'))
506
self.assertEqual('', t._filter('..'))
507
self.assertEqual('', t._filter('/'))
508
# The base of the pathfiltering transport is taken into account too.
509
t = t.clone('subdir1/subdir2')
510
self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
511
self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
512
self.assertEqual('subdir1', t._filter('..'))
513
self.assertEqual('', t._filter('/'))
515
def test_filter_invocation(self):
519
filter_log.append(path)
521
t = self.make_pf_transport(filter)
523
self.assertEqual(['abc'], filter_log)
525
t.clone('abc').has('xyz')
526
self.assertEqual(['abc/xyz'], filter_log)
529
self.assertEqual(['abc'], filter_log)
531
def test_clone(self):
532
t = self.make_pf_transport()
533
# relpath from root and root path are the same
534
relpath_cloned = t.clone('foo')
535
abspath_cloned = t.clone('/foo')
536
self.assertEqual(t.server, relpath_cloned.server)
537
self.assertEqual(t.server, abspath_cloned.server)
539
def test_url_preserves_pathfiltering(self):
540
"""Calling get_transport on a pathfiltered transport's base should
541
produce a transport with exactly the same behaviour as the original
542
pathfiltered transport.
544
This is so that it is not possible to escape (accidentally or
545
otherwise) the filtering by doing::
546
url = filtered_transport.base
547
parent_url = urlutils.join(url, '..')
548
new_t = transport.get_transport_from_url(parent_url)
550
t = self.make_pf_transport()
551
new_t = transport.get_transport_from_url(t.base)
552
self.assertEqual(t.server, new_t.server)
553
self.assertEqual(t.base, new_t.base)
556
class ReadonlyDecoratorTransportTest(tests.TestCase):
557
"""Readonly decoration specific tests."""
559
def test_local_parameters(self):
560
# connect to . in readonly mode
561
t = readonly.ReadonlyTransportDecorator('readonly+.')
562
self.assertEqual(True, t.listable())
563
self.assertEqual(True, t.is_readonly())
565
def test_http_parameters(self):
566
from breezy.tests.http_server import HttpServer
567
# connect to '.' via http which is not listable
568
server = HttpServer()
569
self.start_server(server)
570
t = transport.get_transport_from_url('readonly+' + server.get_url())
571
self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
572
self.assertEqual(False, t.listable())
573
self.assertEqual(True, t.is_readonly())
576
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
577
"""NFS decorator specific tests."""
579
def get_nfs_transport(self, url):
580
# connect to url with nfs decoration
581
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
583
def test_local_parameters(self):
584
# the listable and is_readonly parameters
585
# are not changed by the fakenfs decorator
586
t = self.get_nfs_transport('.')
587
self.assertEqual(True, t.listable())
588
self.assertEqual(False, t.is_readonly())
590
def test_http_parameters(self):
591
# the listable and is_readonly parameters
592
# are not changed by the fakenfs decorator
593
from breezy.tests.http_server import HttpServer
594
# connect to '.' via http which is not listable
595
server = HttpServer()
596
self.start_server(server)
597
t = self.get_nfs_transport(server.get_url())
598
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
599
self.assertEqual(False, t.listable())
600
self.assertEqual(True, t.is_readonly())
602
def test_fakenfs_server_default(self):
603
# a FakeNFSServer() should bring up a local relpath server for itself
604
server = test_server.FakeNFSServer()
605
self.start_server(server)
606
# the url should be decorated appropriately
607
self.assertStartsWith(server.get_url(), 'fakenfs+')
608
# and we should be able to get a transport for it
609
t = transport.get_transport_from_url(server.get_url())
610
# which must be a FakeNFSTransportDecorator instance.
611
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
613
def test_fakenfs_rename_semantics(self):
614
# a FakeNFS transport must mangle the way rename errors occur to
615
# look like NFS problems.
616
t = self.get_nfs_transport('.')
617
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
619
self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
622
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
623
"""Tests for simulation of VFAT restrictions"""
625
def get_vfat_transport(self, url):
626
"""Return vfat-backed transport for test directory"""
627
from breezy.transport.fakevfat import FakeVFATTransportDecorator
628
return FakeVFATTransportDecorator('vfat+' + url)
630
def test_transport_creation(self):
631
from breezy.transport.fakevfat import FakeVFATTransportDecorator
632
t = self.get_vfat_transport('.')
633
self.assertIsInstance(t, FakeVFATTransportDecorator)
635
def test_transport_mkdir(self):
636
t = self.get_vfat_transport('.')
638
self.assertTrue(t.has('hello'))
639
self.assertTrue(t.has('Hello'))
641
def test_forbidden_chars(self):
642
t = self.get_vfat_transport('.')
643
self.assertRaises(ValueError, t.has, "<NU>")
646
class BadTransportHandler(transport.Transport):
647
def __init__(self, base_url):
648
raise errors.DependencyNotPresent('some_lib',
649
'testing missing dependency')
652
class BackupTransportHandler(transport.Transport):
653
"""Test transport that works as a backup for the BadTransportHandler"""
657
class TestTransportImplementation(tests.TestCaseInTempDir):
658
"""Implementation verification for transports.
660
To verify a transport we need a server factory, which is a callable
661
that accepts no parameters and returns an implementation of
662
breezy.transport.Server.
664
That Server is then used to construct transport instances and test
665
the transport via loopback activity.
667
Currently this assumes that the Transport object is connected to the
668
current working directory. So that whatever is done
669
through the transport, should show up in the working
670
directory, and vice-versa. This is a bug, because its possible to have
671
URL schemes which provide access to something that may not be
672
result in storage on the local disk, i.e. due to file system limits, or
673
due to it being a database or some other non-filesystem tool.
675
This also tests to make sure that the functions work with both
676
generators and lists (assuming iter(list) is effectively a generator)
680
super(TestTransportImplementation, self).setUp()
681
self._server = self.transport_server()
682
self.start_server(self._server)
684
def get_transport(self, relpath=None):
685
"""Return a connected transport to the local directory.
687
:param relpath: a path relative to the base url.
689
base_url = self._server.get_url()
690
url = self._adjust_url(base_url, relpath)
691
# try getting the transport via the regular interface:
692
t = transport.get_transport_from_url(url)
693
# vila--20070607 if the following are commented out the test suite
694
# still pass. Is this really still needed or was it a forgotten
696
if not isinstance(t, self.transport_class):
697
# we did not get the correct transport class type. Override the
698
# regular connection behaviour by direct construction.
699
t = self.transport_class(url)
703
class TestTransportFromPath(tests.TestCaseInTempDir):
705
def test_with_path(self):
706
t = transport.get_transport_from_path(self.test_dir)
707
self.assertIsInstance(t, local.LocalTransport)
708
self.assertEqual(t.base.rstrip("/"),
709
urlutils.local_path_to_url(self.test_dir))
711
def test_with_url(self):
712
t = transport.get_transport_from_path("file:")
713
self.assertIsInstance(t, local.LocalTransport)
714
self.assertEqual(t.base.rstrip("/"),
715
urlutils.local_path_to_url(os.path.join(self.test_dir, "file:")))
718
class TestTransportFromUrl(tests.TestCaseInTempDir):
720
def test_with_path(self):
721
self.assertRaises(errors.InvalidURL, transport.get_transport_from_url,
724
def test_with_url(self):
725
url = urlutils.local_path_to_url(self.test_dir)
726
t = transport.get_transport_from_url(url)
727
self.assertIsInstance(t, local.LocalTransport)
728
self.assertEqual(t.base.rstrip("/"), url)
730
def test_with_url_and_segment_parameters(self):
731
url = urlutils.local_path_to_url(self.test_dir)+",branch=foo"
732
t = transport.get_transport_from_url(url)
733
self.assertIsInstance(t, local.LocalTransport)
734
self.assertEqual(t.base.rstrip("/"), url)
735
with open(os.path.join(self.test_dir, "afile"), 'w') as f:
737
self.assertTrue(t.has("afile"))
740
class TestLocalTransports(tests.TestCase):
742
def test_get_transport_from_abspath(self):
743
here = osutils.abspath('.')
744
t = transport.get_transport(here)
745
self.assertIsInstance(t, local.LocalTransport)
746
self.assertEqual(t.base, urlutils.local_path_to_url(here) + '/')
748
def test_get_transport_from_relpath(self):
749
here = osutils.abspath('.')
750
t = transport.get_transport('.')
751
self.assertIsInstance(t, local.LocalTransport)
752
self.assertEqual(t.base, urlutils.local_path_to_url('.') + '/')
754
def test_get_transport_from_local_url(self):
755
here = osutils.abspath('.')
756
here_url = urlutils.local_path_to_url(here) + '/'
757
t = transport.get_transport(here_url)
758
self.assertIsInstance(t, local.LocalTransport)
759
self.assertEqual(t.base, here_url)
761
def test_local_abspath(self):
762
here = osutils.abspath('.')
763
t = transport.get_transport(here)
764
self.assertEqual(t.local_abspath(''), here)
767
class TestLocalTransportMutation(tests.TestCaseInTempDir):
769
def test_local_transport_mkdir(self):
770
here = osutils.abspath('.')
771
t = transport.get_transport(here)
773
self.assertTrue(os.path.exists('test'))
775
def test_local_transport_mkdir_permission_denied(self):
776
# See https://bugs.launchpad.net/bzr/+bug/606537
777
here = osutils.abspath('.')
778
t = transport.get_transport(here)
779
def fake_chmod(path, mode):
780
e = OSError('permission denied')
781
e.errno = errno.EPERM
783
self.overrideAttr(os, 'chmod', fake_chmod)
785
t.mkdir('test2', mode=0o707)
786
self.assertTrue(os.path.exists('test'))
787
self.assertTrue(os.path.exists('test2'))
790
class TestLocalTransportWriteStream(tests.TestCaseWithTransport):
792
def test_local_fdatasync_calls_fdatasync(self):
793
"""Check fdatasync on a stream tries to flush the data to the OS.
795
We can't easily observe the external effect but we can at least see
799
fdatasync = getattr(os, 'fdatasync', sentinel)
800
if fdatasync is sentinel:
801
raise tests.TestNotApplicable('fdatasync not supported')
802
t = self.get_transport('.')
803
calls = self.recordCalls(os, 'fdatasync')
804
w = t.open_write_stream('out')
807
with open('out', 'rb') as f:
808
# Should have been flushed.
809
self.assertEqual(f.read(), 'foo')
810
self.assertEqual(len(calls), 1, calls)
812
def test_missing_directory(self):
813
t = self.get_transport('.')
814
self.assertRaises(errors.NoSuchFile, t.open_write_stream, 'dir/foo')
817
class TestWin32LocalTransport(tests.TestCase):
819
def test_unc_clone_to_root(self):
820
self.requireFeature(features.win32_feature)
821
# Win32 UNC path like \\HOST\path
822
# clone to root should stop at least at \\HOST part
824
t = local.EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
827
self.assertEqual(t.base, 'file://HOST/')
828
# make sure we reach the root
830
self.assertEqual(t.base, 'file://HOST/')
833
class TestConnectedTransport(tests.TestCase):
834
"""Tests for connected to remote server transports"""
836
def test_parse_url(self):
837
t = transport.ConnectedTransport(
838
'http://simple.example.com/home/source')
839
self.assertEqual(t._parsed_url.host, 'simple.example.com')
840
self.assertEqual(t._parsed_url.port, None)
841
self.assertEqual(t._parsed_url.path, '/home/source/')
842
self.assertTrue(t._parsed_url.user is None)
843
self.assertTrue(t._parsed_url.password is None)
845
self.assertEqual(t.base, 'http://simple.example.com/home/source/')
847
def test_parse_url_with_at_in_user(self):
849
t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
850
self.assertEqual(t._parsed_url.user, 'user@host.com')
852
def test_parse_quoted_url(self):
853
t = transport.ConnectedTransport(
854
'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
855
self.assertEqual(t._parsed_url.host, 'exAmple.com')
856
self.assertEqual(t._parsed_url.port, 2222)
857
self.assertEqual(t._parsed_url.user, 'robey')
858
self.assertEqual(t._parsed_url.password, 'h@t')
859
self.assertEqual(t._parsed_url.path, '/path/')
861
# Base should not keep track of the password
862
self.assertEqual(t.base, 'http://ro%62ey@ex%41mple.com:2222/path/')
864
def test_parse_invalid_url(self):
865
self.assertRaises(errors.InvalidURL,
866
transport.ConnectedTransport,
867
'sftp://lily.org:~janneke/public/bzr/gub')
869
def test_relpath(self):
870
t = transport.ConnectedTransport('sftp://user@host.com/abs/path')
872
self.assertEqual(t.relpath('sftp://user@host.com/abs/path/sub'),
874
self.assertRaises(errors.PathNotChild, t.relpath,
875
'http://user@host.com/abs/path/sub')
876
self.assertRaises(errors.PathNotChild, t.relpath,
877
'sftp://user2@host.com/abs/path/sub')
878
self.assertRaises(errors.PathNotChild, t.relpath,
879
'sftp://user@otherhost.com/abs/path/sub')
880
self.assertRaises(errors.PathNotChild, t.relpath,
881
'sftp://user@host.com:33/abs/path/sub')
882
# Make sure it works when we don't supply a username
883
t = transport.ConnectedTransport('sftp://host.com/abs/path')
884
self.assertEqual(t.relpath('sftp://host.com/abs/path/sub'), 'sub')
886
# Make sure it works when parts of the path will be url encoded
887
t = transport.ConnectedTransport('sftp://host.com/dev/%path')
888
self.assertEqual(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')
890
def test_connection_sharing_propagate_credentials(self):
891
t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
892
self.assertEqual('user', t._parsed_url.user)
893
self.assertEqual('host.com', t._parsed_url.host)
894
self.assertIs(None, t._get_connection())
895
self.assertIs(None, t._parsed_url.password)
896
c = t.clone('subdir')
897
self.assertIs(None, c._get_connection())
898
self.assertIs(None, t._parsed_url.password)
900
# Simulate the user entering a password
902
connection = object()
903
t._set_connection(connection, password)
904
self.assertIs(connection, t._get_connection())
905
self.assertIs(password, t._get_credentials())
906
self.assertIs(connection, c._get_connection())
907
self.assertIs(password, c._get_credentials())
909
# credentials can be updated
910
new_password = 'even more secret'
911
c._update_credentials(new_password)
912
self.assertIs(connection, t._get_connection())
913
self.assertIs(new_password, t._get_credentials())
914
self.assertIs(connection, c._get_connection())
915
self.assertIs(new_password, c._get_credentials())
918
class TestReusedTransports(tests.TestCase):
919
"""Tests for transport reuse"""
921
def test_reuse_same_transport(self):
922
possible_transports = []
923
t1 = transport.get_transport_from_url('http://foo/',
924
possible_transports=possible_transports)
925
self.assertEqual([t1], possible_transports)
926
t2 = transport.get_transport_from_url('http://foo/',
927
possible_transports=[t1])
928
self.assertIs(t1, t2)
930
# Also check that final '/' are handled correctly
931
t3 = transport.get_transport_from_url('http://foo/path/')
932
t4 = transport.get_transport_from_url('http://foo/path',
933
possible_transports=[t3])
934
self.assertIs(t3, t4)
936
t5 = transport.get_transport_from_url('http://foo/path')
937
t6 = transport.get_transport_from_url('http://foo/path/',
938
possible_transports=[t5])
939
self.assertIs(t5, t6)
941
def test_don_t_reuse_different_transport(self):
942
t1 = transport.get_transport_from_url('http://foo/path')
943
t2 = transport.get_transport_from_url('http://bar/path',
944
possible_transports=[t1])
945
self.assertIsNot(t1, t2)
948
class TestTransportTrace(tests.TestCase):
950
def test_decorator(self):
951
t = transport.get_transport_from_url('trace+memory://')
952
self.assertIsInstance(
953
t, breezy.transport.trace.TransportTraceDecorator)
955
def test_clone_preserves_activity(self):
956
t = transport.get_transport_from_url('trace+memory://')
958
self.assertTrue(t is not t2)
959
self.assertTrue(t._activity is t2._activity)
961
# the following specific tests are for the operations that have made use of
962
# logging in tests; we could test every single operation but doing that
963
# still won't cause a test failure when the top level Transport API
964
# changes; so there is little return doing that.
966
t = transport.get_transport_from_url('trace+memory:///')
967
t.put_bytes('foo', 'barish')
970
# put_bytes records the bytes, not the content to avoid memory
972
expected_result.append(('put_bytes', 'foo', 6, None))
973
# get records the file name only.
974
expected_result.append(('get', 'foo'))
975
self.assertEqual(expected_result, t._activity)
977
def test_readv(self):
978
t = transport.get_transport_from_url('trace+memory:///')
979
t.put_bytes('foo', 'barish')
980
list(t.readv('foo', [(0, 1), (3, 2)],
981
adjust_for_latency=True, upper_limit=6))
983
# put_bytes records the bytes, not the content to avoid memory
985
expected_result.append(('put_bytes', 'foo', 6, None))
986
# readv records the supplied offset request
987
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
988
self.assertEqual(expected_result, t._activity)
991
class TestSSHConnections(tests.TestCaseWithTransport):
993
def test_bzr_connect_to_bzr_ssh(self):
994
"""get_transport of a bzr+ssh:// behaves correctly.
996
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
998
# This test actually causes a bzr instance to be invoked, which is very
999
# expensive: it should be the only such test in the test suite.
1000
# A reasonable evolution for this would be to simply check inside
1001
# check_channel_exec_request that the command is appropriate, and then
1002
# satisfy requests in-process.
1003
self.requireFeature(features.paramiko)
1004
# SFTPFullAbsoluteServer has a get_url method, and doesn't
1005
# override the interface (doesn't change self._vendor).
1006
# Note that this does encryption, so can be slow.
1007
from breezy.tests import stub_sftp
1009
# Start an SSH server
1010
self.command_executed = []
1011
# XXX: This is horrible -- we define a really dumb SSH server that
1012
# executes commands, and manage the hooking up of stdin/out/err to the
1013
# SSH channel ourselves. Surely this has already been implemented
1017
class StubSSHServer(stub_sftp.StubServer):
1021
def check_channel_exec_request(self, channel, command):
1022
self.test.command_executed.append(command)
1023
proc = subprocess.Popen(
1024
command, shell=True, stdin=subprocess.PIPE,
1025
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1027
# XXX: horribly inefficient, not to mention ugly.
1028
# Start a thread for each of stdin/out/err, and relay bytes
1029
# from the subprocess to channel and vice versa.
1030
def ferry_bytes(read, write, close):
1039
(channel.recv, proc.stdin.write, proc.stdin.close),
1040
(proc.stdout.read, channel.sendall, channel.close),
1041
(proc.stderr.read, channel.sendall_stderr, channel.close)]
1042
started.append(proc)
1043
for read, write, close in file_functions:
1044
t = threading.Thread(
1045
target=ferry_bytes, args=(read, write, close))
1051
ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
1052
# We *don't* want to override the default SSH vendor: the detected one
1053
# is the one to use.
1055
# FIXME: I don't understand the above comment, SFTPFullAbsoluteServer
1056
# inherits from SFTPServer which forces the SSH vendor to
1057
# ssh.ParamikoVendor(). So it's forced, not detected. --vila 20100623
1058
self.start_server(ssh_server)
1059
port = ssh_server.port
1061
if sys.platform == 'win32':
1062
bzr_remote_path = sys.executable + ' ' + self.get_brz_path()
1064
bzr_remote_path = self.get_brz_path()
1065
self.overrideEnv('BZR_REMOTE_PATH', bzr_remote_path)
1067
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
1068
# variable is used to tell bzr what command to run on the remote end.
1069
path_to_branch = osutils.abspath('.')
1070
if sys.platform == 'win32':
1071
# On Windows, we export all drives as '/C:/, etc. So we need to
1072
# prefix a '/' to get the right path.
1073
path_to_branch = '/' + path_to_branch
1074
url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
1075
t = transport.get_transport(url)
1076
self.permit_url(t.base)
1080
['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
1081
self.command_executed)
1082
# Make sure to disconnect, so that the remote process can stop, and we
1083
# can cleanup. Then pause the test until everything is shutdown
1084
t._client._medium.disconnect()
1087
# First wait for the subprocess
1089
# And the rest are threads
1090
for t in started[1:]:
1094
class TestUnhtml(tests.TestCase):
1096
"""Tests for unhtml_roughly"""
1098
def test_truncation(self):
1099
fake_html = "<p>something!\n" * 1000
1100
result = http.unhtml_roughly(fake_html)
1101
self.assertEqual(len(result), 1000)
1102
self.assertStartsWith(result, " something!")
1105
class SomeDirectory(object):
1107
def look_up(self, name, url):
1111
class TestLocationToUrl(tests.TestCase):
1113
def get_base_location(self):
1114
path = osutils.abspath('/foo/bar')
1115
if path.startswith('/'):
1116
url = 'file://%s' % (path,)
1118
# On Windows, abspaths start with the drive letter, so we have to
1119
# add in the extra '/'
1120
url = 'file:///%s' % (path,)
1123
def test_regular_url(self):
1124
self.assertEqual("file://foo", location_to_url("file://foo"))
1126
def test_directory(self):
1127
directories.register("bar:", SomeDirectory, "Dummy directory")
1128
self.addCleanup(directories.remove, "bar:")
1129
self.assertEqual("http://bar", location_to_url("bar:"))
1131
def test_unicode_url(self):
1132
self.assertRaises(errors.InvalidURL, location_to_url,
1133
"http://fo/\xc3\xaf".decode("utf-8"))
1135
def test_unicode_path(self):
1136
path, url = self.get_base_location()
1137
location = path + "\xc3\xaf".decode("utf-8")
1139
self.assertEqual(url, location_to_url(location))
1141
def test_path(self):
1142
path, url = self.get_base_location()
1143
self.assertEqual(url, location_to_url(path))
1145
def test_relative_file_url(self):
1146
self.assertEqual(urlutils.local_path_to_url(".") + "/bar",
1147
location_to_url("file:bar"))
1149
def test_absolute_file_url(self):
1150
self.assertEqual("file:///bar", location_to_url("file:/bar"))