1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for Transport implementations.
19
Transport implementations tested here are supplied by
20
TransportTestProviderAdapter.
32
transport as _mod_transport,
35
from ..errors import (ConnectionError,
41
from ..osutils import getcwd
42
from ..sixish import (
46
from ..bzr.smart import medium
52
from . import test_server
53
from .test_transport import TestTransportImplementation
54
from ..transport import (
57
_get_transport_modules,
59
from ..transport.memory import MemoryTransport
60
from ..transport.remote import RemoteTransport
63
def get_transport_test_permutations(module):
64
"""Get the permutations module wants to have tested."""
65
if getattr(module, 'get_test_permutations', None) is None:
67
"transport module %s doesn't provide get_test_permutations()"
70
return module.get_test_permutations()
73
def transport_test_permutations():
74
"""Return a list of the klass, server_factory pairs to test."""
76
for module in _get_transport_modules():
78
permutations = get_transport_test_permutations(
79
pyutils.get_named_object(module))
80
for (klass, server_factory) in permutations:
81
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
{"transport_class":klass,
83
"transport_server":server_factory})
84
result.append(scenario)
85
except errors.DependencyNotPresent as e:
86
# Continue even if a dependency prevents us
87
# from adding this test
92
def load_tests(loader, standard_tests, pattern):
    """Multiply the standard tests across the transport scenarios.

    An empty suite is created with ``loader.suiteClass()``, the scenarios
    come from ``transport_test_permutations()``, and the value returned is
    whatever ``multiply_tests`` produces from them.  ``pattern`` is accepted
    as part of the signature but is not used here.
    """
    suite = loader.suiteClass()
    return multiply_tests(standard_tests, transport_test_permutations(), suite)
99
class TransportTests(TestTransportImplementation):
102
super(TransportTests, self).setUp()
103
self.overrideEnv('BRZ_NO_SMART_VFS', None)
105
def check_transport_contents(self, content, transport, relpath):
    """Assert that the bytes read from relpath on transport equal content.

    The bytes are fetched with ``transport.get_bytes(relpath)`` and compared
    to ``content`` via ``assertEqualDiff``.
    """
    actual = transport.get_bytes(relpath)
    self.assertEqualDiff(content, actual)
109
def test_ensure_base_missing(self):
110
""".ensure_base() should create the directory if it doesn't exist"""
111
t = self.get_transport()
113
if t_a.is_readonly():
114
self.assertRaises(TransportNotPossible,
117
self.assertTrue(t_a.ensure_base())
118
self.assertTrue(t.has('a'))
120
def test_ensure_base_exists(self):
121
""".ensure_base() should just be happy if it already exists"""
122
t = self.get_transport()
128
# ensure_base returns False if it didn't create the base
129
self.assertFalse(t_a.ensure_base())
131
def test_ensure_base_missing_parent(self):
132
""".ensure_base() will fail if the parent dir doesn't exist"""
133
t = self.get_transport()
139
self.assertRaises(NoSuchFile, t_b.ensure_base)
141
def test_external_url(self):
142
""".external_url either works or raises InProcessTransport."""
143
t = self.get_transport()
146
except errors.InProcessTransport:
150
t = self.get_transport()
152
files = ['a', 'b', 'e', 'g', '%']
153
self.build_tree(files, transport=t)
154
self.assertEqual(True, t.has('a'))
155
self.assertEqual(False, t.has('c'))
156
self.assertEqual(True, t.has(urlutils.escape('%')))
157
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
158
self.assertEqual(False, t.has_any(['c', 'd', 'f',
159
urlutils.escape('%%')]))
160
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
161
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
163
def test_has_root_works(self):
164
if self.transport_server is test_server.SmartTCPServer_for_testing:
165
raise TestNotApplicable(
166
"SmartTCPServer_for_testing intentionally does not allow "
168
current_transport = self.get_transport()
169
self.assertTrue(current_transport.has('/'))
170
root = current_transport.clone('/')
171
self.assertTrue(root.has(''))
174
t = self.get_transport()
177
content = b'contents of a\n'
178
self.build_tree(['a'], transport=t, line_endings='binary')
179
self.check_transport_contents(b'contents of a\n', t, 'a')
181
self.assertEqual(content, f.read())
183
def test_get_unknown_file(self):
184
t = self.get_transport()
186
contents = [b'contents of a\n',
189
self.build_tree(files, transport=t, line_endings='binary')
190
self.assertRaises(NoSuchFile, t.get, 'c')
191
def iterate_and_close(func, *args):
192
for f in func(*args):
193
# We call f.read() here because things like paramiko actually
194
# spawn a thread to prefetch the content, which we want to
195
# consume before we close the handle.
199
def test_get_directory_read_gives_ReadError(self):
200
"""consistent errors for read() on a file returned by get()."""
201
t = self.get_transport()
203
self.build_tree(['a directory/'])
205
t.mkdir('a%20directory')
206
# getting the file must either work or fail with a PathError
208
a_file = t.get('a%20directory')
209
except (errors.PathError, errors.RedirectRequested):
210
# early failure return immediately.
212
# having got a file, read() must either work (i.e. http reading a dir
213
# listing) or fail with ReadError
216
except errors.ReadError:
219
def test_get_bytes(self):
220
t = self.get_transport()
222
files = ['a', 'b', 'e', 'g']
223
contents = [b'contents of a\n',
228
self.build_tree(files, transport=t, line_endings='binary')
229
self.check_transport_contents(b'contents of a\n', t, 'a')
231
for content, fname in zip(contents, files):
232
self.assertEqual(content, t.get_bytes(fname))
234
def test_get_bytes_unknown_file(self):
    """get_bytes() on 'c', which this test never creates, raises NoSuchFile."""
    transport = self.get_transport()
    self.assertRaises(NoSuchFile, transport.get_bytes, 'c')
238
def test_get_with_open_write_stream_sees_all_content(self):
239
t = self.get_transport()
242
with t.open_write_stream('foo') as handle:
244
self.assertEqual(b'b', t.get_bytes('foo'))
246
def test_get_bytes_with_open_write_stream_sees_all_content(self):
247
t = self.get_transport()
250
with t.open_write_stream('foo') as handle:
252
self.assertEqual(b'b', t.get_bytes('foo'))
253
with t.get('foo') as f:
254
self.assertEqual(b'b', f.read())
256
def test_put_bytes(self):
257
t = self.get_transport()
260
self.assertRaises(TransportNotPossible,
261
t.put_bytes, 'a', b'some text for a\n')
264
t.put_bytes('a', b'some text for a\n')
265
self.assertTrue(t.has('a'))
266
self.check_transport_contents(b'some text for a\n', t, 'a')
268
# The contents should be overwritten
269
t.put_bytes('a', b'new text for a\n')
270
self.check_transport_contents(b'new text for a\n', t, 'a')
272
self.assertRaises(NoSuchFile,
273
t.put_bytes, 'path/doesnt/exist/c', b'contents')
275
def test_put_bytes_non_atomic(self):
276
t = self.get_transport()
279
self.assertRaises(TransportNotPossible,
280
t.put_bytes_non_atomic, 'a', b'some text for a\n')
283
self.assertFalse(t.has('a'))
284
t.put_bytes_non_atomic('a', b'some text for a\n')
285
self.assertTrue(t.has('a'))
286
self.check_transport_contents(b'some text for a\n', t, 'a')
287
# Put also replaces contents
288
t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
289
self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
291
# Make sure we can create another file
292
t.put_bytes_non_atomic('d', b'contents for\nd\n')
293
# And overwrite 'a' with empty contents
294
t.put_bytes_non_atomic('a', b'')
295
self.check_transport_contents(b'contents for\nd\n', t, 'd')
296
self.check_transport_contents(b'', t, 'a')
298
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
300
# Now test the create_parent flag
301
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
303
self.assertFalse(t.has('dir/a'))
304
t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
305
create_parent_dir=True)
306
self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
308
# But we still get NoSuchFile if we can't make the parent dir
309
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
311
create_parent_dir=True)
313
def test_put_bytes_permissions(self):
314
t = self.get_transport()
318
if not t._can_roundtrip_unix_modebits():
319
# Can't roundtrip, so no need to run this test
321
t.put_bytes('mode644', b'test text\n', mode=0o644)
322
self.assertTransportMode(t, 'mode644', 0o644)
323
t.put_bytes('mode666', b'test text\n', mode=0o666)
324
self.assertTransportMode(t, 'mode666', 0o666)
325
t.put_bytes('mode600', b'test text\n', mode=0o600)
326
self.assertTransportMode(t, 'mode600', 0o600)
327
# Yes, you can put_bytes a file such that it becomes readonly
328
t.put_bytes('mode400', b'test text\n', mode=0o400)
329
self.assertTransportMode(t, 'mode400', 0o400)
331
# The default permissions should be based on the current umask
332
umask = osutils.get_umask()
333
t.put_bytes('nomode', b'test text\n', mode=None)
334
self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
336
def test_put_bytes_non_atomic_permissions(self):
337
t = self.get_transport()
341
if not t._can_roundtrip_unix_modebits():
342
# Can't roundtrip, so no need to run this test
344
t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
345
self.assertTransportMode(t, 'mode644', 0o644)
346
t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
347
self.assertTransportMode(t, 'mode666', 0o666)
348
t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
349
self.assertTransportMode(t, 'mode600', 0o600)
350
t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
351
self.assertTransportMode(t, 'mode400', 0o400)
353
# The default permissions should be based on the current umask
354
umask = osutils.get_umask()
355
t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
356
self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
358
# We should also be able to set the mode for a parent directory
360
t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
361
dir_mode=0o700, create_parent_dir=True)
362
self.assertTransportMode(t, 'dir700', 0o700)
363
t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
364
dir_mode=0o770, create_parent_dir=True)
365
self.assertTransportMode(t, 'dir770', 0o770)
366
t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
367
dir_mode=0o777, create_parent_dir=True)
368
self.assertTransportMode(t, 'dir777', 0o777)
370
def test_put_file(self):
371
t = self.get_transport()
374
self.assertRaises(TransportNotPossible,
375
t.put_file, 'a', BytesIO(b'some text for a\n'))
378
result = t.put_file('a', BytesIO(b'some text for a\n'))
379
# put_file returns the length of the data written
380
self.assertEqual(16, result)
381
self.assertTrue(t.has('a'))
382
self.check_transport_contents(b'some text for a\n', t, 'a')
383
# Put also replaces contents
384
result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
385
self.assertEqual(19, result)
386
self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
387
self.assertRaises(NoSuchFile,
388
t.put_file, 'path/doesnt/exist/c',
389
BytesIO(b'contents'))
391
def test_put_file_non_atomic(self):
392
t = self.get_transport()
395
self.assertRaises(TransportNotPossible,
396
t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
399
self.assertFalse(t.has('a'))
400
t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
401
self.assertTrue(t.has('a'))
402
self.check_transport_contents(b'some text for a\n', t, 'a')
403
# Put also replaces contents
404
t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
405
self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
407
# Make sure we can create another file
408
t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
409
# And overwrite 'a' with empty contents
410
t.put_file_non_atomic('a', BytesIO(b''))
411
self.check_transport_contents(b'contents for\nd\n', t, 'd')
412
self.check_transport_contents(b'', t, 'a')
414
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
415
BytesIO(b'contents\n'))
416
# Now test the create_parent flag
417
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
418
BytesIO(b'contents\n'))
419
self.assertFalse(t.has('dir/a'))
420
t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
421
create_parent_dir=True)
422
self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
424
# But we still get NoSuchFile if we can't make the parent dir
425
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
426
BytesIO(b'contents\n'),
427
create_parent_dir=True)
429
def test_put_file_permissions(self):
431
t = self.get_transport()
435
if not t._can_roundtrip_unix_modebits():
436
# Can't roundtrip, so no need to run this test
438
t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
439
self.assertTransportMode(t, 'mode644', 0o644)
440
t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
441
self.assertTransportMode(t, 'mode666', 0o666)
442
t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
443
self.assertTransportMode(t, 'mode600', 0o600)
444
# Yes, you can put a file such that it becomes readonly
445
t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
446
self.assertTransportMode(t, 'mode400', 0o400)
447
# The default permissions should be based on the current umask
448
umask = osutils.get_umask()
449
t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
450
self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
452
def test_put_file_non_atomic_permissions(self):
453
t = self.get_transport()
457
if not t._can_roundtrip_unix_modebits():
458
# Can't roundtrip, so no need to run this test
460
t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
461
self.assertTransportMode(t, 'mode644', 0o644)
462
t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
463
self.assertTransportMode(t, 'mode666', 0o666)
464
t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
465
self.assertTransportMode(t, 'mode600', 0o600)
466
# Yes, you can put_file_non_atomic a file such that it becomes readonly
467
t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
468
self.assertTransportMode(t, 'mode400', 0o400)
470
# The default permissions should be based on the current umask
471
umask = osutils.get_umask()
472
t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
473
self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
475
# We should also be able to set the mode for a parent directory
478
t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
479
dir_mode=0o700, create_parent_dir=True)
480
self.assertTransportMode(t, 'dir700', 0o700)
481
t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
482
dir_mode=0o770, create_parent_dir=True)
483
self.assertTransportMode(t, 'dir770', 0o770)
484
t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
485
dir_mode=0o777, create_parent_dir=True)
486
self.assertTransportMode(t, 'dir777', 0o777)
488
def test_put_bytes_unicode(self):
489
t = self.get_transport()
492
unicode_string = u'\u1234'
493
self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
495
def test_mkdir(self):
496
t = self.get_transport()
499
# cannot mkdir on readonly transports. We're not testing for
500
# cache coherency because cache behaviour is not currently
501
# defined for the transport interface.
502
self.assertRaises(TransportNotPossible, t.mkdir, '.')
503
self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
504
self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
508
self.assertEqual(t.has('dir_a'), True)
509
self.assertEqual(t.has('dir_b'), False)
512
self.assertEqual(t.has('dir_b'), True)
514
self.assertEqual([t.has(n) for n in
515
['dir_a', 'dir_b', 'dir_q', 'dir_b']],
516
[True, True, False, True])
518
# we were testing that a local mkdir followed by a transport
519
# mkdir failed thusly, but given that we * in one process * do not
520
# concurrently fiddle with disk dirs and then use transport to do
521
# things, the win here seems marginal compared to the constraint on
522
# the interface. RBC 20051227
524
self.assertRaises(FileExists, t.mkdir, 'dir_g')
526
# Test get/put in sub-directories
527
t.put_bytes('dir_a/a', b'contents of dir_a/a')
528
t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
529
self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a')
530
self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b')
532
# mkdir of a dir with an absent parent
533
self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
535
def test_mkdir_permissions(self):
536
t = self.get_transport()
539
if not t._can_roundtrip_unix_modebits():
540
# no sense testing on this transport
542
# Test mkdir with a mode
543
t.mkdir('dmode755', mode=0o755)
544
self.assertTransportMode(t, 'dmode755', 0o755)
545
t.mkdir('dmode555', mode=0o555)
546
self.assertTransportMode(t, 'dmode555', 0o555)
547
t.mkdir('dmode777', mode=0o777)
548
self.assertTransportMode(t, 'dmode777', 0o777)
549
t.mkdir('dmode700', mode=0o700)
550
self.assertTransportMode(t, 'dmode700', 0o700)
551
t.mkdir('mdmode755', mode=0o755)
552
self.assertTransportMode(t, 'mdmode755', 0o755)
554
# Default mode should be based on umask
555
umask = osutils.get_umask()
556
t.mkdir('dnomode', mode=None)
557
self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
559
def test_opening_a_file_stream_creates_file(self):
560
t = self.get_transport()
563
handle = t.open_write_stream('foo')
565
self.assertEqual(b'', t.get_bytes('foo'))
569
def test_opening_a_file_stream_can_set_mode(self):
570
t = self.get_transport()
572
self.assertRaises((TransportNotPossible, NotImplementedError),
573
t.open_write_stream, 'foo')
575
if not t._can_roundtrip_unix_modebits():
576
# Can't roundtrip, so no need to run this test
579
def check_mode(name, mode, expected):
580
handle = t.open_write_stream(name, mode=mode)
582
self.assertTransportMode(t, name, expected)
583
check_mode('mode644', 0o644, 0o644)
584
check_mode('mode666', 0o666, 0o666)
585
check_mode('mode600', 0o600, 0o600)
586
# The default permissions should be based on the current umask
587
check_mode('nomode', None, 0o666 & ~osutils.get_umask())
589
def test_copy_to(self):
590
# FIXME: test: same server to same server (partly done)
591
# same protocol two servers
592
# and different protocols (done for now except for MemoryTransport).
595
def simple_copy_files(transport_from, transport_to):
596
files = ['a', 'b', 'c', 'd']
597
self.build_tree(files, transport=transport_from)
598
self.assertEqual(4, transport_from.copy_to(files, transport_to))
600
self.check_transport_contents(transport_to.get_bytes(f),
603
t = self.get_transport()
604
if t.__class__.__name__ == "SFTPTransport":
605
self.skipTest("SFTP copy_to currently too flakey to use")
606
temp_transport = MemoryTransport('memory:///')
607
simple_copy_files(t, temp_transport)
608
if not t.is_readonly():
609
t.mkdir('copy_to_simple')
610
t2 = t.clone('copy_to_simple')
611
simple_copy_files(t, t2)
614
# Test that copying into a missing directory raises
617
self.build_tree(['e/', 'e/f'])
620
t.put_bytes('e/f', b'contents of e')
621
self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
622
temp_transport.mkdir('e')
623
t.copy_to(['e/f'], temp_transport)
626
temp_transport = MemoryTransport('memory:///')
628
files = ['a', 'b', 'c', 'd']
629
t.copy_to(iter(files), temp_transport)
631
self.check_transport_contents(temp_transport.get_bytes(f),
635
for mode in (0o666, 0o644, 0o600, 0o400):
636
temp_transport = MemoryTransport("memory:///")
637
t.copy_to(files, temp_transport, mode=mode)
639
self.assertTransportMode(temp_transport, f, mode)
641
def test_create_prefix(self):
642
t = self.get_transport()
643
sub = t.clone('foo').clone('bar')
646
except TransportNotPossible:
647
self.assertTrue(t.is_readonly())
649
self.assertTrue(t.has('foo/bar'))
651
def test_append_file(self):
652
t = self.get_transport()
655
self.assertRaises(TransportNotPossible,
656
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
658
t.put_bytes('a', b'diff\ncontents for\na\n')
659
t.put_bytes('b', b'contents\nfor b\n')
662
t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
664
self.check_transport_contents(
665
b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
668
# a file with no parent should fail..
669
self.assertRaises(NoSuchFile,
670
t.append_file, 'missing/path', BytesIO(b'content'))
672
# And we can create new files, too
674
t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
675
self.check_transport_contents(b'some text\nfor a missing file\n',
678
def test_append_bytes(self):
679
t = self.get_transport()
682
self.assertRaises(TransportNotPossible,
683
t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
686
self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
687
self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
690
t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
692
self.check_transport_contents(
693
b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
696
# a file with no parent should fail..
697
self.assertRaises(NoSuchFile,
698
t.append_bytes, 'missing/path', b'content')
700
def test_append_file_mode(self):
701
"""Check that append accepts a mode parameter"""
702
# check append accepts a mode
703
t = self.get_transport()
705
self.assertRaises(TransportNotPossible,
706
t.append_file, 'f', BytesIO(b'f'), mode=None)
708
t.append_file('f', BytesIO(b'f'), mode=None)
710
def test_append_bytes_mode(self):
711
# check append_bytes accepts a mode
712
t = self.get_transport()
714
self.assertRaises(TransportNotPossible,
715
t.append_bytes, 'f', b'f', mode=None)
717
t.append_bytes('f', b'f', mode=None)
719
def test_delete(self):
720
# TODO: Test Transport.delete
721
t = self.get_transport()
723
# Not much to do with a readonly transport
725
self.assertRaises(TransportNotPossible, t.delete, 'missing')
728
t.put_bytes('a', b'a little bit of text\n')
729
self.assertTrue(t.has('a'))
731
self.assertFalse(t.has('a'))
733
self.assertRaises(NoSuchFile, t.delete, 'a')
735
t.put_bytes('a', b'a text\n')
736
t.put_bytes('b', b'b text\n')
737
t.put_bytes('c', b'c text\n')
738
self.assertEqual([True, True, True],
739
[t.has(n) for n in ['a', 'b', 'c']])
742
self.assertEqual([False, True, False],
743
[t.has(n) for n in ['a', 'b', 'c']])
744
self.assertFalse(t.has('a'))
745
self.assertTrue(t.has('b'))
746
self.assertFalse(t.has('c'))
748
for name in ['a', 'c', 'd']:
749
self.assertRaises(NoSuchFile, t.delete, name)
751
# We should have deleted everything
752
# SftpServer creates control files in the
753
# working directory, so we can just do a
755
# self.assertEqual([], os.listdir('.'))
757
def test_recommended_page_size(self):
758
"""Transports recommend a page size for partial access to files."""
759
t = self.get_transport()
760
self.assertIsInstance(t.recommended_page_size(), int)
762
def test_rmdir(self):
763
t = self.get_transport()
764
# Not much to do with a readonly transport
766
self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
771
# ftp may not be able to raise NoSuchFile for lack of
772
# details when failing
773
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
775
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
777
def test_rmdir_not_empty(self):
778
"""Deleting a non-empty directory raises an exception
780
sftp (and possibly others) don't give us a specific "directory not
781
empty" exception -- we can just see that the operation failed.
783
t = self.get_transport()
788
self.assertRaises(PathError, t.rmdir, 'adir')
790
def test_rmdir_empty_but_similar_prefix(self):
791
"""rmdir does not get confused by sibling paths.
793
A naive implementation of MemoryTransport would refuse to rmdir
794
".bzr/branch" if there is a ".bzr/branch-format" directory, because it
795
uses "path.startswith(dir)" on all file paths to determine if directory
798
t = self.get_transport()
802
t.put_bytes('foo-bar', b'')
805
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
806
self.assertTrue(t.has('foo-bar'))
808
def test_rename_dir_succeeds(self):
809
t = self.get_transport()
811
self.assertRaises((TransportNotPossible, NotImplementedError),
812
t.rename, 'foo', 'bar')
815
t.mkdir('adir/asubdir')
816
t.rename('adir', 'bdir')
817
self.assertTrue(t.has('bdir/asubdir'))
818
self.assertFalse(t.has('adir'))
820
def test_rename_dir_nonempty(self):
821
"""Attempting to replace a nonempty directory should fail"""
822
t = self.get_transport()
824
self.assertRaises((TransportNotPossible, NotImplementedError),
825
t.rename, 'foo', 'bar')
828
t.mkdir('adir/asubdir')
830
t.mkdir('bdir/bsubdir')
831
# any kind of PathError would be OK, though we normally expect
833
self.assertRaises(PathError, t.rename, 'bdir', 'adir')
834
# nothing was changed so it should still be as before
835
self.assertTrue(t.has('bdir/bsubdir'))
836
self.assertFalse(t.has('adir/bdir'))
837
self.assertFalse(t.has('adir/bsubdir'))
839
def test_rename_across_subdirs(self):
840
t = self.get_transport()
842
raise TestNotApplicable("transport is readonly")
847
ta.put_bytes('f', b'aoeu')
848
ta.rename('f', '../b/f')
849
self.assertTrue(tb.has('f'))
850
self.assertFalse(ta.has('f'))
851
self.assertTrue(t.has('b/f'))
853
def test_delete_tree(self):
854
t = self.get_transport()
856
# Not much to do with a readonly transport
858
self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
861
# and does it like listing ?
864
t.delete_tree('adir')
865
except TransportNotPossible:
866
# ok, this transport does not support delete_tree
869
# did it delete that trivial case?
870
self.assertRaises(NoSuchFile, t.stat, 'adir')
872
self.build_tree(['adir/',
880
t.delete_tree('adir')
881
# adir should be gone now.
882
self.assertRaises(NoSuchFile, t.stat, 'adir')
885
t = self.get_transport()
890
# TODO: I would like to use os.listdir() to
891
# make sure there are no extra files, but SftpServer
892
# creates control files in the working directory
893
# perhaps all of this could be done in a subdirectory
895
t.put_bytes('a', b'a first file\n')
896
self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
899
self.assertTrue(t.has('b'))
900
self.assertFalse(t.has('a'))
902
self.check_transport_contents(b'a first file\n', t, 'b')
903
self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
906
t.put_bytes('c', b'c this file\n')
908
self.assertFalse(t.has('c'))
909
self.check_transport_contents(b'c this file\n', t, 'b')
911
# TODO: Try to write a test for atomicity
912
# TODO: Test moving into a non-existent subdirectory
915
t = self.get_transport()
920
t.put_bytes('a', b'a file\n')
922
self.check_transport_contents(b'a file\n', t, 'b')
924
self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
926
# What should the assert be if you try to copy a
927
# file over a directory?
928
#self.assertRaises(Something, t.copy, 'a', 'c')
929
t.put_bytes('d', b'text in d\n')
931
self.check_transport_contents(b'text in d\n', t, 'b')
933
def test_connection_error(self):
934
"""ConnectionError is raised when connection is impossible.
936
The error should be raised from the first operation on the transport.
939
url = self._server.get_bogus_url()
940
except NotImplementedError:
941
raise TestSkipped("Transport %s has no bogus URL support." %
942
self._server.__class__)
943
t = _mod_transport.get_transport_from_url(url)
944
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
947
# TODO: Test stat, just try once, and if it throws, stop testing
948
from stat import S_ISDIR, S_ISREG
950
t = self.get_transport()
954
except TransportNotPossible as e:
955
# This transport cannot stat
958
paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
959
sizes = [14, 0, 16, 0, 18]
960
self.build_tree(paths, transport=t, line_endings='binary')
962
for path, size in zip(paths, sizes):
964
if path.endswith('/'):
965
self.assertTrue(S_ISDIR(st.st_mode))
966
# directory sizes are meaningless
968
self.assertTrue(S_ISREG(st.st_mode))
969
self.assertEqual(size, st.st_size)
971
self.assertRaises(NoSuchFile, t.stat, 'q')
972
self.assertRaises(NoSuchFile, t.stat, 'b/a')
974
self.build_tree(['subdir/', 'subdir/file'], transport=t)
975
subdir = t.clone('subdir')
976
st = subdir.stat('./file')
977
st = subdir.stat('.')
979
def test_hardlink(self):
980
from stat import ST_NLINK
982
t = self.get_transport()
984
source_name = "original_target"
985
link_name = "target_link"
987
self.build_tree([source_name], transport=t)
990
t.hardlink(source_name, link_name)
992
self.assertTrue(t.has(source_name))
993
self.assertTrue(t.has(link_name))
995
st = t.stat(link_name)
996
self.assertEqual(st[ST_NLINK], 2)
997
except TransportNotPossible:
998
raise TestSkipped("Transport %s does not support hardlinks." %
999
self._server.__class__)
1001
def test_symlink(self):
1002
from stat import S_ISLNK
1004
t = self.get_transport()
1006
source_name = "original_target"
1007
link_name = "target_link"
1009
self.build_tree([source_name], transport=t)
1012
t.symlink(source_name, link_name)
1014
self.assertTrue(t.has(source_name))
1015
self.assertTrue(t.has(link_name))
1017
st = t.stat(link_name)
1018
self.assertTrue(S_ISLNK(st.st_mode),
1019
"expected symlink, got mode %o" % st.st_mode)
1020
except TransportNotPossible:
1021
raise TestSkipped("Transport %s does not support symlinks." %
1022
self._server.__class__)
1024
self.knownFailure("Paramiko fails to create symlinks during tests")
1026
def test_list_dir(self):
1027
# TODO: Test list_dir, just try once, and if it throws, stop testing
1028
t = self.get_transport()
1030
if not t.listable():
1031
self.assertRaises(TransportNotPossible, t.list_dir, '.')
1034
def sorted_list(d, transport):
1035
l = sorted(transport.list_dir(d))
1038
self.assertEqual([], sorted_list('.', t))
1039
# c2 is precisely one letter longer than c here to test that
1040
# suffixing is not confused.
1041
# a%25b checks that quoting is done consistently across transports
1042
tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
1044
if not t.is_readonly():
1045
self.build_tree(tree_names, transport=t)
1047
self.build_tree(tree_names)
1050
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1052
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1053
self.assertEqual(['d', 'e'], sorted_list('c', t))
1055
# Cloning the transport produces an equivalent listing
1056
self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1058
if not t.is_readonly():
1065
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1066
self.assertEqual(['e'], sorted_list('c', t))
1068
self.assertListRaises(PathError, t.list_dir, 'q')
1069
self.assertListRaises(PathError, t.list_dir, 'c/f')
1070
# 'a' is a file, list_dir should raise an error
1071
self.assertListRaises(PathError, t.list_dir, 'a')
1073
def test_list_dir_result_is_url_escaped(self):
1074
t = self.get_transport()
1075
if not t.listable():
1076
raise TestSkipped("transport not listable")
1078
if not t.is_readonly():
1079
self.build_tree(['a/', 'a/%'], transport=t)
1081
self.build_tree(['a/', 'a/%'])
1083
names = list(t.list_dir('a'))
1084
self.assertEqual(['%25'], names)
1085
self.assertIsInstance(names[0], str)
1087
def test_clone_preserve_info(self):
    """A clone of a connected transport keeps the connection details.

    The scheme, user, password, host and port of ``_parsed_url`` are
    compared between the transport and its ``clone('subdir')``.  Raises
    TestSkipped when the transport is not a ConnectedTransport.
    """
    original = self.get_transport()
    if not isinstance(original, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    cloned = original.clone('subdir')
    # Same comparisons, in the same order, as five separate assertEqual calls.
    for field in ('scheme', 'user', 'password', 'host', 'port'):
        self.assertEqual(getattr(original._parsed_url, field),
                         getattr(cloned._parsed_url, field))
1099
def test__reuse_for(self):
    """_reuse_for() returns this transport only for URLs it can serve.

    Changing the scheme, user, host or port must yield something other
    than ``t``; changing only the password must yield ``t`` itself; a
    local path must yield None.
    """
    t = self.get_transport()
    if not isinstance(t, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    def new_url(scheme=None, user=None, password=None,
                host=None, port=None, path=None):
        """Build a new url from t.base changing only parts of it.

        Only the parameters different from None will be changed.
        """
        if scheme is None:
            scheme = t._parsed_url.scheme
        if user is None:
            user = t._parsed_url.user
        if password is None:
            password = t._parsed_url.password
        if host is None:
            host = t._parsed_url.host
        if port is None:
            port = t._parsed_url.port
        if path is None:
            path = t._parsed_url.path
        return str(urlutils.URL(scheme, user, password, host, port, path))

    # NOTE(review): the alternative values below only need to differ from
    # the transport's current ones; the exact literals are a reconstruction.
    scheme = 'sftp' if t._parsed_url.scheme == 'ftp' else 'ftp'
    self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))

    user = 'you' if t._parsed_url.user == 'me' else 'me'
    self.assertIsNot(t, t._reuse_for(new_url(user=user)))

    # passwords are not taken into account because:
    # - it makes no sense to have two different valid passwords for the
    #   same user
    # - _password in ConnectedTransport is intended to collect what the
    #   user specified from the command-line and there are cases where the
    #   new url can contain no password (if the url was built from an
    #   existing transport.base for example)
    # - password are considered part of the credentials provided at
    #   connection creation time and as such may not be present in the url
    #   (they may be typed by the user when prompted for example)
    self.assertIs(t, t._reuse_for(new_url(password='from space')))

    # We will not connect, we can use a invalid host
    self.assertIsNot(
        t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))

    port = 1235 if t._parsed_url.port == 1234 else 1234
    self.assertIsNot(t, t._reuse_for(new_url(port=port)))

    # No point in trying to reuse a transport for a local URL
    self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1150
def test_connection_sharing(self):
    """A transport and its clone report the same connection object."""
    parent = self.get_transport()
    if not isinstance(parent, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    child = parent.clone('subdir')
    # Some transports connect lazily, so issue a request to force it.
    parent.has('surely_not')
    self.assertIs(parent._get_connection(), child._get_connection())

    # Simulate a temporary failure by installing a dummy connection on
    # the parent only.
    replacement = None
    parent._set_connection(replacement)
    # Both transports must now see the replacement.
    self.assertIs(replacement, parent._get_connection())
    self.assertIs(replacement, child._get_connection())
1167
def test_reuse_connection_for_various_paths(self):
    """Transports obtained through _reuse_for() share the connection."""
    t = self.get_transport()
    if not isinstance(t, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    # Force the (possibly lazy) connection to be established.
    t.has('surely_not')
    self.assertIsNot(None, t._get_connection())

    deep_url = t.base + 'whatever/but/deep/down/the/path'
    subdir = t._reuse_for(deep_url)
    self.assertIsNot(t, subdir)
    self.assertIs(t._get_connection(), subdir._get_connection())

    home_url = t.base + 'home'
    home = subdir._reuse_for(home_url)
    self.assertIs(t._get_connection(), home._get_connection())
    self.assertIs(subdir._get_connection(), home._get_connection())
1183
def test_clone(self):
    """Clones address the same files relative to their own base.

    ``t2`` is a clone into ``b`` and ``t3`` a clone back up from there;
    a file added under ``b`` must become visible through all three.
    """
    # TODO: Test that clone moves up and down the filesystem
    t1 = self.get_transport()

    self.build_tree(['a', 'b/', 'b/c'], transport=t1)

    self.assertTrue(t1.has('a'))
    self.assertTrue(t1.has('b/c'))
    self.assertFalse(t1.has('c'))

    t2 = t1.clone('b')
    self.assertEqual(t1.base + 'b/', t2.base)

    self.assertTrue(t2.has('c'))
    self.assertFalse(t2.has('a'))

    # NOTE(review): t3 must resolve to the same location as t1 for the
    # assertions below to hold; cloning '..' from t2 is a reconstruction.
    t3 = t2.clone('..')
    self.assertTrue(t3.has('a'))
    self.assertFalse(t3.has('c'))

    self.assertFalse(t1.has('b/d'))
    self.assertFalse(t2.has('d'))
    self.assertFalse(t3.has('b/d'))

    # Create b/d by whichever route the transport allows.
    if t1.is_readonly():
        self.build_tree_contents([('b/d', b'newfile\n')])
    else:
        t2.put_bytes('d', b'newfile\n')

    self.assertTrue(t1.has('b/d'))
    self.assertTrue(t2.has('d'))
    self.assertTrue(t3.has('b/d'))
1216
def test_clone_to_root(self):
    """Walking up with clone('..') converges on the same root as clone('/')."""
    orig_transport = self.get_transport()
    # Repeatedly go up to a parent directory until we're at the root
    # directory of this transport
    root_transport = orig_transport
    while True:
        parent = root_transport.clone("..")
        # as we are walking up directories, the path must be
        # growing less, except at the top
        self.assertTrue(len(parent.base) < len(root_transport.base)
                        or parent.base == root_transport.base)
        if parent.base == root_transport.base:
            break
        root_transport = parent

    # Cloning to "/" should take us to exactly the same location.
    self.assertEqual(root_transport.base, orig_transport.clone("/").base)
    # the abspath of "/" from the original transport should be the same
    # as the base at the root:
    self.assertEqual(orig_transport.abspath("/"), root_transport.base)

    # At the root, the URL must still end with / as its a directory
    self.assertEqual(root_transport.base[-1], '/')
1243
def test_clone_from_root(self):
    """At the root, cloning to a simple dir should just do string append."""
    root = self.get_transport().clone('/')
    expected_base = root.base + '.bzr/'
    self.assertEqual(expected_base, root.clone('.bzr').base)
1250
def test_base_url(self):
    """The base URL of a transport ends with a slash."""
    base = self.get_transport().base
    self.assertEqual('/', base[-1])
1254
def test_relpath(self):
    """relpath() maps URLs at or below the base to base-relative paths."""
    t = self.get_transport()
    base = t.base
    self.assertEqual('', t.relpath(base))
    # base ends with /, the same url without it is still the base
    without_slash = base[:-1]
    self.assertEqual('', t.relpath(without_slash))
    # subdirs which don't exist should still give relpaths.
    self.assertEqual('foo', t.relpath(base + 'foo'))
    # trailing slash should be the same.
    self.assertEqual('foo', t.relpath(base + 'foo/'))
1264
def test_relpath_at_root(self):
    """relpath() works for a transport positioned at the root."""
    t = self.get_transport()
    # clone all the way to the top
    new_transport = t.clone('..')
    while new_transport.base != t.base:
        # Step up one level; without this the loop would re-clone the
        # same parent forever and ``t`` would never reach the root.
        t = new_transport
        new_transport = t.clone('..')
    # we must be able to get a relpath below the root
    self.assertEqual('', t.relpath(t.base))
    # and a deeper one should work too
    self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
1276
def test_abspath(self):
    """Smoke test for abspath() with relative and rooted paths."""
    # Corner cases for backends like unix fs's that have aliasing
    # problems like symlinks should go in backend specific test cases.
    t = self.get_transport()

    expected = t.base + 'relpath'
    self.assertEqual(expected, t.abspath('relpath'))

    # This should work without raising an error.
    t.abspath("/")

    # the abspath of "/" and "/foo/.." should result in the same location
    self.assertEqual(t.abspath("/"), t.abspath("/foo/.."))

    at_root = t.clone("/")
    self.assertEqual(at_root.abspath('foo'), t.abspath("/foo"))
1294
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1295
def test_win32_abspath(self):
    """abspath('/') keeps a drive letter only when the base URL has one.

    Only meaningful on win32; skipped on every other platform.
    """
    # Note: we tried to set sys.platform='win32' so we could test on
    # other platforms too, but then osutils does platform specific
    # things at import time which defeated us...
    if sys.platform != 'win32':
        raise TestSkipped(
            'Testing drive letters in abspath implemented only for win32')

    # smoke test for abspath on win32.
    # a transport based on 'file:///' never fully qualifies the drive.
    transport = _mod_transport.get_transport_from_url("file:///")
    self.assertEqual(transport.abspath("/"), "file:///")

    # but a transport that starts with a drive spec must keep it.
    transport = _mod_transport.get_transport_from_url("file:///C:/")
    self.assertEqual(transport.abspath("/"), "file:///C:/")
1312
def test_local_abspath(self):
    """local_abspath('.') is the cwd, or fails with a formattable error."""
    transport = self.get_transport()
    try:
        p = transport.local_abspath('.')
    except (errors.NotLocalUrl, TransportNotPossible) as e:
        # should be formattable
        str(e)
    else:
        self.assertEqual(getcwd(), p)
1322
def test_abspath_at_root(self):
    """abspath() at the root: '..' and '' give the base, names append."""
    t = self.get_transport()
    # clone all the way to the top
    new_transport = t.clone('..')
    while new_transport.base != t.base:
        # Step up one level; without this the loop would re-clone the
        # same parent forever and ``t`` would never reach the root.
        t = new_transport
        new_transport = t.clone('..')
    # we must be able to get a abspath of the root when we ask for
    # t.abspath('..') - this due to our choice that clone('..')
    # should return the root from the root, combined with the desire that
    # the url from clone('..') and from abspath('..') should be the same.
    self.assertEqual(t.base, t.abspath('..'))
    # '' should give us the root
    self.assertEqual(t.base, t.abspath(''))
    # and a path should append to the url
    self.assertEqual(t.base + 'foo', t.abspath('foo'))
1339
def test_iter_files_recursive(self):
    """iter_files_recursive() yields url-escaped file paths, no directories.

    Transports that are not listable must raise TransportNotPossible.
    """
    transport = self.get_transport()
    if not transport.listable():
        self.assertRaises(TransportNotPossible,
                          transport.iter_files_recursive)
        return
    # NOTE(review): tree entries are reconstructed from the expectations
    # asserted on the 'isolated' sub-transport at the end of this test.
    self.build_tree(['isolated/',
                     'isolated/dir/',
                     'isolated/dir/foo',
                     'isolated/dir/bar',
                     'isolated/dir/b%25z',  # make sure quoting is correct
                     'isolated/bar'],
                    transport=transport)
    paths = set(transport.iter_files_recursive())
    # nb the directories are not converted
    self.assertEqual(paths,
                     {'isolated/dir/foo',
                      'isolated/dir/bar',
                      'isolated/dir/b%2525z',
                      'isolated/bar'})
    sub_transport = transport.clone('isolated')
    paths = set(sub_transport.iter_files_recursive())
    self.assertEqual(paths,
                     {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
1364
def test_copy_tree(self):
    """copy_tree() duplicates a directory tree with quoting preserved."""
    # TODO: test file contents and permissions are preserved. This test was
    # added just to ensure that quoting was handled correctly.
    # -- David Allouche 2006-08-11
    transport = self.get_transport()
    if not transport.listable():
        self.assertRaises(TransportNotPossible,
                          transport.iter_files_recursive)
        return
    if transport.is_readonly():
        # Nothing can be copied on a read-only transport.
        return
    # NOTE(review): the tree entries (other than 'from/' and the quoted
    # name) and the expected set were reconstructed; confirm the names.
    self.build_tree(['from/',
                     'from/dir/',
                     'from/dir/foo',
                     'from/dir/bar',
                     'from/dir/b%25z',  # make sure quoting is correct
                     'from/bar'],
                    transport=transport)
    transport.copy_tree('from', 'to')
    paths = set(transport.iter_files_recursive())
    self.assertEqual(paths,
                     {'from/dir/foo',
                      'from/dir/bar',
                      'from/dir/b%2525z',
                      'from/bar',
                      'to/dir/foo',
                      'to/dir/bar',
                      'to/dir/b%2525z',
                      'to/bar'})
1394
def test_copy_tree_to_transport(self):
    """copy_tree_to_transport() copies a tree into another transport."""
    transport = self.get_transport()
    if not transport.listable():
        self.assertRaises(TransportNotPossible,
                          transport.iter_files_recursive)
        return
    if transport.is_readonly():
        # Nothing can be copied on a read-only transport.
        return
    # NOTE(review): the tree entries (other than 'from/' and the quoted
    # name) and the expected set were reconstructed; confirm the names.
    self.build_tree(['from/',
                     'from/dir/',
                     'from/dir/foo',
                     'from/dir/bar',
                     'from/dir/b%25z',  # make sure quoting is correct
                     'from/bar'],
                    transport=transport)
    from_transport = transport.clone('from')
    to_transport = transport.clone('to')
    to_transport.ensure_base()
    from_transport.copy_tree_to_transport(to_transport)
    paths = set(transport.iter_files_recursive())
    self.assertEqual(paths,
                     {'from/dir/foo',
                      'from/dir/bar',
                      'from/dir/b%2525z',
                      'from/bar',
                      'to/dir/foo',
                      'to/dir/bar',
                      'to/dir/b%2525z',
                      'to/bar'})
1424
def test_unicode_paths(self):
    """Test that we can read/write files with Unicode names."""
    t = self.get_transport()

    # With FAT32 and certain encodings on win32
    # '\xe5' and '\xe4' actually map to the same file
    # adding a suffix kicks in the 'preserving but insensitive'
    # route, and maintains the right files
    files = [u'\xe5.1',  # a w/ circle iso-8859-1
             u'\xe4.2',  # a w/ dots iso-8859-1
             u'\u017d',  # Z with caron iso-8859-2
             u'\u062c',  # Arabic j
             u'\u0410',  # Russian A
             u'\u65e5',  # Kanji (sun/day)
             ]

    no_unicode_support = getattr(self._server, 'no_unicode_support', False)
    if no_unicode_support:
        self.knownFailure("test server cannot handle unicode paths")

    try:
        self.build_tree(files, transport=t, line_endings='binary')
    except UnicodeError:
        raise TestSkipped("cannot handle unicode paths in current encoding")

    # A plain unicode string is not a valid url
    for fname in files:
        self.assertRaises(urlutils.InvalidURL, t.get, fname)

    # The escaped form of each name must give back the expected content.
    for fname in files:
        fname_utf8 = fname.encode('utf-8')
        contents = b'contents of %s\n' % (fname_utf8,)
        self.check_transport_contents(contents, t, urlutils.escape(fname))
1458
def test_connect_twice_is_same_content(self):
    """Separately obtained transports see the content written by another."""
    # check that our server (whatever it is) is accessible reliably
    # via get_transport and multiple connections share content.
    writer = self.get_transport()
    if writer.is_readonly():
        # The test needs put_bytes/mkdir, which read-only transports lack.
        return
    writer.put_bytes('foo', b'bar')
    second = self.get_transport()
    self.check_transport_contents(b'bar', second, 'foo')

    # now opening at a relative url should give use a sane result:
    writer.mkdir('newdir')
    in_newdir = self.get_transport('newdir')
    back_up = in_newdir.clone('..')
    self.check_transport_contents(b'bar', back_up, 'foo')
1474
def test_lock_write(self):
    """Test transport-level write locks.

    These are deprecated and transports may decline to support them.
    """
    transport = self.get_transport()
    if transport.is_readonly():
        self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
        return
    transport.put_bytes('lock', b'')
    try:
        lock = transport.lock_write('lock')
    except TransportNotPossible:
        # The transport declined to support locks; nothing more to check.
        return
    # TODO make this consistent on all platforms:
    # self.assertRaises(LockError, transport.lock_write, 'lock')
    # NOTE(review): assumes the lock object exposes unlock() -- confirm.
    lock.unlock()
1492
def test_lock_read(self):
    """Test transport-level read locks.

    These are deprecated and transports may decline to support them.
    """
    transport = self.get_transport()
    # Create the file to lock by whichever route the transport allows.
    if transport.is_readonly():
        open('lock', 'w').close()
    else:
        transport.put_bytes('lock', b'')
    try:
        lock = transport.lock_read('lock')
    except TransportNotPossible:
        # The transport declined to support locks; nothing more to check.
        return
    # TODO make this consistent on all platforms:
    # self.assertRaises(LockError, transport.lock_read, 'lock')
    # NOTE(review): assumes the lock object exposes unlock() -- confirm.
    lock.unlock()
1510
def test_readv(self):
    """readv() yields (offset, data) pairs for in-order requests."""
    transport = self.get_transport()
    # Create the file by whichever route the transport allows.
    if transport.is_readonly():
        with open('a', 'w') as f:
            f.write('0123456789')
    else:
        transport.put_bytes('a', b'0123456789')

    d = list(transport.readv('a', ((0, 1),)))
    self.assertEqual(d[0], (0, b'0'))

    d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
    self.assertEqual(d[0], (0, b'0'))
    self.assertEqual(d[1], (1, b'1'))
    self.assertEqual(d[2], (3, b'34'))
    self.assertEqual(d[3], (9, b'9'))
1526
def test_readv_out_of_order(self):
    """readv() returns results in the order the ranges were requested."""
    transport = self.get_transport()
    # Create the file by whichever route the transport allows.
    if transport.is_readonly():
        with open('a', 'w') as f:
            f.write('0123456789')
    else:
        transport.put_bytes('a', b'01234567890')

    d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
    self.assertEqual(d[0], (1, b'1'))
    self.assertEqual(d[1], (9, b'9'))
    self.assertEqual(d[2], (0, b'0'))
    self.assertEqual(d[3], (3, b'34'))
1539
def test_readv_with_adjust_for_latency(self):
    """readv(adjust_for_latency=True) may widen or merge ranges.

    Every returned chunk is cross-checked against the file content, and
    each requested range must be covered by the chunks returned.
    """
    transport = self.get_transport()
    # the adjust for latency flag expands the data region returned
    # according to a per-transport heuristic, so testing is a little
    # tricky as we need more data than the largest combining that our
    # transports do. To accommodate this we generate random data and cross
    # reference the returned data with the random data. To avoid doing
    # multiple large random byte look ups we do several tests on the same
    # backing data.
    content = osutils.rand_bytes(200*1024)
    content_size = len(content)
    if transport.is_readonly():
        self.build_tree_contents([('a', content)])
    else:
        transport.put_bytes('a', content)

    def check_result_data(result_vector):
        """Assert each (offset, data) item matches the backing content."""
        for item in result_vector:
            data_len = len(item[1])
            self.assertEqual(content[item[0]:item[0] + data_len], item[1])

    # start corner case
    result = list(transport.readv('a', ((0, 30),),
                                  adjust_for_latency=True,
                                  upper_limit=content_size))
    # we expect 1 result, from 0, to something > 30
    self.assertEqual(1, len(result))
    self.assertEqual(0, result[0][0])
    self.assertTrue(len(result[0][1]) >= 30)
    check_result_data(result)
    # end of file corner case
    tail_start = content_size - 100
    result = list(transport.readv('a', ((tail_start, 100),),
                                  adjust_for_latency=True,
                                  upper_limit=content_size))
    # we expect 1 result, from content_size - its length, to the end
    self.assertEqual(1, len(result))
    data_len = len(result[0][1])
    self.assertEqual(content_size - data_len, result[0][0])
    self.assertTrue(data_len >= 100)
    check_result_data(result)
    # out of order ranges are made in order
    result = list(transport.readv('a', ((tail_start, 100), (0, 50)),
                                  adjust_for_latency=True,
                                  upper_limit=content_size))
    # we expect 2 results, in order, start and end.
    self.assertEqual(2, len(result))
    # start
    data_len = len(result[0][1])
    self.assertEqual(0, result[0][0])
    # the request was for 50 bytes, so at least that much must come back
    self.assertTrue(data_len >= 50)
    # end
    data_len = len(result[1][1])
    self.assertEqual(content_size - data_len, result[1][0])
    self.assertTrue(data_len >= 100)
    check_result_data(result)
    # close ranges get combined (even if out of order)
    for request_vector in [((400, 50), (800, 234)),
                           ((800, 234), (400, 50))]:
        result = list(transport.readv('a', request_vector,
                                      adjust_for_latency=True,
                                      upper_limit=content_size))
        self.assertEqual(1, len(result))
        data_len = len(result[0][1])
        # minimum length is from 400 to 1034 - 634
        self.assertTrue(data_len >= 634)
        # must contain the region 400 to 1034
        self.assertTrue(result[0][0] <= 400)
        self.assertTrue(result[0][0] + data_len >= 1034)
        check_result_data(result)
1603
def test_readv_with_adjust_for_latency_with_big_file(self):
    """Every requested range of a 1MiB file is covered by some result."""
    transport = self.get_transport()
    # test from observed failure case.
    if transport.is_readonly():
        with open('a', 'w') as f:
            f.write('a'*1024*1024)
    else:
        transport.put_bytes('a', b'a'*1024*1024)
    broken_vector = [(465219, 800), (225221, 800), (445548, 800),
                     (225037, 800), (221357, 800), (437077, 800),
                     (947670, 800), (465373, 800), (947422, 800)]
    results = list(transport.readv('a', broken_vector, True, 1024*1024))
    # One flag per requested range, set once a result chunk covers it.
    found_items = [False] * len(broken_vector)
    for pos, (start, length) in enumerate(broken_vector):
        # check the range is covered by the result
        for offset, data in results:
            if offset <= start and start + length <= offset + len(data):
                found_items[pos] = True
    self.assertEqual([True] * len(broken_vector), found_items)
1622
def test_get_with_open_write_stream_sees_all_content(self):
    """Data written to an open write stream is visible to readv()."""
    t = self.get_transport()
    if t.is_readonly():
        # A write stream cannot be opened on a read-only transport.
        return
    with t.open_write_stream('foo') as handle:
        handle.write(b'bcd')
        # Read back while the stream is still open.
        self.assertEqual([(0, b'b'), (2, b'd')],
                         list(t.readv('foo', ((0, 1), (2, 1)))))
1630
def test_get_smart_medium(self):
    """All transports must either give a smart medium, or know they can't.
    """
    transport = self.get_transport()
    try:
        client_medium = transport.get_smart_medium()
    except errors.NoSmartMedium:
        # as long as we got it we're fine
        pass
    else:
        self.assertIsInstance(client_medium, medium.SmartClientMedium)
1641
def test_readv_short_read(self):
    """readv() past the end of the file raises a short-read style error."""
    transport = self.get_transport()
    # Create the file by whichever route the transport allows.
    if transport.is_readonly():
        with open('a', 'w') as f:
            f.write('0123456789')
    else:
        transport.put_bytes('a', b'01234567890')

    # This is intentionally reading off the end of the file
    # since we are sure that it cannot get there
    self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
                           # Can be raised by paramiko
                           # NOTE(review): third exception type was
                           # reconstructed -- confirm it is AssertionError.
                           AssertionError),
                          transport.readv, 'a', [(1, 1), (8, 10)])

    # This is trying to seek past the end of the file, it should
    # also raise a special error
    self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
                          transport.readv, 'a', [(12, 2)])
1660
def test_no_segment_parameters(self):
    """A transport opened without segment parameters reports none."""
    plain = self.get_transport("foo")
    found = plain.get_segment_parameters()
    self.assertEqual({}, found)
1666
def test_segment_parameters(self):
    """Segment parameters in the URL are exposed by
    transport.get_segment_parameters()."""
    parameters = {"key1": "val1", "key2": "val2"}
    url = urlutils.join_segment_parameters(
        self._server.get_url(), parameters)
    opened = _mod_transport.get_transport_from_url(url)
    self.assertEqual(parameters, opened.get_segment_parameters())
1675
def test_set_segment_parameters(self):
    """Segment parameters can be set and show up in base."""
    t = self.get_transport("foo")
    base_before = t.base

    # Setting a parameter appends it to the base URL.
    t.set_segment_parameter("arm", "board")
    self.assertEqual("%s,arm=board" % base_before, t.base)
    self.assertEqual({"arm": "board"}, t.get_segment_parameters())

    # Setting to None removes it; an unknown name is accepted too.
    t.set_segment_parameter("arm", None)
    t.set_segment_parameter("nonexistant", None)
    self.assertEqual({}, t.get_segment_parameters())
    self.assertEqual(base_before, t.base)
1687
def test_stat_symlink(self):
    """stat() on a transport pointing at a symlink reports a link."""
    # if a transport points directly to a symlink (and supports symlinks
    # at all) you can tell this.  helps with bug 32669.
    t = self.get_transport()
    try:
        t.symlink('target', 'link')
    except TransportNotPossible:
        raise TestSkipped("symlinks not supported")
    t2 = t.clone('link')
    # NOTE(review): stat of the clone's own base ('') is a reconstruction
    # of the lost assignment to ``st`` -- confirm.
    st = t2.stat('')
    self.assertTrue(stat.S_ISLNK(st.st_mode))
1699
def test_abspath_url_unquote_unreserved(self):
    """URLs from abspath should have unreserved characters unquoted

    Need consistent quoting notably for tildes, see lp:842223 for more.
    """
    t = self.get_transport()
    # Every escape below encodes a character that needs no escaping.
    needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
    self.assertEqual(t.base + "-.09AZ_az~",
                     t.abspath(needlessly_escaped_dir))
1709
def test_clone_url_unquote_unreserved(self):
    """Base URL of a cloned branch needs unreserved characters unquoted

    Cloned transports should be prefix comparable for things like the
    isolation checking of tests, see lp:842223 for more.
    """
    t1 = self.get_transport()
    # Every escape below encodes a character that needs no escaping.
    needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
    self.build_tree([needlessly_escaped_dir], transport=t1)
    t2 = t1.clone(needlessly_escaped_dir)
    self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1721
def test_hook_post_connection_one(self):
    """Fire post_connect hook after a ConnectedTransport is first used"""
    # The hook appends whatever it is called with to this list.
    log = []
    Transport.hooks.install_named_hook("post_connect", log.append, None)
    t = self.get_transport()
    # Merely creating the transport must not fire the hook.
    self.assertEqual([], log)
    t.has("non-existant")
    if isinstance(t, RemoteTransport):
        self.assertEqual([t.get_smart_medium()], log)
    elif isinstance(t, ConnectedTransport):
        self.assertEqual([t], log)
    else:
        self.assertEqual([], log)
1735
def test_hook_post_connection_multi(self):
1736
"""Fire post_connect hook once per unshared underlying connection"""
1738
Transport.hooks.install_named_hook("post_connect", log.append, None)
1739
t1 = self.get_transport()
1741
t3 = self.get_transport()
1742
self.assertEqual([], log)
1746
if isinstance(t1, RemoteTransport):
1747
self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1748
elif isinstance(t1, ConnectedTransport):
1749
self.assertEqual([t1, t3], log)
1751
self.assertEqual([], log)