/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/per_transport.py

  • Committer: Martin
  • Date: 2017-08-26 13:09:08 UTC
  • mto: (6754.4.4 check)
  • mto: This revision was merged to the branch mainline in revision 6755.
  • Revision ID: gzlist@googlemail.com-20170826130908-acpf9jit3h51kus1
Use future division in lru_cache to pass tests on Python 3

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
23
 
import itertools
24
23
import os
25
 
from cStringIO import StringIO
26
 
from StringIO import StringIO as pyStringIO
27
24
import stat
28
25
import sys
29
 
import unittest
30
26
 
31
 
from bzrlib import (
 
27
from .. import (
32
28
    errors,
33
29
    osutils,
 
30
    pyutils,
34
31
    tests,
 
32
    transport as _mod_transport,
35
33
    urlutils,
36
34
    )
37
 
from bzrlib.errors import (ConnectionError,
38
 
                           DirectoryNotEmpty,
39
 
                           FileExists,
40
 
                           InvalidURL,
41
 
                           LockError,
42
 
                           NoSuchFile,
43
 
                           NotLocalUrl,
44
 
                           PathError,
45
 
                           TransportNotPossible,
46
 
                           )
47
 
from bzrlib.osutils import getcwd
48
 
from bzrlib.smart import medium
49
 
from bzrlib.tests import (
50
 
    TestCaseInTempDir,
 
35
from ..errors import (ConnectionError,
 
36
                      FileExists,
 
37
                      NoSuchFile,
 
38
                      PathError,
 
39
                      TransportNotPossible,
 
40
                      )
 
41
from ..osutils import getcwd
 
42
from ..sixish import (
 
43
    BytesIO,
 
44
    zip,
 
45
    )
 
46
from ..bzr.smart import medium
 
47
from . import (
51
48
    TestSkipped,
52
49
    TestNotApplicable,
53
50
    multiply_tests,
54
51
    )
55
 
from bzrlib.tests import test_server
56
 
from bzrlib.tests.test_transport import TestTransportImplementation
57
 
from bzrlib.transport import (
 
52
from . import test_server
 
53
from .test_transport import TestTransportImplementation
 
54
from ..transport import (
58
55
    ConnectedTransport,
59
 
    get_transport,
 
56
    Transport,
60
57
    _get_transport_modules,
61
58
    )
62
 
from bzrlib.transport.memory import MemoryTransport
 
59
from ..transport.memory import MemoryTransport
 
60
from ..transport.remote import RemoteTransport
63
61
 
64
62
 
65
63
def get_transport_test_permutations(module):
78
76
    for module in _get_transport_modules():
79
77
        try:
80
78
            permutations = get_transport_test_permutations(
81
 
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
79
                pyutils.get_named_object(module))
82
80
            for (klass, server_factory) in permutations:
83
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
82
                    {"transport_class":klass,
85
83
                     "transport_server":server_factory})
86
84
                result.append(scenario)
87
 
        except errors.DependencyNotPresent, e:
 
85
        except errors.DependencyNotPresent as e:
88
86
            # Continue even if a dependency prevents us
89
87
            # from adding this test
90
88
            pass
91
89
    return result
92
90
 
93
91
 
94
 
def load_tests(standard_tests, module, loader):
 
92
def load_tests(loader, standard_tests, pattern):
95
93
    """Multiply tests for tranport implementations."""
96
94
    result = loader.suiteClass()
97
95
    scenarios = transport_test_permutations()
102
100
 
103
101
    def setUp(self):
104
102
        super(TransportTests, self).setUp()
105
 
        self._captureVar('BZR_NO_SMART_VFS', None)
 
103
        self.overrideEnv('BRZ_NO_SMART_VFS', None)
106
104
 
107
105
    def check_transport_contents(self, content, transport, relpath):
108
 
        """Check that transport.get(relpath).read() == content."""
109
 
        self.assertEqualDiff(content, transport.get(relpath).read())
 
106
        """Check that transport.get_bytes(relpath) == content."""
 
107
        self.assertEqualDiff(content, transport.get_bytes(relpath))
110
108
 
111
109
    def test_ensure_base_missing(self):
112
110
        """.ensure_base() should create the directory if it doesn't exist"""
192
190
        self.build_tree(files, transport=t, line_endings='binary')
193
191
        self.check_transport_contents('contents of a\n', t, 'a')
194
192
        content_f = t.get_multi(files)
195
 
        # Use itertools.izip() instead of use zip() or map(), since they fully
196
 
        # evaluate their inputs, the transport requests should be issued and
 
193
        # Must use iter zip() from future not old version which will fully
 
194
        # evaluate its inputs, the transport requests should be issued and
197
195
        # handled sequentially (we don't want to force transport to buffer).
198
 
        for content, f in itertools.izip(contents, content_f):
 
196
        for content, f in zip(contents, content_f):
199
197
            self.assertEqual(content, f.read())
200
198
 
201
199
        content_f = t.get_multi(iter(files))
202
 
        # Use itertools.izip() for the same reason
203
 
        for content, f in itertools.izip(contents, content_f):
 
200
        # Again this zip() must come from the future
 
201
        for content, f in zip(contents, content_f):
204
202
            self.assertEqual(content, f.read())
205
203
 
206
204
    def test_get_unknown_file(self):
211
209
                    ]
212
210
        self.build_tree(files, transport=t, line_endings='binary')
213
211
        self.assertRaises(NoSuchFile, t.get, 'c')
214
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
212
        def iterate_and_close(func, *args):
 
213
            for f in func(*args):
 
214
                # We call f.read() here because things like paramiko actually
 
215
                # spawn a thread to prefetch the content, which we want to
 
216
                # consume before we close the handle.
 
217
                content = f.read()
 
218
                f.close()
 
219
        self.assertRaises(NoSuchFile, iterate_and_close,
 
220
                          t.get_multi, ['a', 'b', 'c'])
 
221
        self.assertRaises(NoSuchFile, iterate_and_close,
 
222
                          t.get_multi, iter(['a', 'b', 'c']))
216
223
 
217
224
    def test_get_directory_read_gives_ReadError(self):
218
225
        """consistent errors for read() on a file returned by get()."""
251
258
 
252
259
    def test_get_bytes_unknown_file(self):
253
260
        t = self.get_transport()
254
 
 
255
261
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
256
262
 
257
263
    def test_get_with_open_write_stream_sees_all_content(self):
261
267
        handle = t.open_write_stream('foo')
262
268
        try:
263
269
            handle.write('b')
264
 
            self.assertEqual('b', t.get('foo').read())
 
270
            self.assertEqual('b', t.get_bytes('foo'))
265
271
        finally:
266
272
            handle.close()
267
273
 
273
279
        try:
274
280
            handle.write('b')
275
281
            self.assertEqual('b', t.get_bytes('foo'))
276
 
            self.assertEqual('b', t.get('foo').read())
 
282
            f = t.get('foo')
 
283
            try:
 
284
                self.assertEqual('b', f.read())
 
285
            finally:
 
286
                f.close()
277
287
        finally:
278
288
            handle.close()
279
289
 
286
296
            return
287
297
 
288
298
        t.put_bytes('a', 'some text for a\n')
289
 
        self.failUnless(t.has('a'))
 
299
        self.assertTrue(t.has('a'))
290
300
        self.check_transport_contents('some text for a\n', t, 'a')
291
301
 
292
302
        # The contents should be overwritten
304
314
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
305
315
            return
306
316
 
307
 
        self.failIf(t.has('a'))
 
317
        self.assertFalse(t.has('a'))
308
318
        t.put_bytes_non_atomic('a', 'some text for a\n')
309
 
        self.failUnless(t.has('a'))
 
319
        self.assertTrue(t.has('a'))
310
320
        self.check_transport_contents('some text for a\n', t, 'a')
311
321
        # Put also replaces contents
312
322
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
324
334
        # Now test the create_parent flag
325
335
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
326
336
                                       'contents\n')
327
 
        self.failIf(t.has('dir/a'))
 
337
        self.assertFalse(t.has('dir/a'))
328
338
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
329
339
                               create_parent_dir=True)
330
340
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
342
352
        if not t._can_roundtrip_unix_modebits():
343
353
            # Can't roundtrip, so no need to run this test
344
354
            return
345
 
        t.put_bytes('mode644', 'test text\n', mode=0644)
346
 
        self.assertTransportMode(t, 'mode644', 0644)
347
 
        t.put_bytes('mode666', 'test text\n', mode=0666)
348
 
        self.assertTransportMode(t, 'mode666', 0666)
349
 
        t.put_bytes('mode600', 'test text\n', mode=0600)
350
 
        self.assertTransportMode(t, 'mode600', 0600)
 
355
        t.put_bytes('mode644', 'test text\n', mode=0o644)
 
356
        self.assertTransportMode(t, 'mode644', 0o644)
 
357
        t.put_bytes('mode666', 'test text\n', mode=0o666)
 
358
        self.assertTransportMode(t, 'mode666', 0o666)
 
359
        t.put_bytes('mode600', 'test text\n', mode=0o600)
 
360
        self.assertTransportMode(t, 'mode600', 0o600)
351
361
        # Yes, you can put_bytes a file such that it becomes readonly
352
 
        t.put_bytes('mode400', 'test text\n', mode=0400)
353
 
        self.assertTransportMode(t, 'mode400', 0400)
 
362
        t.put_bytes('mode400', 'test text\n', mode=0o400)
 
363
        self.assertTransportMode(t, 'mode400', 0o400)
354
364
 
355
365
        # The default permissions should be based on the current umask
356
366
        umask = osutils.get_umask()
357
367
        t.put_bytes('nomode', 'test text\n', mode=None)
358
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
368
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
359
369
 
360
370
    def test_put_bytes_non_atomic_permissions(self):
361
371
        t = self.get_transport()
365
375
        if not t._can_roundtrip_unix_modebits():
366
376
            # Can't roundtrip, so no need to run this test
367
377
            return
368
 
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
369
 
        self.assertTransportMode(t, 'mode644', 0644)
370
 
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
371
 
        self.assertTransportMode(t, 'mode666', 0666)
372
 
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
373
 
        self.assertTransportMode(t, 'mode600', 0600)
374
 
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
375
 
        self.assertTransportMode(t, 'mode400', 0400)
 
378
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0o644)
 
379
        self.assertTransportMode(t, 'mode644', 0o644)
 
380
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0o666)
 
381
        self.assertTransportMode(t, 'mode666', 0o666)
 
382
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0o600)
 
383
        self.assertTransportMode(t, 'mode600', 0o600)
 
384
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0o400)
 
385
        self.assertTransportMode(t, 'mode400', 0o400)
376
386
 
377
387
        # The default permissions should be based on the current umask
378
388
        umask = osutils.get_umask()
379
389
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
380
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
390
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
381
391
 
382
392
        # We should also be able to set the mode for a parent directory
383
393
        # when it is created
384
 
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
385
 
                               dir_mode=0700, create_parent_dir=True)
386
 
        self.assertTransportMode(t, 'dir700', 0700)
387
 
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
388
 
                               dir_mode=0770, create_parent_dir=True)
389
 
        self.assertTransportMode(t, 'dir770', 0770)
390
 
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
391
 
                               dir_mode=0777, create_parent_dir=True)
392
 
        self.assertTransportMode(t, 'dir777', 0777)
 
394
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0o664,
 
395
                               dir_mode=0o700, create_parent_dir=True)
 
396
        self.assertTransportMode(t, 'dir700', 0o700)
 
397
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0o664,
 
398
                               dir_mode=0o770, create_parent_dir=True)
 
399
        self.assertTransportMode(t, 'dir770', 0o770)
 
400
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0o664,
 
401
                               dir_mode=0o777, create_parent_dir=True)
 
402
        self.assertTransportMode(t, 'dir777', 0o777)
393
403
 
394
404
    def test_put_file(self):
395
405
        t = self.get_transport()
396
406
 
397
407
        if t.is_readonly():
398
408
            self.assertRaises(TransportNotPossible,
399
 
                    t.put_file, 'a', StringIO('some text for a\n'))
 
409
                    t.put_file, 'a', BytesIO(b'some text for a\n'))
400
410
            return
401
411
 
402
 
        result = t.put_file('a', StringIO('some text for a\n'))
 
412
        result = t.put_file('a', BytesIO(b'some text for a\n'))
403
413
        # put_file returns the length of the data written
404
414
        self.assertEqual(16, result)
405
 
        self.failUnless(t.has('a'))
 
415
        self.assertTrue(t.has('a'))
406
416
        self.check_transport_contents('some text for a\n', t, 'a')
407
417
        # Put also replaces contents
408
 
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
418
        result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
409
419
        self.assertEqual(19, result)
410
420
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
411
421
        self.assertRaises(NoSuchFile,
412
422
                          t.put_file, 'path/doesnt/exist/c',
413
 
                              StringIO('contents'))
 
423
                              BytesIO(b'contents'))
414
424
 
415
425
    def test_put_file_non_atomic(self):
416
426
        t = self.get_transport()
417
427
 
418
428
        if t.is_readonly():
419
429
            self.assertRaises(TransportNotPossible,
420
 
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
430
                    t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
421
431
            return
422
432
 
423
 
        self.failIf(t.has('a'))
424
 
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
 
        self.failUnless(t.has('a'))
 
433
        self.assertFalse(t.has('a'))
 
434
        t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
 
435
        self.assertTrue(t.has('a'))
426
436
        self.check_transport_contents('some text for a\n', t, 'a')
427
437
        # Put also replaces contents
428
 
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
 
438
        t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
429
439
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
430
440
 
431
441
        # Make sure we can create another file
432
 
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
442
        t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
433
443
        # And overwrite 'a' with empty contents
434
 
        t.put_file_non_atomic('a', StringIO(''))
 
444
        t.put_file_non_atomic('a', BytesIO(b''))
435
445
        self.check_transport_contents('contents for\nd\n', t, 'd')
436
446
        self.check_transport_contents('', t, 'a')
437
447
 
438
448
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
439
 
                                       StringIO('contents\n'))
 
449
                                       BytesIO(b'contents\n'))
440
450
        # Now test the create_parent flag
441
451
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
 
                                       StringIO('contents\n'))
443
 
        self.failIf(t.has('dir/a'))
444
 
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
452
                                       BytesIO(b'contents\n'))
 
453
        self.assertFalse(t.has('dir/a'))
 
454
        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
445
455
                              create_parent_dir=True)
446
456
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
447
457
 
448
458
        # But we still get NoSuchFile if we can't make the parent dir
449
459
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
450
 
                                       StringIO('contents\n'),
 
460
                                       BytesIO(b'contents\n'),
451
461
                                       create_parent_dir=True)
452
462
 
453
463
    def test_put_file_permissions(self):
459
469
        if not t._can_roundtrip_unix_modebits():
460
470
            # Can't roundtrip, so no need to run this test
461
471
            return
462
 
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
463
 
        self.assertTransportMode(t, 'mode644', 0644)
464
 
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
465
 
        self.assertTransportMode(t, 'mode666', 0666)
466
 
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
467
 
        self.assertTransportMode(t, 'mode600', 0600)
 
472
        t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
 
473
        self.assertTransportMode(t, 'mode644', 0o644)
 
474
        t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
 
475
        self.assertTransportMode(t, 'mode666', 0o666)
 
476
        t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
 
477
        self.assertTransportMode(t, 'mode600', 0o600)
468
478
        # Yes, you can put a file such that it becomes readonly
469
 
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
470
 
        self.assertTransportMode(t, 'mode400', 0400)
 
479
        t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
 
480
        self.assertTransportMode(t, 'mode400', 0o400)
471
481
        # The default permissions should be based on the current umask
472
482
        umask = osutils.get_umask()
473
 
        t.put_file('nomode', StringIO('test text\n'), mode=None)
474
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
483
        t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
 
484
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
475
485
 
476
486
    def test_put_file_non_atomic_permissions(self):
477
487
        t = self.get_transport()
481
491
        if not t._can_roundtrip_unix_modebits():
482
492
            # Can't roundtrip, so no need to run this test
483
493
            return
484
 
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
485
 
        self.assertTransportMode(t, 'mode644', 0644)
486
 
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
487
 
        self.assertTransportMode(t, 'mode666', 0666)
488
 
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
489
 
        self.assertTransportMode(t, 'mode600', 0600)
 
494
        t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
 
495
        self.assertTransportMode(t, 'mode644', 0o644)
 
496
        t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
 
497
        self.assertTransportMode(t, 'mode666', 0o666)
 
498
        t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
 
499
        self.assertTransportMode(t, 'mode600', 0o600)
490
500
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
491
 
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
492
 
        self.assertTransportMode(t, 'mode400', 0400)
 
501
        t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
 
502
        self.assertTransportMode(t, 'mode400', 0o400)
493
503
 
494
504
        # The default permissions should be based on the current umask
495
505
        umask = osutils.get_umask()
496
 
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
497
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
506
        t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
 
507
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
498
508
 
499
509
        # We should also be able to set the mode for a parent directory
500
510
        # when it is created
501
 
        sio = StringIO()
502
 
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
503
 
                              dir_mode=0700, create_parent_dir=True)
504
 
        self.assertTransportMode(t, 'dir700', 0700)
505
 
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
506
 
                              dir_mode=0770, create_parent_dir=True)
507
 
        self.assertTransportMode(t, 'dir770', 0770)
508
 
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
509
 
                              dir_mode=0777, create_parent_dir=True)
510
 
        self.assertTransportMode(t, 'dir777', 0777)
 
511
        sio = BytesIO()
 
512
        t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
 
513
                              dir_mode=0o700, create_parent_dir=True)
 
514
        self.assertTransportMode(t, 'dir700', 0o700)
 
515
        t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
 
516
                              dir_mode=0o770, create_parent_dir=True)
 
517
        self.assertTransportMode(t, 'dir770', 0o770)
 
518
        t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
 
519
                              dir_mode=0o777, create_parent_dir=True)
 
520
        self.assertTransportMode(t, 'dir777', 0o777)
511
521
 
512
522
    def test_put_bytes_unicode(self):
513
 
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
 
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
515
 
        # (we don't want to encode unicode here at all, callers should be
516
 
        # strictly passing bytes to put_bytes), but we allow it for backwards
517
 
        # compatibility.  At some point we should use a specific exception.
518
 
        # See https://bugs.launchpad.net/bzr/+bug/106898.
519
523
        t = self.get_transport()
520
524
        if t.is_readonly():
521
525
            return
522
526
        unicode_string = u'\u1234'
523
 
        self.assertRaises(
524
 
            (AssertionError, UnicodeEncodeError),
525
 
            t.put_bytes, 'foo', unicode_string)
526
 
 
527
 
    def test_put_file_unicode(self):
528
 
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
 
        # This situation can happen (and has) if code is careless about the type
530
 
        # of "string" they initialise/write to a StringIO with.  We cannot use
531
 
        # cStringIO, because it never returns unicode from read.
532
 
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
 
        # raise, but we raise it for hysterical raisins.
534
 
        t = self.get_transport()
535
 
        if t.is_readonly():
536
 
            return
537
 
        unicode_file = pyStringIO(u'\u1234')
538
 
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
527
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
539
528
 
540
529
    def test_mkdir(self):
541
530
        t = self.get_transport()
576
565
 
577
566
        # Test get/put in sub-directories
578
567
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
579
 
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
 
568
        t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
580
569
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
581
570
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
582
571
 
591
580
            # no sense testing on this transport
592
581
            return
593
582
        # Test mkdir with a mode
594
 
        t.mkdir('dmode755', mode=0755)
595
 
        self.assertTransportMode(t, 'dmode755', 0755)
596
 
        t.mkdir('dmode555', mode=0555)
597
 
        self.assertTransportMode(t, 'dmode555', 0555)
598
 
        t.mkdir('dmode777', mode=0777)
599
 
        self.assertTransportMode(t, 'dmode777', 0777)
600
 
        t.mkdir('dmode700', mode=0700)
601
 
        self.assertTransportMode(t, 'dmode700', 0700)
602
 
        t.mkdir_multi(['mdmode755'], mode=0755)
603
 
        self.assertTransportMode(t, 'mdmode755', 0755)
 
583
        t.mkdir('dmode755', mode=0o755)
 
584
        self.assertTransportMode(t, 'dmode755', 0o755)
 
585
        t.mkdir('dmode555', mode=0o555)
 
586
        self.assertTransportMode(t, 'dmode555', 0o555)
 
587
        t.mkdir('dmode777', mode=0o777)
 
588
        self.assertTransportMode(t, 'dmode777', 0o777)
 
589
        t.mkdir('dmode700', mode=0o700)
 
590
        self.assertTransportMode(t, 'dmode700', 0o700)
 
591
        t.mkdir_multi(['mdmode755'], mode=0o755)
 
592
        self.assertTransportMode(t, 'mdmode755', 0o755)
604
593
 
605
594
        # Default mode should be based on umask
606
595
        umask = osutils.get_umask()
607
596
        t.mkdir('dnomode', mode=None)
608
 
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
597
        self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
609
598
 
610
599
    def test_opening_a_file_stream_creates_file(self):
611
600
        t = self.get_transport()
620
609
    def test_opening_a_file_stream_can_set_mode(self):
621
610
        t = self.get_transport()
622
611
        if t.is_readonly():
 
612
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
613
                              t.open_write_stream, 'foo')
623
614
            return
624
615
        if not t._can_roundtrip_unix_modebits():
625
616
            # Can't roundtrip, so no need to run this test
626
617
            return
 
618
 
627
619
        def check_mode(name, mode, expected):
628
620
            handle = t.open_write_stream(name, mode=mode)
629
621
            handle.close()
630
622
            self.assertTransportMode(t, name, expected)
631
 
        check_mode('mode644', 0644, 0644)
632
 
        check_mode('mode666', 0666, 0666)
633
 
        check_mode('mode600', 0600, 0600)
 
623
        check_mode('mode644', 0o644, 0o644)
 
624
        check_mode('mode666', 0o666, 0o666)
 
625
        check_mode('mode600', 0o600, 0o600)
634
626
        # The default permissions should be based on the current umask
635
 
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
627
        check_mode('nomode', None, 0o666 & ~osutils.get_umask())
636
628
 
637
629
    def test_copy_to(self):
638
630
        # FIXME: test:   same server to same server (partly done)
645
637
            self.build_tree(files, transport=transport_from)
646
638
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
647
639
            for f in files:
648
 
                self.check_transport_contents(transport_to.get(f).read(),
 
640
                self.check_transport_contents(transport_to.get_bytes(f),
649
641
                                              transport_from, f)
650
642
 
651
643
        t = self.get_transport()
 
644
        if t.__class__.__name__ == "SFTPTransport":
 
645
            self.skipTest("SFTP copy_to currently too flakey to use")
652
646
        temp_transport = MemoryTransport('memory:///')
653
647
        simple_copy_files(t, temp_transport)
654
648
        if not t.is_readonly():
674
668
        files = ['a', 'b', 'c', 'd']
675
669
        t.copy_to(iter(files), temp_transport)
676
670
        for f in files:
677
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
671
            self.check_transport_contents(temp_transport.get_bytes(f),
678
672
                                          t, f)
679
673
        del temp_transport
680
674
 
681
 
        for mode in (0666, 0644, 0600, 0400):
 
675
        for mode in (0o666, 0o644, 0o600, 0o400):
682
676
            temp_transport = MemoryTransport("memory:///")
683
677
            t.copy_to(files, temp_transport, mode=mode)
684
678
            for f in files:
705
699
        t.put_bytes('b', 'contents\nfor b\n')
706
700
 
707
701
        self.assertEqual(20,
708
 
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
702
            t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
709
703
 
710
704
        self.check_transport_contents(
711
705
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
713
707
 
714
708
        # a file with no parent should fail..
715
709
        self.assertRaises(NoSuchFile,
716
 
                          t.append_file, 'missing/path', StringIO('content'))
 
710
                          t.append_file, 'missing/path', BytesIO(b'content'))
717
711
 
718
712
        # And we can create new files, too
719
713
        self.assertEqual(0,
720
 
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
 
714
            t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
721
715
        self.check_transport_contents('some text\nfor a missing file\n',
722
716
                                      t, 'c')
723
717
 
753
747
        t.put_bytes('b', 'contents\nfor b\n')
754
748
 
755
749
        self.assertEqual((43, 15),
756
 
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
757
 
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
750
            t.append_multi([('a', BytesIO(b'and\nthen\nsome\nmore\n')),
 
751
                            ('b', BytesIO(b'some\nmore\nfor\nb\n'))]))
758
752
 
759
753
        self.check_transport_contents(
760
754
            'diff\ncontents for\na\n'
767
761
                t, 'b')
768
762
 
769
763
        self.assertEqual((62, 31),
770
 
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
771
 
                                 ('b', StringIO('from an iterator\n'))])))
 
764
            t.append_multi(iter([('a', BytesIO(b'a little bit more\n')),
 
765
                                 ('b', BytesIO(b'from an iterator\n'))])))
772
766
        self.check_transport_contents(
773
767
            'diff\ncontents for\na\n'
774
768
            'add\nsome\nmore\ncontents\n'
782
776
                t, 'b')
783
777
 
784
778
        self.assertEqual((80, 0),
785
 
            t.append_multi([('a', StringIO('some text in a\n')),
786
 
                            ('d', StringIO('missing file r\n'))]))
 
779
            t.append_multi([('a', BytesIO(b'some text in a\n')),
 
780
                            ('d', BytesIO(b'missing file r\n'))]))
787
781
 
788
782
        self.check_transport_contents(
789
783
            'diff\ncontents for\na\n'
800
794
        t = self.get_transport()
801
795
        if t.is_readonly():
802
796
            self.assertRaises(TransportNotPossible,
803
 
                t.append_file, 'f', StringIO('f'), mode=None)
 
797
                t.append_file, 'f', BytesIO(b'f'), mode=None)
804
798
            return
805
 
        t.append_file('f', StringIO('f'), mode=None)
 
799
        t.append_file('f', BytesIO(b'f'), mode=None)
806
800
 
807
801
    def test_append_bytes_mode(self):
808
802
        # check append_bytes accepts a mode
823
817
            return
824
818
 
825
819
        t.put_bytes('a', 'a little bit of text\n')
826
 
        self.failUnless(t.has('a'))
 
820
        self.assertTrue(t.has('a'))
827
821
        t.delete('a')
828
 
        self.failIf(t.has('a'))
 
822
        self.assertFalse(t.has('a'))
829
823
 
830
824
        self.assertRaises(NoSuchFile, t.delete, 'a')
831
825
 
837
831
        t.delete_multi(['a', 'c'])
838
832
        self.assertEqual([False, True, False],
839
833
                list(t.has_multi(['a', 'b', 'c'])))
840
 
        self.failIf(t.has('a'))
841
 
        self.failUnless(t.has('b'))
842
 
        self.failIf(t.has('c'))
 
834
        self.assertFalse(t.has('a'))
 
835
        self.assertTrue(t.has('b'))
 
836
        self.assertFalse(t.has('c'))
843
837
 
844
838
        self.assertRaises(NoSuchFile,
845
839
                t.delete_multi, ['a', 'b', 'c'])
906
900
        t.mkdir('foo-baz')
907
901
        t.rmdir('foo')
908
902
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
909
 
        self.failUnless(t.has('foo-bar'))
 
903
        self.assertTrue(t.has('foo-bar'))
910
904
 
911
905
    def test_rename_dir_succeeds(self):
912
906
        t = self.get_transport()
913
907
        if t.is_readonly():
914
 
            raise TestSkipped("transport is readonly")
 
908
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
909
                              t.rename, 'foo', 'bar')
 
910
            return
915
911
        t.mkdir('adir')
916
912
        t.mkdir('adir/asubdir')
917
913
        t.rename('adir', 'bdir')
922
918
        """Attempting to replace a nonemtpy directory should fail"""
923
919
        t = self.get_transport()
924
920
        if t.is_readonly():
925
 
            raise TestSkipped("transport is readonly")
 
921
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
922
                              t.rename, 'foo', 'bar')
 
923
            return
926
924
        t.mkdir('adir')
927
925
        t.mkdir('adir/asubdir')
928
926
        t.mkdir('bdir')
992
990
        # perhaps all of this could be done in a subdirectory
993
991
 
994
992
        t.put_bytes('a', 'a first file\n')
995
 
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
993
        self.assertEqual([True, False], list(t.has_multi(['a', 'b'])))
996
994
 
997
995
        t.move('a', 'b')
998
 
        self.failUnless(t.has('b'))
999
 
        self.failIf(t.has('a'))
 
996
        self.assertTrue(t.has('b'))
 
997
        self.assertFalse(t.has('a'))
1000
998
 
1001
999
        self.check_transport_contents('a first file\n', t, 'b')
1002
 
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
1000
        self.assertEqual([False, True], list(t.has_multi(['a', 'b'])))
1003
1001
 
1004
1002
        # Overwrite a file
1005
1003
        t.put_bytes('c', 'c this file\n')
1006
1004
        t.move('c', 'b')
1007
 
        self.failIf(t.has('c'))
 
1005
        self.assertFalse(t.has('c'))
1008
1006
        self.check_transport_contents('c this file\n', t, 'b')
1009
1007
 
1010
1008
        # TODO: Try to write a test for atomicity
1042
1040
        except NotImplementedError:
1043
1041
            raise TestSkipped("Transport %s has no bogus URL support." %
1044
1042
                              self._server.__class__)
1045
 
        t = get_transport(url)
 
1043
        t = _mod_transport.get_transport_from_url(url)
1046
1044
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1047
1045
 
1048
1046
    def test_stat(self):
1053
1051
 
1054
1052
        try:
1055
1053
            st = t.stat('.')
1056
 
        except TransportNotPossible, e:
 
1054
        except TransportNotPossible as e:
1057
1055
            # This transport cannot stat
1058
1056
            return
1059
1057
 
1064
1062
        for path, size in zip(paths, sizes):
1065
1063
            st = t.stat(path)
1066
1064
            if path.endswith('/'):
1067
 
                self.failUnless(S_ISDIR(st.st_mode))
 
1065
                self.assertTrue(S_ISDIR(st.st_mode))
1068
1066
                # directory sizes are meaningless
1069
1067
            else:
1070
 
                self.failUnless(S_ISREG(st.st_mode))
 
1068
                self.assertTrue(S_ISREG(st.st_mode))
1071
1069
                self.assertEqual(size, st.st_size)
1072
1070
 
1073
1071
        remote_stats = list(t.stat_multi(paths))
1080
1078
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1081
1079
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1082
1080
        subdir = t.clone('subdir')
1083
 
        subdir.stat('./file')
1084
 
        subdir.stat('.')
 
1081
        st = subdir.stat('./file')
 
1082
        st = subdir.stat('.')
1085
1083
 
1086
1084
    def test_hardlink(self):
1087
1085
        from stat import ST_NLINK
1096
1094
        try:
1097
1095
            t.hardlink(source_name, link_name)
1098
1096
 
1099
 
            self.failUnless(t.has(source_name))
1100
 
            self.failUnless(t.has(link_name))
 
1097
            self.assertTrue(t.has(source_name))
 
1098
            self.assertTrue(t.has(link_name))
1101
1099
 
1102
1100
            st = t.stat(link_name)
1103
 
            self.failUnlessEqual(st[ST_NLINK], 2)
 
1101
            self.assertEqual(st[ST_NLINK], 2)
1104
1102
        except TransportNotPossible:
1105
1103
            raise TestSkipped("Transport %s does not support hardlinks." %
1106
1104
                              self._server.__class__)
1118
1116
        try:
1119
1117
            t.symlink(source_name, link_name)
1120
1118
 
1121
 
            self.failUnless(t.has(source_name))
1122
 
            self.failUnless(t.has(link_name))
 
1119
            self.assertTrue(t.has(source_name))
 
1120
            self.assertTrue(t.has(link_name))
1123
1121
 
1124
1122
            st = t.stat(link_name)
1125
 
            self.failUnless(S_ISLNK(st.st_mode))
 
1123
            self.assertTrue(S_ISLNK(st.st_mode),
 
1124
                "expected symlink, got mode %o" % st.st_mode)
1126
1125
        except TransportNotPossible:
1127
1126
            raise TestSkipped("Transport %s does not support symlinks." %
1128
1127
                              self._server.__class__)
1129
1128
        except IOError:
1130
 
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
 
1129
            self.knownFailure("Paramiko fails to create symlinks during tests")
1131
1130
 
1132
1131
    def test_list_dir(self):
1133
1132
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1138
1137
            return
1139
1138
 
1140
1139
        def sorted_list(d, transport):
1141
 
            l = list(transport.list_dir(d))
1142
 
            l.sort()
 
1140
            l = sorted(transport.list_dir(d))
1143
1141
            return l
1144
1142
 
1145
1143
        self.assertEqual([], sorted_list('.', t))
1197
1195
            raise TestSkipped("not a connected transport")
1198
1196
 
1199
1197
        t2 = t1.clone('subdir')
1200
 
        self.assertEquals(t1._scheme, t2._scheme)
1201
 
        self.assertEquals(t1._user, t2._user)
1202
 
        self.assertEquals(t1._password, t2._password)
1203
 
        self.assertEquals(t1._host, t2._host)
1204
 
        self.assertEquals(t1._port, t2._port)
 
1198
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1199
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
 
1200
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
 
1201
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
 
1202
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1205
1203
 
1206
1204
    def test__reuse_for(self):
1207
1205
        t = self.get_transport()
1214
1212
 
1215
1213
            Only the parameters different from None will be changed.
1216
1214
            """
1217
 
            if scheme   is None: scheme   = t._scheme
1218
 
            if user     is None: user     = t._user
1219
 
            if password is None: password = t._password
1220
 
            if user     is None: user     = t._user
1221
 
            if host     is None: host     = t._host
1222
 
            if port     is None: port     = t._port
1223
 
            if path     is None: path     = t._path
1224
 
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1215
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1216
            if user     is None: user     = t._parsed_url.user
 
1217
            if password is None: password = t._parsed_url.password
 
1218
            if user     is None: user     = t._parsed_url.user
 
1219
            if host     is None: host     = t._parsed_url.host
 
1220
            if port     is None: port     = t._parsed_url.port
 
1221
            if path     is None: path     = t._parsed_url.path
 
1222
            return str(urlutils.URL(scheme, user, password, host, port, path))
1225
1223
 
1226
 
        if t._scheme == 'ftp':
 
1224
        if t._parsed_url.scheme == 'ftp':
1227
1225
            scheme = 'sftp'
1228
1226
        else:
1229
1227
            scheme = 'ftp'
1230
1228
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1231
 
        if t._user == 'me':
 
1229
        if t._parsed_url.user == 'me':
1232
1230
            user = 'you'
1233
1231
        else:
1234
1232
            user = 'me'
1245
1243
        #   (they may be typed by the user when prompted for example)
1246
1244
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1247
1245
        # We will not connect, we can use an invalid host
1248
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1249
 
        if t._port == 1234:
 
1246
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1247
        if t._parsed_url.port == 1234:
1250
1248
            port = 4321
1251
1249
        else:
1252
1250
            port = 1234
1265
1263
        self.assertIs(t._get_connection(), c._get_connection())
1266
1264
 
1267
1265
        # Temporary failure, we need to create a new dummy connection
1268
 
        new_connection = object()
 
1266
        new_connection = None
1269
1267
        t._set_connection(new_connection)
1270
1268
        # Check that both transports use the same connection
1271
1269
        self.assertIs(new_connection, t._get_connection())
1293
1291
 
1294
1292
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1295
1293
 
1296
 
        self.failUnless(t1.has('a'))
1297
 
        self.failUnless(t1.has('b/c'))
1298
 
        self.failIf(t1.has('c'))
 
1294
        self.assertTrue(t1.has('a'))
 
1295
        self.assertTrue(t1.has('b/c'))
 
1296
        self.assertFalse(t1.has('c'))
1299
1297
 
1300
1298
        t2 = t1.clone('b')
1301
1299
        self.assertEqual(t1.base + 'b/', t2.base)
1302
1300
 
1303
 
        self.failUnless(t2.has('c'))
1304
 
        self.failIf(t2.has('a'))
 
1301
        self.assertTrue(t2.has('c'))
 
1302
        self.assertFalse(t2.has('a'))
1305
1303
 
1306
1304
        t3 = t2.clone('..')
1307
 
        self.failUnless(t3.has('a'))
1308
 
        self.failIf(t3.has('c'))
 
1305
        self.assertTrue(t3.has('a'))
 
1306
        self.assertFalse(t3.has('c'))
1309
1307
 
1310
 
        self.failIf(t1.has('b/d'))
1311
 
        self.failIf(t2.has('d'))
1312
 
        self.failIf(t3.has('b/d'))
 
1308
        self.assertFalse(t1.has('b/d'))
 
1309
        self.assertFalse(t2.has('d'))
 
1310
        self.assertFalse(t3.has('b/d'))
1313
1311
 
1314
1312
        if t1.is_readonly():
1315
1313
            self.build_tree_contents([('b/d', 'newfile\n')])
1316
1314
        else:
1317
1315
            t2.put_bytes('d', 'newfile\n')
1318
1316
 
1319
 
        self.failUnless(t1.has('b/d'))
1320
 
        self.failUnless(t2.has('d'))
1321
 
        self.failUnless(t3.has('b/d'))
 
1317
        self.assertTrue(t1.has('b/d'))
 
1318
        self.assertTrue(t2.has('d'))
 
1319
        self.assertTrue(t3.has('b/d'))
1322
1320
 
1323
1321
    def test_clone_to_root(self):
1324
1322
        orig_transport = self.get_transport()
1398
1396
        self.assertEqual(transport.clone("/").abspath('foo'),
1399
1397
                         transport.abspath("/foo"))
1400
1398
 
 
1399
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1401
1400
    def test_win32_abspath(self):
1402
1401
        # Note: we tried to set sys.platform='win32' so we could test on
1403
1402
        # other platforms too, but then osutils does platform specific
1408
1407
 
1409
1408
        # smoke test for abspath on win32.
1410
1409
        # a transport based on 'file:///' never fully qualifies the drive.
1411
 
        transport = get_transport("file:///")
1412
 
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1410
        transport = _mod_transport.get_transport_from_url("file:///")
 
1411
        self.assertEqual(transport.abspath("/"), "file:///")
1413
1412
 
1414
1413
        # but a transport that starts with a drive spec must keep it.
1415
 
        transport = get_transport("file:///C:/")
1416
 
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1414
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1415
        self.assertEqual(transport.abspath("/"), "file:///C:/")
1417
1416
 
1418
1417
    def test_local_abspath(self):
1419
1418
        transport = self.get_transport()
1420
1419
        try:
1421
1420
            p = transport.local_abspath('.')
1422
 
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1421
        except (errors.NotLocalUrl, TransportNotPossible) as e:
1423
1422
            # should be formattable
1424
1423
            s = str(e)
1425
1424
        else:
1458
1457
        paths = set(transport.iter_files_recursive())
1459
1458
        # nb the directories are not converted
1460
1459
        self.assertEqual(paths,
1461
 
                    set(['isolated/dir/foo',
 
1460
                    {'isolated/dir/foo',
1462
1461
                         'isolated/dir/bar',
1463
1462
                         'isolated/dir/b%2525z',
1464
 
                         'isolated/bar']))
 
1463
                         'isolated/bar'})
1465
1464
        sub_transport = transport.clone('isolated')
1466
1465
        paths = set(sub_transport.iter_files_recursive())
1467
1466
        self.assertEqual(paths,
1468
 
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1467
            {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
1469
1468
 
1470
1469
    def test_copy_tree(self):
1471
1470
        # TODO: test file contents and permissions are preserved. This test was
1488
1487
        transport.copy_tree('from', 'to')
1489
1488
        paths = set(transport.iter_files_recursive())
1490
1489
        self.assertEqual(paths,
1491
 
                    set(['from/dir/foo',
 
1490
                    {'from/dir/foo',
1492
1491
                         'from/dir/bar',
1493
1492
                         'from/dir/b%2525z',
1494
1493
                         'from/bar',
1495
1494
                         'to/dir/foo',
1496
1495
                         'to/dir/bar',
1497
1496
                         'to/dir/b%2525z',
1498
 
                         'to/bar',]))
 
1497
                         'to/bar',})
1499
1498
 
1500
1499
    def test_copy_tree_to_transport(self):
1501
1500
        transport = self.get_transport()
1518
1517
        from_transport.copy_tree_to_transport(to_transport)
1519
1518
        paths = set(transport.iter_files_recursive())
1520
1519
        self.assertEqual(paths,
1521
 
                    set(['from/dir/foo',
 
1520
                    {'from/dir/foo',
1522
1521
                         'from/dir/bar',
1523
1522
                         'from/dir/b%2525z',
1524
1523
                         'from/bar',
1525
1524
                         'to/dir/foo',
1526
1525
                         'to/dir/bar',
1527
1526
                         'to/dir/b%2525z',
1528
 
                         'to/bar',]))
 
1527
                         'to/bar',})
1529
1528
 
1530
1529
    def test_unicode_paths(self):
1531
1530
        """Test that we can read/write files with Unicode names."""
1545
1544
 
1546
1545
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1547
1546
        if no_unicode_support:
1548
 
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1547
            self.knownFailure("test server cannot handle unicode paths")
1549
1548
 
1550
1549
        try:
1551
1550
            self.build_tree(files, transport=t, line_endings='binary')
1554
1553
 
1555
1554
        # A plain unicode string is not a valid url
1556
1555
        for fname in files:
1557
 
            self.assertRaises(InvalidURL, t.get, fname)
 
1556
            self.assertRaises(urlutils.InvalidURL, t.get, fname)
1558
1557
 
1559
1558
        for fname in files:
1560
1559
            fname_utf8 = fname.encode('utf-8')
1616
1615
    def test_readv(self):
1617
1616
        transport = self.get_transport()
1618
1617
        if transport.is_readonly():
1619
 
            file('a', 'w').write('0123456789')
 
1618
            with file('a', 'w') as f: f.write('0123456789')
1620
1619
        else:
1621
1620
            transport.put_bytes('a', '0123456789')
1622
1621
 
1632
1631
    def test_readv_out_of_order(self):
1633
1632
        transport = self.get_transport()
1634
1633
        if transport.is_readonly():
1635
 
            file('a', 'w').write('0123456789')
 
1634
            with file('a', 'w') as f: f.write('0123456789')
1636
1635
        else:
1637
1636
            transport.put_bytes('a', '01234567890')
1638
1637
 
1710
1709
        transport = self.get_transport()
1711
1710
        # test from observed failure case.
1712
1711
        if transport.is_readonly():
1713
 
            file('a', 'w').write('a'*1024*1024)
 
1712
            with file('a', 'w') as f: f.write('a'*1024*1024)
1714
1713
        else:
1715
1714
            transport.put_bytes('a', 'a'*1024*1024)
1716
1715
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1750
1749
    def test_readv_short_read(self):
1751
1750
        transport = self.get_transport()
1752
1751
        if transport.is_readonly():
1753
 
            file('a', 'w').write('0123456789')
 
1752
            with file('a', 'w') as f: f.write('0123456789')
1754
1753
        else:
1755
1754
            transport.put_bytes('a', '01234567890')
1756
1755
 
1765
1764
        # also raise a special error
1766
1765
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1767
1766
                              transport.readv, 'a', [(12,2)])
 
1767
 
 
1768
    def test_no_segment_parameters(self):
 
1769
        """Segment parameters should be stripped and stored in
 
1770
        transport.segment_parameters."""
 
1771
        transport = self.get_transport("foo")
 
1772
        self.assertEqual({}, transport.get_segment_parameters())
 
1773
 
 
1774
    def test_segment_parameters(self):
 
1775
        """Segment parameters should be stripped and stored in
 
1776
        transport.get_segment_parameters()."""
 
1777
        base_url = self._server.get_url()
 
1778
        parameters = {"key1": "val1", "key2": "val2"}
 
1779
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1780
        transport = _mod_transport.get_transport_from_url(url)
 
1781
        self.assertEqual(parameters, transport.get_segment_parameters())
 
1782
 
 
1783
    def test_set_segment_parameters(self):
 
1784
        """Segment parameters can be set and show up in base."""
 
1785
        transport = self.get_transport("foo")
 
1786
        orig_base = transport.base
 
1787
        transport.set_segment_parameter("arm", "board")
 
1788
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
 
1789
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
 
1790
        transport.set_segment_parameter("arm", None)
 
1791
        transport.set_segment_parameter("nonexistant", None)
 
1792
        self.assertEqual({}, transport.get_segment_parameters())
 
1793
        self.assertEqual(orig_base, transport.base)
 
1794
 
 
1795
    def test_stat_symlink(self):
 
1796
        # if a transport points directly to a symlink (and supports symlinks
 
1797
        # at all) you can tell this.  helps with bug 32669.
 
1798
        t = self.get_transport()
 
1799
        try:
 
1800
            t.symlink('target', 'link')
 
1801
        except TransportNotPossible:
 
1802
            raise TestSkipped("symlinks not supported")
 
1803
        t2 = t.clone('link')
 
1804
        st = t2.stat('')
 
1805
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1806
 
 
1807
    def test_abspath_url_unquote_unreserved(self):
 
1808
        """URLs from abspath should have unreserved characters unquoted
 
1809
        
 
1810
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1811
        """
 
1812
        t = self.get_transport()
 
1813
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1814
        self.assertEqual(t.base + "-.09AZ_az~",
 
1815
            t.abspath(needlessly_escaped_dir))
 
1816
 
 
1817
    def test_clone_url_unquote_unreserved(self):
 
1818
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1819
        
 
1820
        Cloned transports should be prefix comparable for things like the
 
1821
        isolation checking of tests, see lp:842223 for more.
 
1822
        """
 
1823
        t1 = self.get_transport()
 
1824
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1825
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1826
        t2 = t1.clone(needlessly_escaped_dir)
 
1827
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1828
 
 
1829
    def test_hook_post_connection_one(self):
 
1830
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1831
        log = []
 
1832
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1833
        t = self.get_transport()
 
1834
        self.assertEqual([], log)
 
1835
        t.has("non-existant")
 
1836
        if isinstance(t, RemoteTransport):
 
1837
            self.assertEqual([t.get_smart_medium()], log)
 
1838
        elif isinstance(t, ConnectedTransport):
 
1839
            self.assertEqual([t], log)
 
1840
        else:
 
1841
            self.assertEqual([], log)
 
1842
 
 
1843
    def test_hook_post_connection_multi(self):
 
1844
        """Fire post_connect hook once per unshared underlying connection"""
 
1845
        log = []
 
1846
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1847
        t1 = self.get_transport()
 
1848
        t2 = t1.clone(".")
 
1849
        t3 = self.get_transport()
 
1850
        self.assertEqual([], log)
 
1851
        t1.has("x")
 
1852
        t2.has("x")
 
1853
        t3.has("x")
 
1854
        if isinstance(t1, RemoteTransport):
 
1855
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1856
        elif isinstance(t1, ConnectedTransport):
 
1857
            self.assertEqual([t1, t3], log)
 
1858
        else:
 
1859
            self.assertEqual([], log)