/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/per_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2018-08-14 01:15:02 UTC
  • mto: This revision was merged to the branch mainline in revision 7078.
  • Revision ID: jelmer@jelmer.uk-20180814011502-5zaydaq02vc2qxo1
Fix tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
23
 
import itertools
24
23
import os
25
 
from cStringIO import StringIO
26
 
from StringIO import StringIO as pyStringIO
27
24
import stat
28
25
import sys
29
 
import unittest
30
26
 
31
 
from bzrlib import (
 
27
from .. import (
32
28
    errors,
33
29
    osutils,
 
30
    pyutils,
34
31
    tests,
 
32
    transport as _mod_transport,
35
33
    urlutils,
36
34
    )
37
 
from bzrlib.errors import (ConnectionError,
38
 
                           DirectoryNotEmpty,
39
 
                           FileExists,
40
 
                           InvalidURL,
41
 
                           LockError,
42
 
                           NoSuchFile,
43
 
                           NotLocalUrl,
44
 
                           PathError,
45
 
                           TransportNotPossible,
46
 
                           )
47
 
from bzrlib.osutils import getcwd
48
 
from bzrlib.smart import medium
49
 
from bzrlib.tests import (
50
 
    TestCaseInTempDir,
 
35
from ..errors import (ConnectionError,
 
36
                      FileExists,
 
37
                      NoSuchFile,
 
38
                      PathError,
 
39
                      TransportNotPossible,
 
40
                      )
 
41
from ..osutils import getcwd
 
42
from ..sixish import (
 
43
    BytesIO,
 
44
    zip,
 
45
    )
 
46
from ..bzr.smart import medium
 
47
from . import (
51
48
    TestSkipped,
52
49
    TestNotApplicable,
53
50
    multiply_tests,
54
51
    )
55
 
from bzrlib.tests import test_server
56
 
from bzrlib.tests.test_transport import TestTransportImplementation
57
 
from bzrlib.transport import (
 
52
from . import test_server
 
53
from .test_transport import TestTransportImplementation
 
54
from ..transport import (
58
55
    ConnectedTransport,
59
 
    get_transport,
 
56
    Transport,
60
57
    _get_transport_modules,
61
58
    )
62
 
from bzrlib.transport.memory import MemoryTransport
 
59
from ..transport.memory import MemoryTransport
 
60
from ..transport.remote import RemoteTransport
63
61
 
64
62
 
65
63
def get_transport_test_permutations(module):
78
76
    for module in _get_transport_modules():
79
77
        try:
80
78
            permutations = get_transport_test_permutations(
81
 
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
79
                pyutils.get_named_object(module))
82
80
            for (klass, server_factory) in permutations:
83
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
82
                    {"transport_class":klass,
85
83
                     "transport_server":server_factory})
86
84
                result.append(scenario)
87
 
        except errors.DependencyNotPresent, e:
 
85
        except errors.DependencyNotPresent as e:
88
86
            # Continue even if a dependency prevents us
89
87
            # from adding this test
90
88
            pass
91
89
    return result
92
90
 
93
91
 
94
 
def load_tests(standard_tests, module, loader):
 
92
def load_tests(loader, standard_tests, pattern):
95
93
    """Multiply tests for tranport implementations."""
96
94
    result = loader.suiteClass()
97
95
    scenarios = transport_test_permutations()
102
100
 
103
101
    def setUp(self):
104
102
        super(TransportTests, self).setUp()
105
 
        self._captureVar('BZR_NO_SMART_VFS', None)
 
103
        self.overrideEnv('BRZ_NO_SMART_VFS', None)
106
104
 
107
105
    def check_transport_contents(self, content, transport, relpath):
108
 
        """Check that transport.get(relpath).read() == content."""
109
 
        self.assertEqualDiff(content, transport.get(relpath).read())
 
106
        """Check that transport.get_bytes(relpath) == content."""
 
107
        self.assertEqualDiff(content, transport.get_bytes(relpath))
110
108
 
111
109
    def test_ensure_base_missing(self):
112
110
        """.ensure_base() should create the directory if it doesn't exist"""
156
154
        self.assertEqual(True, t.has('a'))
157
155
        self.assertEqual(False, t.has('c'))
158
156
        self.assertEqual(True, t.has(urlutils.escape('%')))
159
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
160
 
                                           'e', 'f', 'g', 'h'])),
161
 
                         [True, True, False, False,
162
 
                          True, False, True, False])
163
157
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
164
158
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
165
159
                                           urlutils.escape('%%')]))
166
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
167
 
                                                'e', 'f', 'g', 'h']))),
168
 
                         [True, True, False, False,
169
 
                          True, False, True, False])
170
160
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
171
161
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
172
162
 
183
173
    def test_get(self):
184
174
        t = self.get_transport()
185
175
 
186
 
        files = ['a', 'b', 'e', 'g']
187
 
        contents = ['contents of a\n',
188
 
                    'contents of b\n',
189
 
                    'contents of e\n',
190
 
                    'contents of g\n',
191
 
                    ]
192
 
        self.build_tree(files, transport=t, line_endings='binary')
193
 
        self.check_transport_contents('contents of a\n', t, 'a')
194
 
        content_f = t.get_multi(files)
195
 
        # Use itertools.izip() instead of use zip() or map(), since they fully
196
 
        # evaluate their inputs, the transport requests should be issued and
197
 
        # handled sequentially (we don't want to force transport to buffer).
198
 
        for content, f in itertools.izip(contents, content_f):
199
 
            self.assertEqual(content, f.read())
200
 
 
201
 
        content_f = t.get_multi(iter(files))
202
 
        # Use itertools.izip() for the same reason
203
 
        for content, f in itertools.izip(contents, content_f):
204
 
            self.assertEqual(content, f.read())
 
176
        files = ['a']
 
177
        content = b'contents of a\n'
 
178
        self.build_tree(['a'], transport=t, line_endings='binary')
 
179
        self.check_transport_contents(b'contents of a\n', t, 'a')
 
180
        f = t.get('a')
 
181
        self.assertEqual(content, f.read())
205
182
 
206
183
    def test_get_unknown_file(self):
207
184
        t = self.get_transport()
208
185
        files = ['a', 'b']
209
 
        contents = ['contents of a\n',
210
 
                    'contents of b\n',
 
186
        contents = [b'contents of a\n',
 
187
                    b'contents of b\n',
211
188
                    ]
212
189
        self.build_tree(files, transport=t, line_endings='binary')
213
190
        self.assertRaises(NoSuchFile, t.get, 'c')
214
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
191
        def iterate_and_close(func, *args):
 
192
            for f in func(*args):
 
193
                # We call f.read() here because things like paramiko actually
 
194
                # spawn a thread to prefetch the content, which we want to
 
195
                # consume before we close the handle.
 
196
                content = f.read()
 
197
                f.close()
216
198
 
217
199
    def test_get_directory_read_gives_ReadError(self):
218
200
        """consistent errors for read() on a file returned by get()."""
238
220
        t = self.get_transport()
239
221
 
240
222
        files = ['a', 'b', 'e', 'g']
241
 
        contents = ['contents of a\n',
242
 
                    'contents of b\n',
243
 
                    'contents of e\n',
244
 
                    'contents of g\n',
 
223
        contents = [b'contents of a\n',
 
224
                    b'contents of b\n',
 
225
                    b'contents of e\n',
 
226
                    b'contents of g\n',
245
227
                    ]
246
228
        self.build_tree(files, transport=t, line_endings='binary')
247
 
        self.check_transport_contents('contents of a\n', t, 'a')
 
229
        self.check_transport_contents(b'contents of a\n', t, 'a')
248
230
 
249
231
        for content, fname in zip(contents, files):
250
232
            self.assertEqual(content, t.get_bytes(fname))
251
233
 
252
234
    def test_get_bytes_unknown_file(self):
253
235
        t = self.get_transport()
254
 
 
255
236
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
256
237
 
257
238
    def test_get_with_open_write_stream_sees_all_content(self):
258
239
        t = self.get_transport()
259
240
        if t.is_readonly():
260
241
            return
261
 
        handle = t.open_write_stream('foo')
262
 
        try:
263
 
            handle.write('b')
264
 
            self.assertEqual('b', t.get('foo').read())
265
 
        finally:
266
 
            handle.close()
 
242
        with t.open_write_stream('foo') as handle:
 
243
            handle.write(b'b')
 
244
            self.assertEqual(b'b', t.get_bytes('foo'))
267
245
 
268
246
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
269
247
        t = self.get_transport()
270
248
        if t.is_readonly():
271
249
            return
272
 
        handle = t.open_write_stream('foo')
273
 
        try:
274
 
            handle.write('b')
275
 
            self.assertEqual('b', t.get_bytes('foo'))
276
 
            self.assertEqual('b', t.get('foo').read())
277
 
        finally:
278
 
            handle.close()
 
250
        with t.open_write_stream('foo') as handle:
 
251
            handle.write(b'b')
 
252
            self.assertEqual(b'b', t.get_bytes('foo'))
 
253
            with t.get('foo') as f:
 
254
                self.assertEqual(b'b', f.read())
279
255
 
280
256
    def test_put_bytes(self):
281
257
        t = self.get_transport()
282
258
 
283
259
        if t.is_readonly():
284
260
            self.assertRaises(TransportNotPossible,
285
 
                    t.put_bytes, 'a', 'some text for a\n')
 
261
                    t.put_bytes, 'a', b'some text for a\n')
286
262
            return
287
263
 
288
 
        t.put_bytes('a', 'some text for a\n')
289
 
        self.failUnless(t.has('a'))
290
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
264
        t.put_bytes('a', b'some text for a\n')
 
265
        self.assertTrue(t.has('a'))
 
266
        self.check_transport_contents(b'some text for a\n', t, 'a')
291
267
 
292
268
        # The contents should be overwritten
293
 
        t.put_bytes('a', 'new text for a\n')
294
 
        self.check_transport_contents('new text for a\n', t, 'a')
 
269
        t.put_bytes('a', b'new text for a\n')
 
270
        self.check_transport_contents(b'new text for a\n', t, 'a')
295
271
 
296
272
        self.assertRaises(NoSuchFile,
297
 
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
 
273
                          t.put_bytes, 'path/doesnt/exist/c', b'contents')
298
274
 
299
275
    def test_put_bytes_non_atomic(self):
300
276
        t = self.get_transport()
301
277
 
302
278
        if t.is_readonly():
303
279
            self.assertRaises(TransportNotPossible,
304
 
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
 
280
                    t.put_bytes_non_atomic, 'a', b'some text for a\n')
305
281
            return
306
282
 
307
 
        self.failIf(t.has('a'))
308
 
        t.put_bytes_non_atomic('a', 'some text for a\n')
309
 
        self.failUnless(t.has('a'))
310
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
283
        self.assertFalse(t.has('a'))
 
284
        t.put_bytes_non_atomic('a', b'some text for a\n')
 
285
        self.assertTrue(t.has('a'))
 
286
        self.check_transport_contents(b'some text for a\n', t, 'a')
311
287
        # Put also replaces contents
312
 
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
313
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
288
        t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
 
289
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
314
290
 
315
291
        # Make sure we can create another file
316
 
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
 
292
        t.put_bytes_non_atomic('d', b'contents for\nd\n')
317
293
        # And overwrite 'a' with empty contents
318
 
        t.put_bytes_non_atomic('a', '')
319
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
320
 
        self.check_transport_contents('', t, 'a')
 
294
        t.put_bytes_non_atomic('a', b'')
 
295
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
296
        self.check_transport_contents(b'', t, 'a')
321
297
 
322
298
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
323
 
                                       'contents\n')
 
299
                                       b'contents\n')
324
300
        # Now test the create_parent flag
325
301
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
326
 
                                       'contents\n')
327
 
        self.failIf(t.has('dir/a'))
328
 
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
 
302
                                       b'contents\n')
 
303
        self.assertFalse(t.has('dir/a'))
 
304
        t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
329
305
                               create_parent_dir=True)
330
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
306
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
331
307
 
332
308
        # But we still get NoSuchFile if we can't make the parent dir
333
309
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
334
 
                                       'contents\n',
 
310
                                       b'contents\n',
335
311
                                       create_parent_dir=True)
336
312
 
337
313
    def test_put_bytes_permissions(self):
342
318
        if not t._can_roundtrip_unix_modebits():
343
319
            # Can't roundtrip, so no need to run this test
344
320
            return
345
 
        t.put_bytes('mode644', 'test text\n', mode=0644)
346
 
        self.assertTransportMode(t, 'mode644', 0644)
347
 
        t.put_bytes('mode666', 'test text\n', mode=0666)
348
 
        self.assertTransportMode(t, 'mode666', 0666)
349
 
        t.put_bytes('mode600', 'test text\n', mode=0600)
350
 
        self.assertTransportMode(t, 'mode600', 0600)
 
321
        t.put_bytes('mode644', b'test text\n', mode=0o644)
 
322
        self.assertTransportMode(t, 'mode644', 0o644)
 
323
        t.put_bytes('mode666', b'test text\n', mode=0o666)
 
324
        self.assertTransportMode(t, 'mode666', 0o666)
 
325
        t.put_bytes('mode600', b'test text\n', mode=0o600)
 
326
        self.assertTransportMode(t, 'mode600', 0o600)
351
327
        # Yes, you can put_bytes a file such that it becomes readonly
352
 
        t.put_bytes('mode400', 'test text\n', mode=0400)
353
 
        self.assertTransportMode(t, 'mode400', 0400)
 
328
        t.put_bytes('mode400', b'test text\n', mode=0o400)
 
329
        self.assertTransportMode(t, 'mode400', 0o400)
354
330
 
355
331
        # The default permissions should be based on the current umask
356
332
        umask = osutils.get_umask()
357
 
        t.put_bytes('nomode', 'test text\n', mode=None)
358
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
333
        t.put_bytes('nomode', b'test text\n', mode=None)
 
334
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
359
335
 
360
336
    def test_put_bytes_non_atomic_permissions(self):
361
337
        t = self.get_transport()
365
341
        if not t._can_roundtrip_unix_modebits():
366
342
            # Can't roundtrip, so no need to run this test
367
343
            return
368
 
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
369
 
        self.assertTransportMode(t, 'mode644', 0644)
370
 
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
371
 
        self.assertTransportMode(t, 'mode666', 0666)
372
 
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
373
 
        self.assertTransportMode(t, 'mode600', 0600)
374
 
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
375
 
        self.assertTransportMode(t, 'mode400', 0400)
 
344
        t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
 
345
        self.assertTransportMode(t, 'mode644', 0o644)
 
346
        t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
 
347
        self.assertTransportMode(t, 'mode666', 0o666)
 
348
        t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
 
349
        self.assertTransportMode(t, 'mode600', 0o600)
 
350
        t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
 
351
        self.assertTransportMode(t, 'mode400', 0o400)
376
352
 
377
353
        # The default permissions should be based on the current umask
378
354
        umask = osutils.get_umask()
379
 
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
380
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
355
        t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
 
356
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
381
357
 
382
358
        # We should also be able to set the mode for a parent directory
383
359
        # when it is created
384
 
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
385
 
                               dir_mode=0700, create_parent_dir=True)
386
 
        self.assertTransportMode(t, 'dir700', 0700)
387
 
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
388
 
                               dir_mode=0770, create_parent_dir=True)
389
 
        self.assertTransportMode(t, 'dir770', 0770)
390
 
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
391
 
                               dir_mode=0777, create_parent_dir=True)
392
 
        self.assertTransportMode(t, 'dir777', 0777)
 
360
        t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
 
361
                               dir_mode=0o700, create_parent_dir=True)
 
362
        self.assertTransportMode(t, 'dir700', 0o700)
 
363
        t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
 
364
                               dir_mode=0o770, create_parent_dir=True)
 
365
        self.assertTransportMode(t, 'dir770', 0o770)
 
366
        t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
 
367
                               dir_mode=0o777, create_parent_dir=True)
 
368
        self.assertTransportMode(t, 'dir777', 0o777)
393
369
 
394
370
    def test_put_file(self):
395
371
        t = self.get_transport()
396
372
 
397
373
        if t.is_readonly():
398
374
            self.assertRaises(TransportNotPossible,
399
 
                    t.put_file, 'a', StringIO('some text for a\n'))
 
375
                    t.put_file, 'a', BytesIO(b'some text for a\n'))
400
376
            return
401
377
 
402
 
        result = t.put_file('a', StringIO('some text for a\n'))
 
378
        result = t.put_file('a', BytesIO(b'some text for a\n'))
403
379
        # put_file returns the length of the data written
404
380
        self.assertEqual(16, result)
405
 
        self.failUnless(t.has('a'))
406
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
381
        self.assertTrue(t.has('a'))
 
382
        self.check_transport_contents(b'some text for a\n', t, 'a')
407
383
        # Put also replaces contents
408
 
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
384
        result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
409
385
        self.assertEqual(19, result)
410
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
386
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
411
387
        self.assertRaises(NoSuchFile,
412
388
                          t.put_file, 'path/doesnt/exist/c',
413
 
                              StringIO('contents'))
 
389
                              BytesIO(b'contents'))
414
390
 
415
391
    def test_put_file_non_atomic(self):
416
392
        t = self.get_transport()
417
393
 
418
394
        if t.is_readonly():
419
395
            self.assertRaises(TransportNotPossible,
420
 
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
396
                    t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
421
397
            return
422
398
 
423
 
        self.failIf(t.has('a'))
424
 
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
 
        self.failUnless(t.has('a'))
426
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
399
        self.assertFalse(t.has('a'))
 
400
        t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
 
401
        self.assertTrue(t.has('a'))
 
402
        self.check_transport_contents(b'some text for a\n', t, 'a')
427
403
        # Put also replaces contents
428
 
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
429
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
404
        t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
 
405
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
430
406
 
431
407
        # Make sure we can create another file
432
 
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
408
        t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
433
409
        # And overwrite 'a' with empty contents
434
 
        t.put_file_non_atomic('a', StringIO(''))
435
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
436
 
        self.check_transport_contents('', t, 'a')
 
410
        t.put_file_non_atomic('a', BytesIO(b''))
 
411
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
412
        self.check_transport_contents(b'', t, 'a')
437
413
 
438
414
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
439
 
                                       StringIO('contents\n'))
 
415
                                       BytesIO(b'contents\n'))
440
416
        # Now test the create_parent flag
441
417
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
 
                                       StringIO('contents\n'))
443
 
        self.failIf(t.has('dir/a'))
444
 
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
418
                                       BytesIO(b'contents\n'))
 
419
        self.assertFalse(t.has('dir/a'))
 
420
        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
445
421
                              create_parent_dir=True)
446
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
422
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
447
423
 
448
424
        # But we still get NoSuchFile if we can't make the parent dir
449
425
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
450
 
                                       StringIO('contents\n'),
 
426
                                       BytesIO(b'contents\n'),
451
427
                                       create_parent_dir=True)
452
428
 
453
429
    def test_put_file_permissions(self):
459
435
        if not t._can_roundtrip_unix_modebits():
460
436
            # Can't roundtrip, so no need to run this test
461
437
            return
462
 
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
463
 
        self.assertTransportMode(t, 'mode644', 0644)
464
 
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
465
 
        self.assertTransportMode(t, 'mode666', 0666)
466
 
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
467
 
        self.assertTransportMode(t, 'mode600', 0600)
 
438
        t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
 
439
        self.assertTransportMode(t, 'mode644', 0o644)
 
440
        t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
 
441
        self.assertTransportMode(t, 'mode666', 0o666)
 
442
        t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
 
443
        self.assertTransportMode(t, 'mode600', 0o600)
468
444
        # Yes, you can put a file such that it becomes readonly
469
 
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
470
 
        self.assertTransportMode(t, 'mode400', 0400)
 
445
        t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
 
446
        self.assertTransportMode(t, 'mode400', 0o400)
471
447
        # The default permissions should be based on the current umask
472
448
        umask = osutils.get_umask()
473
 
        t.put_file('nomode', StringIO('test text\n'), mode=None)
474
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
449
        t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
 
450
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
475
451
 
476
452
    def test_put_file_non_atomic_permissions(self):
477
453
        t = self.get_transport()
481
457
        if not t._can_roundtrip_unix_modebits():
482
458
            # Can't roundtrip, so no need to run this test
483
459
            return
484
 
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
485
 
        self.assertTransportMode(t, 'mode644', 0644)
486
 
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
487
 
        self.assertTransportMode(t, 'mode666', 0666)
488
 
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
489
 
        self.assertTransportMode(t, 'mode600', 0600)
 
460
        t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
 
461
        self.assertTransportMode(t, 'mode644', 0o644)
 
462
        t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
 
463
        self.assertTransportMode(t, 'mode666', 0o666)
 
464
        t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
 
465
        self.assertTransportMode(t, 'mode600', 0o600)
490
466
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
491
 
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
492
 
        self.assertTransportMode(t, 'mode400', 0400)
 
467
        t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
 
468
        self.assertTransportMode(t, 'mode400', 0o400)
493
469
 
494
470
        # The default permissions should be based on the current umask
495
471
        umask = osutils.get_umask()
496
 
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
497
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
472
        t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
 
473
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
498
474
 
499
475
        # We should also be able to set the mode for a parent directory
500
476
        # when it is created
501
 
        sio = StringIO()
502
 
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
503
 
                              dir_mode=0700, create_parent_dir=True)
504
 
        self.assertTransportMode(t, 'dir700', 0700)
505
 
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
506
 
                              dir_mode=0770, create_parent_dir=True)
507
 
        self.assertTransportMode(t, 'dir770', 0770)
508
 
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
509
 
                              dir_mode=0777, create_parent_dir=True)
510
 
        self.assertTransportMode(t, 'dir777', 0777)
 
477
        sio = BytesIO()
 
478
        t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
 
479
                              dir_mode=0o700, create_parent_dir=True)
 
480
        self.assertTransportMode(t, 'dir700', 0o700)
 
481
        t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
 
482
                              dir_mode=0o770, create_parent_dir=True)
 
483
        self.assertTransportMode(t, 'dir770', 0o770)
 
484
        t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
 
485
                              dir_mode=0o777, create_parent_dir=True)
 
486
        self.assertTransportMode(t, 'dir777', 0o777)
511
487
 
512
488
    def test_put_bytes_unicode(self):
513
 
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
 
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
515
 
        # (we don't want to encode unicode here at all, callers should be
516
 
        # strictly passing bytes to put_bytes), but we allow it for backwards
517
 
        # compatibility.  At some point we should use a specific exception.
518
 
        # See https://bugs.launchpad.net/bzr/+bug/106898.
519
489
        t = self.get_transport()
520
490
        if t.is_readonly():
521
491
            return
522
492
        unicode_string = u'\u1234'
523
 
        self.assertRaises(
524
 
            (AssertionError, UnicodeEncodeError),
525
 
            t.put_bytes, 'foo', unicode_string)
526
 
 
527
 
    def test_put_file_unicode(self):
528
 
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
 
        # This situation can happen (and has) if code is careless about the type
530
 
        # of "string" they initialise/write to a StringIO with.  We cannot use
531
 
        # cStringIO, because it never returns unicode from read.
532
 
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
 
        # raise, but we raise it for hysterical raisins.
534
 
        t = self.get_transport()
535
 
        if t.is_readonly():
536
 
            return
537
 
        unicode_file = pyStringIO(u'\u1234')
538
 
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
493
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
539
494
 
540
495
    def test_mkdir(self):
541
496
        t = self.get_transport()
546
501
            # defined for the transport interface.
547
502
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
548
503
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
549
 
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
550
504
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
551
505
            return
552
506
        # Test mkdir
557
511
        t.mkdir('dir_b')
558
512
        self.assertEqual(t.has('dir_b'), True)
559
513
 
560
 
        t.mkdir_multi(['dir_c', 'dir_d'])
561
 
 
562
 
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
563
 
        self.assertEqual(list(t.has_multi(
564
 
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
565
 
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
566
 
            [True, True, True, False,
567
 
             True, True, True, True])
 
514
        self.assertEqual([t.has(n) for n in
 
515
            ['dir_a', 'dir_b', 'dir_q', 'dir_b']],
 
516
            [True, True, False, True])
568
517
 
569
518
        # we were testing that a local mkdir followed by a transport
570
519
        # mkdir failed thusly, but given that we * in one process * do not
575
524
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
576
525
 
577
526
        # Test get/put in sub-directories
578
 
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
579
 
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
580
 
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
581
 
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
 
527
        t.put_bytes('dir_a/a', b'contents of dir_a/a')
 
528
        t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
 
529
        self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a')
 
530
        self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b')
582
531
 
583
532
        # mkdir of a dir with an absent parent
584
533
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
591
540
            # no sense testing on this transport
592
541
            return
593
542
        # Test mkdir with a mode
594
 
        t.mkdir('dmode755', mode=0755)
595
 
        self.assertTransportMode(t, 'dmode755', 0755)
596
 
        t.mkdir('dmode555', mode=0555)
597
 
        self.assertTransportMode(t, 'dmode555', 0555)
598
 
        t.mkdir('dmode777', mode=0777)
599
 
        self.assertTransportMode(t, 'dmode777', 0777)
600
 
        t.mkdir('dmode700', mode=0700)
601
 
        self.assertTransportMode(t, 'dmode700', 0700)
602
 
        t.mkdir_multi(['mdmode755'], mode=0755)
603
 
        self.assertTransportMode(t, 'mdmode755', 0755)
 
543
        t.mkdir('dmode755', mode=0o755)
 
544
        self.assertTransportMode(t, 'dmode755', 0o755)
 
545
        t.mkdir('dmode555', mode=0o555)
 
546
        self.assertTransportMode(t, 'dmode555', 0o555)
 
547
        t.mkdir('dmode777', mode=0o777)
 
548
        self.assertTransportMode(t, 'dmode777', 0o777)
 
549
        t.mkdir('dmode700', mode=0o700)
 
550
        self.assertTransportMode(t, 'dmode700', 0o700)
 
551
        t.mkdir('mdmode755', mode=0o755)
 
552
        self.assertTransportMode(t, 'mdmode755', 0o755)
604
553
 
605
554
        # Default mode should be based on umask
606
555
        umask = osutils.get_umask()
607
556
        t.mkdir('dnomode', mode=None)
608
 
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
557
        self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
609
558
 
610
559
    def test_opening_a_file_stream_creates_file(self):
611
560
        t = self.get_transport()
613
562
            return
614
563
        handle = t.open_write_stream('foo')
615
564
        try:
616
 
            self.assertEqual('', t.get_bytes('foo'))
 
565
            self.assertEqual(b'', t.get_bytes('foo'))
617
566
        finally:
618
567
            handle.close()
619
568
 
620
569
    def test_opening_a_file_stream_can_set_mode(self):
621
570
        t = self.get_transport()
622
571
        if t.is_readonly():
 
572
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
573
                              t.open_write_stream, 'foo')
623
574
            return
624
575
        if not t._can_roundtrip_unix_modebits():
625
576
            # Can't roundtrip, so no need to run this test
626
577
            return
 
578
 
627
579
        def check_mode(name, mode, expected):
628
580
            handle = t.open_write_stream(name, mode=mode)
629
581
            handle.close()
630
582
            self.assertTransportMode(t, name, expected)
631
 
        check_mode('mode644', 0644, 0644)
632
 
        check_mode('mode666', 0666, 0666)
633
 
        check_mode('mode600', 0600, 0600)
 
583
        check_mode('mode644', 0o644, 0o644)
 
584
        check_mode('mode666', 0o666, 0o666)
 
585
        check_mode('mode600', 0o600, 0o600)
634
586
        # The default permissions should be based on the current umask
635
 
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
587
        check_mode('nomode', None, 0o666 & ~osutils.get_umask())
636
588
 
637
589
    def test_copy_to(self):
638
590
        # FIXME: test:   same server to same server (partly done)
645
597
            self.build_tree(files, transport=transport_from)
646
598
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
647
599
            for f in files:
648
 
                self.check_transport_contents(transport_to.get(f).read(),
 
600
                self.check_transport_contents(transport_to.get_bytes(f),
649
601
                                              transport_from, f)
650
602
 
651
603
        t = self.get_transport()
 
604
        if t.__class__.__name__ == "SFTPTransport":
 
605
            self.skipTest("SFTP copy_to currently too flakey to use")
652
606
        temp_transport = MemoryTransport('memory:///')
653
607
        simple_copy_files(t, temp_transport)
654
608
        if not t.is_readonly():
663
617
            self.build_tree(['e/', 'e/f'])
664
618
        else:
665
619
            t.mkdir('e')
666
 
            t.put_bytes('e/f', 'contents of e')
 
620
            t.put_bytes('e/f', b'contents of e')
667
621
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
668
622
        temp_transport.mkdir('e')
669
623
        t.copy_to(['e/f'], temp_transport)
674
628
        files = ['a', 'b', 'c', 'd']
675
629
        t.copy_to(iter(files), temp_transport)
676
630
        for f in files:
677
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
631
            self.check_transport_contents(temp_transport.get_bytes(f),
678
632
                                          t, f)
679
633
        del temp_transport
680
634
 
681
 
        for mode in (0666, 0644, 0600, 0400):
 
635
        for mode in (0o666, 0o644, 0o600, 0o400):
682
636
            temp_transport = MemoryTransport("memory:///")
683
637
            t.copy_to(files, temp_transport, mode=mode)
684
638
            for f in files:
701
655
            self.assertRaises(TransportNotPossible,
702
656
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
703
657
            return
704
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
705
 
        t.put_bytes('b', 'contents\nfor b\n')
 
658
        t.put_bytes('a', b'diff\ncontents for\na\n')
 
659
        t.put_bytes('b', b'contents\nfor b\n')
706
660
 
707
661
        self.assertEqual(20,
708
 
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
662
            t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
709
663
 
710
664
        self.check_transport_contents(
711
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
665
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
712
666
            t, 'a')
713
667
 
714
668
        # a file with no parent should fail..
715
669
        self.assertRaises(NoSuchFile,
716
 
                          t.append_file, 'missing/path', StringIO('content'))
 
670
                          t.append_file, 'missing/path', BytesIO(b'content'))
717
671
 
718
672
        # And we can create new files, too
719
673
        self.assertEqual(0,
720
 
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
721
 
        self.check_transport_contents('some text\nfor a missing file\n',
 
674
            t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
 
675
        self.check_transport_contents(b'some text\nfor a missing file\n',
722
676
                                      t, 'c')
723
677
 
724
678
    def test_append_bytes(self):
726
680
 
727
681
        if t.is_readonly():
728
682
            self.assertRaises(TransportNotPossible,
729
 
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
 
683
                    t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
730
684
            return
731
685
 
732
 
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
733
 
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
 
686
        self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
 
687
        self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
734
688
 
735
689
        self.assertEqual(20,
736
 
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
 
690
            t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
737
691
 
738
692
        self.check_transport_contents(
739
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
693
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
740
694
            t, 'a')
741
695
 
742
696
        # a file with no parent should fail..
743
697
        self.assertRaises(NoSuchFile,
744
 
                          t.append_bytes, 'missing/path', 'content')
745
 
 
746
 
    def test_append_multi(self):
747
 
        t = self.get_transport()
748
 
 
749
 
        if t.is_readonly():
750
 
            return
751
 
        t.put_bytes('a', 'diff\ncontents for\na\n'
752
 
                         'add\nsome\nmore\ncontents\n')
753
 
        t.put_bytes('b', 'contents\nfor b\n')
754
 
 
755
 
        self.assertEqual((43, 15),
756
 
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
757
 
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
758
 
 
759
 
        self.check_transport_contents(
760
 
            'diff\ncontents for\na\n'
761
 
            'add\nsome\nmore\ncontents\n'
762
 
            'and\nthen\nsome\nmore\n',
763
 
            t, 'a')
764
 
        self.check_transport_contents(
765
 
                'contents\nfor b\n'
766
 
                'some\nmore\nfor\nb\n',
767
 
                t, 'b')
768
 
 
769
 
        self.assertEqual((62, 31),
770
 
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
771
 
                                 ('b', StringIO('from an iterator\n'))])))
772
 
        self.check_transport_contents(
773
 
            'diff\ncontents for\na\n'
774
 
            'add\nsome\nmore\ncontents\n'
775
 
            'and\nthen\nsome\nmore\n'
776
 
            'a little bit more\n',
777
 
            t, 'a')
778
 
        self.check_transport_contents(
779
 
                'contents\nfor b\n'
780
 
                'some\nmore\nfor\nb\n'
781
 
                'from an iterator\n',
782
 
                t, 'b')
783
 
 
784
 
        self.assertEqual((80, 0),
785
 
            t.append_multi([('a', StringIO('some text in a\n')),
786
 
                            ('d', StringIO('missing file r\n'))]))
787
 
 
788
 
        self.check_transport_contents(
789
 
            'diff\ncontents for\na\n'
790
 
            'add\nsome\nmore\ncontents\n'
791
 
            'and\nthen\nsome\nmore\n'
792
 
            'a little bit more\n'
793
 
            'some text in a\n',
794
 
            t, 'a')
795
 
        self.check_transport_contents('missing file r\n', t, 'd')
 
698
                          t.append_bytes, 'missing/path', b'content')
796
699
 
797
700
    def test_append_file_mode(self):
798
701
        """Check that append accepts a mode parameter"""
800
703
        t = self.get_transport()
801
704
        if t.is_readonly():
802
705
            self.assertRaises(TransportNotPossible,
803
 
                t.append_file, 'f', StringIO('f'), mode=None)
 
706
                t.append_file, 'f', BytesIO(b'f'), mode=None)
804
707
            return
805
 
        t.append_file('f', StringIO('f'), mode=None)
 
708
        t.append_file('f', BytesIO(b'f'), mode=None)
806
709
 
807
710
    def test_append_bytes_mode(self):
808
711
        # check append_bytes accepts a mode
809
712
        t = self.get_transport()
810
713
        if t.is_readonly():
811
714
            self.assertRaises(TransportNotPossible,
812
 
                t.append_bytes, 'f', 'f', mode=None)
 
715
                t.append_bytes, 'f', b'f', mode=None)
813
716
            return
814
 
        t.append_bytes('f', 'f', mode=None)
 
717
        t.append_bytes('f', b'f', mode=None)
815
718
 
816
719
    def test_delete(self):
817
720
        # TODO: Test Transport.delete
822
725
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
823
726
            return
824
727
 
825
 
        t.put_bytes('a', 'a little bit of text\n')
826
 
        self.failUnless(t.has('a'))
 
728
        t.put_bytes('a', b'a little bit of text\n')
 
729
        self.assertTrue(t.has('a'))
827
730
        t.delete('a')
828
 
        self.failIf(t.has('a'))
 
731
        self.assertFalse(t.has('a'))
829
732
 
830
733
        self.assertRaises(NoSuchFile, t.delete, 'a')
831
734
 
832
 
        t.put_bytes('a', 'a text\n')
833
 
        t.put_bytes('b', 'b text\n')
834
 
        t.put_bytes('c', 'c text\n')
 
735
        t.put_bytes('a', b'a text\n')
 
736
        t.put_bytes('b', b'b text\n')
 
737
        t.put_bytes('c', b'c text\n')
835
738
        self.assertEqual([True, True, True],
836
 
                list(t.has_multi(['a', 'b', 'c'])))
837
 
        t.delete_multi(['a', 'c'])
 
739
                [t.has(n) for n in ['a', 'b', 'c']])
 
740
        t.delete('a')
 
741
        t.delete('c')
838
742
        self.assertEqual([False, True, False],
839
 
                list(t.has_multi(['a', 'b', 'c'])))
840
 
        self.failIf(t.has('a'))
841
 
        self.failUnless(t.has('b'))
842
 
        self.failIf(t.has('c'))
843
 
 
844
 
        self.assertRaises(NoSuchFile,
845
 
                t.delete_multi, ['a', 'b', 'c'])
846
 
 
847
 
        self.assertRaises(NoSuchFile,
848
 
                t.delete_multi, iter(['a', 'b', 'c']))
849
 
 
850
 
        t.put_bytes('a', 'another a text\n')
851
 
        t.put_bytes('c', 'another c text\n')
852
 
        t.delete_multi(iter(['a', 'b', 'c']))
 
743
                [t.has(n) for n in ['a', 'b', 'c']])
 
744
        self.assertFalse(t.has('a'))
 
745
        self.assertTrue(t.has('b'))
 
746
        self.assertFalse(t.has('c'))
 
747
 
 
748
        for name in ['a', 'c', 'd']:
 
749
            self.assertRaises(NoSuchFile, t.delete, name)
853
750
 
854
751
        # We should have deleted everything
855
752
        # SftpServer creates control files in the
902
799
        if t.is_readonly():
903
800
            return
904
801
        t.mkdir('foo')
905
 
        t.put_bytes('foo-bar', '')
 
802
        t.put_bytes('foo-bar', b'')
906
803
        t.mkdir('foo-baz')
907
804
        t.rmdir('foo')
908
805
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
909
 
        self.failUnless(t.has('foo-bar'))
 
806
        self.assertTrue(t.has('foo-bar'))
910
807
 
911
808
    def test_rename_dir_succeeds(self):
912
809
        t = self.get_transport()
913
810
        if t.is_readonly():
914
 
            raise TestSkipped("transport is readonly")
 
811
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
812
                              t.rename, 'foo', 'bar')
 
813
            return
915
814
        t.mkdir('adir')
916
815
        t.mkdir('adir/asubdir')
917
816
        t.rename('adir', 'bdir')
922
821
        """Attempting to replace a nonemtpy directory should fail"""
923
822
        t = self.get_transport()
924
823
        if t.is_readonly():
925
 
            raise TestSkipped("transport is readonly")
 
824
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
825
                              t.rename, 'foo', 'bar')
 
826
            return
926
827
        t.mkdir('adir')
927
828
        t.mkdir('adir/asubdir')
928
829
        t.mkdir('bdir')
943
844
        t.mkdir('b')
944
845
        ta = t.clone('a')
945
846
        tb = t.clone('b')
946
 
        ta.put_bytes('f', 'aoeu')
 
847
        ta.put_bytes('f', b'aoeu')
947
848
        ta.rename('f', '../b/f')
948
849
        self.assertTrue(tb.has('f'))
949
850
        self.assertFalse(ta.has('f'))
991
892
        # creates control files in the working directory
992
893
        # perhaps all of this could be done in a subdirectory
993
894
 
994
 
        t.put_bytes('a', 'a first file\n')
995
 
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
895
        t.put_bytes('a', b'a first file\n')
 
896
        self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
996
897
 
997
898
        t.move('a', 'b')
998
 
        self.failUnless(t.has('b'))
999
 
        self.failIf(t.has('a'))
 
899
        self.assertTrue(t.has('b'))
 
900
        self.assertFalse(t.has('a'))
1000
901
 
1001
 
        self.check_transport_contents('a first file\n', t, 'b')
1002
 
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
902
        self.check_transport_contents(b'a first file\n', t, 'b')
 
903
        self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
1003
904
 
1004
905
        # Overwrite a file
1005
 
        t.put_bytes('c', 'c this file\n')
 
906
        t.put_bytes('c', b'c this file\n')
1006
907
        t.move('c', 'b')
1007
 
        self.failIf(t.has('c'))
1008
 
        self.check_transport_contents('c this file\n', t, 'b')
 
908
        self.assertFalse(t.has('c'))
 
909
        self.check_transport_contents(b'c this file\n', t, 'b')
1009
910
 
1010
911
        # TODO: Try to write a test for atomicity
1011
912
        # TODO: Test moving into a non-existent subdirectory
1012
 
        # TODO: Test Transport.move_multi
1013
913
 
1014
914
    def test_copy(self):
1015
915
        t = self.get_transport()
1017
917
        if t.is_readonly():
1018
918
            return
1019
919
 
1020
 
        t.put_bytes('a', 'a file\n')
 
920
        t.put_bytes('a', b'a file\n')
1021
921
        t.copy('a', 'b')
1022
 
        self.check_transport_contents('a file\n', t, 'b')
 
922
        self.check_transport_contents(b'a file\n', t, 'b')
1023
923
 
1024
924
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1025
925
        os.mkdir('c')
1026
926
        # What should the assert be if you try to copy a
1027
927
        # file over a directory?
1028
928
        #self.assertRaises(Something, t.copy, 'a', 'c')
1029
 
        t.put_bytes('d', 'text in d\n')
 
929
        t.put_bytes('d', b'text in d\n')
1030
930
        t.copy('d', 'b')
1031
 
        self.check_transport_contents('text in d\n', t, 'b')
1032
 
 
1033
 
        # TODO: test copy_multi
 
931
        self.check_transport_contents(b'text in d\n', t, 'b')
1034
932
 
1035
933
    def test_connection_error(self):
1036
934
        """ConnectionError is raised when connection is impossible.
1042
940
        except NotImplementedError:
1043
941
            raise TestSkipped("Transport %s has no bogus URL support." %
1044
942
                              self._server.__class__)
1045
 
        t = get_transport(url)
 
943
        t = _mod_transport.get_transport_from_url(url)
1046
944
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1047
945
 
1048
946
    def test_stat(self):
1053
951
 
1054
952
        try:
1055
953
            st = t.stat('.')
1056
 
        except TransportNotPossible, e:
 
954
        except TransportNotPossible as e:
1057
955
            # This transport cannot stat
1058
956
            return
1059
957
 
1064
962
        for path, size in zip(paths, sizes):
1065
963
            st = t.stat(path)
1066
964
            if path.endswith('/'):
1067
 
                self.failUnless(S_ISDIR(st.st_mode))
 
965
                self.assertTrue(S_ISDIR(st.st_mode))
1068
966
                # directory sizes are meaningless
1069
967
            else:
1070
 
                self.failUnless(S_ISREG(st.st_mode))
 
968
                self.assertTrue(S_ISREG(st.st_mode))
1071
969
                self.assertEqual(size, st.st_size)
1072
970
 
1073
 
        remote_stats = list(t.stat_multi(paths))
1074
 
        remote_iter_stats = list(t.stat_multi(iter(paths)))
1075
 
 
1076
971
        self.assertRaises(NoSuchFile, t.stat, 'q')
1077
972
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
1078
973
 
1079
 
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1080
 
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1081
974
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1082
975
        subdir = t.clone('subdir')
1083
 
        subdir.stat('./file')
1084
 
        subdir.stat('.')
 
976
        st = subdir.stat('./file')
 
977
        st = subdir.stat('.')
1085
978
 
1086
979
    def test_hardlink(self):
1087
980
        from stat import ST_NLINK
1096
989
        try:
1097
990
            t.hardlink(source_name, link_name)
1098
991
 
1099
 
            self.failUnless(t.has(source_name))
1100
 
            self.failUnless(t.has(link_name))
 
992
            self.assertTrue(t.has(source_name))
 
993
            self.assertTrue(t.has(link_name))
1101
994
 
1102
995
            st = t.stat(link_name)
1103
 
            self.failUnlessEqual(st[ST_NLINK], 2)
 
996
            self.assertEqual(st[ST_NLINK], 2)
1104
997
        except TransportNotPossible:
1105
998
            raise TestSkipped("Transport %s does not support hardlinks." %
1106
999
                              self._server.__class__)
1118
1011
        try:
1119
1012
            t.symlink(source_name, link_name)
1120
1013
 
1121
 
            self.failUnless(t.has(source_name))
1122
 
            self.failUnless(t.has(link_name))
 
1014
            self.assertTrue(t.has(source_name))
 
1015
            self.assertTrue(t.has(link_name))
1123
1016
 
1124
1017
            st = t.stat(link_name)
1125
 
            self.failUnless(S_ISLNK(st.st_mode))
 
1018
            self.assertTrue(S_ISLNK(st.st_mode),
 
1019
                "expected symlink, got mode %o" % st.st_mode)
1126
1020
        except TransportNotPossible:
1127
1021
            raise TestSkipped("Transport %s does not support symlinks." %
1128
1022
                              self._server.__class__)
1129
 
        except IOError:
1130
 
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
1131
1023
 
1132
1024
    def test_list_dir(self):
1133
1025
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1138
1030
            return
1139
1031
 
1140
1032
        def sorted_list(d, transport):
1141
 
            l = list(transport.list_dir(d))
1142
 
            l.sort()
 
1033
            l = sorted(transport.list_dir(d))
1143
1034
            return l
1144
1035
 
1145
1036
        self.assertEqual([], sorted_list('.', t))
1197
1088
            raise TestSkipped("not a connected transport")
1198
1089
 
1199
1090
        t2 = t1.clone('subdir')
1200
 
        self.assertEquals(t1._scheme, t2._scheme)
1201
 
        self.assertEquals(t1._user, t2._user)
1202
 
        self.assertEquals(t1._password, t2._password)
1203
 
        self.assertEquals(t1._host, t2._host)
1204
 
        self.assertEquals(t1._port, t2._port)
 
1091
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1092
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
 
1093
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
 
1094
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
 
1095
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1205
1096
 
1206
1097
    def test__reuse_for(self):
1207
1098
        t = self.get_transport()
1214
1105
 
1215
1106
            Only the parameters different from None will be changed.
1216
1107
            """
1217
 
            if scheme   is None: scheme   = t._scheme
1218
 
            if user     is None: user     = t._user
1219
 
            if password is None: password = t._password
1220
 
            if user     is None: user     = t._user
1221
 
            if host     is None: host     = t._host
1222
 
            if port     is None: port     = t._port
1223
 
            if path     is None: path     = t._path
1224
 
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1108
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1109
            if user     is None: user     = t._parsed_url.user
 
1110
            if password is None: password = t._parsed_url.password
 
1111
            if user     is None: user     = t._parsed_url.user
 
1112
            if host     is None: host     = t._parsed_url.host
 
1113
            if port     is None: port     = t._parsed_url.port
 
1114
            if path     is None: path     = t._parsed_url.path
 
1115
            return str(urlutils.URL(scheme, user, password, host, port, path))
1225
1116
 
1226
 
        if t._scheme == 'ftp':
 
1117
        if t._parsed_url.scheme == 'ftp':
1227
1118
            scheme = 'sftp'
1228
1119
        else:
1229
1120
            scheme = 'ftp'
1230
1121
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1231
 
        if t._user == 'me':
 
1122
        if t._parsed_url.user == 'me':
1232
1123
            user = 'you'
1233
1124
        else:
1234
1125
            user = 'me'
1245
1136
        #   (they may be typed by the user when prompted for example)
1246
1137
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1247
1138
        # We will not connect, we can use a invalid host
1248
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1249
 
        if t._port == 1234:
 
1139
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1140
        if t._parsed_url.port == 1234:
1250
1141
            port = 4321
1251
1142
        else:
1252
1143
            port = 1234
1265
1156
        self.assertIs(t._get_connection(), c._get_connection())
1266
1157
 
1267
1158
        # Temporary failure, we need to create a new dummy connection
1268
 
        new_connection = object()
 
1159
        new_connection = None
1269
1160
        t._set_connection(new_connection)
1270
1161
        # Check that both transports use the same connection
1271
1162
        self.assertIs(new_connection, t._get_connection())
1293
1184
 
1294
1185
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1295
1186
 
1296
 
        self.failUnless(t1.has('a'))
1297
 
        self.failUnless(t1.has('b/c'))
1298
 
        self.failIf(t1.has('c'))
 
1187
        self.assertTrue(t1.has('a'))
 
1188
        self.assertTrue(t1.has('b/c'))
 
1189
        self.assertFalse(t1.has('c'))
1299
1190
 
1300
1191
        t2 = t1.clone('b')
1301
1192
        self.assertEqual(t1.base + 'b/', t2.base)
1302
1193
 
1303
 
        self.failUnless(t2.has('c'))
1304
 
        self.failIf(t2.has('a'))
 
1194
        self.assertTrue(t2.has('c'))
 
1195
        self.assertFalse(t2.has('a'))
1305
1196
 
1306
1197
        t3 = t2.clone('..')
1307
 
        self.failUnless(t3.has('a'))
1308
 
        self.failIf(t3.has('c'))
 
1198
        self.assertTrue(t3.has('a'))
 
1199
        self.assertFalse(t3.has('c'))
1309
1200
 
1310
 
        self.failIf(t1.has('b/d'))
1311
 
        self.failIf(t2.has('d'))
1312
 
        self.failIf(t3.has('b/d'))
 
1201
        self.assertFalse(t1.has('b/d'))
 
1202
        self.assertFalse(t2.has('d'))
 
1203
        self.assertFalse(t3.has('b/d'))
1313
1204
 
1314
1205
        if t1.is_readonly():
1315
 
            self.build_tree_contents([('b/d', 'newfile\n')])
 
1206
            self.build_tree_contents([('b/d', b'newfile\n')])
1316
1207
        else:
1317
 
            t2.put_bytes('d', 'newfile\n')
 
1208
            t2.put_bytes('d', b'newfile\n')
1318
1209
 
1319
 
        self.failUnless(t1.has('b/d'))
1320
 
        self.failUnless(t2.has('d'))
1321
 
        self.failUnless(t3.has('b/d'))
 
1210
        self.assertTrue(t1.has('b/d'))
 
1211
        self.assertTrue(t2.has('d'))
 
1212
        self.assertTrue(t3.has('b/d'))
1322
1213
 
1323
1214
    def test_clone_to_root(self):
1324
1215
        orig_transport = self.get_transport()
1398
1289
        self.assertEqual(transport.clone("/").abspath('foo'),
1399
1290
                         transport.abspath("/foo"))
1400
1291
 
 
1292
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1401
1293
    def test_win32_abspath(self):
1402
1294
        # Note: we tried to set sys.platform='win32' so we could test on
1403
1295
        # other platforms too, but then osutils does platform specific
1408
1300
 
1409
1301
        # smoke test for abspath on win32.
1410
1302
        # a transport based on 'file:///' never fully qualifies the drive.
1411
 
        transport = get_transport("file:///")
1412
 
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1303
        transport = _mod_transport.get_transport_from_url("file:///")
 
1304
        self.assertEqual(transport.abspath("/"), "file:///")
1413
1305
 
1414
1306
        # but a transport that starts with a drive spec must keep it.
1415
 
        transport = get_transport("file:///C:/")
1416
 
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1307
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1308
        self.assertEqual(transport.abspath("/"), "file:///C:/")
1417
1309
 
1418
1310
    def test_local_abspath(self):
1419
1311
        transport = self.get_transport()
1420
1312
        try:
1421
1313
            p = transport.local_abspath('.')
1422
 
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1314
        except (errors.NotLocalUrl, TransportNotPossible) as e:
1423
1315
            # should be formattable
1424
1316
            s = str(e)
1425
1317
        else:
1458
1350
        paths = set(transport.iter_files_recursive())
1459
1351
        # nb the directories are not converted
1460
1352
        self.assertEqual(paths,
1461
 
                    set(['isolated/dir/foo',
 
1353
                    {'isolated/dir/foo',
1462
1354
                         'isolated/dir/bar',
1463
1355
                         'isolated/dir/b%2525z',
1464
 
                         'isolated/bar']))
 
1356
                         'isolated/bar'})
1465
1357
        sub_transport = transport.clone('isolated')
1466
1358
        paths = set(sub_transport.iter_files_recursive())
1467
1359
        self.assertEqual(paths,
1468
 
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1360
            {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
1469
1361
 
1470
1362
    def test_copy_tree(self):
1471
1363
        # TODO: test file contents and permissions are preserved. This test was
1488
1380
        transport.copy_tree('from', 'to')
1489
1381
        paths = set(transport.iter_files_recursive())
1490
1382
        self.assertEqual(paths,
1491
 
                    set(['from/dir/foo',
 
1383
                    {'from/dir/foo',
1492
1384
                         'from/dir/bar',
1493
1385
                         'from/dir/b%2525z',
1494
1386
                         'from/bar',
1495
1387
                         'to/dir/foo',
1496
1388
                         'to/dir/bar',
1497
1389
                         'to/dir/b%2525z',
1498
 
                         'to/bar',]))
 
1390
                         'to/bar',})
1499
1391
 
1500
1392
    def test_copy_tree_to_transport(self):
1501
1393
        transport = self.get_transport()
1518
1410
        from_transport.copy_tree_to_transport(to_transport)
1519
1411
        paths = set(transport.iter_files_recursive())
1520
1412
        self.assertEqual(paths,
1521
 
                    set(['from/dir/foo',
 
1413
                    {'from/dir/foo',
1522
1414
                         'from/dir/bar',
1523
1415
                         'from/dir/b%2525z',
1524
1416
                         'from/bar',
1525
1417
                         'to/dir/foo',
1526
1418
                         'to/dir/bar',
1527
1419
                         'to/dir/b%2525z',
1528
 
                         'to/bar',]))
 
1420
                         'to/bar',})
1529
1421
 
1530
1422
    def test_unicode_paths(self):
1531
1423
        """Test that we can read/write files with Unicode names."""
1545
1437
 
1546
1438
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1547
1439
        if no_unicode_support:
1548
 
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1440
            self.knownFailure("test server cannot handle unicode paths")
1549
1441
 
1550
1442
        try:
1551
1443
            self.build_tree(files, transport=t, line_endings='binary')
1554
1446
 
1555
1447
        # A plain unicode string is not a valid url
1556
1448
        for fname in files:
1557
 
            self.assertRaises(InvalidURL, t.get, fname)
 
1449
            self.assertRaises(urlutils.InvalidURL, t.get, fname)
1558
1450
 
1559
1451
        for fname in files:
1560
1452
            fname_utf8 = fname.encode('utf-8')
1561
 
            contents = 'contents of %s\n' % (fname_utf8,)
 
1453
            contents = b'contents of %s\n' % (fname_utf8,)
1562
1454
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1563
1455
 
1564
1456
    def test_connect_twice_is_same_content(self):
1567
1459
        transport = self.get_transport()
1568
1460
        if transport.is_readonly():
1569
1461
            return
1570
 
        transport.put_bytes('foo', 'bar')
 
1462
        transport.put_bytes('foo', b'bar')
1571
1463
        transport3 = self.get_transport()
1572
 
        self.check_transport_contents('bar', transport3, 'foo')
 
1464
        self.check_transport_contents(b'bar', transport3, 'foo')
1573
1465
 
1574
1466
        # now opening at a relative url should give use a sane result:
1575
1467
        transport.mkdir('newdir')
1576
1468
        transport5 = self.get_transport('newdir')
1577
1469
        transport6 = transport5.clone('..')
1578
 
        self.check_transport_contents('bar', transport6, 'foo')
 
1470
        self.check_transport_contents(b'bar', transport6, 'foo')
1579
1471
 
1580
1472
    def test_lock_write(self):
1581
1473
        """Test transport-level write locks.
1586
1478
        if transport.is_readonly():
1587
1479
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1588
1480
            return
1589
 
        transport.put_bytes('lock', '')
 
1481
        transport.put_bytes('lock', b'')
1590
1482
        try:
1591
1483
            lock = transport.lock_write('lock')
1592
1484
        except TransportNotPossible:
1602
1494
        """
1603
1495
        transport = self.get_transport()
1604
1496
        if transport.is_readonly():
1605
 
            file('lock', 'w').close()
 
1497
            open('lock', 'w').close()
1606
1498
        else:
1607
 
            transport.put_bytes('lock', '')
 
1499
            transport.put_bytes('lock', b'')
1608
1500
        try:
1609
1501
            lock = transport.lock_read('lock')
1610
1502
        except TransportNotPossible:
1616
1508
    def test_readv(self):
1617
1509
        transport = self.get_transport()
1618
1510
        if transport.is_readonly():
1619
 
            file('a', 'w').write('0123456789')
 
1511
            with open('a', 'w') as f: f.write('0123456789')
1620
1512
        else:
1621
 
            transport.put_bytes('a', '0123456789')
 
1513
            transport.put_bytes('a', b'0123456789')
1622
1514
 
1623
1515
        d = list(transport.readv('a', ((0, 1),)))
1624
 
        self.assertEqual(d[0], (0, '0'))
 
1516
        self.assertEqual(d[0], (0, b'0'))
1625
1517
 
1626
1518
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1627
 
        self.assertEqual(d[0], (0, '0'))
1628
 
        self.assertEqual(d[1], (1, '1'))
1629
 
        self.assertEqual(d[2], (3, '34'))
1630
 
        self.assertEqual(d[3], (9, '9'))
 
1519
        self.assertEqual(d[0], (0, b'0'))
 
1520
        self.assertEqual(d[1], (1, b'1'))
 
1521
        self.assertEqual(d[2], (3, b'34'))
 
1522
        self.assertEqual(d[3], (9, b'9'))
1631
1523
 
1632
1524
    def test_readv_out_of_order(self):
1633
1525
        transport = self.get_transport()
1634
1526
        if transport.is_readonly():
1635
 
            file('a', 'w').write('0123456789')
 
1527
            with open('a', 'w') as f: f.write('0123456789')
1636
1528
        else:
1637
 
            transport.put_bytes('a', '01234567890')
 
1529
            transport.put_bytes('a', b'01234567890')
1638
1530
 
1639
1531
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1640
 
        self.assertEqual(d[0], (1, '1'))
1641
 
        self.assertEqual(d[1], (9, '9'))
1642
 
        self.assertEqual(d[2], (0, '0'))
1643
 
        self.assertEqual(d[3], (3, '34'))
 
1532
        self.assertEqual(d[0], (1, b'1'))
 
1533
        self.assertEqual(d[1], (9, b'9'))
 
1534
        self.assertEqual(d[2], (0, b'0'))
 
1535
        self.assertEqual(d[3], (3, b'34'))
1644
1536
 
1645
1537
    def test_readv_with_adjust_for_latency(self):
1646
1538
        transport = self.get_transport()
1694
1586
        self.assertTrue(data_len >= 100)
1695
1587
        check_result_data(result)
1696
1588
        # close ranges get combined (even if out of order)
1697
 
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1589
        for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
1698
1590
            result = list(transport.readv('a', request_vector,
1699
1591
                adjust_for_latency=True, upper_limit=content_size))
1700
1592
            self.assertEqual(1, len(result))
1710
1602
        transport = self.get_transport()
1711
1603
        # test from observed failure case.
1712
1604
        if transport.is_readonly():
1713
 
            file('a', 'w').write('a'*1024*1024)
 
1605
            with open('a', 'w') as f: f.write('a'*1024*1024)
1714
1606
        else:
1715
 
            transport.put_bytes('a', 'a'*1024*1024)
 
1607
            transport.put_bytes('a', b'a'*1024*1024)
1716
1608
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1717
1609
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
1718
1610
            (465373, 800), (947422, 800)]
1729
1621
        t = self.get_transport()
1730
1622
        if t.is_readonly():
1731
1623
            return
1732
 
        handle = t.open_write_stream('foo')
1733
 
        try:
1734
 
            handle.write('bcd')
1735
 
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1736
 
        finally:
1737
 
            handle.close()
 
1624
        with t.open_write_stream('foo') as handle:
 
1625
            handle.write(b'bcd')
 
1626
            self.assertEqual([(0, b'b'), (2, b'd')], list(t.readv('foo', ((0, 1), (2, 1)))))
1738
1627
 
1739
1628
    def test_get_smart_medium(self):
1740
1629
        """All transports must either give a smart medium, or know they can't.
1750
1639
    def test_readv_short_read(self):
1751
1640
        transport = self.get_transport()
1752
1641
        if transport.is_readonly():
1753
 
            file('a', 'w').write('0123456789')
 
1642
            with open('a', 'w') as f: f.write('0123456789')
1754
1643
        else:
1755
 
            transport.put_bytes('a', '01234567890')
 
1644
            transport.put_bytes('a', b'01234567890')
1756
1645
 
1757
1646
        # This is intentionally reading off the end of the file
1758
1647
        # since we are sure that it cannot get there
1759
1648
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1760
1649
                               # Can be raised by paramiko
1761
1650
                               AssertionError),
1762
 
                              transport.readv, 'a', [(1,1), (8,10)])
 
1651
                              transport.readv, 'a', [(1, 1), (8, 10)])
1763
1652
 
1764
1653
        # This is trying to seek past the end of the file, it should
1765
1654
        # also raise a special error
1766
1655
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1767
 
                              transport.readv, 'a', [(12,2)])
 
1656
                              transport.readv, 'a', [(12, 2)])
 
1657
 
 
1658
    def test_no_segment_parameters(self):
 
1659
        """Segment parameters should be stripped and stored in
 
1660
        transport.segment_parameters."""
 
1661
        transport = self.get_transport("foo")
 
1662
        self.assertEqual({}, transport.get_segment_parameters())
 
1663
 
 
1664
    def test_segment_parameters(self):
 
1665
        """Segment parameters should be stripped and stored in
 
1666
        transport.get_segment_parameters()."""
 
1667
        base_url = self._server.get_url()
 
1668
        parameters = {"key1": "val1", "key2": "val2"}
 
1669
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1670
        transport = _mod_transport.get_transport_from_url(url)
 
1671
        self.assertEqual(parameters, transport.get_segment_parameters())
 
1672
 
 
1673
    def test_set_segment_parameters(self):
 
1674
        """Segment parameters can be set and show up in base."""
 
1675
        transport = self.get_transport("foo")
 
1676
        orig_base = transport.base
 
1677
        transport.set_segment_parameter("arm", "board")
 
1678
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
 
1679
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
 
1680
        transport.set_segment_parameter("arm", None)
 
1681
        transport.set_segment_parameter("nonexistant", None)
 
1682
        self.assertEqual({}, transport.get_segment_parameters())
 
1683
        self.assertEqual(orig_base, transport.base)
 
1684
 
 
1685
    def test_stat_symlink(self):
 
1686
        # if a transport points directly to a symlink (and supports symlinks
 
1687
        # at all) you can tell this.  helps with bug 32669.
 
1688
        t = self.get_transport()
 
1689
        try:
 
1690
            t.symlink('target', 'link')
 
1691
        except TransportNotPossible:
 
1692
            raise TestSkipped("symlinks not supported")
 
1693
        t2 = t.clone('link')
 
1694
        st = t2.stat('')
 
1695
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1696
 
 
1697
    def test_abspath_url_unquote_unreserved(self):
 
1698
        """URLs from abspath should have unreserved characters unquoted
 
1699
        
 
1700
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1701
        """
 
1702
        t = self.get_transport()
 
1703
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1704
        self.assertEqual(t.base + "-.09AZ_az~",
 
1705
            t.abspath(needlessly_escaped_dir))
 
1706
 
 
1707
    def test_clone_url_unquote_unreserved(self):
 
1708
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1709
        
 
1710
        Cloned transports should be prefix comparable for things like the
 
1711
        isolation checking of tests, see lp:842223 for more.
 
1712
        """
 
1713
        t1 = self.get_transport()
 
1714
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1715
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1716
        t2 = t1.clone(needlessly_escaped_dir)
 
1717
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1718
 
 
1719
    def test_hook_post_connection_one(self):
 
1720
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1721
        log = []
 
1722
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1723
        t = self.get_transport()
 
1724
        self.assertEqual([], log)
 
1725
        t.has("non-existant")
 
1726
        if isinstance(t, RemoteTransport):
 
1727
            self.assertEqual([t.get_smart_medium()], log)
 
1728
        elif isinstance(t, ConnectedTransport):
 
1729
            self.assertEqual([t], log)
 
1730
        else:
 
1731
            self.assertEqual([], log)
 
1732
 
 
1733
    def test_hook_post_connection_multi(self):
 
1734
        """Fire post_connect hook once per unshared underlying connection"""
 
1735
        log = []
 
1736
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1737
        t1 = self.get_transport()
 
1738
        t2 = t1.clone(".")
 
1739
        t3 = self.get_transport()
 
1740
        self.assertEqual([], log)
 
1741
        t1.has("x")
 
1742
        t2.has("x")
 
1743
        t3.has("x")
 
1744
        if isinstance(t1, RemoteTransport):
 
1745
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1746
        elif isinstance(t1, ConnectedTransport):
 
1747
            self.assertEqual([t1, t3], log)
 
1748
        else:
 
1749
            self.assertEqual([], log)