/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/per_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2017-06-10 00:21:41 UTC
  • mto: This revision was merged to the branch mainline in revision 6675.
  • Revision ID: jelmer@jelmer.uk-20170610002141-m1z5k7fs8laesa65
Fix import.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
23
 
import itertools
24
23
import os
25
 
from cStringIO import StringIO
26
 
from StringIO import StringIO as pyStringIO
27
24
import stat
28
25
import sys
29
 
import unittest
30
26
 
31
 
from bzrlib import (
 
27
from .. import (
32
28
    errors,
33
29
    osutils,
 
30
    pyutils,
34
31
    tests,
 
32
    transport as _mod_transport,
35
33
    urlutils,
36
34
    )
37
 
from bzrlib.errors import (ConnectionError,
38
 
                           DirectoryNotEmpty,
39
 
                           FileExists,
40
 
                           InvalidURL,
41
 
                           LockError,
42
 
                           NoSuchFile,
43
 
                           NotLocalUrl,
44
 
                           PathError,
45
 
                           TransportNotPossible,
46
 
                           )
47
 
from bzrlib.osutils import getcwd
48
 
from bzrlib.smart import medium
49
 
from bzrlib.tests import (
50
 
    TestCaseInTempDir,
 
35
from ..errors import (ConnectionError,
 
36
                      FileExists,
 
37
                      InvalidURL,
 
38
                      NoSuchFile,
 
39
                      PathError,
 
40
                      TransportNotPossible,
 
41
                      )
 
42
from ..osutils import getcwd
 
43
from ..sixish import (
 
44
    BytesIO,
 
45
    zip,
 
46
    )
 
47
from ..smart import medium
 
48
from . import (
51
49
    TestSkipped,
52
50
    TestNotApplicable,
53
51
    multiply_tests,
54
52
    )
55
 
from bzrlib.tests import test_server
56
 
from bzrlib.tests.test_transport import TestTransportImplementation
57
 
from bzrlib.transport import (
 
53
from . import test_server
 
54
from .test_transport import TestTransportImplementation
 
55
from ..transport import (
58
56
    ConnectedTransport,
59
 
    get_transport,
 
57
    Transport,
60
58
    _get_transport_modules,
61
59
    )
62
 
from bzrlib.transport.memory import MemoryTransport
 
60
from ..transport.memory import MemoryTransport
 
61
from ..transport.remote import RemoteTransport
63
62
 
64
63
 
65
64
def get_transport_test_permutations(module):
78
77
    for module in _get_transport_modules():
79
78
        try:
80
79
            permutations = get_transport_test_permutations(
81
 
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
80
                pyutils.get_named_object(module))
82
81
            for (klass, server_factory) in permutations:
83
82
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
83
                    {"transport_class":klass,
85
84
                     "transport_server":server_factory})
86
85
                result.append(scenario)
87
 
        except errors.DependencyNotPresent, e:
 
86
        except errors.DependencyNotPresent as e:
88
87
            # Continue even if a dependency prevents us
89
88
            # from adding this test
90
89
            pass
91
90
    return result
92
91
 
93
92
 
94
 
def load_tests(standard_tests, module, loader):
 
93
def load_tests(loader, standard_tests, pattern):
95
94
    """Multiply tests for tranport implementations."""
96
95
    result = loader.suiteClass()
97
96
    scenarios = transport_test_permutations()
102
101
 
103
102
    def setUp(self):
104
103
        super(TransportTests, self).setUp()
105
 
        self._captureVar('BZR_NO_SMART_VFS', None)
 
104
        self.overrideEnv('BRZ_NO_SMART_VFS', None)
106
105
 
107
106
    def check_transport_contents(self, content, transport, relpath):
108
 
        """Check that transport.get(relpath).read() == content."""
109
 
        self.assertEqualDiff(content, transport.get(relpath).read())
 
107
        """Check that transport.get_bytes(relpath) == content."""
 
108
        self.assertEqualDiff(content, transport.get_bytes(relpath))
110
109
 
111
110
    def test_ensure_base_missing(self):
112
111
        """.ensure_base() should create the directory if it doesn't exist"""
192
191
        self.build_tree(files, transport=t, line_endings='binary')
193
192
        self.check_transport_contents('contents of a\n', t, 'a')
194
193
        content_f = t.get_multi(files)
195
 
        # Use itertools.izip() instead of use zip() or map(), since they fully
196
 
        # evaluate their inputs, the transport requests should be issued and
 
194
        # Must use iter zip() from future not old version which will fully
 
195
        # evaluate its inputs, the transport requests should be issued and
197
196
        # handled sequentially (we don't want to force transport to buffer).
198
 
        for content, f in itertools.izip(contents, content_f):
 
197
        for content, f in zip(contents, content_f):
199
198
            self.assertEqual(content, f.read())
200
199
 
201
200
        content_f = t.get_multi(iter(files))
202
 
        # Use itertools.izip() for the same reason
203
 
        for content, f in itertools.izip(contents, content_f):
 
201
        # Again this zip() must come from the future
 
202
        for content, f in zip(contents, content_f):
204
203
            self.assertEqual(content, f.read())
205
204
 
206
205
    def test_get_unknown_file(self):
211
210
                    ]
212
211
        self.build_tree(files, transport=t, line_endings='binary')
213
212
        self.assertRaises(NoSuchFile, t.get, 'c')
214
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
213
        def iterate_and_close(func, *args):
 
214
            for f in func(*args):
 
215
                # We call f.read() here because things like paramiko actually
 
216
                # spawn a thread to prefetch the content, which we want to
 
217
                # consume before we close the handle.
 
218
                content = f.read()
 
219
                f.close()
 
220
        self.assertRaises(NoSuchFile, iterate_and_close,
 
221
                          t.get_multi, ['a', 'b', 'c'])
 
222
        self.assertRaises(NoSuchFile, iterate_and_close,
 
223
                          t.get_multi, iter(['a', 'b', 'c']))
216
224
 
217
225
    def test_get_directory_read_gives_ReadError(self):
218
226
        """consistent errors for read() on a file returned by get()."""
251
259
 
252
260
    def test_get_bytes_unknown_file(self):
253
261
        t = self.get_transport()
254
 
 
255
262
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
256
263
 
257
264
    def test_get_with_open_write_stream_sees_all_content(self):
261
268
        handle = t.open_write_stream('foo')
262
269
        try:
263
270
            handle.write('b')
264
 
            self.assertEqual('b', t.get('foo').read())
 
271
            self.assertEqual('b', t.get_bytes('foo'))
265
272
        finally:
266
273
            handle.close()
267
274
 
273
280
        try:
274
281
            handle.write('b')
275
282
            self.assertEqual('b', t.get_bytes('foo'))
276
 
            self.assertEqual('b', t.get('foo').read())
 
283
            f = t.get('foo')
 
284
            try:
 
285
                self.assertEqual('b', f.read())
 
286
            finally:
 
287
                f.close()
277
288
        finally:
278
289
            handle.close()
279
290
 
286
297
            return
287
298
 
288
299
        t.put_bytes('a', 'some text for a\n')
289
 
        self.failUnless(t.has('a'))
 
300
        self.assertTrue(t.has('a'))
290
301
        self.check_transport_contents('some text for a\n', t, 'a')
291
302
 
292
303
        # The contents should be overwritten
304
315
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
305
316
            return
306
317
 
307
 
        self.failIf(t.has('a'))
 
318
        self.assertFalse(t.has('a'))
308
319
        t.put_bytes_non_atomic('a', 'some text for a\n')
309
 
        self.failUnless(t.has('a'))
 
320
        self.assertTrue(t.has('a'))
310
321
        self.check_transport_contents('some text for a\n', t, 'a')
311
322
        # Put also replaces contents
312
323
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
324
335
        # Now test the create_parent flag
325
336
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
326
337
                                       'contents\n')
327
 
        self.failIf(t.has('dir/a'))
 
338
        self.assertFalse(t.has('dir/a'))
328
339
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
329
340
                               create_parent_dir=True)
330
341
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
342
353
        if not t._can_roundtrip_unix_modebits():
343
354
            # Can't roundtrip, so no need to run this test
344
355
            return
345
 
        t.put_bytes('mode644', 'test text\n', mode=0644)
346
 
        self.assertTransportMode(t, 'mode644', 0644)
347
 
        t.put_bytes('mode666', 'test text\n', mode=0666)
348
 
        self.assertTransportMode(t, 'mode666', 0666)
349
 
        t.put_bytes('mode600', 'test text\n', mode=0600)
350
 
        self.assertTransportMode(t, 'mode600', 0600)
 
356
        t.put_bytes('mode644', 'test text\n', mode=0o644)
 
357
        self.assertTransportMode(t, 'mode644', 0o644)
 
358
        t.put_bytes('mode666', 'test text\n', mode=0o666)
 
359
        self.assertTransportMode(t, 'mode666', 0o666)
 
360
        t.put_bytes('mode600', 'test text\n', mode=0o600)
 
361
        self.assertTransportMode(t, 'mode600', 0o600)
351
362
        # Yes, you can put_bytes a file such that it becomes readonly
352
 
        t.put_bytes('mode400', 'test text\n', mode=0400)
353
 
        self.assertTransportMode(t, 'mode400', 0400)
 
363
        t.put_bytes('mode400', 'test text\n', mode=0o400)
 
364
        self.assertTransportMode(t, 'mode400', 0o400)
354
365
 
355
366
        # The default permissions should be based on the current umask
356
367
        umask = osutils.get_umask()
357
368
        t.put_bytes('nomode', 'test text\n', mode=None)
358
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
369
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
359
370
 
360
371
    def test_put_bytes_non_atomic_permissions(self):
361
372
        t = self.get_transport()
365
376
        if not t._can_roundtrip_unix_modebits():
366
377
            # Can't roundtrip, so no need to run this test
367
378
            return
368
 
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
369
 
        self.assertTransportMode(t, 'mode644', 0644)
370
 
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
371
 
        self.assertTransportMode(t, 'mode666', 0666)
372
 
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
373
 
        self.assertTransportMode(t, 'mode600', 0600)
374
 
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
375
 
        self.assertTransportMode(t, 'mode400', 0400)
 
379
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0o644)
 
380
        self.assertTransportMode(t, 'mode644', 0o644)
 
381
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0o666)
 
382
        self.assertTransportMode(t, 'mode666', 0o666)
 
383
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0o600)
 
384
        self.assertTransportMode(t, 'mode600', 0o600)
 
385
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0o400)
 
386
        self.assertTransportMode(t, 'mode400', 0o400)
376
387
 
377
388
        # The default permissions should be based on the current umask
378
389
        umask = osutils.get_umask()
379
390
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
380
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
391
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
381
392
 
382
393
        # We should also be able to set the mode for a parent directory
383
394
        # when it is created
384
 
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
385
 
                               dir_mode=0700, create_parent_dir=True)
386
 
        self.assertTransportMode(t, 'dir700', 0700)
387
 
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
388
 
                               dir_mode=0770, create_parent_dir=True)
389
 
        self.assertTransportMode(t, 'dir770', 0770)
390
 
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
391
 
                               dir_mode=0777, create_parent_dir=True)
392
 
        self.assertTransportMode(t, 'dir777', 0777)
 
395
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0o664,
 
396
                               dir_mode=0o700, create_parent_dir=True)
 
397
        self.assertTransportMode(t, 'dir700', 0o700)
 
398
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0o664,
 
399
                               dir_mode=0o770, create_parent_dir=True)
 
400
        self.assertTransportMode(t, 'dir770', 0o770)
 
401
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0o664,
 
402
                               dir_mode=0o777, create_parent_dir=True)
 
403
        self.assertTransportMode(t, 'dir777', 0o777)
393
404
 
394
405
    def test_put_file(self):
395
406
        t = self.get_transport()
396
407
 
397
408
        if t.is_readonly():
398
409
            self.assertRaises(TransportNotPossible,
399
 
                    t.put_file, 'a', StringIO('some text for a\n'))
 
410
                    t.put_file, 'a', BytesIO(b'some text for a\n'))
400
411
            return
401
412
 
402
 
        result = t.put_file('a', StringIO('some text for a\n'))
 
413
        result = t.put_file('a', BytesIO(b'some text for a\n'))
403
414
        # put_file returns the length of the data written
404
415
        self.assertEqual(16, result)
405
 
        self.failUnless(t.has('a'))
 
416
        self.assertTrue(t.has('a'))
406
417
        self.check_transport_contents('some text for a\n', t, 'a')
407
418
        # Put also replaces contents
408
 
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
419
        result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
409
420
        self.assertEqual(19, result)
410
421
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
411
422
        self.assertRaises(NoSuchFile,
412
423
                          t.put_file, 'path/doesnt/exist/c',
413
 
                              StringIO('contents'))
 
424
                              BytesIO(b'contents'))
414
425
 
415
426
    def test_put_file_non_atomic(self):
416
427
        t = self.get_transport()
417
428
 
418
429
        if t.is_readonly():
419
430
            self.assertRaises(TransportNotPossible,
420
 
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
431
                    t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
421
432
            return
422
433
 
423
 
        self.failIf(t.has('a'))
424
 
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
 
        self.failUnless(t.has('a'))
 
434
        self.assertFalse(t.has('a'))
 
435
        t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
 
436
        self.assertTrue(t.has('a'))
426
437
        self.check_transport_contents('some text for a\n', t, 'a')
427
438
        # Put also replaces contents
428
 
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
 
439
        t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
429
440
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
430
441
 
431
442
        # Make sure we can create another file
432
 
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
443
        t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
433
444
        # And overwrite 'a' with empty contents
434
 
        t.put_file_non_atomic('a', StringIO(''))
 
445
        t.put_file_non_atomic('a', BytesIO(b''))
435
446
        self.check_transport_contents('contents for\nd\n', t, 'd')
436
447
        self.check_transport_contents('', t, 'a')
437
448
 
438
449
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
439
 
                                       StringIO('contents\n'))
 
450
                                       BytesIO(b'contents\n'))
440
451
        # Now test the create_parent flag
441
452
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
 
                                       StringIO('contents\n'))
443
 
        self.failIf(t.has('dir/a'))
444
 
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
453
                                       BytesIO(b'contents\n'))
 
454
        self.assertFalse(t.has('dir/a'))
 
455
        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
445
456
                              create_parent_dir=True)
446
457
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
447
458
 
448
459
        # But we still get NoSuchFile if we can't make the parent dir
449
460
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
450
 
                                       StringIO('contents\n'),
 
461
                                       BytesIO(b'contents\n'),
451
462
                                       create_parent_dir=True)
452
463
 
453
464
    def test_put_file_permissions(self):
459
470
        if not t._can_roundtrip_unix_modebits():
460
471
            # Can't roundtrip, so no need to run this test
461
472
            return
462
 
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
463
 
        self.assertTransportMode(t, 'mode644', 0644)
464
 
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
465
 
        self.assertTransportMode(t, 'mode666', 0666)
466
 
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
467
 
        self.assertTransportMode(t, 'mode600', 0600)
 
473
        t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
 
474
        self.assertTransportMode(t, 'mode644', 0o644)
 
475
        t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
 
476
        self.assertTransportMode(t, 'mode666', 0o666)
 
477
        t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
 
478
        self.assertTransportMode(t, 'mode600', 0o600)
468
479
        # Yes, you can put a file such that it becomes readonly
469
 
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
470
 
        self.assertTransportMode(t, 'mode400', 0400)
 
480
        t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
 
481
        self.assertTransportMode(t, 'mode400', 0o400)
471
482
        # The default permissions should be based on the current umask
472
483
        umask = osutils.get_umask()
473
 
        t.put_file('nomode', StringIO('test text\n'), mode=None)
474
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
484
        t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
 
485
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
475
486
 
476
487
    def test_put_file_non_atomic_permissions(self):
477
488
        t = self.get_transport()
481
492
        if not t._can_roundtrip_unix_modebits():
482
493
            # Can't roundtrip, so no need to run this test
483
494
            return
484
 
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
485
 
        self.assertTransportMode(t, 'mode644', 0644)
486
 
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
487
 
        self.assertTransportMode(t, 'mode666', 0666)
488
 
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
489
 
        self.assertTransportMode(t, 'mode600', 0600)
 
495
        t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
 
496
        self.assertTransportMode(t, 'mode644', 0o644)
 
497
        t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
 
498
        self.assertTransportMode(t, 'mode666', 0o666)
 
499
        t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
 
500
        self.assertTransportMode(t, 'mode600', 0o600)
490
501
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
491
 
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
492
 
        self.assertTransportMode(t, 'mode400', 0400)
 
502
        t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
 
503
        self.assertTransportMode(t, 'mode400', 0o400)
493
504
 
494
505
        # The default permissions should be based on the current umask
495
506
        umask = osutils.get_umask()
496
 
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
497
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
507
        t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
 
508
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
498
509
 
499
510
        # We should also be able to set the mode for a parent directory
500
511
        # when it is created
501
 
        sio = StringIO()
502
 
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
503
 
                              dir_mode=0700, create_parent_dir=True)
504
 
        self.assertTransportMode(t, 'dir700', 0700)
505
 
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
506
 
                              dir_mode=0770, create_parent_dir=True)
507
 
        self.assertTransportMode(t, 'dir770', 0770)
508
 
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
509
 
                              dir_mode=0777, create_parent_dir=True)
510
 
        self.assertTransportMode(t, 'dir777', 0777)
 
512
        sio = BytesIO()
 
513
        t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
 
514
                              dir_mode=0o700, create_parent_dir=True)
 
515
        self.assertTransportMode(t, 'dir700', 0o700)
 
516
        t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
 
517
                              dir_mode=0o770, create_parent_dir=True)
 
518
        self.assertTransportMode(t, 'dir770', 0o770)
 
519
        t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
 
520
                              dir_mode=0o777, create_parent_dir=True)
 
521
        self.assertTransportMode(t, 'dir777', 0o777)
511
522
 
512
523
    def test_put_bytes_unicode(self):
513
 
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
 
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
515
 
        # (we don't want to encode unicode here at all, callers should be
516
 
        # strictly passing bytes to put_bytes), but we allow it for backwards
517
 
        # compatibility.  At some point we should use a specific exception.
518
 
        # See https://bugs.launchpad.net/bzr/+bug/106898.
519
524
        t = self.get_transport()
520
525
        if t.is_readonly():
521
526
            return
522
527
        unicode_string = u'\u1234'
523
 
        self.assertRaises(
524
 
            (AssertionError, UnicodeEncodeError),
525
 
            t.put_bytes, 'foo', unicode_string)
526
 
 
527
 
    def test_put_file_unicode(self):
528
 
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
 
        # This situation can happen (and has) if code is careless about the type
530
 
        # of "string" they initialise/write to a StringIO with.  We cannot use
531
 
        # cStringIO, because it never returns unicode from read.
532
 
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
 
        # raise, but we raise it for hysterical raisins.
534
 
        t = self.get_transport()
535
 
        if t.is_readonly():
536
 
            return
537
 
        unicode_file = pyStringIO(u'\u1234')
538
 
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
528
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
539
529
 
540
530
    def test_mkdir(self):
541
531
        t = self.get_transport()
576
566
 
577
567
        # Test get/put in sub-directories
578
568
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
579
 
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
 
569
        t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
580
570
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
581
571
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
582
572
 
591
581
            # no sense testing on this transport
592
582
            return
593
583
        # Test mkdir with a mode
594
 
        t.mkdir('dmode755', mode=0755)
595
 
        self.assertTransportMode(t, 'dmode755', 0755)
596
 
        t.mkdir('dmode555', mode=0555)
597
 
        self.assertTransportMode(t, 'dmode555', 0555)
598
 
        t.mkdir('dmode777', mode=0777)
599
 
        self.assertTransportMode(t, 'dmode777', 0777)
600
 
        t.mkdir('dmode700', mode=0700)
601
 
        self.assertTransportMode(t, 'dmode700', 0700)
602
 
        t.mkdir_multi(['mdmode755'], mode=0755)
603
 
        self.assertTransportMode(t, 'mdmode755', 0755)
 
584
        t.mkdir('dmode755', mode=0o755)
 
585
        self.assertTransportMode(t, 'dmode755', 0o755)
 
586
        t.mkdir('dmode555', mode=0o555)
 
587
        self.assertTransportMode(t, 'dmode555', 0o555)
 
588
        t.mkdir('dmode777', mode=0o777)
 
589
        self.assertTransportMode(t, 'dmode777', 0o777)
 
590
        t.mkdir('dmode700', mode=0o700)
 
591
        self.assertTransportMode(t, 'dmode700', 0o700)
 
592
        t.mkdir_multi(['mdmode755'], mode=0o755)
 
593
        self.assertTransportMode(t, 'mdmode755', 0o755)
604
594
 
605
595
        # Default mode should be based on umask
606
596
        umask = osutils.get_umask()
607
597
        t.mkdir('dnomode', mode=None)
608
 
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
598
        self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
609
599
 
610
600
    def test_opening_a_file_stream_creates_file(self):
611
601
        t = self.get_transport()
620
610
    def test_opening_a_file_stream_can_set_mode(self):
621
611
        t = self.get_transport()
622
612
        if t.is_readonly():
 
613
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
614
                              t.open_write_stream, 'foo')
623
615
            return
624
616
        if not t._can_roundtrip_unix_modebits():
625
617
            # Can't roundtrip, so no need to run this test
626
618
            return
 
619
 
627
620
        def check_mode(name, mode, expected):
628
621
            handle = t.open_write_stream(name, mode=mode)
629
622
            handle.close()
630
623
            self.assertTransportMode(t, name, expected)
631
 
        check_mode('mode644', 0644, 0644)
632
 
        check_mode('mode666', 0666, 0666)
633
 
        check_mode('mode600', 0600, 0600)
 
624
        check_mode('mode644', 0o644, 0o644)
 
625
        check_mode('mode666', 0o666, 0o666)
 
626
        check_mode('mode600', 0o600, 0o600)
634
627
        # The default permissions should be based on the current umask
635
 
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
628
        check_mode('nomode', None, 0o666 & ~osutils.get_umask())
636
629
 
637
630
    def test_copy_to(self):
638
631
        # FIXME: test:   same server to same server (partly done)
645
638
            self.build_tree(files, transport=transport_from)
646
639
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
647
640
            for f in files:
648
 
                self.check_transport_contents(transport_to.get(f).read(),
 
641
                self.check_transport_contents(transport_to.get_bytes(f),
649
642
                                              transport_from, f)
650
643
 
651
644
        t = self.get_transport()
 
645
        if t.__class__.__name__ == "SFTPTransport":
 
646
            self.skipTest("SFTP copy_to currently too flakey to use")
652
647
        temp_transport = MemoryTransport('memory:///')
653
648
        simple_copy_files(t, temp_transport)
654
649
        if not t.is_readonly():
674
669
        files = ['a', 'b', 'c', 'd']
675
670
        t.copy_to(iter(files), temp_transport)
676
671
        for f in files:
677
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
672
            self.check_transport_contents(temp_transport.get_bytes(f),
678
673
                                          t, f)
679
674
        del temp_transport
680
675
 
681
 
        for mode in (0666, 0644, 0600, 0400):
 
676
        for mode in (0o666, 0o644, 0o600, 0o400):
682
677
            temp_transport = MemoryTransport("memory:///")
683
678
            t.copy_to(files, temp_transport, mode=mode)
684
679
            for f in files:
705
700
        t.put_bytes('b', 'contents\nfor b\n')
706
701
 
707
702
        self.assertEqual(20,
708
 
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
703
            t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
709
704
 
710
705
        self.check_transport_contents(
711
706
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
713
708
 
714
709
        # a file with no parent should fail..
715
710
        self.assertRaises(NoSuchFile,
716
 
                          t.append_file, 'missing/path', StringIO('content'))
 
711
                          t.append_file, 'missing/path', BytesIO(b'content'))
717
712
 
718
713
        # And we can create new files, too
719
714
        self.assertEqual(0,
720
 
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
 
715
            t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
721
716
        self.check_transport_contents('some text\nfor a missing file\n',
722
717
                                      t, 'c')
723
718
 
753
748
        t.put_bytes('b', 'contents\nfor b\n')
754
749
 
755
750
        self.assertEqual((43, 15),
756
 
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
757
 
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
751
            t.append_multi([('a', BytesIO(b'and\nthen\nsome\nmore\n')),
 
752
                            ('b', BytesIO(b'some\nmore\nfor\nb\n'))]))
758
753
 
759
754
        self.check_transport_contents(
760
755
            'diff\ncontents for\na\n'
767
762
                t, 'b')
768
763
 
769
764
        self.assertEqual((62, 31),
770
 
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
771
 
                                 ('b', StringIO('from an iterator\n'))])))
 
765
            t.append_multi(iter([('a', BytesIO(b'a little bit more\n')),
 
766
                                 ('b', BytesIO(b'from an iterator\n'))])))
772
767
        self.check_transport_contents(
773
768
            'diff\ncontents for\na\n'
774
769
            'add\nsome\nmore\ncontents\n'
782
777
                t, 'b')
783
778
 
784
779
        self.assertEqual((80, 0),
785
 
            t.append_multi([('a', StringIO('some text in a\n')),
786
 
                            ('d', StringIO('missing file r\n'))]))
 
780
            t.append_multi([('a', BytesIO(b'some text in a\n')),
 
781
                            ('d', BytesIO(b'missing file r\n'))]))
787
782
 
788
783
        self.check_transport_contents(
789
784
            'diff\ncontents for\na\n'
800
795
        t = self.get_transport()
801
796
        if t.is_readonly():
802
797
            self.assertRaises(TransportNotPossible,
803
 
                t.append_file, 'f', StringIO('f'), mode=None)
 
798
                t.append_file, 'f', BytesIO(b'f'), mode=None)
804
799
            return
805
 
        t.append_file('f', StringIO('f'), mode=None)
 
800
        t.append_file('f', BytesIO(b'f'), mode=None)
806
801
 
807
802
    def test_append_bytes_mode(self):
808
803
        # check append_bytes accepts a mode
823
818
            return
824
819
 
825
820
        t.put_bytes('a', 'a little bit of text\n')
826
 
        self.failUnless(t.has('a'))
 
821
        self.assertTrue(t.has('a'))
827
822
        t.delete('a')
828
 
        self.failIf(t.has('a'))
 
823
        self.assertFalse(t.has('a'))
829
824
 
830
825
        self.assertRaises(NoSuchFile, t.delete, 'a')
831
826
 
837
832
        t.delete_multi(['a', 'c'])
838
833
        self.assertEqual([False, True, False],
839
834
                list(t.has_multi(['a', 'b', 'c'])))
840
 
        self.failIf(t.has('a'))
841
 
        self.failUnless(t.has('b'))
842
 
        self.failIf(t.has('c'))
 
835
        self.assertFalse(t.has('a'))
 
836
        self.assertTrue(t.has('b'))
 
837
        self.assertFalse(t.has('c'))
843
838
 
844
839
        self.assertRaises(NoSuchFile,
845
840
                t.delete_multi, ['a', 'b', 'c'])
906
901
        t.mkdir('foo-baz')
907
902
        t.rmdir('foo')
908
903
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
909
 
        self.failUnless(t.has('foo-bar'))
 
904
        self.assertTrue(t.has('foo-bar'))
910
905
 
911
906
    def test_rename_dir_succeeds(self):
912
907
        t = self.get_transport()
913
908
        if t.is_readonly():
914
 
            raise TestSkipped("transport is readonly")
 
909
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
910
                              t.rename, 'foo', 'bar')
 
911
            return
915
912
        t.mkdir('adir')
916
913
        t.mkdir('adir/asubdir')
917
914
        t.rename('adir', 'bdir')
922
919
        """Attempting to replace a nonemtpy directory should fail"""
923
920
        t = self.get_transport()
924
921
        if t.is_readonly():
925
 
            raise TestSkipped("transport is readonly")
 
922
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
923
                              t.rename, 'foo', 'bar')
 
924
            return
926
925
        t.mkdir('adir')
927
926
        t.mkdir('adir/asubdir')
928
927
        t.mkdir('bdir')
992
991
        # perhaps all of this could be done in a subdirectory
993
992
 
994
993
        t.put_bytes('a', 'a first file\n')
995
 
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
994
        self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
996
995
 
997
996
        t.move('a', 'b')
998
 
        self.failUnless(t.has('b'))
999
 
        self.failIf(t.has('a'))
 
997
        self.assertTrue(t.has('b'))
 
998
        self.assertFalse(t.has('a'))
1000
999
 
1001
1000
        self.check_transport_contents('a first file\n', t, 'b')
1002
 
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
1001
        self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
1003
1002
 
1004
1003
        # Overwrite a file
1005
1004
        t.put_bytes('c', 'c this file\n')
1006
1005
        t.move('c', 'b')
1007
 
        self.failIf(t.has('c'))
 
1006
        self.assertFalse(t.has('c'))
1008
1007
        self.check_transport_contents('c this file\n', t, 'b')
1009
1008
 
1010
1009
        # TODO: Try to write a test for atomicity
1042
1041
        except NotImplementedError:
1043
1042
            raise TestSkipped("Transport %s has no bogus URL support." %
1044
1043
                              self._server.__class__)
1045
 
        t = get_transport(url)
 
1044
        t = _mod_transport.get_transport_from_url(url)
1046
1045
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1047
1046
 
1048
1047
    def test_stat(self):
1053
1052
 
1054
1053
        try:
1055
1054
            st = t.stat('.')
1056
 
        except TransportNotPossible, e:
 
1055
        except TransportNotPossible as e:
1057
1056
            # This transport cannot stat
1058
1057
            return
1059
1058
 
1064
1063
        for path, size in zip(paths, sizes):
1065
1064
            st = t.stat(path)
1066
1065
            if path.endswith('/'):
1067
 
                self.failUnless(S_ISDIR(st.st_mode))
 
1066
                self.assertTrue(S_ISDIR(st.st_mode))
1068
1067
                # directory sizes are meaningless
1069
1068
            else:
1070
 
                self.failUnless(S_ISREG(st.st_mode))
 
1069
                self.assertTrue(S_ISREG(st.st_mode))
1071
1070
                self.assertEqual(size, st.st_size)
1072
1071
 
1073
1072
        remote_stats = list(t.stat_multi(paths))
1080
1079
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1081
1080
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1082
1081
        subdir = t.clone('subdir')
1083
 
        subdir.stat('./file')
1084
 
        subdir.stat('.')
 
1082
        st = subdir.stat('./file')
 
1083
        st = subdir.stat('.')
1085
1084
 
1086
1085
    def test_hardlink(self):
1087
1086
        from stat import ST_NLINK
1096
1095
        try:
1097
1096
            t.hardlink(source_name, link_name)
1098
1097
 
1099
 
            self.failUnless(t.has(source_name))
1100
 
            self.failUnless(t.has(link_name))
 
1098
            self.assertTrue(t.has(source_name))
 
1099
            self.assertTrue(t.has(link_name))
1101
1100
 
1102
1101
            st = t.stat(link_name)
1103
 
            self.failUnlessEqual(st[ST_NLINK], 2)
 
1102
            self.assertEqual(st[ST_NLINK], 2)
1104
1103
        except TransportNotPossible:
1105
1104
            raise TestSkipped("Transport %s does not support hardlinks." %
1106
1105
                              self._server.__class__)
1118
1117
        try:
1119
1118
            t.symlink(source_name, link_name)
1120
1119
 
1121
 
            self.failUnless(t.has(source_name))
1122
 
            self.failUnless(t.has(link_name))
 
1120
            self.assertTrue(t.has(source_name))
 
1121
            self.assertTrue(t.has(link_name))
1123
1122
 
1124
1123
            st = t.stat(link_name)
1125
 
            self.failUnless(S_ISLNK(st.st_mode))
 
1124
            self.assertTrue(S_ISLNK(st.st_mode),
 
1125
                "expected symlink, got mode %o" % st.st_mode)
1126
1126
        except TransportNotPossible:
1127
1127
            raise TestSkipped("Transport %s does not support symlinks." %
1128
1128
                              self._server.__class__)
1129
1129
        except IOError:
1130
 
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
 
1130
            self.knownFailure("Paramiko fails to create symlinks during tests")
1131
1131
 
1132
1132
    def test_list_dir(self):
1133
1133
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1138
1138
            return
1139
1139
 
1140
1140
        def sorted_list(d, transport):
1141
 
            l = list(transport.list_dir(d))
1142
 
            l.sort()
 
1141
            l = sorted(transport.list_dir(d))
1143
1142
            return l
1144
1143
 
1145
1144
        self.assertEqual([], sorted_list('.', t))
1197
1196
            raise TestSkipped("not a connected transport")
1198
1197
 
1199
1198
        t2 = t1.clone('subdir')
1200
 
        self.assertEquals(t1._scheme, t2._scheme)
1201
 
        self.assertEquals(t1._user, t2._user)
1202
 
        self.assertEquals(t1._password, t2._password)
1203
 
        self.assertEquals(t1._host, t2._host)
1204
 
        self.assertEquals(t1._port, t2._port)
 
1199
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1200
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
 
1201
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
 
1202
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
 
1203
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1205
1204
 
1206
1205
    def test__reuse_for(self):
1207
1206
        t = self.get_transport()
1214
1213
 
1215
1214
            Only the parameters different from None will be changed.
1216
1215
            """
1217
 
            if scheme   is None: scheme   = t._scheme
1218
 
            if user     is None: user     = t._user
1219
 
            if password is None: password = t._password
1220
 
            if user     is None: user     = t._user
1221
 
            if host     is None: host     = t._host
1222
 
            if port     is None: port     = t._port
1223
 
            if path     is None: path     = t._path
1224
 
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1216
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1217
            if user     is None: user     = t._parsed_url.user
 
1218
            if password is None: password = t._parsed_url.password
 
1219
            if user     is None: user     = t._parsed_url.user
 
1220
            if host     is None: host     = t._parsed_url.host
 
1221
            if port     is None: port     = t._parsed_url.port
 
1222
            if path     is None: path     = t._parsed_url.path
 
1223
            return str(urlutils.URL(scheme, user, password, host, port, path))
1225
1224
 
1226
 
        if t._scheme == 'ftp':
 
1225
        if t._parsed_url.scheme == 'ftp':
1227
1226
            scheme = 'sftp'
1228
1227
        else:
1229
1228
            scheme = 'ftp'
1230
1229
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1231
 
        if t._user == 'me':
 
1230
        if t._parsed_url.user == 'me':
1232
1231
            user = 'you'
1233
1232
        else:
1234
1233
            user = 'me'
1245
1244
        #   (they may be typed by the user when prompted for example)
1246
1245
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1247
1246
        # We will not connect, we can use a invalid host
1248
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1249
 
        if t._port == 1234:
 
1247
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1248
        if t._parsed_url.port == 1234:
1250
1249
            port = 4321
1251
1250
        else:
1252
1251
            port = 1234
1265
1264
        self.assertIs(t._get_connection(), c._get_connection())
1266
1265
 
1267
1266
        # Temporary failure, we need to create a new dummy connection
1268
 
        new_connection = object()
 
1267
        new_connection = None
1269
1268
        t._set_connection(new_connection)
1270
1269
        # Check that both transports use the same connection
1271
1270
        self.assertIs(new_connection, t._get_connection())
1293
1292
 
1294
1293
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1295
1294
 
1296
 
        self.failUnless(t1.has('a'))
1297
 
        self.failUnless(t1.has('b/c'))
1298
 
        self.failIf(t1.has('c'))
 
1295
        self.assertTrue(t1.has('a'))
 
1296
        self.assertTrue(t1.has('b/c'))
 
1297
        self.assertFalse(t1.has('c'))
1299
1298
 
1300
1299
        t2 = t1.clone('b')
1301
1300
        self.assertEqual(t1.base + 'b/', t2.base)
1302
1301
 
1303
 
        self.failUnless(t2.has('c'))
1304
 
        self.failIf(t2.has('a'))
 
1302
        self.assertTrue(t2.has('c'))
 
1303
        self.assertFalse(t2.has('a'))
1305
1304
 
1306
1305
        t3 = t2.clone('..')
1307
 
        self.failUnless(t3.has('a'))
1308
 
        self.failIf(t3.has('c'))
 
1306
        self.assertTrue(t3.has('a'))
 
1307
        self.assertFalse(t3.has('c'))
1309
1308
 
1310
 
        self.failIf(t1.has('b/d'))
1311
 
        self.failIf(t2.has('d'))
1312
 
        self.failIf(t3.has('b/d'))
 
1309
        self.assertFalse(t1.has('b/d'))
 
1310
        self.assertFalse(t2.has('d'))
 
1311
        self.assertFalse(t3.has('b/d'))
1313
1312
 
1314
1313
        if t1.is_readonly():
1315
1314
            self.build_tree_contents([('b/d', 'newfile\n')])
1316
1315
        else:
1317
1316
            t2.put_bytes('d', 'newfile\n')
1318
1317
 
1319
 
        self.failUnless(t1.has('b/d'))
1320
 
        self.failUnless(t2.has('d'))
1321
 
        self.failUnless(t3.has('b/d'))
 
1318
        self.assertTrue(t1.has('b/d'))
 
1319
        self.assertTrue(t2.has('d'))
 
1320
        self.assertTrue(t3.has('b/d'))
1322
1321
 
1323
1322
    def test_clone_to_root(self):
1324
1323
        orig_transport = self.get_transport()
1398
1397
        self.assertEqual(transport.clone("/").abspath('foo'),
1399
1398
                         transport.abspath("/foo"))
1400
1399
 
 
1400
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1401
1401
    def test_win32_abspath(self):
1402
1402
        # Note: we tried to set sys.platform='win32' so we could test on
1403
1403
        # other platforms too, but then osutils does platform specific
1408
1408
 
1409
1409
        # smoke test for abspath on win32.
1410
1410
        # a transport based on 'file:///' never fully qualifies the drive.
1411
 
        transport = get_transport("file:///")
1412
 
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1411
        transport = _mod_transport.get_transport_from_url("file:///")
 
1412
        self.assertEqual(transport.abspath("/"), "file:///")
1413
1413
 
1414
1414
        # but a transport that starts with a drive spec must keep it.
1415
 
        transport = get_transport("file:///C:/")
1416
 
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1415
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1416
        self.assertEqual(transport.abspath("/"), "file:///C:/")
1417
1417
 
1418
1418
    def test_local_abspath(self):
1419
1419
        transport = self.get_transport()
1420
1420
        try:
1421
1421
            p = transport.local_abspath('.')
1422
 
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1422
        except (errors.NotLocalUrl, TransportNotPossible) as e:
1423
1423
            # should be formattable
1424
1424
            s = str(e)
1425
1425
        else:
1458
1458
        paths = set(transport.iter_files_recursive())
1459
1459
        # nb the directories are not converted
1460
1460
        self.assertEqual(paths,
1461
 
                    set(['isolated/dir/foo',
 
1461
                    {'isolated/dir/foo',
1462
1462
                         'isolated/dir/bar',
1463
1463
                         'isolated/dir/b%2525z',
1464
 
                         'isolated/bar']))
 
1464
                         'isolated/bar'})
1465
1465
        sub_transport = transport.clone('isolated')
1466
1466
        paths = set(sub_transport.iter_files_recursive())
1467
1467
        self.assertEqual(paths,
1468
 
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1468
            {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
1469
1469
 
1470
1470
    def test_copy_tree(self):
1471
1471
        # TODO: test file contents and permissions are preserved. This test was
1488
1488
        transport.copy_tree('from', 'to')
1489
1489
        paths = set(transport.iter_files_recursive())
1490
1490
        self.assertEqual(paths,
1491
 
                    set(['from/dir/foo',
 
1491
                    {'from/dir/foo',
1492
1492
                         'from/dir/bar',
1493
1493
                         'from/dir/b%2525z',
1494
1494
                         'from/bar',
1495
1495
                         'to/dir/foo',
1496
1496
                         'to/dir/bar',
1497
1497
                         'to/dir/b%2525z',
1498
 
                         'to/bar',]))
 
1498
                         'to/bar',})
1499
1499
 
1500
1500
    def test_copy_tree_to_transport(self):
1501
1501
        transport = self.get_transport()
1518
1518
        from_transport.copy_tree_to_transport(to_transport)
1519
1519
        paths = set(transport.iter_files_recursive())
1520
1520
        self.assertEqual(paths,
1521
 
                    set(['from/dir/foo',
 
1521
                    {'from/dir/foo',
1522
1522
                         'from/dir/bar',
1523
1523
                         'from/dir/b%2525z',
1524
1524
                         'from/bar',
1525
1525
                         'to/dir/foo',
1526
1526
                         'to/dir/bar',
1527
1527
                         'to/dir/b%2525z',
1528
 
                         'to/bar',]))
 
1528
                         'to/bar',})
1529
1529
 
1530
1530
    def test_unicode_paths(self):
1531
1531
        """Test that we can read/write files with Unicode names."""
1545
1545
 
1546
1546
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1547
1547
        if no_unicode_support:
1548
 
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1548
            self.knownFailure("test server cannot handle unicode paths")
1549
1549
 
1550
1550
        try:
1551
1551
            self.build_tree(files, transport=t, line_endings='binary')
1616
1616
    def test_readv(self):
1617
1617
        transport = self.get_transport()
1618
1618
        if transport.is_readonly():
1619
 
            file('a', 'w').write('0123456789')
 
1619
            with file('a', 'w') as f: f.write('0123456789')
1620
1620
        else:
1621
1621
            transport.put_bytes('a', '0123456789')
1622
1622
 
1632
1632
    def test_readv_out_of_order(self):
1633
1633
        transport = self.get_transport()
1634
1634
        if transport.is_readonly():
1635
 
            file('a', 'w').write('0123456789')
 
1635
            with file('a', 'w') as f: f.write('0123456789')
1636
1636
        else:
1637
1637
            transport.put_bytes('a', '01234567890')
1638
1638
 
1710
1710
        transport = self.get_transport()
1711
1711
        # test from observed failure case.
1712
1712
        if transport.is_readonly():
1713
 
            file('a', 'w').write('a'*1024*1024)
 
1713
            with file('a', 'w') as f: f.write('a'*1024*1024)
1714
1714
        else:
1715
1715
            transport.put_bytes('a', 'a'*1024*1024)
1716
1716
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1750
1750
    def test_readv_short_read(self):
1751
1751
        transport = self.get_transport()
1752
1752
        if transport.is_readonly():
1753
 
            file('a', 'w').write('0123456789')
 
1753
            with file('a', 'w') as f: f.write('0123456789')
1754
1754
        else:
1755
1755
            transport.put_bytes('a', '01234567890')
1756
1756
 
1765
1765
        # also raise a special error
1766
1766
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1767
1767
                              transport.readv, 'a', [(12,2)])
 
1768
 
 
1769
    def test_no_segment_parameters(self):
 
1770
        """Segment parameters should be stripped and stored in
 
1771
        transport.segment_parameters."""
 
1772
        transport = self.get_transport("foo")
 
1773
        self.assertEqual({}, transport.get_segment_parameters())
 
1774
 
 
1775
    def test_segment_parameters(self):
 
1776
        """Segment parameters should be stripped and stored in
 
1777
        transport.get_segment_parameters()."""
 
1778
        base_url = self._server.get_url()
 
1779
        parameters = {"key1": "val1", "key2": "val2"}
 
1780
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1781
        transport = _mod_transport.get_transport_from_url(url)
 
1782
        self.assertEqual(parameters, transport.get_segment_parameters())
 
1783
 
 
1784
    def test_set_segment_parameters(self):
 
1785
        """Segment parameters can be set and show up in base."""
 
1786
        transport = self.get_transport("foo")
 
1787
        orig_base = transport.base
 
1788
        transport.set_segment_parameter("arm", "board")
 
1789
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
 
1790
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
 
1791
        transport.set_segment_parameter("arm", None)
 
1792
        transport.set_segment_parameter("nonexistant", None)
 
1793
        self.assertEqual({}, transport.get_segment_parameters())
 
1794
        self.assertEqual(orig_base, transport.base)
 
1795
 
 
1796
    def test_stat_symlink(self):
 
1797
        # if a transport points directly to a symlink (and supports symlinks
 
1798
        # at all) you can tell this.  helps with bug 32669.
 
1799
        t = self.get_transport()
 
1800
        try:
 
1801
            t.symlink('target', 'link')
 
1802
        except TransportNotPossible:
 
1803
            raise TestSkipped("symlinks not supported")
 
1804
        t2 = t.clone('link')
 
1805
        st = t2.stat('')
 
1806
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1807
 
 
1808
    def test_abspath_url_unquote_unreserved(self):
 
1809
        """URLs from abspath should have unreserved characters unquoted
 
1810
        
 
1811
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1812
        """
 
1813
        t = self.get_transport()
 
1814
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1815
        self.assertEqual(t.base + "-.09AZ_az~",
 
1816
            t.abspath(needlessly_escaped_dir))
 
1817
 
 
1818
    def test_clone_url_unquote_unreserved(self):
 
1819
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1820
        
 
1821
        Cloned transports should be prefix comparable for things like the
 
1822
        isolation checking of tests, see lp:842223 for more.
 
1823
        """
 
1824
        t1 = self.get_transport()
 
1825
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1826
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1827
        t2 = t1.clone(needlessly_escaped_dir)
 
1828
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1829
 
 
1830
    def test_hook_post_connection_one(self):
 
1831
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1832
        log = []
 
1833
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1834
        t = self.get_transport()
 
1835
        self.assertEqual([], log)
 
1836
        t.has("non-existant")
 
1837
        if isinstance(t, RemoteTransport):
 
1838
            self.assertEqual([t.get_smart_medium()], log)
 
1839
        elif isinstance(t, ConnectedTransport):
 
1840
            self.assertEqual([t], log)
 
1841
        else:
 
1842
            self.assertEqual([], log)
 
1843
 
 
1844
    def test_hook_post_connection_multi(self):
 
1845
        """Fire post_connect hook once per unshared underlying connection"""
 
1846
        log = []
 
1847
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1848
        t1 = self.get_transport()
 
1849
        t2 = t1.clone(".")
 
1850
        t3 = self.get_transport()
 
1851
        self.assertEqual([], log)
 
1852
        t1.has("x")
 
1853
        t2.has("x")
 
1854
        t3.has("x")
 
1855
        if isinstance(t1, RemoteTransport):
 
1856
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1857
        elif isinstance(t1, ConnectedTransport):
 
1858
            self.assertEqual([t1, t3], log)
 
1859
        else:
 
1860
            self.assertEqual([], log)