/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/per_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2019-10-13 22:53:02 UTC
  • mfrom: (7290.1.35 work)
  • mto: This revision was merged to the branch mainline in revision 7405.
  • Revision ID: jelmer@jelmer.uk-20191013225302-vg88ztajzq05hkas
Merge lp:brz/3.0.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
23
 
import itertools
24
23
import os
25
 
from cStringIO import StringIO
26
 
from StringIO import StringIO as pyStringIO
27
24
import stat
28
25
import sys
29
 
import unittest
30
26
 
31
 
from bzrlib import (
 
27
from .. import (
32
28
    errors,
33
29
    osutils,
 
30
    pyutils,
34
31
    tests,
 
32
    transport as _mod_transport,
35
33
    urlutils,
36
34
    )
37
 
from bzrlib.errors import (ConnectionError,
38
 
                           DirectoryNotEmpty,
39
 
                           FileExists,
40
 
                           InvalidURL,
41
 
                           LockError,
42
 
                           NoSuchFile,
43
 
                           NotLocalUrl,
44
 
                           PathError,
45
 
                           TransportNotPossible,
46
 
                           )
47
 
from bzrlib.osutils import getcwd
48
 
from bzrlib.smart import medium
49
 
from bzrlib.tests import (
50
 
    TestCaseInTempDir,
 
35
from ..errors import (ConnectionError,
 
36
                      FileExists,
 
37
                      NoSuchFile,
 
38
                      PathError,
 
39
                      TransportNotPossible,
 
40
                      )
 
41
from ..osutils import getcwd
 
42
from ..sixish import (
 
43
    BytesIO,
 
44
    zip,
 
45
    )
 
46
from ..bzr.smart import medium
 
47
from . import (
51
48
    TestSkipped,
52
49
    TestNotApplicable,
53
50
    multiply_tests,
54
51
    )
55
 
from bzrlib.tests import test_server
56
 
from bzrlib.tests.test_transport import TestTransportImplementation
57
 
from bzrlib.transport import (
 
52
from . import test_server
 
53
from .test_transport import TestTransportImplementation
 
54
from ..transport import (
58
55
    ConnectedTransport,
59
 
    get_transport,
 
56
    Transport,
60
57
    _get_transport_modules,
61
58
    )
62
 
from bzrlib.transport.memory import MemoryTransport
 
59
from ..transport.memory import MemoryTransport
 
60
from ..transport.remote import RemoteTransport
63
61
 
64
62
 
65
63
def get_transport_test_permutations(module):
78
76
    for module in _get_transport_modules():
79
77
        try:
80
78
            permutations = get_transport_test_permutations(
81
 
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
79
                pyutils.get_named_object(module))
82
80
            for (klass, server_factory) in permutations:
83
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
 
                    {"transport_class":klass,
85
 
                     "transport_server":server_factory})
 
82
                            {"transport_class": klass,
 
83
                             "transport_server": server_factory})
86
84
                result.append(scenario)
87
 
        except errors.DependencyNotPresent, e:
 
85
        except errors.DependencyNotPresent as e:
88
86
            # Continue even if a dependency prevents us
89
87
            # from adding this test
90
88
            pass
91
89
    return result
92
90
 
93
91
 
94
 
def load_tests(standard_tests, module, loader):
 
92
def load_tests(loader, standard_tests, pattern):
95
93
    """Multiply tests for transport implementations."""
96
94
    result = loader.suiteClass()
97
95
    scenarios = transport_test_permutations()
102
100
 
103
101
    def setUp(self):
104
102
        super(TransportTests, self).setUp()
105
 
        self._captureVar('BZR_NO_SMART_VFS', None)
 
103
        self.overrideEnv('BRZ_NO_SMART_VFS', None)
106
104
 
107
105
    def check_transport_contents(self, content, transport, relpath):
108
 
        """Check that transport.get(relpath).read() == content."""
109
 
        self.assertEqualDiff(content, transport.get(relpath).read())
 
106
        """Check that transport.get_bytes(relpath) == content."""
 
107
        self.assertEqualDiff(content, transport.get_bytes(relpath))
110
108
 
111
109
    def test_ensure_base_missing(self):
112
110
        """.ensure_base() should create the directory if it doesn't exist"""
156
154
        self.assertEqual(True, t.has('a'))
157
155
        self.assertEqual(False, t.has('c'))
158
156
        self.assertEqual(True, t.has(urlutils.escape('%')))
159
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
160
 
                                           'e', 'f', 'g', 'h'])),
161
 
                         [True, True, False, False,
162
 
                          True, False, True, False])
163
157
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
164
158
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
165
159
                                           urlutils.escape('%%')]))
166
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
167
 
                                                'e', 'f', 'g', 'h']))),
168
 
                         [True, True, False, False,
169
 
                          True, False, True, False])
170
160
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
171
161
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
172
162
 
183
173
    def test_get(self):
184
174
        t = self.get_transport()
185
175
 
186
 
        files = ['a', 'b', 'e', 'g']
187
 
        contents = ['contents of a\n',
188
 
                    'contents of b\n',
189
 
                    'contents of e\n',
190
 
                    'contents of g\n',
191
 
                    ]
192
 
        self.build_tree(files, transport=t, line_endings='binary')
193
 
        self.check_transport_contents('contents of a\n', t, 'a')
194
 
        content_f = t.get_multi(files)
195
 
        # Use itertools.izip() instead of zip() or map(), since they fully
196
 
        # evaluate their inputs, the transport requests should be issued and
197
 
        # handled sequentially (we don't want to force transport to buffer).
198
 
        for content, f in itertools.izip(contents, content_f):
199
 
            self.assertEqual(content, f.read())
200
 
 
201
 
        content_f = t.get_multi(iter(files))
202
 
        # Use itertools.izip() for the same reason
203
 
        for content, f in itertools.izip(contents, content_f):
204
 
            self.assertEqual(content, f.read())
 
176
        files = ['a']
 
177
        content = b'contents of a\n'
 
178
        self.build_tree(['a'], transport=t, line_endings='binary')
 
179
        self.check_transport_contents(b'contents of a\n', t, 'a')
 
180
        f = t.get('a')
 
181
        self.assertEqual(content, f.read())
205
182
 
206
183
    def test_get_unknown_file(self):
207
184
        t = self.get_transport()
208
185
        files = ['a', 'b']
209
 
        contents = ['contents of a\n',
210
 
                    'contents of b\n',
 
186
        contents = [b'contents of a\n',
 
187
                    b'contents of b\n',
211
188
                    ]
212
189
        self.build_tree(files, transport=t, line_endings='binary')
213
190
        self.assertRaises(NoSuchFile, t.get, 'c')
214
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
191
 
 
192
        def iterate_and_close(func, *args):
 
193
            for f in func(*args):
 
194
                # We call f.read() here because things like paramiko actually
 
195
                # spawn a thread to prefetch the content, which we want to
 
196
                # consume before we close the handle.
 
197
                content = f.read()
 
198
                f.close()
216
199
 
217
200
    def test_get_directory_read_gives_ReadError(self):
218
201
        """consistent errors for read() on a file returned by get()."""
238
221
        t = self.get_transport()
239
222
 
240
223
        files = ['a', 'b', 'e', 'g']
241
 
        contents = ['contents of a\n',
242
 
                    'contents of b\n',
243
 
                    'contents of e\n',
244
 
                    'contents of g\n',
 
224
        contents = [b'contents of a\n',
 
225
                    b'contents of b\n',
 
226
                    b'contents of e\n',
 
227
                    b'contents of g\n',
245
228
                    ]
246
229
        self.build_tree(files, transport=t, line_endings='binary')
247
 
        self.check_transport_contents('contents of a\n', t, 'a')
 
230
        self.check_transport_contents(b'contents of a\n', t, 'a')
248
231
 
249
232
        for content, fname in zip(contents, files):
250
233
            self.assertEqual(content, t.get_bytes(fname))
251
234
 
252
235
    def test_get_bytes_unknown_file(self):
253
236
        t = self.get_transport()
254
 
 
255
237
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
256
238
 
257
239
    def test_get_with_open_write_stream_sees_all_content(self):
258
240
        t = self.get_transport()
259
241
        if t.is_readonly():
260
242
            return
261
 
        handle = t.open_write_stream('foo')
262
 
        try:
263
 
            handle.write('b')
264
 
            self.assertEqual('b', t.get('foo').read())
265
 
        finally:
266
 
            handle.close()
 
243
        with t.open_write_stream('foo') as handle:
 
244
            handle.write(b'b')
 
245
            self.assertEqual(b'b', t.get_bytes('foo'))
267
246
 
268
247
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
269
248
        t = self.get_transport()
270
249
        if t.is_readonly():
271
250
            return
272
 
        handle = t.open_write_stream('foo')
273
 
        try:
274
 
            handle.write('b')
275
 
            self.assertEqual('b', t.get_bytes('foo'))
276
 
            self.assertEqual('b', t.get('foo').read())
277
 
        finally:
278
 
            handle.close()
 
251
        with t.open_write_stream('foo') as handle:
 
252
            handle.write(b'b')
 
253
            self.assertEqual(b'b', t.get_bytes('foo'))
 
254
            with t.get('foo') as f:
 
255
                self.assertEqual(b'b', f.read())
279
256
 
280
257
    def test_put_bytes(self):
281
258
        t = self.get_transport()
282
259
 
283
260
        if t.is_readonly():
284
261
            self.assertRaises(TransportNotPossible,
285
 
                    t.put_bytes, 'a', 'some text for a\n')
 
262
                              t.put_bytes, 'a', b'some text for a\n')
286
263
            return
287
264
 
288
 
        t.put_bytes('a', 'some text for a\n')
289
 
        self.failUnless(t.has('a'))
290
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
265
        t.put_bytes('a', b'some text for a\n')
 
266
        self.assertTrue(t.has('a'))
 
267
        self.check_transport_contents(b'some text for a\n', t, 'a')
291
268
 
292
269
        # The contents should be overwritten
293
 
        t.put_bytes('a', 'new text for a\n')
294
 
        self.check_transport_contents('new text for a\n', t, 'a')
 
270
        t.put_bytes('a', b'new text for a\n')
 
271
        self.check_transport_contents(b'new text for a\n', t, 'a')
295
272
 
296
273
        self.assertRaises(NoSuchFile,
297
 
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
 
274
                          t.put_bytes, 'path/doesnt/exist/c', b'contents')
298
275
 
299
276
    def test_put_bytes_non_atomic(self):
300
277
        t = self.get_transport()
301
278
 
302
279
        if t.is_readonly():
303
280
            self.assertRaises(TransportNotPossible,
304
 
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
 
281
                              t.put_bytes_non_atomic, 'a', b'some text for a\n')
305
282
            return
306
283
 
307
 
        self.failIf(t.has('a'))
308
 
        t.put_bytes_non_atomic('a', 'some text for a\n')
309
 
        self.failUnless(t.has('a'))
310
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
284
        self.assertFalse(t.has('a'))
 
285
        t.put_bytes_non_atomic('a', b'some text for a\n')
 
286
        self.assertTrue(t.has('a'))
 
287
        self.check_transport_contents(b'some text for a\n', t, 'a')
311
288
        # Put also replaces contents
312
 
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
313
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
289
        t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
 
290
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
314
291
 
315
292
        # Make sure we can create another file
316
 
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
 
293
        t.put_bytes_non_atomic('d', b'contents for\nd\n')
317
294
        # And overwrite 'a' with empty contents
318
 
        t.put_bytes_non_atomic('a', '')
319
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
320
 
        self.check_transport_contents('', t, 'a')
 
295
        t.put_bytes_non_atomic('a', b'')
 
296
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
297
        self.check_transport_contents(b'', t, 'a')
321
298
 
322
299
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
323
 
                                       'contents\n')
 
300
                          b'contents\n')
324
301
        # Now test the create_parent flag
325
302
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
326
 
                                       'contents\n')
327
 
        self.failIf(t.has('dir/a'))
328
 
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
 
303
                          b'contents\n')
 
304
        self.assertFalse(t.has('dir/a'))
 
305
        t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
329
306
                               create_parent_dir=True)
330
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
307
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
331
308
 
332
309
        # But we still get NoSuchFile if we can't make the parent dir
333
310
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
334
 
                                       'contents\n',
335
 
                                       create_parent_dir=True)
 
311
                          b'contents\n',
 
312
                          create_parent_dir=True)
336
313
 
337
314
    def test_put_bytes_permissions(self):
338
315
        t = self.get_transport()
342
319
        if not t._can_roundtrip_unix_modebits():
343
320
            # Can't roundtrip, so no need to run this test
344
321
            return
345
 
        t.put_bytes('mode644', 'test text\n', mode=0644)
346
 
        self.assertTransportMode(t, 'mode644', 0644)
347
 
        t.put_bytes('mode666', 'test text\n', mode=0666)
348
 
        self.assertTransportMode(t, 'mode666', 0666)
349
 
        t.put_bytes('mode600', 'test text\n', mode=0600)
350
 
        self.assertTransportMode(t, 'mode600', 0600)
 
322
        t.put_bytes('mode644', b'test text\n', mode=0o644)
 
323
        self.assertTransportMode(t, 'mode644', 0o644)
 
324
        t.put_bytes('mode666', b'test text\n', mode=0o666)
 
325
        self.assertTransportMode(t, 'mode666', 0o666)
 
326
        t.put_bytes('mode600', b'test text\n', mode=0o600)
 
327
        self.assertTransportMode(t, 'mode600', 0o600)
351
328
        # Yes, you can put_bytes a file such that it becomes readonly
352
 
        t.put_bytes('mode400', 'test text\n', mode=0400)
353
 
        self.assertTransportMode(t, 'mode400', 0400)
 
329
        t.put_bytes('mode400', b'test text\n', mode=0o400)
 
330
        self.assertTransportMode(t, 'mode400', 0o400)
354
331
 
355
332
        # The default permissions should be based on the current umask
356
333
        umask = osutils.get_umask()
357
 
        t.put_bytes('nomode', 'test text\n', mode=None)
358
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
334
        t.put_bytes('nomode', b'test text\n', mode=None)
 
335
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
359
336
 
360
337
    def test_put_bytes_non_atomic_permissions(self):
361
338
        t = self.get_transport()
365
342
        if not t._can_roundtrip_unix_modebits():
366
343
            # Can't roundtrip, so no need to run this test
367
344
            return
368
 
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
369
 
        self.assertTransportMode(t, 'mode644', 0644)
370
 
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
371
 
        self.assertTransportMode(t, 'mode666', 0666)
372
 
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
373
 
        self.assertTransportMode(t, 'mode600', 0600)
374
 
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
375
 
        self.assertTransportMode(t, 'mode400', 0400)
 
345
        t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
 
346
        self.assertTransportMode(t, 'mode644', 0o644)
 
347
        t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
 
348
        self.assertTransportMode(t, 'mode666', 0o666)
 
349
        t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
 
350
        self.assertTransportMode(t, 'mode600', 0o600)
 
351
        t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
 
352
        self.assertTransportMode(t, 'mode400', 0o400)
376
353
 
377
354
        # The default permissions should be based on the current umask
378
355
        umask = osutils.get_umask()
379
 
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
380
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
356
        t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
 
357
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
381
358
 
382
359
        # We should also be able to set the mode for a parent directory
383
360
        # when it is created
384
 
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
385
 
                               dir_mode=0700, create_parent_dir=True)
386
 
        self.assertTransportMode(t, 'dir700', 0700)
387
 
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
388
 
                               dir_mode=0770, create_parent_dir=True)
389
 
        self.assertTransportMode(t, 'dir770', 0770)
390
 
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
391
 
                               dir_mode=0777, create_parent_dir=True)
392
 
        self.assertTransportMode(t, 'dir777', 0777)
 
361
        t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
 
362
                               dir_mode=0o700, create_parent_dir=True)
 
363
        self.assertTransportMode(t, 'dir700', 0o700)
 
364
        t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
 
365
                               dir_mode=0o770, create_parent_dir=True)
 
366
        self.assertTransportMode(t, 'dir770', 0o770)
 
367
        t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
 
368
                               dir_mode=0o777, create_parent_dir=True)
 
369
        self.assertTransportMode(t, 'dir777', 0o777)
393
370
 
394
371
    def test_put_file(self):
395
372
        t = self.get_transport()
396
373
 
397
374
        if t.is_readonly():
398
375
            self.assertRaises(TransportNotPossible,
399
 
                    t.put_file, 'a', StringIO('some text for a\n'))
 
376
                              t.put_file, 'a', BytesIO(b'some text for a\n'))
400
377
            return
401
378
 
402
 
        result = t.put_file('a', StringIO('some text for a\n'))
 
379
        result = t.put_file('a', BytesIO(b'some text for a\n'))
403
380
        # put_file returns the length of the data written
404
381
        self.assertEqual(16, result)
405
 
        self.failUnless(t.has('a'))
406
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
382
        self.assertTrue(t.has('a'))
 
383
        self.check_transport_contents(b'some text for a\n', t, 'a')
407
384
        # Put also replaces contents
408
 
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
385
        result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
409
386
        self.assertEqual(19, result)
410
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
387
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
411
388
        self.assertRaises(NoSuchFile,
412
389
                          t.put_file, 'path/doesnt/exist/c',
413
 
                              StringIO('contents'))
 
390
                          BytesIO(b'contents'))
414
391
 
415
392
    def test_put_file_non_atomic(self):
416
393
        t = self.get_transport()
417
394
 
418
395
        if t.is_readonly():
419
396
            self.assertRaises(TransportNotPossible,
420
 
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
397
                              t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
421
398
            return
422
399
 
423
 
        self.failIf(t.has('a'))
424
 
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
 
        self.failUnless(t.has('a'))
426
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
400
        self.assertFalse(t.has('a'))
 
401
        t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
 
402
        self.assertTrue(t.has('a'))
 
403
        self.check_transport_contents(b'some text for a\n', t, 'a')
427
404
        # Put also replaces contents
428
 
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
429
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
405
        t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
 
406
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
430
407
 
431
408
        # Make sure we can create another file
432
 
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
409
        t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
433
410
        # And overwrite 'a' with empty contents
434
 
        t.put_file_non_atomic('a', StringIO(''))
435
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
436
 
        self.check_transport_contents('', t, 'a')
 
411
        t.put_file_non_atomic('a', BytesIO(b''))
 
412
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
413
        self.check_transport_contents(b'', t, 'a')
437
414
 
438
415
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
439
 
                                       StringIO('contents\n'))
 
416
                          BytesIO(b'contents\n'))
440
417
        # Now test the create_parent flag
441
418
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
 
                                       StringIO('contents\n'))
443
 
        self.failIf(t.has('dir/a'))
444
 
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
419
                          BytesIO(b'contents\n'))
 
420
        self.assertFalse(t.has('dir/a'))
 
421
        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
445
422
                              create_parent_dir=True)
446
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
423
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
447
424
 
448
425
        # But we still get NoSuchFile if we can't make the parent dir
449
426
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
450
 
                                       StringIO('contents\n'),
451
 
                                       create_parent_dir=True)
 
427
                          BytesIO(b'contents\n'),
 
428
                          create_parent_dir=True)
452
429
 
453
430
    def test_put_file_permissions(self):
454
431
 
459
436
        if not t._can_roundtrip_unix_modebits():
460
437
            # Can't roundtrip, so no need to run this test
461
438
            return
462
 
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
463
 
        self.assertTransportMode(t, 'mode644', 0644)
464
 
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
465
 
        self.assertTransportMode(t, 'mode666', 0666)
466
 
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
467
 
        self.assertTransportMode(t, 'mode600', 0600)
 
439
        t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
 
440
        self.assertTransportMode(t, 'mode644', 0o644)
 
441
        t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
 
442
        self.assertTransportMode(t, 'mode666', 0o666)
 
443
        t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
 
444
        self.assertTransportMode(t, 'mode600', 0o600)
468
445
        # Yes, you can put a file such that it becomes readonly
469
 
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
470
 
        self.assertTransportMode(t, 'mode400', 0400)
 
446
        t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
 
447
        self.assertTransportMode(t, 'mode400', 0o400)
471
448
        # The default permissions should be based on the current umask
472
449
        umask = osutils.get_umask()
473
 
        t.put_file('nomode', StringIO('test text\n'), mode=None)
474
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
450
        t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
 
451
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
475
452
 
476
453
    def test_put_file_non_atomic_permissions(self):
477
454
        t = self.get_transport()
481
458
        if not t._can_roundtrip_unix_modebits():
482
459
            # Can't roundtrip, so no need to run this test
483
460
            return
484
 
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
485
 
        self.assertTransportMode(t, 'mode644', 0644)
486
 
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
487
 
        self.assertTransportMode(t, 'mode666', 0666)
488
 
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
489
 
        self.assertTransportMode(t, 'mode600', 0600)
 
461
        t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
 
462
        self.assertTransportMode(t, 'mode644', 0o644)
 
463
        t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
 
464
        self.assertTransportMode(t, 'mode666', 0o666)
 
465
        t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
 
466
        self.assertTransportMode(t, 'mode600', 0o600)
490
467
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
491
 
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
492
 
        self.assertTransportMode(t, 'mode400', 0400)
 
468
        t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
 
469
        self.assertTransportMode(t, 'mode400', 0o400)
493
470
 
494
471
        # The default permissions should be based on the current umask
495
472
        umask = osutils.get_umask()
496
 
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
497
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
473
        t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
 
474
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
498
475
 
499
476
        # We should also be able to set the mode for a parent directory
500
477
        # when it is created
501
 
        sio = StringIO()
502
 
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
503
 
                              dir_mode=0700, create_parent_dir=True)
504
 
        self.assertTransportMode(t, 'dir700', 0700)
505
 
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
506
 
                              dir_mode=0770, create_parent_dir=True)
507
 
        self.assertTransportMode(t, 'dir770', 0770)
508
 
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
509
 
                              dir_mode=0777, create_parent_dir=True)
510
 
        self.assertTransportMode(t, 'dir777', 0777)
 
478
        sio = BytesIO()
 
479
        t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
 
480
                              dir_mode=0o700, create_parent_dir=True)
 
481
        self.assertTransportMode(t, 'dir700', 0o700)
 
482
        t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
 
483
                              dir_mode=0o770, create_parent_dir=True)
 
484
        self.assertTransportMode(t, 'dir770', 0o770)
 
485
        t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
 
486
                              dir_mode=0o777, create_parent_dir=True)
 
487
        self.assertTransportMode(t, 'dir777', 0o777)
511
488
 
512
489
    def test_put_bytes_unicode(self):
513
 
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
 
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
515
 
        # (we don't want to encode unicode here at all, callers should be
516
 
        # strictly passing bytes to put_bytes), but we allow it for backwards
517
 
        # compatibility.  At some point we should use a specific exception.
518
 
        # See https://bugs.launchpad.net/bzr/+bug/106898.
519
490
        t = self.get_transport()
520
491
        if t.is_readonly():
521
492
            return
522
493
        unicode_string = u'\u1234'
523
 
        self.assertRaises(
524
 
            (AssertionError, UnicodeEncodeError),
525
 
            t.put_bytes, 'foo', unicode_string)
526
 
 
527
 
    def test_put_file_unicode(self):
528
 
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
 
        # This situation can happen (and has) if code is careless about the type
530
 
        # of "string" they initialise/write to a StringIO with.  We cannot use
531
 
        # cStringIO, because it never returns unicode from read.
532
 
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
 
        # raise, but we raise it for hysterical raisins.
534
 
        t = self.get_transport()
535
 
        if t.is_readonly():
536
 
            return
537
 
        unicode_file = pyStringIO(u'\u1234')
538
 
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
494
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
539
495
 
540
496
    def test_mkdir(self):
541
497
        t = self.get_transport()
546
502
            # defined for the transport interface.
547
503
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
548
504
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
549
 
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
550
 
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
 
505
            self.assertRaises(TransportNotPossible,
 
506
                              t.mkdir, 'path/doesnt/exist')
551
507
            return
552
508
        # Test mkdir
553
509
        t.mkdir('dir_a')
557
513
        t.mkdir('dir_b')
558
514
        self.assertEqual(t.has('dir_b'), True)
559
515
 
560
 
        t.mkdir_multi(['dir_c', 'dir_d'])
561
 
 
562
 
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
563
 
        self.assertEqual(list(t.has_multi(
564
 
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
565
 
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
566
 
            [True, True, True, False,
567
 
             True, True, True, True])
 
516
        self.assertEqual([t.has(n) for n in
 
517
                          ['dir_a', 'dir_b', 'dir_q', 'dir_b']],
 
518
                         [True, True, False, True])
568
519
 
569
520
        # we were testing that a local mkdir followed by a transport
570
521
        # mkdir failed thusly, but given that we * in one process * do not
575
526
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
576
527
 
577
528
        # Test get/put in sub-directories
578
 
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
579
 
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
580
 
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
581
 
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
 
529
        t.put_bytes('dir_a/a', b'contents of dir_a/a')
 
530
        t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
 
531
        self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a')
 
532
        self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b')
582
533
 
583
534
        # mkdir of a dir with an absent parent
584
535
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
591
542
            # no sense testing on this transport
592
543
            return
593
544
        # Test mkdir with a mode
594
 
        t.mkdir('dmode755', mode=0755)
595
 
        self.assertTransportMode(t, 'dmode755', 0755)
596
 
        t.mkdir('dmode555', mode=0555)
597
 
        self.assertTransportMode(t, 'dmode555', 0555)
598
 
        t.mkdir('dmode777', mode=0777)
599
 
        self.assertTransportMode(t, 'dmode777', 0777)
600
 
        t.mkdir('dmode700', mode=0700)
601
 
        self.assertTransportMode(t, 'dmode700', 0700)
602
 
        t.mkdir_multi(['mdmode755'], mode=0755)
603
 
        self.assertTransportMode(t, 'mdmode755', 0755)
 
545
        t.mkdir('dmode755', mode=0o755)
 
546
        self.assertTransportMode(t, 'dmode755', 0o755)
 
547
        t.mkdir('dmode555', mode=0o555)
 
548
        self.assertTransportMode(t, 'dmode555', 0o555)
 
549
        t.mkdir('dmode777', mode=0o777)
 
550
        self.assertTransportMode(t, 'dmode777', 0o777)
 
551
        t.mkdir('dmode700', mode=0o700)
 
552
        self.assertTransportMode(t, 'dmode700', 0o700)
 
553
        t.mkdir('mdmode755', mode=0o755)
 
554
        self.assertTransportMode(t, 'mdmode755', 0o755)
604
555
 
605
556
        # Default mode should be based on umask
606
557
        umask = osutils.get_umask()
607
558
        t.mkdir('dnomode', mode=None)
608
 
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
559
        self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
609
560
 
610
561
    def test_opening_a_file_stream_creates_file(self):
611
562
        t = self.get_transport()
613
564
            return
614
565
        handle = t.open_write_stream('foo')
615
566
        try:
616
 
            self.assertEqual('', t.get_bytes('foo'))
 
567
            self.assertEqual(b'', t.get_bytes('foo'))
617
568
        finally:
618
569
            handle.close()
619
570
 
620
571
    def test_opening_a_file_stream_can_set_mode(self):
621
572
        t = self.get_transport()
622
573
        if t.is_readonly():
 
574
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
575
                              t.open_write_stream, 'foo')
623
576
            return
624
577
        if not t._can_roundtrip_unix_modebits():
625
578
            # Can't roundtrip, so no need to run this test
626
579
            return
 
580
 
627
581
        def check_mode(name, mode, expected):
628
582
            handle = t.open_write_stream(name, mode=mode)
629
583
            handle.close()
630
584
            self.assertTransportMode(t, name, expected)
631
 
        check_mode('mode644', 0644, 0644)
632
 
        check_mode('mode666', 0666, 0666)
633
 
        check_mode('mode600', 0600, 0600)
 
585
        check_mode('mode644', 0o644, 0o644)
 
586
        check_mode('mode666', 0o666, 0o666)
 
587
        check_mode('mode600', 0o600, 0o600)
634
588
        # The default permissions should be based on the current umask
635
 
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
589
        check_mode('nomode', None, 0o666 & ~osutils.get_umask())
636
590
 
637
591
    def test_copy_to(self):
638
592
        # FIXME: test:   same server to same server (partly done)
645
599
            self.build_tree(files, transport=transport_from)
646
600
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
647
601
            for f in files:
648
 
                self.check_transport_contents(transport_to.get(f).read(),
 
602
                self.check_transport_contents(transport_to.get_bytes(f),
649
603
                                              transport_from, f)
650
604
 
651
605
        t = self.get_transport()
 
606
        if t.__class__.__name__ == "SFTPTransport":
 
607
            self.skipTest("SFTP copy_to currently too flakey to use")
652
608
        temp_transport = MemoryTransport('memory:///')
653
609
        simple_copy_files(t, temp_transport)
654
610
        if not t.is_readonly():
656
612
            t2 = t.clone('copy_to_simple')
657
613
            simple_copy_files(t, t2)
658
614
 
659
 
 
660
615
        # Test that copying into a missing directory raises
661
616
        # NoSuchFile
662
617
        if t.is_readonly():
663
618
            self.build_tree(['e/', 'e/f'])
664
619
        else:
665
620
            t.mkdir('e')
666
 
            t.put_bytes('e/f', 'contents of e')
 
621
            t.put_bytes('e/f', b'contents of e')
667
622
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
668
623
        temp_transport.mkdir('e')
669
624
        t.copy_to(['e/f'], temp_transport)
674
629
        files = ['a', 'b', 'c', 'd']
675
630
        t.copy_to(iter(files), temp_transport)
676
631
        for f in files:
677
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
632
            self.check_transport_contents(temp_transport.get_bytes(f),
678
633
                                          t, f)
679
634
        del temp_transport
680
635
 
681
 
        for mode in (0666, 0644, 0600, 0400):
 
636
        for mode in (0o666, 0o644, 0o600, 0o400):
682
637
            temp_transport = MemoryTransport("memory:///")
683
638
            t.copy_to(files, temp_transport, mode=mode)
684
639
            for f in files:
699
654
 
700
655
        if t.is_readonly():
701
656
            self.assertRaises(TransportNotPossible,
702
 
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
657
                              t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
703
658
            return
704
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
705
 
        t.put_bytes('b', 'contents\nfor b\n')
 
659
        t.put_bytes('a', b'diff\ncontents for\na\n')
 
660
        t.put_bytes('b', b'contents\nfor b\n')
706
661
 
707
662
        self.assertEqual(20,
708
 
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
663
                         t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
709
664
 
710
665
        self.check_transport_contents(
711
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
666
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
712
667
            t, 'a')
713
668
 
714
669
        # a file with no parent should fail..
715
670
        self.assertRaises(NoSuchFile,
716
 
                          t.append_file, 'missing/path', StringIO('content'))
 
671
                          t.append_file, 'missing/path', BytesIO(b'content'))
717
672
 
718
673
        # And we can create new files, too
719
674
        self.assertEqual(0,
720
 
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
721
 
        self.check_transport_contents('some text\nfor a missing file\n',
 
675
                         t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
 
676
        self.check_transport_contents(b'some text\nfor a missing file\n',
722
677
                                      t, 'c')
723
678
 
724
679
    def test_append_bytes(self):
726
681
 
727
682
        if t.is_readonly():
728
683
            self.assertRaises(TransportNotPossible,
729
 
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
 
684
                              t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
730
685
            return
731
686
 
732
 
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
733
 
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
 
687
        self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
 
688
        self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
734
689
 
735
690
        self.assertEqual(20,
736
 
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
 
691
                         t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
737
692
 
738
693
        self.check_transport_contents(
739
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
694
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
740
695
            t, 'a')
741
696
 
742
697
        # a file with no parent should fail..
743
698
        self.assertRaises(NoSuchFile,
744
 
                          t.append_bytes, 'missing/path', 'content')
745
 
 
746
 
    def test_append_multi(self):
747
 
        t = self.get_transport()
748
 
 
749
 
        if t.is_readonly():
750
 
            return
751
 
        t.put_bytes('a', 'diff\ncontents for\na\n'
752
 
                         'add\nsome\nmore\ncontents\n')
753
 
        t.put_bytes('b', 'contents\nfor b\n')
754
 
 
755
 
        self.assertEqual((43, 15),
756
 
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
757
 
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
758
 
 
759
 
        self.check_transport_contents(
760
 
            'diff\ncontents for\na\n'
761
 
            'add\nsome\nmore\ncontents\n'
762
 
            'and\nthen\nsome\nmore\n',
763
 
            t, 'a')
764
 
        self.check_transport_contents(
765
 
                'contents\nfor b\n'
766
 
                'some\nmore\nfor\nb\n',
767
 
                t, 'b')
768
 
 
769
 
        self.assertEqual((62, 31),
770
 
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
771
 
                                 ('b', StringIO('from an iterator\n'))])))
772
 
        self.check_transport_contents(
773
 
            'diff\ncontents for\na\n'
774
 
            'add\nsome\nmore\ncontents\n'
775
 
            'and\nthen\nsome\nmore\n'
776
 
            'a little bit more\n',
777
 
            t, 'a')
778
 
        self.check_transport_contents(
779
 
                'contents\nfor b\n'
780
 
                'some\nmore\nfor\nb\n'
781
 
                'from an iterator\n',
782
 
                t, 'b')
783
 
 
784
 
        self.assertEqual((80, 0),
785
 
            t.append_multi([('a', StringIO('some text in a\n')),
786
 
                            ('d', StringIO('missing file r\n'))]))
787
 
 
788
 
        self.check_transport_contents(
789
 
            'diff\ncontents for\na\n'
790
 
            'add\nsome\nmore\ncontents\n'
791
 
            'and\nthen\nsome\nmore\n'
792
 
            'a little bit more\n'
793
 
            'some text in a\n',
794
 
            t, 'a')
795
 
        self.check_transport_contents('missing file r\n', t, 'd')
 
699
                          t.append_bytes, 'missing/path', b'content')
796
700
 
797
701
    def test_append_file_mode(self):
798
702
        """Check that append accepts a mode parameter"""
800
704
        t = self.get_transport()
801
705
        if t.is_readonly():
802
706
            self.assertRaises(TransportNotPossible,
803
 
                t.append_file, 'f', StringIO('f'), mode=None)
 
707
                              t.append_file, 'f', BytesIO(b'f'), mode=None)
804
708
            return
805
 
        t.append_file('f', StringIO('f'), mode=None)
 
709
        t.append_file('f', BytesIO(b'f'), mode=None)
806
710
 
807
711
    def test_append_bytes_mode(self):
808
712
        # check append_bytes accepts a mode
809
713
        t = self.get_transport()
810
714
        if t.is_readonly():
811
715
            self.assertRaises(TransportNotPossible,
812
 
                t.append_bytes, 'f', 'f', mode=None)
 
716
                              t.append_bytes, 'f', b'f', mode=None)
813
717
            return
814
 
        t.append_bytes('f', 'f', mode=None)
 
718
        t.append_bytes('f', b'f', mode=None)
815
719
 
816
720
    def test_delete(self):
817
721
        # TODO: Test Transport.delete
822
726
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
823
727
            return
824
728
 
825
 
        t.put_bytes('a', 'a little bit of text\n')
826
 
        self.failUnless(t.has('a'))
 
729
        t.put_bytes('a', b'a little bit of text\n')
 
730
        self.assertTrue(t.has('a'))
827
731
        t.delete('a')
828
 
        self.failIf(t.has('a'))
 
732
        self.assertFalse(t.has('a'))
829
733
 
830
734
        self.assertRaises(NoSuchFile, t.delete, 'a')
831
735
 
832
 
        t.put_bytes('a', 'a text\n')
833
 
        t.put_bytes('b', 'b text\n')
834
 
        t.put_bytes('c', 'c text\n')
 
736
        t.put_bytes('a', b'a text\n')
 
737
        t.put_bytes('b', b'b text\n')
 
738
        t.put_bytes('c', b'c text\n')
835
739
        self.assertEqual([True, True, True],
836
 
                list(t.has_multi(['a', 'b', 'c'])))
837
 
        t.delete_multi(['a', 'c'])
 
740
                         [t.has(n) for n in ['a', 'b', 'c']])
 
741
        t.delete('a')
 
742
        t.delete('c')
838
743
        self.assertEqual([False, True, False],
839
 
                list(t.has_multi(['a', 'b', 'c'])))
840
 
        self.failIf(t.has('a'))
841
 
        self.failUnless(t.has('b'))
842
 
        self.failIf(t.has('c'))
843
 
 
844
 
        self.assertRaises(NoSuchFile,
845
 
                t.delete_multi, ['a', 'b', 'c'])
846
 
 
847
 
        self.assertRaises(NoSuchFile,
848
 
                t.delete_multi, iter(['a', 'b', 'c']))
849
 
 
850
 
        t.put_bytes('a', 'another a text\n')
851
 
        t.put_bytes('c', 'another c text\n')
852
 
        t.delete_multi(iter(['a', 'b', 'c']))
 
744
                         [t.has(n) for n in ['a', 'b', 'c']])
 
745
        self.assertFalse(t.has('a'))
 
746
        self.assertTrue(t.has('b'))
 
747
        self.assertFalse(t.has('c'))
 
748
 
 
749
        for name in ['a', 'c', 'd']:
 
750
            self.assertRaises(NoSuchFile, t.delete, name)
853
751
 
854
752
        # We should have deleted everything
855
753
        # SftpServer creates control files in the
902
800
        if t.is_readonly():
903
801
            return
904
802
        t.mkdir('foo')
905
 
        t.put_bytes('foo-bar', '')
 
803
        t.put_bytes('foo-bar', b'')
906
804
        t.mkdir('foo-baz')
907
805
        t.rmdir('foo')
908
806
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
909
 
        self.failUnless(t.has('foo-bar'))
 
807
        self.assertTrue(t.has('foo-bar'))
910
808
 
911
809
    def test_rename_dir_succeeds(self):
912
810
        t = self.get_transport()
913
811
        if t.is_readonly():
914
 
            raise TestSkipped("transport is readonly")
 
812
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
813
                              t.rename, 'foo', 'bar')
 
814
            return
915
815
        t.mkdir('adir')
916
816
        t.mkdir('adir/asubdir')
917
817
        t.rename('adir', 'bdir')
922
822
        """Attempting to replace a nonemtpy directory should fail"""
923
823
        t = self.get_transport()
924
824
        if t.is_readonly():
925
 
            raise TestSkipped("transport is readonly")
 
825
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
826
                              t.rename, 'foo', 'bar')
 
827
            return
926
828
        t.mkdir('adir')
927
829
        t.mkdir('adir/asubdir')
928
830
        t.mkdir('bdir')
943
845
        t.mkdir('b')
944
846
        ta = t.clone('a')
945
847
        tb = t.clone('b')
946
 
        ta.put_bytes('f', 'aoeu')
 
848
        ta.put_bytes('f', b'aoeu')
947
849
        ta.rename('f', '../b/f')
948
850
        self.assertTrue(tb.has('f'))
949
851
        self.assertFalse(ta.has('f'))
991
893
        # creates control files in the working directory
992
894
        # perhaps all of this could be done in a subdirectory
993
895
 
994
 
        t.put_bytes('a', 'a first file\n')
995
 
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
896
        t.put_bytes('a', b'a first file\n')
 
897
        self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
996
898
 
997
899
        t.move('a', 'b')
998
 
        self.failUnless(t.has('b'))
999
 
        self.failIf(t.has('a'))
 
900
        self.assertTrue(t.has('b'))
 
901
        self.assertFalse(t.has('a'))
1000
902
 
1001
 
        self.check_transport_contents('a first file\n', t, 'b')
1002
 
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
903
        self.check_transport_contents(b'a first file\n', t, 'b')
 
904
        self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
1003
905
 
1004
906
        # Overwrite a file
1005
 
        t.put_bytes('c', 'c this file\n')
 
907
        t.put_bytes('c', b'c this file\n')
1006
908
        t.move('c', 'b')
1007
 
        self.failIf(t.has('c'))
1008
 
        self.check_transport_contents('c this file\n', t, 'b')
 
909
        self.assertFalse(t.has('c'))
 
910
        self.check_transport_contents(b'c this file\n', t, 'b')
1009
911
 
1010
912
        # TODO: Try to write a test for atomicity
1011
913
        # TODO: Test moving into a non-existent subdirectory
1012
 
        # TODO: Test Transport.move_multi
1013
914
 
1014
915
    def test_copy(self):
1015
916
        t = self.get_transport()
1017
918
        if t.is_readonly():
1018
919
            return
1019
920
 
1020
 
        t.put_bytes('a', 'a file\n')
 
921
        t.put_bytes('a', b'a file\n')
1021
922
        t.copy('a', 'b')
1022
 
        self.check_transport_contents('a file\n', t, 'b')
 
923
        self.check_transport_contents(b'a file\n', t, 'b')
1023
924
 
1024
925
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1025
926
        os.mkdir('c')
1026
927
        # What should the assert be if you try to copy a
1027
928
        # file over a directory?
1028
929
        #self.assertRaises(Something, t.copy, 'a', 'c')
1029
 
        t.put_bytes('d', 'text in d\n')
 
930
        t.put_bytes('d', b'text in d\n')
1030
931
        t.copy('d', 'b')
1031
 
        self.check_transport_contents('text in d\n', t, 'b')
1032
 
 
1033
 
        # TODO: test copy_multi
 
932
        self.check_transport_contents(b'text in d\n', t, 'b')
1034
933
 
1035
934
    def test_connection_error(self):
1036
935
        """ConnectionError is raised when connection is impossible.
1042
941
        except NotImplementedError:
1043
942
            raise TestSkipped("Transport %s has no bogus URL support." %
1044
943
                              self._server.__class__)
1045
 
        t = get_transport(url)
 
944
        t = _mod_transport.get_transport_from_url(url)
1046
945
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1047
946
 
1048
947
    def test_stat(self):
1053
952
 
1054
953
        try:
1055
954
            st = t.stat('.')
1056
 
        except TransportNotPossible, e:
 
955
        except TransportNotPossible as e:
1057
956
            # This transport cannot stat
1058
957
            return
1059
958
 
1064
963
        for path, size in zip(paths, sizes):
1065
964
            st = t.stat(path)
1066
965
            if path.endswith('/'):
1067
 
                self.failUnless(S_ISDIR(st.st_mode))
 
966
                self.assertTrue(S_ISDIR(st.st_mode))
1068
967
                # directory sizes are meaningless
1069
968
            else:
1070
 
                self.failUnless(S_ISREG(st.st_mode))
 
969
                self.assertTrue(S_ISREG(st.st_mode))
1071
970
                self.assertEqual(size, st.st_size)
1072
971
 
1073
 
        remote_stats = list(t.stat_multi(paths))
1074
 
        remote_iter_stats = list(t.stat_multi(iter(paths)))
1075
 
 
1076
972
        self.assertRaises(NoSuchFile, t.stat, 'q')
1077
973
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
1078
974
 
1079
 
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1080
 
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1081
975
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1082
976
        subdir = t.clone('subdir')
1083
 
        subdir.stat('./file')
1084
 
        subdir.stat('.')
 
977
        st = subdir.stat('./file')
 
978
        st = subdir.stat('.')
1085
979
 
1086
980
    def test_hardlink(self):
1087
981
        from stat import ST_NLINK
1096
990
        try:
1097
991
            t.hardlink(source_name, link_name)
1098
992
 
1099
 
            self.failUnless(t.has(source_name))
1100
 
            self.failUnless(t.has(link_name))
 
993
            self.assertTrue(t.has(source_name))
 
994
            self.assertTrue(t.has(link_name))
1101
995
 
1102
996
            st = t.stat(link_name)
1103
 
            self.failUnlessEqual(st[ST_NLINK], 2)
 
997
            self.assertEqual(st[ST_NLINK], 2)
1104
998
        except TransportNotPossible:
1105
999
            raise TestSkipped("Transport %s does not support hardlinks." %
1106
1000
                              self._server.__class__)
1118
1012
        try:
1119
1013
            t.symlink(source_name, link_name)
1120
1014
 
1121
 
            self.failUnless(t.has(source_name))
1122
 
            self.failUnless(t.has(link_name))
 
1015
            self.assertTrue(t.has(source_name))
 
1016
            self.assertTrue(t.has(link_name))
1123
1017
 
1124
1018
            st = t.stat(link_name)
1125
 
            self.failUnless(S_ISLNK(st.st_mode))
1126
 
        except TransportNotPossible:
1127
 
            raise TestSkipped("Transport %s does not support symlinks." %
1128
 
                              self._server.__class__)
1129
 
        except IOError:
1130
 
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
 
1019
            self.assertTrue(S_ISLNK(st.st_mode),
 
1020
                            "expected symlink, got mode %o" % st.st_mode)
 
1021
        except TransportNotPossible:
 
1022
            raise TestSkipped("Transport %s does not support symlinks." %
 
1023
                              self._server.__class__)
 
1024
 
 
1025
        self.assertEqual(source_name, t.readlink(link_name))
 
1026
 
 
1027
    def test_readlink_nonexistent(self):
 
1028
        t = self.get_transport()
 
1029
        try:
 
1030
            self.assertRaises(NoSuchFile, t.readlink, 'nonexistent')
 
1031
        except TransportNotPossible:
 
1032
            raise TestSkipped("Transport %s does not support symlinks." %
 
1033
                              self._server.__class__)
1131
1034
 
1132
1035
    def test_list_dir(self):
1133
1036
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1138
1041
            return
1139
1042
 
1140
1043
        def sorted_list(d, transport):
1141
 
            l = list(transport.list_dir(d))
1142
 
            l.sort()
 
1044
            l = sorted(transport.list_dir(d))
1143
1045
            return l
1144
1046
 
1145
1047
        self.assertEqual([], sorted_list('.', t))
1197
1099
            raise TestSkipped("not a connected transport")
1198
1100
 
1199
1101
        t2 = t1.clone('subdir')
1200
 
        self.assertEquals(t1._scheme, t2._scheme)
1201
 
        self.assertEquals(t1._user, t2._user)
1202
 
        self.assertEquals(t1._password, t2._password)
1203
 
        self.assertEquals(t1._host, t2._host)
1204
 
        self.assertEquals(t1._port, t2._port)
 
1102
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1103
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
 
1104
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
 
1105
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
 
1106
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1205
1107
 
1206
1108
    def test__reuse_for(self):
1207
1109
        t = self.get_transport()
1214
1116
 
1215
1117
            Only the parameters different from None will be changed.
1216
1118
            """
1217
 
            if scheme   is None: scheme   = t._scheme
1218
 
            if user     is None: user     = t._user
1219
 
            if password is None: password = t._password
1220
 
            if user     is None: user     = t._user
1221
 
            if host     is None: host     = t._host
1222
 
            if port     is None: port     = t._port
1223
 
            if path     is None: path     = t._path
1224
 
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1119
            if scheme is None:
 
1120
                scheme = t._parsed_url.scheme
 
1121
            if user is None:
 
1122
                user = t._parsed_url.user
 
1123
            if password is None:
 
1124
                password = t._parsed_url.password
 
1125
            if user is None:
 
1126
                user = t._parsed_url.user
 
1127
            if host is None:
 
1128
                host = t._parsed_url.host
 
1129
            if port is None:
 
1130
                port = t._parsed_url.port
 
1131
            if path is None:
 
1132
                path = t._parsed_url.path
 
1133
            return str(urlutils.URL(scheme, user, password, host, port, path))
1225
1134
 
1226
 
        if t._scheme == 'ftp':
 
1135
        if t._parsed_url.scheme == 'ftp':
1227
1136
            scheme = 'sftp'
1228
1137
        else:
1229
1138
            scheme = 'ftp'
1230
1139
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1231
 
        if t._user == 'me':
 
1140
        if t._parsed_url.user == 'me':
1232
1141
            user = 'you'
1233
1142
        else:
1234
1143
            user = 'me'
1245
1154
        #   (they may be typed by the user when prompted for example)
1246
1155
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1247
1156
        # We will not connect, we can use a invalid host
1248
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1249
 
        if t._port == 1234:
 
1157
        self.assertIsNot(t, t._reuse_for(
 
1158
            new_url(host=t._parsed_url.host + 'bar')))
 
1159
        if t._parsed_url.port == 1234:
1250
1160
            port = 4321
1251
1161
        else:
1252
1162
            port = 1234
1261
1171
 
1262
1172
        c = t.clone('subdir')
1263
1173
        # Some transports will create the connection  only when needed
1264
 
        t.has('surely_not') # Force connection
 
1174
        t.has('surely_not')  # Force connection
1265
1175
        self.assertIs(t._get_connection(), c._get_connection())
1266
1176
 
1267
1177
        # Temporary failure, we need to create a new dummy connection
1268
 
        new_connection = object()
 
1178
        new_connection = None
1269
1179
        t._set_connection(new_connection)
1270
1180
        # Check that both transports use the same connection
1271
1181
        self.assertIs(new_connection, t._get_connection())
1276
1186
        if not isinstance(t, ConnectedTransport):
1277
1187
            raise TestSkipped("not a connected transport")
1278
1188
 
1279
 
        t.has('surely_not') # Force connection
 
1189
        t.has('surely_not')  # Force connection
1280
1190
        self.assertIsNot(None, t._get_connection())
1281
1191
 
1282
1192
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1293
1203
 
1294
1204
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1295
1205
 
1296
 
        self.failUnless(t1.has('a'))
1297
 
        self.failUnless(t1.has('b/c'))
1298
 
        self.failIf(t1.has('c'))
 
1206
        self.assertTrue(t1.has('a'))
 
1207
        self.assertTrue(t1.has('b/c'))
 
1208
        self.assertFalse(t1.has('c'))
1299
1209
 
1300
1210
        t2 = t1.clone('b')
1301
1211
        self.assertEqual(t1.base + 'b/', t2.base)
1302
1212
 
1303
 
        self.failUnless(t2.has('c'))
1304
 
        self.failIf(t2.has('a'))
 
1213
        self.assertTrue(t2.has('c'))
 
1214
        self.assertFalse(t2.has('a'))
1305
1215
 
1306
1216
        t3 = t2.clone('..')
1307
 
        self.failUnless(t3.has('a'))
1308
 
        self.failIf(t3.has('c'))
 
1217
        self.assertTrue(t3.has('a'))
 
1218
        self.assertFalse(t3.has('c'))
1309
1219
 
1310
 
        self.failIf(t1.has('b/d'))
1311
 
        self.failIf(t2.has('d'))
1312
 
        self.failIf(t3.has('b/d'))
 
1220
        self.assertFalse(t1.has('b/d'))
 
1221
        self.assertFalse(t2.has('d'))
 
1222
        self.assertFalse(t3.has('b/d'))
1313
1223
 
1314
1224
        if t1.is_readonly():
1315
 
            self.build_tree_contents([('b/d', 'newfile\n')])
 
1225
            self.build_tree_contents([('b/d', b'newfile\n')])
1316
1226
        else:
1317
 
            t2.put_bytes('d', 'newfile\n')
 
1227
            t2.put_bytes('d', b'newfile\n')
1318
1228
 
1319
 
        self.failUnless(t1.has('b/d'))
1320
 
        self.failUnless(t2.has('d'))
1321
 
        self.failUnless(t3.has('b/d'))
 
1229
        self.assertTrue(t1.has('b/d'))
 
1230
        self.assertTrue(t2.has('d'))
 
1231
        self.assertTrue(t3.has('b/d'))
1322
1232
 
1323
1233
    def test_clone_to_root(self):
1324
1234
        orig_transport = self.get_transport()
1328
1238
        new_transport = root_transport.clone("..")
1329
1239
        # as we are walking up directories, the path must be
1330
1240
        # growing less, except at the top
1331
 
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
1332
 
            or new_transport.base == root_transport.base)
 
1241
        self.assertTrue(len(new_transport.base) < len(root_transport.base) or
 
1242
                        new_transport.base == root_transport.base)
1333
1243
        while new_transport.base != root_transport.base:
1334
1244
            root_transport = new_transport
1335
1245
            new_transport = root_transport.clone("..")
1336
1246
            # as we are walking up directories, the path must be
1337
1247
            # growing less, except at the top
1338
 
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
1339
 
                or new_transport.base == root_transport.base)
 
1248
            self.assertTrue(len(new_transport.base) < len(root_transport.base) or
 
1249
                            new_transport.base == root_transport.base)
1340
1250
 
1341
1251
        # Cloning to "/" should take us to exactly the same location.
1342
1252
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1352
1262
        orig_transport = self.get_transport()
1353
1263
        root_transport = orig_transport.clone('/')
1354
1264
        self.assertEqual(root_transport.base + '.bzr/',
1355
 
            root_transport.clone('.bzr').base)
 
1265
                         root_transport.clone('.bzr').base)
1356
1266
 
1357
1267
    def test_base_url(self):
1358
1268
        t = self.get_transport()
1398
1308
        self.assertEqual(transport.clone("/").abspath('foo'),
1399
1309
                         transport.abspath("/foo"))
1400
1310
 
 
1311
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1401
1312
    def test_win32_abspath(self):
1402
1313
        # Note: we tried to set sys.platform='win32' so we could test on
1403
1314
        # other platforms too, but then osutils does platform specific
1408
1319
 
1409
1320
        # smoke test for abspath on win32.
1410
1321
        # a transport based on 'file:///' never fully qualifies the drive.
1411
 
        transport = get_transport("file:///")
1412
 
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1322
        transport = _mod_transport.get_transport_from_url("file:///")
 
1323
        self.assertEqual(transport.abspath("/"), "file:///")
1413
1324
 
1414
1325
        # but a transport that starts with a drive spec must keep it.
1415
 
        transport = get_transport("file:///C:/")
1416
 
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1326
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1327
        self.assertEqual(transport.abspath("/"), "file:///C:/")
1417
1328
 
1418
1329
    def test_local_abspath(self):
1419
1330
        transport = self.get_transport()
1420
1331
        try:
1421
1332
            p = transport.local_abspath('.')
1422
 
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1333
        except (errors.NotLocalUrl, TransportNotPossible) as e:
1423
1334
            # should be formattable
1424
1335
            s = str(e)
1425
1336
        else:
1452
1363
                         'isolated/dir/',
1453
1364
                         'isolated/dir/foo',
1454
1365
                         'isolated/dir/bar',
1455
 
                         'isolated/dir/b%25z', # make sure quoting is correct
 
1366
                         'isolated/dir/b%25z',  # make sure quoting is correct
1456
1367
                         'isolated/bar'],
1457
1368
                        transport=transport)
1458
1369
        paths = set(transport.iter_files_recursive())
1459
1370
        # nb the directories are not converted
1460
1371
        self.assertEqual(paths,
1461
 
                    set(['isolated/dir/foo',
1462
 
                         'isolated/dir/bar',
1463
 
                         'isolated/dir/b%2525z',
1464
 
                         'isolated/bar']))
 
1372
                         {'isolated/dir/foo',
 
1373
                          'isolated/dir/bar',
 
1374
                          'isolated/dir/b%2525z',
 
1375
                          'isolated/bar'})
1465
1376
        sub_transport = transport.clone('isolated')
1466
1377
        paths = set(sub_transport.iter_files_recursive())
1467
1378
        self.assertEqual(paths,
1468
 
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1379
                         {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
1469
1380
 
1470
1381
    def test_copy_tree(self):
1471
1382
        # TODO: test file contents and permissions are preserved. This test was
1482
1393
                         'from/dir/',
1483
1394
                         'from/dir/foo',
1484
1395
                         'from/dir/bar',
1485
 
                         'from/dir/b%25z', # make sure quoting is correct
 
1396
                         'from/dir/b%25z',  # make sure quoting is correct
1486
1397
                         'from/bar'],
1487
1398
                        transport=transport)
1488
1399
        transport.copy_tree('from', 'to')
1489
1400
        paths = set(transport.iter_files_recursive())
1490
1401
        self.assertEqual(paths,
1491
 
                    set(['from/dir/foo',
1492
 
                         'from/dir/bar',
1493
 
                         'from/dir/b%2525z',
1494
 
                         'from/bar',
1495
 
                         'to/dir/foo',
1496
 
                         'to/dir/bar',
1497
 
                         'to/dir/b%2525z',
1498
 
                         'to/bar',]))
 
1402
                         {'from/dir/foo',
 
1403
                          'from/dir/bar',
 
1404
                          'from/dir/b%2525z',
 
1405
                          'from/bar',
 
1406
                          'to/dir/foo',
 
1407
                          'to/dir/bar',
 
1408
                          'to/dir/b%2525z',
 
1409
                          'to/bar', })
1499
1410
 
1500
1411
    def test_copy_tree_to_transport(self):
1501
1412
        transport = self.get_transport()
1509
1420
                         'from/dir/',
1510
1421
                         'from/dir/foo',
1511
1422
                         'from/dir/bar',
1512
 
                         'from/dir/b%25z', # make sure quoting is correct
 
1423
                         'from/dir/b%25z',  # make sure quoting is correct
1513
1424
                         'from/bar'],
1514
1425
                        transport=transport)
1515
1426
        from_transport = transport.clone('from')
1518
1429
        from_transport.copy_tree_to_transport(to_transport)
1519
1430
        paths = set(transport.iter_files_recursive())
1520
1431
        self.assertEqual(paths,
1521
 
                    set(['from/dir/foo',
1522
 
                         'from/dir/bar',
1523
 
                         'from/dir/b%2525z',
1524
 
                         'from/bar',
1525
 
                         'to/dir/foo',
1526
 
                         'to/dir/bar',
1527
 
                         'to/dir/b%2525z',
1528
 
                         'to/bar',]))
 
1432
                         {'from/dir/foo',
 
1433
                          'from/dir/bar',
 
1434
                          'from/dir/b%2525z',
 
1435
                          'from/bar',
 
1436
                          'to/dir/foo',
 
1437
                          'to/dir/bar',
 
1438
                          'to/dir/b%2525z',
 
1439
                          'to/bar', })
1529
1440
 
1530
1441
    def test_unicode_paths(self):
1531
1442
        """Test that we can read/write files with Unicode names."""
1535
1446
        # '\xe5' and '\xe4' actually map to the same file
1536
1447
        # adding a suffix kicks in the 'preserving but insensitive'
1537
1448
        # route, and maintains the right files
1538
 
        files = [u'\xe5.1', # a w/ circle iso-8859-1
1539
 
                 u'\xe4.2', # a w/ dots iso-8859-1
1540
 
                 u'\u017d', # Z with caron iso-8859-2
1541
 
                 u'\u062c', # Arabic j
1542
 
                 u'\u0410', # Russian A
1543
 
                 u'\u65e5', # Kanji sun/day
1544
 
                ]
 
1449
        files = [u'\xe5.1',  # a w/ circle iso-8859-1
 
1450
                 u'\xe4.2',  # a w/ dots iso-8859-1
 
1451
                 u'\u017d',  # Z with caron iso-8859-2
 
1452
                 u'\u062c',  # Arabic j
 
1453
                 u'\u0410',  # Russian A
 
1454
                 u'\u65e5',  # Kanji sun/day
 
1455
                 ]
1545
1456
 
1546
1457
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1547
1458
        if no_unicode_support:
1548
 
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1459
            self.knownFailure("test server cannot handle unicode paths")
1549
1460
 
1550
1461
        try:
1551
1462
            self.build_tree(files, transport=t, line_endings='binary')
1552
1463
        except UnicodeError:
1553
 
            raise TestSkipped("cannot handle unicode paths in current encoding")
 
1464
            raise TestSkipped(
 
1465
                "cannot handle unicode paths in current encoding")
1554
1466
 
1555
1467
        # A plain unicode string is not a valid url
1556
1468
        for fname in files:
1557
 
            self.assertRaises(InvalidURL, t.get, fname)
 
1469
            self.assertRaises(urlutils.InvalidURL, t.get, fname)
1558
1470
 
1559
1471
        for fname in files:
1560
1472
            fname_utf8 = fname.encode('utf-8')
1561
 
            contents = 'contents of %s\n' % (fname_utf8,)
 
1473
            contents = b'contents of %s\n' % (fname_utf8,)
1562
1474
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1563
1475
 
1564
1476
    def test_connect_twice_is_same_content(self):
1567
1479
        transport = self.get_transport()
1568
1480
        if transport.is_readonly():
1569
1481
            return
1570
 
        transport.put_bytes('foo', 'bar')
 
1482
        transport.put_bytes('foo', b'bar')
1571
1483
        transport3 = self.get_transport()
1572
 
        self.check_transport_contents('bar', transport3, 'foo')
 
1484
        self.check_transport_contents(b'bar', transport3, 'foo')
1573
1485
 
1574
1486
        # now opening at a relative url should give us a sane result:
1575
1487
        transport.mkdir('newdir')
1576
1488
        transport5 = self.get_transport('newdir')
1577
1489
        transport6 = transport5.clone('..')
1578
 
        self.check_transport_contents('bar', transport6, 'foo')
 
1490
        self.check_transport_contents(b'bar', transport6, 'foo')
1579
1491
 
1580
1492
    def test_lock_write(self):
1581
1493
        """Test transport-level write locks.
1584
1496
        """
1585
1497
        transport = self.get_transport()
1586
1498
        if transport.is_readonly():
1587
 
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
 
1499
            self.assertRaises(TransportNotPossible,
 
1500
                              transport.lock_write, 'foo')
1588
1501
            return
1589
 
        transport.put_bytes('lock', '')
 
1502
        transport.put_bytes('lock', b'')
1590
1503
        try:
1591
1504
            lock = transport.lock_write('lock')
1592
1505
        except TransportNotPossible:
1602
1515
        """
1603
1516
        transport = self.get_transport()
1604
1517
        if transport.is_readonly():
1605
 
            file('lock', 'w').close()
 
1518
            open('lock', 'w').close()
1606
1519
        else:
1607
 
            transport.put_bytes('lock', '')
 
1520
            transport.put_bytes('lock', b'')
1608
1521
        try:
1609
1522
            lock = transport.lock_read('lock')
1610
1523
        except TransportNotPossible:
1616
1529
    def test_readv(self):
1617
1530
        transport = self.get_transport()
1618
1531
        if transport.is_readonly():
1619
 
            file('a', 'w').write('0123456789')
 
1532
            with open('a', 'w') as f:
 
1533
                f.write('0123456789')
1620
1534
        else:
1621
 
            transport.put_bytes('a', '0123456789')
 
1535
            transport.put_bytes('a', b'0123456789')
1622
1536
 
1623
1537
        d = list(transport.readv('a', ((0, 1),)))
1624
 
        self.assertEqual(d[0], (0, '0'))
 
1538
        self.assertEqual(d[0], (0, b'0'))
1625
1539
 
1626
1540
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1627
 
        self.assertEqual(d[0], (0, '0'))
1628
 
        self.assertEqual(d[1], (1, '1'))
1629
 
        self.assertEqual(d[2], (3, '34'))
1630
 
        self.assertEqual(d[3], (9, '9'))
 
1541
        self.assertEqual(d[0], (0, b'0'))
 
1542
        self.assertEqual(d[1], (1, b'1'))
 
1543
        self.assertEqual(d[2], (3, b'34'))
 
1544
        self.assertEqual(d[3], (9, b'9'))
1631
1545
 
1632
1546
    def test_readv_out_of_order(self):
1633
1547
        transport = self.get_transport()
1634
1548
        if transport.is_readonly():
1635
 
            file('a', 'w').write('0123456789')
 
1549
            with open('a', 'w') as f:
 
1550
                f.write('0123456789')
1636
1551
        else:
1637
 
            transport.put_bytes('a', '01234567890')
 
1552
            transport.put_bytes('a', b'01234567890')
1638
1553
 
1639
1554
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1640
 
        self.assertEqual(d[0], (1, '1'))
1641
 
        self.assertEqual(d[1], (9, '9'))
1642
 
        self.assertEqual(d[2], (0, '0'))
1643
 
        self.assertEqual(d[3], (3, '34'))
 
1555
        self.assertEqual(d[0], (1, b'1'))
 
1556
        self.assertEqual(d[1], (9, b'9'))
 
1557
        self.assertEqual(d[2], (0, b'0'))
 
1558
        self.assertEqual(d[3], (3, b'34'))
1644
1559
 
1645
1560
    def test_readv_with_adjust_for_latency(self):
1646
1561
        transport = self.get_transport()
1651
1566
        # reference the returned data with the random data. To avoid doing
1652
1567
        # multiple large random byte look ups we do several tests on the same
1653
1568
        # backing data.
1654
 
        content = osutils.rand_bytes(200*1024)
 
1569
        content = osutils.rand_bytes(200 * 1024)
1655
1570
        content_size = len(content)
1656
1571
        if transport.is_readonly():
1657
1572
            self.build_tree_contents([('a', content)])
1658
1573
        else:
1659
1574
            transport.put_bytes('a', content)
 
1575
 
1660
1576
        def check_result_data(result_vector):
1661
1577
            for item in result_vector:
1662
1578
                data_len = len(item[1])
1664
1580
 
1665
1581
        # start corner case
1666
1582
        result = list(transport.readv('a', ((0, 30),),
1667
 
            adjust_for_latency=True, upper_limit=content_size))
 
1583
                                      adjust_for_latency=True, upper_limit=content_size))
1668
1584
        # we expect 1 result, from 0, to something > 30
1669
1585
        self.assertEqual(1, len(result))
1670
1586
        self.assertEqual(0, result[0][0])
1672
1588
        check_result_data(result)
1673
1589
        # end of file corner case
1674
1590
        result = list(transport.readv('a', ((204700, 100),),
1675
 
            adjust_for_latency=True, upper_limit=content_size))
 
1591
                                      adjust_for_latency=True, upper_limit=content_size))
1676
1592
        # we expect 1 result, from 204800- its length, to the end
1677
1593
        self.assertEqual(1, len(result))
1678
1594
        data_len = len(result[0][1])
1679
 
        self.assertEqual(204800-data_len, result[0][0])
 
1595
        self.assertEqual(204800 - data_len, result[0][0])
1680
1596
        self.assertTrue(data_len >= 100)
1681
1597
        check_result_data(result)
1682
1598
        # out of order ranges are made in order
1683
1599
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
1684
 
            adjust_for_latency=True, upper_limit=content_size))
 
1600
                                      adjust_for_latency=True, upper_limit=content_size))
1685
1601
        # we expect 2 results, in order, start and end.
1686
1602
        self.assertEqual(2, len(result))
1687
1603
        # start
1690
1606
        self.assertTrue(data_len >= 30)
1691
1607
        # end
1692
1608
        data_len = len(result[1][1])
1693
 
        self.assertEqual(204800-data_len, result[1][0])
 
1609
        self.assertEqual(204800 - data_len, result[1][0])
1694
1610
        self.assertTrue(data_len >= 100)
1695
1611
        check_result_data(result)
1696
1612
        # close ranges get combined (even if out of order)
1697
 
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1613
        for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
1698
1614
            result = list(transport.readv('a', request_vector,
1699
 
                adjust_for_latency=True, upper_limit=content_size))
 
1615
                                          adjust_for_latency=True, upper_limit=content_size))
1700
1616
            self.assertEqual(1, len(result))
1701
1617
            data_len = len(result[0][1])
1702
1618
            # minimum length is from 400 to 1034 - 634
1710
1626
        transport = self.get_transport()
1711
1627
        # test from observed failure case.
1712
1628
        if transport.is_readonly():
1713
 
            file('a', 'w').write('a'*1024*1024)
 
1629
            with open('a', 'w') as f:
 
1630
                f.write('a' * 1024 * 1024)
1714
1631
        else:
1715
 
            transport.put_bytes('a', 'a'*1024*1024)
 
1632
            transport.put_bytes('a', b'a' * 1024 * 1024)
1716
1633
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1717
 
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
1718
 
            (465373, 800), (947422, 800)]
1719
 
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
1720
 
        found_items = [False]*9
 
1634
                         (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1635
                         (465373, 800), (947422, 800)]
 
1636
        results = list(transport.readv('a', broken_vector, True, 1024 * 1024))
 
1637
        found_items = [False] * 9
1721
1638
        for pos, (start, length) in enumerate(broken_vector):
1722
1639
            # check the range is covered by the result
1723
1640
            for offset, data in results:
1724
1641
                if offset <= start and start + length <= offset + len(data):
1725
1642
                    found_items[pos] = True
1726
 
        self.assertEqual([True]*9, found_items)
 
1643
        self.assertEqual([True] * 9, found_items)
1727
1644
 
1728
1645
    def test_get_with_open_write_stream_sees_all_content(self):
1729
1646
        t = self.get_transport()
1730
1647
        if t.is_readonly():
1731
1648
            return
1732
 
        handle = t.open_write_stream('foo')
1733
 
        try:
1734
 
            handle.write('bcd')
1735
 
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1736
 
        finally:
1737
 
            handle.close()
 
1649
        with t.open_write_stream('foo') as handle:
 
1650
            handle.write(b'bcd')
 
1651
            self.assertEqual([(0, b'b'), (2, b'd')], list(
 
1652
                t.readv('foo', ((0, 1), (2, 1)))))
1738
1653
 
1739
1654
    def test_get_smart_medium(self):
1740
1655
        """All transports must either give a smart medium, or know they can't.
1750
1665
    def test_readv_short_read(self):
1751
1666
        transport = self.get_transport()
1752
1667
        if transport.is_readonly():
1753
 
            file('a', 'w').write('0123456789')
 
1668
            with open('a', 'w') as f:
 
1669
                f.write('0123456789')
1754
1670
        else:
1755
 
            transport.put_bytes('a', '01234567890')
 
1671
            transport.put_bytes('a', b'01234567890')
1756
1672
 
1757
1673
        # This is intentionally reading off the end of the file
1758
1674
        # since we are sure that it cannot get there
1759
1675
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1760
1676
                               # Can be raised by paramiko
1761
1677
                               AssertionError),
1762
 
                              transport.readv, 'a', [(1,1), (8,10)])
 
1678
                              transport.readv, 'a', [(1, 1), (8, 10)])
1763
1679
 
1764
1680
        # This is trying to seek past the end of the file, it should
1765
1681
        # also raise a special error
1766
1682
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1767
 
                              transport.readv, 'a', [(12,2)])
 
1683
                              transport.readv, 'a', [(12, 2)])
 
1684
 
 
1685
    def test_no_segment_parameters(self):
 
1686
        """Segment parameters should be stripped and stored in
 
1687
        transport.segment_parameters."""
 
1688
        transport = self.get_transport("foo")
 
1689
        self.assertEqual({}, transport.get_segment_parameters())
 
1690
 
 
1691
    def test_segment_parameters(self):
 
1692
        """Segment parameters should be stripped and stored in
 
1693
        transport.get_segment_parameters()."""
 
1694
        base_url = self._server.get_url()
 
1695
        parameters = {"key1": "val1", "key2": "val2"}
 
1696
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1697
        transport = _mod_transport.get_transport_from_url(url)
 
1698
        self.assertEqual(parameters, transport.get_segment_parameters())
 
1699
 
 
1700
    def test_set_segment_parameters(self):
 
1701
        """Segment parameters can be set and show up in base."""
 
1702
        transport = self.get_transport("foo")
 
1703
        orig_base = transport.base
 
1704
        transport.set_segment_parameter("arm", "board")
 
1705
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
 
1706
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
 
1707
        transport.set_segment_parameter("arm", None)
 
1708
        transport.set_segment_parameter("nonexistant", None)
 
1709
        self.assertEqual({}, transport.get_segment_parameters())
 
1710
        self.assertEqual(orig_base, transport.base)
 
1711
 
 
1712
    def test_stat_symlink(self):
 
1713
        # if a transport points directly to a symlink (and supports symlinks
 
1714
        # at all) you can tell this.  helps with bug 32669.
 
1715
        t = self.get_transport()
 
1716
        try:
 
1717
            t.symlink('target', 'link')
 
1718
        except TransportNotPossible:
 
1719
            raise TestSkipped("symlinks not supported")
 
1720
        t2 = t.clone('link')
 
1721
        st = t2.stat('')
 
1722
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1723
 
 
1724
    def test_abspath_url_unquote_unreserved(self):
 
1725
        """URLs from abspath should have unreserved characters unquoted
 
1726
 
 
1727
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1728
        """
 
1729
        t = self.get_transport()
 
1730
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1731
        self.assertEqual(t.base + "-.09AZ_az~",
 
1732
                         t.abspath(needlessly_escaped_dir))
 
1733
 
 
1734
    def test_clone_url_unquote_unreserved(self):
 
1735
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1736
 
 
1737
        Cloned transports should be prefix comparable for things like the
 
1738
        isolation checking of tests, see lp:842223 for more.
 
1739
        """
 
1740
        t1 = self.get_transport()
 
1741
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1742
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1743
        t2 = t1.clone(needlessly_escaped_dir)
 
1744
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1745
 
 
1746
    def test_hook_post_connection_one(self):
 
1747
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1748
        log = []
 
1749
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1750
        t = self.get_transport()
 
1751
        self.assertEqual([], log)
 
1752
        t.has("non-existant")
 
1753
        if isinstance(t, RemoteTransport):
 
1754
            self.assertEqual([t.get_smart_medium()], log)
 
1755
        elif isinstance(t, ConnectedTransport):
 
1756
            self.assertEqual([t], log)
 
1757
        else:
 
1758
            self.assertEqual([], log)
 
1759
 
 
1760
    def test_hook_post_connection_multi(self):
 
1761
        """Fire post_connect hook once per unshared underlying connection"""
 
1762
        log = []
 
1763
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1764
        t1 = self.get_transport()
 
1765
        t2 = t1.clone(".")
 
1766
        t3 = self.get_transport()
 
1767
        self.assertEqual([], log)
 
1768
        t1.has("x")
 
1769
        t2.has("x")
 
1770
        t3.has("x")
 
1771
        if isinstance(t1, RemoteTransport):
 
1772
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1773
        elif isinstance(t1, ConnectedTransport):
 
1774
            self.assertEqual([t1, t3], log)
 
1775
        else:
 
1776
            self.assertEqual([], log)