/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/per_transport.py

  • Committer: Breezy landing bot
  • Author(s): Jelmer Vernooij
  • Date: 2017-06-02 11:26:27 UTC
  • mfrom: (6621.27.5 1089352-sni-support)
  • Revision ID: breezy.the.bot@gmail.com-20170602112627-jbvjcm9czx7gt3gb
Add SNI support.

Merged from https://code.launchpad.net/~jelmer/brz/sni-support/+merge/324979

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
23
 
import itertools
24
23
import os
25
 
from cStringIO import StringIO
26
 
from StringIO import StringIO as pyStringIO
27
24
import stat
28
25
import sys
29
 
import unittest
30
26
 
31
 
from bzrlib import (
 
27
from .. import (
32
28
    errors,
33
29
    osutils,
 
30
    pyutils,
34
31
    tests,
 
32
    transport as _mod_transport,
35
33
    urlutils,
36
34
    )
37
 
from bzrlib.errors import (ConnectionError,
38
 
                           DirectoryNotEmpty,
39
 
                           FileExists,
40
 
                           InvalidURL,
41
 
                           LockError,
42
 
                           NoSuchFile,
43
 
                           NotLocalUrl,
44
 
                           PathError,
45
 
                           TransportNotPossible,
46
 
                           )
47
 
from bzrlib.osutils import getcwd
48
 
from bzrlib.smart import medium
49
 
from bzrlib.tests import (
50
 
    TestCaseInTempDir,
 
35
from ..errors import (ConnectionError,
 
36
                      FileExists,
 
37
                      InvalidURL,
 
38
                      NoSuchFile,
 
39
                      PathError,
 
40
                      TransportNotPossible,
 
41
                      )
 
42
from ..osutils import getcwd
 
43
from ..sixish import (
 
44
    BytesIO,
 
45
    zip,
 
46
    )
 
47
from ..smart import medium
 
48
from . import (
51
49
    TestSkipped,
52
50
    TestNotApplicable,
53
51
    multiply_tests,
54
52
    )
55
 
from bzrlib.tests import test_server
56
 
from bzrlib.tests.test_transport import TestTransportImplementation
57
 
from bzrlib.transport import (
 
53
from . import test_server
 
54
from .test_transport import TestTransportImplementation
 
55
from ..transport import (
58
56
    ConnectedTransport,
59
 
    get_transport,
 
57
    Transport,
60
58
    _get_transport_modules,
61
59
    )
62
 
from bzrlib.transport.memory import MemoryTransport
 
60
from ..transport.memory import MemoryTransport
 
61
from ..transport.remote import RemoteTransport
63
62
 
64
63
 
65
64
def get_transport_test_permutations(module):
78
77
    for module in _get_transport_modules():
79
78
        try:
80
79
            permutations = get_transport_test_permutations(
81
 
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
80
                pyutils.get_named_object(module))
82
81
            for (klass, server_factory) in permutations:
83
82
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
83
                    {"transport_class":klass,
85
84
                     "transport_server":server_factory})
86
85
                result.append(scenario)
87
 
        except errors.DependencyNotPresent, e:
 
86
        except errors.DependencyNotPresent as e:
88
87
            # Continue even if a dependency prevents us
89
88
            # from adding this test
90
89
            pass
91
90
    return result
92
91
 
93
92
 
94
 
def load_tests(standard_tests, module, loader):
 
93
def load_tests(loader, standard_tests, pattern):
95
94
    """Multiply tests for transport implementations."""
96
95
    result = loader.suiteClass()
97
96
    scenarios = transport_test_permutations()
102
101
 
103
102
    def setUp(self):
104
103
        super(TransportTests, self).setUp()
105
 
        self._captureVar('BZR_NO_SMART_VFS', None)
 
104
        self.overrideEnv('BRZ_NO_SMART_VFS', None)
106
105
 
107
106
    def check_transport_contents(self, content, transport, relpath):
108
 
        """Check that transport.get(relpath).read() == content."""
109
 
        self.assertEqualDiff(content, transport.get(relpath).read())
 
107
        """Check that transport.get_bytes(relpath) == content."""
 
108
        self.assertEqualDiff(content, transport.get_bytes(relpath))
110
109
 
111
110
    def test_ensure_base_missing(self):
112
111
        """.ensure_base() should create the directory if it doesn't exist"""
192
191
        self.build_tree(files, transport=t, line_endings='binary')
193
192
        self.check_transport_contents('contents of a\n', t, 'a')
194
193
        content_f = t.get_multi(files)
195
 
        # Use itertools.izip() instead of use zip() or map(), since they fully
196
 
        # evaluate their inputs, the transport requests should be issued and
 
194
        # Use the lazy zip() from sixish, not the old builtin, which would fully
 
195
        # evaluate its inputs, the transport requests should be issued and
197
196
        # handled sequentially (we don't want to force transport to buffer).
198
 
        for content, f in itertools.izip(contents, content_f):
 
197
        for content, f in zip(contents, content_f):
199
198
            self.assertEqual(content, f.read())
200
199
 
201
200
        content_f = t.get_multi(iter(files))
202
 
        # Use itertools.izip() for the same reason
203
 
        for content, f in itertools.izip(contents, content_f):
 
201
        # Again this zip() must come from the future
 
202
        for content, f in zip(contents, content_f):
204
203
            self.assertEqual(content, f.read())
205
204
 
206
205
    def test_get_unknown_file(self):
211
210
                    ]
212
211
        self.build_tree(files, transport=t, line_endings='binary')
213
212
        self.assertRaises(NoSuchFile, t.get, 'c')
214
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
213
        def iterate_and_close(func, *args):
 
214
            for f in func(*args):
 
215
                # We call f.read() here because things like paramiko actually
 
216
                # spawn a thread to prefetch the content, which we want to
 
217
                # consume before we close the handle.
 
218
                content = f.read()
 
219
                f.close()
 
220
        self.assertRaises(NoSuchFile, iterate_and_close,
 
221
                          t.get_multi, ['a', 'b', 'c'])
 
222
        self.assertRaises(NoSuchFile, iterate_and_close,
 
223
                          t.get_multi, iter(['a', 'b', 'c']))
216
224
 
217
225
    def test_get_directory_read_gives_ReadError(self):
218
226
        """consistent errors for read() on a file returned by get()."""
251
259
 
252
260
    def test_get_bytes_unknown_file(self):
253
261
        t = self.get_transport()
254
 
 
255
262
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
256
263
 
257
264
    def test_get_with_open_write_stream_sees_all_content(self):
261
268
        handle = t.open_write_stream('foo')
262
269
        try:
263
270
            handle.write('b')
264
 
            self.assertEqual('b', t.get('foo').read())
 
271
            self.assertEqual('b', t.get_bytes('foo'))
265
272
        finally:
266
273
            handle.close()
267
274
 
273
280
        try:
274
281
            handle.write('b')
275
282
            self.assertEqual('b', t.get_bytes('foo'))
276
 
            self.assertEqual('b', t.get('foo').read())
 
283
            f = t.get('foo')
 
284
            try:
 
285
                self.assertEqual('b', f.read())
 
286
            finally:
 
287
                f.close()
277
288
        finally:
278
289
            handle.close()
279
290
 
286
297
            return
287
298
 
288
299
        t.put_bytes('a', 'some text for a\n')
289
 
        self.failUnless(t.has('a'))
 
300
        self.assertTrue(t.has('a'))
290
301
        self.check_transport_contents('some text for a\n', t, 'a')
291
302
 
292
303
        # The contents should be overwritten
304
315
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
305
316
            return
306
317
 
307
 
        self.failIf(t.has('a'))
 
318
        self.assertFalse(t.has('a'))
308
319
        t.put_bytes_non_atomic('a', 'some text for a\n')
309
 
        self.failUnless(t.has('a'))
 
320
        self.assertTrue(t.has('a'))
310
321
        self.check_transport_contents('some text for a\n', t, 'a')
311
322
        # Put also replaces contents
312
323
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
324
335
        # Now test the create_parent flag
325
336
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
326
337
                                       'contents\n')
327
 
        self.failIf(t.has('dir/a'))
 
338
        self.assertFalse(t.has('dir/a'))
328
339
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
329
340
                               create_parent_dir=True)
330
341
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
342
353
        if not t._can_roundtrip_unix_modebits():
343
354
            # Can't roundtrip, so no need to run this test
344
355
            return
345
 
        t.put_bytes('mode644', 'test text\n', mode=0644)
346
 
        self.assertTransportMode(t, 'mode644', 0644)
347
 
        t.put_bytes('mode666', 'test text\n', mode=0666)
348
 
        self.assertTransportMode(t, 'mode666', 0666)
349
 
        t.put_bytes('mode600', 'test text\n', mode=0600)
350
 
        self.assertTransportMode(t, 'mode600', 0600)
 
356
        t.put_bytes('mode644', 'test text\n', mode=0o644)
 
357
        self.assertTransportMode(t, 'mode644', 0o644)
 
358
        t.put_bytes('mode666', 'test text\n', mode=0o666)
 
359
        self.assertTransportMode(t, 'mode666', 0o666)
 
360
        t.put_bytes('mode600', 'test text\n', mode=0o600)
 
361
        self.assertTransportMode(t, 'mode600', 0o600)
351
362
        # Yes, you can put_bytes a file such that it becomes readonly
352
 
        t.put_bytes('mode400', 'test text\n', mode=0400)
353
 
        self.assertTransportMode(t, 'mode400', 0400)
 
363
        t.put_bytes('mode400', 'test text\n', mode=0o400)
 
364
        self.assertTransportMode(t, 'mode400', 0o400)
354
365
 
355
366
        # The default permissions should be based on the current umask
356
367
        umask = osutils.get_umask()
357
368
        t.put_bytes('nomode', 'test text\n', mode=None)
358
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
369
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
359
370
 
360
371
    def test_put_bytes_non_atomic_permissions(self):
361
372
        t = self.get_transport()
365
376
        if not t._can_roundtrip_unix_modebits():
366
377
            # Can't roundtrip, so no need to run this test
367
378
            return
368
 
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
369
 
        self.assertTransportMode(t, 'mode644', 0644)
370
 
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
371
 
        self.assertTransportMode(t, 'mode666', 0666)
372
 
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
373
 
        self.assertTransportMode(t, 'mode600', 0600)
374
 
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
375
 
        self.assertTransportMode(t, 'mode400', 0400)
 
379
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0o644)
 
380
        self.assertTransportMode(t, 'mode644', 0o644)
 
381
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0o666)
 
382
        self.assertTransportMode(t, 'mode666', 0o666)
 
383
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0o600)
 
384
        self.assertTransportMode(t, 'mode600', 0o600)
 
385
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0o400)
 
386
        self.assertTransportMode(t, 'mode400', 0o400)
376
387
 
377
388
        # The default permissions should be based on the current umask
378
389
        umask = osutils.get_umask()
379
390
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
380
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
391
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
381
392
 
382
393
        # We should also be able to set the mode for a parent directory
383
394
        # when it is created
384
 
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
385
 
                               dir_mode=0700, create_parent_dir=True)
386
 
        self.assertTransportMode(t, 'dir700', 0700)
387
 
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
388
 
                               dir_mode=0770, create_parent_dir=True)
389
 
        self.assertTransportMode(t, 'dir770', 0770)
390
 
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
391
 
                               dir_mode=0777, create_parent_dir=True)
392
 
        self.assertTransportMode(t, 'dir777', 0777)
 
395
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0o664,
 
396
                               dir_mode=0o700, create_parent_dir=True)
 
397
        self.assertTransportMode(t, 'dir700', 0o700)
 
398
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0o664,
 
399
                               dir_mode=0o770, create_parent_dir=True)
 
400
        self.assertTransportMode(t, 'dir770', 0o770)
 
401
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0o664,
 
402
                               dir_mode=0o777, create_parent_dir=True)
 
403
        self.assertTransportMode(t, 'dir777', 0o777)
393
404
 
394
405
    def test_put_file(self):
395
406
        t = self.get_transport()
396
407
 
397
408
        if t.is_readonly():
398
409
            self.assertRaises(TransportNotPossible,
399
 
                    t.put_file, 'a', StringIO('some text for a\n'))
 
410
                    t.put_file, 'a', BytesIO(b'some text for a\n'))
400
411
            return
401
412
 
402
 
        result = t.put_file('a', StringIO('some text for a\n'))
 
413
        result = t.put_file('a', BytesIO(b'some text for a\n'))
403
414
        # put_file returns the length of the data written
404
415
        self.assertEqual(16, result)
405
 
        self.failUnless(t.has('a'))
 
416
        self.assertTrue(t.has('a'))
406
417
        self.check_transport_contents('some text for a\n', t, 'a')
407
418
        # Put also replaces contents
408
 
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
419
        result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
409
420
        self.assertEqual(19, result)
410
421
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
411
422
        self.assertRaises(NoSuchFile,
412
423
                          t.put_file, 'path/doesnt/exist/c',
413
 
                              StringIO('contents'))
 
424
                              BytesIO(b'contents'))
414
425
 
415
426
    def test_put_file_non_atomic(self):
416
427
        t = self.get_transport()
417
428
 
418
429
        if t.is_readonly():
419
430
            self.assertRaises(TransportNotPossible,
420
 
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
431
                    t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
421
432
            return
422
433
 
423
 
        self.failIf(t.has('a'))
424
 
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
 
        self.failUnless(t.has('a'))
 
434
        self.assertFalse(t.has('a'))
 
435
        t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
 
436
        self.assertTrue(t.has('a'))
426
437
        self.check_transport_contents('some text for a\n', t, 'a')
427
438
        # Put also replaces contents
428
 
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
 
439
        t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
429
440
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
430
441
 
431
442
        # Make sure we can create another file
432
 
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
443
        t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
433
444
        # And overwrite 'a' with empty contents
434
 
        t.put_file_non_atomic('a', StringIO(''))
 
445
        t.put_file_non_atomic('a', BytesIO(b''))
435
446
        self.check_transport_contents('contents for\nd\n', t, 'd')
436
447
        self.check_transport_contents('', t, 'a')
437
448
 
438
449
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
439
 
                                       StringIO('contents\n'))
 
450
                                       BytesIO(b'contents\n'))
440
451
        # Now test the create_parent flag
441
452
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
 
                                       StringIO('contents\n'))
443
 
        self.failIf(t.has('dir/a'))
444
 
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
453
                                       BytesIO(b'contents\n'))
 
454
        self.assertFalse(t.has('dir/a'))
 
455
        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
445
456
                              create_parent_dir=True)
446
457
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
447
458
 
448
459
        # But we still get NoSuchFile if we can't make the parent dir
449
460
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
450
 
                                       StringIO('contents\n'),
 
461
                                       BytesIO(b'contents\n'),
451
462
                                       create_parent_dir=True)
452
463
 
453
464
    def test_put_file_permissions(self):
459
470
        if not t._can_roundtrip_unix_modebits():
460
471
            # Can't roundtrip, so no need to run this test
461
472
            return
462
 
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
463
 
        self.assertTransportMode(t, 'mode644', 0644)
464
 
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
465
 
        self.assertTransportMode(t, 'mode666', 0666)
466
 
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
467
 
        self.assertTransportMode(t, 'mode600', 0600)
 
473
        t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
 
474
        self.assertTransportMode(t, 'mode644', 0o644)
 
475
        t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
 
476
        self.assertTransportMode(t, 'mode666', 0o666)
 
477
        t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
 
478
        self.assertTransportMode(t, 'mode600', 0o600)
468
479
        # Yes, you can put a file such that it becomes readonly
469
 
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
470
 
        self.assertTransportMode(t, 'mode400', 0400)
 
480
        t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
 
481
        self.assertTransportMode(t, 'mode400', 0o400)
471
482
        # The default permissions should be based on the current umask
472
483
        umask = osutils.get_umask()
473
 
        t.put_file('nomode', StringIO('test text\n'), mode=None)
474
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
484
        t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
 
485
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
475
486
 
476
487
    def test_put_file_non_atomic_permissions(self):
477
488
        t = self.get_transport()
481
492
        if not t._can_roundtrip_unix_modebits():
482
493
            # Can't roundtrip, so no need to run this test
483
494
            return
484
 
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
485
 
        self.assertTransportMode(t, 'mode644', 0644)
486
 
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
487
 
        self.assertTransportMode(t, 'mode666', 0666)
488
 
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
489
 
        self.assertTransportMode(t, 'mode600', 0600)
 
495
        t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
 
496
        self.assertTransportMode(t, 'mode644', 0o644)
 
497
        t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
 
498
        self.assertTransportMode(t, 'mode666', 0o666)
 
499
        t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
 
500
        self.assertTransportMode(t, 'mode600', 0o600)
490
501
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
491
 
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
492
 
        self.assertTransportMode(t, 'mode400', 0400)
 
502
        t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
 
503
        self.assertTransportMode(t, 'mode400', 0o400)
493
504
 
494
505
        # The default permissions should be based on the current umask
495
506
        umask = osutils.get_umask()
496
 
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
497
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
507
        t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
 
508
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
498
509
 
499
510
        # We should also be able to set the mode for a parent directory
500
511
        # when it is created
501
 
        sio = StringIO()
502
 
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
503
 
                              dir_mode=0700, create_parent_dir=True)
504
 
        self.assertTransportMode(t, 'dir700', 0700)
505
 
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
506
 
                              dir_mode=0770, create_parent_dir=True)
507
 
        self.assertTransportMode(t, 'dir770', 0770)
508
 
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
509
 
                              dir_mode=0777, create_parent_dir=True)
510
 
        self.assertTransportMode(t, 'dir777', 0777)
 
512
        sio = BytesIO()
 
513
        t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
 
514
                              dir_mode=0o700, create_parent_dir=True)
 
515
        self.assertTransportMode(t, 'dir700', 0o700)
 
516
        t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
 
517
                              dir_mode=0o770, create_parent_dir=True)
 
518
        self.assertTransportMode(t, 'dir770', 0o770)
 
519
        t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
 
520
                              dir_mode=0o777, create_parent_dir=True)
 
521
        self.assertTransportMode(t, 'dir777', 0o777)
511
522
 
512
523
    def test_put_bytes_unicode(self):
513
 
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
 
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
515
 
        # (we don't want to encode unicode here at all, callers should be
516
 
        # strictly passing bytes to put_bytes), but we allow it for backwards
517
 
        # compatibility.  At some point we should use a specific exception.
518
 
        # See https://bugs.launchpad.net/bzr/+bug/106898.
519
524
        t = self.get_transport()
520
525
        if t.is_readonly():
521
526
            return
522
527
        unicode_string = u'\u1234'
523
 
        self.assertRaises(
524
 
            (AssertionError, UnicodeEncodeError),
525
 
            t.put_bytes, 'foo', unicode_string)
526
 
 
527
 
    def test_put_file_unicode(self):
528
 
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
 
        # This situation can happen (and has) if code is careless about the type
530
 
        # of "string" they initialise/write to a StringIO with.  We cannot use
531
 
        # cStringIO, because it never returns unicode from read.
532
 
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
 
        # raise, but we raise it for hysterical raisins.
534
 
        t = self.get_transport()
535
 
        if t.is_readonly():
536
 
            return
537
 
        unicode_file = pyStringIO(u'\u1234')
538
 
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
528
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
539
529
 
540
530
    def test_mkdir(self):
541
531
        t = self.get_transport()
576
566
 
577
567
        # Test get/put in sub-directories
578
568
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
579
 
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
 
569
        t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
580
570
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
581
571
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
582
572
 
591
581
            # no sense testing on this transport
592
582
            return
593
583
        # Test mkdir with a mode
594
 
        t.mkdir('dmode755', mode=0755)
595
 
        self.assertTransportMode(t, 'dmode755', 0755)
596
 
        t.mkdir('dmode555', mode=0555)
597
 
        self.assertTransportMode(t, 'dmode555', 0555)
598
 
        t.mkdir('dmode777', mode=0777)
599
 
        self.assertTransportMode(t, 'dmode777', 0777)
600
 
        t.mkdir('dmode700', mode=0700)
601
 
        self.assertTransportMode(t, 'dmode700', 0700)
602
 
        t.mkdir_multi(['mdmode755'], mode=0755)
603
 
        self.assertTransportMode(t, 'mdmode755', 0755)
 
584
        t.mkdir('dmode755', mode=0o755)
 
585
        self.assertTransportMode(t, 'dmode755', 0o755)
 
586
        t.mkdir('dmode555', mode=0o555)
 
587
        self.assertTransportMode(t, 'dmode555', 0o555)
 
588
        t.mkdir('dmode777', mode=0o777)
 
589
        self.assertTransportMode(t, 'dmode777', 0o777)
 
590
        t.mkdir('dmode700', mode=0o700)
 
591
        self.assertTransportMode(t, 'dmode700', 0o700)
 
592
        t.mkdir_multi(['mdmode755'], mode=0o755)
 
593
        self.assertTransportMode(t, 'mdmode755', 0o755)
604
594
 
605
595
        # Default mode should be based on umask
606
596
        umask = osutils.get_umask()
607
597
        t.mkdir('dnomode', mode=None)
608
 
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
598
        self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
609
599
 
610
600
    def test_opening_a_file_stream_creates_file(self):
611
601
        t = self.get_transport()
620
610
    def test_opening_a_file_stream_can_set_mode(self):
621
611
        t = self.get_transport()
622
612
        if t.is_readonly():
 
613
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
614
                              t.open_write_stream, 'foo')
623
615
            return
624
616
        if not t._can_roundtrip_unix_modebits():
625
617
            # Can't roundtrip, so no need to run this test
626
618
            return
 
619
 
627
620
        def check_mode(name, mode, expected):
628
621
            handle = t.open_write_stream(name, mode=mode)
629
622
            handle.close()
630
623
            self.assertTransportMode(t, name, expected)
631
 
        check_mode('mode644', 0644, 0644)
632
 
        check_mode('mode666', 0666, 0666)
633
 
        check_mode('mode600', 0600, 0600)
 
624
        check_mode('mode644', 0o644, 0o644)
 
625
        check_mode('mode666', 0o666, 0o666)
 
626
        check_mode('mode600', 0o600, 0o600)
634
627
        # The default permissions should be based on the current umask
635
 
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
628
        check_mode('nomode', None, 0o666 & ~osutils.get_umask())
636
629
 
637
630
    def test_copy_to(self):
638
631
        # FIXME: test:   same server to same server (partly done)
645
638
            self.build_tree(files, transport=transport_from)
646
639
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
647
640
            for f in files:
648
 
                self.check_transport_contents(transport_to.get(f).read(),
 
641
                self.check_transport_contents(transport_to.get_bytes(f),
649
642
                                              transport_from, f)
650
643
 
651
644
        t = self.get_transport()
674
667
        files = ['a', 'b', 'c', 'd']
675
668
        t.copy_to(iter(files), temp_transport)
676
669
        for f in files:
677
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
670
            self.check_transport_contents(temp_transport.get_bytes(f),
678
671
                                          t, f)
679
672
        del temp_transport
680
673
 
681
 
        for mode in (0666, 0644, 0600, 0400):
 
674
        for mode in (0o666, 0o644, 0o600, 0o400):
682
675
            temp_transport = MemoryTransport("memory:///")
683
676
            t.copy_to(files, temp_transport, mode=mode)
684
677
            for f in files:
705
698
        t.put_bytes('b', 'contents\nfor b\n')
706
699
 
707
700
        self.assertEqual(20,
708
 
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
701
            t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
709
702
 
710
703
        self.check_transport_contents(
711
704
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
713
706
 
714
707
        # a file with no parent should fail..
715
708
        self.assertRaises(NoSuchFile,
716
 
                          t.append_file, 'missing/path', StringIO('content'))
 
709
                          t.append_file, 'missing/path', BytesIO(b'content'))
717
710
 
718
711
        # And we can create new files, too
719
712
        self.assertEqual(0,
720
 
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
 
713
            t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
721
714
        self.check_transport_contents('some text\nfor a missing file\n',
722
715
                                      t, 'c')
723
716
 
753
746
        t.put_bytes('b', 'contents\nfor b\n')
754
747
 
755
748
        self.assertEqual((43, 15),
756
 
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
757
 
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
749
            t.append_multi([('a', BytesIO(b'and\nthen\nsome\nmore\n')),
 
750
                            ('b', BytesIO(b'some\nmore\nfor\nb\n'))]))
758
751
 
759
752
        self.check_transport_contents(
760
753
            'diff\ncontents for\na\n'
767
760
                t, 'b')
768
761
 
769
762
        self.assertEqual((62, 31),
770
 
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
771
 
                                 ('b', StringIO('from an iterator\n'))])))
 
763
            t.append_multi(iter([('a', BytesIO(b'a little bit more\n')),
 
764
                                 ('b', BytesIO(b'from an iterator\n'))])))
772
765
        self.check_transport_contents(
773
766
            'diff\ncontents for\na\n'
774
767
            'add\nsome\nmore\ncontents\n'
782
775
                t, 'b')
783
776
 
784
777
        self.assertEqual((80, 0),
785
 
            t.append_multi([('a', StringIO('some text in a\n')),
786
 
                            ('d', StringIO('missing file r\n'))]))
 
778
            t.append_multi([('a', BytesIO(b'some text in a\n')),
 
779
                            ('d', BytesIO(b'missing file r\n'))]))
787
780
 
788
781
        self.check_transport_contents(
789
782
            'diff\ncontents for\na\n'
800
793
        t = self.get_transport()
801
794
        if t.is_readonly():
802
795
            self.assertRaises(TransportNotPossible,
803
 
                t.append_file, 'f', StringIO('f'), mode=None)
 
796
                t.append_file, 'f', BytesIO(b'f'), mode=None)
804
797
            return
805
 
        t.append_file('f', StringIO('f'), mode=None)
 
798
        t.append_file('f', BytesIO(b'f'), mode=None)
806
799
 
807
800
    def test_append_bytes_mode(self):
808
801
        # check append_bytes accepts a mode
823
816
            return
824
817
 
825
818
        t.put_bytes('a', 'a little bit of text\n')
826
 
        self.failUnless(t.has('a'))
 
819
        self.assertTrue(t.has('a'))
827
820
        t.delete('a')
828
 
        self.failIf(t.has('a'))
 
821
        self.assertFalse(t.has('a'))
829
822
 
830
823
        self.assertRaises(NoSuchFile, t.delete, 'a')
831
824
 
837
830
        t.delete_multi(['a', 'c'])
838
831
        self.assertEqual([False, True, False],
839
832
                list(t.has_multi(['a', 'b', 'c'])))
840
 
        self.failIf(t.has('a'))
841
 
        self.failUnless(t.has('b'))
842
 
        self.failIf(t.has('c'))
 
833
        self.assertFalse(t.has('a'))
 
834
        self.assertTrue(t.has('b'))
 
835
        self.assertFalse(t.has('c'))
843
836
 
844
837
        self.assertRaises(NoSuchFile,
845
838
                t.delete_multi, ['a', 'b', 'c'])
906
899
        t.mkdir('foo-baz')
907
900
        t.rmdir('foo')
908
901
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
909
 
        self.failUnless(t.has('foo-bar'))
 
902
        self.assertTrue(t.has('foo-bar'))
910
903
 
911
904
    def test_rename_dir_succeeds(self):
912
905
        t = self.get_transport()
913
906
        if t.is_readonly():
914
 
            raise TestSkipped("transport is readonly")
 
907
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
908
                              t.rename, 'foo', 'bar')
 
909
            return
915
910
        t.mkdir('adir')
916
911
        t.mkdir('adir/asubdir')
917
912
        t.rename('adir', 'bdir')
922
917
        """Attempting to replace a nonempty directory should fail"""
923
918
        t = self.get_transport()
924
919
        if t.is_readonly():
925
 
            raise TestSkipped("transport is readonly")
 
920
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
921
                              t.rename, 'foo', 'bar')
 
922
            return
926
923
        t.mkdir('adir')
927
924
        t.mkdir('adir/asubdir')
928
925
        t.mkdir('bdir')
992
989
        # perhaps all of this could be done in a subdirectory
993
990
 
994
991
        t.put_bytes('a', 'a first file\n')
995
 
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
992
        self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
996
993
 
997
994
        t.move('a', 'b')
998
 
        self.failUnless(t.has('b'))
999
 
        self.failIf(t.has('a'))
 
995
        self.assertTrue(t.has('b'))
 
996
        self.assertFalse(t.has('a'))
1000
997
 
1001
998
        self.check_transport_contents('a first file\n', t, 'b')
1002
 
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
999
        self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
1003
1000
 
1004
1001
        # Overwrite a file
1005
1002
        t.put_bytes('c', 'c this file\n')
1006
1003
        t.move('c', 'b')
1007
 
        self.failIf(t.has('c'))
 
1004
        self.assertFalse(t.has('c'))
1008
1005
        self.check_transport_contents('c this file\n', t, 'b')
1009
1006
 
1010
1007
        # TODO: Try to write a test for atomicity
1042
1039
        except NotImplementedError:
1043
1040
            raise TestSkipped("Transport %s has no bogus URL support." %
1044
1041
                              self._server.__class__)
1045
 
        t = get_transport(url)
 
1042
        t = _mod_transport.get_transport_from_url(url)
1046
1043
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1047
1044
 
1048
1045
    def test_stat(self):
1053
1050
 
1054
1051
        try:
1055
1052
            st = t.stat('.')
1056
 
        except TransportNotPossible, e:
 
1053
        except TransportNotPossible as e:
1057
1054
            # This transport cannot stat
1058
1055
            return
1059
1056
 
1064
1061
        for path, size in zip(paths, sizes):
1065
1062
            st = t.stat(path)
1066
1063
            if path.endswith('/'):
1067
 
                self.failUnless(S_ISDIR(st.st_mode))
 
1064
                self.assertTrue(S_ISDIR(st.st_mode))
1068
1065
                # directory sizes are meaningless
1069
1066
            else:
1070
 
                self.failUnless(S_ISREG(st.st_mode))
 
1067
                self.assertTrue(S_ISREG(st.st_mode))
1071
1068
                self.assertEqual(size, st.st_size)
1072
1069
 
1073
1070
        remote_stats = list(t.stat_multi(paths))
1080
1077
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1081
1078
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1082
1079
        subdir = t.clone('subdir')
1083
 
        subdir.stat('./file')
1084
 
        subdir.stat('.')
 
1080
        st = subdir.stat('./file')
 
1081
        st = subdir.stat('.')
1085
1082
 
1086
1083
    def test_hardlink(self):
1087
1084
        from stat import ST_NLINK
1096
1093
        try:
1097
1094
            t.hardlink(source_name, link_name)
1098
1095
 
1099
 
            self.failUnless(t.has(source_name))
1100
 
            self.failUnless(t.has(link_name))
 
1096
            self.assertTrue(t.has(source_name))
 
1097
            self.assertTrue(t.has(link_name))
1101
1098
 
1102
1099
            st = t.stat(link_name)
1103
 
            self.failUnlessEqual(st[ST_NLINK], 2)
 
1100
            self.assertEqual(st[ST_NLINK], 2)
1104
1101
        except TransportNotPossible:
1105
1102
            raise TestSkipped("Transport %s does not support hardlinks." %
1106
1103
                              self._server.__class__)
1118
1115
        try:
1119
1116
            t.symlink(source_name, link_name)
1120
1117
 
1121
 
            self.failUnless(t.has(source_name))
1122
 
            self.failUnless(t.has(link_name))
 
1118
            self.assertTrue(t.has(source_name))
 
1119
            self.assertTrue(t.has(link_name))
1123
1120
 
1124
1121
            st = t.stat(link_name)
1125
 
            self.failUnless(S_ISLNK(st.st_mode))
 
1122
            self.assertTrue(S_ISLNK(st.st_mode),
 
1123
                "expected symlink, got mode %o" % st.st_mode)
1126
1124
        except TransportNotPossible:
1127
1125
            raise TestSkipped("Transport %s does not support symlinks." %
1128
1126
                              self._server.__class__)
1129
1127
        except IOError:
1130
 
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
 
1128
            self.knownFailure("Paramiko fails to create symlinks during tests")
1131
1129
 
1132
1130
    def test_list_dir(self):
1133
1131
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1138
1136
            return
1139
1137
 
1140
1138
        def sorted_list(d, transport):
1141
 
            l = list(transport.list_dir(d))
1142
 
            l.sort()
 
1139
            l = sorted(transport.list_dir(d))
1143
1140
            return l
1144
1141
 
1145
1142
        self.assertEqual([], sorted_list('.', t))
1197
1194
            raise TestSkipped("not a connected transport")
1198
1195
 
1199
1196
        t2 = t1.clone('subdir')
1200
 
        self.assertEquals(t1._scheme, t2._scheme)
1201
 
        self.assertEquals(t1._user, t2._user)
1202
 
        self.assertEquals(t1._password, t2._password)
1203
 
        self.assertEquals(t1._host, t2._host)
1204
 
        self.assertEquals(t1._port, t2._port)
 
1197
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1198
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
 
1199
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
 
1200
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
 
1201
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1205
1202
 
1206
1203
    def test__reuse_for(self):
1207
1204
        t = self.get_transport()
1214
1211
 
1215
1212
            Only the parameters different from None will be changed.
1216
1213
            """
1217
 
            if scheme   is None: scheme   = t._scheme
1218
 
            if user     is None: user     = t._user
1219
 
            if password is None: password = t._password
1220
 
            if user     is None: user     = t._user
1221
 
            if host     is None: host     = t._host
1222
 
            if port     is None: port     = t._port
1223
 
            if path     is None: path     = t._path
1224
 
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1214
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1215
            if user     is None: user     = t._parsed_url.user
 
1216
            if password is None: password = t._parsed_url.password
 
1217
            if user     is None: user     = t._parsed_url.user
 
1218
            if host     is None: host     = t._parsed_url.host
 
1219
            if port     is None: port     = t._parsed_url.port
 
1220
            if path     is None: path     = t._parsed_url.path
 
1221
            return str(urlutils.URL(scheme, user, password, host, port, path))
1225
1222
 
1226
 
        if t._scheme == 'ftp':
 
1223
        if t._parsed_url.scheme == 'ftp':
1227
1224
            scheme = 'sftp'
1228
1225
        else:
1229
1226
            scheme = 'ftp'
1230
1227
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1231
 
        if t._user == 'me':
 
1228
        if t._parsed_url.user == 'me':
1232
1229
            user = 'you'
1233
1230
        else:
1234
1231
            user = 'me'
1245
1242
        #   (they may be typed by the user when prompted for example)
1246
1243
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1247
1244
        # We will not connect, we can use an invalid host
1248
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1249
 
        if t._port == 1234:
 
1245
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1246
        if t._parsed_url.port == 1234:
1250
1247
            port = 4321
1251
1248
        else:
1252
1249
            port = 1234
1265
1262
        self.assertIs(t._get_connection(), c._get_connection())
1266
1263
 
1267
1264
        # Temporary failure, we need to create a new dummy connection
1268
 
        new_connection = object()
 
1265
        new_connection = None
1269
1266
        t._set_connection(new_connection)
1270
1267
        # Check that both transports use the same connection
1271
1268
        self.assertIs(new_connection, t._get_connection())
1293
1290
 
1294
1291
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1295
1292
 
1296
 
        self.failUnless(t1.has('a'))
1297
 
        self.failUnless(t1.has('b/c'))
1298
 
        self.failIf(t1.has('c'))
 
1293
        self.assertTrue(t1.has('a'))
 
1294
        self.assertTrue(t1.has('b/c'))
 
1295
        self.assertFalse(t1.has('c'))
1299
1296
 
1300
1297
        t2 = t1.clone('b')
1301
1298
        self.assertEqual(t1.base + 'b/', t2.base)
1302
1299
 
1303
 
        self.failUnless(t2.has('c'))
1304
 
        self.failIf(t2.has('a'))
 
1300
        self.assertTrue(t2.has('c'))
 
1301
        self.assertFalse(t2.has('a'))
1305
1302
 
1306
1303
        t3 = t2.clone('..')
1307
 
        self.failUnless(t3.has('a'))
1308
 
        self.failIf(t3.has('c'))
 
1304
        self.assertTrue(t3.has('a'))
 
1305
        self.assertFalse(t3.has('c'))
1309
1306
 
1310
 
        self.failIf(t1.has('b/d'))
1311
 
        self.failIf(t2.has('d'))
1312
 
        self.failIf(t3.has('b/d'))
 
1307
        self.assertFalse(t1.has('b/d'))
 
1308
        self.assertFalse(t2.has('d'))
 
1309
        self.assertFalse(t3.has('b/d'))
1313
1310
 
1314
1311
        if t1.is_readonly():
1315
1312
            self.build_tree_contents([('b/d', 'newfile\n')])
1316
1313
        else:
1317
1314
            t2.put_bytes('d', 'newfile\n')
1318
1315
 
1319
 
        self.failUnless(t1.has('b/d'))
1320
 
        self.failUnless(t2.has('d'))
1321
 
        self.failUnless(t3.has('b/d'))
 
1316
        self.assertTrue(t1.has('b/d'))
 
1317
        self.assertTrue(t2.has('d'))
 
1318
        self.assertTrue(t3.has('b/d'))
1322
1319
 
1323
1320
    def test_clone_to_root(self):
1324
1321
        orig_transport = self.get_transport()
1398
1395
        self.assertEqual(transport.clone("/").abspath('foo'),
1399
1396
                         transport.abspath("/foo"))
1400
1397
 
 
1398
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1401
1399
    def test_win32_abspath(self):
1402
1400
        # Note: we tried to set sys.platform='win32' so we could test on
1403
1401
        # other platforms too, but then osutils does platform specific
1408
1406
 
1409
1407
        # smoke test for abspath on win32.
1410
1408
        # a transport based on 'file:///' never fully qualifies the drive.
1411
 
        transport = get_transport("file:///")
1412
 
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1409
        transport = _mod_transport.get_transport_from_url("file:///")
 
1410
        self.assertEqual(transport.abspath("/"), "file:///")
1413
1411
 
1414
1412
        # but a transport that starts with a drive spec must keep it.
1415
 
        transport = get_transport("file:///C:/")
1416
 
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1413
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1414
        self.assertEqual(transport.abspath("/"), "file:///C:/")
1417
1415
 
1418
1416
    def test_local_abspath(self):
1419
1417
        transport = self.get_transport()
1420
1418
        try:
1421
1419
            p = transport.local_abspath('.')
1422
 
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1420
        except (errors.NotLocalUrl, TransportNotPossible) as e:
1423
1421
            # should be formattable
1424
1422
            s = str(e)
1425
1423
        else:
1458
1456
        paths = set(transport.iter_files_recursive())
1459
1457
        # nb the directories are not converted
1460
1458
        self.assertEqual(paths,
1461
 
                    set(['isolated/dir/foo',
 
1459
                    {'isolated/dir/foo',
1462
1460
                         'isolated/dir/bar',
1463
1461
                         'isolated/dir/b%2525z',
1464
 
                         'isolated/bar']))
 
1462
                         'isolated/bar'})
1465
1463
        sub_transport = transport.clone('isolated')
1466
1464
        paths = set(sub_transport.iter_files_recursive())
1467
1465
        self.assertEqual(paths,
1468
 
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1466
            {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
1469
1467
 
1470
1468
    def test_copy_tree(self):
1471
1469
        # TODO: test file contents and permissions are preserved. This test was
1488
1486
        transport.copy_tree('from', 'to')
1489
1487
        paths = set(transport.iter_files_recursive())
1490
1488
        self.assertEqual(paths,
1491
 
                    set(['from/dir/foo',
 
1489
                    {'from/dir/foo',
1492
1490
                         'from/dir/bar',
1493
1491
                         'from/dir/b%2525z',
1494
1492
                         'from/bar',
1495
1493
                         'to/dir/foo',
1496
1494
                         'to/dir/bar',
1497
1495
                         'to/dir/b%2525z',
1498
 
                         'to/bar',]))
 
1496
                         'to/bar',})
1499
1497
 
1500
1498
    def test_copy_tree_to_transport(self):
1501
1499
        transport = self.get_transport()
1518
1516
        from_transport.copy_tree_to_transport(to_transport)
1519
1517
        paths = set(transport.iter_files_recursive())
1520
1518
        self.assertEqual(paths,
1521
 
                    set(['from/dir/foo',
 
1519
                    {'from/dir/foo',
1522
1520
                         'from/dir/bar',
1523
1521
                         'from/dir/b%2525z',
1524
1522
                         'from/bar',
1525
1523
                         'to/dir/foo',
1526
1524
                         'to/dir/bar',
1527
1525
                         'to/dir/b%2525z',
1528
 
                         'to/bar',]))
 
1526
                         'to/bar',})
1529
1527
 
1530
1528
    def test_unicode_paths(self):
1531
1529
        """Test that we can read/write files with Unicode names."""
1545
1543
 
1546
1544
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1547
1545
        if no_unicode_support:
1548
 
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1546
            self.knownFailure("test server cannot handle unicode paths")
1549
1547
 
1550
1548
        try:
1551
1549
            self.build_tree(files, transport=t, line_endings='binary')
1616
1614
    def test_readv(self):
1617
1615
        transport = self.get_transport()
1618
1616
        if transport.is_readonly():
1619
 
            file('a', 'w').write('0123456789')
 
1617
            with file('a', 'w') as f: f.write('0123456789')
1620
1618
        else:
1621
1619
            transport.put_bytes('a', '0123456789')
1622
1620
 
1632
1630
    def test_readv_out_of_order(self):
1633
1631
        transport = self.get_transport()
1634
1632
        if transport.is_readonly():
1635
 
            file('a', 'w').write('0123456789')
 
1633
            with file('a', 'w') as f: f.write('0123456789')
1636
1634
        else:
1637
1635
            transport.put_bytes('a', '01234567890')
1638
1636
 
1710
1708
        transport = self.get_transport()
1711
1709
        # test from observed failure case.
1712
1710
        if transport.is_readonly():
1713
 
            file('a', 'w').write('a'*1024*1024)
 
1711
            with file('a', 'w') as f: f.write('a'*1024*1024)
1714
1712
        else:
1715
1713
            transport.put_bytes('a', 'a'*1024*1024)
1716
1714
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1750
1748
    def test_readv_short_read(self):
1751
1749
        transport = self.get_transport()
1752
1750
        if transport.is_readonly():
1753
 
            file('a', 'w').write('0123456789')
 
1751
            with file('a', 'w') as f: f.write('0123456789')
1754
1752
        else:
1755
1753
            transport.put_bytes('a', '01234567890')
1756
1754
 
1765
1763
        # also raise a special error
1766
1764
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1767
1765
                              transport.readv, 'a', [(12,2)])
 
1766
 
 
1767
    def test_no_segment_parameters(self):
 
1768
        """Segment parameters should be stripped and stored in
 
1769
        transport.get_segment_parameters()."""
 
1770
        transport = self.get_transport("foo")
 
1771
        self.assertEqual({}, transport.get_segment_parameters())
 
1772
 
 
1773
    def test_segment_parameters(self):
 
1774
        """Segment parameters should be stripped and stored in
 
1775
        transport.get_segment_parameters()."""
 
1776
        base_url = self._server.get_url()
 
1777
        parameters = {"key1": "val1", "key2": "val2"}
 
1778
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1779
        transport = _mod_transport.get_transport_from_url(url)
 
1780
        self.assertEqual(parameters, transport.get_segment_parameters())
 
1781
 
 
1782
    def test_set_segment_parameters(self):
 
1783
        """Segment parameters can be set and show up in base."""
 
1784
        transport = self.get_transport("foo")
 
1785
        orig_base = transport.base
 
1786
        transport.set_segment_parameter("arm", "board")
 
1787
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
 
1788
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
 
1789
        transport.set_segment_parameter("arm", None)
 
1790
        transport.set_segment_parameter("nonexistant", None)
 
1791
        self.assertEqual({}, transport.get_segment_parameters())
 
1792
        self.assertEqual(orig_base, transport.base)
 
1793
 
 
1794
    def test_stat_symlink(self):
 
1795
        # if a transport points directly to a symlink (and supports symlinks
 
1796
        # at all) you can tell this.  helps with bug 32669.
 
1797
        t = self.get_transport()
 
1798
        try:
 
1799
            t.symlink('target', 'link')
 
1800
        except TransportNotPossible:
 
1801
            raise TestSkipped("symlinks not supported")
 
1802
        t2 = t.clone('link')
 
1803
        st = t2.stat('')
 
1804
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1805
 
 
1806
    def test_abspath_url_unquote_unreserved(self):
 
1807
        """URLs from abspath should have unreserved characters unquoted
 
1808
        
 
1809
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1810
        """
 
1811
        t = self.get_transport()
 
1812
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1813
        self.assertEqual(t.base + "-.09AZ_az~",
 
1814
            t.abspath(needlessly_escaped_dir))
 
1815
 
 
1816
    def test_clone_url_unquote_unreserved(self):
 
1817
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1818
        
 
1819
        Cloned transports should be prefix comparable for things like the
 
1820
        isolation checking of tests, see lp:842223 for more.
 
1821
        """
 
1822
        t1 = self.get_transport()
 
1823
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1824
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1825
        t2 = t1.clone(needlessly_escaped_dir)
 
1826
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1827
 
 
1828
    def test_hook_post_connection_one(self):
 
1829
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1830
        log = []
 
1831
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1832
        t = self.get_transport()
 
1833
        self.assertEqual([], log)
 
1834
        t.has("non-existant")
 
1835
        if isinstance(t, RemoteTransport):
 
1836
            self.assertEqual([t.get_smart_medium()], log)
 
1837
        elif isinstance(t, ConnectedTransport):
 
1838
            self.assertEqual([t], log)
 
1839
        else:
 
1840
            self.assertEqual([], log)
 
1841
 
 
1842
    def test_hook_post_connection_multi(self):
 
1843
        """Fire post_connect hook once per unshared underlying connection"""
 
1844
        log = []
 
1845
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1846
        t1 = self.get_transport()
 
1847
        t2 = t1.clone(".")
 
1848
        t3 = self.get_transport()
 
1849
        self.assertEqual([], log)
 
1850
        t1.has("x")
 
1851
        t2.has("x")
 
1852
        t3.has("x")
 
1853
        if isinstance(t1, RemoteTransport):
 
1854
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1855
        elif isinstance(t1, ConnectedTransport):
 
1856
            self.assertEqual([t1, t3], log)
 
1857
        else:
 
1858
            self.assertEqual([], log)