1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for Transport implementations.
19
Transport implementations tested here are supplied by
20
TransportTestProviderAdapter.
25
from cStringIO import StringIO
26
from StringIO import StringIO as pyStringIO
37
from bzrlib.errors import (ConnectionError,
47
from bzrlib.osutils import getcwd
48
from bzrlib.smart import medium
49
from bzrlib.tests import (
55
from bzrlib.tests.test_transport import TestTransportImplementation
56
from bzrlib.transport import (
59
_get_transport_modules,
61
from bzrlib.transport.memory import MemoryTransport
64
class TransportTestProviderAdapter(TestScenarioApplier):
65
"""A tool to generate a suite testing all transports for a single test.
67
This is done by copying the test once for each transport and injecting
68
the transport_class and transport_server classes into each copy. Each copy
69
is also given a new id() to make it easy to identify.
73
self.scenarios = self._test_permutations()
75
def get_transport_test_permutations(self, module):
76
"""Get the permutations module wants to have tested."""
77
if getattr(module, 'get_test_permutations', None) is None:
79
"transport module %s doesn't provide get_test_permutations()"
82
return module.get_test_permutations()
84
def _test_permutations(self):
85
"""Return a list of the klass, server_factory pairs to test."""
87
for module in _get_transport_modules():
89
permutations = self.get_transport_test_permutations(
90
reduce(getattr, (module).split('.')[1:], __import__(module)))
91
for (klass, server_factory) in permutations:
92
scenario = (server_factory.__name__,
93
{"transport_class":klass,
94
"transport_server":server_factory})
95
result.append(scenario)
96
except errors.DependencyNotPresent, e:
97
# Continue even if a dependency prevents us
98
# from adding this test
103
def load_tests(standard_tests, module, loader):
104
"""Multiply tests for transport implementations."""
105
result = loader.suiteClass()
106
adapter = TransportTestProviderAdapter()
107
for test in tests.iter_suite_tests(standard_tests):
108
result.addTests(adapter.adapt(test))
112
class TransportTests(TestTransportImplementation):
115
super(TransportTests, self).setUp()
116
self._captureVar('BZR_NO_SMART_VFS', None)
118
def check_transport_contents(self, content, transport, relpath):
    """Assert that the file at relpath on transport holds exactly content.

    The file is opened with transport.get(relpath), read in full, and the
    bytes read are compared against content with assertEqualDiff.
    """
    stream = transport.get(relpath)
    actual = stream.read()
    self.assertEqualDiff(content, actual)
122
def test_ensure_base_missing(self):
123
""".ensure_base() should create the directory if it doesn't exist"""
124
t = self.get_transport()
126
if t_a.is_readonly():
127
self.assertRaises(TransportNotPossible,
130
self.assertTrue(t_a.ensure_base())
131
self.assertTrue(t.has('a'))
133
def test_ensure_base_exists(self):
134
""".ensure_base() should just be happy if it already exists"""
135
t = self.get_transport()
141
# ensure_base returns False if it didn't create the base
142
self.assertFalse(t_a.ensure_base())
144
def test_ensure_base_missing_parent(self):
145
""".ensure_base() will fail if the parent dir doesn't exist"""
146
t = self.get_transport()
152
self.assertRaises(NoSuchFile, t_b.ensure_base)
154
def test_external_url(self):
155
""".external_url either works or raises InProcessTransport."""
156
t = self.get_transport()
159
except errors.InProcessTransport:
163
t = self.get_transport()
165
files = ['a', 'b', 'e', 'g', '%']
166
self.build_tree(files, transport=t)
167
self.assertEqual(True, t.has('a'))
168
self.assertEqual(False, t.has('c'))
169
self.assertEqual(True, t.has(urlutils.escape('%')))
170
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
171
[True, True, False, False, True, False, True, False])
172
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
173
self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
174
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
175
[True, True, False, False, True, False, True, False])
176
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
177
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
179
def test_has_root_works(self):
180
from bzrlib.smart import server
181
if self.transport_server is server.SmartTCPServer_for_testing:
182
raise TestNotApplicable(
183
"SmartTCPServer_for_testing intentionally does not allow "
185
current_transport = self.get_transport()
186
self.assertTrue(current_transport.has('/'))
187
root = current_transport.clone('/')
188
self.assertTrue(root.has(''))
191
t = self.get_transport()
193
files = ['a', 'b', 'e', 'g']
194
contents = ['contents of a\n',
199
self.build_tree(files, transport=t, line_endings='binary')
200
self.check_transport_contents('contents of a\n', t, 'a')
201
content_f = t.get_multi(files)
202
# Use itertools.izip() instead of zip() or map(), since they fully
203
# evaluate their inputs, the transport requests should be issued and
204
# handled sequentially (we don't want to force transport to buffer).
205
for content, f in itertools.izip(contents, content_f):
206
self.assertEqual(content, f.read())
208
content_f = t.get_multi(iter(files))
209
# Use itertools.izip() for the same reason
210
for content, f in itertools.izip(contents, content_f):
211
self.assertEqual(content, f.read())
213
self.assertRaises(NoSuchFile, t.get, 'c')
214
self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
217
def test_get_directory_read_gives_ReadError(self):
218
"""consistent errors for read() on a file returned by get()."""
219
t = self.get_transport()
221
self.build_tree(['a directory/'])
223
t.mkdir('a%20directory')
224
# getting the file must either work or fail with a PathError
226
a_file = t.get('a%20directory')
227
except (errors.PathError, errors.RedirectRequested):
228
# early failure return immediately.
230
# having got a file, read() must either work (i.e. http reading a dir
231
# listing) or fail with ReadError
234
except errors.ReadError:
237
def test_get_bytes(self):
238
t = self.get_transport()
240
files = ['a', 'b', 'e', 'g']
241
contents = ['contents of a\n',
246
self.build_tree(files, transport=t, line_endings='binary')
247
self.check_transport_contents('contents of a\n', t, 'a')
249
for content, fname in zip(contents, files):
250
self.assertEqual(content, t.get_bytes(fname))
252
self.assertRaises(NoSuchFile, t.get_bytes, 'c')
254
def test_get_with_open_write_stream_sees_all_content(self):
255
t = self.get_transport()
258
handle = t.open_write_stream('foo')
261
self.assertEqual('b', t.get('foo').read())
265
def test_get_bytes_with_open_write_stream_sees_all_content(self):
266
t = self.get_transport()
269
handle = t.open_write_stream('foo')
272
self.assertEqual('b', t.get_bytes('foo'))
273
self.assertEqual('b', t.get('foo').read())
277
def test_put_bytes(self):
278
t = self.get_transport()
281
self.assertRaises(TransportNotPossible,
282
t.put_bytes, 'a', 'some text for a\n')
285
t.put_bytes('a', 'some text for a\n')
286
self.failUnless(t.has('a'))
287
self.check_transport_contents('some text for a\n', t, 'a')
289
# The contents should be overwritten
290
t.put_bytes('a', 'new text for a\n')
291
self.check_transport_contents('new text for a\n', t, 'a')
293
self.assertRaises(NoSuchFile,
294
t.put_bytes, 'path/doesnt/exist/c', 'contents')
296
def test_put_bytes_non_atomic(self):
297
t = self.get_transport()
300
self.assertRaises(TransportNotPossible,
301
t.put_bytes_non_atomic, 'a', 'some text for a\n')
304
self.failIf(t.has('a'))
305
t.put_bytes_non_atomic('a', 'some text for a\n')
306
self.failUnless(t.has('a'))
307
self.check_transport_contents('some text for a\n', t, 'a')
308
# Put also replaces contents
309
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
310
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
312
# Make sure we can create another file
313
t.put_bytes_non_atomic('d', 'contents for\nd\n')
314
# And overwrite 'a' with empty contents
315
t.put_bytes_non_atomic('a', '')
316
self.check_transport_contents('contents for\nd\n', t, 'd')
317
self.check_transport_contents('', t, 'a')
319
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
321
# Now test the create_parent flag
322
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
324
self.failIf(t.has('dir/a'))
325
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
326
create_parent_dir=True)
327
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
329
# But we still get NoSuchFile if we can't make the parent dir
330
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
332
create_parent_dir=True)
334
def test_put_bytes_permissions(self):
335
t = self.get_transport()
339
if not t._can_roundtrip_unix_modebits():
340
# Can't roundtrip, so no need to run this test
342
t.put_bytes('mode644', 'test text\n', mode=0644)
343
self.assertTransportMode(t, 'mode644', 0644)
344
t.put_bytes('mode666', 'test text\n', mode=0666)
345
self.assertTransportMode(t, 'mode666', 0666)
346
t.put_bytes('mode600', 'test text\n', mode=0600)
347
self.assertTransportMode(t, 'mode600', 0600)
348
# Yes, you can put_bytes a file such that it becomes readonly
349
t.put_bytes('mode400', 'test text\n', mode=0400)
350
self.assertTransportMode(t, 'mode400', 0400)
352
# The default permissions should be based on the current umask
353
umask = osutils.get_umask()
354
t.put_bytes('nomode', 'test text\n', mode=None)
355
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
357
def test_put_bytes_non_atomic_permissions(self):
358
t = self.get_transport()
362
if not t._can_roundtrip_unix_modebits():
363
# Can't roundtrip, so no need to run this test
365
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
366
self.assertTransportMode(t, 'mode644', 0644)
367
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
368
self.assertTransportMode(t, 'mode666', 0666)
369
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
370
self.assertTransportMode(t, 'mode600', 0600)
371
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
372
self.assertTransportMode(t, 'mode400', 0400)
374
# The default permissions should be based on the current umask
375
umask = osutils.get_umask()
376
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
377
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
379
# We should also be able to set the mode for a parent directory
381
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
382
dir_mode=0700, create_parent_dir=True)
383
self.assertTransportMode(t, 'dir700', 0700)
384
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
385
dir_mode=0770, create_parent_dir=True)
386
self.assertTransportMode(t, 'dir770', 0770)
387
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
388
dir_mode=0777, create_parent_dir=True)
389
self.assertTransportMode(t, 'dir777', 0777)
391
def test_put_file(self):
392
t = self.get_transport()
395
self.assertRaises(TransportNotPossible,
396
t.put_file, 'a', StringIO('some text for a\n'))
399
result = t.put_file('a', StringIO('some text for a\n'))
400
# put_file returns the length of the data written
401
self.assertEqual(16, result)
402
self.failUnless(t.has('a'))
403
self.check_transport_contents('some text for a\n', t, 'a')
404
# Put also replaces contents
405
result = t.put_file('a', StringIO('new\ncontents for\na\n'))
406
self.assertEqual(19, result)
407
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
408
self.assertRaises(NoSuchFile,
409
t.put_file, 'path/doesnt/exist/c',
410
StringIO('contents'))
412
def test_put_file_non_atomic(self):
413
t = self.get_transport()
416
self.assertRaises(TransportNotPossible,
417
t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
420
self.failIf(t.has('a'))
421
t.put_file_non_atomic('a', StringIO('some text for a\n'))
422
self.failUnless(t.has('a'))
423
self.check_transport_contents('some text for a\n', t, 'a')
424
# Put also replaces contents
425
t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
426
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
428
# Make sure we can create another file
429
t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
430
# And overwrite 'a' with empty contents
431
t.put_file_non_atomic('a', StringIO(''))
432
self.check_transport_contents('contents for\nd\n', t, 'd')
433
self.check_transport_contents('', t, 'a')
435
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
436
StringIO('contents\n'))
437
# Now test the create_parent flag
438
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
439
StringIO('contents\n'))
440
self.failIf(t.has('dir/a'))
441
t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
442
create_parent_dir=True)
443
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
445
# But we still get NoSuchFile if we can't make the parent dir
446
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
447
StringIO('contents\n'),
448
create_parent_dir=True)
450
def test_put_file_permissions(self):
452
t = self.get_transport()
456
if not t._can_roundtrip_unix_modebits():
457
# Can't roundtrip, so no need to run this test
459
t.put_file('mode644', StringIO('test text\n'), mode=0644)
460
self.assertTransportMode(t, 'mode644', 0644)
461
t.put_file('mode666', StringIO('test text\n'), mode=0666)
462
self.assertTransportMode(t, 'mode666', 0666)
463
t.put_file('mode600', StringIO('test text\n'), mode=0600)
464
self.assertTransportMode(t, 'mode600', 0600)
465
# Yes, you can put a file such that it becomes readonly
466
t.put_file('mode400', StringIO('test text\n'), mode=0400)
467
self.assertTransportMode(t, 'mode400', 0400)
468
# The default permissions should be based on the current umask
469
umask = osutils.get_umask()
470
t.put_file('nomode', StringIO('test text\n'), mode=None)
471
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
473
def test_put_file_non_atomic_permissions(self):
474
t = self.get_transport()
478
if not t._can_roundtrip_unix_modebits():
479
# Can't roundtrip, so no need to run this test
481
t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
482
self.assertTransportMode(t, 'mode644', 0644)
483
t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
484
self.assertTransportMode(t, 'mode666', 0666)
485
t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
486
self.assertTransportMode(t, 'mode600', 0600)
487
# Yes, you can put_file_non_atomic a file such that it becomes readonly
488
t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
489
self.assertTransportMode(t, 'mode400', 0400)
491
# The default permissions should be based on the current umask
492
umask = osutils.get_umask()
493
t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
494
self.assertTransportMode(t, 'nomode', 0666 & ~umask)
496
# We should also be able to set the mode for a parent directory
499
t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
500
dir_mode=0700, create_parent_dir=True)
501
self.assertTransportMode(t, 'dir700', 0700)
502
t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
503
dir_mode=0770, create_parent_dir=True)
504
self.assertTransportMode(t, 'dir770', 0770)
505
t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
506
dir_mode=0777, create_parent_dir=True)
507
self.assertTransportMode(t, 'dir777', 0777)
509
def test_put_bytes_unicode(self):
510
# Expect put_bytes to raise AssertionError or UnicodeEncodeError if
511
# given unicode "bytes". UnicodeEncodeError doesn't really make sense
512
# (we don't want to encode unicode here at all, callers should be
513
# strictly passing bytes to put_bytes), but we allow it for backwards
514
# compatibility. At some point we should use a specific exception.
515
# See https://bugs.launchpad.net/bzr/+bug/106898.
516
t = self.get_transport()
519
unicode_string = u'\u1234'
521
(AssertionError, UnicodeEncodeError),
522
t.put_bytes, 'foo', unicode_string)
524
def test_put_file_unicode(self):
525
# Like put_bytes, except with a StringIO.StringIO of a unicode string.
526
# This situation can happen (and has) if code is careless about the type
527
# of "string" they initialise/write to a StringIO with. We cannot use
528
# cStringIO, because it never returns unicode from read.
529
# Like put_bytes, UnicodeEncodeError isn't quite the right exception to
530
# raise, but we raise it for hysterical raisins.
531
t = self.get_transport()
534
unicode_file = pyStringIO(u'\u1234')
535
self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
537
def test_mkdir(self):
538
t = self.get_transport()
541
# cannot mkdir on readonly transports. We're not testing for
542
# cache coherency because cache behaviour is not currently
543
# defined for the transport interface.
544
self.assertRaises(TransportNotPossible, t.mkdir, '.')
545
self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
546
self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
547
self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
551
self.assertEqual(t.has('dir_a'), True)
552
self.assertEqual(t.has('dir_b'), False)
555
self.assertEqual(t.has('dir_b'), True)
557
t.mkdir_multi(['dir_c', 'dir_d'])
559
t.mkdir_multi(iter(['dir_e', 'dir_f']))
560
self.assertEqual(list(t.has_multi(
561
['dir_a', 'dir_b', 'dir_c', 'dir_q',
562
'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
563
[True, True, True, False,
564
True, True, True, True])
566
# we were testing that a local mkdir followed by a transport
567
# mkdir failed thusly, but given that we * in one process * do not
568
# concurrently fiddle with disk dirs and then use transport to do
569
# things, the win here seems marginal compared to the constraint on
570
# the interface. RBC 20051227
572
self.assertRaises(FileExists, t.mkdir, 'dir_g')
574
# Test get/put in sub-directories
575
t.put_bytes('dir_a/a', 'contents of dir_a/a')
576
t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
577
self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
578
self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
580
# mkdir of a dir with an absent parent
581
self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
583
def test_mkdir_permissions(self):
584
t = self.get_transport()
587
if not t._can_roundtrip_unix_modebits():
588
# no sense testing on this transport
590
# Test mkdir with a mode
591
t.mkdir('dmode755', mode=0755)
592
self.assertTransportMode(t, 'dmode755', 0755)
593
t.mkdir('dmode555', mode=0555)
594
self.assertTransportMode(t, 'dmode555', 0555)
595
t.mkdir('dmode777', mode=0777)
596
self.assertTransportMode(t, 'dmode777', 0777)
597
t.mkdir('dmode700', mode=0700)
598
self.assertTransportMode(t, 'dmode700', 0700)
599
t.mkdir_multi(['mdmode755'], mode=0755)
600
self.assertTransportMode(t, 'mdmode755', 0755)
602
# Default mode should be based on umask
603
umask = osutils.get_umask()
604
t.mkdir('dnomode', mode=None)
605
self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
607
def test_opening_a_file_stream_creates_file(self):
608
t = self.get_transport()
611
handle = t.open_write_stream('foo')
613
self.assertEqual('', t.get_bytes('foo'))
617
def test_opening_a_file_stream_can_set_mode(self):
618
t = self.get_transport()
621
if not t._can_roundtrip_unix_modebits():
622
# Can't roundtrip, so no need to run this test
624
def check_mode(name, mode, expected):
625
handle = t.open_write_stream(name, mode=mode)
627
self.assertTransportMode(t, name, expected)
628
check_mode('mode644', 0644, 0644)
629
check_mode('mode666', 0666, 0666)
630
check_mode('mode600', 0600, 0600)
631
# The default permissions should be based on the current umask
632
check_mode('nomode', None, 0666 & ~osutils.get_umask())
634
def test_copy_to(self):
635
# FIXME: test: same server to same server (partly done)
636
# same protocol two servers
637
# and different protocols (done for now except for MemoryTransport.
640
def simple_copy_files(transport_from, transport_to):
641
files = ['a', 'b', 'c', 'd']
642
self.build_tree(files, transport=transport_from)
643
self.assertEqual(4, transport_from.copy_to(files, transport_to))
645
self.check_transport_contents(transport_to.get(f).read(),
648
t = self.get_transport()
649
temp_transport = MemoryTransport('memory:///')
650
simple_copy_files(t, temp_transport)
651
if not t.is_readonly():
652
t.mkdir('copy_to_simple')
653
t2 = t.clone('copy_to_simple')
654
simple_copy_files(t, t2)
657
# Test that copying into a missing directory raises
660
self.build_tree(['e/', 'e/f'])
663
t.put_bytes('e/f', 'contents of e')
664
self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
665
temp_transport.mkdir('e')
666
t.copy_to(['e/f'], temp_transport)
669
temp_transport = MemoryTransport('memory:///')
671
files = ['a', 'b', 'c', 'd']
672
t.copy_to(iter(files), temp_transport)
674
self.check_transport_contents(temp_transport.get(f).read(),
678
for mode in (0666, 0644, 0600, 0400):
679
temp_transport = MemoryTransport("memory:///")
680
t.copy_to(files, temp_transport, mode=mode)
682
self.assertTransportMode(temp_transport, f, mode)
684
def test_append_file(self):
685
t = self.get_transport()
688
self.assertRaises(TransportNotPossible,
689
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
691
t.put_bytes('a', 'diff\ncontents for\na\n')
692
t.put_bytes('b', 'contents\nfor b\n')
695
t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
697
self.check_transport_contents(
698
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
701
# a file with no parent should fail..
702
self.assertRaises(NoSuchFile,
703
t.append_file, 'missing/path', StringIO('content'))
705
# And we can create new files, too
707
t.append_file('c', StringIO('some text\nfor a missing file\n')))
708
self.check_transport_contents('some text\nfor a missing file\n',
711
def test_append_bytes(self):
712
t = self.get_transport()
715
self.assertRaises(TransportNotPossible,
716
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
719
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
720
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
723
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
725
self.check_transport_contents(
726
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
729
# a file with no parent should fail..
730
self.assertRaises(NoSuchFile,
731
t.append_bytes, 'missing/path', 'content')
733
def test_append_multi(self):
734
t = self.get_transport()
738
t.put_bytes('a', 'diff\ncontents for\na\n'
739
'add\nsome\nmore\ncontents\n')
740
t.put_bytes('b', 'contents\nfor b\n')
742
self.assertEqual((43, 15),
743
t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
744
('b', StringIO('some\nmore\nfor\nb\n'))]))
746
self.check_transport_contents(
747
'diff\ncontents for\na\n'
748
'add\nsome\nmore\ncontents\n'
749
'and\nthen\nsome\nmore\n',
751
self.check_transport_contents(
753
'some\nmore\nfor\nb\n',
756
self.assertEqual((62, 31),
757
t.append_multi(iter([('a', StringIO('a little bit more\n')),
758
('b', StringIO('from an iterator\n'))])))
759
self.check_transport_contents(
760
'diff\ncontents for\na\n'
761
'add\nsome\nmore\ncontents\n'
762
'and\nthen\nsome\nmore\n'
763
'a little bit more\n',
765
self.check_transport_contents(
767
'some\nmore\nfor\nb\n'
768
'from an iterator\n',
771
self.assertEqual((80, 0),
772
t.append_multi([('a', StringIO('some text in a\n')),
773
('d', StringIO('missing file r\n'))]))
775
self.check_transport_contents(
776
'diff\ncontents for\na\n'
777
'add\nsome\nmore\ncontents\n'
778
'and\nthen\nsome\nmore\n'
779
'a little bit more\n'
782
self.check_transport_contents('missing file r\n', t, 'd')
784
def test_append_file_mode(self):
785
"""Check that append accepts a mode parameter"""
786
# check append accepts a mode
787
t = self.get_transport()
789
self.assertRaises(TransportNotPossible,
790
t.append_file, 'f', StringIO('f'), mode=None)
792
t.append_file('f', StringIO('f'), mode=None)
794
def test_append_bytes_mode(self):
795
# check append_bytes accepts a mode
796
t = self.get_transport()
798
self.assertRaises(TransportNotPossible,
799
t.append_bytes, 'f', 'f', mode=None)
801
t.append_bytes('f', 'f', mode=None)
803
def test_delete(self):
804
# TODO: Test Transport.delete
805
t = self.get_transport()
807
# Not much to do with a readonly transport
809
self.assertRaises(TransportNotPossible, t.delete, 'missing')
812
t.put_bytes('a', 'a little bit of text\n')
813
self.failUnless(t.has('a'))
815
self.failIf(t.has('a'))
817
self.assertRaises(NoSuchFile, t.delete, 'a')
819
t.put_bytes('a', 'a text\n')
820
t.put_bytes('b', 'b text\n')
821
t.put_bytes('c', 'c text\n')
822
self.assertEqual([True, True, True],
823
list(t.has_multi(['a', 'b', 'c'])))
824
t.delete_multi(['a', 'c'])
825
self.assertEqual([False, True, False],
826
list(t.has_multi(['a', 'b', 'c'])))
827
self.failIf(t.has('a'))
828
self.failUnless(t.has('b'))
829
self.failIf(t.has('c'))
831
self.assertRaises(NoSuchFile,
832
t.delete_multi, ['a', 'b', 'c'])
834
self.assertRaises(NoSuchFile,
835
t.delete_multi, iter(['a', 'b', 'c']))
837
t.put_bytes('a', 'another a text\n')
838
t.put_bytes('c', 'another c text\n')
839
t.delete_multi(iter(['a', 'b', 'c']))
841
# We should have deleted everything
842
# SftpServer creates control files in the
843
# working directory, so we can't just do a
845
# self.assertEqual([], os.listdir('.'))
847
def test_recommended_page_size(self):
848
"""Transports recommend a page size for partial access to files."""
849
t = self.get_transport()
850
self.assertIsInstance(t.recommended_page_size(), int)
852
def test_rmdir(self):
853
t = self.get_transport()
854
# Not much to do with a readonly transport
856
self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
861
# ftp may not be able to raise NoSuchFile for lack of
862
# details when failing
863
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
865
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
867
def test_rmdir_not_empty(self):
868
"""Deleting a non-empty directory raises an exception
870
sftp (and possibly others) don't give us a specific "directory not
871
empty" exception -- we can just see that the operation failed.
873
t = self.get_transport()
878
self.assertRaises(PathError, t.rmdir, 'adir')
880
def test_rmdir_empty_but_similar_prefix(self):
881
"""rmdir does not get confused by sibling paths.
883
A naive implementation of MemoryTransport would refuse to rmdir
884
".bzr/branch" if there is a ".bzr/branch-format" directory, because it
885
uses "path.startswith(dir)" on all file paths to determine if directory
888
t = self.get_transport()
892
t.put_bytes('foo-bar', '')
895
self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
896
self.failUnless(t.has('foo-bar'))
898
def test_rename_dir_succeeds(self):
899
t = self.get_transport()
901
raise TestSkipped("transport is readonly")
903
t.mkdir('adir/asubdir')
904
t.rename('adir', 'bdir')
905
self.assertTrue(t.has('bdir/asubdir'))
906
self.assertFalse(t.has('adir'))
908
def test_rename_dir_nonempty(self):
909
"""Attempting to replace a nonempty directory should fail"""
910
t = self.get_transport()
912
raise TestSkipped("transport is readonly")
914
t.mkdir('adir/asubdir')
916
t.mkdir('bdir/bsubdir')
917
# any kind of PathError would be OK, though we normally expect
919
self.assertRaises(PathError, t.rename, 'bdir', 'adir')
920
# nothing was changed so it should still be as before
921
self.assertTrue(t.has('bdir/bsubdir'))
922
self.assertFalse(t.has('adir/bdir'))
923
self.assertFalse(t.has('adir/bsubdir'))
925
def test_rename_across_subdirs(self):
926
t = self.get_transport()
928
raise TestNotApplicable("transport is readonly")
933
ta.put_bytes('f', 'aoeu')
934
ta.rename('f', '../b/f')
935
self.assertTrue(tb.has('f'))
936
self.assertFalse(ta.has('f'))
937
self.assertTrue(t.has('b/f'))
939
def test_delete_tree(self):
940
t = self.get_transport()
942
# Not much to do with a readonly transport
944
self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
947
# and does it like listing ?
950
t.delete_tree('adir')
951
except TransportNotPossible:
952
# ok, this transport does not support delete_tree
955
# did it delete that trivial case?
956
self.assertRaises(NoSuchFile, t.stat, 'adir')
958
self.build_tree(['adir/',
966
t.delete_tree('adir')
967
# adir should be gone now.
968
self.assertRaises(NoSuchFile, t.stat, 'adir')
971
t = self.get_transport()
976
# TODO: I would like to use os.listdir() to
977
# make sure there are no extra files, but SftpServer
978
# creates control files in the working directory
979
# perhaps all of this could be done in a subdirectory
981
t.put_bytes('a', 'a first file\n')
982
self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
985
self.failUnless(t.has('b'))
986
self.failIf(t.has('a'))
988
self.check_transport_contents('a first file\n', t, 'b')
989
self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
992
t.put_bytes('c', 'c this file\n')
994
self.failIf(t.has('c'))
995
self.check_transport_contents('c this file\n', t, 'b')
997
# TODO: Try to write a test for atomicity
998
# TODO: Test moving into a non-existent subdirectory
999
# TODO: Test Transport.move_multi
1001
def test_copy(self):
1002
t = self.get_transport()
1007
t.put_bytes('a', 'a file\n')
1009
self.check_transport_contents('a file\n', t, 'b')
1011
self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1013
# What should the assert be if you try to copy a
1014
# file over a directory?
1015
#self.assertRaises(Something, t.copy, 'a', 'c')
1016
t.put_bytes('d', 'text in d\n')
1018
self.check_transport_contents('text in d\n', t, 'b')
1020
# TODO: test copy_multi
1022
def test_connection_error(self):
1023
"""ConnectionError is raised when connection is impossible.
1025
The error should be raised from the first operation on the transport.
1028
url = self._server.get_bogus_url()
1029
except NotImplementedError:
1030
raise TestSkipped("Transport %s has no bogus URL support." %
1031
self._server.__class__)
1032
t = get_transport(url)
1033
self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1035
def test_stat(self):
1036
# TODO: Test stat, just try once, and if it throws, stop testing
1037
from stat import S_ISDIR, S_ISREG
1039
t = self.get_transport()
1043
except TransportNotPossible, e:
1044
# This transport cannot stat
1047
paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
1048
sizes = [14, 0, 16, 0, 18]
1049
self.build_tree(paths, transport=t, line_endings='binary')
1051
for path, size in zip(paths, sizes):
1053
if path.endswith('/'):
1054
self.failUnless(S_ISDIR(st.st_mode))
1055
# directory sizes are meaningless
1057
self.failUnless(S_ISREG(st.st_mode))
1058
self.assertEqual(size, st.st_size)
1060
remote_stats = list(t.stat_multi(paths))
1061
remote_iter_stats = list(t.stat_multi(iter(paths)))
1063
self.assertRaises(NoSuchFile, t.stat, 'q')
1064
self.assertRaises(NoSuchFile, t.stat, 'b/a')
1066
self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1067
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1068
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1069
subdir = t.clone('subdir')
1070
subdir.stat('./file')
1073
def test_list_dir(self):
    """list_dir() returns the escaped names of a directory's direct children."""
    # TODO: Test list_dir, just try once, and if it throws, stop testing
    import os

    t = self.get_transport()

    if not t.listable():
        self.assertRaises(TransportNotPossible, t.list_dir, '.')
        return

    def sorted_list(d, transport):
        # list_dir makes no ordering promise, so sort before comparing.
        return sorted(transport.list_dir(d))

    self.assertEqual([], sorted_list('.', t))
    # c2 is precisely one letter longer than c here to test that
    # suffixing is not confused.
    # a%25b checks that quoting is done consistently across transports
    tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']

    if not t.is_readonly():
        self.build_tree(tree_names, transport=t)
    else:
        # Read-only transports get their fixture built on local disk.
        self.build_tree(tree_names)

    self.assertEqual(
        ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
    self.assertEqual(
        ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
    self.assertEqual(['d', 'e'], sorted_list('c', t))

    # Cloning the transport produces an equivalent listing
    self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))

    # Remove 'c/d' and 'b', through the transport when it is writable and
    # directly on disk otherwise.
    if not t.is_readonly():
        t.delete('c/d')
        t.delete('b')
    else:
        os.unlink('c/d')
        os.unlink('b')

    self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
    self.assertEqual(['e'], sorted_list('c', t))

    # Missing paths and plain files cannot be listed.
    self.assertListRaises(PathError, t.list_dir, 'q')
    self.assertListRaises(PathError, t.list_dir, 'c/f')
    self.assertListRaises(PathError, t.list_dir, 'a')
1120
def test_list_dir_result_is_url_escaped(self):
    """Names returned by list_dir() are url-escaped byte strings."""
    t = self.get_transport()
    if not t.listable():
        raise TestSkipped("transport not listable")

    if not t.is_readonly():
        self.build_tree(['a/', 'a/%'], transport=t)
    else:
        # Read-only transports get their fixture built on local disk.
        self.build_tree(['a/', 'a/%'])

    names = list(t.list_dir('a'))
    # The literal '%' in the file name comes back as '%25'.
    self.assertEqual(['%25'], names)
    self.assertIsInstance(names[0], str)
1134
def test_clone_preserve_info(self):
    """A clone keeps the scheme, user, password, host and port of its source."""
    parent = self.get_transport()
    if not isinstance(parent, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    child = parent.clone('subdir')
    # Compare the connection-related attributes one by one.
    for attr in ('_scheme', '_user', '_password', '_host', '_port'):
        self.assertEquals(getattr(parent, attr), getattr(child, attr))
1146
def test__reuse_for(self):
    """_reuse_for() hands back this transport only for a compatible URL."""
    t = self.get_transport()
    if not isinstance(t, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    def new_url(scheme=None, user=None, password=None,
                host=None, port=None, path=None):
        """Build a new url from t.base changing only parts of it.

        Only the parameters different from None will be changed.
        """
        if scheme is None:
            scheme = t._scheme
        if user is None:
            user = t._user
        if password is None:
            password = t._password
        if host is None:
            host = t._host
        if port is None:
            port = t._port
        if path is None:
            path = t._path
        return t._unsplit_url(scheme, user, password, host, port, path)

    # Pick a scheme that is guaranteed to differ from the current one.
    if t._scheme == 'ftp':
        scheme = 'sftp'
    else:
        scheme = 'ftp'
    self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))

    # Likewise for the user name.
    if t._user == 'me':
        user = 'you'
    else:
        user = 'me'
    self.assertIsNot(t, t._reuse_for(new_url(user=user)))

    # passwords are not taken into account because:
    # - it makes no sense to have two different valid passwords for the
    #   same user
    # - _password in ConnectedTransport is intended to collect what the
    #   user specified from the command-line and there are cases where the
    #   new url can contain no password (if the url was built from an
    #   existing transport.base for example)
    # - password are considered part of the credentials provided at
    #   connection creation time and as such may not be present in the url
    #   (they may be typed by the user when prompted for example)
    self.assertIs(t, t._reuse_for(new_url(password='from space')))

    # We will not connect, so we can use an invalid host
    self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))

    # And a port that is guaranteed to differ from the current one.
    if t._port == 1234:
        port = 4321
    else:
        port = 1234
    self.assertIsNot(t, t._reuse_for(new_url(port=port)))

    # No point in trying to reuse a transport for a local URL
    self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1197
def test_connection_sharing(self):
    """A transport and its clone always refer to the same connection."""
    original = self.get_transport()
    if not isinstance(original, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    clone = original.clone('subdir')
    # Some transports will create the connection only when needed, so
    # issue a request to force it into existence.
    original.has('surely_not')
    self.assertIs(original._get_connection(), clone._get_connection())

    # Simulate a temporary failure that makes the transport install a new
    # (dummy) connection.
    replacement = object()
    original._set_connection(replacement)
    # Both transports must now see the replacement.
    self.assertIs(replacement, original._get_connection())
    self.assertIs(replacement, clone._get_connection())
1214
def test_reuse_connection_for_various_paths(self):
    """Transports obtained via _reuse_for() share the original connection."""
    origin = self.get_transport()
    if not isinstance(origin, ConnectedTransport):
        raise TestSkipped("not a connected transport")

    # Issue a request so that the connection exists.
    origin.has('surely_not')
    self.assertIsNot(None, origin._get_connection())

    deep_url = origin.base + 'whatever/but/deep/down/the/path'
    subdir = origin._reuse_for(deep_url)
    # A distinct transport object is returned, but on the same connection.
    self.assertIsNot(origin, subdir)
    self.assertIs(origin._get_connection(), subdir._get_connection())

    # Reusing from the derived transport still shares that connection.
    home = subdir._reuse_for(origin.base + 'home')
    self.assertIs(origin._get_connection(), home._get_connection())
    self.assertIs(subdir._get_connection(), home._get_connection())
1230
def test_clone(self):
    """Clones address the same storage from a different base directory."""
    # TODO: Test that clone moves up and down the filesystem
    t1 = self.get_transport()

    self.build_tree(['a', 'b/', 'b/c'], transport=t1)

    self.failUnless(t1.has('a'))
    self.failUnless(t1.has('b/c'))
    self.failIf(t1.has('c'))

    # Move down into 'b'.
    t2 = t1.clone('b')
    self.assertEqual(t1.base + 'b/', t2.base)

    self.failUnless(t2.has('c'))
    self.failIf(t2.has('a'))

    # And back up to where t1 is.
    t3 = t2.clone('..')
    self.failUnless(t3.has('a'))
    self.failIf(t3.has('c'))

    self.failIf(t1.has('b/d'))
    self.failIf(t2.has('d'))
    self.failIf(t3.has('b/d'))

    # Create b/d, on local disk for a read-only transport and through the
    # clone otherwise.
    if t1.is_readonly():
        self.build_tree_contents([('b/d', 'newfile\n')])
    else:
        t2.put_bytes('d', 'newfile\n')

    # The new file is visible through every transport.
    self.failUnless(t1.has('b/d'))
    self.failUnless(t2.has('d'))
    self.failUnless(t3.has('b/d'))
1263
def test_clone_to_root(self):
    """Walking up with clone('..') ends at the location clone('/') gives."""
    orig_transport = self.get_transport()
    # Repeatedly go up to a parent directory until we're at the root
    # directory of this transport
    root_transport = orig_transport
    while True:
        parent = root_transport.clone("..")
        # as we are walking up directories, the path must be
        # growing less, except at the top
        self.assertTrue(len(parent.base) < len(root_transport.base)
                        or parent.base == root_transport.base)
        if parent.base == root_transport.base:
            # clone('..') no longer moves: this is the root.
            break
        root_transport = parent

    # Cloning to "/" should take us to exactly the same location.
    self.assertEqual(root_transport.base, orig_transport.clone("/").base)
    # the abspath of "/" from the original transport should be the same
    # as the base at the root:
    self.assertEqual(orig_transport.abspath("/"), root_transport.base)

    # At the root, the URL must still end with / as it's a directory
    self.assertEqual(root_transport.base[-1], '/')
1290
def test_clone_from_root(self):
    """At the root, cloning to a simple dir should just do string append."""
    root = self.get_transport().clone('/')
    expected_base = root.base + '.bzr/'
    cloned = root.clone('.bzr')
    self.assertEqual(expected_base, cloned.base)
1297
def test_base_url(self):
    """The base URL of a transport ends with a slash."""
    transport = self.get_transport()
    last_char = transport.base[-1]
    self.assertEqual('/', last_char)
1301
def test_relpath(self):
    """relpath() maps URLs at or below the base to transport-relative paths."""
    transport = self.get_transport()
    base = transport.base
    cases = [
        # the base itself, with and without its last character
        ('', base),
        ('', base[:-1]),
        # subdirs which don't exist should still give relpaths.
        ('foo', base + 'foo'),
        # trailing slash should be the same.
        ('foo', base + 'foo/'),
        ]
    for expected, url in cases:
        self.assertEqual(expected, transport.relpath(url))
1311
def test_relpath_at_root(self):
    """relpath() keeps working on a transport located at the root."""
    t = self.get_transport()
    # clone all the way to the top
    new_transport = t.clone('..')
    while new_transport.base != t.base:
        # Step up one level; without this the loop would never advance.
        t = new_transport
        new_transport = t.clone('..')
    # we must be able to get a relpath below the root
    self.assertEqual('', t.relpath(t.base))
    # and a deeper one should work too
    self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
1323
def test_abspath(self):
    """Smoke test for abspath().

    Corner cases for backends like unix fs's that have aliasing problems
    like symlinks should go in backend specific test cases.
    """
    t = self.get_transport()

    expected = t.base + 'relpath'
    self.assertEqual(expected, t.abspath('relpath'))

    # This should work without raising an error.
    t.abspath("/")

    # the abspath of "/" and "/foo/.." should result in the same location
    root_url = t.abspath("/")
    self.assertEqual(root_url, t.abspath("/foo/.."))

    # An absolute path resolves the same way as a relative path on a
    # transport cloned to "/".
    via_root_clone = t.clone("/").abspath('foo')
    self.assertEqual(via_root_clone, t.abspath("/foo"))
1341
def test_win32_abspath(self):
    """Smoke test for drive-letter handling in abspath() on win32."""
    # Note: we tried to set sys.platform='win32' so we could test on
    # other platforms too, but then osutils does platform specific
    # things at import time which defeated us...
    if sys.platform != 'win32':
        raise TestSkipped(
            'Testing drive letters in abspath implemented only for win32')

    # smoke test for abspath on win32.
    # a transport based on 'file:///' never fully qualifies the drive.
    transport = get_transport("file:///")
    self.failUnlessEqual(transport.abspath("/"), "file:///")

    # but a transport that starts with a drive spec must keep it.
    transport = get_transport("file:///C:/")
    self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1358
def test_local_abspath(self):
1359
transport = self.get_transport()
1361
p = transport.local_abspath('.')
1362
except (errors.NotLocalUrl, TransportNotPossible), e:
1363
# should be formattable
1366
self.assertEqual(getcwd(), p)
1368
def test_abspath_at_root(self):
    """abspath() at the root never produces a URL above the root."""
    t = self.get_transport()
    # clone all the way to the top
    new_transport = t.clone('..')
    while new_transport.base != t.base:
        # Step up one level; without this the loop would never advance.
        t = new_transport
        new_transport = t.clone('..')
    # we must be able to get a abspath of the root when we ask for
    # t.abspath('..') - this due to our choice that clone('..')
    # should return the root from the root, combined with the desire that
    # the url from clone('..') and from abspath('..') should be the same.
    self.assertEqual(t.base, t.abspath('..'))
    # '' should give us the root
    self.assertEqual(t.base, t.abspath(''))
    # and a path should append to the url
    self.assertEqual(t.base + 'foo', t.abspath('foo'))
1385
def test_iter_files_recursive(self):
    """iter_files_recursive() yields every file below the base, escaped."""
    transport = self.get_transport()
    if not transport.listable():
        self.assertRaises(TransportNotPossible,
                          transport.iter_files_recursive)
        return
    self.build_tree(['isolated/',
                     'isolated/dir/',
                     'isolated/dir/foo',
                     'isolated/dir/bar',
                     'isolated/dir/b%25z', # make sure quoting is correct
                     'isolated/bar'],
                    transport=transport)
    paths = set(transport.iter_files_recursive())
    # nb the directories are not converted
    self.assertEqual(paths,
                     set(['isolated/dir/foo',
                          'isolated/dir/bar',
                          'isolated/dir/b%2525z',
                          'isolated/bar']))
    # From a clone, the same files are reported relative to the clone.
    sub_transport = transport.clone('isolated')
    paths = set(sub_transport.iter_files_recursive())
    self.assertEqual(paths,
                     set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1410
def test_copy_tree(self):
    """copy_tree() duplicates a directory tree, keeping names quoted."""
    # TODO: test file contents and permissions are preserved. This test was
    # added just to ensure that quoting was handled correctly.
    # -- David Allouche 2006-08-11
    transport = self.get_transport()
    if not transport.listable():
        self.assertRaises(TransportNotPossible,
                          transport.iter_files_recursive)
        return
    # The tree cannot be created through a read-only transport.
    if transport.is_readonly():
        return
    # NOTE(review): tree layout mirrors test_iter_files_recursive - confirm.
    self.build_tree(['from/',
                     'from/dir/',
                     'from/dir/foo',
                     'from/dir/bar',
                     'from/dir/b%25z', # make sure quoting is correct
                     'from/bar'],
                    transport=transport)
    transport.copy_tree('from', 'to')
    paths = set(transport.iter_files_recursive())
    # Every file must now exist under both 'from' and 'to'.
    self.assertEqual(paths,
                     set(['from/dir/foo',
                          'from/dir/bar',
                          'from/dir/b%2525z',
                          'from/bar',
                          'to/dir/foo',
                          'to/dir/bar',
                          'to/dir/b%2525z',
                          'to/bar']))
1440
def test_unicode_paths(self):
    """Test that we can read/write files with Unicode names."""
    t = self.get_transport()

    # With FAT32 and certain encodings on win32
    # '\xe5' and '\xe4' actually map to the same file
    # adding a suffix kicks in the 'preserving but insensitive'
    # route, and maintains the right files
    files = [u'\xe5.1', # a w/ circle iso-8859-1
             u'\xe4.2', # a w/ dots iso-8859-1
             u'\u017d', # Z with caron iso-8859-2
             u'\u062c', # Arabic j
             u'\u0410', # Russian A
             u'\u65e5', # Kanji "day"
            ]

    try:
        self.build_tree(files, transport=t, line_endings='binary')
    except UnicodeError:
        raise TestSkipped("cannot handle unicode paths in current encoding")

    # A plain unicode string is not a valid url
    for fname in files:
        self.assertRaises(InvalidURL, t.get, fname)

    # The url-escaped form of each name is expected to hold this text.
    for fname in files:
        fname_utf8 = fname.encode('utf-8')
        contents = 'contents of %s\n' % (fname_utf8,)
        self.check_transport_contents(contents, t, urlutils.escape(fname))
1470
def test_connect_twice_is_same_content(self):
    """Independent transports onto the same server see the same content."""
    # check that our server (whatever it is) is accessible reliably
    # via get_transport and multiple connections share content.
    transport = self.get_transport()
    if transport.is_readonly():
        return
    transport.put_bytes('foo', 'bar')
    second = self.get_transport()
    self.check_transport_contents('bar', second, 'foo')
    # its base should be usable.
    from_base = get_transport(transport.base)
    self.check_transport_contents('bar', from_base, 'foo')

    # now opening at a relative url should give us a sane result:
    transport.mkdir('newdir')
    in_newdir = get_transport(transport.base + "newdir")
    back_up = in_newdir.clone('..')
    self.check_transport_contents('bar', back_up, 'foo')
1489
def test_lock_write(self):
    """Test transport-level write locks.

    These are deprecated and transports may decline to support them.
    """
    transport = self.get_transport()
    if transport.is_readonly():
        self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
        return
    transport.put_bytes('lock', '')
    try:
        lock = transport.lock_write('lock')
    except TransportNotPossible:
        # This transport declines to provide write locks.
        return
    # TODO make this consistent on all platforms:
    # self.assertRaises(LockError, transport.lock_write, 'lock')
    lock.unlock()
1507
def test_lock_read(self):
    """Test transport-level read locks.

    These are deprecated and transports may decline to support them.
    """
    transport = self.get_transport()
    if transport.is_readonly():
        # Create the (empty) lock file directly on local disk.
        file('lock', 'w').close()
    else:
        transport.put_bytes('lock', '')
    try:
        lock = transport.lock_read('lock')
    except TransportNotPossible:
        # This transport declines to provide read locks.
        return
    # TODO make this consistent on all platforms:
    # self.assertRaises(LockError, transport.lock_read, 'lock')
    lock.unlock()
1525
def test_readv(self):
    """readv() yields (offset, data) pairs for the requested ranges."""
    transport = self.get_transport()
    if transport.is_readonly():
        # Write the fixture on local disk; build_tree_contents is the
        # helper the sibling tests use and does not leave a file open.
        self.build_tree_contents([('a', '0123456789')])
    else:
        transport.put_bytes('a', '0123456789')

    d = list(transport.readv('a', ((0, 1),)))
    self.assertEqual(d[0], (0, '0'))

    d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
    self.assertEqual(d[0], (0, '0'))
    self.assertEqual(d[1], (1, '1'))
    self.assertEqual(d[2], (3, '34'))
    self.assertEqual(d[3], (9, '9'))
1541
def test_readv_out_of_order(self):
    """readv() returns results in the order the ranges were requested."""
    transport = self.get_transport()
    if transport.is_readonly():
        # Write the fixture on local disk; build_tree_contents is the
        # helper the sibling tests use and does not leave a file open.
        self.build_tree_contents([('a', '0123456789')])
    else:
        transport.put_bytes('a', '01234567890')

    d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
    self.assertEqual(d[0], (1, '1'))
    self.assertEqual(d[1], (9, '9'))
    self.assertEqual(d[2], (0, '0'))
    self.assertEqual(d[3], (3, '34'))
1554
def test_readv_with_adjust_for_latency(self):
    """readv(adjust_for_latency=True) may widen, reorder and merge ranges."""
    transport = self.get_transport()
    # the adjust for latency flag expands the data region returned
    # according to a per-transport heuristic, so testing is a little
    # tricky as we need more data than the largest combining that our
    # transports do. To accommodate this we generate random data and cross
    # reference the returned data with the random data. To avoid doing
    # multiple large random byte look ups we do several tests on the same
    # content.
    content = osutils.rand_bytes(200*1024)
    content_size = len(content)
    if transport.is_readonly():
        self.build_tree_contents([('a', content)])
    else:
        transport.put_bytes('a', content)

    def check_result_data(result_vector):
        # Every returned chunk must match the source bytes at its offset.
        for offset, data in result_vector:
            self.assertEqual(content[offset:offset + len(data)], data)

    # Offset of a 100 byte request ending exactly at the end of the file.
    tail_start = content_size - 100

    # start of file corner case
    result = list(transport.readv('a', ((0, 30),),
        adjust_for_latency=True, upper_limit=content_size))
    # we expect 1 result, from 0, to something > 30
    self.assertEqual(1, len(result))
    self.assertEqual(0, result[0][0])
    self.assertTrue(len(result[0][1]) >= 30)
    check_result_data(result)
    # end of file corner case
    result = list(transport.readv('a', ((tail_start, 100),),
        adjust_for_latency=True, upper_limit=content_size))
    # we expect 1 result, from content_size - its length, to the end
    self.assertEqual(1, len(result))
    data_len = len(result[0][1])
    self.assertEqual(content_size - data_len, result[0][0])
    self.assertTrue(data_len >= 100)
    check_result_data(result)
    # out of order ranges are made in order
    result = list(transport.readv('a', ((tail_start, 100), (0, 50)),
        adjust_for_latency=True, upper_limit=content_size))
    # we expect 2 results, in order, start and end.
    self.assertEqual(2, len(result))
    # start: must cover the whole 50 byte request
    data_len = len(result[0][1])
    self.assertEqual(0, result[0][0])
    self.assertTrue(data_len >= 50)
    # end
    data_len = len(result[1][1])
    self.assertEqual(content_size - data_len, result[1][0])
    self.assertTrue(data_len >= 100)
    check_result_data(result)
    # close ranges get combined (even if out of order)
    for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
        result = list(transport.readv('a', request_vector,
            adjust_for_latency=True, upper_limit=content_size))
        self.assertEqual(1, len(result))
        data_len = len(result[0][1])
        # minimum length is from 400 to 1034 - 634
        self.assertTrue(data_len >= 634)
        # must contain the region 400 to 1034
        self.assertTrue(result[0][0] <= 400)
        self.assertTrue(result[0][0] + data_len >= 1034)
        check_result_data(result)
1618
def test_readv_with_adjust_for_latency_with_big_file(self):
    """Every requested range is covered by some returned chunk."""
    transport = self.get_transport()
    # test from observed failure case.
    file_size = 1024*1024
    if transport.is_readonly():
        # Write the fixture on local disk; build_tree_contents is the
        # helper the sibling tests use and does not leave a file open.
        self.build_tree_contents([('a', 'a'*file_size)])
    else:
        transport.put_bytes('a', 'a'*file_size)
    broken_vector = [(465219, 800), (225221, 800), (445548, 800),
        (225037, 800), (221357, 800), (437077, 800), (947670, 800),
        (465373, 800), (947422, 800)]
    results = list(transport.readv('a', broken_vector, True, file_size))
    found_items = [False]*len(broken_vector)
    for pos, (start, length) in enumerate(broken_vector):
        # check the range is covered by the result
        for offset, data in results:
            if offset <= start and start + length <= offset + len(data):
                found_items[pos] = True
    self.assertEqual([True]*len(broken_vector), found_items)
1637
def test_get_with_open_write_stream_sees_all_content(self):
    """Data written to an open write stream is visible to readv()."""
    t = self.get_transport()
    # A write stream cannot be opened on a read-only transport.
    if t.is_readonly():
        return
    handle = t.open_write_stream('foo')
    try:
        # NOTE(review): the written bytes were lost from this file; 'bcd'
        # is chosen to satisfy the assertion below - confirm.
        handle.write('bcd')
        self.assertEqual([(0, 'b'), (2, 'd')],
                         list(t.readv('foo', ((0, 1), (2, 1)))))
    finally:
        # Always release the stream, even when the assertion fails.
        handle.close()
1648
def test_get_smart_medium(self):
    """All transports must either give a smart medium, or know they can't.
    """
    transport = self.get_transport()
    try:
        client_medium = transport.get_smart_medium()
        self.assertIsInstance(client_medium, medium.SmartClientMedium)
    except errors.NoSmartMedium:
        # as long as we got it we're fine
        pass
1659
def test_readv_short_read(self):
    """Reading beyond the end of a file with readv() raises an error."""
    transport = self.get_transport()
    if transport.is_readonly():
        # Write the fixture on local disk; build_tree_contents is the
        # helper the sibling tests use and does not leave a file open.
        self.build_tree_contents([('a', '0123456789')])
    else:
        transport.put_bytes('a', '01234567890')

    # This is intentionally reading off the end of the file
    # since we are sure that it cannot get there
    # NOTE(review): the third exception type was lost from this file;
    # AssertionError is assumed from the paramiko remark - confirm.
    self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
                           # Can be raised by paramiko
                           AssertionError),
                          transport.readv, 'a', [(1, 1), (8, 10)])

    # This is trying to seek past the end of the file, it should
    # also raise a special error
    self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
                          transport.readv, 'a', [(12, 2)])