154
151
self.assertEqual(True, t.has('a'))
155
152
self.assertEqual(False, t.has('c'))
156
153
self.assertEqual(True, t.has(urlutils.escape('%')))
157
self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
158
'e', 'f', 'g', 'h'])),
159
[True, True, False, False,
160
True, False, True, False])
161
154
self.assertEqual(True, t.has_any(['a', 'b', 'c']))
162
155
self.assertEqual(False, t.has_any(['c', 'd', 'f',
163
156
urlutils.escape('%%')]))
164
self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
165
'e', 'f', 'g', 'h']))),
166
[True, True, False, False,
167
True, False, True, False])
168
157
self.assertEqual(False, t.has_any(['c', 'c', 'c']))
169
158
self.assertEqual(True, t.has_any(['b', 'b', 'b']))
181
170
def test_get(self):
182
171
t = self.get_transport()
184
files = ['a', 'b', 'e', 'g']
185
contents = ['contents of a\n',
190
self.build_tree(files, transport=t, line_endings='binary')
191
self.check_transport_contents('contents of a\n', t, 'a')
192
content_f = t.get_multi(files)
193
# Must use iter zip() from future not old version which will fully
194
# evaluate its inputs, the transport requests should be issued and
195
# handled sequentially (we don't want to force transport to buffer).
196
for content, f in zip(contents, content_f):
197
self.assertEqual(content, f.read())
199
content_f = t.get_multi(iter(files))
200
# Again this zip() must come from the future
201
for content, f in zip(contents, content_f):
202
self.assertEqual(content, f.read())
174
content = b'contents of a\n'
175
self.build_tree(['a'], transport=t, line_endings='binary')
176
self.check_transport_contents(b'contents of a\n', t, 'a')
178
self.assertEqual(content, f.read())
204
180
def test_get_unknown_file(self):
205
181
t = self.get_transport()
206
182
files = ['a', 'b']
207
contents = ['contents of a\n',
183
contents = [b'contents of a\n',
210
186
self.build_tree(files, transport=t, line_endings='binary')
211
187
self.assertRaises(NoSuchFile, t.get, 'c')
212
189
def iterate_and_close(func, *args):
213
190
for f in func(*args):
214
191
# We call f.read() here because things like paramiko actually
264
237
t = self.get_transport()
265
238
if t.is_readonly():
267
handle = t.open_write_stream('foo')
270
self.assertEqual('b', t.get_bytes('foo'))
240
with t.open_write_stream('foo') as handle:
242
self.assertEqual(b'b', t.get_bytes('foo'))
274
244
def test_get_bytes_with_open_write_stream_sees_all_content(self):
275
245
t = self.get_transport()
276
246
if t.is_readonly():
278
handle = t.open_write_stream('foo')
281
self.assertEqual('b', t.get_bytes('foo'))
284
self.assertEqual('b', f.read())
248
with t.open_write_stream('foo') as handle:
250
self.assertEqual(b'b', t.get_bytes('foo'))
251
with t.get('foo') as f:
252
self.assertEqual(b'b', f.read())
290
254
def test_put_bytes(self):
291
255
t = self.get_transport()
293
257
if t.is_readonly():
294
258
self.assertRaises(TransportNotPossible,
295
t.put_bytes, 'a', 'some text for a\n')
259
t.put_bytes, 'a', b'some text for a\n')
298
t.put_bytes('a', 'some text for a\n')
262
t.put_bytes('a', b'some text for a\n')
299
263
self.assertTrue(t.has('a'))
300
self.check_transport_contents('some text for a\n', t, 'a')
264
self.check_transport_contents(b'some text for a\n', t, 'a')
302
266
# The contents should be overwritten
303
t.put_bytes('a', 'new text for a\n')
304
self.check_transport_contents('new text for a\n', t, 'a')
267
t.put_bytes('a', b'new text for a\n')
268
self.check_transport_contents(b'new text for a\n', t, 'a')
306
270
self.assertRaises(NoSuchFile,
307
t.put_bytes, 'path/doesnt/exist/c', 'contents')
271
t.put_bytes, 'path/doesnt/exist/c', b'contents')
309
273
def test_put_bytes_non_atomic(self):
310
274
t = self.get_transport()
312
276
if t.is_readonly():
313
277
self.assertRaises(TransportNotPossible,
314
t.put_bytes_non_atomic, 'a', 'some text for a\n')
278
t.put_bytes_non_atomic, 'a', b'some text for a\n')
317
281
self.assertFalse(t.has('a'))
318
t.put_bytes_non_atomic('a', 'some text for a\n')
282
t.put_bytes_non_atomic('a', b'some text for a\n')
319
283
self.assertTrue(t.has('a'))
320
self.check_transport_contents('some text for a\n', t, 'a')
284
self.check_transport_contents(b'some text for a\n', t, 'a')
321
285
# Put also replaces contents
322
t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
323
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
286
t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
287
self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
325
289
# Make sure we can create another file
326
t.put_bytes_non_atomic('d', 'contents for\nd\n')
290
t.put_bytes_non_atomic('d', b'contents for\nd\n')
327
291
# And overwrite 'a' with empty contents
328
t.put_bytes_non_atomic('a', '')
329
self.check_transport_contents('contents for\nd\n', t, 'd')
330
self.check_transport_contents('', t, 'a')
292
t.put_bytes_non_atomic('a', b'')
293
self.check_transport_contents(b'contents for\nd\n', t, 'd')
294
self.check_transport_contents(b'', t, 'a')
332
296
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
334
298
# Now test the create_parent flag
335
299
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
337
301
self.assertFalse(t.has('dir/a'))
338
t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
302
t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
339
303
create_parent_dir=True)
340
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
304
self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
342
306
# But we still get NoSuchFile if we can't make the parent dir
343
307
self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
345
create_parent_dir=True)
309
create_parent_dir=True)
347
311
def test_put_bytes_permissions(self):
348
312
t = self.get_transport()
352
316
if not t._can_roundtrip_unix_modebits():
353
317
# Can't roundtrip, so no need to run this test
355
t.put_bytes('mode644', 'test text\n', mode=0o644)
319
t.put_bytes('mode644', b'test text\n', mode=0o644)
356
320
self.assertTransportMode(t, 'mode644', 0o644)
357
t.put_bytes('mode666', 'test text\n', mode=0o666)
321
t.put_bytes('mode666', b'test text\n', mode=0o666)
358
322
self.assertTransportMode(t, 'mode666', 0o666)
359
t.put_bytes('mode600', 'test text\n', mode=0o600)
323
t.put_bytes('mode600', b'test text\n', mode=0o600)
360
324
self.assertTransportMode(t, 'mode600', 0o600)
361
325
# Yes, you can put_bytes a file such that it becomes readonly
362
t.put_bytes('mode400', 'test text\n', mode=0o400)
326
t.put_bytes('mode400', b'test text\n', mode=0o400)
363
327
self.assertTransportMode(t, 'mode400', 0o400)
365
329
# The default permissions should be based on the current umask
366
330
umask = osutils.get_umask()
367
t.put_bytes('nomode', 'test text\n', mode=None)
331
t.put_bytes('nomode', b'test text\n', mode=None)
368
332
self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
370
334
def test_put_bytes_non_atomic_permissions(self):
375
339
if not t._can_roundtrip_unix_modebits():
376
340
# Can't roundtrip, so no need to run this test
378
t.put_bytes_non_atomic('mode644', 'test text\n', mode=0o644)
342
t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
379
343
self.assertTransportMode(t, 'mode644', 0o644)
380
t.put_bytes_non_atomic('mode666', 'test text\n', mode=0o666)
344
t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
381
345
self.assertTransportMode(t, 'mode666', 0o666)
382
t.put_bytes_non_atomic('mode600', 'test text\n', mode=0o600)
346
t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
383
347
self.assertTransportMode(t, 'mode600', 0o600)
384
t.put_bytes_non_atomic('mode400', 'test text\n', mode=0o400)
348
t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
385
349
self.assertTransportMode(t, 'mode400', 0o400)
387
351
# The default permissions should be based on the current umask
388
352
umask = osutils.get_umask()
389
t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
353
t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
390
354
self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
392
356
# We should also be able to set the mode for a parent directory
393
357
# when it is created
394
t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0o664,
358
t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
395
359
dir_mode=0o700, create_parent_dir=True)
396
360
self.assertTransportMode(t, 'dir700', 0o700)
397
t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0o664,
361
t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
398
362
dir_mode=0o770, create_parent_dir=True)
399
363
self.assertTransportMode(t, 'dir770', 0o770)
400
t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0o664,
364
t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
401
365
dir_mode=0o777, create_parent_dir=True)
402
366
self.assertTransportMode(t, 'dir777', 0o777)
407
371
if t.is_readonly():
408
372
self.assertRaises(TransportNotPossible,
409
t.put_file, 'a', BytesIO(b'some text for a\n'))
373
t.put_file, 'a', BytesIO(b'some text for a\n'))
412
376
result = t.put_file('a', BytesIO(b'some text for a\n'))
413
377
# put_file returns the length of the data written
414
378
self.assertEqual(16, result)
415
379
self.assertTrue(t.has('a'))
416
self.check_transport_contents('some text for a\n', t, 'a')
380
self.check_transport_contents(b'some text for a\n', t, 'a')
417
381
# Put also replaces contents
418
382
result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
419
383
self.assertEqual(19, result)
420
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
384
self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
421
385
self.assertRaises(NoSuchFile,
422
386
t.put_file, 'path/doesnt/exist/c',
423
BytesIO(b'contents'))
387
BytesIO(b'contents'))
425
389
def test_put_file_non_atomic(self):
426
390
t = self.get_transport()
428
392
if t.is_readonly():
429
393
self.assertRaises(TransportNotPossible,
430
t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
394
t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
433
397
self.assertFalse(t.has('a'))
434
398
t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
435
399
self.assertTrue(t.has('a'))
436
self.check_transport_contents('some text for a\n', t, 'a')
400
self.check_transport_contents(b'some text for a\n', t, 'a')
437
401
# Put also replaces contents
438
402
t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
439
self.check_transport_contents('new\ncontents for\na\n', t, 'a')
403
self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
441
405
# Make sure we can create another file
442
406
t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
443
407
# And overwrite 'a' with empty contents
444
408
t.put_file_non_atomic('a', BytesIO(b''))
445
self.check_transport_contents('contents for\nd\n', t, 'd')
446
self.check_transport_contents('', t, 'a')
409
self.check_transport_contents(b'contents for\nd\n', t, 'd')
410
self.check_transport_contents(b'', t, 'a')
448
412
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
449
BytesIO(b'contents\n'))
413
BytesIO(b'contents\n'))
450
414
# Now test the create_parent flag
451
415
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
452
BytesIO(b'contents\n'))
416
BytesIO(b'contents\n'))
453
417
self.assertFalse(t.has('dir/a'))
454
418
t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
455
419
create_parent_dir=True)
456
self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
420
self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
458
422
# But we still get NoSuchFile if we can't make the parent dir
459
423
self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
460
BytesIO(b'contents\n'),
461
create_parent_dir=True)
424
BytesIO(b'contents\n'),
425
create_parent_dir=True)
463
427
def test_put_file_permissions(self):
547
511
self.assertEqual(t.has('dir_b'), True)
549
t.mkdir_multi(['dir_c', 'dir_d'])
551
t.mkdir_multi(iter(['dir_e', 'dir_f']))
552
self.assertEqual(list(t.has_multi(
553
['dir_a', 'dir_b', 'dir_c', 'dir_q',
554
'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
555
[True, True, True, False,
556
True, True, True, True])
513
self.assertEqual([t.has(n) for n in
514
['dir_a', 'dir_b', 'dir_q', 'dir_b']],
515
[True, True, False, True])
558
517
# we were testing that a local mkdir followed by a transport
559
518
# mkdir failed thusly, but given that we * in one process * do not
564
523
self.assertRaises(FileExists, t.mkdir, 'dir_g')
566
525
# Test get/put in sub-directories
567
t.put_bytes('dir_a/a', 'contents of dir_a/a')
526
t.put_bytes('dir_a/a', b'contents of dir_a/a')
568
527
t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
569
self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
570
self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
528
self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a')
529
self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b')
572
531
# mkdir of a dir with an absent parent
573
532
self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
694
652
if t.is_readonly():
695
653
self.assertRaises(TransportNotPossible,
696
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
654
t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
698
t.put_bytes('a', 'diff\ncontents for\na\n')
699
t.put_bytes('b', 'contents\nfor b\n')
656
t.put_bytes('a', b'diff\ncontents for\na\n')
657
t.put_bytes('b', b'contents\nfor b\n')
701
659
self.assertEqual(20,
702
t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
660
t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
704
662
self.check_transport_contents(
705
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
663
b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
708
666
# a file with no parent should fail..
721
679
if t.is_readonly():
722
680
self.assertRaises(TransportNotPossible,
723
t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
681
t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
726
self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
727
self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
684
self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
685
self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
729
687
self.assertEqual(20,
730
t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
688
t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
732
690
self.check_transport_contents(
733
'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
691
b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
736
694
# a file with no parent should fail..
737
695
self.assertRaises(NoSuchFile,
738
t.append_bytes, 'missing/path', 'content')
740
def test_append_multi(self):
741
t = self.get_transport()
745
t.put_bytes('a', 'diff\ncontents for\na\n'
746
'add\nsome\nmore\ncontents\n')
747
t.put_bytes('b', 'contents\nfor b\n')
749
self.assertEqual((43, 15),
750
t.append_multi([('a', BytesIO(b'and\nthen\nsome\nmore\n')),
751
('b', BytesIO(b'some\nmore\nfor\nb\n'))]))
753
self.check_transport_contents(
754
'diff\ncontents for\na\n'
755
'add\nsome\nmore\ncontents\n'
756
'and\nthen\nsome\nmore\n',
758
self.check_transport_contents(
760
'some\nmore\nfor\nb\n',
763
self.assertEqual((62, 31),
764
t.append_multi(iter([('a', BytesIO(b'a little bit more\n')),
765
('b', BytesIO(b'from an iterator\n'))])))
766
self.check_transport_contents(
767
'diff\ncontents for\na\n'
768
'add\nsome\nmore\ncontents\n'
769
'and\nthen\nsome\nmore\n'
770
'a little bit more\n',
772
self.check_transport_contents(
774
'some\nmore\nfor\nb\n'
775
'from an iterator\n',
778
self.assertEqual((80, 0),
779
t.append_multi([('a', BytesIO(b'some text in a\n')),
780
('d', BytesIO(b'missing file r\n'))]))
782
self.check_transport_contents(
783
'diff\ncontents for\na\n'
784
'add\nsome\nmore\ncontents\n'
785
'and\nthen\nsome\nmore\n'
786
'a little bit more\n'
789
self.check_transport_contents('missing file r\n', t, 'd')
696
t.append_bytes, 'missing/path', b'content')
791
698
def test_append_file_mode(self):
792
699
"""Check that append accepts a mode parameter"""
816
723
self.assertRaises(TransportNotPossible, t.delete, 'missing')
819
t.put_bytes('a', 'a little bit of text\n')
726
t.put_bytes('a', b'a little bit of text\n')
820
727
self.assertTrue(t.has('a'))
822
729
self.assertFalse(t.has('a'))
824
731
self.assertRaises(NoSuchFile, t.delete, 'a')
826
t.put_bytes('a', 'a text\n')
827
t.put_bytes('b', 'b text\n')
828
t.put_bytes('c', 'c text\n')
733
t.put_bytes('a', b'a text\n')
734
t.put_bytes('b', b'b text\n')
735
t.put_bytes('c', b'c text\n')
829
736
self.assertEqual([True, True, True],
830
list(t.has_multi(['a', 'b', 'c'])))
831
t.delete_multi(['a', 'c'])
737
[t.has(n) for n in ['a', 'b', 'c']])
832
740
self.assertEqual([False, True, False],
833
list(t.has_multi(['a', 'b', 'c'])))
741
[t.has(n) for n in ['a', 'b', 'c']])
834
742
self.assertFalse(t.has('a'))
835
743
self.assertTrue(t.has('b'))
836
744
self.assertFalse(t.has('c'))
838
self.assertRaises(NoSuchFile,
839
t.delete_multi, ['a', 'b', 'c'])
841
self.assertRaises(NoSuchFile,
842
t.delete_multi, iter(['a', 'b', 'c']))
844
t.put_bytes('a', 'another a text\n')
845
t.put_bytes('c', 'another c text\n')
846
t.delete_multi(iter(['a', 'b', 'c']))
746
for name in ['a', 'c', 'd']:
747
self.assertRaises(NoSuchFile, t.delete, name)
848
749
# We should have deleted everything
849
750
# SftpServer creates control files in the
989
890
# creates control files in the working directory
990
891
# perhaps all of this could be done in a subdirectory
992
t.put_bytes('a', 'a first file\n')
993
self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
893
t.put_bytes('a', b'a first file\n')
894
self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
996
897
self.assertTrue(t.has('b'))
997
898
self.assertFalse(t.has('a'))
999
self.check_transport_contents('a first file\n', t, 'b')
1000
self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
900
self.check_transport_contents(b'a first file\n', t, 'b')
901
self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
1002
903
# Overwrite a file
1003
t.put_bytes('c', 'c this file\n')
904
t.put_bytes('c', b'c this file\n')
1004
905
t.move('c', 'b')
1005
906
self.assertFalse(t.has('c'))
1006
self.check_transport_contents('c this file\n', t, 'b')
907
self.check_transport_contents(b'c this file\n', t, 'b')
1008
909
# TODO: Try to write a test for atomicity
1009
910
# TODO: Test moving into a non-existent subdirectory
1010
# TODO: Test Transport.move_multi
1012
912
def test_copy(self):
1013
913
t = self.get_transport()
1015
915
if t.is_readonly():
1018
t.put_bytes('a', 'a file\n')
918
t.put_bytes('a', b'a file\n')
1019
919
t.copy('a', 'b')
1020
self.check_transport_contents('a file\n', t, 'b')
920
self.check_transport_contents(b'a file\n', t, 'b')
1022
922
self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1024
924
# What should the assert be if you try to copy a
1025
925
# file over a directory?
1026
926
#self.assertRaises(Something, t.copy, 'a', 'c')
1027
t.put_bytes('d', 'text in d\n')
927
t.put_bytes('d', b'text in d\n')
1028
928
t.copy('d', 'b')
1029
self.check_transport_contents('text in d\n', t, 'b')
1031
# TODO: test copy_multi
929
self.check_transport_contents(b'text in d\n', t, 'b')
1033
931
def test_connection_error(self):
1034
932
"""ConnectionError is raised when connection is impossible.
1068
966
self.assertTrue(S_ISREG(st.st_mode))
1069
967
self.assertEqual(size, st.st_size)
1071
remote_stats = list(t.stat_multi(paths))
1072
remote_iter_stats = list(t.stat_multi(iter(paths)))
1074
969
self.assertRaises(NoSuchFile, t.stat, 'q')
1075
970
self.assertRaises(NoSuchFile, t.stat, 'b/a')
1077
self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1078
self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1079
972
self.build_tree(['subdir/', 'subdir/file'], transport=t)
1080
973
subdir = t.clone('subdir')
1081
974
st = subdir.stat('./file')
1122
1015
st = t.stat(link_name)
1123
1016
self.assertTrue(S_ISLNK(st.st_mode),
1124
"expected symlink, got mode %o" % st.st_mode)
1125
except TransportNotPossible:
1126
raise TestSkipped("Transport %s does not support symlinks." %
1127
self._server.__class__)
1129
self.knownFailure("Paramiko fails to create symlinks during tests")
1017
"expected symlink, got mode %o" % st.st_mode)
1018
except TransportNotPossible:
1019
raise TestSkipped("Transport %s does not support symlinks." %
1020
self._server.__class__)
1022
self.assertEqual(source_name, t.readlink(link_name))
1024
def test_readlink_nonexistent(self):
1025
t = self.get_transport()
1027
self.assertRaises(NoSuchFile, t.readlink, 'nonexistent')
1028
except TransportNotPossible:
1029
raise TestSkipped("Transport %s does not support symlinks." %
1030
self._server.__class__)
1131
1032
def test_list_dir(self):
1132
1033
# TODO: Test list_dir, just try once, and if it throws, stop testing
1213
1114
Only the parameters different from None will be changed.
1215
if scheme is None: scheme = t._parsed_url.scheme
1216
if user is None: user = t._parsed_url.user
1217
if password is None: password = t._parsed_url.password
1218
if user is None: user = t._parsed_url.user
1219
if host is None: host = t._parsed_url.host
1220
if port is None: port = t._parsed_url.port
1221
if path is None: path = t._parsed_url.path
1117
scheme = t._parsed_url.scheme
1119
user = t._parsed_url.user
1120
if password is None:
1121
password = t._parsed_url.password
1123
user = t._parsed_url.user
1125
host = t._parsed_url.host
1127
port = t._parsed_url.port
1129
path = t._parsed_url.path
1222
1130
return str(urlutils.URL(scheme, user, password, host, port, path))
1224
1132
if t._parsed_url.scheme == 'ftp':
1326
1235
new_transport = root_transport.clone("..")
1327
1236
# as we are walking up directories, the path must be
1328
1237
# growing less, except at the top
1329
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1330
or new_transport.base == root_transport.base)
1238
self.assertTrue(len(new_transport.base) < len(root_transport.base) or
1239
new_transport.base == root_transport.base)
1331
1240
while new_transport.base != root_transport.base:
1332
1241
root_transport = new_transport
1333
1242
new_transport = root_transport.clone("..")
1334
1243
# as we are walking up directories, the path must be
1335
1244
# growing less, except at the top
1336
self.assertTrue(len(new_transport.base) < len(root_transport.base)
1337
or new_transport.base == root_transport.base)
1245
self.assertTrue(len(new_transport.base) < len(root_transport.base) or
1246
new_transport.base == root_transport.base)
1339
1248
# Cloning to "/" should take us to exactly the same location.
1340
1249
self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1534
1443
# '\xe5' and '\xe4' actually map to the same file
1535
1444
# adding a suffix kicks in the 'preserving but insensitive'
1536
1445
# route, and maintains the right files
1537
files = [u'\xe5.1', # a w/ circle iso-8859-1
1538
u'\xe4.2', # a w/ dots iso-8859-1
1539
u'\u017d', # Z with umlat iso-8859-2
1540
u'\u062c', # Arabic j
1541
u'\u0410', # Russian A
1542
u'\u65e5', # Kanji person
1446
files = [u'\xe5.1', # a w/ circle iso-8859-1
1447
u'\xe4.2', # a w/ dots iso-8859-1
1448
u'\u017d', # Z with umlat iso-8859-2
1449
u'\u062c', # Arabic j
1450
u'\u0410', # Russian A
1451
u'\u65e5', # Kanji person
1545
1454
no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1546
1455
if no_unicode_support:
1566
1476
transport = self.get_transport()
1567
1477
if transport.is_readonly():
1569
transport.put_bytes('foo', 'bar')
1479
transport.put_bytes('foo', b'bar')
1570
1480
transport3 = self.get_transport()
1571
self.check_transport_contents('bar', transport3, 'foo')
1481
self.check_transport_contents(b'bar', transport3, 'foo')
1573
1483
# now opening at a relative url should give use a sane result:
1574
1484
transport.mkdir('newdir')
1575
1485
transport5 = self.get_transport('newdir')
1576
1486
transport6 = transport5.clone('..')
1577
self.check_transport_contents('bar', transport6, 'foo')
1487
self.check_transport_contents(b'bar', transport6, 'foo')
1579
1489
def test_lock_write(self):
1580
1490
"""Test transport-level write locks.
1615
1526
def test_readv(self):
1616
1527
transport = self.get_transport()
1617
1528
if transport.is_readonly():
1618
with file('a', 'w') as f: f.write('0123456789')
1529
with open('a', 'w') as f:
1530
f.write('0123456789')
1620
transport.put_bytes('a', '0123456789')
1532
transport.put_bytes('a', b'0123456789')
1622
1534
d = list(transport.readv('a', ((0, 1),)))
1623
self.assertEqual(d[0], (0, '0'))
1535
self.assertEqual(d[0], (0, b'0'))
1625
1537
d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1626
self.assertEqual(d[0], (0, '0'))
1627
self.assertEqual(d[1], (1, '1'))
1628
self.assertEqual(d[2], (3, '34'))
1629
self.assertEqual(d[3], (9, '9'))
1538
self.assertEqual(d[0], (0, b'0'))
1539
self.assertEqual(d[1], (1, b'1'))
1540
self.assertEqual(d[2], (3, b'34'))
1541
self.assertEqual(d[3], (9, b'9'))
1631
1543
def test_readv_out_of_order(self):
1632
1544
transport = self.get_transport()
1633
1545
if transport.is_readonly():
1634
with file('a', 'w') as f: f.write('0123456789')
1546
with open('a', 'w') as f:
1547
f.write('0123456789')
1636
transport.put_bytes('a', '01234567890')
1549
transport.put_bytes('a', b'01234567890')
1638
1551
d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1639
self.assertEqual(d[0], (1, '1'))
1640
self.assertEqual(d[1], (9, '9'))
1641
self.assertEqual(d[2], (0, '0'))
1642
self.assertEqual(d[3], (3, '34'))
1552
self.assertEqual(d[0], (1, b'1'))
1553
self.assertEqual(d[1], (9, b'9'))
1554
self.assertEqual(d[2], (0, b'0'))
1555
self.assertEqual(d[3], (3, b'34'))
1644
1557
def test_readv_with_adjust_for_latency(self):
1645
1558
transport = self.get_transport()
1671
1585
check_result_data(result)
1672
1586
# end of file corner case
1673
1587
result = list(transport.readv('a', ((204700, 100),),
1674
adjust_for_latency=True, upper_limit=content_size))
1588
adjust_for_latency=True, upper_limit=content_size))
1675
1589
# we expect 1 result, from 204800- its length, to the end
1676
1590
self.assertEqual(1, len(result))
1677
1591
data_len = len(result[0][1])
1678
self.assertEqual(204800-data_len, result[0][0])
1592
self.assertEqual(204800 - data_len, result[0][0])
1679
1593
self.assertTrue(data_len >= 100)
1680
1594
check_result_data(result)
1681
1595
# out of order ranges are made in order
1682
1596
result = list(transport.readv('a', ((204700, 100), (0, 50)),
1683
adjust_for_latency=True, upper_limit=content_size))
1597
adjust_for_latency=True, upper_limit=content_size))
1684
1598
# we expect 2 results, in order, start and end.
1685
1599
self.assertEqual(2, len(result))
1689
1603
self.assertTrue(data_len >= 30)
1691
1605
data_len = len(result[1][1])
1692
self.assertEqual(204800-data_len, result[1][0])
1606
self.assertEqual(204800 - data_len, result[1][0])
1693
1607
self.assertTrue(data_len >= 100)
1694
1608
check_result_data(result)
1695
1609
# close ranges get combined (even if out of order)
1696
1610
for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
1697
1611
result = list(transport.readv('a', request_vector,
1698
adjust_for_latency=True, upper_limit=content_size))
1612
adjust_for_latency=True, upper_limit=content_size))
1699
1613
self.assertEqual(1, len(result))
1700
1614
data_len = len(result[0][1])
1701
1615
# minimum length is from 400 to 1034 - 634
1709
1623
transport = self.get_transport()
1710
1624
# test from observed failure case.
1711
1625
if transport.is_readonly():
1712
with file('a', 'w') as f: f.write('a'*1024*1024)
1626
with open('a', 'w') as f:
1627
f.write('a' * 1024 * 1024)
1714
transport.put_bytes('a', 'a'*1024*1024)
1629
transport.put_bytes('a', b'a' * 1024 * 1024)
1715
1630
broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1716
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1717
(465373, 800), (947422, 800)]
1718
results = list(transport.readv('a', broken_vector, True, 1024*1024))
1719
found_items = [False]*9
1631
(225037, 800), (221357, 800), (437077, 800), (947670, 800),
1632
(465373, 800), (947422, 800)]
1633
results = list(transport.readv('a', broken_vector, True, 1024 * 1024))
1634
found_items = [False] * 9
1720
1635
for pos, (start, length) in enumerate(broken_vector):
1721
1636
# check the range is covered by the result
1722
1637
for offset, data in results:
1723
1638
if offset <= start and start + length <= offset + len(data):
1724
1639
found_items[pos] = True
1725
self.assertEqual([True]*9, found_items)
1640
self.assertEqual([True] * 9, found_items)
1727
1642
def test_get_with_open_write_stream_sees_all_content(self):
1728
1643
t = self.get_transport()
1729
1644
if t.is_readonly():
1731
handle = t.open_write_stream('foo')
1734
self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0, 1), (2, 1)))))
1646
with t.open_write_stream('foo') as handle:
1647
handle.write(b'bcd')
1648
self.assertEqual([(0, b'b'), (2, b'd')], list(
1649
t.readv('foo', ((0, 1), (2, 1)))))
1738
1651
def test_get_smart_medium(self):
1739
1652
"""All transports must either give a smart medium, or know they can't.