1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for Transport implementations.
19
Transport implementations tested here are supplied by
20
TransportTestProviderAdapter.
23
from io import BytesIO
33
transport as _mod_transport,
36
from ..errors import (ConnectionError,
42
from ..osutils import getcwd
43
from ..bzr.smart import medium
49
from . import test_server
50
from .test_transport import TestTransportImplementation
51
from ..transport import (
54
_get_transport_modules,
56
from ..transport.memory import MemoryTransport
57
from ..transport.remote import RemoteTransport
60
def get_transport_test_permutations(module):
61
"""Get the permutations module wants to have tested."""
62
if getattr(module, 'get_test_permutations', None) is None:
64
"transport module %s doesn't provide get_test_permutations()"
67
return module.get_test_permutations()
70
def transport_test_permutations():
71
"""Return a list of the klass, server_factory pairs to test."""
73
for module in _get_transport_modules():
75
permutations = get_transport_test_permutations(
76
pyutils.get_named_object(module))
77
for (klass, server_factory) in permutations:
78
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
79
{"transport_class": klass,
80
"transport_server": server_factory})
81
result.append(scenario)
82
except errors.DependencyNotPresent as e:
83
# Continue even if a dependency prevents us
84
# from adding this test
89
def load_tests(loader, standard_tests, pattern):
90
"""Multiply tests for tranport implementations."""
91
result = loader.suiteClass()
92
scenarios = transport_test_permutations()
93
return multiply_tests(standard_tests, scenarios, result)
96
class TransportTests(TestTransportImplementation):
99
super(TransportTests, self).setUp()
100
self.overrideEnv('BRZ_NO_SMART_VFS', None)
102
def check_transport_contents(self, content, transport, relpath):
103
"""Check that transport.get_bytes(relpath) == content."""
104
self.assertEqualDiff(content, transport.get_bytes(relpath))
106
def test_ensure_base_missing(self):
107
""".ensure_base() should create the directory if it doesn't exist"""
108
t = self.get_transport()
110
if t_a.is_readonly():
111
self.assertRaises(TransportNotPossible,
114
self.assertTrue(t_a.ensure_base())
115
self.assertTrue(t.has('a'))
117
def test_ensure_base_exists(self):
118
""".ensure_base() should just be happy if it already exists"""
119
t = self.get_transport()
125
# ensure_base returns False if it didn't create the base
126
self.assertFalse(t_a.ensure_base())
128
def test_ensure_base_missing_parent(self):
129
""".ensure_base() will fail if the parent dir doesn't exist"""
130
t = self.get_transport()
136
self.assertRaises(NoSuchFile, t_b.ensure_base)
138
def test_external_url(self):
139
""".external_url either works or raises InProcessTransport."""
140
t = self.get_transport()
143
except errors.InProcessTransport:
147
t = self.get_transport()
149
files = ['a', 'b', 'e', 'g', '%']
150
self.build_tree(files, transport=t)
151
self.assertEqual(True, t.has('a'))
152
self.assertEqual(False, t.has('c'))
153
self.assertEqual(True, t.has(urlutils.escape('%')))
154
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
155
self.assertEqual(False, t.has_any(['c', 'd', 'f',
156
urlutils.escape('%%')]))
157
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
158
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
160
def test_has_root_works(self):
161
if self.transport_server is test_server.SmartTCPServer_for_testing:
162
raise TestNotApplicable(
163
"SmartTCPServer_for_testing intentionally does not allow "
165
current_transport = self.get_transport()
166
self.assertTrue(current_transport.has('/'))
167
root = current_transport.clone('/')
168
self.assertTrue(root.has(''))
171
t = self.get_transport()
174
content = b'contents of a\n'
175
self.build_tree(['a'], transport=t, line_endings='binary')
176
self.check_transport_contents(b'contents of a\n', t, 'a')
178
self.assertEqual(content, f.read())
180
def test_get_unknown_file(self):
181
t = self.get_transport()
183
contents = [b'contents of a\n',
186
self.build_tree(files, transport=t, line_endings='binary')
187
self.assertRaises(NoSuchFile, t.get, 'c')
189
def iterate_and_close(func, *args):
190
for f in func(*args):
191
# We call f.read() here because things like paramiko actually
192
# spawn a thread to prefetch the content, which we want to
193
# consume before we close the handle.
197
def test_get_directory_read_gives_ReadError(self):
198
"""consistent errors for read() on a file returned by get()."""
199
t = self.get_transport()
201
self.build_tree(['a directory/'])
203
t.mkdir('a%20directory')
204
# getting the file must either work or fail with a PathError
206
a_file = t.get('a%20directory')
207
except (errors.PathError, errors.RedirectRequested):
208
# early failure return immediately.
210
# having got a file, read() must either work (i.e. http reading a dir
211
# listing) or fail with ReadError
214
except errors.ReadError:
217
def test_get_bytes(self):
218
t = self.get_transport()
220
files = ['a', 'b', 'e', 'g']
221
contents = [b'contents of a\n',
226
self.build_tree(files, transport=t, line_endings='binary')
227
self.check_transport_contents(b'contents of a\n', t, 'a')
229
for content, fname in zip(contents, files):
230
self.assertEqual(content, t.get_bytes(fname))
232
def test_get_bytes_unknown_file(self):
233
t = self.get_transport()
234
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
236
def test_get_with_open_write_stream_sees_all_content(self):
237
t = self.get_transport()
240
with t.open_write_stream('foo') as handle:
242
self.assertEqual(b'b', t.get_bytes('foo'))
244
def test_get_bytes_with_open_write_stream_sees_all_content(self):
245
t = self.get_transport()
248
with t.open_write_stream('foo') as handle:
250
self.assertEqual(b'b', t.get_bytes('foo'))
251
with t.get('foo') as f:
252
self.assertEqual(b'b', f.read())
254
def test_put_bytes(self):
255
t = self.get_transport()
258
self.assertRaises(TransportNotPossible,
259
t.put_bytes, 'a', b'some text for a\n')
262
t.put_bytes('a', b'some text for a\n')
263
self.assertTrue(t.has('a'))
264
self.check_transport_contents(b'some text for a\n', t, 'a')
266
# The contents should be overwritten
267
t.put_bytes('a', b'new text for a\n')
268
self.check_transport_contents(b'new text for a\n', t, 'a')
270
self.assertRaises(NoSuchFile,
271
t.put_bytes, 'path/doesnt/exist/c', b'contents')
273
def test_put_bytes_non_atomic(self):
274
t = self.get_transport()
277
self.assertRaises(TransportNotPossible,
278
t.put_bytes_non_atomic, 'a', b'some text for a\n')
281
self.assertFalse(t.has('a'))
282
t.put_bytes_non_atomic('a', b'some text for a\n')
283
self.assertTrue(t.has('a'))
284
self.check_transport_contents(b'some text for a\n', t, 'a')
285
# Put also replaces contents
286
t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
287
self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
289
# Make sure we can create another file
290
t.put_bytes_non_atomic('d', b'contents for\nd\n')
291
# And overwrite 'a' with empty contents
292
t.put_bytes_non_atomic('a', b'')
293
self.check_transport_contents(b'contents for\nd\n', t, 'd')
294
self.check_transport_contents(b'', t, 'a')
296
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
298
# Now test the create_parent flag
299
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
301
self.assertFalse(t.has('dir/a'))
302
t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
303
create_parent_dir=True)
304
self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
306
# But we still get NoSuchFile if we can't make the parent dir
307
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
309
create_parent_dir=True)
311
def test_put_bytes_permissions(self):
312
t = self.get_transport()
316
if not t._can_roundtrip_unix_modebits():
317
# Can't roundtrip, so no need to run this test
319
t.put_bytes('mode644', b'test text\n', mode=0o644)
320
self.assertTransportMode(t, 'mode644', 0o644)
321
t.put_bytes('mode666', b'test text\n', mode=0o666)
322
self.assertTransportMode(t, 'mode666', 0o666)
323
t.put_bytes('mode600', b'test text\n', mode=0o600)
324
self.assertTransportMode(t, 'mode600', 0o600)
325
# Yes, you can put_bytes a file such that it becomes readonly
326
t.put_bytes('mode400', b'test text\n', mode=0o400)
327
self.assertTransportMode(t, 'mode400', 0o400)
329
# The default permissions should be based on the current umask
330
umask = osutils.get_umask()
331
t.put_bytes('nomode', b'test text\n', mode=None)
332
self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
334
def test_put_bytes_non_atomic_permissions(self):
335
t = self.get_transport()
339
if not t._can_roundtrip_unix_modebits():
340
# Can't roundtrip, so no need to run this test
342
t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
343
self.assertTransportMode(t, 'mode644', 0o644)
344
t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
345
self.assertTransportMode(t, 'mode666', 0o666)
346
t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
347
self.assertTransportMode(t, 'mode600', 0o600)
348
t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
349
self.assertTransportMode(t, 'mode400', 0o400)
351
# The default permissions should be based on the current umask
352
umask = osutils.get_umask()
353
t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
354
self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
356
# We should also be able to set the mode for a parent directory
358
t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
359
dir_mode=0o700, create_parent_dir=True)
360
self.assertTransportMode(t, 'dir700', 0o700)
361
t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
362
dir_mode=0o770, create_parent_dir=True)
363
self.assertTransportMode(t, 'dir770', 0o770)
364
t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
365
dir_mode=0o777, create_parent_dir=True)
366
self.assertTransportMode(t, 'dir777', 0o777)
368
def test_put_file(self):
369
t = self.get_transport()
372
self.assertRaises(TransportNotPossible,
373
t.put_file, 'a', BytesIO(b'some text for a\n'))
376
result = t.put_file('a', BytesIO(b'some text for a\n'))
377
# put_file returns the length of the data written
378
self.assertEqual(16, result)
379
self.assertTrue(t.has('a'))
380
self.check_transport_contents(b'some text for a\n', t, 'a')
381
# Put also replaces contents
382
result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
383
self.assertEqual(19, result)
384
self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
385
self.assertRaises(NoSuchFile,
386
t.put_file, 'path/doesnt/exist/c',
387
BytesIO(b'contents'))
389
def test_put_file_non_atomic(self):
390
t = self.get_transport()
393
self.assertRaises(TransportNotPossible,
394
t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
397
self.assertFalse(t.has('a'))
398
t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
399
self.assertTrue(t.has('a'))
400
self.check_transport_contents(b'some text for a\n', t, 'a')
401
# Put also replaces contents
402
t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
403
self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
405
# Make sure we can create another file
406
t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
407
# And overwrite 'a' with empty contents
408
t.put_file_non_atomic('a', BytesIO(b''))
409
self.check_transport_contents(b'contents for\nd\n', t, 'd')
410
self.check_transport_contents(b'', t, 'a')
412
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
413
BytesIO(b'contents\n'))
414
# Now test the create_parent flag
415
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
416
BytesIO(b'contents\n'))
417
self.assertFalse(t.has('dir/a'))
418
t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
419
create_parent_dir=True)
420
self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
422
# But we still get NoSuchFile if we can't make the parent dir
423
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
424
BytesIO(b'contents\n'),
425
create_parent_dir=True)
427
def test_put_file_permissions(self):
429
t = self.get_transport()
433
if not t._can_roundtrip_unix_modebits():
434
# Can't roundtrip, so no need to run this test
436
t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
437
self.assertTransportMode(t, 'mode644', 0o644)
438
t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
439
self.assertTransportMode(t, 'mode666', 0o666)
440
t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
441
self.assertTransportMode(t, 'mode600', 0o600)
442
# Yes, you can put a file such that it becomes readonly
443
t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
444
self.assertTransportMode(t, 'mode400', 0o400)
445
# The default permissions should be based on the current umask
446
umask = osutils.get_umask()
447
t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
448
self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
450
def test_put_file_non_atomic_permissions(self):
451
t = self.get_transport()
455
if not t._can_roundtrip_unix_modebits():
456
# Can't roundtrip, so no need to run this test
458
t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
459
self.assertTransportMode(t, 'mode644', 0o644)
460
t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
461
self.assertTransportMode(t, 'mode666', 0o666)
462
t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
463
self.assertTransportMode(t, 'mode600', 0o600)
464
# Yes, you can put_file_non_atomic a file such that it becomes readonly
465
t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
466
self.assertTransportMode(t, 'mode400', 0o400)
468
# The default permissions should be based on the current umask
469
umask = osutils.get_umask()
470
t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
471
self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
473
# We should also be able to set the mode for a parent directory
476
t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
477
dir_mode=0o700, create_parent_dir=True)
478
self.assertTransportMode(t, 'dir700', 0o700)
479
t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
480
dir_mode=0o770, create_parent_dir=True)
481
self.assertTransportMode(t, 'dir770', 0o770)
482
t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
483
dir_mode=0o777, create_parent_dir=True)
484
self.assertTransportMode(t, 'dir777', 0o777)
486
def test_put_bytes_unicode(self):
487
t = self.get_transport()
490
unicode_string = u'\u1234'
491
self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
493
def test_mkdir(self):
494
t = self.get_transport()
497
# cannot mkdir on readonly transports. We're not testing for
498
# cache coherency because cache behaviour is not currently
499
# defined for the transport interface.
500
self.assertRaises(TransportNotPossible, t.mkdir, '.')
501
self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
502
self.assertRaises(TransportNotPossible,
503
t.mkdir, 'path/doesnt/exist')
507
self.assertEqual(t.has('dir_a'), True)
508
self.assertEqual(t.has('dir_b'), False)
511
self.assertEqual(t.has('dir_b'), True)
513
self.assertEqual([t.has(n) for n in
514
['dir_a', 'dir_b', 'dir_q', 'dir_b']],
515
[True, True, False, True])
517
# we were testing that a local mkdir followed by a transport
518
# mkdir failed thusly, but given that we * in one process * do not
519
# concurrently fiddle with disk dirs and then use transport to do
520
# things, the win here seems marginal compared to the constraint on
521
# the interface. RBC 20051227
523
self.assertRaises(FileExists, t.mkdir, 'dir_g')
525
# Test get/put in sub-directories
526
t.put_bytes('dir_a/a', b'contents of dir_a/a')
527
t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
528
self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a')
529
self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b')
531
# mkdir of a dir with an absent parent
532
self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
534
def test_mkdir_permissions(self):
535
t = self.get_transport()
538
if not t._can_roundtrip_unix_modebits():
539
# no sense testing on this transport
541
# Test mkdir with a mode
542
t.mkdir('dmode755', mode=0o755)
543
self.assertTransportMode(t, 'dmode755', 0o755)
544
t.mkdir('dmode555', mode=0o555)
545
self.assertTransportMode(t, 'dmode555', 0o555)
546
t.mkdir('dmode777', mode=0o777)
547
self.assertTransportMode(t, 'dmode777', 0o777)
548
t.mkdir('dmode700', mode=0o700)
549
self.assertTransportMode(t, 'dmode700', 0o700)
550
t.mkdir('mdmode755', mode=0o755)
551
self.assertTransportMode(t, 'mdmode755', 0o755)
553
# Default mode should be based on umask
554
umask = osutils.get_umask()
555
t.mkdir('dnomode', mode=None)
556
self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
558
def test_opening_a_file_stream_creates_file(self):
559
t = self.get_transport()
562
handle = t.open_write_stream('foo')
564
self.assertEqual(b'', t.get_bytes('foo'))
568
def test_opening_a_file_stream_can_set_mode(self):
569
t = self.get_transport()
571
self.assertRaises((TransportNotPossible, NotImplementedError),
572
t.open_write_stream, 'foo')
574
if not t._can_roundtrip_unix_modebits():
575
# Can't roundtrip, so no need to run this test
578
def check_mode(name, mode, expected):
579
handle = t.open_write_stream(name, mode=mode)
581
self.assertTransportMode(t, name, expected)
582
check_mode('mode644', 0o644, 0o644)
583
check_mode('mode666', 0o666, 0o666)
584
check_mode('mode600', 0o600, 0o600)
585
# The default permissions should be based on the current umask
586
check_mode('nomode', None, 0o666 & ~osutils.get_umask())
588
def test_copy_to(self):
589
# FIXME: test: same server to same server (partly done)
590
# same protocol two servers
591
# and different protocols (done for now except for MemoryTransport.
594
def simple_copy_files(transport_from, transport_to):
595
files = ['a', 'b', 'c', 'd']
596
self.build_tree(files, transport=transport_from)
597
self.assertEqual(4, transport_from.copy_to(files, transport_to))
599
self.check_transport_contents(transport_to.get_bytes(f),
602
t = self.get_transport()
603
if t.__class__.__name__ == "SFTPTransport":
604
self.skipTest("SFTP copy_to currently too flakey to use")
605
temp_transport = MemoryTransport('memory:///')
606
simple_copy_files(t, temp_transport)
607
if not t.is_readonly():
608
t.mkdir('copy_to_simple')
609
t2 = t.clone('copy_to_simple')
610
simple_copy_files(t, t2)
612
# Test that copying into a missing directory raises
615
self.build_tree(['e/', 'e/f'])
618
t.put_bytes('e/f', b'contents of e')
619
self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
620
temp_transport.mkdir('e')
621
t.copy_to(['e/f'], temp_transport)
624
temp_transport = MemoryTransport('memory:///')
626
files = ['a', 'b', 'c', 'd']
627
t.copy_to(iter(files), temp_transport)
629
self.check_transport_contents(temp_transport.get_bytes(f),
633
for mode in (0o666, 0o644, 0o600, 0o400):
634
temp_transport = MemoryTransport("memory:///")
635
t.copy_to(files, temp_transport, mode=mode)
637
self.assertTransportMode(temp_transport, f, mode)
639
def test_create_prefix(self):
640
t = self.get_transport()
641
sub = t.clone('foo').clone('bar')
644
except TransportNotPossible:
645
self.assertTrue(t.is_readonly())
647
self.assertTrue(t.has('foo/bar'))
649
def test_append_file(self):
650
t = self.get_transport()
653
self.assertRaises(TransportNotPossible,
654
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
656
t.put_bytes('a', b'diff\ncontents for\na\n')
657
t.put_bytes('b', b'contents\nfor b\n')
660
t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
662
self.check_transport_contents(
663
b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
666
# a file with no parent should fail..
667
self.assertRaises(NoSuchFile,
668
t.append_file, 'missing/path', BytesIO(b'content'))
670
# And we can create new files, too
672
t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
673
self.check_transport_contents(b'some text\nfor a missing file\n',
676
def test_append_bytes(self):
677
t = self.get_transport()
680
self.assertRaises(TransportNotPossible,
681
t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
684
self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
685
self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
688
t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
690
self.check_transport_contents(
691
b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
694
# a file with no parent should fail..
695
self.assertRaises(NoSuchFile,
696
t.append_bytes, 'missing/path', b'content')
698
def test_append_file_mode(self):
699
"""Check that append accepts a mode parameter"""
700
# check append accepts a mode
701
t = self.get_transport()
703
self.assertRaises(TransportNotPossible,
704
t.append_file, 'f', BytesIO(b'f'), mode=None)
706
t.append_file('f', BytesIO(b'f'), mode=None)
708
def test_append_bytes_mode(self):
709
# check append_bytes accepts a mode
710
t = self.get_transport()
712
self.assertRaises(TransportNotPossible,
713
t.append_bytes, 'f', b'f', mode=None)
715
t.append_bytes('f', b'f', mode=None)
717
def test_delete(self):
718
# TODO: Test Transport.delete
719
t = self.get_transport()
721
# Not much to do with a readonly transport
723
self.assertRaises(TransportNotPossible, t.delete, 'missing')
726
t.put_bytes('a', b'a little bit of text\n')
727
self.assertTrue(t.has('a'))
729
self.assertFalse(t.has('a'))
731
self.assertRaises(NoSuchFile, t.delete, 'a')
733
t.put_bytes('a', b'a text\n')
734
t.put_bytes('b', b'b text\n')
735
t.put_bytes('c', b'c text\n')
736
self.assertEqual([True, True, True],
737
[t.has(n) for n in ['a', 'b', 'c']])
740
self.assertEqual([False, True, False],
741
[t.has(n) for n in ['a', 'b', 'c']])
742
self.assertFalse(t.has('a'))
743
self.assertTrue(t.has('b'))
744
self.assertFalse(t.has('c'))
746
for name in ['a', 'c', 'd']:
747
self.assertRaises(NoSuchFile, t.delete, name)
749
# We should have deleted everything
750
# SftpServer creates control files in the
751
# working directory, so we can just do a
753
# self.assertEqual([], os.listdir('.'))
755
def test_recommended_page_size(self):
756
"""Transports recommend a page size for partial access to files."""
757
t = self.get_transport()
758
self.assertIsInstance(t.recommended_page_size(), int)
760
def test_rmdir(self):
761
t = self.get_transport()
762
# Not much to do with a readonly transport
764
self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
769
# ftp may not be able to raise NoSuchFile for lack of
770
# details when failing
771
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
773
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
775
def test_rmdir_not_empty(self):
776
"""Deleting a non-empty directory raises an exception
778
sftp (and possibly others) don't give us a specific "directory not
779
empty" exception -- we can just see that the operation failed.
781
t = self.get_transport()
786
self.assertRaises(PathError, t.rmdir, 'adir')
788
def test_rmdir_empty_but_similar_prefix(self):
789
"""rmdir does not get confused by sibling paths.
791
A naive implementation of MemoryTransport would refuse to rmdir
792
".bzr/branch" if there is a ".bzr/branch-format" directory, because it
793
uses "path.startswith(dir)" on all file paths to determine if directory
796
t = self.get_transport()
800
t.put_bytes('foo-bar', b'')
803
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
804
self.assertTrue(t.has('foo-bar'))
806
def test_rename_dir_succeeds(self):
807
t = self.get_transport()
809
self.assertRaises((TransportNotPossible, NotImplementedError),
810
t.rename, 'foo', 'bar')
813
t.mkdir('adir/asubdir')
814
t.rename('adir', 'bdir')
815
self.assertTrue(t.has('bdir/asubdir'))
816
self.assertFalse(t.has('adir'))
818
def test_rename_dir_nonempty(self):
819
"""Attempting to replace a nonemtpy directory should fail"""
820
t = self.get_transport()
822
self.assertRaises((TransportNotPossible, NotImplementedError),
823
t.rename, 'foo', 'bar')
826
t.mkdir('adir/asubdir')
828
t.mkdir('bdir/bsubdir')
829
# any kind of PathError would be OK, though we normally expect
831
self.assertRaises(PathError, t.rename, 'bdir', 'adir')
832
# nothing was changed so it should still be as before
833
self.assertTrue(t.has('bdir/bsubdir'))
834
self.assertFalse(t.has('adir/bdir'))
835
self.assertFalse(t.has('adir/bsubdir'))
837
def test_rename_across_subdirs(self):
838
t = self.get_transport()
840
raise TestNotApplicable("transport is readonly")
845
ta.put_bytes('f', b'aoeu')
846
ta.rename('f', '../b/f')
847
self.assertTrue(tb.has('f'))
848
self.assertFalse(ta.has('f'))
849
self.assertTrue(t.has('b/f'))
851
def test_delete_tree(self):
852
t = self.get_transport()
854
# Not much to do with a readonly transport
856
self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
859
# and does it like listing ?
862
t.delete_tree('adir')
863
except TransportNotPossible:
864
# ok, this transport does not support delete_tree
867
# did it delete that trivial case?
868
self.assertRaises(NoSuchFile, t.stat, 'adir')
870
self.build_tree(['adir/',
878
t.delete_tree('adir')
879
# adir should be gone now.
880
self.assertRaises(NoSuchFile, t.stat, 'adir')
883
t = self.get_transport()
888
# TODO: I would like to use os.listdir() to
889
# make sure there are no extra files, but SftpServer
890
# creates control files in the working directory
891
# perhaps all of this could be done in a subdirectory
893
t.put_bytes('a', b'a first file\n')
894
self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
897
self.assertTrue(t.has('b'))
898
self.assertFalse(t.has('a'))
900
self.check_transport_contents(b'a first file\n', t, 'b')
901
self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
904
t.put_bytes('c', b'c this file\n')
906
self.assertFalse(t.has('c'))
907
self.check_transport_contents(b'c this file\n', t, 'b')
909
# TODO: Try to write a test for atomicity
910
# TODO: Test moving into a non-existent subdirectory
913
t = self.get_transport()
918
t.put_bytes('a', b'a file\n')
920
self.check_transport_contents(b'a file\n', t, 'b')
922
self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
924
# What should the assert be if you try to copy a
925
# file over a directory?
926
#self.assertRaises(Something, t.copy, 'a', 'c')
927
t.put_bytes('d', b'text in d\n')
929
self.check_transport_contents(b'text in d\n', t, 'b')
931
def test_connection_error(self):
932
"""ConnectionError is raised when connection is impossible.
934
The error should be raised from the first operation on the transport.
937
url = self._server.get_bogus_url()
938
except NotImplementedError:
939
raise TestSkipped("Transport %s has no bogus URL support." %
940
self._server.__class__)
941
t = _mod_transport.get_transport_from_url(url)
942
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
945
# TODO: Test stat, just try once, and if it throws, stop testing
946
from stat import S_ISDIR, S_ISREG
948
t = self.get_transport()
952
except TransportNotPossible as e:
953
# This transport cannot stat
956
paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
957
sizes = [14, 0, 16, 0, 18]
958
self.build_tree(paths, transport=t, line_endings='binary')
960
for path, size in zip(paths, sizes):
962
if path.endswith('/'):
963
self.assertTrue(S_ISDIR(st.st_mode))
964
# directory sizes are meaningless
966
self.assertTrue(S_ISREG(st.st_mode))
967
self.assertEqual(size, st.st_size)
969
self.assertRaises(NoSuchFile, t.stat, 'q')
970
self.assertRaises(NoSuchFile, t.stat, 'b/a')
972
self.build_tree(['subdir/', 'subdir/file'], transport=t)
973
subdir = t.clone('subdir')
974
st = subdir.stat('./file')
975
st = subdir.stat('.')
977
def test_hardlink(self):
978
from stat import ST_NLINK
980
t = self.get_transport()
982
source_name = "original_target"
983
link_name = "target_link"
985
self.build_tree([source_name], transport=t)
988
t.hardlink(source_name, link_name)
990
self.assertTrue(t.has(source_name))
991
self.assertTrue(t.has(link_name))
993
st = t.stat(link_name)
994
self.assertEqual(st[ST_NLINK], 2)
995
except TransportNotPossible:
996
raise TestSkipped("Transport %s does not support hardlinks." %
997
self._server.__class__)
999
def test_symlink(self):
1000
from stat import S_ISLNK
1002
t = self.get_transport()
1004
source_name = "original_target"
1005
link_name = "target_link"
1007
self.build_tree([source_name], transport=t)
1010
t.symlink(source_name, link_name)
1012
self.assertTrue(t.has(source_name))
1013
self.assertTrue(t.has(link_name))
1015
st = t.stat(link_name)
1016
self.assertTrue(S_ISLNK(st.st_mode),
1017
"expected symlink, got mode %o" % st.st_mode)
1018
except TransportNotPossible:
1019
raise TestSkipped("Transport %s does not support symlinks." %
1020
self._server.__class__)
1022
self.assertEqual(source_name, t.readlink(link_name))
1024
def test_readlink_nonexistent(self):
1025
t = self.get_transport()
1027
self.assertRaises(NoSuchFile, t.readlink, 'nonexistent')
1028
except TransportNotPossible:
1029
raise TestSkipped("Transport %s does not support symlinks." %
1030
self._server.__class__)
1032
def test_list_dir(self):
1033
# TODO: Test list_dir, just try once, and if it throws, stop testing
1034
t = self.get_transport()
1036
if not t.listable():
1037
self.assertRaises(TransportNotPossible, t.list_dir, '.')
1040
def sorted_list(d, transport):
1041
l = sorted(transport.list_dir(d))
1044
self.assertEqual([], sorted_list('.', t))
1045
# c2 is precisely one letter longer than c here to test that
1046
# suffixing is not confused.
1047
# a%25b checks that quoting is done consistently across transports
1048
tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
1050
if not t.is_readonly():
1051
self.build_tree(tree_names, transport=t)
1053
self.build_tree(tree_names)
1056
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1058
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1059
self.assertEqual(['d', 'e'], sorted_list('c', t))
1061
# Cloning the transport produces an equivalent listing
1062
self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1064
if not t.is_readonly():
1071
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1072
self.assertEqual(['e'], sorted_list('c', t))
1074
self.assertListRaises(PathError, t.list_dir, 'q')
1075
self.assertListRaises(PathError, t.list_dir, 'c/f')
1076
# 'a' is a file, list_dir should raise an error
1077
self.assertListRaises(PathError, t.list_dir, 'a')
1079
def test_list_dir_result_is_url_escaped(self):
1080
t = self.get_transport()
1081
if not t.listable():
1082
raise TestSkipped("transport not listable")
1084
if not t.is_readonly():
1085
self.build_tree(['a/', 'a/%'], transport=t)
1087
self.build_tree(['a/', 'a/%'])
1089
names = list(t.list_dir('a'))
1090
self.assertEqual(['%25'], names)
1091
self.assertIsInstance(names[0], str)
1093
def test_clone_preserve_info(self):
1094
t1 = self.get_transport()
1095
if not isinstance(t1, ConnectedTransport):
1096
raise TestSkipped("not a connected transport")
1098
t2 = t1.clone('subdir')
1099
self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
1100
self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
1101
self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
1102
self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
1103
self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1105
def test__reuse_for(self):
1106
t = self.get_transport()
1107
if not isinstance(t, ConnectedTransport):
1108
raise TestSkipped("not a connected transport")
1110
def new_url(scheme=None, user=None, password=None,
1111
host=None, port=None, path=None):
1112
"""Build a new url from t.base changing only parts of it.
1114
Only the parameters different from None will be changed.
1117
scheme = t._parsed_url.scheme
1119
user = t._parsed_url.user
1120
if password is None:
1121
password = t._parsed_url.password
1123
user = t._parsed_url.user
1125
host = t._parsed_url.host
1127
port = t._parsed_url.port
1129
path = t._parsed_url.path
1130
return str(urlutils.URL(scheme, user, password, host, port, path))
1132
if t._parsed_url.scheme == 'ftp':
1136
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1137
if t._parsed_url.user == 'me':
1141
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1142
# passwords are not taken into account because:
1143
# - it makes no sense to have two different valid passwords for the
1145
# - _password in ConnectedTransport is intended to collect what the
1146
# user specified from the command-line and there are cases where the
1147
# new url can contain no password (if the url was built from an
1148
# existing transport.base for example)
1149
# - password are considered part of the credentials provided at
1150
# connection creation time and as such may not be present in the url
1151
# (they may be typed by the user when prompted for example)
1152
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1153
# We will not connect, we can use a invalid host
1154
self.assertIsNot(t, t._reuse_for(
1155
new_url(host=t._parsed_url.host + 'bar')))
1156
if t._parsed_url.port == 1234:
1160
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1161
# No point in trying to reuse a transport for a local URL
1162
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1164
def test_connection_sharing(self):
1165
t = self.get_transport()
1166
if not isinstance(t, ConnectedTransport):
1167
raise TestSkipped("not a connected transport")
1169
c = t.clone('subdir')
1170
# Some transports will create the connection only when needed
1171
t.has('surely_not') # Force connection
1172
self.assertIs(t._get_connection(), c._get_connection())
1174
# Temporary failure, we need to create a new dummy connection
1175
new_connection = None
1176
t._set_connection(new_connection)
1177
# Check that both transports use the same connection
1178
self.assertIs(new_connection, t._get_connection())
1179
self.assertIs(new_connection, c._get_connection())
1181
def test_reuse_connection_for_various_paths(self):
1182
t = self.get_transport()
1183
if not isinstance(t, ConnectedTransport):
1184
raise TestSkipped("not a connected transport")
1186
t.has('surely_not') # Force connection
1187
self.assertIsNot(None, t._get_connection())
1189
subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1190
self.assertIsNot(t, subdir)
1191
self.assertIs(t._get_connection(), subdir._get_connection())
1193
home = subdir._reuse_for(t.base + 'home')
1194
self.assertIs(t._get_connection(), home._get_connection())
1195
self.assertIs(subdir._get_connection(), home._get_connection())
1197
def test_clone(self):
1198
# TODO: Test that clone moves up and down the filesystem
1199
t1 = self.get_transport()
1201
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1203
self.assertTrue(t1.has('a'))
1204
self.assertTrue(t1.has('b/c'))
1205
self.assertFalse(t1.has('c'))
1208
self.assertEqual(t1.base + 'b/', t2.base)
1210
self.assertTrue(t2.has('c'))
1211
self.assertFalse(t2.has('a'))
1214
self.assertTrue(t3.has('a'))
1215
self.assertFalse(t3.has('c'))
1217
self.assertFalse(t1.has('b/d'))
1218
self.assertFalse(t2.has('d'))
1219
self.assertFalse(t3.has('b/d'))
1221
if t1.is_readonly():
1222
self.build_tree_contents([('b/d', b'newfile\n')])
1224
t2.put_bytes('d', b'newfile\n')
1226
self.assertTrue(t1.has('b/d'))
1227
self.assertTrue(t2.has('d'))
1228
self.assertTrue(t3.has('b/d'))
1230
def test_clone_to_root(self):
1231
orig_transport = self.get_transport()
1232
# Repeatedly go up to a parent directory until we're at the root
1233
# directory of this transport
1234
root_transport = orig_transport
1235
new_transport = root_transport.clone("..")
1236
# as we are walking up directories, the path must be
1237
# growing less, except at the top
1238
self.assertTrue(len(new_transport.base) < len(root_transport.base) or
1239
new_transport.base == root_transport.base)
1240
while new_transport.base != root_transport.base:
1241
root_transport = new_transport
1242
new_transport = root_transport.clone("..")
1243
# as we are walking up directories, the path must be
1244
# growing less, except at the top
1245
self.assertTrue(len(new_transport.base) < len(root_transport.base) or
1246
new_transport.base == root_transport.base)
1248
# Cloning to "/" should take us to exactly the same location.
1249
self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1250
# the abspath of "/" from the original transport should be the same
1251
# as the base at the root:
1252
self.assertEqual(orig_transport.abspath("/"), root_transport.base)
1254
# At the root, the URL must still end with / as its a directory
1255
self.assertEqual(root_transport.base[-1], '/')
1257
def test_clone_from_root(self):
1258
"""At the root, cloning to a simple dir should just do string append."""
1259
orig_transport = self.get_transport()
1260
root_transport = orig_transport.clone('/')
1261
self.assertEqual(root_transport.base + '.bzr/',
1262
root_transport.clone('.bzr').base)
1264
def test_base_url(self):
1265
t = self.get_transport()
1266
self.assertEqual('/', t.base[-1])
1268
def test_relpath(self):
1269
t = self.get_transport()
1270
self.assertEqual('', t.relpath(t.base))
1272
self.assertEqual('', t.relpath(t.base[:-1]))
1273
# subdirs which don't exist should still give relpaths.
1274
self.assertEqual('foo', t.relpath(t.base + 'foo'))
1275
# trailing slash should be the same.
1276
self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1278
def test_relpath_at_root(self):
1279
t = self.get_transport()
1280
# clone all the way to the top
1281
new_transport = t.clone('..')
1282
while new_transport.base != t.base:
1284
new_transport = t.clone('..')
1285
# we must be able to get a relpath below the root
1286
self.assertEqual('', t.relpath(t.base))
1287
# and a deeper one should work too
1288
self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
1290
def test_abspath(self):
1291
# smoke test for abspath. Corner cases for backends like unix fs's
1292
# that have aliasing problems like symlinks should go in backend
1293
# specific test cases.
1294
transport = self.get_transport()
1296
self.assertEqual(transport.base + 'relpath',
1297
transport.abspath('relpath'))
1299
# This should work without raising an error.
1300
transport.abspath("/")
1302
# the abspath of "/" and "/foo/.." should result in the same location
1303
self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
1305
self.assertEqual(transport.clone("/").abspath('foo'),
1306
transport.abspath("/foo"))
1308
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1309
def test_win32_abspath(self):
1310
# Note: we tried to set sys.platform='win32' so we could test on
1311
# other platforms too, but then osutils does platform specific
1312
# things at import time which defeated us...
1313
if sys.platform != 'win32':
1315
'Testing drive letters in abspath implemented only for win32')
1317
# smoke test for abspath on win32.
1318
# a transport based on 'file:///' never fully qualifies the drive.
1319
transport = _mod_transport.get_transport_from_url("file:///")
1320
self.assertEqual(transport.abspath("/"), "file:///")
1322
# but a transport that starts with a drive spec must keep it.
1323
transport = _mod_transport.get_transport_from_url("file:///C:/")
1324
self.assertEqual(transport.abspath("/"), "file:///C:/")
1326
def test_local_abspath(self):
1327
transport = self.get_transport()
1329
p = transport.local_abspath('.')
1330
except (errors.NotLocalUrl, TransportNotPossible) as e:
1331
# should be formattable
1334
self.assertEqual(getcwd(), p)
1336
def test_abspath_at_root(self):
1337
t = self.get_transport()
1338
# clone all the way to the top
1339
new_transport = t.clone('..')
1340
while new_transport.base != t.base:
1342
new_transport = t.clone('..')
1343
# we must be able to get a abspath of the root when we ask for
1344
# t.abspath('..') - this due to our choice that clone('..')
1345
# should return the root from the root, combined with the desire that
1346
# the url from clone('..') and from abspath('..') should be the same.
1347
self.assertEqual(t.base, t.abspath('..'))
1348
# '' should give us the root
1349
self.assertEqual(t.base, t.abspath(''))
1350
# and a path should append to the url
1351
self.assertEqual(t.base + 'foo', t.abspath('foo'))
1353
def test_iter_files_recursive(self):
1354
transport = self.get_transport()
1355
if not transport.listable():
1356
self.assertRaises(TransportNotPossible,
1357
transport.iter_files_recursive)
1359
self.build_tree(['isolated/',
1363
'isolated/dir/b%25z', # make sure quoting is correct
1365
transport=transport)
1366
paths = set(transport.iter_files_recursive())
1367
# nb the directories are not converted
1368
self.assertEqual(paths,
1369
{'isolated/dir/foo',
1371
'isolated/dir/b%2525z',
1373
sub_transport = transport.clone('isolated')
1374
paths = set(sub_transport.iter_files_recursive())
1375
self.assertEqual(paths,
1376
{'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
1378
def test_copy_tree(self):
1379
# TODO: test file contents and permissions are preserved. This test was
1380
# added just to ensure that quoting was handled correctly.
1381
# -- David Allouche 2006-08-11
1382
transport = self.get_transport()
1383
if not transport.listable():
1384
self.assertRaises(TransportNotPossible,
1385
transport.iter_files_recursive)
1387
if transport.is_readonly():
1389
self.build_tree(['from/',
1393
'from/dir/b%25z', # make sure quoting is correct
1395
transport=transport)
1396
transport.copy_tree('from', 'to')
1397
paths = set(transport.iter_files_recursive())
1398
self.assertEqual(paths,
1408
def test_copy_tree_to_transport(self):
1409
transport = self.get_transport()
1410
if not transport.listable():
1411
self.assertRaises(TransportNotPossible,
1412
transport.iter_files_recursive)
1414
if transport.is_readonly():
1416
self.build_tree(['from/',
1420
'from/dir/b%25z', # make sure quoting is correct
1422
transport=transport)
1423
from_transport = transport.clone('from')
1424
to_transport = transport.clone('to')
1425
to_transport.ensure_base()
1426
from_transport.copy_tree_to_transport(to_transport)
1427
paths = set(transport.iter_files_recursive())
1428
self.assertEqual(paths,
1438
def test_unicode_paths(self):
1439
"""Test that we can read/write files with Unicode names."""
1440
t = self.get_transport()
1442
# With FAT32 and certain encodings on win32
1443
# '\xe5' and '\xe4' actually map to the same file
1444
# adding a suffix kicks in the 'preserving but insensitive'
1445
# route, and maintains the right files
1446
files = [u'\xe5.1', # a w/ circle iso-8859-1
1447
u'\xe4.2', # a w/ dots iso-8859-1
1448
u'\u017d', # Z with umlat iso-8859-2
1449
u'\u062c', # Arabic j
1450
u'\u0410', # Russian A
1451
u'\u65e5', # Kanji person
1454
no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1455
if no_unicode_support:
1456
self.knownFailure("test server cannot handle unicode paths")
1459
self.build_tree(files, transport=t, line_endings='binary')
1460
except UnicodeError:
1462
"cannot handle unicode paths in current encoding")
1464
# A plain unicode string is not a valid url
1466
self.assertRaises(urlutils.InvalidURL, t.get, fname)
1469
fname_utf8 = fname.encode('utf-8')
1470
contents = b'contents of %s\n' % (fname_utf8,)
1471
self.check_transport_contents(contents, t, urlutils.escape(fname))
1473
def test_connect_twice_is_same_content(self):
1474
# check that our server (whatever it is) is accessible reliably
1475
# via get_transport and multiple connections share content.
1476
transport = self.get_transport()
1477
if transport.is_readonly():
1479
transport.put_bytes('foo', b'bar')
1480
transport3 = self.get_transport()
1481
self.check_transport_contents(b'bar', transport3, 'foo')
1483
# now opening at a relative url should give use a sane result:
1484
transport.mkdir('newdir')
1485
transport5 = self.get_transport('newdir')
1486
transport6 = transport5.clone('..')
1487
self.check_transport_contents(b'bar', transport6, 'foo')
1489
def test_lock_write(self):
1490
"""Test transport-level write locks.
1492
These are deprecated and transports may decline to support them.
1494
transport = self.get_transport()
1495
if transport.is_readonly():
1496
self.assertRaises(TransportNotPossible,
1497
transport.lock_write, 'foo')
1499
transport.put_bytes('lock', b'')
1501
lock = transport.lock_write('lock')
1502
except TransportNotPossible:
1504
# TODO make this consistent on all platforms:
1505
# self.assertRaises(LockError, transport.lock_write, 'lock')
1508
def test_lock_read(self):
1509
"""Test transport-level read locks.
1511
These are deprecated and transports may decline to support them.
1513
transport = self.get_transport()
1514
if transport.is_readonly():
1515
open('lock', 'w').close()
1517
transport.put_bytes('lock', b'')
1519
lock = transport.lock_read('lock')
1520
except TransportNotPossible:
1522
# TODO make this consistent on all platforms:
1523
# self.assertRaises(LockError, transport.lock_read, 'lock')
1526
def test_readv(self):
1527
transport = self.get_transport()
1528
if transport.is_readonly():
1529
with open('a', 'w') as f:
1530
f.write('0123456789')
1532
transport.put_bytes('a', b'0123456789')
1534
d = list(transport.readv('a', ((0, 1),)))
1535
self.assertEqual(d[0], (0, b'0'))
1537
d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1538
self.assertEqual(d[0], (0, b'0'))
1539
self.assertEqual(d[1], (1, b'1'))
1540
self.assertEqual(d[2], (3, b'34'))
1541
self.assertEqual(d[3], (9, b'9'))
1543
def test_readv_out_of_order(self):
1544
transport = self.get_transport()
1545
if transport.is_readonly():
1546
with open('a', 'w') as f:
1547
f.write('0123456789')
1549
transport.put_bytes('a', b'01234567890')
1551
d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1552
self.assertEqual(d[0], (1, b'1'))
1553
self.assertEqual(d[1], (9, b'9'))
1554
self.assertEqual(d[2], (0, b'0'))
1555
self.assertEqual(d[3], (3, b'34'))
1557
def test_readv_with_adjust_for_latency(self):
1558
transport = self.get_transport()
1559
# the adjust for latency flag expands the data region returned
1560
# according to a per-transport heuristic, so testing is a little
1561
# tricky as we need more data than the largest combining that our
1562
# transports do. To accomodate this we generate random data and cross
1563
# reference the returned data with the random data. To avoid doing
1564
# multiple large random byte look ups we do several tests on the same
1566
content = osutils.rand_bytes(200 * 1024)
1567
content_size = len(content)
1568
if transport.is_readonly():
1569
self.build_tree_contents([('a', content)])
1571
transport.put_bytes('a', content)
1573
def check_result_data(result_vector):
1574
for item in result_vector:
1575
data_len = len(item[1])
1576
self.assertEqual(content[item[0]:item[0] + data_len], item[1])
1579
result = list(transport.readv('a', ((0, 30),),
1580
adjust_for_latency=True, upper_limit=content_size))
1581
# we expect 1 result, from 0, to something > 30
1582
self.assertEqual(1, len(result))
1583
self.assertEqual(0, result[0][0])
1584
self.assertTrue(len(result[0][1]) >= 30)
1585
check_result_data(result)
1586
# end of file corner case
1587
result = list(transport.readv('a', ((204700, 100),),
1588
adjust_for_latency=True, upper_limit=content_size))
1589
# we expect 1 result, from 204800- its length, to the end
1590
self.assertEqual(1, len(result))
1591
data_len = len(result[0][1])
1592
self.assertEqual(204800 - data_len, result[0][0])
1593
self.assertTrue(data_len >= 100)
1594
check_result_data(result)
1595
# out of order ranges are made in order
1596
result = list(transport.readv('a', ((204700, 100), (0, 50)),
1597
adjust_for_latency=True, upper_limit=content_size))
1598
# we expect 2 results, in order, start and end.
1599
self.assertEqual(2, len(result))
1601
data_len = len(result[0][1])
1602
self.assertEqual(0, result[0][0])
1603
self.assertTrue(data_len >= 30)
1605
data_len = len(result[1][1])
1606
self.assertEqual(204800 - data_len, result[1][0])
1607
self.assertTrue(data_len >= 100)
1608
check_result_data(result)
1609
# close ranges get combined (even if out of order)
1610
for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
1611
result = list(transport.readv('a', request_vector,
1612
adjust_for_latency=True, upper_limit=content_size))
1613
self.assertEqual(1, len(result))
1614
data_len = len(result[0][1])
1615
# minimum length is from 400 to 1034 - 634
1616
self.assertTrue(data_len >= 634)
1617
# must contain the region 400 to 1034
1618
self.assertTrue(result[0][0] <= 400)
1619
self.assertTrue(result[0][0] + data_len >= 1034)
1620
check_result_data(result)
1622
def test_readv_with_adjust_for_latency_with_big_file(self):
1623
transport = self.get_transport()
1624
# test from observed failure case.
1625
if transport.is_readonly():
1626
with open('a', 'w') as f:
1627
f.write('a' * 1024 * 1024)
1629
transport.put_bytes('a', b'a' * 1024 * 1024)
1630
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1631
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1632
(465373, 800), (947422, 800)]
1633
results = list(transport.readv('a', broken_vector, True, 1024 * 1024))
1634
found_items = [False] * 9
1635
for pos, (start, length) in enumerate(broken_vector):
1636
# check the range is covered by the result
1637
for offset, data in results:
1638
if offset <= start and start + length <= offset + len(data):
1639
found_items[pos] = True
1640
self.assertEqual([True] * 9, found_items)
1642
def test_get_with_open_write_stream_sees_all_content(self):
1643
t = self.get_transport()
1646
with t.open_write_stream('foo') as handle:
1647
handle.write(b'bcd')
1648
self.assertEqual([(0, b'b'), (2, b'd')], list(
1649
t.readv('foo', ((0, 1), (2, 1)))))
1651
def test_get_smart_medium(self):
1652
"""All transports must either give a smart medium, or know they can't.
1654
transport = self.get_transport()
1656
client_medium = transport.get_smart_medium()
1657
self.assertIsInstance(client_medium, medium.SmartClientMedium)
1658
except errors.NoSmartMedium:
1659
# as long as we got it we're fine
1662
def test_readv_short_read(self):
1663
transport = self.get_transport()
1664
if transport.is_readonly():
1665
with open('a', 'w') as f:
1666
f.write('0123456789')
1668
transport.put_bytes('a', b'01234567890')
1670
# This is intentionally reading off the end of the file
1671
# since we are sure that it cannot get there
1672
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1673
# Can be raised by paramiko
1675
transport.readv, 'a', [(1, 1), (8, 10)])
1677
# This is trying to seek past the end of the file, it should
1678
# also raise a special error
1679
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1680
transport.readv, 'a', [(12, 2)])
1682
def test_no_segment_parameters(self):
1683
"""Segment parameters should be stripped and stored in
1684
transport.segment_parameters."""
1685
transport = self.get_transport("foo")
1686
self.assertEqual({}, transport.get_segment_parameters())
1688
def test_segment_parameters(self):
1689
"""Segment parameters should be stripped and stored in
1690
transport.get_segment_parameters()."""
1691
base_url = self._server.get_url()
1692
parameters = {"key1": "val1", "key2": "val2"}
1693
url = urlutils.join_segment_parameters(base_url, parameters)
1694
transport = _mod_transport.get_transport_from_url(url)
1695
self.assertEqual(parameters, transport.get_segment_parameters())
1697
def test_set_segment_parameters(self):
1698
"""Segment parameters can be set and show up in base."""
1699
transport = self.get_transport("foo")
1700
orig_base = transport.base
1701
transport.set_segment_parameter("arm", "board")
1702
self.assertEqual("%s,arm=board" % orig_base, transport.base)
1703
self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
1704
transport.set_segment_parameter("arm", None)
1705
transport.set_segment_parameter("nonexistant", None)
1706
self.assertEqual({}, transport.get_segment_parameters())
1707
self.assertEqual(orig_base, transport.base)
1709
def test_stat_symlink(self):
1710
# if a transport points directly to a symlink (and supports symlinks
1711
# at all) you can tell this. helps with bug 32669.
1712
t = self.get_transport()
1714
t.symlink('target', 'link')
1715
except TransportNotPossible:
1716
raise TestSkipped("symlinks not supported")
1717
t2 = t.clone('link')
1719
self.assertTrue(stat.S_ISLNK(st.st_mode))
1721
def test_abspath_url_unquote_unreserved(self):
1722
"""URLs from abspath should have unreserved characters unquoted
1724
Need consistent quoting notably for tildes, see lp:842223 for more.
1726
t = self.get_transport()
1727
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1728
self.assertEqual(t.base + "-.09AZ_az~",
1729
t.abspath(needlessly_escaped_dir))
1731
def test_clone_url_unquote_unreserved(self):
1732
"""Base URL of a cloned branch needs unreserved characters unquoted
1734
Cloned transports should be prefix comparable for things like the
1735
isolation checking of tests, see lp:842223 for more.
1737
t1 = self.get_transport()
1738
needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1739
self.build_tree([needlessly_escaped_dir], transport=t1)
1740
t2 = t1.clone(needlessly_escaped_dir)
1741
self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1743
def test_hook_post_connection_one(self):
1744
"""Fire post_connect hook after a ConnectedTransport is first used"""
1746
Transport.hooks.install_named_hook("post_connect", log.append, None)
1747
t = self.get_transport()
1748
self.assertEqual([], log)
1749
t.has("non-existant")
1750
if isinstance(t, RemoteTransport):
1751
self.assertEqual([t.get_smart_medium()], log)
1752
elif isinstance(t, ConnectedTransport):
1753
self.assertEqual([t], log)
1755
self.assertEqual([], log)
1757
def test_hook_post_connection_multi(self):
1758
"""Fire post_connect hook once per unshared underlying connection"""
1760
Transport.hooks.install_named_hook("post_connect", log.append, None)
1761
t1 = self.get_transport()
1763
t3 = self.get_transport()
1764
self.assertEqual([], log)
1768
if isinstance(t1, RemoteTransport):
1769
self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1770
elif isinstance(t1, ConnectedTransport):
1771
self.assertEqual([t1, t3], log)
1773
self.assertEqual([], log)