18
18
from cStringIO import StringIO
24
21
from bzrlib import (
31
from bzrlib.transport import (
39
from bzrlib.tests import (
26
from bzrlib.errors import (DependencyNotPresent,
34
from bzrlib.tests import TestCase, TestCaseInTempDir
35
from bzrlib.transport import (_clear_protocol_handlers,
38
_get_protocol_handlers,
39
_set_protocol_handlers,
40
_get_transport_modules,
43
register_lazy_transport,
44
register_transport_proto,
47
from bzrlib.transport.chroot import ChrootServer
48
from bzrlib.transport.memory import MemoryTransport
49
from bzrlib.transport.local import (LocalTransport,
50
EmulatedWin32LocalTransport)
51
from bzrlib.transport.pathfilter import PathFilteringServer
45
54
# TODO: Should possibly split transport-specific tests into their own files.
48
class TestTransport(tests.TestCase):
57
class TestTransport(TestCase):
49
58
"""Test the non transport-concrete class functionality."""
51
# FIXME: These tests should use addCleanup() and/or overrideAttr() instead
52
# of try/finally -- vila 20100205
54
60
def test__get_set_protocol_handlers(self):
55
handlers = transport._get_protocol_handlers()
61
handlers = _get_protocol_handlers()
56
62
self.assertNotEqual([], handlers.keys( ))
58
transport._clear_protocol_handlers()
59
self.assertEqual([], transport._get_protocol_handlers().keys())
64
_clear_protocol_handlers()
65
self.assertEqual([], _get_protocol_handlers().keys())
61
transport._set_protocol_handlers(handlers)
67
_set_protocol_handlers(handlers)
63
69
def test_get_transport_modules(self):
64
handlers = transport._get_protocol_handlers()
70
handlers = _get_protocol_handlers()
65
71
# don't pollute the current handlers
66
transport._clear_protocol_handlers()
72
_clear_protocol_handlers()
67
73
class SampleHandler(object):
68
74
"""I exist, isnt that enough?"""
70
transport._clear_protocol_handlers()
71
transport.register_transport_proto('foo')
72
transport.register_lazy_transport('foo',
73
'bzrlib.tests.test_transport',
74
'TestTransport.SampleHandler')
75
transport.register_transport_proto('bar')
76
transport.register_lazy_transport('bar',
77
'bzrlib.tests.test_transport',
78
'TestTransport.SampleHandler')
76
_clear_protocol_handlers()
77
register_transport_proto('foo')
78
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
79
'TestTransport.SampleHandler')
80
register_transport_proto('bar')
81
register_lazy_transport('bar', 'bzrlib.tests.test_transport',
82
'TestTransport.SampleHandler')
79
83
self.assertEqual([SampleHandler.__module__,
80
84
'bzrlib.transport.chroot',
81
85
'bzrlib.transport.pathfilter'],
82
transport._get_transport_modules())
86
_get_transport_modules())
84
transport._set_protocol_handlers(handlers)
88
_set_protocol_handlers(handlers)
86
90
def test_transport_dependency(self):
87
91
"""Transport with missing dependency causes no error"""
88
saved_handlers = transport._get_protocol_handlers()
92
saved_handlers = _get_protocol_handlers()
89
93
# don't pollute the current handlers
90
transport._clear_protocol_handlers()
94
_clear_protocol_handlers()
92
transport.register_transport_proto('foo')
93
transport.register_lazy_transport(
94
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
96
register_transport_proto('foo')
97
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
98
'BadTransportHandler')
96
transport.get_transport('foo://fooserver/foo')
97
except errors.UnsupportedProtocol, e:
100
get_transport('foo://fooserver/foo')
101
except UnsupportedProtocol, e:
99
103
self.assertEquals('Unsupported protocol'
100
104
' for url "foo://fooserver/foo":'
104
108
self.fail('Did not raise UnsupportedProtocol')
106
110
# restore original values
107
transport._set_protocol_handlers(saved_handlers)
111
_set_protocol_handlers(saved_handlers)
109
113
def test_transport_fallback(self):
110
114
"""Transport with missing dependency causes no error"""
111
saved_handlers = transport._get_protocol_handlers()
115
saved_handlers = _get_protocol_handlers()
113
transport._clear_protocol_handlers()
114
transport.register_transport_proto('foo')
115
transport.register_lazy_transport(
116
'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
117
transport.register_lazy_transport(
118
'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
119
t = transport.get_transport('foo://fooserver/foo')
117
_clear_protocol_handlers()
118
register_transport_proto('foo')
119
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
120
'BackupTransportHandler')
121
register_lazy_transport('foo', 'bzrlib.tests.test_transport',
122
'BadTransportHandler')
123
t = get_transport('foo://fooserver/foo')
120
124
self.assertTrue(isinstance(t, BackupTransportHandler))
122
transport._set_protocol_handlers(saved_handlers)
126
_set_protocol_handlers(saved_handlers)
124
128
def test_ssh_hints(self):
125
129
"""Transport ssh:// should raise an error pointing out bzr+ssh://"""
127
transport.get_transport('ssh://fooserver/foo')
128
except errors.UnsupportedProtocol, e:
131
get_transport('ssh://fooserver/foo')
132
except UnsupportedProtocol, e:
130
134
self.assertEquals('Unsupported protocol'
131
135
' for url "ssh://fooserver/foo":'
132
' bzr supports bzr+ssh to operate over ssh,'
133
' use "bzr+ssh://fooserver/foo".',
136
' bzr supports bzr+ssh to operate over ssh, use "bzr+ssh://fooserver/foo".',
136
139
self.fail('Did not raise UnsupportedProtocol')
138
141
def test_LateReadError(self):
139
142
"""The LateReadError helper should raise on read()."""
140
a_file = transport.LateReadError('a path')
143
a_file = LateReadError('a path')
143
except errors.ReadError, error:
146
except ReadError, error:
144
147
self.assertEqual('a path', error.path)
145
self.assertRaises(errors.ReadError, a_file.read, 40)
148
self.assertRaises(ReadError, a_file.read, 40)
148
151
def test__combine_paths(self):
149
t = transport.Transport('/')
150
153
self.assertEqual('/home/sarah/project/foo',
151
154
t._combine_paths('/home/sarah', 'project/foo'))
152
155
self.assertEqual('/etc',
248
251
max_size=1*1024*1024*1024)
251
class TestMemoryServer(tests.TestCase):
253
def test_create_server(self):
254
server = memory.MemoryServer()
255
server.start_server()
256
url = server.get_url()
257
self.assertTrue(url in transport.transport_list_registry)
258
t = transport.get_transport(url)
261
self.assertFalse(url in transport.transport_list_registry)
262
self.assertRaises(errors.UnsupportedProtocol,
263
transport.get_transport, url)
266
class TestMemoryTransport(tests.TestCase):
254
class TestMemoryTransport(TestCase):
268
256
def test_get_transport(self):
269
memory.MemoryTransport()
271
259
def test_clone(self):
272
t = memory.MemoryTransport()
273
self.assertTrue(isinstance(t, memory.MemoryTransport))
274
self.assertEqual("memory:///", t.clone("/").base)
260
transport = MemoryTransport()
261
self.assertTrue(isinstance(transport, MemoryTransport))
262
self.assertEqual("memory:///", transport.clone("/").base)
276
264
def test_abspath(self):
277
t = memory.MemoryTransport()
278
self.assertEqual("memory:///relpath", t.abspath('relpath'))
265
transport = MemoryTransport()
266
self.assertEqual("memory:///relpath", transport.abspath('relpath'))
280
268
def test_abspath_of_root(self):
281
t = memory.MemoryTransport()
282
self.assertEqual("memory:///", t.base)
283
self.assertEqual("memory:///", t.abspath('/'))
269
transport = MemoryTransport()
270
self.assertEqual("memory:///", transport.base)
271
self.assertEqual("memory:///", transport.abspath('/'))
285
273
def test_abspath_of_relpath_starting_at_root(self):
286
t = memory.MemoryTransport()
287
self.assertEqual("memory:///foo", t.abspath('/foo'))
274
transport = MemoryTransport()
275
self.assertEqual("memory:///foo", transport.abspath('/foo'))
289
277
def test_append_and_get(self):
290
t = memory.MemoryTransport()
291
t.append_bytes('path', 'content')
292
self.assertEqual(t.get('path').read(), 'content')
293
t.append_file('path', StringIO('content'))
294
self.assertEqual(t.get('path').read(), 'contentcontent')
278
transport = MemoryTransport()
279
transport.append_bytes('path', 'content')
280
self.assertEqual(transport.get('path').read(), 'content')
281
transport.append_file('path', StringIO('content'))
282
self.assertEqual(transport.get('path').read(), 'contentcontent')
296
284
def test_put_and_get(self):
297
t = memory.MemoryTransport()
298
t.put_file('path', StringIO('content'))
299
self.assertEqual(t.get('path').read(), 'content')
300
t.put_bytes('path', 'content')
301
self.assertEqual(t.get('path').read(), 'content')
285
transport = MemoryTransport()
286
transport.put_file('path', StringIO('content'))
287
self.assertEqual(transport.get('path').read(), 'content')
288
transport.put_bytes('path', 'content')
289
self.assertEqual(transport.get('path').read(), 'content')
303
291
def test_append_without_dir_fails(self):
304
t = memory.MemoryTransport()
305
self.assertRaises(errors.NoSuchFile,
306
t.append_bytes, 'dir/path', 'content')
292
transport = MemoryTransport()
293
self.assertRaises(NoSuchFile,
294
transport.append_bytes, 'dir/path', 'content')
308
296
def test_put_without_dir_fails(self):
309
t = memory.MemoryTransport()
310
self.assertRaises(errors.NoSuchFile,
311
t.put_file, 'dir/path', StringIO('content'))
297
transport = MemoryTransport()
298
self.assertRaises(NoSuchFile,
299
transport.put_file, 'dir/path', StringIO('content'))
313
301
def test_get_missing(self):
314
transport = memory.MemoryTransport()
315
self.assertRaises(errors.NoSuchFile, transport.get, 'foo')
302
transport = MemoryTransport()
303
self.assertRaises(NoSuchFile, transport.get, 'foo')
317
305
def test_has_missing(self):
318
t = memory.MemoryTransport()
319
self.assertEquals(False, t.has('foo'))
306
transport = MemoryTransport()
307
self.assertEquals(False, transport.has('foo'))
321
309
def test_has_present(self):
322
t = memory.MemoryTransport()
323
t.append_bytes('foo', 'content')
324
self.assertEquals(True, t.has('foo'))
310
transport = MemoryTransport()
311
transport.append_bytes('foo', 'content')
312
self.assertEquals(True, transport.has('foo'))
326
314
def test_list_dir(self):
327
t = memory.MemoryTransport()
328
t.put_bytes('foo', 'content')
330
t.put_bytes('dir/subfoo', 'content')
331
t.put_bytes('dirlike', 'content')
315
transport = MemoryTransport()
316
transport.put_bytes('foo', 'content')
317
transport.mkdir('dir')
318
transport.put_bytes('dir/subfoo', 'content')
319
transport.put_bytes('dirlike', 'content')
333
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
334
self.assertEquals(['subfoo'], sorted(t.list_dir('dir')))
321
self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
322
self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
336
324
def test_mkdir(self):
337
t = memory.MemoryTransport()
339
t.append_bytes('dir/path', 'content')
340
self.assertEqual(t.get('dir/path').read(), 'content')
325
transport = MemoryTransport()
326
transport.mkdir('dir')
327
transport.append_bytes('dir/path', 'content')
328
self.assertEqual(transport.get('dir/path').read(), 'content')
342
330
def test_mkdir_missing_parent(self):
343
t = memory.MemoryTransport()
344
self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')
331
transport = MemoryTransport()
332
self.assertRaises(NoSuchFile,
333
transport.mkdir, 'dir/dir')
346
335
def test_mkdir_twice(self):
347
t = memory.MemoryTransport()
349
self.assertRaises(errors.FileExists, t.mkdir, 'dir')
336
transport = MemoryTransport()
337
transport.mkdir('dir')
338
self.assertRaises(FileExists, transport.mkdir, 'dir')
351
340
def test_parameters(self):
352
t = memory.MemoryTransport()
353
self.assertEqual(True, t.listable())
354
self.assertEqual(False, t.is_readonly())
341
transport = MemoryTransport()
342
self.assertEqual(True, transport.listable())
343
self.assertEqual(False, transport.is_readonly())
356
345
def test_iter_files_recursive(self):
357
t = memory.MemoryTransport()
359
t.put_bytes('dir/foo', 'content')
360
t.put_bytes('dir/bar', 'content')
361
t.put_bytes('bar', 'content')
362
paths = set(t.iter_files_recursive())
346
transport = MemoryTransport()
347
transport.mkdir('dir')
348
transport.put_bytes('dir/foo', 'content')
349
transport.put_bytes('dir/bar', 'content')
350
transport.put_bytes('bar', 'content')
351
paths = set(transport.iter_files_recursive())
363
352
self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
365
354
def test_stat(self):
366
t = memory.MemoryTransport()
367
t.put_bytes('foo', 'content')
368
t.put_bytes('bar', 'phowar')
369
self.assertEqual(7, t.stat('foo').st_size)
370
self.assertEqual(6, t.stat('bar').st_size)
373
class ChrootDecoratorTransportTest(tests.TestCase):
355
transport = MemoryTransport()
356
transport.put_bytes('foo', 'content')
357
transport.put_bytes('bar', 'phowar')
358
self.assertEqual(7, transport.stat('foo').st_size)
359
self.assertEqual(6, transport.stat('bar').st_size)
362
class ChrootDecoratorTransportTest(TestCase):
374
363
"""Chroot decoration specific tests."""
376
365
def test_abspath(self):
377
366
# The abspath is always relative to the chroot_url.
378
server = chroot.ChrootServer(
379
transport.get_transport('memory:///foo/bar/'))
367
server = ChrootServer(get_transport('memory:///foo/bar/'))
380
368
self.start_server(server)
381
t = transport.get_transport(server.get_url())
382
self.assertEqual(server.get_url(), t.abspath('/'))
369
transport = get_transport(server.get_url())
370
self.assertEqual(server.get_url(), transport.abspath('/'))
384
subdir_t = t.clone('subdir')
385
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
372
subdir_transport = transport.clone('subdir')
373
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
387
375
def test_clone(self):
388
server = chroot.ChrootServer(
389
transport.get_transport('memory:///foo/bar/'))
376
server = ChrootServer(get_transport('memory:///foo/bar/'))
390
377
self.start_server(server)
391
t = transport.get_transport(server.get_url())
378
transport = get_transport(server.get_url())
392
379
# relpath from root and root path are the same
393
relpath_cloned = t.clone('foo')
394
abspath_cloned = t.clone('/foo')
380
relpath_cloned = transport.clone('foo')
381
abspath_cloned = transport.clone('/foo')
395
382
self.assertEqual(server, relpath_cloned.server)
396
383
self.assertEqual(server, abspath_cloned.server)
420
406
This is so that it is not possible to escape a chroot by doing::
421
407
url = chroot_transport.base
422
408
parent_url = urlutils.join(url, '..')
423
new_t = transport.get_transport(parent_url)
409
new_transport = get_transport(parent_url)
425
server = chroot.ChrootServer(transport.get_transport('memory:///path/'))
411
server = ChrootServer(get_transport('memory:///path/'))
426
412
self.start_server(server)
427
t = transport.get_transport(server.get_url())
413
transport = get_transport(server.get_url())
428
414
self.assertRaises(
429
errors.InvalidURLJoin, urlutils.join, t.base, '..')
432
class TestChrootServer(tests.TestCase):
415
InvalidURLJoin, urlutils.join, transport.base, '..')
418
class ChrootServerTest(TestCase):
434
420
def test_construct(self):
435
backing_transport = memory.MemoryTransport()
436
server = chroot.ChrootServer(backing_transport)
421
backing_transport = MemoryTransport()
422
server = ChrootServer(backing_transport)
437
423
self.assertEqual(backing_transport, server.backing_transport)
439
425
def test_setUp(self):
440
backing_transport = memory.MemoryTransport()
441
server = chroot.ChrootServer(backing_transport)
442
server.start_server()
426
backing_transport = MemoryTransport()
427
server = ChrootServer(backing_transport)
444
self.assertTrue(server.scheme
445
in transport._get_protocol_handlers().keys())
430
self.assertTrue(server.scheme in _get_protocol_handlers().keys())
449
def test_stop_server(self):
450
backing_transport = memory.MemoryTransport()
451
server = chroot.ChrootServer(backing_transport)
452
server.start_server()
454
self.assertFalse(server.scheme
455
in transport._get_protocol_handlers().keys())
434
def test_tearDown(self):
435
backing_transport = MemoryTransport()
436
server = ChrootServer(backing_transport)
439
self.assertFalse(server.scheme in _get_protocol_handlers().keys())
457
441
def test_get_url(self):
458
backing_transport = memory.MemoryTransport()
459
server = chroot.ChrootServer(backing_transport)
460
server.start_server()
442
backing_transport = MemoryTransport()
443
server = ChrootServer(backing_transport)
462
446
self.assertEqual('chroot-%d:///' % id(server), server.get_url())
467
class PathFilteringDecoratorTransportTest(tests.TestCase):
451
class PathFilteringDecoratorTransportTest(TestCase):
468
452
"""Pathfilter decoration specific tests."""
470
454
def test_abspath(self):
471
455
# The abspath is always relative to the base of the backing transport.
472
server = pathfilter.PathFilteringServer(
473
transport.get_transport('memory:///foo/bar/'),
456
server = PathFilteringServer(get_transport('memory:///foo/bar/'),
475
server.start_server()
476
t = transport.get_transport(server.get_url())
477
self.assertEqual(server.get_url(), t.abspath('/'))
459
transport = get_transport(server.get_url())
460
self.assertEqual(server.get_url(), transport.abspath('/'))
479
subdir_t = t.clone('subdir')
480
self.assertEqual(server.get_url(), subdir_t.abspath('/'))
462
subdir_transport = transport.clone('subdir')
463
self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
483
466
def make_pf_transport(self, filter_func=None):
484
467
"""Make a PathFilteringTransport backed by a MemoryTransport.
487
470
parameter to override it."""
488
471
if filter_func is None:
489
472
filter_func = lambda x: x
490
server = pathfilter.PathFilteringServer(
491
transport.get_transport('memory:///foo/bar/'), filter_func)
492
server.start_server()
493
self.addCleanup(server.stop_server)
494
return transport.get_transport(server.get_url())
473
server = PathFilteringServer(
474
get_transport('memory:///foo/bar/'), filter_func)
476
self.addCleanup(server.tearDown)
477
return get_transport(server.get_url())
496
479
def test__filter(self):
497
480
# _filter (with an identity func as filter_func) always returns
498
481
# paths relative to the base of the backing transport.
499
t = self.make_pf_transport()
500
self.assertEqual('foo', t._filter('foo'))
501
self.assertEqual('foo/bar', t._filter('foo/bar'))
502
self.assertEqual('', t._filter('..'))
503
self.assertEqual('', t._filter('/'))
482
transport = self.make_pf_transport()
483
self.assertEqual('foo', transport._filter('foo'))
484
self.assertEqual('foo/bar', transport._filter('foo/bar'))
485
self.assertEqual('', transport._filter('..'))
486
self.assertEqual('', transport._filter('/'))
504
487
# The base of the pathfiltering transport is taken into account too.
505
t = t.clone('subdir1/subdir2')
506
self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
507
self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
508
self.assertEqual('subdir1', t._filter('..'))
509
self.assertEqual('', t._filter('/'))
488
transport = transport.clone('subdir1/subdir2')
489
self.assertEqual('subdir1/subdir2/foo', transport._filter('foo'))
491
'subdir1/subdir2/foo/bar', transport._filter('foo/bar'))
492
self.assertEqual('subdir1', transport._filter('..'))
493
self.assertEqual('', transport._filter('/'))
511
495
def test_filter_invocation(self):
513
497
def filter(path):
514
498
filter_log.append(path)
516
t = self.make_pf_transport(filter)
500
transport = self.make_pf_transport(filter)
518
502
self.assertEqual(['abc'], filter_log)
519
503
del filter_log[:]
520
t.clone('abc').has('xyz')
504
transport.clone('abc').has('xyz')
521
505
self.assertEqual(['abc/xyz'], filter_log)
522
506
del filter_log[:]
507
transport.has('/abc')
524
508
self.assertEqual(['abc'], filter_log)
526
510
def test_clone(self):
527
t = self.make_pf_transport()
511
transport = self.make_pf_transport()
528
512
# relpath from root and root path are the same
529
relpath_cloned = t.clone('foo')
530
abspath_cloned = t.clone('/foo')
531
self.assertEqual(t.server, relpath_cloned.server)
532
self.assertEqual(t.server, abspath_cloned.server)
513
relpath_cloned = transport.clone('foo')
514
abspath_cloned = transport.clone('/foo')
515
self.assertEqual(transport.server, relpath_cloned.server)
516
self.assertEqual(transport.server, abspath_cloned.server)
534
518
def test_url_preserves_pathfiltering(self):
535
519
"""Calling get_transport on a pathfiltered transport's base should
540
524
otherwise) the filtering by doing::
541
525
url = filtered_transport.base
542
526
parent_url = urlutils.join(url, '..')
543
new_t = transport.get_transport(parent_url)
527
new_transport = get_transport(parent_url)
545
t = self.make_pf_transport()
546
new_t = transport.get_transport(t.base)
547
self.assertEqual(t.server, new_t.server)
548
self.assertEqual(t.base, new_t.base)
551
class ReadonlyDecoratorTransportTest(tests.TestCase):
529
transport = self.make_pf_transport()
530
new_transport = get_transport(transport.base)
531
self.assertEqual(transport.server, new_transport.server)
532
self.assertEqual(transport.base, new_transport.base)
535
class ReadonlyDecoratorTransportTest(TestCase):
552
536
"""Readonly decoration specific tests."""
554
538
def test_local_parameters(self):
539
import bzrlib.transport.readonly as readonly
555
540
# connect to . in readonly mode
556
t = readonly.ReadonlyTransportDecorator('readonly+.')
557
self.assertEqual(True, t.listable())
558
self.assertEqual(True, t.is_readonly())
541
transport = readonly.ReadonlyTransportDecorator('readonly+.')
542
self.assertEqual(True, transport.listable())
543
self.assertEqual(True, transport.is_readonly())
560
545
def test_http_parameters(self):
561
546
from bzrlib.tests.http_server import HttpServer
547
import bzrlib.transport.readonly as readonly
562
548
# connect to '.' via http which is not listable
563
549
server = HttpServer()
564
550
self.start_server(server)
565
t = transport.get_transport('readonly+' + server.get_url())
566
self.failUnless(isinstance(t, readonly.ReadonlyTransportDecorator))
567
self.assertEqual(False, t.listable())
568
self.assertEqual(True, t.is_readonly())
571
class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
551
transport = get_transport('readonly+' + server.get_url())
552
self.failUnless(isinstance(transport,
553
readonly.ReadonlyTransportDecorator))
554
self.assertEqual(False, transport.listable())
555
self.assertEqual(True, transport.is_readonly())
558
class FakeNFSDecoratorTests(TestCaseInTempDir):
572
559
"""NFS decorator specific tests."""
574
561
def get_nfs_transport(self, url):
562
import bzrlib.transport.fakenfs as fakenfs
575
563
# connect to url with nfs decoration
576
564
return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
578
566
def test_local_parameters(self):
579
567
# the listable and is_readonly parameters
580
568
# are not changed by the fakenfs decorator
581
t = self.get_nfs_transport('.')
582
self.assertEqual(True, t.listable())
583
self.assertEqual(False, t.is_readonly())
569
transport = self.get_nfs_transport('.')
570
self.assertEqual(True, transport.listable())
571
self.assertEqual(False, transport.is_readonly())
585
573
def test_http_parameters(self):
586
574
# the listable and is_readonly parameters
589
577
# connect to '.' via http which is not listable
590
578
server = HttpServer()
591
579
self.start_server(server)
592
t = self.get_nfs_transport(server.get_url())
593
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
594
self.assertEqual(False, t.listable())
595
self.assertEqual(True, t.is_readonly())
580
transport = self.get_nfs_transport(server.get_url())
581
self.assertIsInstance(
582
transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
583
self.assertEqual(False, transport.listable())
584
self.assertEqual(True, transport.is_readonly())
597
586
def test_fakenfs_server_default(self):
598
587
# a FakeNFSServer() should bring up a local relpath server for itself
599
server = test_server.FakeNFSServer()
588
import bzrlib.transport.fakenfs as fakenfs
589
server = fakenfs.FakeNFSServer()
600
590
self.start_server(server)
601
591
# the url should be decorated appropriately
602
592
self.assertStartsWith(server.get_url(), 'fakenfs+')
603
593
# and we should be able to get a transport for it
604
t = transport.get_transport(server.get_url())
594
transport = get_transport(server.get_url())
605
595
# which must be a FakeNFSTransportDecorator instance.
606
self.assertIsInstance(t, fakenfs.FakeNFSTransportDecorator)
596
self.assertIsInstance(transport, fakenfs.FakeNFSTransportDecorator)
608
598
def test_fakenfs_rename_semantics(self):
609
599
# a FakeNFS transport must mangle the way rename errors occur to
610
600
# look like NFS problems.
611
t = self.get_nfs_transport('.')
601
transport = self.get_nfs_transport('.')
612
602
self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
614
self.assertRaises(errors.ResourceBusy, t.rename, 'from', 'to')
617
class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
604
self.assertRaises(errors.ResourceBusy,
605
transport.rename, 'from', 'to')
608
class FakeVFATDecoratorTests(TestCaseInTempDir):
618
609
"""Tests for simulation of VFAT restrictions"""
620
611
def get_vfat_transport(self, url):
821
809
self.assertIs(new_password, c._get_credentials())
824
class TestReusedTransports(tests.TestCase):
812
class TestReusedTransports(TestCase):
825
813
"""Tests for transport reuse"""
827
815
def test_reuse_same_transport(self):
828
816
possible_transports = []
829
t1 = transport.get_transport('http://foo/',
830
possible_transports=possible_transports)
817
t1 = get_transport('http://foo/',
818
possible_transports=possible_transports)
831
819
self.assertEqual([t1], possible_transports)
832
t2 = transport.get_transport('http://foo/',
833
possible_transports=[t1])
820
t2 = get_transport('http://foo/', possible_transports=[t1])
834
821
self.assertIs(t1, t2)
836
823
# Also check that final '/' are handled correctly
837
t3 = transport.get_transport('http://foo/path/')
838
t4 = transport.get_transport('http://foo/path',
839
possible_transports=[t3])
824
t3 = get_transport('http://foo/path/')
825
t4 = get_transport('http://foo/path', possible_transports=[t3])
840
826
self.assertIs(t3, t4)
842
t5 = transport.get_transport('http://foo/path')
843
t6 = transport.get_transport('http://foo/path/',
844
possible_transports=[t5])
828
t5 = get_transport('http://foo/path')
829
t6 = get_transport('http://foo/path/', possible_transports=[t5])
845
830
self.assertIs(t5, t6)
847
832
def test_don_t_reuse_different_transport(self):
848
t1 = transport.get_transport('http://foo/path')
849
t2 = transport.get_transport('http://bar/path',
850
possible_transports=[t1])
833
t1 = get_transport('http://foo/path')
834
t2 = get_transport('http://bar/path', possible_transports=[t1])
851
835
self.assertIsNot(t1, t2)
854
class TestTransportTrace(tests.TestCase):
838
class TestTransportTrace(TestCase):
856
840
def test_get(self):
857
t = transport.get_transport('trace+memory://')
858
self.assertIsInstance(t, bzrlib.transport.trace.TransportTraceDecorator)
841
transport = get_transport('trace+memory://')
842
self.assertIsInstance(
843
transport, bzrlib.transport.trace.TransportTraceDecorator)
860
845
def test_clone_preserves_activity(self):
861
t = transport.get_transport('trace+memory://')
863
self.assertTrue(t is not t2)
864
self.assertTrue(t._activity is t2._activity)
846
transport = get_transport('trace+memory://')
847
transport2 = transport.clone('.')
848
self.assertTrue(transport is not transport2)
849
self.assertTrue(transport._activity is transport2._activity)
866
851
# the following specific tests are for the operations that have made use of
867
852
# logging in tests; we could test every single operation but doing that
868
853
# still won't cause a test failure when the top level Transport API
869
854
# changes; so there is little return doing that.
870
855
def test_get(self):
871
t = transport.get_transport('trace+memory:///')
872
t.put_bytes('foo', 'barish')
856
transport = get_transport('trace+memory:///')
857
transport.put_bytes('foo', 'barish')
874
859
expected_result = []
875
860
# put_bytes records the bytes, not the content to avoid memory
877
862
expected_result.append(('put_bytes', 'foo', 6, None))
878
863
# get records the file name only.
879
864
expected_result.append(('get', 'foo'))
880
self.assertEqual(expected_result, t._activity)
865
self.assertEqual(expected_result, transport._activity)
882
867
def test_readv(self):
883
t = transport.get_transport('trace+memory:///')
884
t.put_bytes('foo', 'barish')
885
list(t.readv('foo', [(0, 1), (3, 2)],
886
adjust_for_latency=True, upper_limit=6))
868
transport = get_transport('trace+memory:///')
869
transport.put_bytes('foo', 'barish')
870
list(transport.readv('foo', [(0, 1), (3, 2)], adjust_for_latency=True,
887
872
expected_result = []
888
873
# put_bytes records the bytes, not the content to avoid memory
890
875
expected_result.append(('put_bytes', 'foo', 6, None))
891
876
# readv records the supplied offset request
892
877
expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
893
self.assertEqual(expected_result, t._activity)
896
class TestSSHConnections(tests.TestCaseWithTransport):
898
def test_bzr_connect_to_bzr_ssh(self):
899
"""User acceptance that get_transport of a bzr+ssh:// behaves correctly.
901
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
903
# This test actually causes a bzr instance to be invoked, which is very
904
# expensive: it should be the only such test in the test suite.
905
# A reasonable evolution for this would be to simply check inside
906
# check_channel_exec_request that the command is appropriate, and then
907
# satisfy requests in-process.
908
self.requireFeature(features.paramiko)
909
# SFTPFullAbsoluteServer has a get_url method, and doesn't
910
# override the interface (doesn't change self._vendor).
911
# Note that this does encryption, so can be slow.
912
from bzrlib.tests import stub_sftp
914
# Start an SSH server
915
self.command_executed = []
916
# XXX: This is horrible -- we define a really dumb SSH server that
917
# executes commands, and manage the hooking up of stdin/out/err to the
918
# SSH channel ourselves. Surely this has already been implemented
921
class StubSSHServer(stub_sftp.StubServer):
925
def check_channel_exec_request(self, channel, command):
926
self.test.command_executed.append(command)
927
proc = subprocess.Popen(
928
command, shell=True, stdin=subprocess.PIPE,
929
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
931
# XXX: horribly inefficient, not to mention ugly.
932
# Start a thread for each of stdin/out/err, and relay bytes from
933
# the subprocess to channel and vice versa.
934
def ferry_bytes(read, write, close):
943
(channel.recv, proc.stdin.write, proc.stdin.close),
944
(proc.stdout.read, channel.sendall, channel.close),
945
(proc.stderr.read, channel.sendall_stderr, channel.close)]
947
for read, write, close in file_functions:
948
t = threading.Thread(
949
target=ferry_bytes, args=(read, write, close))
955
ssh_server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
956
# We *don't* want to override the default SSH vendor: the detected one
958
self.start_server(ssh_server)
959
port = ssh_server._listener.port
961
if sys.platform == 'win32':
962
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
964
bzr_remote_path = self.get_bzr_path()
965
os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
967
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
968
# variable is used to tell bzr what command to run on the remote end.
969
path_to_branch = osutils.abspath('.')
970
if sys.platform == 'win32':
971
# On Windows, we export all drives as '/C:/, etc. So we need to
972
# prefix a '/' to get the right path.
973
path_to_branch = '/' + path_to_branch
974
url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
975
t = transport.get_transport(url)
976
self.permit_url(t.base)
980
['%s serve --inet --directory=/ --allow-writes' % bzr_remote_path],
981
self.command_executed)
982
# Make sure to disconnect, so that the remote process can stop, and we
983
# can cleanup. Then pause the test until everything is shutdown
984
t._client._medium.disconnect()
987
# First wait for the subprocess
989
# And the rest are threads
990
for t in started[1:]:
878
self.assertEqual(expected_result, transport._activity)