/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/per_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2020-05-06 02:13:25 UTC
  • mfrom: (7490.7.21 work)
  • mto: This revision was merged to the branch mainline in revision 7501.
  • Revision ID: jelmer@jelmer.uk-20200506021325-awbmmqu1zyorz7sj
Merge 3.1 branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
 
23
from io import BytesIO
23
24
import os
24
25
import stat
25
26
import sys
39
40
                      TransportNotPossible,
40
41
                      )
41
42
from ..osutils import getcwd
42
 
from ..sixish import (
43
 
    BytesIO,
44
 
    zip,
45
 
    )
46
43
from ..bzr.smart import medium
47
44
from . import (
48
45
    TestSkipped,
79
76
                pyutils.get_named_object(module))
80
77
            for (klass, server_factory) in permutations:
81
78
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
 
                    {"transport_class":klass,
83
 
                     "transport_server":server_factory})
 
79
                            {"transport_class": klass,
 
80
                             "transport_server": server_factory})
84
81
                result.append(scenario)
85
82
        except errors.DependencyNotPresent as e:
86
83
            # Continue even if a dependency prevents us
154
151
        self.assertEqual(True, t.has('a'))
155
152
        self.assertEqual(False, t.has('c'))
156
153
        self.assertEqual(True, t.has(urlutils.escape('%')))
157
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
158
 
                                           'e', 'f', 'g', 'h'])),
159
 
                         [True, True, False, False,
160
 
                          True, False, True, False])
161
154
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
162
155
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
163
156
                                           urlutils.escape('%%')]))
164
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
165
 
                                                'e', 'f', 'g', 'h']))),
166
 
                         [True, True, False, False,
167
 
                          True, False, True, False])
168
157
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
169
158
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
170
159
 
181
170
    def test_get(self):
182
171
        t = self.get_transport()
183
172
 
184
 
        files = ['a', 'b', 'e', 'g']
185
 
        contents = ['contents of a\n',
186
 
                    'contents of b\n',
187
 
                    'contents of e\n',
188
 
                    'contents of g\n',
189
 
                    ]
190
 
        self.build_tree(files, transport=t, line_endings='binary')
191
 
        self.check_transport_contents('contents of a\n', t, 'a')
192
 
        content_f = t.get_multi(files)
193
 
        # Must use iter zip() from future not old version which will fully
194
 
        # evaluate its inputs, the transport requests should be issued and
195
 
        # handled sequentially (we don't want to force transport to buffer).
196
 
        for content, f in zip(contents, content_f):
197
 
            self.assertEqual(content, f.read())
198
 
 
199
 
        content_f = t.get_multi(iter(files))
200
 
        # Again this zip() must come from the future
201
 
        for content, f in zip(contents, content_f):
202
 
            self.assertEqual(content, f.read())
 
173
        files = ['a']
 
174
        content = b'contents of a\n'
 
175
        self.build_tree(['a'], transport=t, line_endings='binary')
 
176
        self.check_transport_contents(b'contents of a\n', t, 'a')
 
177
        f = t.get('a')
 
178
        self.assertEqual(content, f.read())
203
179
 
204
180
    def test_get_unknown_file(self):
205
181
        t = self.get_transport()
206
182
        files = ['a', 'b']
207
 
        contents = ['contents of a\n',
208
 
                    'contents of b\n',
 
183
        contents = [b'contents of a\n',
 
184
                    b'contents of b\n',
209
185
                    ]
210
186
        self.build_tree(files, transport=t, line_endings='binary')
211
187
        self.assertRaises(NoSuchFile, t.get, 'c')
 
188
 
212
189
        def iterate_and_close(func, *args):
213
190
            for f in func(*args):
214
191
                # We call f.read() here because things like paramiko actually
216
193
                # consume before we close the handle.
217
194
                content = f.read()
218
195
                f.close()
219
 
        self.assertRaises(NoSuchFile, iterate_and_close,
220
 
                          t.get_multi, ['a', 'b', 'c'])
221
 
        self.assertRaises(NoSuchFile, iterate_and_close,
222
 
                          t.get_multi, iter(['a', 'b', 'c']))
223
196
 
224
197
    def test_get_directory_read_gives_ReadError(self):
225
198
        """consistent errors for read() on a file returned by get()."""
245
218
        t = self.get_transport()
246
219
 
247
220
        files = ['a', 'b', 'e', 'g']
248
 
        contents = ['contents of a\n',
249
 
                    'contents of b\n',
250
 
                    'contents of e\n',
251
 
                    'contents of g\n',
 
221
        contents = [b'contents of a\n',
 
222
                    b'contents of b\n',
 
223
                    b'contents of e\n',
 
224
                    b'contents of g\n',
252
225
                    ]
253
226
        self.build_tree(files, transport=t, line_endings='binary')
254
 
        self.check_transport_contents('contents of a\n', t, 'a')
 
227
        self.check_transport_contents(b'contents of a\n', t, 'a')
255
228
 
256
229
        for content, fname in zip(contents, files):
257
230
            self.assertEqual(content, t.get_bytes(fname))
264
237
        t = self.get_transport()
265
238
        if t.is_readonly():
266
239
            return
267
 
        handle = t.open_write_stream('foo')
268
 
        try:
269
 
            handle.write('b')
270
 
            self.assertEqual('b', t.get_bytes('foo'))
271
 
        finally:
272
 
            handle.close()
 
240
        with t.open_write_stream('foo') as handle:
 
241
            handle.write(b'b')
 
242
            self.assertEqual(b'b', t.get_bytes('foo'))
273
243
 
274
244
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
275
245
        t = self.get_transport()
276
246
        if t.is_readonly():
277
247
            return
278
 
        handle = t.open_write_stream('foo')
279
 
        try:
280
 
            handle.write('b')
281
 
            self.assertEqual('b', t.get_bytes('foo'))
282
 
            f = t.get('foo')
283
 
            try:
284
 
                self.assertEqual('b', f.read())
285
 
            finally:
286
 
                f.close()
287
 
        finally:
288
 
            handle.close()
 
248
        with t.open_write_stream('foo') as handle:
 
249
            handle.write(b'b')
 
250
            self.assertEqual(b'b', t.get_bytes('foo'))
 
251
            with t.get('foo') as f:
 
252
                self.assertEqual(b'b', f.read())
289
253
 
290
254
    def test_put_bytes(self):
291
255
        t = self.get_transport()
292
256
 
293
257
        if t.is_readonly():
294
258
            self.assertRaises(TransportNotPossible,
295
 
                    t.put_bytes, 'a', 'some text for a\n')
 
259
                              t.put_bytes, 'a', b'some text for a\n')
296
260
            return
297
261
 
298
 
        t.put_bytes('a', 'some text for a\n')
 
262
        t.put_bytes('a', b'some text for a\n')
299
263
        self.assertTrue(t.has('a'))
300
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
264
        self.check_transport_contents(b'some text for a\n', t, 'a')
301
265
 
302
266
        # The contents should be overwritten
303
 
        t.put_bytes('a', 'new text for a\n')
304
 
        self.check_transport_contents('new text for a\n', t, 'a')
 
267
        t.put_bytes('a', b'new text for a\n')
 
268
        self.check_transport_contents(b'new text for a\n', t, 'a')
305
269
 
306
270
        self.assertRaises(NoSuchFile,
307
 
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
 
271
                          t.put_bytes, 'path/doesnt/exist/c', b'contents')
308
272
 
309
273
    def test_put_bytes_non_atomic(self):
310
274
        t = self.get_transport()
311
275
 
312
276
        if t.is_readonly():
313
277
            self.assertRaises(TransportNotPossible,
314
 
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
 
278
                              t.put_bytes_non_atomic, 'a', b'some text for a\n')
315
279
            return
316
280
 
317
281
        self.assertFalse(t.has('a'))
318
 
        t.put_bytes_non_atomic('a', 'some text for a\n')
 
282
        t.put_bytes_non_atomic('a', b'some text for a\n')
319
283
        self.assertTrue(t.has('a'))
320
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
284
        self.check_transport_contents(b'some text for a\n', t, 'a')
321
285
        # Put also replaces contents
322
 
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
323
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
286
        t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
 
287
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
324
288
 
325
289
        # Make sure we can create another file
326
 
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
 
290
        t.put_bytes_non_atomic('d', b'contents for\nd\n')
327
291
        # And overwrite 'a' with empty contents
328
 
        t.put_bytes_non_atomic('a', '')
329
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
330
 
        self.check_transport_contents('', t, 'a')
 
292
        t.put_bytes_non_atomic('a', b'')
 
293
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
294
        self.check_transport_contents(b'', t, 'a')
331
295
 
332
296
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
333
 
                                       'contents\n')
 
297
                          b'contents\n')
334
298
        # Now test the create_parent flag
335
299
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
336
 
                                       'contents\n')
 
300
                          b'contents\n')
337
301
        self.assertFalse(t.has('dir/a'))
338
 
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
 
302
        t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
339
303
                               create_parent_dir=True)
340
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
304
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
341
305
 
342
306
        # But we still get NoSuchFile if we can't make the parent dir
343
307
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
344
 
                                       'contents\n',
345
 
                                       create_parent_dir=True)
 
308
                          b'contents\n',
 
309
                          create_parent_dir=True)
346
310
 
347
311
    def test_put_bytes_permissions(self):
348
312
        t = self.get_transport()
352
316
        if not t._can_roundtrip_unix_modebits():
353
317
            # Can't roundtrip, so no need to run this test
354
318
            return
355
 
        t.put_bytes('mode644', 'test text\n', mode=0o644)
 
319
        t.put_bytes('mode644', b'test text\n', mode=0o644)
356
320
        self.assertTransportMode(t, 'mode644', 0o644)
357
 
        t.put_bytes('mode666', 'test text\n', mode=0o666)
 
321
        t.put_bytes('mode666', b'test text\n', mode=0o666)
358
322
        self.assertTransportMode(t, 'mode666', 0o666)
359
 
        t.put_bytes('mode600', 'test text\n', mode=0o600)
 
323
        t.put_bytes('mode600', b'test text\n', mode=0o600)
360
324
        self.assertTransportMode(t, 'mode600', 0o600)
361
325
        # Yes, you can put_bytes a file such that it becomes readonly
362
 
        t.put_bytes('mode400', 'test text\n', mode=0o400)
 
326
        t.put_bytes('mode400', b'test text\n', mode=0o400)
363
327
        self.assertTransportMode(t, 'mode400', 0o400)
364
328
 
365
329
        # The default permissions should be based on the current umask
366
330
        umask = osutils.get_umask()
367
 
        t.put_bytes('nomode', 'test text\n', mode=None)
 
331
        t.put_bytes('nomode', b'test text\n', mode=None)
368
332
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
369
333
 
370
334
    def test_put_bytes_non_atomic_permissions(self):
375
339
        if not t._can_roundtrip_unix_modebits():
376
340
            # Can't roundtrip, so no need to run this test
377
341
            return
378
 
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0o644)
 
342
        t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
379
343
        self.assertTransportMode(t, 'mode644', 0o644)
380
 
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0o666)
 
344
        t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
381
345
        self.assertTransportMode(t, 'mode666', 0o666)
382
 
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0o600)
 
346
        t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
383
347
        self.assertTransportMode(t, 'mode600', 0o600)
384
 
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0o400)
 
348
        t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
385
349
        self.assertTransportMode(t, 'mode400', 0o400)
386
350
 
387
351
        # The default permissions should be based on the current umask
388
352
        umask = osutils.get_umask()
389
 
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
 
353
        t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
390
354
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
391
355
 
392
356
        # We should also be able to set the mode for a parent directory
393
357
        # when it is created
394
 
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0o664,
 
358
        t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
395
359
                               dir_mode=0o700, create_parent_dir=True)
396
360
        self.assertTransportMode(t, 'dir700', 0o700)
397
 
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0o664,
 
361
        t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
398
362
                               dir_mode=0o770, create_parent_dir=True)
399
363
        self.assertTransportMode(t, 'dir770', 0o770)
400
 
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0o664,
 
364
        t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
401
365
                               dir_mode=0o777, create_parent_dir=True)
402
366
        self.assertTransportMode(t, 'dir777', 0o777)
403
367
 
406
370
 
407
371
        if t.is_readonly():
408
372
            self.assertRaises(TransportNotPossible,
409
 
                    t.put_file, 'a', BytesIO(b'some text for a\n'))
 
373
                              t.put_file, 'a', BytesIO(b'some text for a\n'))
410
374
            return
411
375
 
412
376
        result = t.put_file('a', BytesIO(b'some text for a\n'))
413
377
        # put_file returns the length of the data written
414
378
        self.assertEqual(16, result)
415
379
        self.assertTrue(t.has('a'))
416
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
380
        self.check_transport_contents(b'some text for a\n', t, 'a')
417
381
        # Put also replaces contents
418
382
        result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
419
383
        self.assertEqual(19, result)
420
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
384
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
421
385
        self.assertRaises(NoSuchFile,
422
386
                          t.put_file, 'path/doesnt/exist/c',
423
 
                              BytesIO(b'contents'))
 
387
                          BytesIO(b'contents'))
424
388
 
425
389
    def test_put_file_non_atomic(self):
426
390
        t = self.get_transport()
427
391
 
428
392
        if t.is_readonly():
429
393
            self.assertRaises(TransportNotPossible,
430
 
                    t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
 
394
                              t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
431
395
            return
432
396
 
433
397
        self.assertFalse(t.has('a'))
434
398
        t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
435
399
        self.assertTrue(t.has('a'))
436
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
400
        self.check_transport_contents(b'some text for a\n', t, 'a')
437
401
        # Put also replaces contents
438
402
        t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
439
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
403
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
440
404
 
441
405
        # Make sure we can create another file
442
406
        t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
443
407
        # And overwrite 'a' with empty contents
444
408
        t.put_file_non_atomic('a', BytesIO(b''))
445
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
446
 
        self.check_transport_contents('', t, 'a')
 
409
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
410
        self.check_transport_contents(b'', t, 'a')
447
411
 
448
412
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
449
 
                                       BytesIO(b'contents\n'))
 
413
                          BytesIO(b'contents\n'))
450
414
        # Now test the create_parent flag
451
415
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
452
 
                                       BytesIO(b'contents\n'))
 
416
                          BytesIO(b'contents\n'))
453
417
        self.assertFalse(t.has('dir/a'))
454
418
        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
455
419
                              create_parent_dir=True)
456
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
420
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
457
421
 
458
422
        # But we still get NoSuchFile if we can't make the parent dir
459
423
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
460
 
                                       BytesIO(b'contents\n'),
461
 
                                       create_parent_dir=True)
 
424
                          BytesIO(b'contents\n'),
 
425
                          create_parent_dir=True)
462
426
 
463
427
    def test_put_file_permissions(self):
464
428
 
535
499
            # defined for the transport interface.
536
500
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
537
501
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
538
 
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
539
 
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
 
502
            self.assertRaises(TransportNotPossible,
 
503
                              t.mkdir, 'path/doesnt/exist')
540
504
            return
541
505
        # Test mkdir
542
506
        t.mkdir('dir_a')
546
510
        t.mkdir('dir_b')
547
511
        self.assertEqual(t.has('dir_b'), True)
548
512
 
549
 
        t.mkdir_multi(['dir_c', 'dir_d'])
550
 
 
551
 
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
552
 
        self.assertEqual(list(t.has_multi(
553
 
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
554
 
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
555
 
            [True, True, True, False,
556
 
             True, True, True, True])
 
513
        self.assertEqual([t.has(n) for n in
 
514
                          ['dir_a', 'dir_b', 'dir_q', 'dir_b']],
 
515
                         [True, True, False, True])
557
516
 
558
517
        # we were testing that a local mkdir followed by a transport
559
518
        # mkdir failed thusly, but given that we * in one process * do not
564
523
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
565
524
 
566
525
        # Test get/put in sub-directories
567
 
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
 
526
        t.put_bytes('dir_a/a', b'contents of dir_a/a')
568
527
        t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
569
 
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
570
 
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
 
528
        self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a')
 
529
        self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b')
571
530
 
572
531
        # mkdir of a dir with an absent parent
573
532
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
588
547
        self.assertTransportMode(t, 'dmode777', 0o777)
589
548
        t.mkdir('dmode700', mode=0o700)
590
549
        self.assertTransportMode(t, 'dmode700', 0o700)
591
 
        t.mkdir_multi(['mdmode755'], mode=0o755)
 
550
        t.mkdir('mdmode755', mode=0o755)
592
551
        self.assertTransportMode(t, 'mdmode755', 0o755)
593
552
 
594
553
        # Default mode should be based on umask
602
561
            return
603
562
        handle = t.open_write_stream('foo')
604
563
        try:
605
 
            self.assertEqual('', t.get_bytes('foo'))
 
564
            self.assertEqual(b'', t.get_bytes('foo'))
606
565
        finally:
607
566
            handle.close()
608
567
 
650
609
            t2 = t.clone('copy_to_simple')
651
610
            simple_copy_files(t, t2)
652
611
 
653
 
 
654
612
        # Test that copying into a missing directory raises
655
613
        # NoSuchFile
656
614
        if t.is_readonly():
657
615
            self.build_tree(['e/', 'e/f'])
658
616
        else:
659
617
            t.mkdir('e')
660
 
            t.put_bytes('e/f', 'contents of e')
 
618
            t.put_bytes('e/f', b'contents of e')
661
619
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
662
620
        temp_transport.mkdir('e')
663
621
        t.copy_to(['e/f'], temp_transport)
693
651
 
694
652
        if t.is_readonly():
695
653
            self.assertRaises(TransportNotPossible,
696
 
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
654
                              t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
697
655
            return
698
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
699
 
        t.put_bytes('b', 'contents\nfor b\n')
 
656
        t.put_bytes('a', b'diff\ncontents for\na\n')
 
657
        t.put_bytes('b', b'contents\nfor b\n')
700
658
 
701
659
        self.assertEqual(20,
702
 
            t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
 
660
                         t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
703
661
 
704
662
        self.check_transport_contents(
705
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
663
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
706
664
            t, 'a')
707
665
 
708
666
        # a file with no parent should fail..
711
669
 
712
670
        # And we can create new files, too
713
671
        self.assertEqual(0,
714
 
            t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
715
 
        self.check_transport_contents('some text\nfor a missing file\n',
 
672
                         t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
 
673
        self.check_transport_contents(b'some text\nfor a missing file\n',
716
674
                                      t, 'c')
717
675
 
718
676
    def test_append_bytes(self):
720
678
 
721
679
        if t.is_readonly():
722
680
            self.assertRaises(TransportNotPossible,
723
 
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
 
681
                              t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
724
682
            return
725
683
 
726
 
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
727
 
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
 
684
        self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
 
685
        self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
728
686
 
729
687
        self.assertEqual(20,
730
 
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
 
688
                         t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
731
689
 
732
690
        self.check_transport_contents(
733
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
691
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
734
692
            t, 'a')
735
693
 
736
694
        # a file with no parent should fail..
737
695
        self.assertRaises(NoSuchFile,
738
 
                          t.append_bytes, 'missing/path', 'content')
739
 
 
740
 
    def test_append_multi(self):
741
 
        t = self.get_transport()
742
 
 
743
 
        if t.is_readonly():
744
 
            return
745
 
        t.put_bytes('a', 'diff\ncontents for\na\n'
746
 
                         'add\nsome\nmore\ncontents\n')
747
 
        t.put_bytes('b', 'contents\nfor b\n')
748
 
 
749
 
        self.assertEqual((43, 15),
750
 
            t.append_multi([('a', BytesIO(b'and\nthen\nsome\nmore\n')),
751
 
                            ('b', BytesIO(b'some\nmore\nfor\nb\n'))]))
752
 
 
753
 
        self.check_transport_contents(
754
 
            'diff\ncontents for\na\n'
755
 
            'add\nsome\nmore\ncontents\n'
756
 
            'and\nthen\nsome\nmore\n',
757
 
            t, 'a')
758
 
        self.check_transport_contents(
759
 
                'contents\nfor b\n'
760
 
                'some\nmore\nfor\nb\n',
761
 
                t, 'b')
762
 
 
763
 
        self.assertEqual((62, 31),
764
 
            t.append_multi(iter([('a', BytesIO(b'a little bit more\n')),
765
 
                                 ('b', BytesIO(b'from an iterator\n'))])))
766
 
        self.check_transport_contents(
767
 
            'diff\ncontents for\na\n'
768
 
            'add\nsome\nmore\ncontents\n'
769
 
            'and\nthen\nsome\nmore\n'
770
 
            'a little bit more\n',
771
 
            t, 'a')
772
 
        self.check_transport_contents(
773
 
                'contents\nfor b\n'
774
 
                'some\nmore\nfor\nb\n'
775
 
                'from an iterator\n',
776
 
                t, 'b')
777
 
 
778
 
        self.assertEqual((80, 0),
779
 
            t.append_multi([('a', BytesIO(b'some text in a\n')),
780
 
                            ('d', BytesIO(b'missing file r\n'))]))
781
 
 
782
 
        self.check_transport_contents(
783
 
            'diff\ncontents for\na\n'
784
 
            'add\nsome\nmore\ncontents\n'
785
 
            'and\nthen\nsome\nmore\n'
786
 
            'a little bit more\n'
787
 
            'some text in a\n',
788
 
            t, 'a')
789
 
        self.check_transport_contents('missing file r\n', t, 'd')
 
696
                          t.append_bytes, 'missing/path', b'content')
790
697
 
791
698
    def test_append_file_mode(self):
792
699
        """Check that append accepts a mode parameter"""
794
701
        t = self.get_transport()
795
702
        if t.is_readonly():
796
703
            self.assertRaises(TransportNotPossible,
797
 
                t.append_file, 'f', BytesIO(b'f'), mode=None)
 
704
                              t.append_file, 'f', BytesIO(b'f'), mode=None)
798
705
            return
799
706
        t.append_file('f', BytesIO(b'f'), mode=None)
800
707
 
803
710
        t = self.get_transport()
804
711
        if t.is_readonly():
805
712
            self.assertRaises(TransportNotPossible,
806
 
                t.append_bytes, 'f', 'f', mode=None)
 
713
                              t.append_bytes, 'f', b'f', mode=None)
807
714
            return
808
 
        t.append_bytes('f', 'f', mode=None)
 
715
        t.append_bytes('f', b'f', mode=None)
809
716
 
810
717
    def test_delete(self):
811
718
        # TODO: Test Transport.delete
816
723
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
817
724
            return
818
725
 
819
 
        t.put_bytes('a', 'a little bit of text\n')
 
726
        t.put_bytes('a', b'a little bit of text\n')
820
727
        self.assertTrue(t.has('a'))
821
728
        t.delete('a')
822
729
        self.assertFalse(t.has('a'))
823
730
 
824
731
        self.assertRaises(NoSuchFile, t.delete, 'a')
825
732
 
826
 
        t.put_bytes('a', 'a text\n')
827
 
        t.put_bytes('b', 'b text\n')
828
 
        t.put_bytes('c', 'c text\n')
 
733
        t.put_bytes('a', b'a text\n')
 
734
        t.put_bytes('b', b'b text\n')
 
735
        t.put_bytes('c', b'c text\n')
829
736
        self.assertEqual([True, True, True],
830
 
                list(t.has_multi(['a', 'b', 'c'])))
831
 
        t.delete_multi(['a', 'c'])
 
737
                         [t.has(n) for n in ['a', 'b', 'c']])
 
738
        t.delete('a')
 
739
        t.delete('c')
832
740
        self.assertEqual([False, True, False],
833
 
                list(t.has_multi(['a', 'b', 'c'])))
 
741
                         [t.has(n) for n in ['a', 'b', 'c']])
834
742
        self.assertFalse(t.has('a'))
835
743
        self.assertTrue(t.has('b'))
836
744
        self.assertFalse(t.has('c'))
837
745
 
838
 
        self.assertRaises(NoSuchFile,
839
 
                t.delete_multi, ['a', 'b', 'c'])
840
 
 
841
 
        self.assertRaises(NoSuchFile,
842
 
                t.delete_multi, iter(['a', 'b', 'c']))
843
 
 
844
 
        t.put_bytes('a', 'another a text\n')
845
 
        t.put_bytes('c', 'another c text\n')
846
 
        t.delete_multi(iter(['a', 'b', 'c']))
 
746
        for name in ['a', 'c', 'd']:
 
747
            self.assertRaises(NoSuchFile, t.delete, name)
847
748
 
848
749
        # We should have deleted everything
849
750
        # SftpServer creates control files in the
896
797
        if t.is_readonly():
897
798
            return
898
799
        t.mkdir('foo')
899
 
        t.put_bytes('foo-bar', '')
 
800
        t.put_bytes('foo-bar', b'')
900
801
        t.mkdir('foo-baz')
901
802
        t.rmdir('foo')
902
803
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
941
842
        t.mkdir('b')
942
843
        ta = t.clone('a')
943
844
        tb = t.clone('b')
944
 
        ta.put_bytes('f', 'aoeu')
 
845
        ta.put_bytes('f', b'aoeu')
945
846
        ta.rename('f', '../b/f')
946
847
        self.assertTrue(tb.has('f'))
947
848
        self.assertFalse(ta.has('f'))
989
890
        # creates control files in the working directory
990
891
        # perhaps all of this could be done in a subdirectory
991
892
 
992
 
        t.put_bytes('a', 'a first file\n')
993
 
        self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
 
893
        t.put_bytes('a', b'a first file\n')
 
894
        self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
994
895
 
995
896
        t.move('a', 'b')
996
897
        self.assertTrue(t.has('b'))
997
898
        self.assertFalse(t.has('a'))
998
899
 
999
 
        self.check_transport_contents('a first file\n', t, 'b')
1000
 
        self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
 
900
        self.check_transport_contents(b'a first file\n', t, 'b')
 
901
        self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
1001
902
 
1002
903
        # Overwrite a file
1003
 
        t.put_bytes('c', 'c this file\n')
 
904
        t.put_bytes('c', b'c this file\n')
1004
905
        t.move('c', 'b')
1005
906
        self.assertFalse(t.has('c'))
1006
 
        self.check_transport_contents('c this file\n', t, 'b')
 
907
        self.check_transport_contents(b'c this file\n', t, 'b')
1007
908
 
1008
909
        # TODO: Try to write a test for atomicity
1009
910
        # TODO: Test moving into a non-existent subdirectory
1010
 
        # TODO: Test Transport.move_multi
1011
911
 
1012
912
    def test_copy(self):
1013
913
        t = self.get_transport()
1015
915
        if t.is_readonly():
1016
916
            return
1017
917
 
1018
 
        t.put_bytes('a', 'a file\n')
 
918
        t.put_bytes('a', b'a file\n')
1019
919
        t.copy('a', 'b')
1020
 
        self.check_transport_contents('a file\n', t, 'b')
 
920
        self.check_transport_contents(b'a file\n', t, 'b')
1021
921
 
1022
922
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1023
923
        os.mkdir('c')
1024
924
        # What should the assert be if you try to copy a
1025
925
        # file over a directory?
1026
926
        #self.assertRaises(Something, t.copy, 'a', 'c')
1027
 
        t.put_bytes('d', 'text in d\n')
 
927
        t.put_bytes('d', b'text in d\n')
1028
928
        t.copy('d', 'b')
1029
 
        self.check_transport_contents('text in d\n', t, 'b')
1030
 
 
1031
 
        # TODO: test copy_multi
 
929
        self.check_transport_contents(b'text in d\n', t, 'b')
1032
930
 
1033
931
    def test_connection_error(self):
1034
932
        """ConnectionError is raised when connection is impossible.
1068
966
                self.assertTrue(S_ISREG(st.st_mode))
1069
967
                self.assertEqual(size, st.st_size)
1070
968
 
1071
 
        remote_stats = list(t.stat_multi(paths))
1072
 
        remote_iter_stats = list(t.stat_multi(iter(paths)))
1073
 
 
1074
969
        self.assertRaises(NoSuchFile, t.stat, 'q')
1075
970
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
1076
971
 
1077
 
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1078
 
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1079
972
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1080
973
        subdir = t.clone('subdir')
1081
974
        st = subdir.stat('./file')
1121
1014
 
1122
1015
            st = t.stat(link_name)
1123
1016
            self.assertTrue(S_ISLNK(st.st_mode),
1124
 
                "expected symlink, got mode %o" % st.st_mode)
1125
 
        except TransportNotPossible:
1126
 
            raise TestSkipped("Transport %s does not support symlinks." %
1127
 
                              self._server.__class__)
1128
 
        except IOError:
1129
 
            self.knownFailure("Paramiko fails to create symlinks during tests")
 
1017
                            "expected symlink, got mode %o" % st.st_mode)
 
1018
        except TransportNotPossible:
 
1019
            raise TestSkipped("Transport %s does not support symlinks." %
 
1020
                              self._server.__class__)
 
1021
 
 
1022
        self.assertEqual(source_name, t.readlink(link_name))
 
1023
 
 
1024
    def test_readlink_nonexistent(self):
 
1025
        t = self.get_transport()
 
1026
        try:
 
1027
            self.assertRaises(NoSuchFile, t.readlink, 'nonexistent')
 
1028
        except TransportNotPossible:
 
1029
            raise TestSkipped("Transport %s does not support symlinks." %
 
1030
                              self._server.__class__)
1130
1031
 
1131
1032
    def test_list_dir(self):
1132
1033
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1212
1113
 
1213
1114
            Only the parameters different from None will be changed.
1214
1115
            """
1215
 
            if scheme   is None: scheme   = t._parsed_url.scheme
1216
 
            if user     is None: user     = t._parsed_url.user
1217
 
            if password is None: password = t._parsed_url.password
1218
 
            if user     is None: user     = t._parsed_url.user
1219
 
            if host     is None: host     = t._parsed_url.host
1220
 
            if port     is None: port     = t._parsed_url.port
1221
 
            if path     is None: path     = t._parsed_url.path
 
1116
            if scheme is None:
 
1117
                scheme = t._parsed_url.scheme
 
1118
            if user is None:
 
1119
                user = t._parsed_url.user
 
1120
            if password is None:
 
1121
                password = t._parsed_url.password
 
1122
            if user is None:
 
1123
                user = t._parsed_url.user
 
1124
            if host is None:
 
1125
                host = t._parsed_url.host
 
1126
            if port is None:
 
1127
                port = t._parsed_url.port
 
1128
            if path is None:
 
1129
                path = t._parsed_url.path
1222
1130
            return str(urlutils.URL(scheme, user, password, host, port, path))
1223
1131
 
1224
1132
        if t._parsed_url.scheme == 'ftp':
1243
1151
        #   (they may be typed by the user when prompted for example)
1244
1152
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1245
1153
        # We will not connect, we can use a invalid host
1246
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1154
        self.assertIsNot(t, t._reuse_for(
 
1155
            new_url(host=t._parsed_url.host + 'bar')))
1247
1156
        if t._parsed_url.port == 1234:
1248
1157
            port = 4321
1249
1158
        else:
1259
1168
 
1260
1169
        c = t.clone('subdir')
1261
1170
        # Some transports will create the connection  only when needed
1262
 
        t.has('surely_not') # Force connection
 
1171
        t.has('surely_not')  # Force connection
1263
1172
        self.assertIs(t._get_connection(), c._get_connection())
1264
1173
 
1265
1174
        # Temporary failure, we need to create a new dummy connection
1274
1183
        if not isinstance(t, ConnectedTransport):
1275
1184
            raise TestSkipped("not a connected transport")
1276
1185
 
1277
 
        t.has('surely_not') # Force connection
 
1186
        t.has('surely_not')  # Force connection
1278
1187
        self.assertIsNot(None, t._get_connection())
1279
1188
 
1280
1189
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1310
1219
        self.assertFalse(t3.has('b/d'))
1311
1220
 
1312
1221
        if t1.is_readonly():
1313
 
            self.build_tree_contents([('b/d', 'newfile\n')])
 
1222
            self.build_tree_contents([('b/d', b'newfile\n')])
1314
1223
        else:
1315
 
            t2.put_bytes('d', 'newfile\n')
 
1224
            t2.put_bytes('d', b'newfile\n')
1316
1225
 
1317
1226
        self.assertTrue(t1.has('b/d'))
1318
1227
        self.assertTrue(t2.has('d'))
1326
1235
        new_transport = root_transport.clone("..")
1327
1236
        # as we are walking up directories, the path must be
1328
1237
        # growing less, except at the top
1329
 
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
1330
 
            or new_transport.base == root_transport.base)
 
1238
        self.assertTrue(len(new_transport.base) < len(root_transport.base) or
 
1239
                        new_transport.base == root_transport.base)
1331
1240
        while new_transport.base != root_transport.base:
1332
1241
            root_transport = new_transport
1333
1242
            new_transport = root_transport.clone("..")
1334
1243
            # as we are walking up directories, the path must be
1335
1244
            # growing less, except at the top
1336
 
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
1337
 
                or new_transport.base == root_transport.base)
 
1245
            self.assertTrue(len(new_transport.base) < len(root_transport.base) or
 
1246
                            new_transport.base == root_transport.base)
1338
1247
 
1339
1248
        # Cloning to "/" should take us to exactly the same location.
1340
1249
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1350
1259
        orig_transport = self.get_transport()
1351
1260
        root_transport = orig_transport.clone('/')
1352
1261
        self.assertEqual(root_transport.base + '.bzr/',
1353
 
            root_transport.clone('.bzr').base)
 
1262
                         root_transport.clone('.bzr').base)
1354
1263
 
1355
1264
    def test_base_url(self):
1356
1265
        t = self.get_transport()
1451
1360
                         'isolated/dir/',
1452
1361
                         'isolated/dir/foo',
1453
1362
                         'isolated/dir/bar',
1454
 
                         'isolated/dir/b%25z', # make sure quoting is correct
 
1363
                         'isolated/dir/b%25z',  # make sure quoting is correct
1455
1364
                         'isolated/bar'],
1456
1365
                        transport=transport)
1457
1366
        paths = set(transport.iter_files_recursive())
1458
1367
        # nb the directories are not converted
1459
1368
        self.assertEqual(paths,
1460
 
                    {'isolated/dir/foo',
1461
 
                         'isolated/dir/bar',
1462
 
                         'isolated/dir/b%2525z',
1463
 
                         'isolated/bar'})
 
1369
                         {'isolated/dir/foo',
 
1370
                          'isolated/dir/bar',
 
1371
                          'isolated/dir/b%2525z',
 
1372
                          'isolated/bar'})
1464
1373
        sub_transport = transport.clone('isolated')
1465
1374
        paths = set(sub_transport.iter_files_recursive())
1466
1375
        self.assertEqual(paths,
1467
 
            {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
 
1376
                         {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
1468
1377
 
1469
1378
    def test_copy_tree(self):
1470
1379
        # TODO: test file contents and permissions are preserved. This test was
1481
1390
                         'from/dir/',
1482
1391
                         'from/dir/foo',
1483
1392
                         'from/dir/bar',
1484
 
                         'from/dir/b%25z', # make sure quoting is correct
 
1393
                         'from/dir/b%25z',  # make sure quoting is correct
1485
1394
                         'from/bar'],
1486
1395
                        transport=transport)
1487
1396
        transport.copy_tree('from', 'to')
1488
1397
        paths = set(transport.iter_files_recursive())
1489
1398
        self.assertEqual(paths,
1490
 
                    {'from/dir/foo',
1491
 
                         'from/dir/bar',
1492
 
                         'from/dir/b%2525z',
1493
 
                         'from/bar',
1494
 
                         'to/dir/foo',
1495
 
                         'to/dir/bar',
1496
 
                         'to/dir/b%2525z',
1497
 
                         'to/bar',})
 
1399
                         {'from/dir/foo',
 
1400
                          'from/dir/bar',
 
1401
                          'from/dir/b%2525z',
 
1402
                          'from/bar',
 
1403
                          'to/dir/foo',
 
1404
                          'to/dir/bar',
 
1405
                          'to/dir/b%2525z',
 
1406
                          'to/bar', })
1498
1407
 
1499
1408
    def test_copy_tree_to_transport(self):
1500
1409
        transport = self.get_transport()
1508
1417
                         'from/dir/',
1509
1418
                         'from/dir/foo',
1510
1419
                         'from/dir/bar',
1511
 
                         'from/dir/b%25z', # make sure quoting is correct
 
1420
                         'from/dir/b%25z',  # make sure quoting is correct
1512
1421
                         'from/bar'],
1513
1422
                        transport=transport)
1514
1423
        from_transport = transport.clone('from')
1517
1426
        from_transport.copy_tree_to_transport(to_transport)
1518
1427
        paths = set(transport.iter_files_recursive())
1519
1428
        self.assertEqual(paths,
1520
 
                    {'from/dir/foo',
1521
 
                         'from/dir/bar',
1522
 
                         'from/dir/b%2525z',
1523
 
                         'from/bar',
1524
 
                         'to/dir/foo',
1525
 
                         'to/dir/bar',
1526
 
                         'to/dir/b%2525z',
1527
 
                         'to/bar',})
 
1429
                         {'from/dir/foo',
 
1430
                          'from/dir/bar',
 
1431
                          'from/dir/b%2525z',
 
1432
                          'from/bar',
 
1433
                          'to/dir/foo',
 
1434
                          'to/dir/bar',
 
1435
                          'to/dir/b%2525z',
 
1436
                          'to/bar', })
1528
1437
 
1529
1438
    def test_unicode_paths(self):
1530
1439
        """Test that we can read/write files with Unicode names."""
1534
1443
        # '\xe5' and '\xe4' actually map to the same file
1535
1444
        # adding a suffix kicks in the 'preserving but insensitive'
1536
1445
        # route, and maintains the right files
1537
 
        files = [u'\xe5.1', # a w/ circle iso-8859-1
1538
 
                 u'\xe4.2', # a w/ dots iso-8859-1
1539
 
                 u'\u017d', # Z with umlat iso-8859-2
1540
 
                 u'\u062c', # Arabic j
1541
 
                 u'\u0410', # Russian A
1542
 
                 u'\u65e5', # Kanji person
1543
 
                ]
 
1446
        files = [u'\xe5.1',  # a w/ circle iso-8859-1
 
1447
                 u'\xe4.2',  # a w/ dots iso-8859-1
 
1448
                 u'\u017d',  # Z with umlat iso-8859-2
 
1449
                 u'\u062c',  # Arabic j
 
1450
                 u'\u0410',  # Russian A
 
1451
                 u'\u65e5',  # Kanji person
 
1452
                 ]
1544
1453
 
1545
1454
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1546
1455
        if no_unicode_support:
1549
1458
        try:
1550
1459
            self.build_tree(files, transport=t, line_endings='binary')
1551
1460
        except UnicodeError:
1552
 
            raise TestSkipped("cannot handle unicode paths in current encoding")
 
1461
            raise TestSkipped(
 
1462
                "cannot handle unicode paths in current encoding")
1553
1463
 
1554
1464
        # A plain unicode string is not a valid url
1555
1465
        for fname in files:
1557
1467
 
1558
1468
        for fname in files:
1559
1469
            fname_utf8 = fname.encode('utf-8')
1560
 
            contents = 'contents of %s\n' % (fname_utf8,)
 
1470
            contents = b'contents of %s\n' % (fname_utf8,)
1561
1471
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1562
1472
 
1563
1473
    def test_connect_twice_is_same_content(self):
1566
1476
        transport = self.get_transport()
1567
1477
        if transport.is_readonly():
1568
1478
            return
1569
 
        transport.put_bytes('foo', 'bar')
 
1479
        transport.put_bytes('foo', b'bar')
1570
1480
        transport3 = self.get_transport()
1571
 
        self.check_transport_contents('bar', transport3, 'foo')
 
1481
        self.check_transport_contents(b'bar', transport3, 'foo')
1572
1482
 
1573
1483
        # now opening at a relative url should give use a sane result:
1574
1484
        transport.mkdir('newdir')
1575
1485
        transport5 = self.get_transport('newdir')
1576
1486
        transport6 = transport5.clone('..')
1577
 
        self.check_transport_contents('bar', transport6, 'foo')
 
1487
        self.check_transport_contents(b'bar', transport6, 'foo')
1578
1488
 
1579
1489
    def test_lock_write(self):
1580
1490
        """Test transport-level write locks.
1583
1493
        """
1584
1494
        transport = self.get_transport()
1585
1495
        if transport.is_readonly():
1586
 
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
 
1496
            self.assertRaises(TransportNotPossible,
 
1497
                              transport.lock_write, 'foo')
1587
1498
            return
1588
 
        transport.put_bytes('lock', '')
 
1499
        transport.put_bytes('lock', b'')
1589
1500
        try:
1590
1501
            lock = transport.lock_write('lock')
1591
1502
        except TransportNotPossible:
1601
1512
        """
1602
1513
        transport = self.get_transport()
1603
1514
        if transport.is_readonly():
1604
 
            file('lock', 'w').close()
 
1515
            open('lock', 'w').close()
1605
1516
        else:
1606
 
            transport.put_bytes('lock', '')
 
1517
            transport.put_bytes('lock', b'')
1607
1518
        try:
1608
1519
            lock = transport.lock_read('lock')
1609
1520
        except TransportNotPossible:
1615
1526
    def test_readv(self):
1616
1527
        transport = self.get_transport()
1617
1528
        if transport.is_readonly():
1618
 
            with file('a', 'w') as f: f.write('0123456789')
 
1529
            with open('a', 'w') as f:
 
1530
                f.write('0123456789')
1619
1531
        else:
1620
 
            transport.put_bytes('a', '0123456789')
 
1532
            transport.put_bytes('a', b'0123456789')
1621
1533
 
1622
1534
        d = list(transport.readv('a', ((0, 1),)))
1623
 
        self.assertEqual(d[0], (0, '0'))
 
1535
        self.assertEqual(d[0], (0, b'0'))
1624
1536
 
1625
1537
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1626
 
        self.assertEqual(d[0], (0, '0'))
1627
 
        self.assertEqual(d[1], (1, '1'))
1628
 
        self.assertEqual(d[2], (3, '34'))
1629
 
        self.assertEqual(d[3], (9, '9'))
 
1538
        self.assertEqual(d[0], (0, b'0'))
 
1539
        self.assertEqual(d[1], (1, b'1'))
 
1540
        self.assertEqual(d[2], (3, b'34'))
 
1541
        self.assertEqual(d[3], (9, b'9'))
1630
1542
 
1631
1543
    def test_readv_out_of_order(self):
1632
1544
        transport = self.get_transport()
1633
1545
        if transport.is_readonly():
1634
 
            with file('a', 'w') as f: f.write('0123456789')
 
1546
            with open('a', 'w') as f:
 
1547
                f.write('0123456789')
1635
1548
        else:
1636
 
            transport.put_bytes('a', '01234567890')
 
1549
            transport.put_bytes('a', b'01234567890')
1637
1550
 
1638
1551
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1639
 
        self.assertEqual(d[0], (1, '1'))
1640
 
        self.assertEqual(d[1], (9, '9'))
1641
 
        self.assertEqual(d[2], (0, '0'))
1642
 
        self.assertEqual(d[3], (3, '34'))
 
1552
        self.assertEqual(d[0], (1, b'1'))
 
1553
        self.assertEqual(d[1], (9, b'9'))
 
1554
        self.assertEqual(d[2], (0, b'0'))
 
1555
        self.assertEqual(d[3], (3, b'34'))
1643
1556
 
1644
1557
    def test_readv_with_adjust_for_latency(self):
1645
1558
        transport = self.get_transport()
1650
1563
        # reference the returned data with the random data. To avoid doing
1651
1564
        # multiple large random byte look ups we do several tests on the same
1652
1565
        # backing data.
1653
 
        content = osutils.rand_bytes(200*1024)
 
1566
        content = osutils.rand_bytes(200 * 1024)
1654
1567
        content_size = len(content)
1655
1568
        if transport.is_readonly():
1656
1569
            self.build_tree_contents([('a', content)])
1657
1570
        else:
1658
1571
            transport.put_bytes('a', content)
 
1572
 
1659
1573
        def check_result_data(result_vector):
1660
1574
            for item in result_vector:
1661
1575
                data_len = len(item[1])
1663
1577
 
1664
1578
        # start corner case
1665
1579
        result = list(transport.readv('a', ((0, 30),),
1666
 
            adjust_for_latency=True, upper_limit=content_size))
 
1580
                                      adjust_for_latency=True, upper_limit=content_size))
1667
1581
        # we expect 1 result, from 0, to something > 30
1668
1582
        self.assertEqual(1, len(result))
1669
1583
        self.assertEqual(0, result[0][0])
1671
1585
        check_result_data(result)
1672
1586
        # end of file corner case
1673
1587
        result = list(transport.readv('a', ((204700, 100),),
1674
 
            adjust_for_latency=True, upper_limit=content_size))
 
1588
                                      adjust_for_latency=True, upper_limit=content_size))
1675
1589
        # we expect 1 result, from 204800- its length, to the end
1676
1590
        self.assertEqual(1, len(result))
1677
1591
        data_len = len(result[0][1])
1678
 
        self.assertEqual(204800-data_len, result[0][0])
 
1592
        self.assertEqual(204800 - data_len, result[0][0])
1679
1593
        self.assertTrue(data_len >= 100)
1680
1594
        check_result_data(result)
1681
1595
        # out of order ranges are made in order
1682
1596
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
1683
 
            adjust_for_latency=True, upper_limit=content_size))
 
1597
                                      adjust_for_latency=True, upper_limit=content_size))
1684
1598
        # we expect 2 results, in order, start and end.
1685
1599
        self.assertEqual(2, len(result))
1686
1600
        # start
1689
1603
        self.assertTrue(data_len >= 30)
1690
1604
        # end
1691
1605
        data_len = len(result[1][1])
1692
 
        self.assertEqual(204800-data_len, result[1][0])
 
1606
        self.assertEqual(204800 - data_len, result[1][0])
1693
1607
        self.assertTrue(data_len >= 100)
1694
1608
        check_result_data(result)
1695
1609
        # close ranges get combined (even if out of order)
1696
1610
        for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
1697
1611
            result = list(transport.readv('a', request_vector,
1698
 
                adjust_for_latency=True, upper_limit=content_size))
 
1612
                                          adjust_for_latency=True, upper_limit=content_size))
1699
1613
            self.assertEqual(1, len(result))
1700
1614
            data_len = len(result[0][1])
1701
1615
            # minimum length is from 400 to 1034 - 634
1709
1623
        transport = self.get_transport()
1710
1624
        # test from observed failure case.
1711
1625
        if transport.is_readonly():
1712
 
            with file('a', 'w') as f: f.write('a'*1024*1024)
 
1626
            with open('a', 'w') as f:
 
1627
                f.write('a' * 1024 * 1024)
1713
1628
        else:
1714
 
            transport.put_bytes('a', 'a'*1024*1024)
 
1629
            transport.put_bytes('a', b'a' * 1024 * 1024)
1715
1630
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1716
 
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
1717
 
            (465373, 800), (947422, 800)]
1718
 
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
1719
 
        found_items = [False]*9
 
1631
                         (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1632
                         (465373, 800), (947422, 800)]
 
1633
        results = list(transport.readv('a', broken_vector, True, 1024 * 1024))
 
1634
        found_items = [False] * 9
1720
1635
        for pos, (start, length) in enumerate(broken_vector):
1721
1636
            # check the range is covered by the result
1722
1637
            for offset, data in results:
1723
1638
                if offset <= start and start + length <= offset + len(data):
1724
1639
                    found_items[pos] = True
1725
 
        self.assertEqual([True]*9, found_items)
 
1640
        self.assertEqual([True] * 9, found_items)
1726
1641
 
1727
1642
    def test_get_with_open_write_stream_sees_all_content(self):
1728
1643
        t = self.get_transport()
1729
1644
        if t.is_readonly():
1730
1645
            return
1731
 
        handle = t.open_write_stream('foo')
1732
 
        try:
1733
 
            handle.write('bcd')
1734
 
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0, 1), (2, 1)))))
1735
 
        finally:
1736
 
            handle.close()
 
1646
        with t.open_write_stream('foo') as handle:
 
1647
            handle.write(b'bcd')
 
1648
            self.assertEqual([(0, b'b'), (2, b'd')], list(
 
1649
                t.readv('foo', ((0, 1), (2, 1)))))
1737
1650
 
1738
1651
    def test_get_smart_medium(self):
1739
1652
        """All transports must either give a smart medium, or know they can't.
1749
1662
    def test_readv_short_read(self):
1750
1663
        transport = self.get_transport()
1751
1664
        if transport.is_readonly():
1752
 
            with file('a', 'w') as f: f.write('0123456789')
 
1665
            with open('a', 'w') as f:
 
1666
                f.write('0123456789')
1753
1667
        else:
1754
 
            transport.put_bytes('a', '01234567890')
 
1668
            transport.put_bytes('a', b'01234567890')
1755
1669
 
1756
1670
        # This is intentionally reading off the end of the file
1757
1671
        # since we are sure that it cannot get there
1806
1720
 
1807
1721
    def test_abspath_url_unquote_unreserved(self):
1808
1722
        """URLs from abspath should have unreserved characters unquoted
1809
 
        
 
1723
 
1810
1724
        Need consistent quoting notably for tildes, see lp:842223 for more.
1811
1725
        """
1812
1726
        t = self.get_transport()
1813
1727
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1814
1728
        self.assertEqual(t.base + "-.09AZ_az~",
1815
 
            t.abspath(needlessly_escaped_dir))
 
1729
                         t.abspath(needlessly_escaped_dir))
1816
1730
 
1817
1731
    def test_clone_url_unquote_unreserved(self):
1818
1732
        """Base URL of a cloned branch needs unreserved characters unquoted
1819
 
        
 
1733
 
1820
1734
        Cloned transports should be prefix comparable for things like the
1821
1735
        isolation checking of tests, see lp:842223 for more.
1822
1736
        """