/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/per_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2020-07-15 21:51:27 UTC
  • mto: (7490.40.58 work)
  • mto: This revision was merged to the branch mainline in revision 7519.
  • Revision ID: jelmer@jelmer.uk-20200715215127-3hn9ktbg3f1xikjj
More fixes for hg probing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
23
 
import itertools
24
23
import os
25
 
from cStringIO import StringIO
26
 
from StringIO import StringIO as pyStringIO
27
24
import stat
28
25
import sys
29
 
import unittest
30
26
 
31
 
from bzrlib import (
 
27
from .. import (
32
28
    errors,
33
29
    osutils,
 
30
    pyutils,
34
31
    tests,
 
32
    transport as _mod_transport,
35
33
    urlutils,
36
34
    )
37
 
from bzrlib.errors import (ConnectionError,
38
 
                           DirectoryNotEmpty,
39
 
                           FileExists,
40
 
                           InvalidURL,
41
 
                           LockError,
42
 
                           NoSuchFile,
43
 
                           NotLocalUrl,
44
 
                           PathError,
45
 
                           TransportNotPossible,
46
 
                           )
47
 
from bzrlib.osutils import getcwd
48
 
from bzrlib.smart import medium
49
 
from bzrlib.tests import (
50
 
    TestCaseInTempDir,
 
35
from ..errors import (ConnectionError,
 
36
                      FileExists,
 
37
                      NoSuchFile,
 
38
                      PathError,
 
39
                      TransportNotPossible,
 
40
                      )
 
41
from ..osutils import getcwd
 
42
from ..sixish import (
 
43
    BytesIO,
 
44
    zip,
 
45
    )
 
46
from . import (
51
47
    TestSkipped,
52
48
    TestNotApplicable,
53
49
    multiply_tests,
54
50
    )
55
 
from bzrlib.tests import test_server
56
 
from bzrlib.tests.test_transport import TestTransportImplementation
57
 
from bzrlib.transport import (
 
51
from . import test_server
 
52
from .test_transport import TestTransportImplementation
 
53
from ..transport import (
58
54
    ConnectedTransport,
59
 
    get_transport,
 
55
    Transport,
60
56
    _get_transport_modules,
61
57
    )
62
 
from bzrlib.transport.memory import MemoryTransport
 
58
from ..transport.memory import MemoryTransport
 
59
from ..transport.remote import RemoteTransport
63
60
 
64
61
 
65
62
def get_transport_test_permutations(module):
78
75
    for module in _get_transport_modules():
79
76
        try:
80
77
            permutations = get_transport_test_permutations(
81
 
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
78
                pyutils.get_named_object(module))
82
79
            for (klass, server_factory) in permutations:
83
80
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
 
                    {"transport_class":klass,
85
 
                     "transport_server":server_factory})
 
81
                            {"transport_class": klass,
 
82
                             "transport_server": server_factory})
86
83
                result.append(scenario)
87
 
        except errors.DependencyNotPresent, e:
 
84
        except errors.DependencyNotPresent as e:
88
85
            # Continue even if a dependency prevents us
89
86
            # from adding this test
90
87
            pass
91
88
    return result
92
89
 
93
90
 
94
 
def load_tests(standard_tests, module, loader):
 
91
def load_tests(loader, standard_tests, pattern):
95
92
    """Multiply tests for transport implementations."""
96
93
    result = loader.suiteClass()
97
94
    scenarios = transport_test_permutations()
102
99
 
103
100
    def setUp(self):
104
101
        super(TransportTests, self).setUp()
105
 
        self._captureVar('BZR_NO_SMART_VFS', None)
 
102
        self.overrideEnv('BRZ_NO_SMART_VFS', None)
106
103
 
107
104
    def check_transport_contents(self, content, transport, relpath):
108
 
        """Check that transport.get(relpath).read() == content."""
109
 
        self.assertEqualDiff(content, transport.get(relpath).read())
 
105
        """Check that transport.get_bytes(relpath) == content."""
 
106
        self.assertEqualDiff(content, transport.get_bytes(relpath))
110
107
 
111
108
    def test_ensure_base_missing(self):
112
109
        """.ensure_base() should create the directory if it doesn't exist"""
156
153
        self.assertEqual(True, t.has('a'))
157
154
        self.assertEqual(False, t.has('c'))
158
155
        self.assertEqual(True, t.has(urlutils.escape('%')))
159
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
160
 
                                           'e', 'f', 'g', 'h'])),
161
 
                         [True, True, False, False,
162
 
                          True, False, True, False])
163
156
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
164
157
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
165
158
                                           urlutils.escape('%%')]))
166
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
167
 
                                                'e', 'f', 'g', 'h']))),
168
 
                         [True, True, False, False,
169
 
                          True, False, True, False])
170
159
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
171
160
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
172
161
 
183
172
    def test_get(self):
184
173
        t = self.get_transport()
185
174
 
186
 
        files = ['a', 'b', 'e', 'g']
187
 
        contents = ['contents of a\n',
188
 
                    'contents of b\n',
189
 
                    'contents of e\n',
190
 
                    'contents of g\n',
191
 
                    ]
192
 
        self.build_tree(files, transport=t, line_endings='binary')
193
 
        self.check_transport_contents('contents of a\n', t, 'a')
194
 
        content_f = t.get_multi(files)
195
 
        # Use itertools.izip() instead of use zip() or map(), since they fully
196
 
        # evaluate their inputs, the transport requests should be issued and
197
 
        # handled sequentially (we don't want to force transport to buffer).
198
 
        for content, f in itertools.izip(contents, content_f):
199
 
            self.assertEqual(content, f.read())
200
 
 
201
 
        content_f = t.get_multi(iter(files))
202
 
        # Use itertools.izip() for the same reason
203
 
        for content, f in itertools.izip(contents, content_f):
204
 
            self.assertEqual(content, f.read())
 
175
        files = ['a']
 
176
        content = b'contents of a\n'
 
177
        self.build_tree(['a'], transport=t, line_endings='binary')
 
178
        self.check_transport_contents(b'contents of a\n', t, 'a')
 
179
        f = t.get('a')
 
180
        self.assertEqual(content, f.read())
205
181
 
206
182
    def test_get_unknown_file(self):
207
183
        t = self.get_transport()
208
184
        files = ['a', 'b']
209
 
        contents = ['contents of a\n',
210
 
                    'contents of b\n',
 
185
        contents = [b'contents of a\n',
 
186
                    b'contents of b\n',
211
187
                    ]
212
188
        self.build_tree(files, transport=t, line_endings='binary')
213
189
        self.assertRaises(NoSuchFile, t.get, 'c')
214
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
190
 
 
191
        def iterate_and_close(func, *args):
 
192
            for f in func(*args):
 
193
                # We call f.read() here because things like paramiko actually
 
194
                # spawn a thread to prefetch the content, which we want to
 
195
                # consume before we close the handle.
 
196
                content = f.read()
 
197
                f.close()
216
198
 
217
199
    def test_get_directory_read_gives_ReadError(self):
218
200
        """consistent errors for read() on a file returned by get()."""
238
220
        t = self.get_transport()
239
221
 
240
222
        files = ['a', 'b', 'e', 'g']
241
 
        contents = ['contents of a\n',
242
 
                    'contents of b\n',
243
 
                    'contents of e\n',
244
 
                    'contents of g\n',
 
223
        contents = [b'contents of a\n',
 
224
                    b'contents of b\n',
 
225
                    b'contents of e\n',
 
226
                    b'contents of g\n',
245
227
                    ]
246
228
        self.build_tree(files, transport=t, line_endings='binary')
247
 
        self.check_transport_contents('contents of a\n', t, 'a')
 
229
        self.check_transport_contents(b'contents of a\n', t, 'a')
248
230
 
249
231
        for content, fname in zip(contents, files):
250
232
            self.assertEqual(content, t.get_bytes(fname))
251
233
 
252
234
    def test_get_bytes_unknown_file(self):
253
235
        t = self.get_transport()
254
 
 
255
236
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
256
237
 
257
238
    def test_get_with_open_write_stream_sees_all_content(self):
258
239
        t = self.get_transport()
259
240
        if t.is_readonly():
260
241
            return
261
 
        handle = t.open_write_stream('foo')
262
 
        try:
263
 
            handle.write('b')
264
 
            self.assertEqual('b', t.get('foo').read())
265
 
        finally:
266
 
            handle.close()
 
242
        with t.open_write_stream('foo') as handle:
 
243
            handle.write(b'b')
 
244
            self.assertEqual(b'b', t.get_bytes('foo'))
267
245
 
268
246
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
269
247
        t = self.get_transport()
270
248
        if t.is_readonly():
271
249
            return
272
 
        handle = t.open_write_stream('foo')
273
 
        try:
274
 
            handle.write('b')
275
 
            self.assertEqual('b', t.get_bytes('foo'))
276
 
            self.assertEqual('b', t.get('foo').read())
277
 
        finally:
278
 
            handle.close()
 
250
        with t.open_write_stream('foo') as handle:
 
251
            handle.write(b'b')
 
252
            self.assertEqual(b'b', t.get_bytes('foo'))
 
253
            with t.get('foo') as f:
 
254
                self.assertEqual(b'b', f.read())
279
255
 
280
256
    def test_put_bytes(self):
281
257
        t = self.get_transport()
282
258
 
283
259
        if t.is_readonly():
284
260
            self.assertRaises(TransportNotPossible,
285
 
                    t.put_bytes, 'a', 'some text for a\n')
 
261
                              t.put_bytes, 'a', b'some text for a\n')
286
262
            return
287
263
 
288
 
        t.put_bytes('a', 'some text for a\n')
289
 
        self.failUnless(t.has('a'))
290
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
264
        t.put_bytes('a', b'some text for a\n')
 
265
        self.assertTrue(t.has('a'))
 
266
        self.check_transport_contents(b'some text for a\n', t, 'a')
291
267
 
292
268
        # The contents should be overwritten
293
 
        t.put_bytes('a', 'new text for a\n')
294
 
        self.check_transport_contents('new text for a\n', t, 'a')
 
269
        t.put_bytes('a', b'new text for a\n')
 
270
        self.check_transport_contents(b'new text for a\n', t, 'a')
295
271
 
296
272
        self.assertRaises(NoSuchFile,
297
 
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
 
273
                          t.put_bytes, 'path/doesnt/exist/c', b'contents')
298
274
 
299
275
    def test_put_bytes_non_atomic(self):
300
276
        t = self.get_transport()
301
277
 
302
278
        if t.is_readonly():
303
279
            self.assertRaises(TransportNotPossible,
304
 
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
 
280
                              t.put_bytes_non_atomic, 'a', b'some text for a\n')
305
281
            return
306
282
 
307
 
        self.failIf(t.has('a'))
308
 
        t.put_bytes_non_atomic('a', 'some text for a\n')
309
 
        self.failUnless(t.has('a'))
310
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
283
        self.assertFalse(t.has('a'))
 
284
        t.put_bytes_non_atomic('a', b'some text for a\n')
 
285
        self.assertTrue(t.has('a'))
 
286
        self.check_transport_contents(b'some text for a\n', t, 'a')
311
287
        # Put also replaces contents
312
 
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
313
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
288
        t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
 
289
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
314
290
 
315
291
        # Make sure we can create another file
316
 
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
 
292
        t.put_bytes_non_atomic('d', b'contents for\nd\n')
317
293
        # And overwrite 'a' with empty contents
318
 
        t.put_bytes_non_atomic('a', '')
319
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
320
 
        self.check_transport_contents('', t, 'a')
 
294
        t.put_bytes_non_atomic('a', b'')
 
295
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
296
        self.check_transport_contents(b'', t, 'a')
321
297
 
322
298
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
323
 
                                       'contents\n')
 
299
                          b'contents\n')
324
300
        # Now test the create_parent flag
325
301
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
326
 
                                       'contents\n')
327
 
        self.failIf(t.has('dir/a'))
328
 
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
 
302
                          b'contents\n')
 
303
        self.assertFalse(t.has('dir/a'))
 
304
        t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
329
305
                               create_parent_dir=True)
330
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
306
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
331
307
 
332
308
        # But we still get NoSuchFile if we can't make the parent dir
333
309
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
334
 
                                       'contents\n',
335
 
                                       create_parent_dir=True)
 
310
                          b'contents\n',
 
311
                          create_parent_dir=True)
336
312
 
337
313
    def test_put_bytes_permissions(self):
338
314
        t = self.get_transport()
342
318
        if not t._can_roundtrip_unix_modebits():
343
319
            # Can't roundtrip, so no need to run this test
344
320
            return
345
 
        t.put_bytes('mode644', 'test text\n', mode=0644)
346
 
        self.assertTransportMode(t, 'mode644', 0644)
347
 
        t.put_bytes('mode666', 'test text\n', mode=0666)
348
 
        self.assertTransportMode(t, 'mode666', 0666)
349
 
        t.put_bytes('mode600', 'test text\n', mode=0600)
350
 
        self.assertTransportMode(t, 'mode600', 0600)
 
321
        t.put_bytes('mode644', b'test text\n', mode=0o644)
 
322
        self.assertTransportMode(t, 'mode644', 0o644)
 
323
        t.put_bytes('mode666', b'test text\n', mode=0o666)
 
324
        self.assertTransportMode(t, 'mode666', 0o666)
 
325
        t.put_bytes('mode600', b'test text\n', mode=0o600)
 
326
        self.assertTransportMode(t, 'mode600', 0o600)
351
327
        # Yes, you can put_bytes a file such that it becomes readonly
352
 
        t.put_bytes('mode400', 'test text\n', mode=0400)
353
 
        self.assertTransportMode(t, 'mode400', 0400)
 
328
        t.put_bytes('mode400', b'test text\n', mode=0o400)
 
329
        self.assertTransportMode(t, 'mode400', 0o400)
354
330
 
355
331
        # The default permissions should be based on the current umask
356
332
        umask = osutils.get_umask()
357
 
        t.put_bytes('nomode', 'test text\n', mode=None)
358
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
333
        t.put_bytes('nomode', b'test text\n', mode=None)
 
334
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
359
335
 
360
336
    def test_put_bytes_non_atomic_permissions(self):
361
337
        t = self.get_transport()
365
341
        if not t._can_roundtrip_unix_modebits():
366
342
            # Can't roundtrip, so no need to run this test
367
343
            return
368
 
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
369
 
        self.assertTransportMode(t, 'mode644', 0644)
370
 
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
371
 
        self.assertTransportMode(t, 'mode666', 0666)
372
 
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
373
 
        self.assertTransportMode(t, 'mode600', 0600)
374
 
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
375
 
        self.assertTransportMode(t, 'mode400', 0400)
 
344
        t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
 
345
        self.assertTransportMode(t, 'mode644', 0o644)
 
346
        t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
 
347
        self.assertTransportMode(t, 'mode666', 0o666)
 
348
        t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
 
349
        self.assertTransportMode(t, 'mode600', 0o600)
 
350
        t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
 
351
        self.assertTransportMode(t, 'mode400', 0o400)
376
352
 
377
353
        # The default permissions should be based on the current umask
378
354
        umask = osutils.get_umask()
379
 
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
380
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
355
        t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
 
356
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
381
357
 
382
358
        # We should also be able to set the mode for a parent directory
383
359
        # when it is created
384
 
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
385
 
                               dir_mode=0700, create_parent_dir=True)
386
 
        self.assertTransportMode(t, 'dir700', 0700)
387
 
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
388
 
                               dir_mode=0770, create_parent_dir=True)
389
 
        self.assertTransportMode(t, 'dir770', 0770)
390
 
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
391
 
                               dir_mode=0777, create_parent_dir=True)
392
 
        self.assertTransportMode(t, 'dir777', 0777)
 
360
        t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
 
361
                               dir_mode=0o700, create_parent_dir=True)
 
362
        self.assertTransportMode(t, 'dir700', 0o700)
 
363
        t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
 
364
                               dir_mode=0o770, create_parent_dir=True)
 
365
        self.assertTransportMode(t, 'dir770', 0o770)
 
366
        t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
 
367
                               dir_mode=0o777, create_parent_dir=True)
 
368
        self.assertTransportMode(t, 'dir777', 0o777)
393
369
 
394
370
    def test_put_file(self):
395
371
        t = self.get_transport()
396
372
 
397
373
        if t.is_readonly():
398
374
            self.assertRaises(TransportNotPossible,
399
 
                    t.put_file, 'a', StringIO('some text for a\n'))
 
375
                              t.put_file, 'a', BytesIO(b'some text for a\n'))
400
376
            return
401
377
 
402
 
        result = t.put_file('a', StringIO('some text for a\n'))
 
378
        result = t.put_file('a', BytesIO(b'some text for a\n'))
403
379
        # put_file returns the length of the data written
404
380
        self.assertEqual(16, result)
405
 
        self.failUnless(t.has('a'))
406
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
381
        self.assertTrue(t.has('a'))
 
382
        self.check_transport_contents(b'some text for a\n', t, 'a')
407
383
        # Put also replaces contents
408
 
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
384
        result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
409
385
        self.assertEqual(19, result)
410
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
386
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
411
387
        self.assertRaises(NoSuchFile,
412
388
                          t.put_file, 'path/doesnt/exist/c',
413
 
                              StringIO('contents'))
 
389
                          BytesIO(b'contents'))
414
390
 
415
391
    def test_put_file_non_atomic(self):
416
392
        t = self.get_transport()
417
393
 
418
394
        if t.is_readonly():
419
395
            self.assertRaises(TransportNotPossible,
420
 
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
396
                              t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
421
397
            return
422
398
 
423
 
        self.failIf(t.has('a'))
424
 
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
 
        self.failUnless(t.has('a'))
426
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
399
        self.assertFalse(t.has('a'))
 
400
        t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
 
401
        self.assertTrue(t.has('a'))
 
402
        self.check_transport_contents(b'some text for a\n', t, 'a')
427
403
        # Put also replaces contents
428
 
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
429
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
404
        t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
 
405
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
430
406
 
431
407
        # Make sure we can create another file
432
 
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
408
        t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
433
409
        # And overwrite 'a' with empty contents
434
 
        t.put_file_non_atomic('a', StringIO(''))
435
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
436
 
        self.check_transport_contents('', t, 'a')
 
410
        t.put_file_non_atomic('a', BytesIO(b''))
 
411
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
412
        self.check_transport_contents(b'', t, 'a')
437
413
 
438
414
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
439
 
                                       StringIO('contents\n'))
 
415
                          BytesIO(b'contents\n'))
440
416
        # Now test the create_parent flag
441
417
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
 
                                       StringIO('contents\n'))
443
 
        self.failIf(t.has('dir/a'))
444
 
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
418
                          BytesIO(b'contents\n'))
 
419
        self.assertFalse(t.has('dir/a'))
 
420
        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
445
421
                              create_parent_dir=True)
446
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
422
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
447
423
 
448
424
        # But we still get NoSuchFile if we can't make the parent dir
449
425
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
450
 
                                       StringIO('contents\n'),
451
 
                                       create_parent_dir=True)
 
426
                          BytesIO(b'contents\n'),
 
427
                          create_parent_dir=True)
452
428
 
453
429
    def test_put_file_permissions(self):
454
430
 
459
435
        if not t._can_roundtrip_unix_modebits():
460
436
            # Can't roundtrip, so no need to run this test
461
437
            return
462
 
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
463
 
        self.assertTransportMode(t, 'mode644', 0644)
464
 
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
465
 
        self.assertTransportMode(t, 'mode666', 0666)
466
 
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
467
 
        self.assertTransportMode(t, 'mode600', 0600)
 
438
        t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
 
439
        self.assertTransportMode(t, 'mode644', 0o644)
 
440
        t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
 
441
        self.assertTransportMode(t, 'mode666', 0o666)
 
442
        t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
 
443
        self.assertTransportMode(t, 'mode600', 0o600)
468
444
        # Yes, you can put a file such that it becomes readonly
469
 
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
470
 
        self.assertTransportMode(t, 'mode400', 0400)
 
445
        t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
 
446
        self.assertTransportMode(t, 'mode400', 0o400)
471
447
        # The default permissions should be based on the current umask
472
448
        umask = osutils.get_umask()
473
 
        t.put_file('nomode', StringIO('test text\n'), mode=None)
474
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
449
        t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
 
450
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
475
451
 
476
452
    def test_put_file_non_atomic_permissions(self):
477
453
        t = self.get_transport()
481
457
        if not t._can_roundtrip_unix_modebits():
482
458
            # Can't roundtrip, so no need to run this test
483
459
            return
484
 
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
485
 
        self.assertTransportMode(t, 'mode644', 0644)
486
 
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
487
 
        self.assertTransportMode(t, 'mode666', 0666)
488
 
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
489
 
        self.assertTransportMode(t, 'mode600', 0600)
 
460
        t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
 
461
        self.assertTransportMode(t, 'mode644', 0o644)
 
462
        t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
 
463
        self.assertTransportMode(t, 'mode666', 0o666)
 
464
        t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
 
465
        self.assertTransportMode(t, 'mode600', 0o600)
490
466
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
491
 
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
492
 
        self.assertTransportMode(t, 'mode400', 0400)
 
467
        t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
 
468
        self.assertTransportMode(t, 'mode400', 0o400)
493
469
 
494
470
        # The default permissions should be based on the current umask
495
471
        umask = osutils.get_umask()
496
 
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
497
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
472
        t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
 
473
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
498
474
 
499
475
        # We should also be able to set the mode for a parent directory
500
476
        # when it is created
501
 
        sio = StringIO()
502
 
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
503
 
                              dir_mode=0700, create_parent_dir=True)
504
 
        self.assertTransportMode(t, 'dir700', 0700)
505
 
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
506
 
                              dir_mode=0770, create_parent_dir=True)
507
 
        self.assertTransportMode(t, 'dir770', 0770)
508
 
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
509
 
                              dir_mode=0777, create_parent_dir=True)
510
 
        self.assertTransportMode(t, 'dir777', 0777)
 
477
        sio = BytesIO()
 
478
        t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
 
479
                              dir_mode=0o700, create_parent_dir=True)
 
480
        self.assertTransportMode(t, 'dir700', 0o700)
 
481
        t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
 
482
                              dir_mode=0o770, create_parent_dir=True)
 
483
        self.assertTransportMode(t, 'dir770', 0o770)
 
484
        t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
 
485
                              dir_mode=0o777, create_parent_dir=True)
 
486
        self.assertTransportMode(t, 'dir777', 0o777)
511
487
 
512
488
    def test_put_bytes_unicode(self):
513
 
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
 
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
515
 
        # (we don't want to encode unicode here at all, callers should be
516
 
        # strictly passing bytes to put_bytes), but we allow it for backwards
517
 
        # compatibility.  At some point we should use a specific exception.
518
 
        # See https://bugs.launchpad.net/bzr/+bug/106898.
519
489
        t = self.get_transport()
520
490
        if t.is_readonly():
521
491
            return
522
492
        unicode_string = u'\u1234'
523
 
        self.assertRaises(
524
 
            (AssertionError, UnicodeEncodeError),
525
 
            t.put_bytes, 'foo', unicode_string)
526
 
 
527
 
    def test_put_file_unicode(self):
528
 
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
 
        # This situation can happen (and has) if code is careless about the type
530
 
        # of "string" they initialise/write to a StringIO with.  We cannot use
531
 
        # cStringIO, because it never returns unicode from read.
532
 
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
 
        # raise, but we raise it for hysterical raisins.
534
 
        t = self.get_transport()
535
 
        if t.is_readonly():
536
 
            return
537
 
        unicode_file = pyStringIO(u'\u1234')
538
 
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
493
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
539
494
 
540
495
    def test_mkdir(self):
541
496
        t = self.get_transport()
546
501
            # defined for the transport interface.
547
502
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
548
503
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
549
 
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
550
 
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
 
504
            self.assertRaises(TransportNotPossible,
 
505
                              t.mkdir, 'path/doesnt/exist')
551
506
            return
552
507
        # Test mkdir
553
508
        t.mkdir('dir_a')
557
512
        t.mkdir('dir_b')
558
513
        self.assertEqual(t.has('dir_b'), True)
559
514
 
560
 
        t.mkdir_multi(['dir_c', 'dir_d'])
561
 
 
562
 
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
563
 
        self.assertEqual(list(t.has_multi(
564
 
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
565
 
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
566
 
            [True, True, True, False,
567
 
             True, True, True, True])
 
515
        self.assertEqual([t.has(n) for n in
 
516
                          ['dir_a', 'dir_b', 'dir_q', 'dir_b']],
 
517
                         [True, True, False, True])
568
518
 
569
519
        # we were testing that a local mkdir followed by a transport
570
520
        # mkdir failed thusly, but given that we * in one process * do not
575
525
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
576
526
 
577
527
        # Test get/put in sub-directories
578
 
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
579
 
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
580
 
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
581
 
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
 
528
        t.put_bytes('dir_a/a', b'contents of dir_a/a')
 
529
        t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
 
530
        self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a')
 
531
        self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b')
582
532
 
583
533
        # mkdir of a dir with an absent parent
584
534
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
591
541
            # no sense testing on this transport
592
542
            return
593
543
        # Test mkdir with a mode
594
 
        t.mkdir('dmode755', mode=0755)
595
 
        self.assertTransportMode(t, 'dmode755', 0755)
596
 
        t.mkdir('dmode555', mode=0555)
597
 
        self.assertTransportMode(t, 'dmode555', 0555)
598
 
        t.mkdir('dmode777', mode=0777)
599
 
        self.assertTransportMode(t, 'dmode777', 0777)
600
 
        t.mkdir('dmode700', mode=0700)
601
 
        self.assertTransportMode(t, 'dmode700', 0700)
602
 
        t.mkdir_multi(['mdmode755'], mode=0755)
603
 
        self.assertTransportMode(t, 'mdmode755', 0755)
 
544
        t.mkdir('dmode755', mode=0o755)
 
545
        self.assertTransportMode(t, 'dmode755', 0o755)
 
546
        t.mkdir('dmode555', mode=0o555)
 
547
        self.assertTransportMode(t, 'dmode555', 0o555)
 
548
        t.mkdir('dmode777', mode=0o777)
 
549
        self.assertTransportMode(t, 'dmode777', 0o777)
 
550
        t.mkdir('dmode700', mode=0o700)
 
551
        self.assertTransportMode(t, 'dmode700', 0o700)
 
552
        t.mkdir('mdmode755', mode=0o755)
 
553
        self.assertTransportMode(t, 'mdmode755', 0o755)
604
554
 
605
555
        # Default mode should be based on umask
606
556
        umask = osutils.get_umask()
607
557
        t.mkdir('dnomode', mode=None)
608
 
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
558
        self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
609
559
 
610
560
    def test_opening_a_file_stream_creates_file(self):
611
561
        t = self.get_transport()
613
563
            return
614
564
        handle = t.open_write_stream('foo')
615
565
        try:
616
 
            self.assertEqual('', t.get_bytes('foo'))
 
566
            self.assertEqual(b'', t.get_bytes('foo'))
617
567
        finally:
618
568
            handle.close()
619
569
 
620
570
    def test_opening_a_file_stream_can_set_mode(self):
621
571
        t = self.get_transport()
622
572
        if t.is_readonly():
 
573
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
574
                              t.open_write_stream, 'foo')
623
575
            return
624
576
        if not t._can_roundtrip_unix_modebits():
625
577
            # Can't roundtrip, so no need to run this test
626
578
            return
 
579
 
627
580
        def check_mode(name, mode, expected):
628
581
            handle = t.open_write_stream(name, mode=mode)
629
582
            handle.close()
630
583
            self.assertTransportMode(t, name, expected)
631
 
        check_mode('mode644', 0644, 0644)
632
 
        check_mode('mode666', 0666, 0666)
633
 
        check_mode('mode600', 0600, 0600)
 
584
        check_mode('mode644', 0o644, 0o644)
 
585
        check_mode('mode666', 0o666, 0o666)
 
586
        check_mode('mode600', 0o600, 0o600)
634
587
        # The default permissions should be based on the current umask
635
 
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
588
        check_mode('nomode', None, 0o666 & ~osutils.get_umask())
636
589
 
637
590
    def test_copy_to(self):
638
591
        # FIXME: test:   same server to same server (partly done)
645
598
            self.build_tree(files, transport=transport_from)
646
599
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
647
600
            for f in files:
648
 
                self.check_transport_contents(transport_to.get(f).read(),
 
601
                self.check_transport_contents(transport_to.get_bytes(f),
649
602
                                              transport_from, f)
650
603
 
651
604
        t = self.get_transport()
 
605
        if t.__class__.__name__ == "SFTPTransport":
 
606
            self.skipTest("SFTP copy_to currently too flakey to use")
652
607
        temp_transport = MemoryTransport('memory:///')
653
608
        simple_copy_files(t, temp_transport)
654
609
        if not t.is_readonly():
656
611
            t2 = t.clone('copy_to_simple')
657
612
            simple_copy_files(t, t2)
658
613
 
659
 
 
660
614
        # Test that copying into a missing directory raises
661
615
        # NoSuchFile
662
616
        if t.is_readonly():
663
617
            self.build_tree(['e/', 'e/f'])
664
618
        else:
665
619
            t.mkdir('e')
666
 
            t.put_bytes('e/f', 'contents of e')
 
620
            t.put_bytes('e/f', b'contents of e')
667
621
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
668
622
        temp_transport.mkdir('e')
669
623
        t.copy_to(['e/f'], temp_transport)
674
628
        files = ['a', 'b', 'c', 'd']
675
629
        t.copy_to(iter(files), temp_transport)
676
630
        for f in files:
677
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
631
            self.check_transport_contents(temp_transport.get_bytes(f),
678
632
                                          t, f)
679
633
        del temp_transport
680
634
 
681
 
        for mode in (0666, 0644, 0600, 0400):
 
635
        for mode in (0o666, 0o644, 0o600, 0o400):
682
636
            temp_transport = MemoryTransport("memory:///")
683
637
            t.copy_to(files, temp_transport, mode=mode)
684
638
            for f in files:
699
653
 
700
654
        if t.is_readonly():
701
655
            self.assertRaises(TransportNotPossible,
702
 
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
656
                              t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
703
657
            return
704
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
705
 
        t.put_bytes('b', 'contents\nfor b\n')
 
658
        t.put_bytes('a', b'diff\ncontents for\na\n')
 
659
        t.put_bytes('b', b'contents\nfor b\n')
706
660
 
707
661
        self.assertEqual(20,
708
 
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
662
                         t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
709
663
 
710
664
        self.check_transport_contents(
711
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
665
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
712
666
            t, 'a')
713
667
 
714
668
        # a file with no parent should fail..
715
669
        self.assertRaises(NoSuchFile,
716
 
                          t.append_file, 'missing/path', StringIO('content'))
 
670
                          t.append_file, 'missing/path', BytesIO(b'content'))
717
671
 
718
672
        # And we can create new files, too
719
673
        self.assertEqual(0,
720
 
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
721
 
        self.check_transport_contents('some text\nfor a missing file\n',
 
674
                         t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
 
675
        self.check_transport_contents(b'some text\nfor a missing file\n',
722
676
                                      t, 'c')
723
677
 
724
678
    def test_append_bytes(self):
726
680
 
727
681
        if t.is_readonly():
728
682
            self.assertRaises(TransportNotPossible,
729
 
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
 
683
                              t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
730
684
            return
731
685
 
732
 
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
733
 
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
 
686
        self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
 
687
        self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
734
688
 
735
689
        self.assertEqual(20,
736
 
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
 
690
                         t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
737
691
 
738
692
        self.check_transport_contents(
739
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
693
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
740
694
            t, 'a')
741
695
 
742
696
        # a file with no parent should fail..
743
697
        self.assertRaises(NoSuchFile,
744
 
                          t.append_bytes, 'missing/path', 'content')
745
 
 
746
 
    def test_append_multi(self):
747
 
        t = self.get_transport()
748
 
 
749
 
        if t.is_readonly():
750
 
            return
751
 
        t.put_bytes('a', 'diff\ncontents for\na\n'
752
 
                         'add\nsome\nmore\ncontents\n')
753
 
        t.put_bytes('b', 'contents\nfor b\n')
754
 
 
755
 
        self.assertEqual((43, 15),
756
 
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
757
 
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
758
 
 
759
 
        self.check_transport_contents(
760
 
            'diff\ncontents for\na\n'
761
 
            'add\nsome\nmore\ncontents\n'
762
 
            'and\nthen\nsome\nmore\n',
763
 
            t, 'a')
764
 
        self.check_transport_contents(
765
 
                'contents\nfor b\n'
766
 
                'some\nmore\nfor\nb\n',
767
 
                t, 'b')
768
 
 
769
 
        self.assertEqual((62, 31),
770
 
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
771
 
                                 ('b', StringIO('from an iterator\n'))])))
772
 
        self.check_transport_contents(
773
 
            'diff\ncontents for\na\n'
774
 
            'add\nsome\nmore\ncontents\n'
775
 
            'and\nthen\nsome\nmore\n'
776
 
            'a little bit more\n',
777
 
            t, 'a')
778
 
        self.check_transport_contents(
779
 
                'contents\nfor b\n'
780
 
                'some\nmore\nfor\nb\n'
781
 
                'from an iterator\n',
782
 
                t, 'b')
783
 
 
784
 
        self.assertEqual((80, 0),
785
 
            t.append_multi([('a', StringIO('some text in a\n')),
786
 
                            ('d', StringIO('missing file r\n'))]))
787
 
 
788
 
        self.check_transport_contents(
789
 
            'diff\ncontents for\na\n'
790
 
            'add\nsome\nmore\ncontents\n'
791
 
            'and\nthen\nsome\nmore\n'
792
 
            'a little bit more\n'
793
 
            'some text in a\n',
794
 
            t, 'a')
795
 
        self.check_transport_contents('missing file r\n', t, 'd')
 
698
                          t.append_bytes, 'missing/path', b'content')
796
699
 
797
700
    def test_append_file_mode(self):
798
701
        """Check that append accepts a mode parameter"""
800
703
        t = self.get_transport()
801
704
        if t.is_readonly():
802
705
            self.assertRaises(TransportNotPossible,
803
 
                t.append_file, 'f', StringIO('f'), mode=None)
 
706
                              t.append_file, 'f', BytesIO(b'f'), mode=None)
804
707
            return
805
 
        t.append_file('f', StringIO('f'), mode=None)
 
708
        t.append_file('f', BytesIO(b'f'), mode=None)
806
709
 
807
710
    def test_append_bytes_mode(self):
808
711
        # check append_bytes accepts a mode
809
712
        t = self.get_transport()
810
713
        if t.is_readonly():
811
714
            self.assertRaises(TransportNotPossible,
812
 
                t.append_bytes, 'f', 'f', mode=None)
 
715
                              t.append_bytes, 'f', b'f', mode=None)
813
716
            return
814
 
        t.append_bytes('f', 'f', mode=None)
 
717
        t.append_bytes('f', b'f', mode=None)
815
718
 
816
719
    def test_delete(self):
817
720
        # TODO: Test Transport.delete
822
725
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
823
726
            return
824
727
 
825
 
        t.put_bytes('a', 'a little bit of text\n')
826
 
        self.failUnless(t.has('a'))
 
728
        t.put_bytes('a', b'a little bit of text\n')
 
729
        self.assertTrue(t.has('a'))
827
730
        t.delete('a')
828
 
        self.failIf(t.has('a'))
 
731
        self.assertFalse(t.has('a'))
829
732
 
830
733
        self.assertRaises(NoSuchFile, t.delete, 'a')
831
734
 
832
 
        t.put_bytes('a', 'a text\n')
833
 
        t.put_bytes('b', 'b text\n')
834
 
        t.put_bytes('c', 'c text\n')
 
735
        t.put_bytes('a', b'a text\n')
 
736
        t.put_bytes('b', b'b text\n')
 
737
        t.put_bytes('c', b'c text\n')
835
738
        self.assertEqual([True, True, True],
836
 
                list(t.has_multi(['a', 'b', 'c'])))
837
 
        t.delete_multi(['a', 'c'])
 
739
                         [t.has(n) for n in ['a', 'b', 'c']])
 
740
        t.delete('a')
 
741
        t.delete('c')
838
742
        self.assertEqual([False, True, False],
839
 
                list(t.has_multi(['a', 'b', 'c'])))
840
 
        self.failIf(t.has('a'))
841
 
        self.failUnless(t.has('b'))
842
 
        self.failIf(t.has('c'))
843
 
 
844
 
        self.assertRaises(NoSuchFile,
845
 
                t.delete_multi, ['a', 'b', 'c'])
846
 
 
847
 
        self.assertRaises(NoSuchFile,
848
 
                t.delete_multi, iter(['a', 'b', 'c']))
849
 
 
850
 
        t.put_bytes('a', 'another a text\n')
851
 
        t.put_bytes('c', 'another c text\n')
852
 
        t.delete_multi(iter(['a', 'b', 'c']))
 
743
                         [t.has(n) for n in ['a', 'b', 'c']])
 
744
        self.assertFalse(t.has('a'))
 
745
        self.assertTrue(t.has('b'))
 
746
        self.assertFalse(t.has('c'))
 
747
 
 
748
        for name in ['a', 'c', 'd']:
 
749
            self.assertRaises(NoSuchFile, t.delete, name)
853
750
 
854
751
        # We should have deleted everything
855
752
        # SftpServer creates control files in the
902
799
        if t.is_readonly():
903
800
            return
904
801
        t.mkdir('foo')
905
 
        t.put_bytes('foo-bar', '')
 
802
        t.put_bytes('foo-bar', b'')
906
803
        t.mkdir('foo-baz')
907
804
        t.rmdir('foo')
908
805
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
909
 
        self.failUnless(t.has('foo-bar'))
 
806
        self.assertTrue(t.has('foo-bar'))
910
807
 
911
808
    def test_rename_dir_succeeds(self):
912
809
        t = self.get_transport()
913
810
        if t.is_readonly():
914
 
            raise TestSkipped("transport is readonly")
 
811
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
812
                              t.rename, 'foo', 'bar')
 
813
            return
915
814
        t.mkdir('adir')
916
815
        t.mkdir('adir/asubdir')
917
816
        t.rename('adir', 'bdir')
922
821
        """Attempting to replace a nonempty directory should fail"""
923
822
        t = self.get_transport()
924
823
        if t.is_readonly():
925
 
            raise TestSkipped("transport is readonly")
 
824
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
825
                              t.rename, 'foo', 'bar')
 
826
            return
926
827
        t.mkdir('adir')
927
828
        t.mkdir('adir/asubdir')
928
829
        t.mkdir('bdir')
943
844
        t.mkdir('b')
944
845
        ta = t.clone('a')
945
846
        tb = t.clone('b')
946
 
        ta.put_bytes('f', 'aoeu')
 
847
        ta.put_bytes('f', b'aoeu')
947
848
        ta.rename('f', '../b/f')
948
849
        self.assertTrue(tb.has('f'))
949
850
        self.assertFalse(ta.has('f'))
991
892
        # creates control files in the working directory
992
893
        # perhaps all of this could be done in a subdirectory
993
894
 
994
 
        t.put_bytes('a', 'a first file\n')
995
 
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
895
        t.put_bytes('a', b'a first file\n')
 
896
        self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
996
897
 
997
898
        t.move('a', 'b')
998
 
        self.failUnless(t.has('b'))
999
 
        self.failIf(t.has('a'))
 
899
        self.assertTrue(t.has('b'))
 
900
        self.assertFalse(t.has('a'))
1000
901
 
1001
 
        self.check_transport_contents('a first file\n', t, 'b')
1002
 
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
902
        self.check_transport_contents(b'a first file\n', t, 'b')
 
903
        self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
1003
904
 
1004
905
        # Overwrite a file
1005
 
        t.put_bytes('c', 'c this file\n')
 
906
        t.put_bytes('c', b'c this file\n')
1006
907
        t.move('c', 'b')
1007
 
        self.failIf(t.has('c'))
1008
 
        self.check_transport_contents('c this file\n', t, 'b')
 
908
        self.assertFalse(t.has('c'))
 
909
        self.check_transport_contents(b'c this file\n', t, 'b')
1009
910
 
1010
911
        # TODO: Try to write a test for atomicity
1011
912
        # TODO: Test moving into a non-existent subdirectory
1012
 
        # TODO: Test Transport.move_multi
1013
913
 
1014
914
    def test_copy(self):
1015
915
        t = self.get_transport()
1017
917
        if t.is_readonly():
1018
918
            return
1019
919
 
1020
 
        t.put_bytes('a', 'a file\n')
 
920
        t.put_bytes('a', b'a file\n')
1021
921
        t.copy('a', 'b')
1022
 
        self.check_transport_contents('a file\n', t, 'b')
 
922
        self.check_transport_contents(b'a file\n', t, 'b')
1023
923
 
1024
924
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1025
925
        os.mkdir('c')
1026
926
        # What should the assert be if you try to copy a
1027
927
        # file over a directory?
1028
928
        #self.assertRaises(Something, t.copy, 'a', 'c')
1029
 
        t.put_bytes('d', 'text in d\n')
 
929
        t.put_bytes('d', b'text in d\n')
1030
930
        t.copy('d', 'b')
1031
 
        self.check_transport_contents('text in d\n', t, 'b')
1032
 
 
1033
 
        # TODO: test copy_multi
 
931
        self.check_transport_contents(b'text in d\n', t, 'b')
1034
932
 
1035
933
    def test_connection_error(self):
1036
934
        """ConnectionError is raised when connection is impossible.
1042
940
        except NotImplementedError:
1043
941
            raise TestSkipped("Transport %s has no bogus URL support." %
1044
942
                              self._server.__class__)
1045
 
        t = get_transport(url)
 
943
        t = _mod_transport.get_transport_from_url(url)
1046
944
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1047
945
 
1048
946
    def test_stat(self):
1053
951
 
1054
952
        try:
1055
953
            st = t.stat('.')
1056
 
        except TransportNotPossible, e:
 
954
        except TransportNotPossible as e:
1057
955
            # This transport cannot stat
1058
956
            return
1059
957
 
1064
962
        for path, size in zip(paths, sizes):
1065
963
            st = t.stat(path)
1066
964
            if path.endswith('/'):
1067
 
                self.failUnless(S_ISDIR(st.st_mode))
 
965
                self.assertTrue(S_ISDIR(st.st_mode))
1068
966
                # directory sizes are meaningless
1069
967
            else:
1070
 
                self.failUnless(S_ISREG(st.st_mode))
 
968
                self.assertTrue(S_ISREG(st.st_mode))
1071
969
                self.assertEqual(size, st.st_size)
1072
970
 
1073
 
        remote_stats = list(t.stat_multi(paths))
1074
 
        remote_iter_stats = list(t.stat_multi(iter(paths)))
1075
 
 
1076
971
        self.assertRaises(NoSuchFile, t.stat, 'q')
1077
972
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
1078
973
 
1079
 
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1080
 
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1081
974
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1082
975
        subdir = t.clone('subdir')
1083
 
        subdir.stat('./file')
1084
 
        subdir.stat('.')
 
976
        st = subdir.stat('./file')
 
977
        st = subdir.stat('.')
1085
978
 
1086
979
    def test_hardlink(self):
1087
980
        from stat import ST_NLINK
1096
989
        try:
1097
990
            t.hardlink(source_name, link_name)
1098
991
 
1099
 
            self.failUnless(t.has(source_name))
1100
 
            self.failUnless(t.has(link_name))
 
992
            self.assertTrue(t.has(source_name))
 
993
            self.assertTrue(t.has(link_name))
1101
994
 
1102
995
            st = t.stat(link_name)
1103
 
            self.failUnlessEqual(st[ST_NLINK], 2)
 
996
            self.assertEqual(st[ST_NLINK], 2)
1104
997
        except TransportNotPossible:
1105
998
            raise TestSkipped("Transport %s does not support hardlinks." %
1106
999
                              self._server.__class__)
1118
1011
        try:
1119
1012
            t.symlink(source_name, link_name)
1120
1013
 
1121
 
            self.failUnless(t.has(source_name))
1122
 
            self.failUnless(t.has(link_name))
 
1014
            self.assertTrue(t.has(source_name))
 
1015
            self.assertTrue(t.has(link_name))
1123
1016
 
1124
1017
            st = t.stat(link_name)
1125
 
            self.failUnless(S_ISLNK(st.st_mode))
1126
 
        except TransportNotPossible:
1127
 
            raise TestSkipped("Transport %s does not support symlinks." %
1128
 
                              self._server.__class__)
1129
 
        except IOError:
1130
 
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
 
1018
            self.assertTrue(S_ISLNK(st.st_mode),
 
1019
                            "expected symlink, got mode %o" % st.st_mode)
 
1020
        except TransportNotPossible:
 
1021
            raise TestSkipped("Transport %s does not support symlinks." %
 
1022
                              self._server.__class__)
 
1023
 
 
1024
        self.assertEqual(source_name, t.readlink(link_name))
 
1025
 
 
1026
    def test_readlink_nonexistent(self):
 
1027
        t = self.get_transport()
 
1028
        try:
 
1029
            self.assertRaises(NoSuchFile, t.readlink, 'nonexistent')
 
1030
        except TransportNotPossible:
 
1031
            raise TestSkipped("Transport %s does not support symlinks." %
 
1032
                              self._server.__class__)
1131
1033
 
1132
1034
    def test_list_dir(self):
1133
1035
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1138
1040
            return
1139
1041
 
1140
1042
        def sorted_list(d, transport):
1141
 
            l = list(transport.list_dir(d))
1142
 
            l.sort()
 
1043
            l = sorted(transport.list_dir(d))
1143
1044
            return l
1144
1045
 
1145
1046
        self.assertEqual([], sorted_list('.', t))
1197
1098
            raise TestSkipped("not a connected transport")
1198
1099
 
1199
1100
        t2 = t1.clone('subdir')
1200
 
        self.assertEquals(t1._scheme, t2._scheme)
1201
 
        self.assertEquals(t1._user, t2._user)
1202
 
        self.assertEquals(t1._password, t2._password)
1203
 
        self.assertEquals(t1._host, t2._host)
1204
 
        self.assertEquals(t1._port, t2._port)
 
1101
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1102
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
 
1103
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
 
1104
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
 
1105
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1205
1106
 
1206
1107
    def test__reuse_for(self):
1207
1108
        t = self.get_transport()
1214
1115
 
1215
1116
            Only the parameters different from None will be changed.
1216
1117
            """
1217
 
            if scheme   is None: scheme   = t._scheme
1218
 
            if user     is None: user     = t._user
1219
 
            if password is None: password = t._password
1220
 
            if user     is None: user     = t._user
1221
 
            if host     is None: host     = t._host
1222
 
            if port     is None: port     = t._port
1223
 
            if path     is None: path     = t._path
1224
 
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1118
            if scheme is None:
 
1119
                scheme = t._parsed_url.scheme
 
1120
            if user is None:
 
1121
                user = t._parsed_url.user
 
1122
            if password is None:
 
1123
                password = t._parsed_url.password
 
1124
            if user is None:
 
1125
                user = t._parsed_url.user
 
1126
            if host is None:
 
1127
                host = t._parsed_url.host
 
1128
            if port is None:
 
1129
                port = t._parsed_url.port
 
1130
            if path is None:
 
1131
                path = t._parsed_url.path
 
1132
            return str(urlutils.URL(scheme, user, password, host, port, path))
1225
1133
 
1226
 
        if t._scheme == 'ftp':
 
1134
        if t._parsed_url.scheme == 'ftp':
1227
1135
            scheme = 'sftp'
1228
1136
        else:
1229
1137
            scheme = 'ftp'
1230
1138
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1231
 
        if t._user == 'me':
 
1139
        if t._parsed_url.user == 'me':
1232
1140
            user = 'you'
1233
1141
        else:
1234
1142
            user = 'me'
1245
1153
        #   (they may be typed by the user when prompted for example)
1246
1154
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1247
1155
        # We will not connect, so we can use an invalid host
1248
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1249
 
        if t._port == 1234:
 
1156
        self.assertIsNot(t, t._reuse_for(
 
1157
            new_url(host=t._parsed_url.host + 'bar')))
 
1158
        if t._parsed_url.port == 1234:
1250
1159
            port = 4321
1251
1160
        else:
1252
1161
            port = 1234
1261
1170
 
1262
1171
        c = t.clone('subdir')
1263
1172
        # Some transports will create the connection only when needed
1264
 
        t.has('surely_not') # Force connection
 
1173
        t.has('surely_not')  # Force connection
1265
1174
        self.assertIs(t._get_connection(), c._get_connection())
1266
1175
 
1267
1176
        # Temporary failure, we need to create a new dummy connection
1268
 
        new_connection = object()
 
1177
        new_connection = None
1269
1178
        t._set_connection(new_connection)
1270
1179
        # Check that both transports use the same connection
1271
1180
        self.assertIs(new_connection, t._get_connection())
1276
1185
        if not isinstance(t, ConnectedTransport):
1277
1186
            raise TestSkipped("not a connected transport")
1278
1187
 
1279
 
        t.has('surely_not') # Force connection
 
1188
        t.has('surely_not')  # Force connection
1280
1189
        self.assertIsNot(None, t._get_connection())
1281
1190
 
1282
1191
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1293
1202
 
1294
1203
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1295
1204
 
1296
 
        self.failUnless(t1.has('a'))
1297
 
        self.failUnless(t1.has('b/c'))
1298
 
        self.failIf(t1.has('c'))
 
1205
        self.assertTrue(t1.has('a'))
 
1206
        self.assertTrue(t1.has('b/c'))
 
1207
        self.assertFalse(t1.has('c'))
1299
1208
 
1300
1209
        t2 = t1.clone('b')
1301
1210
        self.assertEqual(t1.base + 'b/', t2.base)
1302
1211
 
1303
 
        self.failUnless(t2.has('c'))
1304
 
        self.failIf(t2.has('a'))
 
1212
        self.assertTrue(t2.has('c'))
 
1213
        self.assertFalse(t2.has('a'))
1305
1214
 
1306
1215
        t3 = t2.clone('..')
1307
 
        self.failUnless(t3.has('a'))
1308
 
        self.failIf(t3.has('c'))
 
1216
        self.assertTrue(t3.has('a'))
 
1217
        self.assertFalse(t3.has('c'))
1309
1218
 
1310
 
        self.failIf(t1.has('b/d'))
1311
 
        self.failIf(t2.has('d'))
1312
 
        self.failIf(t3.has('b/d'))
 
1219
        self.assertFalse(t1.has('b/d'))
 
1220
        self.assertFalse(t2.has('d'))
 
1221
        self.assertFalse(t3.has('b/d'))
1313
1222
 
1314
1223
        if t1.is_readonly():
1315
 
            self.build_tree_contents([('b/d', 'newfile\n')])
 
1224
            self.build_tree_contents([('b/d', b'newfile\n')])
1316
1225
        else:
1317
 
            t2.put_bytes('d', 'newfile\n')
 
1226
            t2.put_bytes('d', b'newfile\n')
1318
1227
 
1319
 
        self.failUnless(t1.has('b/d'))
1320
 
        self.failUnless(t2.has('d'))
1321
 
        self.failUnless(t3.has('b/d'))
 
1228
        self.assertTrue(t1.has('b/d'))
 
1229
        self.assertTrue(t2.has('d'))
 
1230
        self.assertTrue(t3.has('b/d'))
1322
1231
 
1323
1232
    def test_clone_to_root(self):
1324
1233
        orig_transport = self.get_transport()
1328
1237
        new_transport = root_transport.clone("..")
1329
1238
        # as we are walking up directories, the path must be
1330
1239
        # growing less, except at the top
1331
 
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
1332
 
            or new_transport.base == root_transport.base)
 
1240
        self.assertTrue(len(new_transport.base) < len(root_transport.base) or
 
1241
                        new_transport.base == root_transport.base)
1333
1242
        while new_transport.base != root_transport.base:
1334
1243
            root_transport = new_transport
1335
1244
            new_transport = root_transport.clone("..")
1336
1245
            # as we are walking up directories, the path must be
1337
1246
            # growing less, except at the top
1338
 
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
1339
 
                or new_transport.base == root_transport.base)
 
1247
            self.assertTrue(len(new_transport.base) < len(root_transport.base) or
 
1248
                            new_transport.base == root_transport.base)
1340
1249
 
1341
1250
        # Cloning to "/" should take us to exactly the same location.
1342
1251
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1352
1261
        orig_transport = self.get_transport()
1353
1262
        root_transport = orig_transport.clone('/')
1354
1263
        self.assertEqual(root_transport.base + '.bzr/',
1355
 
            root_transport.clone('.bzr').base)
 
1264
                         root_transport.clone('.bzr').base)
1356
1265
 
1357
1266
    def test_base_url(self):
1358
1267
        t = self.get_transport()
1398
1307
        self.assertEqual(transport.clone("/").abspath('foo'),
1399
1308
                         transport.abspath("/foo"))
1400
1309
 
 
1310
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1401
1311
    def test_win32_abspath(self):
1402
1312
        # Note: we tried to set sys.platform='win32' so we could test on
1403
1313
        # other platforms too, but then osutils does platform specific
1408
1318
 
1409
1319
        # smoke test for abspath on win32.
1410
1320
        # a transport based on 'file:///' never fully qualifies the drive.
1411
 
        transport = get_transport("file:///")
1412
 
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1321
        transport = _mod_transport.get_transport_from_url("file:///")
 
1322
        self.assertEqual(transport.abspath("/"), "file:///")
1413
1323
 
1414
1324
        # but a transport that starts with a drive spec must keep it.
1415
 
        transport = get_transport("file:///C:/")
1416
 
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1325
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1326
        self.assertEqual(transport.abspath("/"), "file:///C:/")
1417
1327
 
1418
1328
    def test_local_abspath(self):
1419
1329
        transport = self.get_transport()
1420
1330
        try:
1421
1331
            p = transport.local_abspath('.')
1422
 
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1332
        except (errors.NotLocalUrl, TransportNotPossible) as e:
1423
1333
            # should be formattable
1424
1334
            s = str(e)
1425
1335
        else:
1452
1362
                         'isolated/dir/',
1453
1363
                         'isolated/dir/foo',
1454
1364
                         'isolated/dir/bar',
1455
 
                         'isolated/dir/b%25z', # make sure quoting is correct
 
1365
                         'isolated/dir/b%25z',  # make sure quoting is correct
1456
1366
                         'isolated/bar'],
1457
1367
                        transport=transport)
1458
1368
        paths = set(transport.iter_files_recursive())
1459
1369
        # nb the directories are not converted
1460
1370
        self.assertEqual(paths,
1461
 
                    set(['isolated/dir/foo',
1462
 
                         'isolated/dir/bar',
1463
 
                         'isolated/dir/b%2525z',
1464
 
                         'isolated/bar']))
 
1371
                         {'isolated/dir/foo',
 
1372
                          'isolated/dir/bar',
 
1373
                          'isolated/dir/b%2525z',
 
1374
                          'isolated/bar'})
1465
1375
        sub_transport = transport.clone('isolated')
1466
1376
        paths = set(sub_transport.iter_files_recursive())
1467
1377
        self.assertEqual(paths,
1468
 
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1378
                         {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
1469
1379
 
1470
1380
    def test_copy_tree(self):
1471
1381
        # TODO: test file contents and permissions are preserved. This test was
1482
1392
                         'from/dir/',
1483
1393
                         'from/dir/foo',
1484
1394
                         'from/dir/bar',
1485
 
                         'from/dir/b%25z', # make sure quoting is correct
 
1395
                         'from/dir/b%25z',  # make sure quoting is correct
1486
1396
                         'from/bar'],
1487
1397
                        transport=transport)
1488
1398
        transport.copy_tree('from', 'to')
1489
1399
        paths = set(transport.iter_files_recursive())
1490
1400
        self.assertEqual(paths,
1491
 
                    set(['from/dir/foo',
1492
 
                         'from/dir/bar',
1493
 
                         'from/dir/b%2525z',
1494
 
                         'from/bar',
1495
 
                         'to/dir/foo',
1496
 
                         'to/dir/bar',
1497
 
                         'to/dir/b%2525z',
1498
 
                         'to/bar',]))
 
1401
                         {'from/dir/foo',
 
1402
                          'from/dir/bar',
 
1403
                          'from/dir/b%2525z',
 
1404
                          'from/bar',
 
1405
                          'to/dir/foo',
 
1406
                          'to/dir/bar',
 
1407
                          'to/dir/b%2525z',
 
1408
                          'to/bar', })
1499
1409
 
1500
1410
    def test_copy_tree_to_transport(self):
1501
1411
        transport = self.get_transport()
1509
1419
                         'from/dir/',
1510
1420
                         'from/dir/foo',
1511
1421
                         'from/dir/bar',
1512
 
                         'from/dir/b%25z', # make sure quoting is correct
 
1422
                         'from/dir/b%25z',  # make sure quoting is correct
1513
1423
                         'from/bar'],
1514
1424
                        transport=transport)
1515
1425
        from_transport = transport.clone('from')
1518
1428
        from_transport.copy_tree_to_transport(to_transport)
1519
1429
        paths = set(transport.iter_files_recursive())
1520
1430
        self.assertEqual(paths,
1521
 
                    set(['from/dir/foo',
1522
 
                         'from/dir/bar',
1523
 
                         'from/dir/b%2525z',
1524
 
                         'from/bar',
1525
 
                         'to/dir/foo',
1526
 
                         'to/dir/bar',
1527
 
                         'to/dir/b%2525z',
1528
 
                         'to/bar',]))
 
1431
                         {'from/dir/foo',
 
1432
                          'from/dir/bar',
 
1433
                          'from/dir/b%2525z',
 
1434
                          'from/bar',
 
1435
                          'to/dir/foo',
 
1436
                          'to/dir/bar',
 
1437
                          'to/dir/b%2525z',
 
1438
                          'to/bar', })
1529
1439
 
1530
1440
    def test_unicode_paths(self):
1531
1441
        """Test that we can read/write files with Unicode names."""
1535
1445
        # '\xe5' and '\xe4' actually map to the same file
1536
1446
        # adding a suffix kicks in the 'preserving but insensitive'
1537
1447
        # route, and maintains the right files
1538
 
        files = [u'\xe5.1', # a w/ circle iso-8859-1
1539
 
                 u'\xe4.2', # a w/ dots iso-8859-1
1540
 
                 u'\u017d', # Z with umlat iso-8859-2
1541
 
                 u'\u062c', # Arabic j
1542
 
                 u'\u0410', # Russian A
1543
 
                 u'\u65e5', # Kanji person
1544
 
                ]
 
1448
        files = [u'\xe5.1',  # a w/ circle iso-8859-1
 
1449
                 u'\xe4.2',  # a w/ dots iso-8859-1
 
1450
                 u'\u017d',  # Z with umlat iso-8859-2
 
1451
                 u'\u062c',  # Arabic j
 
1452
                 u'\u0410',  # Russian A
 
1453
                 u'\u65e5',  # Kanji person
 
1454
                 ]
1545
1455
 
1546
1456
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1547
1457
        if no_unicode_support:
1548
 
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1458
            self.knownFailure("test server cannot handle unicode paths")
1549
1459
 
1550
1460
        try:
1551
1461
            self.build_tree(files, transport=t, line_endings='binary')
1552
1462
        except UnicodeError:
1553
 
            raise TestSkipped("cannot handle unicode paths in current encoding")
 
1463
            raise TestSkipped(
 
1464
                "cannot handle unicode paths in current encoding")
1554
1465
 
1555
1466
        # A plain unicode string is not a valid url
1556
1467
        for fname in files:
1557
 
            self.assertRaises(InvalidURL, t.get, fname)
 
1468
            self.assertRaises(urlutils.InvalidURL, t.get, fname)
1558
1469
 
1559
1470
        for fname in files:
1560
1471
            fname_utf8 = fname.encode('utf-8')
1561
 
            contents = 'contents of %s\n' % (fname_utf8,)
 
1472
            contents = b'contents of %s\n' % (fname_utf8,)
1562
1473
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1563
1474
 
1564
1475
    def test_connect_twice_is_same_content(self):
1567
1478
        transport = self.get_transport()
1568
1479
        if transport.is_readonly():
1569
1480
            return
1570
 
        transport.put_bytes('foo', 'bar')
 
1481
        transport.put_bytes('foo', b'bar')
1571
1482
        transport3 = self.get_transport()
1572
 
        self.check_transport_contents('bar', transport3, 'foo')
 
1483
        self.check_transport_contents(b'bar', transport3, 'foo')
1573
1484
 
1574
1485
        # now opening at a relative url should give use a sane result:
1575
1486
        transport.mkdir('newdir')
1576
1487
        transport5 = self.get_transport('newdir')
1577
1488
        transport6 = transport5.clone('..')
1578
 
        self.check_transport_contents('bar', transport6, 'foo')
 
1489
        self.check_transport_contents(b'bar', transport6, 'foo')
1579
1490
 
1580
1491
    def test_lock_write(self):
1581
1492
        """Test transport-level write locks.
1584
1495
        """
1585
1496
        transport = self.get_transport()
1586
1497
        if transport.is_readonly():
1587
 
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
 
1498
            self.assertRaises(TransportNotPossible,
 
1499
                              transport.lock_write, 'foo')
1588
1500
            return
1589
 
        transport.put_bytes('lock', '')
 
1501
        transport.put_bytes('lock', b'')
1590
1502
        try:
1591
1503
            lock = transport.lock_write('lock')
1592
1504
        except TransportNotPossible:
1602
1514
        """
1603
1515
        transport = self.get_transport()
1604
1516
        if transport.is_readonly():
1605
 
            file('lock', 'w').close()
 
1517
            open('lock', 'w').close()
1606
1518
        else:
1607
 
            transport.put_bytes('lock', '')
 
1519
            transport.put_bytes('lock', b'')
1608
1520
        try:
1609
1521
            lock = transport.lock_read('lock')
1610
1522
        except TransportNotPossible:
1616
1528
    def test_readv(self):
1617
1529
        transport = self.get_transport()
1618
1530
        if transport.is_readonly():
1619
 
            file('a', 'w').write('0123456789')
 
1531
            with open('a', 'w') as f:
 
1532
                f.write('0123456789')
1620
1533
        else:
1621
 
            transport.put_bytes('a', '0123456789')
 
1534
            transport.put_bytes('a', b'0123456789')
1622
1535
 
1623
1536
        d = list(transport.readv('a', ((0, 1),)))
1624
 
        self.assertEqual(d[0], (0, '0'))
 
1537
        self.assertEqual(d[0], (0, b'0'))
1625
1538
 
1626
1539
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1627
 
        self.assertEqual(d[0], (0, '0'))
1628
 
        self.assertEqual(d[1], (1, '1'))
1629
 
        self.assertEqual(d[2], (3, '34'))
1630
 
        self.assertEqual(d[3], (9, '9'))
 
1540
        self.assertEqual(d[0], (0, b'0'))
 
1541
        self.assertEqual(d[1], (1, b'1'))
 
1542
        self.assertEqual(d[2], (3, b'34'))
 
1543
        self.assertEqual(d[3], (9, b'9'))
1631
1544
 
1632
1545
    def test_readv_out_of_order(self):
1633
1546
        transport = self.get_transport()
1634
1547
        if transport.is_readonly():
1635
 
            file('a', 'w').write('0123456789')
 
1548
            with open('a', 'w') as f:
 
1549
                f.write('0123456789')
1636
1550
        else:
1637
 
            transport.put_bytes('a', '01234567890')
 
1551
            transport.put_bytes('a', b'01234567890')
1638
1552
 
1639
1553
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1640
 
        self.assertEqual(d[0], (1, '1'))
1641
 
        self.assertEqual(d[1], (9, '9'))
1642
 
        self.assertEqual(d[2], (0, '0'))
1643
 
        self.assertEqual(d[3], (3, '34'))
 
1554
        self.assertEqual(d[0], (1, b'1'))
 
1555
        self.assertEqual(d[1], (9, b'9'))
 
1556
        self.assertEqual(d[2], (0, b'0'))
 
1557
        self.assertEqual(d[3], (3, b'34'))
1644
1558
 
1645
1559
    def test_readv_with_adjust_for_latency(self):
1646
1560
        transport = self.get_transport()
1651
1565
        # reference the returned data with the random data. To avoid doing
1652
1566
        # multiple large random byte look ups we do several tests on the same
1653
1567
        # backing data.
1654
 
        content = osutils.rand_bytes(200*1024)
 
1568
        content = osutils.rand_bytes(200 * 1024)
1655
1569
        content_size = len(content)
1656
1570
        if transport.is_readonly():
1657
1571
            self.build_tree_contents([('a', content)])
1658
1572
        else:
1659
1573
            transport.put_bytes('a', content)
 
1574
 
1660
1575
        def check_result_data(result_vector):
1661
1576
            for item in result_vector:
1662
1577
                data_len = len(item[1])
1664
1579
 
1665
1580
        # start corner case
1666
1581
        result = list(transport.readv('a', ((0, 30),),
1667
 
            adjust_for_latency=True, upper_limit=content_size))
 
1582
                                      adjust_for_latency=True, upper_limit=content_size))
1668
1583
        # we expect 1 result, from 0, to something > 30
1669
1584
        self.assertEqual(1, len(result))
1670
1585
        self.assertEqual(0, result[0][0])
1672
1587
        check_result_data(result)
1673
1588
        # end of file corner case
1674
1589
        result = list(transport.readv('a', ((204700, 100),),
1675
 
            adjust_for_latency=True, upper_limit=content_size))
 
1590
                                      adjust_for_latency=True, upper_limit=content_size))
1676
1591
        # we expect 1 result, from 204800- its length, to the end
1677
1592
        self.assertEqual(1, len(result))
1678
1593
        data_len = len(result[0][1])
1679
 
        self.assertEqual(204800-data_len, result[0][0])
 
1594
        self.assertEqual(204800 - data_len, result[0][0])
1680
1595
        self.assertTrue(data_len >= 100)
1681
1596
        check_result_data(result)
1682
1597
        # out of order ranges are made in order
1683
1598
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
1684
 
            adjust_for_latency=True, upper_limit=content_size))
 
1599
                                      adjust_for_latency=True, upper_limit=content_size))
1685
1600
        # we expect 2 results, in order, start and end.
1686
1601
        self.assertEqual(2, len(result))
1687
1602
        # start
1690
1605
        self.assertTrue(data_len >= 30)
1691
1606
        # end
1692
1607
        data_len = len(result[1][1])
1693
 
        self.assertEqual(204800-data_len, result[1][0])
 
1608
        self.assertEqual(204800 - data_len, result[1][0])
1694
1609
        self.assertTrue(data_len >= 100)
1695
1610
        check_result_data(result)
1696
1611
        # close ranges get combined (even if out of order)
1697
 
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1612
        for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
1698
1613
            result = list(transport.readv('a', request_vector,
1699
 
                adjust_for_latency=True, upper_limit=content_size))
 
1614
                                          adjust_for_latency=True, upper_limit=content_size))
1700
1615
            self.assertEqual(1, len(result))
1701
1616
            data_len = len(result[0][1])
1702
1617
            # minimum length is from 400 to 1034 - 634
1710
1625
        transport = self.get_transport()
1711
1626
        # test from observed failure case.
1712
1627
        if transport.is_readonly():
1713
 
            file('a', 'w').write('a'*1024*1024)
 
1628
            with open('a', 'w') as f:
 
1629
                f.write('a' * 1024 * 1024)
1714
1630
        else:
1715
 
            transport.put_bytes('a', 'a'*1024*1024)
 
1631
            transport.put_bytes('a', b'a' * 1024 * 1024)
1716
1632
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1717
 
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
1718
 
            (465373, 800), (947422, 800)]
1719
 
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
1720
 
        found_items = [False]*9
 
1633
                         (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1634
                         (465373, 800), (947422, 800)]
 
1635
        results = list(transport.readv('a', broken_vector, True, 1024 * 1024))
 
1636
        found_items = [False] * 9
1721
1637
        for pos, (start, length) in enumerate(broken_vector):
1722
1638
            # check the range is covered by the result
1723
1639
            for offset, data in results:
1724
1640
                if offset <= start and start + length <= offset + len(data):
1725
1641
                    found_items[pos] = True
1726
 
        self.assertEqual([True]*9, found_items)
 
1642
        self.assertEqual([True] * 9, found_items)
1727
1643
 
1728
1644
    def test_get_with_open_write_stream_sees_all_content(self):
1729
1645
        t = self.get_transport()
1730
1646
        if t.is_readonly():
1731
1647
            return
1732
 
        handle = t.open_write_stream('foo')
1733
 
        try:
1734
 
            handle.write('bcd')
1735
 
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1736
 
        finally:
1737
 
            handle.close()
 
1648
        with t.open_write_stream('foo') as handle:
 
1649
            handle.write(b'bcd')
 
1650
            self.assertEqual([(0, b'b'), (2, b'd')], list(
 
1651
                t.readv('foo', ((0, 1), (2, 1)))))
1738
1652
 
1739
1653
    def test_get_smart_medium(self):
1740
1654
        """All transports must either give a smart medium, or know they can't.
1742
1656
        transport = self.get_transport()
1743
1657
        try:
1744
1658
            client_medium = transport.get_smart_medium()
1745
 
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
1746
1659
        except errors.NoSmartMedium:
1747
1660
            # as long as we got it we're fine
1748
1661
            pass
 
1662
        else:
 
1663
            from ..bzr.smart import medium
 
1664
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
1749
1665
 
1750
1666
    def test_readv_short_read(self):
1751
1667
        transport = self.get_transport()
1752
1668
        if transport.is_readonly():
1753
 
            file('a', 'w').write('0123456789')
 
1669
            with open('a', 'w') as f:
 
1670
                f.write('0123456789')
1754
1671
        else:
1755
 
            transport.put_bytes('a', '01234567890')
 
1672
            transport.put_bytes('a', b'01234567890')
1756
1673
 
1757
1674
        # This is intentionally reading off the end of the file
1758
1675
        # since we are sure that it cannot get there
1759
1676
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1760
1677
                               # Can be raised by paramiko
1761
1678
                               AssertionError),
1762
 
                              transport.readv, 'a', [(1,1), (8,10)])
 
1679
                              transport.readv, 'a', [(1, 1), (8, 10)])
1763
1680
 
1764
1681
        # This is trying to seek past the end of the file, it should
1765
1682
        # also raise a special error
1766
1683
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1767
 
                              transport.readv, 'a', [(12,2)])
 
1684
                              transport.readv, 'a', [(12, 2)])
 
1685
 
 
1686
    def test_no_segment_parameters(self):
 
1687
        """Segment parameters should be stripped and stored in
 
1688
        transport.segment_parameters."""
 
1689
        transport = self.get_transport("foo")
 
1690
        self.assertEqual({}, transport.get_segment_parameters())
 
1691
 
 
1692
    def test_segment_parameters(self):
 
1693
        """Segment parameters should be stripped and stored in
 
1694
        transport.get_segment_parameters()."""
 
1695
        base_url = self._server.get_url()
 
1696
        parameters = {"key1": "val1", "key2": "val2"}
 
1697
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1698
        transport = _mod_transport.get_transport_from_url(url)
 
1699
        self.assertEqual(parameters, transport.get_segment_parameters())
 
1700
 
 
1701
    def test_set_segment_parameters(self):
 
1702
        """Segment parameters can be set and show up in base."""
 
1703
        transport = self.get_transport("foo")
 
1704
        orig_base = transport.base
 
1705
        transport.set_segment_parameter("arm", "board")
 
1706
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
 
1707
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
 
1708
        transport.set_segment_parameter("arm", None)
 
1709
        transport.set_segment_parameter("nonexistant", None)
 
1710
        self.assertEqual({}, transport.get_segment_parameters())
 
1711
        self.assertEqual(orig_base, transport.base)
 
1712
 
 
1713
    def test_stat_symlink(self):
 
1714
        # if a transport points directly to a symlink (and supports symlinks
 
1715
        # at all) you can tell this.  helps with bug 32669.
 
1716
        t = self.get_transport()
 
1717
        try:
 
1718
            t.symlink('target', 'link')
 
1719
        except TransportNotPossible:
 
1720
            raise TestSkipped("symlinks not supported")
 
1721
        t2 = t.clone('link')
 
1722
        st = t2.stat('')
 
1723
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1724
 
 
1725
    def test_abspath_url_unquote_unreserved(self):
 
1726
        """URLs from abspath should have unreserved characters unquoted
 
1727
 
 
1728
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1729
        """
 
1730
        t = self.get_transport()
 
1731
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1732
        self.assertEqual(t.base + "-.09AZ_az~",
 
1733
                         t.abspath(needlessly_escaped_dir))
 
1734
 
 
1735
    def test_clone_url_unquote_unreserved(self):
 
1736
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1737
 
 
1738
        Cloned transports should be prefix comparable for things like the
 
1739
        isolation checking of tests, see lp:842223 for more.
 
1740
        """
 
1741
        t1 = self.get_transport()
 
1742
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1743
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1744
        t2 = t1.clone(needlessly_escaped_dir)
 
1745
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1746
 
 
1747
    def test_hook_post_connection_one(self):
 
1748
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1749
        log = []
 
1750
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1751
        t = self.get_transport()
 
1752
        self.assertEqual([], log)
 
1753
        t.has("non-existant")
 
1754
        if isinstance(t, RemoteTransport):
 
1755
            self.assertEqual([t.get_smart_medium()], log)
 
1756
        elif isinstance(t, ConnectedTransport):
 
1757
            self.assertEqual([t], log)
 
1758
        else:
 
1759
            self.assertEqual([], log)
 
1760
 
 
1761
    def test_hook_post_connection_multi(self):
 
1762
        """Fire post_connect hook once per unshared underlying connection"""
 
1763
        log = []
 
1764
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1765
        t1 = self.get_transport()
 
1766
        t2 = t1.clone(".")
 
1767
        t3 = self.get_transport()
 
1768
        self.assertEqual([], log)
 
1769
        t1.has("x")
 
1770
        t2.has("x")
 
1771
        t3.has("x")
 
1772
        if isinstance(t1, RemoteTransport):
 
1773
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1774
        elif isinstance(t1, ConnectedTransport):
 
1775
            self.assertEqual([t1, t3], log)
 
1776
        else:
 
1777
            self.assertEqual([], log)