1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for Transport implementations.
19
Transport implementations tested here are supplied by
20
TransportTestProviderAdapter.
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
35
transport as _mod_transport,
38
from bzrlib.errors import (ConnectionError,
45
from bzrlib.osutils import getcwd
46
from bzrlib.smart import medium
47
from bzrlib.tests import (
52
from bzrlib.tests import test_server
53
from bzrlib.tests.test_transport import TestTransportImplementation
54
from bzrlib.transport import (
56
_get_transport_modules,
58
from bzrlib.transport.memory import MemoryTransport
61
def get_transport_test_permutations(module):
62
"""Get the permutations module wants to have tested."""
63
if getattr(module, 'get_test_permutations', None) is None:
65
"transport module %s doesn't provide get_test_permutations()"
68
return module.get_test_permutations()
71
def transport_test_permutations():
72
"""Return a list of the klass, server_factory pairs to test."""
74
for module in _get_transport_modules():
76
permutations = get_transport_test_permutations(
77
pyutils.get_named_object(module))
78
for (klass, server_factory) in permutations:
79
scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
80
{"transport_class":klass,
81
"transport_server":server_factory})
82
result.append(scenario)
83
except errors.DependencyNotPresent, e:
84
# Continue even if a dependency prevents us
85
# from adding this test
90
def load_tests(standard_tests, module, loader):
    """Multiply tests for transport implementations.

    :param standard_tests: the tests to multiply; handed unchanged to
        multiply_tests().
    :param module: not used by this function.
    :param loader: the test loader; its suiteClass() provides the empty
        suite passed to multiply_tests() as the result container.
    :return: the value returned by multiply_tests().
    """
    result = loader.suiteClass()
    # One scenario per (transport class, server factory) permutation.
    scenarios = transport_test_permutations()
    return multiply_tests(standard_tests, scenarios, result)
97
class TransportTests(TestTransportImplementation):
100
super(TransportTests, self).setUp()
101
self.overrideEnv('BZR_NO_SMART_VFS', None)
103
def check_transport_contents(self, content, transport, relpath):
    """Check that transport.get_bytes(relpath) == content.

    :param content: the expected bytes.
    :param transport: the object whose get_bytes() is called.
    :param relpath: the path passed to transport.get_bytes().
    """
    # assertEqualDiff is given the expected value first, then the actual one.
    self.assertEqualDiff(content, transport.get_bytes(relpath))
107
def test_ensure_base_missing(self):
108
""".ensure_base() should create the directory if it doesn't exist"""
109
t = self.get_transport()
111
if t_a.is_readonly():
112
self.assertRaises(TransportNotPossible,
115
self.assertTrue(t_a.ensure_base())
116
self.assertTrue(t.has('a'))
118
def test_ensure_base_exists(self):
119
""".ensure_base() should just be happy if it already exists"""
120
t = self.get_transport()
126
# ensure_base returns False if it didn't create the base
127
self.assertFalse(t_a.ensure_base())
129
def test_ensure_base_missing_parent(self):
130
""".ensure_base() will fail if the parent dir doesn't exist"""
131
t = self.get_transport()
137
self.assertRaises(NoSuchFile, t_b.ensure_base)
139
def test_external_url(self):
140
""".external_url either works or raises InProcessTransport."""
141
t = self.get_transport()
144
except errors.InProcessTransport:
148
t = self.get_transport()
150
files = ['a', 'b', 'e', 'g', '%']
151
self.build_tree(files, transport=t)
152
self.assertEqual(True, t.has('a'))
153
self.assertEqual(False, t.has('c'))
154
self.assertEqual(True, t.has(urlutils.escape('%')))
155
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
156
'e', 'f', 'g', 'h'])),
157
[True, True, False, False,
158
True, False, True, False])
159
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
160
self.assertEqual(False, t.has_any(['c', 'd', 'f',
161
urlutils.escape('%%')]))
162
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
163
'e', 'f', 'g', 'h']))),
164
[True, True, False, False,
165
True, False, True, False])
166
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
167
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
169
def test_has_root_works(self):
170
if self.transport_server is test_server.SmartTCPServer_for_testing:
171
raise TestNotApplicable(
172
"SmartTCPServer_for_testing intentionally does not allow "
174
current_transport = self.get_transport()
175
self.assertTrue(current_transport.has('/'))
176
root = current_transport.clone('/')
177
self.assertTrue(root.has(''))
180
t = self.get_transport()
182
files = ['a', 'b', 'e', 'g']
183
contents = ['contents of a\n',
188
self.build_tree(files, transport=t, line_endings='binary')
189
self.check_transport_contents('contents of a\n', t, 'a')
190
content_f = t.get_multi(files)
191
# Use itertools.izip() instead of zip() or map(), since they fully
192
# evaluate their inputs, the transport requests should be issued and
193
# handled sequentially (we don't want to force transport to buffer).
194
for content, f in itertools.izip(contents, content_f):
195
self.assertEqual(content, f.read())
197
content_f = t.get_multi(iter(files))
198
# Use itertools.izip() for the same reason
199
for content, f in itertools.izip(contents, content_f):
200
self.assertEqual(content, f.read())
202
def test_get_unknown_file(self):
203
t = self.get_transport()
205
contents = ['contents of a\n',
208
self.build_tree(files, transport=t, line_endings='binary')
209
self.assertRaises(NoSuchFile, t.get, 'c')
210
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
211
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
213
def test_get_directory_read_gives_ReadError(self):
214
"""consistent errors for read() on a file returned by get()."""
215
t = self.get_transport()
217
self.build_tree(['a directory/'])
219
t.mkdir('a%20directory')
220
# getting the file must either work or fail with a PathError
222
a_file = t.get('a%20directory')
223
except (errors.PathError, errors.RedirectRequested):
224
# early failure return immediately.
226
# having got a file, read() must either work (i.e. http reading a dir
227
# listing) or fail with ReadError
230
except errors.ReadError:
233
def test_get_bytes(self):
234
t = self.get_transport()
236
files = ['a', 'b', 'e', 'g']
237
contents = ['contents of a\n',
242
self.build_tree(files, transport=t, line_endings='binary')
243
self.check_transport_contents('contents of a\n', t, 'a')
245
for content, fname in zip(contents, files):
246
self.assertEqual(content, t.get_bytes(fname))
248
def test_get_bytes_unknown_file(self):
    # 'c' is never created by this test, so reading it must raise
    # NoSuchFile.
    t = self.get_transport()
    self.assertRaises(NoSuchFile, t.get_bytes, 'c')
252
def test_get_with_open_write_stream_sees_all_content(self):
253
t = self.get_transport()
256
handle = t.open_write_stream('foo')
259
self.assertEqual('b', t.get_bytes('foo'))
263
def test_get_bytes_with_open_write_stream_sees_all_content(self):
264
t = self.get_transport()
267
handle = t.open_write_stream('foo')
270
self.assertEqual('b', t.get_bytes('foo'))
273
self.assertEqual('b', f.read())
279
def test_put_bytes(self):
280
t = self.get_transport()
283
self.assertRaises(TransportNotPossible,
284
t.put_bytes, 'a', 'some text for a\n')
287
t.put_bytes('a', 'some text for a\n')
288
self.assertTrue(t.has('a'))
289
self.check_transport_contents('some text for a\n', t, 'a')
291
# The contents should be overwritten
292
t.put_bytes('a', 'new text for a\n')
293
self.check_transport_contents('new text for a\n', t, 'a')
295
self.assertRaises(NoSuchFile,
296
t.put_bytes, 'path/doesnt/exist/c', 'contents')
298
def test_put_bytes_non_atomic(self):
299
t = self.get_transport()
302
self.assertRaises(TransportNotPossible,
303
t.put_bytes_non_atomic, 'a', 'some text for a\n')
306
self.assertFalse(t.has('a'))
307
t.put_bytes_non_atomic('a', 'some text for a\n')
308
self.assertTrue(t.has('a'))
309
self.check_transport_contents('some text for a\n', t, 'a')
310
# Put also replaces contents
311
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
312
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
314
# Make sure we can create another file
315
t.put_bytes_non_atomic('d', 'contents for\nd\n')
316
# And overwrite 'a' with empty contents
317
t.put_bytes_non_atomic('a', '')
318
self.check_transport_contents('contents for\nd\n', t, 'd')
319
self.check_transport_contents('', t, 'a')
321
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
323
# Now test the create_parent flag
324
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
326
self.assertFalse(t.has('dir/a'))
327
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
328
create_parent_dir=True)
329
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
331
# But we still get NoSuchFile if we can't make the parent dir
332
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
334
create_parent_dir=True)
336
def test_put_bytes_permissions(self):
337
t = self.get_transport()
341
if not t._can_roundtrip_unix_modebits():
342
# Can't roundtrip, so no need to run this test
344
t.put_bytes('mode644', 'test text\n', mode=0644)
345
self.assertTransportMode(t, 'mode644', 0644)
346
t.put_bytes('mode666', 'test text\n', mode=0666)
347
self.assertTransportMode(t, 'mode666', 0666)
348
t.put_bytes('mode600', 'test text\n', mode=0600)
349
self.assertTransportMode(t, 'mode600', 0600)
350
# Yes, you can put_bytes a file such that it becomes readonly
351
t.put_bytes('mode400', 'test text\n', mode=0400)
352
self.assertTransportMode(t, 'mode400', 0400)
354
# The default permissions should be based on the current umask
355
umask = osutils.get_umask()
356
t.put_bytes('nomode', 'test text\n', mode=None)
357
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
359
def test_put_bytes_non_atomic_permissions(self):
360
t = self.get_transport()
364
if not t._can_roundtrip_unix_modebits():
365
# Can't roundtrip, so no need to run this test
367
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
368
self.assertTransportMode(t, 'mode644', 0644)
369
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
370
self.assertTransportMode(t, 'mode666', 0666)
371
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
372
self.assertTransportMode(t, 'mode600', 0600)
373
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
374
self.assertTransportMode(t, 'mode400', 0400)
376
# The default permissions should be based on the current umask
377
umask = osutils.get_umask()
378
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
379
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
381
# We should also be able to set the mode for a parent directory
383
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
384
dir_mode=0700, create_parent_dir=True)
385
self.assertTransportMode(t, 'dir700', 0700)
386
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
387
dir_mode=0770, create_parent_dir=True)
388
self.assertTransportMode(t, 'dir770', 0770)
389
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
390
dir_mode=0777, create_parent_dir=True)
391
self.assertTransportMode(t, 'dir777', 0777)
393
def test_put_file(self):
394
t = self.get_transport()
397
self.assertRaises(TransportNotPossible,
398
t.put_file, 'a', StringIO('some text for a\n'))
401
result = t.put_file('a', StringIO('some text for a\n'))
402
# put_file returns the length of the data written
403
self.assertEqual(16, result)
404
self.assertTrue(t.has('a'))
405
self.check_transport_contents('some text for a\n', t, 'a')
406
# Put also replaces contents
407
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
408
self.assertEqual(19, result)
409
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
410
self.assertRaises(NoSuchFile,
411
t.put_file, 'path/doesnt/exist/c',
412
StringIO('contents'))
414
def test_put_file_non_atomic(self):
415
t = self.get_transport()
418
self.assertRaises(TransportNotPossible,
419
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
422
self.assertFalse(t.has('a'))
423
t.put_file_non_atomic('a', StringIO('some text for a\n'))
424
self.assertTrue(t.has('a'))
425
self.check_transport_contents('some text for a\n', t, 'a')
426
# Put also replaces contents
427
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
428
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
430
# Make sure we can create another file
431
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
432
# And overwrite 'a' with empty contents
433
t.put_file_non_atomic('a', StringIO(''))
434
self.check_transport_contents('contents for\nd\n', t, 'd')
435
self.check_transport_contents('', t, 'a')
437
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
438
StringIO('contents\n'))
439
# Now test the create_parent flag
440
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
441
StringIO('contents\n'))
442
self.assertFalse(t.has('dir/a'))
443
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
444
create_parent_dir=True)
445
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
447
# But we still get NoSuchFile if we can't make the parent dir
448
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
449
StringIO('contents\n'),
450
create_parent_dir=True)
452
def test_put_file_permissions(self):
454
t = self.get_transport()
458
if not t._can_roundtrip_unix_modebits():
459
# Can't roundtrip, so no need to run this test
461
t.put_file('mode644', StringIO('test text\n'), mode=0644)
462
self.assertTransportMode(t, 'mode644', 0644)
463
t.put_file('mode666', StringIO('test text\n'), mode=0666)
464
self.assertTransportMode(t, 'mode666', 0666)
465
t.put_file('mode600', StringIO('test text\n'), mode=0600)
466
self.assertTransportMode(t, 'mode600', 0600)
467
# Yes, you can put a file such that it becomes readonly
468
t.put_file('mode400', StringIO('test text\n'), mode=0400)
469
self.assertTransportMode(t, 'mode400', 0400)
470
# The default permissions should be based on the current umask
471
umask = osutils.get_umask()
472
t.put_file('nomode', StringIO('test text\n'), mode=None)
473
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
475
def test_put_file_non_atomic_permissions(self):
476
t = self.get_transport()
480
if not t._can_roundtrip_unix_modebits():
481
# Can't roundtrip, so no need to run this test
483
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
484
self.assertTransportMode(t, 'mode644', 0644)
485
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
486
self.assertTransportMode(t, 'mode666', 0666)
487
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
488
self.assertTransportMode(t, 'mode600', 0600)
489
# Yes, you can put_file_non_atomic a file such that it becomes readonly
490
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
491
self.assertTransportMode(t, 'mode400', 0400)
493
# The default permissions should be based on the current umask
494
umask = osutils.get_umask()
495
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
496
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
498
# We should also be able to set the mode for a parent directory
501
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
502
dir_mode=0700, create_parent_dir=True)
503
self.assertTransportMode(t, 'dir700', 0700)
504
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
505
dir_mode=0770, create_parent_dir=True)
506
self.assertTransportMode(t, 'dir770', 0770)
507
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
508
dir_mode=0777, create_parent_dir=True)
509
self.assertTransportMode(t, 'dir777', 0777)
511
def test_put_bytes_unicode(self):
512
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
513
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
514
# (we don't want to encode unicode here at all, callers should be
515
# strictly passing bytes to put_bytes), but we allow it for backwards
516
# compatibility. At some point we should use a specific exception.
517
# See https://bugs.launchpad.net/bzr/+bug/106898.
518
t = self.get_transport()
521
unicode_string = u'\u1234'
523
(AssertionError, UnicodeEncodeError),
524
t.put_bytes, 'foo', unicode_string)
526
def test_put_file_unicode(self):
527
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
528
# This situation can happen (and has) if code is careless about the type
529
# of "string" they initialise/write to a StringIO with. We cannot use
530
# cStringIO, because it never returns unicode from read.
531
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
532
# raise, but we raise it for hysterical raisins.
533
t = self.get_transport()
536
unicode_file = pyStringIO(u'\u1234')
537
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
539
def test_mkdir(self):
540
t = self.get_transport()
543
# cannot mkdir on readonly transports. We're not testing for
544
# cache coherency because cache behaviour is not currently
545
# defined for the transport interface.
546
self.assertRaises(TransportNotPossible, t.mkdir, '.')
547
self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
548
self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
549
self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
553
self.assertEqual(t.has('dir_a'), True)
554
self.assertEqual(t.has('dir_b'), False)
557
self.assertEqual(t.has('dir_b'), True)
559
t.mkdir_multi(['dir_c', 'dir_d'])
561
t.mkdir_multi(iter(['dir_e', 'dir_f']))
562
self.assertEqual(list(t.has_multi(
563
['dir_a', 'dir_b', 'dir_c', 'dir_q',
564
'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
565
[True, True, True, False,
566
True, True, True, True])
568
# we were testing that a local mkdir followed by a transport
569
# mkdir failed thusly, but given that we * in one process * do not
570
# concurrently fiddle with disk dirs and then use transport to do
571
# things, the win here seems marginal compared to the constraint on
572
# the interface. RBC 20051227
574
self.assertRaises(FileExists, t.mkdir, 'dir_g')
576
# Test get/put in sub-directories
577
t.put_bytes('dir_a/a', 'contents of dir_a/a')
578
t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
579
self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
580
self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
582
# mkdir of a dir with an absent parent
583
self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
585
def test_mkdir_permissions(self):
586
t = self.get_transport()
589
if not t._can_roundtrip_unix_modebits():
590
# no sense testing on this transport
592
# Test mkdir with a mode
593
t.mkdir('dmode755', mode=0755)
594
self.assertTransportMode(t, 'dmode755', 0755)
595
t.mkdir('dmode555', mode=0555)
596
self.assertTransportMode(t, 'dmode555', 0555)
597
t.mkdir('dmode777', mode=0777)
598
self.assertTransportMode(t, 'dmode777', 0777)
599
t.mkdir('dmode700', mode=0700)
600
self.assertTransportMode(t, 'dmode700', 0700)
601
t.mkdir_multi(['mdmode755'], mode=0755)
602
self.assertTransportMode(t, 'mdmode755', 0755)
604
# Default mode should be based on umask
605
umask = osutils.get_umask()
606
t.mkdir('dnomode', mode=None)
607
self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
609
def test_opening_a_file_stream_creates_file(self):
610
t = self.get_transport()
613
handle = t.open_write_stream('foo')
615
self.assertEqual('', t.get_bytes('foo'))
619
def test_opening_a_file_stream_can_set_mode(self):
620
t = self.get_transport()
623
if not t._can_roundtrip_unix_modebits():
624
# Can't roundtrip, so no need to run this test
626
def check_mode(name, mode, expected):
627
handle = t.open_write_stream(name, mode=mode)
629
self.assertTransportMode(t, name, expected)
630
check_mode('mode644', 0644, 0644)
631
check_mode('mode666', 0666, 0666)
632
check_mode('mode600', 0600, 0600)
633
# The default permissions should be based on the current umask
634
check_mode('nomode', None, 0666 & ~osutils.get_umask())
636
def test_copy_to(self):
637
# FIXME: test: same server to same server (partly done)
638
# same protocol two servers
639
# and different protocols (done for now except for MemoryTransport.
642
def simple_copy_files(transport_from, transport_to):
643
files = ['a', 'b', 'c', 'd']
644
self.build_tree(files, transport=transport_from)
645
self.assertEqual(4, transport_from.copy_to(files, transport_to))
647
self.check_transport_contents(transport_to.get_bytes(f),
650
t = self.get_transport()
651
temp_transport = MemoryTransport('memory:///')
652
simple_copy_files(t, temp_transport)
653
if not t.is_readonly():
654
t.mkdir('copy_to_simple')
655
t2 = t.clone('copy_to_simple')
656
simple_copy_files(t, t2)
659
# Test that copying into a missing directory raises
662
self.build_tree(['e/', 'e/f'])
665
t.put_bytes('e/f', 'contents of e')
666
self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
667
temp_transport.mkdir('e')
668
t.copy_to(['e/f'], temp_transport)
671
temp_transport = MemoryTransport('memory:///')
673
files = ['a', 'b', 'c', 'd']
674
t.copy_to(iter(files), temp_transport)
676
self.check_transport_contents(temp_transport.get_bytes(f),
680
for mode in (0666, 0644, 0600, 0400):
681
temp_transport = MemoryTransport("memory:///")
682
t.copy_to(files, temp_transport, mode=mode)
684
self.assertTransportMode(temp_transport, f, mode)
686
def test_create_prefix(self):
687
t = self.get_transport()
688
sub = t.clone('foo').clone('bar')
691
except TransportNotPossible:
692
self.assertTrue(t.is_readonly())
694
self.assertTrue(t.has('foo/bar'))
696
def test_append_file(self):
697
t = self.get_transport()
700
self.assertRaises(TransportNotPossible,
701
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
703
t.put_bytes('a', 'diff\ncontents for\na\n')
704
t.put_bytes('b', 'contents\nfor b\n')
707
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
709
self.check_transport_contents(
710
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
713
# a file with no parent should fail..
714
self.assertRaises(NoSuchFile,
715
t.append_file, 'missing/path', StringIO('content'))
717
# And we can create new files, too
719
t.append_file('c', StringIO('some text\nfor a missing file\n')))
720
self.check_transport_contents('some text\nfor a missing file\n',
723
def test_append_bytes(self):
724
t = self.get_transport()
727
self.assertRaises(TransportNotPossible,
728
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
731
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
732
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
735
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
737
self.check_transport_contents(
738
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
741
# a file with no parent should fail..
742
self.assertRaises(NoSuchFile,
743
t.append_bytes, 'missing/path', 'content')
745
def test_append_multi(self):
746
t = self.get_transport()
750
t.put_bytes('a', 'diff\ncontents for\na\n'
751
'add\nsome\nmore\ncontents\n')
752
t.put_bytes('b', 'contents\nfor b\n')
754
self.assertEqual((43, 15),
755
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
756
('b', StringIO('some\nmore\nfor\nb\n'))]))
758
self.check_transport_contents(
759
'diff\ncontents for\na\n'
760
'add\nsome\nmore\ncontents\n'
761
'and\nthen\nsome\nmore\n',
763
self.check_transport_contents(
765
'some\nmore\nfor\nb\n',
768
self.assertEqual((62, 31),
769
t.append_multi(iter([('a', StringIO('a little bit more\n')),
770
('b', StringIO('from an iterator\n'))])))
771
self.check_transport_contents(
772
'diff\ncontents for\na\n'
773
'add\nsome\nmore\ncontents\n'
774
'and\nthen\nsome\nmore\n'
775
'a little bit more\n',
777
self.check_transport_contents(
779
'some\nmore\nfor\nb\n'
780
'from an iterator\n',
783
self.assertEqual((80, 0),
784
t.append_multi([('a', StringIO('some text in a\n')),
785
('d', StringIO('missing file r\n'))]))
787
self.check_transport_contents(
788
'diff\ncontents for\na\n'
789
'add\nsome\nmore\ncontents\n'
790
'and\nthen\nsome\nmore\n'
791
'a little bit more\n'
794
self.check_transport_contents('missing file r\n', t, 'd')
796
def test_append_file_mode(self):
797
"""Check that append accepts a mode parameter"""
798
# check append accepts a mode
799
t = self.get_transport()
801
self.assertRaises(TransportNotPossible,
802
t.append_file, 'f', StringIO('f'), mode=None)
804
t.append_file('f', StringIO('f'), mode=None)
806
def test_append_bytes_mode(self):
807
# check append_bytes accepts a mode
808
t = self.get_transport()
810
self.assertRaises(TransportNotPossible,
811
t.append_bytes, 'f', 'f', mode=None)
813
t.append_bytes('f', 'f', mode=None)
815
def test_delete(self):
816
# TODO: Test Transport.delete
817
t = self.get_transport()
819
# Not much to do with a readonly transport
821
self.assertRaises(TransportNotPossible, t.delete, 'missing')
824
t.put_bytes('a', 'a little bit of text\n')
825
self.assertTrue(t.has('a'))
827
self.assertFalse(t.has('a'))
829
self.assertRaises(NoSuchFile, t.delete, 'a')
831
t.put_bytes('a', 'a text\n')
832
t.put_bytes('b', 'b text\n')
833
t.put_bytes('c', 'c text\n')
834
self.assertEqual([True, True, True],
835
list(t.has_multi(['a', 'b', 'c'])))
836
t.delete_multi(['a', 'c'])
837
self.assertEqual([False, True, False],
838
list(t.has_multi(['a', 'b', 'c'])))
839
self.assertFalse(t.has('a'))
840
self.assertTrue(t.has('b'))
841
self.assertFalse(t.has('c'))
843
self.assertRaises(NoSuchFile,
844
t.delete_multi, ['a', 'b', 'c'])
846
self.assertRaises(NoSuchFile,
847
t.delete_multi, iter(['a', 'b', 'c']))
849
t.put_bytes('a', 'another a text\n')
850
t.put_bytes('c', 'another c text\n')
851
t.delete_multi(iter(['a', 'b', 'c']))
853
# We should have deleted everything
854
# SftpServer creates control files in the
855
# working directory, so we can just do a
857
# self.assertEqual([], os.listdir('.'))
859
def test_recommended_page_size(self):
    """Transports recommend a page size for partial access to files."""
    t = self.get_transport()
    # Only the type is checked here, not the particular value.
    self.assertIsInstance(t.recommended_page_size(), int)
864
def test_rmdir(self):
865
t = self.get_transport()
866
# Not much to do with a readonly transport
868
self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
873
# ftp may not be able to raise NoSuchFile for lack of
874
# details when failing
875
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
877
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
879
def test_rmdir_not_empty(self):
880
"""Deleting a non-empty directory raises an exception
882
sftp (and possibly others) don't give us a specific "directory not
883
empty" exception -- we can just see that the operation failed.
885
t = self.get_transport()
890
self.assertRaises(PathError, t.rmdir, 'adir')
892
def test_rmdir_empty_but_similar_prefix(self):
893
"""rmdir does not get confused by sibling paths.
895
A naive implementation of MemoryTransport would refuse to rmdir
896
".bzr/branch" if there is a ".bzr/branch-format" directory, because it
897
uses "path.startswith(dir)" on all file paths to determine if directory
900
t = self.get_transport()
904
t.put_bytes('foo-bar', '')
907
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
908
self.assertTrue(t.has('foo-bar'))
910
def test_rename_dir_succeeds(self):
911
t = self.get_transport()
913
raise TestSkipped("transport is readonly")
915
t.mkdir('adir/asubdir')
916
t.rename('adir', 'bdir')
917
self.assertTrue(t.has('bdir/asubdir'))
918
self.assertFalse(t.has('adir'))
920
def test_rename_dir_nonempty(self):
921
"""Attempting to replace a nonemtpy directory should fail"""
922
t = self.get_transport()
924
raise TestSkipped("transport is readonly")
926
t.mkdir('adir/asubdir')
928
t.mkdir('bdir/bsubdir')
929
# any kind of PathError would be OK, though we normally expect
931
self.assertRaises(PathError, t.rename, 'bdir', 'adir')
932
# nothing was changed so it should still be as before
933
self.assertTrue(t.has('bdir/bsubdir'))
934
self.assertFalse(t.has('adir/bdir'))
935
self.assertFalse(t.has('adir/bsubdir'))
937
def test_rename_across_subdirs(self):
938
t = self.get_transport()
940
raise TestNotApplicable("transport is readonly")
945
ta.put_bytes('f', 'aoeu')
946
ta.rename('f', '../b/f')
947
self.assertTrue(tb.has('f'))
948
self.assertFalse(ta.has('f'))
949
self.assertTrue(t.has('b/f'))
951
def test_delete_tree(self):
952
t = self.get_transport()
954
# Not much to do with a readonly transport
956
self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
959
# and does it like listing ?
962
t.delete_tree('adir')
963
except TransportNotPossible:
964
# ok, this transport does not support delete_tree
967
# did it delete that trivial case?
968
self.assertRaises(NoSuchFile, t.stat, 'adir')
970
self.build_tree(['adir/',
978
t.delete_tree('adir')
979
# adir should be gone now.
980
self.assertRaises(NoSuchFile, t.stat, 'adir')
983
t = self.get_transport()
988
# TODO: I would like to use os.listdir() to
989
# make sure there are no extra files, but SftpServer
990
# creates control files in the working directory
991
# perhaps all of this could be done in a subdirectory
993
t.put_bytes('a', 'a first file\n')
994
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
997
self.assertTrue(t.has('b'))
998
self.assertFalse(t.has('a'))
1000
self.check_transport_contents('a first file\n', t, 'b')
1001
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1004
t.put_bytes('c', 'c this file\n')
1006
self.assertFalse(t.has('c'))
1007
self.check_transport_contents('c this file\n', t, 'b')
1009
# TODO: Try to write a test for atomicity
1010
# TODO: Test moving into a non-existent subdirectory
1011
# TODO: Test Transport.move_multi
1013
def test_copy(self):
1014
t = self.get_transport()
1019
t.put_bytes('a', 'a file\n')
1021
self.check_transport_contents('a file\n', t, 'b')
1023
self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1025
# What should the assert be if you try to copy a
1026
# file over a directory?
1027
#self.assertRaises(Something, t.copy, 'a', 'c')
1028
t.put_bytes('d', 'text in d\n')
1030
self.check_transport_contents('text in d\n', t, 'b')
1032
# TODO: test copy_multi
1034
def test_connection_error(self):
1035
"""ConnectionError is raised when connection is impossible.
1037
The error should be raised from the first operation on the transport.
1040
url = self._server.get_bogus_url()
1041
except NotImplementedError:
1042
raise TestSkipped("Transport %s has no bogus URL support." %
1043
self._server.__class__)
1044
t = _mod_transport.get_transport(url)
1045
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1047
def test_stat(self):
1048
# TODO: Test stat, just try once, and if it throws, stop testing
1049
from stat import S_ISDIR, S_ISREG
1051
t = self.get_transport()
1055
except TransportNotPossible, e:
1056
# This transport cannot stat
1059
paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
1060
sizes = [14, 0, 16, 0, 18]
1061
self.build_tree(paths, transport=t, line_endings='binary')
1063
for path, size in zip(paths, sizes):
1065
if path.endswith('/'):
1066
self.assertTrue(S_ISDIR(st.st_mode))
1067
# directory sizes are meaningless
1069
self.assertTrue(S_ISREG(st.st_mode))
1070
self.assertEqual(size, st.st_size)
1072
remote_stats = list(t.stat_multi(paths))
1073
remote_iter_stats = list(t.stat_multi(iter(paths)))
1075
self.assertRaises(NoSuchFile, t.stat, 'q')
1076
self.assertRaises(NoSuchFile, t.stat, 'b/a')
1078
self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1079
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1080
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1081
subdir = t.clone('subdir')
1082
subdir.stat('./file')
1085
def test_hardlink(self):
    """Hard-linking a file gives it a second name and a link count of 2.

    The test is skipped when the transport raises TransportNotPossible.
    """
    from stat import ST_NLINK

    transport = self.get_transport()
    original = "original_target"
    alias = "target_link"
    self.build_tree([original], transport=transport)

    try:
        transport.hardlink(original, alias)

        # Both the original name and the new link must be visible.
        for name in (original, alias):
            self.assertTrue(transport.has(name))

        link_stat = transport.stat(alias)
        self.assertEqual(link_stat[ST_NLINK], 2)
    except TransportNotPossible:
        raise TestSkipped("Transport %s does not support hardlinks." %
                          self._server.__class__)
1107
def test_symlink(self):
1108
from stat import S_ISLNK
1110
t = self.get_transport()
1112
source_name = "original_target"
1113
link_name = "target_link"
1115
self.build_tree([source_name], transport=t)
1118
t.symlink(source_name, link_name)
1120
self.assertTrue(t.has(source_name))
1121
self.assertTrue(t.has(link_name))
1123
st = t.stat(link_name)
1124
self.assertTrue(S_ISLNK(st.st_mode),
1125
"expected symlink, got mode %o" % st.st_mode)
1126
except TransportNotPossible:
1127
raise TestSkipped("Transport %s does not support symlinks." %
1128
self._server.__class__)
1130
raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
1132
def test_list_dir(self):
1133
# TODO: Test list_dir, just try once, and if it throws, stop testing
1134
t = self.get_transport()
1136
if not t.listable():
1137
self.assertRaises(TransportNotPossible, t.list_dir, '.')
1140
def sorted_list(d, transport):
1141
l = list(transport.list_dir(d))
1145
self.assertEqual([], sorted_list('.', t))
1146
# c2 is precisely one letter longer than c here to test that
1147
# suffixing is not confused.
1148
# a%25b checks that quoting is done consistently across transports
1149
tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
1151
if not t.is_readonly():
1152
self.build_tree(tree_names, transport=t)
1154
self.build_tree(tree_names)
1157
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
1159
['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
1160
self.assertEqual(['d', 'e'], sorted_list('c', t))
1162
# Cloning the transport produces an equivalent listing
1163
self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1165
if not t.is_readonly():
1172
self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
1173
self.assertEqual(['e'], sorted_list('c', t))
1175
self.assertListRaises(PathError, t.list_dir, 'q')
1176
self.assertListRaises(PathError, t.list_dir, 'c/f')
1177
# 'a' is a file, list_dir should raise an error
1178
self.assertListRaises(PathError, t.list_dir, 'a')
1180
def test_list_dir_result_is_url_escaped(self):
    """list_dir returns url-escaped names as plain str objects."""
    transport = self.get_transport()
    if not transport.listable():
        raise TestSkipped("transport not listable")

    tree_shape = ['a/', 'a/%']
    if transport.is_readonly():
        # A read-only transport cannot create the files itself, so the
        # tree is built without going through it.
        self.build_tree(tree_shape)
    else:
        self.build_tree(tree_shape, transport=transport)

    listed = list(transport.list_dir('a'))
    # The literal '%' file name must come back escaped as '%25'.
    self.assertEqual(['%25'], listed)
    self.assertIsInstance(listed[0], str)
1194
def test_clone_preserve_info(self):
    """Cloning a connected transport keeps its connection parameters.

    The scheme, user, password, host and port of the clone must equal
    those of the original.  Transports that are not ConnectedTransport
    instances skip the test.
    """
    t1 = self.get_transport()
    if not isinstance(t1, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    t2 = t1.clone('subdir')
    # assertEqual is used rather than the deprecated assertEquals alias,
    # matching the other tests in this file.
    self.assertEqual(t1._scheme, t2._scheme)
    self.assertEqual(t1._user, t2._user)
    self.assertEqual(t1._password, t2._password)
    self.assertEqual(t1._host, t2._host)
    self.assertEqual(t1._port, t2._port)
1206
def test__reuse_for(self):
1207
t = self.get_transport()
1208
if not isinstance(t, ConnectedTransport):
1209
raise TestSkipped("not a connected transport")
1211
def new_url(scheme=None, user=None, password=None,
1212
host=None, port=None, path=None):
1213
"""Build a new url from t.base changing only parts of it.
1215
Only the parameters different from None will be changed.
1217
if scheme is None: scheme = t._scheme
1218
if user is None: user = t._user
1219
if password is None: password = t._password
1220
if user is None: user = t._user
1221
if host is None: host = t._host
1222
if port is None: port = t._port
1223
if path is None: path = t._path
1224
return t._unsplit_url(scheme, user, password, host, port, path)
1226
if t._scheme == 'ftp':
1230
self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1235
self.assertIsNot(t, t._reuse_for(new_url(user=user)))
1236
# passwords are not taken into account because:
1237
# - it makes no sense to have two different valid passwords for the
1239
# - _password in ConnectedTransport is intended to collect what the
1240
# user specified from the command-line and there are cases where the
1241
# new url can contain no password (if the url was built from an
1242
# existing transport.base for example)
1243
# - password are considered part of the credentials provided at
1244
# connection creation time and as such may not be present in the url
1245
# (they may be typed by the user when prompted for example)
1246
self.assertIs(t, t._reuse_for(new_url(password='from space')))
1247
# We will not connect, we can use a invalid host
1248
self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1253
self.assertIsNot(t, t._reuse_for(new_url(port=port)))
1254
# No point in trying to reuse a transport for a local URL
1255
self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1257
def test_connection_sharing(self):
    """A connected transport and its clone use one shared connection.

    Replacing the connection on the original must be visible through the
    clone as well.  Non-connected transports skip the test.
    """
    transport = self.get_transport()
    if not isinstance(transport, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    clone = transport.clone('subdir')
    # Connections may be created lazily, so issue a request first to make
    # sure one exists before comparing.
    transport.has('surely_not')
    self.assertIs(transport._get_connection(), clone._get_connection())

    # Simulate a temporary failure by installing a dummy replacement
    # connection on the original transport only.
    replacement = None
    transport._set_connection(replacement)
    # Both the original and the clone must now report the replacement.
    for candidate in (transport, clone):
        self.assertIs(replacement, candidate._get_connection())
1274
def test_reuse_connection_for_various_paths(self):
    """_reuse_for gives new transports that share the same connection.

    This is checked for a deep path below the base and for a second path
    reached through the first reused transport.  Non-connected
    transports skip the test.
    """
    transport = self.get_transport()
    if not isinstance(transport, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    # Issue a request so that a connection is actually established.
    transport.has('surely_not')
    self.assertIsNot(None, transport._get_connection())

    deep_url = transport.base + 'whatever/but/deep/down/the/path'
    deep = transport._reuse_for(deep_url)
    self.assertIsNot(transport, deep)
    self.assertIs(transport._get_connection(), deep._get_connection())

    home_url = transport.base + 'home'
    home = deep._reuse_for(home_url)
    self.assertIs(transport._get_connection(), home._get_connection())
    self.assertIs(deep._get_connection(), home._get_connection())
1290
def test_clone(self):
1291
# TODO: Test that clone moves up and down the filesystem
1292
t1 = self.get_transport()
1294
self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1296
self.assertTrue(t1.has('a'))
1297
self.assertTrue(t1.has('b/c'))
1298
self.assertFalse(t1.has('c'))
1301
self.assertEqual(t1.base + 'b/', t2.base)
1303
self.assertTrue(t2.has('c'))
1304
self.assertFalse(t2.has('a'))
1307
self.assertTrue(t3.has('a'))
1308
self.assertFalse(t3.has('c'))
1310
self.assertFalse(t1.has('b/d'))
1311
self.assertFalse(t2.has('d'))
1312
self.assertFalse(t3.has('b/d'))
1314
if t1.is_readonly():
1315
self.build_tree_contents([('b/d', 'newfile\n')])
1317
t2.put_bytes('d', 'newfile\n')
1319
self.assertTrue(t1.has('b/d'))
1320
self.assertTrue(t2.has('d'))
1321
self.assertTrue(t3.has('b/d'))
1323
def test_clone_to_root(self):
    """Walking up with clone("..") ends at the location clone("/") gives.

    Each step up must shorten the base URL, except at the top where the
    base stops changing.
    """
    start = self.get_transport()

    # Climb one directory at a time until cloning ".." no longer changes
    # the base, which means the root has been reached.
    current = start
    while True:
        parent = current.clone("..")
        # Going up must shrink the URL, unless we are already at the top.
        self.assertTrue(len(parent.base) < len(current.base)
                        or parent.base == current.base)
        if parent.base == current.base:
            break
        current = parent
    root = current

    # Cloning to "/" should take us to exactly the same location.
    self.assertEqual(root.base, start.clone("/").base)
    # Likewise the abspath of "/" from the starting transport must be the
    # base of the root.
    self.assertEqual(start.abspath("/"), root.base)

    # The root is a directory, so its URL must still end with a slash.
    self.assertEqual(root.base[-1], '/')
1350
def test_clone_from_root(self):
    """Cloning a simple directory name from the root appends it to base."""
    root = self.get_transport().clone('/')
    expected_base = root.base + '.bzr/'
    self.assertEqual(expected_base, root.clone('.bzr').base)
1357
def test_base_url(self):
    """The base URL of a transport ends with a slash."""
    base = self.get_transport().base
    final_char = base[-1]
    self.assertEqual('/', final_char)
1361
def test_relpath(self):
    """relpath maps URLs at or below the base to relative paths."""
    transport = self.get_transport()
    base = transport.base
    cases = [
        ('', base),
        # the base without its final character
        ('', base[:-1]),
        # subdirs which don't exist should still give relpaths.
        ('foo', base + 'foo'),
        # a trailing slash should give the same result.
        ('foo', base + 'foo/'),
    ]
    for expected, url in cases:
        self.assertEqual(expected, transport.relpath(url))
1371
def test_relpath_at_root(self):
1372
t = self.get_transport()
1373
# clone all the way to the top
1374
new_transport = t.clone('..')
1375
while new_transport.base != t.base:
1377
new_transport = t.clone('..')
1378
# we must be able to get a relpath below the root
1379
self.assertEqual('', t.relpath(t.base))
1380
# and a deeper one should work too
1381
self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
1383
def test_abspath(self):
    """Smoke test for abspath.

    Corner cases for backends with aliasing problems (such as symlinks
    on unix filesystems) belong in backend specific tests.
    """
    t = self.get_transport()

    expected = t.base + 'relpath'
    self.assertEqual(expected, t.abspath('relpath'))

    # Asking for the root must simply not raise.
    t.abspath("/")

    # "/" and "/foo/.." name the same location.
    root_url = t.abspath("/")
    via_parent_url = t.abspath("/foo/..")
    self.assertEqual(root_url, via_parent_url)

    # An absolute path equals the same relative path taken from the root.
    from_root_url = t.clone("/").abspath('foo')
    self.assertEqual(from_root_url, t.abspath("/foo"))
1401
# GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1402
def test_win32_abspath(self):
1403
# Note: we tried to set sys.platform='win32' so we could test on
1404
# other platforms too, but then osutils does platform specific
1405
# things at import time which defeated us...
1406
if sys.platform != 'win32':
1408
'Testing drive letters in abspath implemented only for win32')
1410
# smoke test for abspath on win32.
1411
# a transport based on 'file:///' never fully qualifies the drive.
1412
transport = _mod_transport.get_transport("file:///")
1413
self.assertEqual(transport.abspath("/"), "file:///")
1415
# but a transport that starts with a drive spec must keep it.
1416
transport = _mod_transport.get_transport("file:///C:/")
1417
self.assertEqual(transport.abspath("/"), "file:///C:/")
1419
def test_local_abspath(self):
1420
transport = self.get_transport()
1422
p = transport.local_abspath('.')
1423
except (errors.NotLocalUrl, TransportNotPossible), e:
1424
# should be formattable
1427
self.assertEqual(getcwd(), p)
1429
def test_abspath_at_root(self):
1430
t = self.get_transport()
1431
# clone all the way to the top
1432
new_transport = t.clone('..')
1433
while new_transport.base != t.base:
1435
new_transport = t.clone('..')
1436
# we must be able to get a abspath of the root when we ask for
1437
# t.abspath('..') - this due to our choice that clone('..')
1438
# should return the root from the root, combined with the desire that
1439
# the url from clone('..') and from abspath('..') should be the same.
1440
self.assertEqual(t.base, t.abspath('..'))
1441
# '' should give us the root
1442
self.assertEqual(t.base, t.abspath(''))
1443
# and a path should append to the url
1444
self.assertEqual(t.base + 'foo', t.abspath('foo'))
1446
def test_iter_files_recursive(self):
1447
transport = self.get_transport()
1448
if not transport.listable():
1449
self.assertRaises(TransportNotPossible,
1450
transport.iter_files_recursive)
1452
self.build_tree(['isolated/',
1456
'isolated/dir/b%25z', # make sure quoting is correct
1458
transport=transport)
1459
paths = set(transport.iter_files_recursive())
1460
# nb the directories are not converted
1461
self.assertEqual(paths,
1462
set(['isolated/dir/foo',
1464
'isolated/dir/b%2525z',
1466
sub_transport = transport.clone('isolated')
1467
paths = set(sub_transport.iter_files_recursive())
1468
self.assertEqual(paths,
1469
set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1471
def test_copy_tree(self):
1472
# TODO: test file contents and permissions are preserved. This test was
1473
# added just to ensure that quoting was handled correctly.
1474
# -- David Allouche 2006-08-11
1475
transport = self.get_transport()
1476
if not transport.listable():
1477
self.assertRaises(TransportNotPossible,
1478
transport.iter_files_recursive)
1480
if transport.is_readonly():
1482
self.build_tree(['from/',
1486
'from/dir/b%25z', # make sure quoting is correct
1488
transport=transport)
1489
transport.copy_tree('from', 'to')
1490
paths = set(transport.iter_files_recursive())
1491
self.assertEqual(paths,
1492
set(['from/dir/foo',
1501
def test_copy_tree_to_transport(self):
1502
transport = self.get_transport()
1503
if not transport.listable():
1504
self.assertRaises(TransportNotPossible,
1505
transport.iter_files_recursive)
1507
if transport.is_readonly():
1509
self.build_tree(['from/',
1513
'from/dir/b%25z', # make sure quoting is correct
1515
transport=transport)
1516
from_transport = transport.clone('from')
1517
to_transport = transport.clone('to')
1518
to_transport.ensure_base()
1519
from_transport.copy_tree_to_transport(to_transport)
1520
paths = set(transport.iter_files_recursive())
1521
self.assertEqual(paths,
1522
set(['from/dir/foo',
1531
def test_unicode_paths(self):
1532
"""Test that we can read/write files with Unicode names."""
1533
t = self.get_transport()
1535
# With FAT32 and certain encodings on win32
1536
# '\xe5' and '\xe4' actually map to the same file
1537
# adding a suffix kicks in the 'preserving but insensitive'
1538
# route, and maintains the right files
1539
files = [u'\xe5.1', # a w/ circle iso-8859-1
1540
u'\xe4.2', # a w/ dots iso-8859-1
1541
u'\u017d', # Z with umlat iso-8859-2
1542
u'\u062c', # Arabic j
1543
u'\u0410', # Russian A
1544
u'\u65e5', # Kanji person
1547
no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1548
if no_unicode_support:
1549
raise tests.KnownFailure("test server cannot handle unicode paths")
1552
self.build_tree(files, transport=t, line_endings='binary')
1553
except UnicodeError:
1554
raise TestSkipped("cannot handle unicode paths in current encoding")
1556
# A plain unicode string is not a valid url
1558
self.assertRaises(InvalidURL, t.get, fname)
1561
fname_utf8 = fname.encode('utf-8')
1562
contents = 'contents of %s\n' % (fname_utf8,)
1563
self.check_transport_contents(contents, t, urlutils.escape(fname))
1565
def test_connect_twice_is_same_content(self):
    """Separate transports obtained from the server see the same files.

    Content written through one transport must be readable through a
    second one, and through a transport opened on a subdirectory and
    cloned back up to its parent.  Read-only transports are not
    exercised.
    """
    writer = self.get_transport()
    if writer.is_readonly():
        return
    writer.put_bytes('foo', 'bar')
    second = self.get_transport()
    self.check_transport_contents('bar', second, 'foo')

    # Opening at a relative url and cloning back up must reach the same
    # content as well.
    writer.mkdir('newdir')
    nested = self.get_transport('newdir')
    back_up = nested.clone('..')
    self.check_transport_contents('bar', back_up, 'foo')
1581
def test_lock_write(self):
1582
"""Test transport-level write locks.
1584
These are deprecated and transports may decline to support them.
1586
transport = self.get_transport()
1587
if transport.is_readonly():
1588
self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1590
transport.put_bytes('lock', '')
1592
lock = transport.lock_write('lock')
1593
except TransportNotPossible:
1595
# TODO make this consistent on all platforms:
1596
# self.assertRaises(LockError, transport.lock_write, 'lock')
1599
def test_lock_read(self):
1600
"""Test transport-level read locks.
1602
These are deprecated and transports may decline to support them.
1604
transport = self.get_transport()
1605
if transport.is_readonly():
1606
file('lock', 'w').close()
1608
transport.put_bytes('lock', '')
1610
lock = transport.lock_read('lock')
1611
except TransportNotPossible:
1613
# TODO make this consistent on all platforms:
1614
# self.assertRaises(LockError, transport.lock_read, 'lock')
1617
def test_readv(self):
    """readv yields an (offset, data) pair for each requested range.

    A single range and several ascending ranges are requested from a ten
    byte file.
    """
    transport = self.get_transport()
    if transport.is_readonly():
        # The transport cannot write, so create the file in the current
        # directory with plain file I/O.  The handle is closed explicitly
        # rather than left for the garbage collector to flush.
        f = open('a', 'w')
        try:
            f.write('0123456789')
        finally:
            f.close()
    else:
        transport.put_bytes('a', '0123456789')

    d = list(transport.readv('a', ((0, 1),)))
    self.assertEqual(d[0], (0, '0'))

    d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
    self.assertEqual(d[0], (0, '0'))
    self.assertEqual(d[1], (1, '1'))
    self.assertEqual(d[2], (3, '34'))
    self.assertEqual(d[3], (9, '9'))
1633
def test_readv_out_of_order(self):
    """readv yields results in the order the ranges were requested.

    The ranges are deliberately not sorted by offset.
    """
    transport = self.get_transport()
    if transport.is_readonly():
        # The transport cannot write, so create the file in the current
        # directory with plain file I/O.  The handle is closed explicitly
        # rather than left for the garbage collector to flush.
        f = open('a', 'w')
        try:
            f.write('0123456789')
        finally:
            f.close()
    else:
        transport.put_bytes('a', '01234567890')

    d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
    self.assertEqual(d[0], (1, '1'))
    self.assertEqual(d[1], (9, '9'))
    self.assertEqual(d[2], (0, '0'))
    self.assertEqual(d[3], (3, '34'))
1646
def test_readv_with_adjust_for_latency(self):
    """readv with adjust_for_latency may expand and merge the ranges.

    The adjust_for_latency flag expands the returned region according to
    a per-transport heuristic, so exact results cannot be predicted.
    Random content larger than the largest combining our transports do
    is written once, and every returned chunk is cross-checked against
    it.  Several scenarios share that backing data to avoid repeated
    large random byte look ups.
    """
    transport = self.get_transport()
    content = osutils.rand_bytes(200*1024)
    content_size = len(content)
    if transport.is_readonly():
        self.build_tree_contents([('a', content)])
    else:
        transport.put_bytes('a', content)

    def latency_readv(offsets):
        # Every scenario issues the same kind of request against 'a'.
        return list(transport.readv('a', offsets,
            adjust_for_latency=True, upper_limit=content_size))

    def check_result_data(result_vector):
        # Each returned chunk must match the content at its offset.
        for item in result_vector:
            offset, data = item[0], item[1]
            self.assertEqual(content[offset:offset + len(data)], data)

    # Start of file: one result, beginning at 0 and at least 30 bytes.
    result = latency_readv(((0, 30),))
    self.assertEqual(1, len(result))
    self.assertEqual(0, result[0][0])
    self.assertTrue(len(result[0][1]) >= 30)
    check_result_data(result)

    # End of file: one result, running from 204800 minus its length to
    # the end.
    result = latency_readv(((204700, 100),))
    self.assertEqual(1, len(result))
    data_len = len(result[0][1])
    self.assertEqual(204800-data_len, result[0][0])
    self.assertTrue(data_len >= 100)
    check_result_data(result)

    # Out of order ranges come back in order: start first, then end.
    result = latency_readv(((204700, 100), (0, 50)))
    self.assertEqual(2, len(result))
    head_len = len(result[0][1])
    self.assertEqual(0, result[0][0])
    self.assertTrue(head_len >= 30)
    tail_len = len(result[1][1])
    self.assertEqual(204800-tail_len, result[1][0])
    self.assertTrue(tail_len >= 100)
    check_result_data(result)

    # Close ranges get combined, whichever order they are requested in.
    close_ranges = ((400, 50), (800, 234))
    for request_vector in [close_ranges, close_ranges[::-1]]:
        result = latency_readv(request_vector)
        self.assertEqual(1, len(result))
        start = result[0][0]
        data_len = len(result[0][1])
        # The region 400 to 1034 is 634 bytes, the minimum length ...
        self.assertTrue(data_len >= 634)
        # ... and the single result must contain all of it.
        self.assertTrue(start <= 400)
        self.assertTrue(start + data_len >= 1034)
        check_result_data(result)
1710
def test_readv_with_adjust_for_latency_with_big_file(self):
    """Every requested range is covered by some result on a 1MB file.

    The request vector comes from an observed failure case.
    """
    transport = self.get_transport()
    payload = 'a'*1024*1024
    if transport.is_readonly():
        # The transport cannot write, so create the file in the current
        # directory with plain file I/O.  The handle is closed explicitly
        # rather than left for the garbage collector to flush.
        f = open('a', 'w')
        try:
            f.write(payload)
        finally:
            f.close()
    else:
        transport.put_bytes('a', payload)
    broken_vector = [(465219, 800), (225221, 800), (445548, 800),
        (225037, 800), (221357, 800), (437077, 800), (947670, 800),
        (465373, 800), (947422, 800)]
    results = list(transport.readv('a', broken_vector, True, 1024*1024))
    # Size the bookkeeping from the vector itself so the two cannot drift
    # apart if ranges are added or removed.
    found_items = [False]*len(broken_vector)
    for pos, (start, length) in enumerate(broken_vector):
        # check the range is covered by the result
        for offset, data in results:
            if offset <= start and start + length <= offset + len(data):
                found_items[pos] = True
    self.assertEqual([True]*len(broken_vector), found_items)
1729
def test_get_with_open_write_stream_sees_all_content(self):
1730
t = self.get_transport()
1733
handle = t.open_write_stream('foo')
1736
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1740
def test_get_smart_medium(self):
    """A transport gives a smart medium or raises NoSmartMedium.

    Anything returned must be a SmartClientMedium.
    """
    transport = self.get_transport()
    try:
        client_medium = transport.get_smart_medium()
    except errors.NoSmartMedium:
        # Declining explicitly is an acceptable answer.
        pass
    else:
        self.assertIsInstance(client_medium, medium.SmartClientMedium)
1751
def test_readv_short_read(self):
1752
transport = self.get_transport()
1753
if transport.is_readonly():
1754
file('a', 'w').write('0123456789')
1756
transport.put_bytes('a', '01234567890')
1758
# This is intentionally reading off the end of the file
1759
# since we are sure that it cannot get there
1760
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1761
# Can be raised by paramiko
1763
transport.readv, 'a', [(1,1), (8,10)])
1765
# This is trying to seek past the end of the file, it should
1766
# also raise a special error
1767
self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1768
transport.readv, 'a', [(12,2)])
1770
def test_stat_symlink(self):
1771
# if a transport points directly to a symlink (and supports symlinks
1772
# at all) you can tell this. helps with bug 32669.
1773
t = self.get_transport()
1775
t.symlink('target', 'link')
1776
except TransportNotPossible:
1777
raise TestSkipped("symlinks not supported")
1778
t2 = t.clone('link')
1780
self.assertTrue(stat.S_ISLNK(st.st_mode))