/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/per_transport.py

  • Committer: Breezy landing bot
  • Author(s): Jelmer Vernooij
  • Date: 2020-07-28 02:47:10 UTC
  • mfrom: (7519.1.1 merge-3.1)
  • Revision ID: breezy.the.bot@gmail.com-20200728024710-a2ylds219f1lsl62
Merge lp:brz/3.1.

Merged from https://code.launchpad.net/~jelmer/brz/merge-3.1/+merge/388173

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
23
 
import itertools
 
23
from io import BytesIO
24
24
import os
25
 
from cStringIO import StringIO
26
 
from StringIO import StringIO as pyStringIO
27
25
import stat
28
26
import sys
29
 
import unittest
30
27
 
31
 
from bzrlib import (
 
28
from .. import (
32
29
    errors,
33
30
    osutils,
 
31
    pyutils,
34
32
    tests,
 
33
    transport as _mod_transport,
35
34
    urlutils,
36
35
    )
37
 
from bzrlib.errors import (ConnectionError,
38
 
                           DirectoryNotEmpty,
39
 
                           FileExists,
40
 
                           InvalidURL,
41
 
                           LockError,
42
 
                           NoSuchFile,
43
 
                           NotLocalUrl,
44
 
                           PathError,
45
 
                           TransportNotPossible,
46
 
                           )
47
 
from bzrlib.osutils import getcwd
48
 
from bzrlib.smart import medium
49
 
from bzrlib.tests import (
50
 
    TestCaseInTempDir,
 
36
from ..errors import (ConnectionError,
 
37
                      FileExists,
 
38
                      NoSuchFile,
 
39
                      PathError,
 
40
                      TransportNotPossible,
 
41
                      )
 
42
from ..osutils import getcwd
 
43
from . import (
51
44
    TestSkipped,
52
45
    TestNotApplicable,
53
46
    multiply_tests,
54
47
    )
55
 
from bzrlib.tests import test_server
56
 
from bzrlib.tests.test_transport import TestTransportImplementation
57
 
from bzrlib.transport import (
 
48
from . import test_server
 
49
from .test_transport import TestTransportImplementation
 
50
from ..transport import (
58
51
    ConnectedTransport,
59
 
    get_transport,
 
52
    Transport,
60
53
    _get_transport_modules,
61
54
    )
62
 
from bzrlib.transport.memory import MemoryTransport
 
55
from ..transport.memory import MemoryTransport
 
56
from ..transport.remote import RemoteTransport
63
57
 
64
58
 
65
59
def get_transport_test_permutations(module):
78
72
    for module in _get_transport_modules():
79
73
        try:
80
74
            permutations = get_transport_test_permutations(
81
 
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
75
                pyutils.get_named_object(module))
82
76
            for (klass, server_factory) in permutations:
83
77
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
84
 
                    {"transport_class":klass,
85
 
                     "transport_server":server_factory})
 
78
                            {"transport_class": klass,
 
79
                             "transport_server": server_factory})
86
80
                result.append(scenario)
87
 
        except errors.DependencyNotPresent, e:
 
81
        except errors.DependencyNotPresent as e:
88
82
            # Continue even if a dependency prevents us
89
83
            # from adding this test
90
84
            pass
91
85
    return result
92
86
 
93
87
 
94
 
def load_tests(standard_tests, module, loader):
 
88
def load_tests(loader, standard_tests, pattern):
95
89
    """Multiply tests for tranport implementations."""
96
90
    result = loader.suiteClass()
97
91
    scenarios = transport_test_permutations()
102
96
 
103
97
    def setUp(self):
104
98
        super(TransportTests, self).setUp()
105
 
        self._captureVar('BZR_NO_SMART_VFS', None)
 
99
        self.overrideEnv('BRZ_NO_SMART_VFS', None)
106
100
 
107
101
    def check_transport_contents(self, content, transport, relpath):
108
 
        """Check that transport.get(relpath).read() == content."""
109
 
        self.assertEqualDiff(content, transport.get(relpath).read())
 
102
        """Check that transport.get_bytes(relpath) == content."""
 
103
        self.assertEqualDiff(content, transport.get_bytes(relpath))
110
104
 
111
105
    def test_ensure_base_missing(self):
112
106
        """.ensure_base() should create the directory if it doesn't exist"""
156
150
        self.assertEqual(True, t.has('a'))
157
151
        self.assertEqual(False, t.has('c'))
158
152
        self.assertEqual(True, t.has(urlutils.escape('%')))
159
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
160
 
                                           'e', 'f', 'g', 'h'])),
161
 
                         [True, True, False, False,
162
 
                          True, False, True, False])
163
153
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
164
154
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
165
155
                                           urlutils.escape('%%')]))
166
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
167
 
                                                'e', 'f', 'g', 'h']))),
168
 
                         [True, True, False, False,
169
 
                          True, False, True, False])
170
156
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
171
157
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
172
158
 
183
169
    def test_get(self):
184
170
        t = self.get_transport()
185
171
 
186
 
        files = ['a', 'b', 'e', 'g']
187
 
        contents = ['contents of a\n',
188
 
                    'contents of b\n',
189
 
                    'contents of e\n',
190
 
                    'contents of g\n',
191
 
                    ]
192
 
        self.build_tree(files, transport=t, line_endings='binary')
193
 
        self.check_transport_contents('contents of a\n', t, 'a')
194
 
        content_f = t.get_multi(files)
195
 
        # Use itertools.izip() instead of use zip() or map(), since they fully
196
 
        # evaluate their inputs, the transport requests should be issued and
197
 
        # handled sequentially (we don't want to force transport to buffer).
198
 
        for content, f in itertools.izip(contents, content_f):
199
 
            self.assertEqual(content, f.read())
200
 
 
201
 
        content_f = t.get_multi(iter(files))
202
 
        # Use itertools.izip() for the same reason
203
 
        for content, f in itertools.izip(contents, content_f):
204
 
            self.assertEqual(content, f.read())
 
172
        files = ['a']
 
173
        content = b'contents of a\n'
 
174
        self.build_tree(['a'], transport=t, line_endings='binary')
 
175
        self.check_transport_contents(b'contents of a\n', t, 'a')
 
176
        f = t.get('a')
 
177
        self.assertEqual(content, f.read())
205
178
 
206
179
    def test_get_unknown_file(self):
207
180
        t = self.get_transport()
208
181
        files = ['a', 'b']
209
 
        contents = ['contents of a\n',
210
 
                    'contents of b\n',
 
182
        contents = [b'contents of a\n',
 
183
                    b'contents of b\n',
211
184
                    ]
212
185
        self.build_tree(files, transport=t, line_endings='binary')
213
186
        self.assertRaises(NoSuchFile, t.get, 'c')
214
 
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
215
 
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
187
 
 
188
        def iterate_and_close(func, *args):
 
189
            for f in func(*args):
 
190
                # We call f.read() here because things like paramiko actually
 
191
                # spawn a thread to prefetch the content, which we want to
 
192
                # consume before we close the handle.
 
193
                content = f.read()
 
194
                f.close()
216
195
 
217
196
    def test_get_directory_read_gives_ReadError(self):
218
197
        """consistent errors for read() on a file returned by get()."""
238
217
        t = self.get_transport()
239
218
 
240
219
        files = ['a', 'b', 'e', 'g']
241
 
        contents = ['contents of a\n',
242
 
                    'contents of b\n',
243
 
                    'contents of e\n',
244
 
                    'contents of g\n',
 
220
        contents = [b'contents of a\n',
 
221
                    b'contents of b\n',
 
222
                    b'contents of e\n',
 
223
                    b'contents of g\n',
245
224
                    ]
246
225
        self.build_tree(files, transport=t, line_endings='binary')
247
 
        self.check_transport_contents('contents of a\n', t, 'a')
 
226
        self.check_transport_contents(b'contents of a\n', t, 'a')
248
227
 
249
228
        for content, fname in zip(contents, files):
250
229
            self.assertEqual(content, t.get_bytes(fname))
251
230
 
252
231
    def test_get_bytes_unknown_file(self):
253
232
        t = self.get_transport()
254
 
 
255
233
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
256
234
 
257
235
    def test_get_with_open_write_stream_sees_all_content(self):
258
236
        t = self.get_transport()
259
237
        if t.is_readonly():
260
238
            return
261
 
        handle = t.open_write_stream('foo')
262
 
        try:
263
 
            handle.write('b')
264
 
            self.assertEqual('b', t.get('foo').read())
265
 
        finally:
266
 
            handle.close()
 
239
        with t.open_write_stream('foo') as handle:
 
240
            handle.write(b'b')
 
241
            self.assertEqual(b'b', t.get_bytes('foo'))
267
242
 
268
243
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
269
244
        t = self.get_transport()
270
245
        if t.is_readonly():
271
246
            return
272
 
        handle = t.open_write_stream('foo')
273
 
        try:
274
 
            handle.write('b')
275
 
            self.assertEqual('b', t.get_bytes('foo'))
276
 
            self.assertEqual('b', t.get('foo').read())
277
 
        finally:
278
 
            handle.close()
 
247
        with t.open_write_stream('foo') as handle:
 
248
            handle.write(b'b')
 
249
            self.assertEqual(b'b', t.get_bytes('foo'))
 
250
            with t.get('foo') as f:
 
251
                self.assertEqual(b'b', f.read())
279
252
 
280
253
    def test_put_bytes(self):
281
254
        t = self.get_transport()
282
255
 
283
256
        if t.is_readonly():
284
257
            self.assertRaises(TransportNotPossible,
285
 
                    t.put_bytes, 'a', 'some text for a\n')
 
258
                              t.put_bytes, 'a', b'some text for a\n')
286
259
            return
287
260
 
288
 
        t.put_bytes('a', 'some text for a\n')
289
 
        self.failUnless(t.has('a'))
290
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
261
        t.put_bytes('a', b'some text for a\n')
 
262
        self.assertTrue(t.has('a'))
 
263
        self.check_transport_contents(b'some text for a\n', t, 'a')
291
264
 
292
265
        # The contents should be overwritten
293
 
        t.put_bytes('a', 'new text for a\n')
294
 
        self.check_transport_contents('new text for a\n', t, 'a')
 
266
        t.put_bytes('a', b'new text for a\n')
 
267
        self.check_transport_contents(b'new text for a\n', t, 'a')
295
268
 
296
269
        self.assertRaises(NoSuchFile,
297
 
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
 
270
                          t.put_bytes, 'path/doesnt/exist/c', b'contents')
298
271
 
299
272
    def test_put_bytes_non_atomic(self):
300
273
        t = self.get_transport()
301
274
 
302
275
        if t.is_readonly():
303
276
            self.assertRaises(TransportNotPossible,
304
 
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
 
277
                              t.put_bytes_non_atomic, 'a', b'some text for a\n')
305
278
            return
306
279
 
307
 
        self.failIf(t.has('a'))
308
 
        t.put_bytes_non_atomic('a', 'some text for a\n')
309
 
        self.failUnless(t.has('a'))
310
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
280
        self.assertFalse(t.has('a'))
 
281
        t.put_bytes_non_atomic('a', b'some text for a\n')
 
282
        self.assertTrue(t.has('a'))
 
283
        self.check_transport_contents(b'some text for a\n', t, 'a')
311
284
        # Put also replaces contents
312
 
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
313
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
285
        t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
 
286
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
314
287
 
315
288
        # Make sure we can create another file
316
 
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
 
289
        t.put_bytes_non_atomic('d', b'contents for\nd\n')
317
290
        # And overwrite 'a' with empty contents
318
 
        t.put_bytes_non_atomic('a', '')
319
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
320
 
        self.check_transport_contents('', t, 'a')
 
291
        t.put_bytes_non_atomic('a', b'')
 
292
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
293
        self.check_transport_contents(b'', t, 'a')
321
294
 
322
295
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
323
 
                                       'contents\n')
 
296
                          b'contents\n')
324
297
        # Now test the create_parent flag
325
298
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
326
 
                                       'contents\n')
327
 
        self.failIf(t.has('dir/a'))
328
 
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
 
299
                          b'contents\n')
 
300
        self.assertFalse(t.has('dir/a'))
 
301
        t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
329
302
                               create_parent_dir=True)
330
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
303
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
331
304
 
332
305
        # But we still get NoSuchFile if we can't make the parent dir
333
306
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
334
 
                                       'contents\n',
335
 
                                       create_parent_dir=True)
 
307
                          b'contents\n',
 
308
                          create_parent_dir=True)
336
309
 
337
310
    def test_put_bytes_permissions(self):
338
311
        t = self.get_transport()
342
315
        if not t._can_roundtrip_unix_modebits():
343
316
            # Can't roundtrip, so no need to run this test
344
317
            return
345
 
        t.put_bytes('mode644', 'test text\n', mode=0644)
346
 
        self.assertTransportMode(t, 'mode644', 0644)
347
 
        t.put_bytes('mode666', 'test text\n', mode=0666)
348
 
        self.assertTransportMode(t, 'mode666', 0666)
349
 
        t.put_bytes('mode600', 'test text\n', mode=0600)
350
 
        self.assertTransportMode(t, 'mode600', 0600)
 
318
        t.put_bytes('mode644', b'test text\n', mode=0o644)
 
319
        self.assertTransportMode(t, 'mode644', 0o644)
 
320
        t.put_bytes('mode666', b'test text\n', mode=0o666)
 
321
        self.assertTransportMode(t, 'mode666', 0o666)
 
322
        t.put_bytes('mode600', b'test text\n', mode=0o600)
 
323
        self.assertTransportMode(t, 'mode600', 0o600)
351
324
        # Yes, you can put_bytes a file such that it becomes readonly
352
 
        t.put_bytes('mode400', 'test text\n', mode=0400)
353
 
        self.assertTransportMode(t, 'mode400', 0400)
 
325
        t.put_bytes('mode400', b'test text\n', mode=0o400)
 
326
        self.assertTransportMode(t, 'mode400', 0o400)
354
327
 
355
328
        # The default permissions should be based on the current umask
356
329
        umask = osutils.get_umask()
357
 
        t.put_bytes('nomode', 'test text\n', mode=None)
358
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
330
        t.put_bytes('nomode', b'test text\n', mode=None)
 
331
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
359
332
 
360
333
    def test_put_bytes_non_atomic_permissions(self):
361
334
        t = self.get_transport()
365
338
        if not t._can_roundtrip_unix_modebits():
366
339
            # Can't roundtrip, so no need to run this test
367
340
            return
368
 
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
369
 
        self.assertTransportMode(t, 'mode644', 0644)
370
 
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
371
 
        self.assertTransportMode(t, 'mode666', 0666)
372
 
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
373
 
        self.assertTransportMode(t, 'mode600', 0600)
374
 
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
375
 
        self.assertTransportMode(t, 'mode400', 0400)
 
341
        t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
 
342
        self.assertTransportMode(t, 'mode644', 0o644)
 
343
        t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
 
344
        self.assertTransportMode(t, 'mode666', 0o666)
 
345
        t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
 
346
        self.assertTransportMode(t, 'mode600', 0o600)
 
347
        t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
 
348
        self.assertTransportMode(t, 'mode400', 0o400)
376
349
 
377
350
        # The default permissions should be based on the current umask
378
351
        umask = osutils.get_umask()
379
 
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
380
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
352
        t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
 
353
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
381
354
 
382
355
        # We should also be able to set the mode for a parent directory
383
356
        # when it is created
384
 
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
385
 
                               dir_mode=0700, create_parent_dir=True)
386
 
        self.assertTransportMode(t, 'dir700', 0700)
387
 
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
388
 
                               dir_mode=0770, create_parent_dir=True)
389
 
        self.assertTransportMode(t, 'dir770', 0770)
390
 
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
391
 
                               dir_mode=0777, create_parent_dir=True)
392
 
        self.assertTransportMode(t, 'dir777', 0777)
 
357
        t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
 
358
                               dir_mode=0o700, create_parent_dir=True)
 
359
        self.assertTransportMode(t, 'dir700', 0o700)
 
360
        t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
 
361
                               dir_mode=0o770, create_parent_dir=True)
 
362
        self.assertTransportMode(t, 'dir770', 0o770)
 
363
        t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
 
364
                               dir_mode=0o777, create_parent_dir=True)
 
365
        self.assertTransportMode(t, 'dir777', 0o777)
393
366
 
394
367
    def test_put_file(self):
395
368
        t = self.get_transport()
396
369
 
397
370
        if t.is_readonly():
398
371
            self.assertRaises(TransportNotPossible,
399
 
                    t.put_file, 'a', StringIO('some text for a\n'))
 
372
                              t.put_file, 'a', BytesIO(b'some text for a\n'))
400
373
            return
401
374
 
402
 
        result = t.put_file('a', StringIO('some text for a\n'))
 
375
        result = t.put_file('a', BytesIO(b'some text for a\n'))
403
376
        # put_file returns the length of the data written
404
377
        self.assertEqual(16, result)
405
 
        self.failUnless(t.has('a'))
406
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
378
        self.assertTrue(t.has('a'))
 
379
        self.check_transport_contents(b'some text for a\n', t, 'a')
407
380
        # Put also replaces contents
408
 
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
381
        result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
409
382
        self.assertEqual(19, result)
410
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
383
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
411
384
        self.assertRaises(NoSuchFile,
412
385
                          t.put_file, 'path/doesnt/exist/c',
413
 
                              StringIO('contents'))
 
386
                          BytesIO(b'contents'))
414
387
 
415
388
    def test_put_file_non_atomic(self):
416
389
        t = self.get_transport()
417
390
 
418
391
        if t.is_readonly():
419
392
            self.assertRaises(TransportNotPossible,
420
 
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
393
                              t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
421
394
            return
422
395
 
423
 
        self.failIf(t.has('a'))
424
 
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
425
 
        self.failUnless(t.has('a'))
426
 
        self.check_transport_contents('some text for a\n', t, 'a')
 
396
        self.assertFalse(t.has('a'))
 
397
        t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
 
398
        self.assertTrue(t.has('a'))
 
399
        self.check_transport_contents(b'some text for a\n', t, 'a')
427
400
        # Put also replaces contents
428
 
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
429
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
401
        t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
 
402
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
430
403
 
431
404
        # Make sure we can create another file
432
 
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
405
        t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
433
406
        # And overwrite 'a' with empty contents
434
 
        t.put_file_non_atomic('a', StringIO(''))
435
 
        self.check_transport_contents('contents for\nd\n', t, 'd')
436
 
        self.check_transport_contents('', t, 'a')
 
407
        t.put_file_non_atomic('a', BytesIO(b''))
 
408
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
409
        self.check_transport_contents(b'', t, 'a')
437
410
 
438
411
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
439
 
                                       StringIO('contents\n'))
 
412
                          BytesIO(b'contents\n'))
440
413
        # Now test the create_parent flag
441
414
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
442
 
                                       StringIO('contents\n'))
443
 
        self.failIf(t.has('dir/a'))
444
 
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
415
                          BytesIO(b'contents\n'))
 
416
        self.assertFalse(t.has('dir/a'))
 
417
        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
445
418
                              create_parent_dir=True)
446
 
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
419
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
447
420
 
448
421
        # But we still get NoSuchFile if we can't make the parent dir
449
422
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
450
 
                                       StringIO('contents\n'),
451
 
                                       create_parent_dir=True)
 
423
                          BytesIO(b'contents\n'),
 
424
                          create_parent_dir=True)
452
425
 
453
426
    def test_put_file_permissions(self):
454
427
 
459
432
        if not t._can_roundtrip_unix_modebits():
460
433
            # Can't roundtrip, so no need to run this test
461
434
            return
462
 
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
463
 
        self.assertTransportMode(t, 'mode644', 0644)
464
 
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
465
 
        self.assertTransportMode(t, 'mode666', 0666)
466
 
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
467
 
        self.assertTransportMode(t, 'mode600', 0600)
 
435
        t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
 
436
        self.assertTransportMode(t, 'mode644', 0o644)
 
437
        t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
 
438
        self.assertTransportMode(t, 'mode666', 0o666)
 
439
        t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
 
440
        self.assertTransportMode(t, 'mode600', 0o600)
468
441
        # Yes, you can put a file such that it becomes readonly
469
 
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
470
 
        self.assertTransportMode(t, 'mode400', 0400)
 
442
        t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
 
443
        self.assertTransportMode(t, 'mode400', 0o400)
471
444
        # The default permissions should be based on the current umask
472
445
        umask = osutils.get_umask()
473
 
        t.put_file('nomode', StringIO('test text\n'), mode=None)
474
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
446
        t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
 
447
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
475
448
 
476
449
    def test_put_file_non_atomic_permissions(self):
477
450
        t = self.get_transport()
481
454
        if not t._can_roundtrip_unix_modebits():
482
455
            # Can't roundtrip, so no need to run this test
483
456
            return
484
 
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
485
 
        self.assertTransportMode(t, 'mode644', 0644)
486
 
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
487
 
        self.assertTransportMode(t, 'mode666', 0666)
488
 
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
489
 
        self.assertTransportMode(t, 'mode600', 0600)
 
457
        t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
 
458
        self.assertTransportMode(t, 'mode644', 0o644)
 
459
        t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
 
460
        self.assertTransportMode(t, 'mode666', 0o666)
 
461
        t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
 
462
        self.assertTransportMode(t, 'mode600', 0o600)
490
463
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
491
 
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
492
 
        self.assertTransportMode(t, 'mode400', 0400)
 
464
        t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
 
465
        self.assertTransportMode(t, 'mode400', 0o400)
493
466
 
494
467
        # The default permissions should be based on the current umask
495
468
        umask = osutils.get_umask()
496
 
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
497
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
469
        t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
 
470
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
498
471
 
499
472
        # We should also be able to set the mode for a parent directory
500
473
        # when it is created
501
 
        sio = StringIO()
502
 
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
503
 
                              dir_mode=0700, create_parent_dir=True)
504
 
        self.assertTransportMode(t, 'dir700', 0700)
505
 
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
506
 
                              dir_mode=0770, create_parent_dir=True)
507
 
        self.assertTransportMode(t, 'dir770', 0770)
508
 
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
509
 
                              dir_mode=0777, create_parent_dir=True)
510
 
        self.assertTransportMode(t, 'dir777', 0777)
 
474
        sio = BytesIO()
 
475
        t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
 
476
                              dir_mode=0o700, create_parent_dir=True)
 
477
        self.assertTransportMode(t, 'dir700', 0o700)
 
478
        t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
 
479
                              dir_mode=0o770, create_parent_dir=True)
 
480
        self.assertTransportMode(t, 'dir770', 0o770)
 
481
        t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
 
482
                              dir_mode=0o777, create_parent_dir=True)
 
483
        self.assertTransportMode(t, 'dir777', 0o777)
511
484
 
512
485
    def test_put_bytes_unicode(self):
513
 
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
514
 
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
515
 
        # (we don't want to encode unicode here at all, callers should be
516
 
        # strictly passing bytes to put_bytes), but we allow it for backwards
517
 
        # compatibility.  At some point we should use a specific exception.
518
 
        # See https://bugs.launchpad.net/bzr/+bug/106898.
519
486
        t = self.get_transport()
520
487
        if t.is_readonly():
521
488
            return
522
489
        unicode_string = u'\u1234'
523
 
        self.assertRaises(
524
 
            (AssertionError, UnicodeEncodeError),
525
 
            t.put_bytes, 'foo', unicode_string)
526
 
 
527
 
    def test_put_file_unicode(self):
528
 
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
529
 
        # This situation can happen (and has) if code is careless about the type
530
 
        # of "string" they initialise/write to a StringIO with.  We cannot use
531
 
        # cStringIO, because it never returns unicode from read.
532
 
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
533
 
        # raise, but we raise it for hysterical raisins.
534
 
        t = self.get_transport()
535
 
        if t.is_readonly():
536
 
            return
537
 
        unicode_file = pyStringIO(u'\u1234')
538
 
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
490
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
539
491
 
540
492
    def test_mkdir(self):
541
493
        t = self.get_transport()
546
498
            # defined for the transport interface.
547
499
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
548
500
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
549
 
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
550
 
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
 
501
            self.assertRaises(TransportNotPossible,
 
502
                              t.mkdir, 'path/doesnt/exist')
551
503
            return
552
504
        # Test mkdir
553
505
        t.mkdir('dir_a')
557
509
        t.mkdir('dir_b')
558
510
        self.assertEqual(t.has('dir_b'), True)
559
511
 
560
 
        t.mkdir_multi(['dir_c', 'dir_d'])
561
 
 
562
 
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
563
 
        self.assertEqual(list(t.has_multi(
564
 
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
565
 
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
566
 
            [True, True, True, False,
567
 
             True, True, True, True])
 
512
        self.assertEqual([t.has(n) for n in
 
513
                          ['dir_a', 'dir_b', 'dir_q', 'dir_b']],
 
514
                         [True, True, False, True])
568
515
 
569
516
        # we were testing that a local mkdir followed by a transport
570
517
        # mkdir failed thusly, but given that we * in one process * do not
575
522
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
576
523
 
577
524
        # Test get/put in sub-directories
578
 
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
579
 
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
580
 
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
581
 
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
 
525
        t.put_bytes('dir_a/a', b'contents of dir_a/a')
 
526
        t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
 
527
        self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a')
 
528
        self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b')
582
529
 
583
530
        # mkdir of a dir with an absent parent
584
531
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
591
538
            # no sense testing on this transport
592
539
            return
593
540
        # Test mkdir with a mode
594
 
        t.mkdir('dmode755', mode=0755)
595
 
        self.assertTransportMode(t, 'dmode755', 0755)
596
 
        t.mkdir('dmode555', mode=0555)
597
 
        self.assertTransportMode(t, 'dmode555', 0555)
598
 
        t.mkdir('dmode777', mode=0777)
599
 
        self.assertTransportMode(t, 'dmode777', 0777)
600
 
        t.mkdir('dmode700', mode=0700)
601
 
        self.assertTransportMode(t, 'dmode700', 0700)
602
 
        t.mkdir_multi(['mdmode755'], mode=0755)
603
 
        self.assertTransportMode(t, 'mdmode755', 0755)
 
541
        t.mkdir('dmode755', mode=0o755)
 
542
        self.assertTransportMode(t, 'dmode755', 0o755)
 
543
        t.mkdir('dmode555', mode=0o555)
 
544
        self.assertTransportMode(t, 'dmode555', 0o555)
 
545
        t.mkdir('dmode777', mode=0o777)
 
546
        self.assertTransportMode(t, 'dmode777', 0o777)
 
547
        t.mkdir('dmode700', mode=0o700)
 
548
        self.assertTransportMode(t, 'dmode700', 0o700)
 
549
        t.mkdir('mdmode755', mode=0o755)
 
550
        self.assertTransportMode(t, 'mdmode755', 0o755)
604
551
 
605
552
        # Default mode should be based on umask
606
553
        umask = osutils.get_umask()
607
554
        t.mkdir('dnomode', mode=None)
608
 
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
555
        self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
609
556
 
610
557
    def test_opening_a_file_stream_creates_file(self):
611
558
        t = self.get_transport()
613
560
            return
614
561
        handle = t.open_write_stream('foo')
615
562
        try:
616
 
            self.assertEqual('', t.get_bytes('foo'))
 
563
            self.assertEqual(b'', t.get_bytes('foo'))
617
564
        finally:
618
565
            handle.close()
619
566
 
620
567
    def test_opening_a_file_stream_can_set_mode(self):
621
568
        t = self.get_transport()
622
569
        if t.is_readonly():
 
570
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
571
                              t.open_write_stream, 'foo')
623
572
            return
624
573
        if not t._can_roundtrip_unix_modebits():
625
574
            # Can't roundtrip, so no need to run this test
626
575
            return
 
576
 
627
577
        def check_mode(name, mode, expected):
628
578
            handle = t.open_write_stream(name, mode=mode)
629
579
            handle.close()
630
580
            self.assertTransportMode(t, name, expected)
631
 
        check_mode('mode644', 0644, 0644)
632
 
        check_mode('mode666', 0666, 0666)
633
 
        check_mode('mode600', 0600, 0600)
 
581
        check_mode('mode644', 0o644, 0o644)
 
582
        check_mode('mode666', 0o666, 0o666)
 
583
        check_mode('mode600', 0o600, 0o600)
634
584
        # The default permissions should be based on the current umask
635
 
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
585
        check_mode('nomode', None, 0o666 & ~osutils.get_umask())
636
586
 
637
587
    def test_copy_to(self):
638
588
        # FIXME: test:   same server to same server (partly done)
645
595
            self.build_tree(files, transport=transport_from)
646
596
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
647
597
            for f in files:
648
 
                self.check_transport_contents(transport_to.get(f).read(),
 
598
                self.check_transport_contents(transport_to.get_bytes(f),
649
599
                                              transport_from, f)
650
600
 
651
601
        t = self.get_transport()
 
602
        if t.__class__.__name__ == "SFTPTransport":
 
603
            self.skipTest("SFTP copy_to currently too flakey to use")
652
604
        temp_transport = MemoryTransport('memory:///')
653
605
        simple_copy_files(t, temp_transport)
654
606
        if not t.is_readonly():
656
608
            t2 = t.clone('copy_to_simple')
657
609
            simple_copy_files(t, t2)
658
610
 
659
 
 
660
611
        # Test that copying into a missing directory raises
661
612
        # NoSuchFile
662
613
        if t.is_readonly():
663
614
            self.build_tree(['e/', 'e/f'])
664
615
        else:
665
616
            t.mkdir('e')
666
 
            t.put_bytes('e/f', 'contents of e')
 
617
            t.put_bytes('e/f', b'contents of e')
667
618
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
668
619
        temp_transport.mkdir('e')
669
620
        t.copy_to(['e/f'], temp_transport)
674
625
        files = ['a', 'b', 'c', 'd']
675
626
        t.copy_to(iter(files), temp_transport)
676
627
        for f in files:
677
 
            self.check_transport_contents(temp_transport.get(f).read(),
 
628
            self.check_transport_contents(temp_transport.get_bytes(f),
678
629
                                          t, f)
679
630
        del temp_transport
680
631
 
681
 
        for mode in (0666, 0644, 0600, 0400):
 
632
        for mode in (0o666, 0o644, 0o600, 0o400):
682
633
            temp_transport = MemoryTransport("memory:///")
683
634
            t.copy_to(files, temp_transport, mode=mode)
684
635
            for f in files:
699
650
 
700
651
        if t.is_readonly():
701
652
            self.assertRaises(TransportNotPossible,
702
 
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
653
                              t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
703
654
            return
704
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
705
 
        t.put_bytes('b', 'contents\nfor b\n')
 
655
        t.put_bytes('a', b'diff\ncontents for\na\n')
 
656
        t.put_bytes('b', b'contents\nfor b\n')
706
657
 
707
658
        self.assertEqual(20,
708
 
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
659
                         t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
709
660
 
710
661
        self.check_transport_contents(
711
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
662
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
712
663
            t, 'a')
713
664
 
714
665
        # a file with no parent should fail..
715
666
        self.assertRaises(NoSuchFile,
716
 
                          t.append_file, 'missing/path', StringIO('content'))
 
667
                          t.append_file, 'missing/path', BytesIO(b'content'))
717
668
 
718
669
        # And we can create new files, too
719
670
        self.assertEqual(0,
720
 
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
721
 
        self.check_transport_contents('some text\nfor a missing file\n',
 
671
                         t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
 
672
        self.check_transport_contents(b'some text\nfor a missing file\n',
722
673
                                      t, 'c')
723
674
 
724
675
    def test_append_bytes(self):
726
677
 
727
678
        if t.is_readonly():
728
679
            self.assertRaises(TransportNotPossible,
729
 
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
 
680
                              t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
730
681
            return
731
682
 
732
 
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
733
 
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
 
683
        self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
 
684
        self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
734
685
 
735
686
        self.assertEqual(20,
736
 
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
 
687
                         t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
737
688
 
738
689
        self.check_transport_contents(
739
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
690
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
740
691
            t, 'a')
741
692
 
742
693
        # a file with no parent should fail..
743
694
        self.assertRaises(NoSuchFile,
744
 
                          t.append_bytes, 'missing/path', 'content')
745
 
 
746
 
    def test_append_multi(self):
747
 
        t = self.get_transport()
748
 
 
749
 
        if t.is_readonly():
750
 
            return
751
 
        t.put_bytes('a', 'diff\ncontents for\na\n'
752
 
                         'add\nsome\nmore\ncontents\n')
753
 
        t.put_bytes('b', 'contents\nfor b\n')
754
 
 
755
 
        self.assertEqual((43, 15),
756
 
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
757
 
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
758
 
 
759
 
        self.check_transport_contents(
760
 
            'diff\ncontents for\na\n'
761
 
            'add\nsome\nmore\ncontents\n'
762
 
            'and\nthen\nsome\nmore\n',
763
 
            t, 'a')
764
 
        self.check_transport_contents(
765
 
                'contents\nfor b\n'
766
 
                'some\nmore\nfor\nb\n',
767
 
                t, 'b')
768
 
 
769
 
        self.assertEqual((62, 31),
770
 
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
771
 
                                 ('b', StringIO('from an iterator\n'))])))
772
 
        self.check_transport_contents(
773
 
            'diff\ncontents for\na\n'
774
 
            'add\nsome\nmore\ncontents\n'
775
 
            'and\nthen\nsome\nmore\n'
776
 
            'a little bit more\n',
777
 
            t, 'a')
778
 
        self.check_transport_contents(
779
 
                'contents\nfor b\n'
780
 
                'some\nmore\nfor\nb\n'
781
 
                'from an iterator\n',
782
 
                t, 'b')
783
 
 
784
 
        self.assertEqual((80, 0),
785
 
            t.append_multi([('a', StringIO('some text in a\n')),
786
 
                            ('d', StringIO('missing file r\n'))]))
787
 
 
788
 
        self.check_transport_contents(
789
 
            'diff\ncontents for\na\n'
790
 
            'add\nsome\nmore\ncontents\n'
791
 
            'and\nthen\nsome\nmore\n'
792
 
            'a little bit more\n'
793
 
            'some text in a\n',
794
 
            t, 'a')
795
 
        self.check_transport_contents('missing file r\n', t, 'd')
 
695
                          t.append_bytes, 'missing/path', b'content')
796
696
 
797
697
    def test_append_file_mode(self):
798
698
        """Check that append accepts a mode parameter"""
800
700
        t = self.get_transport()
801
701
        if t.is_readonly():
802
702
            self.assertRaises(TransportNotPossible,
803
 
                t.append_file, 'f', StringIO('f'), mode=None)
 
703
                              t.append_file, 'f', BytesIO(b'f'), mode=None)
804
704
            return
805
 
        t.append_file('f', StringIO('f'), mode=None)
 
705
        t.append_file('f', BytesIO(b'f'), mode=None)
806
706
 
807
707
    def test_append_bytes_mode(self):
808
708
        # check append_bytes accepts a mode
809
709
        t = self.get_transport()
810
710
        if t.is_readonly():
811
711
            self.assertRaises(TransportNotPossible,
812
 
                t.append_bytes, 'f', 'f', mode=None)
 
712
                              t.append_bytes, 'f', b'f', mode=None)
813
713
            return
814
 
        t.append_bytes('f', 'f', mode=None)
 
714
        t.append_bytes('f', b'f', mode=None)
815
715
 
816
716
    def test_delete(self):
817
717
        # TODO: Test Transport.delete
822
722
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
823
723
            return
824
724
 
825
 
        t.put_bytes('a', 'a little bit of text\n')
826
 
        self.failUnless(t.has('a'))
 
725
        t.put_bytes('a', b'a little bit of text\n')
 
726
        self.assertTrue(t.has('a'))
827
727
        t.delete('a')
828
 
        self.failIf(t.has('a'))
 
728
        self.assertFalse(t.has('a'))
829
729
 
830
730
        self.assertRaises(NoSuchFile, t.delete, 'a')
831
731
 
832
 
        t.put_bytes('a', 'a text\n')
833
 
        t.put_bytes('b', 'b text\n')
834
 
        t.put_bytes('c', 'c text\n')
 
732
        t.put_bytes('a', b'a text\n')
 
733
        t.put_bytes('b', b'b text\n')
 
734
        t.put_bytes('c', b'c text\n')
835
735
        self.assertEqual([True, True, True],
836
 
                list(t.has_multi(['a', 'b', 'c'])))
837
 
        t.delete_multi(['a', 'c'])
 
736
                         [t.has(n) for n in ['a', 'b', 'c']])
 
737
        t.delete('a')
 
738
        t.delete('c')
838
739
        self.assertEqual([False, True, False],
839
 
                list(t.has_multi(['a', 'b', 'c'])))
840
 
        self.failIf(t.has('a'))
841
 
        self.failUnless(t.has('b'))
842
 
        self.failIf(t.has('c'))
843
 
 
844
 
        self.assertRaises(NoSuchFile,
845
 
                t.delete_multi, ['a', 'b', 'c'])
846
 
 
847
 
        self.assertRaises(NoSuchFile,
848
 
                t.delete_multi, iter(['a', 'b', 'c']))
849
 
 
850
 
        t.put_bytes('a', 'another a text\n')
851
 
        t.put_bytes('c', 'another c text\n')
852
 
        t.delete_multi(iter(['a', 'b', 'c']))
 
740
                         [t.has(n) for n in ['a', 'b', 'c']])
 
741
        self.assertFalse(t.has('a'))
 
742
        self.assertTrue(t.has('b'))
 
743
        self.assertFalse(t.has('c'))
 
744
 
 
745
        for name in ['a', 'c', 'd']:
 
746
            self.assertRaises(NoSuchFile, t.delete, name)
853
747
 
854
748
        # We should have deleted everything
855
749
        # SftpServer creates control files in the
902
796
        if t.is_readonly():
903
797
            return
904
798
        t.mkdir('foo')
905
 
        t.put_bytes('foo-bar', '')
 
799
        t.put_bytes('foo-bar', b'')
906
800
        t.mkdir('foo-baz')
907
801
        t.rmdir('foo')
908
802
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
909
 
        self.failUnless(t.has('foo-bar'))
 
803
        self.assertTrue(t.has('foo-bar'))
910
804
 
911
805
    def test_rename_dir_succeeds(self):
912
806
        t = self.get_transport()
913
807
        if t.is_readonly():
914
 
            raise TestSkipped("transport is readonly")
 
808
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
809
                              t.rename, 'foo', 'bar')
 
810
            return
915
811
        t.mkdir('adir')
916
812
        t.mkdir('adir/asubdir')
917
813
        t.rename('adir', 'bdir')
922
818
        """Attempting to replace a nonemtpy directory should fail"""
923
819
        t = self.get_transport()
924
820
        if t.is_readonly():
925
 
            raise TestSkipped("transport is readonly")
 
821
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
822
                              t.rename, 'foo', 'bar')
 
823
            return
926
824
        t.mkdir('adir')
927
825
        t.mkdir('adir/asubdir')
928
826
        t.mkdir('bdir')
943
841
        t.mkdir('b')
944
842
        ta = t.clone('a')
945
843
        tb = t.clone('b')
946
 
        ta.put_bytes('f', 'aoeu')
 
844
        ta.put_bytes('f', b'aoeu')
947
845
        ta.rename('f', '../b/f')
948
846
        self.assertTrue(tb.has('f'))
949
847
        self.assertFalse(ta.has('f'))
991
889
        # creates control files in the working directory
992
890
        # perhaps all of this could be done in a subdirectory
993
891
 
994
 
        t.put_bytes('a', 'a first file\n')
995
 
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
892
        t.put_bytes('a', b'a first file\n')
 
893
        self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
996
894
 
997
895
        t.move('a', 'b')
998
 
        self.failUnless(t.has('b'))
999
 
        self.failIf(t.has('a'))
 
896
        self.assertTrue(t.has('b'))
 
897
        self.assertFalse(t.has('a'))
1000
898
 
1001
 
        self.check_transport_contents('a first file\n', t, 'b')
1002
 
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
899
        self.check_transport_contents(b'a first file\n', t, 'b')
 
900
        self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
1003
901
 
1004
902
        # Overwrite a file
1005
 
        t.put_bytes('c', 'c this file\n')
 
903
        t.put_bytes('c', b'c this file\n')
1006
904
        t.move('c', 'b')
1007
 
        self.failIf(t.has('c'))
1008
 
        self.check_transport_contents('c this file\n', t, 'b')
 
905
        self.assertFalse(t.has('c'))
 
906
        self.check_transport_contents(b'c this file\n', t, 'b')
1009
907
 
1010
908
        # TODO: Try to write a test for atomicity
1011
909
        # TODO: Test moving into a non-existent subdirectory
1012
 
        # TODO: Test Transport.move_multi
1013
910
 
1014
911
    def test_copy(self):
1015
912
        t = self.get_transport()
1017
914
        if t.is_readonly():
1018
915
            return
1019
916
 
1020
 
        t.put_bytes('a', 'a file\n')
 
917
        t.put_bytes('a', b'a file\n')
1021
918
        t.copy('a', 'b')
1022
 
        self.check_transport_contents('a file\n', t, 'b')
 
919
        self.check_transport_contents(b'a file\n', t, 'b')
1023
920
 
1024
921
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
1025
922
        os.mkdir('c')
1026
923
        # What should the assert be if you try to copy a
1027
924
        # file over a directory?
1028
925
        #self.assertRaises(Something, t.copy, 'a', 'c')
1029
 
        t.put_bytes('d', 'text in d\n')
 
926
        t.put_bytes('d', b'text in d\n')
1030
927
        t.copy('d', 'b')
1031
 
        self.check_transport_contents('text in d\n', t, 'b')
1032
 
 
1033
 
        # TODO: test copy_multi
 
928
        self.check_transport_contents(b'text in d\n', t, 'b')
1034
929
 
1035
930
    def test_connection_error(self):
1036
931
        """ConnectionError is raised when connection is impossible.
1042
937
        except NotImplementedError:
1043
938
            raise TestSkipped("Transport %s has no bogus URL support." %
1044
939
                              self._server.__class__)
1045
 
        t = get_transport(url)
 
940
        t = _mod_transport.get_transport_from_url(url)
1046
941
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1047
942
 
1048
943
    def test_stat(self):
1053
948
 
1054
949
        try:
1055
950
            st = t.stat('.')
1056
 
        except TransportNotPossible, e:
 
951
        except TransportNotPossible as e:
1057
952
            # This transport cannot stat
1058
953
            return
1059
954
 
1064
959
        for path, size in zip(paths, sizes):
1065
960
            st = t.stat(path)
1066
961
            if path.endswith('/'):
1067
 
                self.failUnless(S_ISDIR(st.st_mode))
 
962
                self.assertTrue(S_ISDIR(st.st_mode))
1068
963
                # directory sizes are meaningless
1069
964
            else:
1070
 
                self.failUnless(S_ISREG(st.st_mode))
 
965
                self.assertTrue(S_ISREG(st.st_mode))
1071
966
                self.assertEqual(size, st.st_size)
1072
967
 
1073
 
        remote_stats = list(t.stat_multi(paths))
1074
 
        remote_iter_stats = list(t.stat_multi(iter(paths)))
1075
 
 
1076
968
        self.assertRaises(NoSuchFile, t.stat, 'q')
1077
969
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
1078
970
 
1079
 
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
1080
 
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
1081
971
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
1082
972
        subdir = t.clone('subdir')
1083
 
        subdir.stat('./file')
1084
 
        subdir.stat('.')
 
973
        st = subdir.stat('./file')
 
974
        st = subdir.stat('.')
1085
975
 
1086
976
    def test_hardlink(self):
1087
977
        from stat import ST_NLINK
1096
986
        try:
1097
987
            t.hardlink(source_name, link_name)
1098
988
 
1099
 
            self.failUnless(t.has(source_name))
1100
 
            self.failUnless(t.has(link_name))
 
989
            self.assertTrue(t.has(source_name))
 
990
            self.assertTrue(t.has(link_name))
1101
991
 
1102
992
            st = t.stat(link_name)
1103
 
            self.failUnlessEqual(st[ST_NLINK], 2)
 
993
            self.assertEqual(st[ST_NLINK], 2)
1104
994
        except TransportNotPossible:
1105
995
            raise TestSkipped("Transport %s does not support hardlinks." %
1106
996
                              self._server.__class__)
1118
1008
        try:
1119
1009
            t.symlink(source_name, link_name)
1120
1010
 
1121
 
            self.failUnless(t.has(source_name))
1122
 
            self.failUnless(t.has(link_name))
 
1011
            self.assertTrue(t.has(source_name))
 
1012
            self.assertTrue(t.has(link_name))
1123
1013
 
1124
1014
            st = t.stat(link_name)
1125
 
            self.failUnless(S_ISLNK(st.st_mode))
1126
 
        except TransportNotPossible:
1127
 
            raise TestSkipped("Transport %s does not support symlinks." %
1128
 
                              self._server.__class__)
1129
 
        except IOError:
1130
 
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
 
1015
            self.assertTrue(S_ISLNK(st.st_mode),
 
1016
                            "expected symlink, got mode %o" % st.st_mode)
 
1017
        except TransportNotPossible:
 
1018
            raise TestSkipped("Transport %s does not support symlinks." %
 
1019
                              self._server.__class__)
 
1020
 
 
1021
        self.assertEqual(source_name, t.readlink(link_name))
 
1022
 
 
1023
    def test_readlink_nonexistent(self):
 
1024
        t = self.get_transport()
 
1025
        try:
 
1026
            self.assertRaises(NoSuchFile, t.readlink, 'nonexistent')
 
1027
        except TransportNotPossible:
 
1028
            raise TestSkipped("Transport %s does not support symlinks." %
 
1029
                              self._server.__class__)
1131
1030
 
1132
1031
    def test_list_dir(self):
1133
1032
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1138
1037
            return
1139
1038
 
1140
1039
        def sorted_list(d, transport):
1141
 
            l = list(transport.list_dir(d))
1142
 
            l.sort()
 
1040
            l = sorted(transport.list_dir(d))
1143
1041
            return l
1144
1042
 
1145
1043
        self.assertEqual([], sorted_list('.', t))
1197
1095
            raise TestSkipped("not a connected transport")
1198
1096
 
1199
1097
        t2 = t1.clone('subdir')
1200
 
        self.assertEquals(t1._scheme, t2._scheme)
1201
 
        self.assertEquals(t1._user, t2._user)
1202
 
        self.assertEquals(t1._password, t2._password)
1203
 
        self.assertEquals(t1._host, t2._host)
1204
 
        self.assertEquals(t1._port, t2._port)
 
1098
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1099
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
 
1100
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
 
1101
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
 
1102
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
1205
1103
 
1206
1104
    def test__reuse_for(self):
1207
1105
        t = self.get_transport()
1214
1112
 
1215
1113
            Only the parameters different from None will be changed.
1216
1114
            """
1217
 
            if scheme   is None: scheme   = t._scheme
1218
 
            if user     is None: user     = t._user
1219
 
            if password is None: password = t._password
1220
 
            if user     is None: user     = t._user
1221
 
            if host     is None: host     = t._host
1222
 
            if port     is None: port     = t._port
1223
 
            if path     is None: path     = t._path
1224
 
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1115
            if scheme is None:
 
1116
                scheme = t._parsed_url.scheme
 
1117
            if user is None:
 
1118
                user = t._parsed_url.user
 
1119
            if password is None:
 
1120
                password = t._parsed_url.password
 
1121
            if user is None:
 
1122
                user = t._parsed_url.user
 
1123
            if host is None:
 
1124
                host = t._parsed_url.host
 
1125
            if port is None:
 
1126
                port = t._parsed_url.port
 
1127
            if path is None:
 
1128
                path = t._parsed_url.path
 
1129
            return str(urlutils.URL(scheme, user, password, host, port, path))
1225
1130
 
1226
 
        if t._scheme == 'ftp':
 
1131
        if t._parsed_url.scheme == 'ftp':
1227
1132
            scheme = 'sftp'
1228
1133
        else:
1229
1134
            scheme = 'ftp'
1230
1135
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1231
 
        if t._user == 'me':
 
1136
        if t._parsed_url.user == 'me':
1232
1137
            user = 'you'
1233
1138
        else:
1234
1139
            user = 'me'
1245
1150
        #   (they may be typed by the user when prompted for example)
1246
1151
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1247
1152
        # We will not connect, we can use a invalid host
1248
 
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
1249
 
        if t._port == 1234:
 
1153
        self.assertIsNot(t, t._reuse_for(
 
1154
            new_url(host=t._parsed_url.host + 'bar')))
 
1155
        if t._parsed_url.port == 1234:
1250
1156
            port = 4321
1251
1157
        else:
1252
1158
            port = 1234
1261
1167
 
1262
1168
        c = t.clone('subdir')
1263
1169
        # Some transports will create the connection  only when needed
1264
 
        t.has('surely_not') # Force connection
 
1170
        t.has('surely_not')  # Force connection
1265
1171
        self.assertIs(t._get_connection(), c._get_connection())
1266
1172
 
1267
1173
        # Temporary failure, we need to create a new dummy connection
1268
 
        new_connection = object()
 
1174
        new_connection = None
1269
1175
        t._set_connection(new_connection)
1270
1176
        # Check that both transports use the same connection
1271
1177
        self.assertIs(new_connection, t._get_connection())
1276
1182
        if not isinstance(t, ConnectedTransport):
1277
1183
            raise TestSkipped("not a connected transport")
1278
1184
 
1279
 
        t.has('surely_not') # Force connection
 
1185
        t.has('surely_not')  # Force connection
1280
1186
        self.assertIsNot(None, t._get_connection())
1281
1187
 
1282
1188
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1293
1199
 
1294
1200
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1295
1201
 
1296
 
        self.failUnless(t1.has('a'))
1297
 
        self.failUnless(t1.has('b/c'))
1298
 
        self.failIf(t1.has('c'))
 
1202
        self.assertTrue(t1.has('a'))
 
1203
        self.assertTrue(t1.has('b/c'))
 
1204
        self.assertFalse(t1.has('c'))
1299
1205
 
1300
1206
        t2 = t1.clone('b')
1301
1207
        self.assertEqual(t1.base + 'b/', t2.base)
1302
1208
 
1303
 
        self.failUnless(t2.has('c'))
1304
 
        self.failIf(t2.has('a'))
 
1209
        self.assertTrue(t2.has('c'))
 
1210
        self.assertFalse(t2.has('a'))
1305
1211
 
1306
1212
        t3 = t2.clone('..')
1307
 
        self.failUnless(t3.has('a'))
1308
 
        self.failIf(t3.has('c'))
 
1213
        self.assertTrue(t3.has('a'))
 
1214
        self.assertFalse(t3.has('c'))
1309
1215
 
1310
 
        self.failIf(t1.has('b/d'))
1311
 
        self.failIf(t2.has('d'))
1312
 
        self.failIf(t3.has('b/d'))
 
1216
        self.assertFalse(t1.has('b/d'))
 
1217
        self.assertFalse(t2.has('d'))
 
1218
        self.assertFalse(t3.has('b/d'))
1313
1219
 
1314
1220
        if t1.is_readonly():
1315
 
            self.build_tree_contents([('b/d', 'newfile\n')])
 
1221
            self.build_tree_contents([('b/d', b'newfile\n')])
1316
1222
        else:
1317
 
            t2.put_bytes('d', 'newfile\n')
 
1223
            t2.put_bytes('d', b'newfile\n')
1318
1224
 
1319
 
        self.failUnless(t1.has('b/d'))
1320
 
        self.failUnless(t2.has('d'))
1321
 
        self.failUnless(t3.has('b/d'))
 
1225
        self.assertTrue(t1.has('b/d'))
 
1226
        self.assertTrue(t2.has('d'))
 
1227
        self.assertTrue(t3.has('b/d'))
1322
1228
 
1323
1229
    def test_clone_to_root(self):
1324
1230
        orig_transport = self.get_transport()
1328
1234
        new_transport = root_transport.clone("..")
1329
1235
        # as we are walking up directories, the path must be
1330
1236
        # growing less, except at the top
1331
 
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
1332
 
            or new_transport.base == root_transport.base)
 
1237
        self.assertTrue(len(new_transport.base) < len(root_transport.base) or
 
1238
                        new_transport.base == root_transport.base)
1333
1239
        while new_transport.base != root_transport.base:
1334
1240
            root_transport = new_transport
1335
1241
            new_transport = root_transport.clone("..")
1336
1242
            # as we are walking up directories, the path must be
1337
1243
            # growing less, except at the top
1338
 
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
1339
 
                or new_transport.base == root_transport.base)
 
1244
            self.assertTrue(len(new_transport.base) < len(root_transport.base) or
 
1245
                            new_transport.base == root_transport.base)
1340
1246
 
1341
1247
        # Cloning to "/" should take us to exactly the same location.
1342
1248
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1352
1258
        orig_transport = self.get_transport()
1353
1259
        root_transport = orig_transport.clone('/')
1354
1260
        self.assertEqual(root_transport.base + '.bzr/',
1355
 
            root_transport.clone('.bzr').base)
 
1261
                         root_transport.clone('.bzr').base)
1356
1262
 
1357
1263
    def test_base_url(self):
1358
1264
        t = self.get_transport()
1398
1304
        self.assertEqual(transport.clone("/").abspath('foo'),
1399
1305
                         transport.abspath("/foo"))
1400
1306
 
 
1307
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1401
1308
    def test_win32_abspath(self):
1402
1309
        # Note: we tried to set sys.platform='win32' so we could test on
1403
1310
        # other platforms too, but then osutils does platform specific
1408
1315
 
1409
1316
        # smoke test for abspath on win32.
1410
1317
        # a transport based on 'file:///' never fully qualifies the drive.
1411
 
        transport = get_transport("file:///")
1412
 
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1318
        transport = _mod_transport.get_transport_from_url("file:///")
 
1319
        self.assertEqual(transport.abspath("/"), "file:///")
1413
1320
 
1414
1321
        # but a transport that starts with a drive spec must keep it.
1415
 
        transport = get_transport("file:///C:/")
1416
 
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1322
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1323
        self.assertEqual(transport.abspath("/"), "file:///C:/")
1417
1324
 
1418
1325
    def test_local_abspath(self):
1419
1326
        transport = self.get_transport()
1420
1327
        try:
1421
1328
            p = transport.local_abspath('.')
1422
 
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1329
        except (errors.NotLocalUrl, TransportNotPossible) as e:
1423
1330
            # should be formattable
1424
1331
            s = str(e)
1425
1332
        else:
1452
1359
                         'isolated/dir/',
1453
1360
                         'isolated/dir/foo',
1454
1361
                         'isolated/dir/bar',
1455
 
                         'isolated/dir/b%25z', # make sure quoting is correct
 
1362
                         'isolated/dir/b%25z',  # make sure quoting is correct
1456
1363
                         'isolated/bar'],
1457
1364
                        transport=transport)
1458
1365
        paths = set(transport.iter_files_recursive())
1459
1366
        # nb the directories are not converted
1460
1367
        self.assertEqual(paths,
1461
 
                    set(['isolated/dir/foo',
1462
 
                         'isolated/dir/bar',
1463
 
                         'isolated/dir/b%2525z',
1464
 
                         'isolated/bar']))
 
1368
                         {'isolated/dir/foo',
 
1369
                          'isolated/dir/bar',
 
1370
                          'isolated/dir/b%2525z',
 
1371
                          'isolated/bar'})
1465
1372
        sub_transport = transport.clone('isolated')
1466
1373
        paths = set(sub_transport.iter_files_recursive())
1467
1374
        self.assertEqual(paths,
1468
 
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1375
                         {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
1469
1376
 
1470
1377
    def test_copy_tree(self):
1471
1378
        # TODO: test file contents and permissions are preserved. This test was
1482
1389
                         'from/dir/',
1483
1390
                         'from/dir/foo',
1484
1391
                         'from/dir/bar',
1485
 
                         'from/dir/b%25z', # make sure quoting is correct
 
1392
                         'from/dir/b%25z',  # make sure quoting is correct
1486
1393
                         'from/bar'],
1487
1394
                        transport=transport)
1488
1395
        transport.copy_tree('from', 'to')
1489
1396
        paths = set(transport.iter_files_recursive())
1490
1397
        self.assertEqual(paths,
1491
 
                    set(['from/dir/foo',
1492
 
                         'from/dir/bar',
1493
 
                         'from/dir/b%2525z',
1494
 
                         'from/bar',
1495
 
                         'to/dir/foo',
1496
 
                         'to/dir/bar',
1497
 
                         'to/dir/b%2525z',
1498
 
                         'to/bar',]))
 
1398
                         {'from/dir/foo',
 
1399
                          'from/dir/bar',
 
1400
                          'from/dir/b%2525z',
 
1401
                          'from/bar',
 
1402
                          'to/dir/foo',
 
1403
                          'to/dir/bar',
 
1404
                          'to/dir/b%2525z',
 
1405
                          'to/bar', })
1499
1406
 
1500
1407
    def test_copy_tree_to_transport(self):
1501
1408
        transport = self.get_transport()
1509
1416
                         'from/dir/',
1510
1417
                         'from/dir/foo',
1511
1418
                         'from/dir/bar',
1512
 
                         'from/dir/b%25z', # make sure quoting is correct
 
1419
                         'from/dir/b%25z',  # make sure quoting is correct
1513
1420
                         'from/bar'],
1514
1421
                        transport=transport)
1515
1422
        from_transport = transport.clone('from')
1518
1425
        from_transport.copy_tree_to_transport(to_transport)
1519
1426
        paths = set(transport.iter_files_recursive())
1520
1427
        self.assertEqual(paths,
1521
 
                    set(['from/dir/foo',
1522
 
                         'from/dir/bar',
1523
 
                         'from/dir/b%2525z',
1524
 
                         'from/bar',
1525
 
                         'to/dir/foo',
1526
 
                         'to/dir/bar',
1527
 
                         'to/dir/b%2525z',
1528
 
                         'to/bar',]))
 
1428
                         {'from/dir/foo',
 
1429
                          'from/dir/bar',
 
1430
                          'from/dir/b%2525z',
 
1431
                          'from/bar',
 
1432
                          'to/dir/foo',
 
1433
                          'to/dir/bar',
 
1434
                          'to/dir/b%2525z',
 
1435
                          'to/bar', })
1529
1436
 
1530
1437
    def test_unicode_paths(self):
1531
1438
        """Test that we can read/write files with Unicode names."""
1535
1442
        # '\xe5' and '\xe4' actually map to the same file
1536
1443
        # adding a suffix kicks in the 'preserving but insensitive'
1537
1444
        # route, and maintains the right files
1538
 
        files = [u'\xe5.1', # a w/ circle iso-8859-1
1539
 
                 u'\xe4.2', # a w/ dots iso-8859-1
1540
 
                 u'\u017d', # Z with umlat iso-8859-2
1541
 
                 u'\u062c', # Arabic j
1542
 
                 u'\u0410', # Russian A
1543
 
                 u'\u65e5', # Kanji person
1544
 
                ]
 
1445
        files = [u'\xe5.1',  # a w/ circle iso-8859-1
 
1446
                 u'\xe4.2',  # a w/ dots iso-8859-1
 
1447
                 u'\u017d',  # Z with umlat iso-8859-2
 
1448
                 u'\u062c',  # Arabic j
 
1449
                 u'\u0410',  # Russian A
 
1450
                 u'\u65e5',  # Kanji person
 
1451
                 ]
1545
1452
 
1546
1453
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1547
1454
        if no_unicode_support:
1548
 
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1455
            self.knownFailure("test server cannot handle unicode paths")
1549
1456
 
1550
1457
        try:
1551
1458
            self.build_tree(files, transport=t, line_endings='binary')
1552
1459
        except UnicodeError:
1553
 
            raise TestSkipped("cannot handle unicode paths in current encoding")
 
1460
            raise TestSkipped(
 
1461
                "cannot handle unicode paths in current encoding")
1554
1462
 
1555
1463
        # A plain unicode string is not a valid url
1556
1464
        for fname in files:
1557
 
            self.assertRaises(InvalidURL, t.get, fname)
 
1465
            self.assertRaises(urlutils.InvalidURL, t.get, fname)
1558
1466
 
1559
1467
        for fname in files:
1560
1468
            fname_utf8 = fname.encode('utf-8')
1561
 
            contents = 'contents of %s\n' % (fname_utf8,)
 
1469
            contents = b'contents of %s\n' % (fname_utf8,)
1562
1470
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1563
1471
 
1564
1472
    def test_connect_twice_is_same_content(self):
1567
1475
        transport = self.get_transport()
1568
1476
        if transport.is_readonly():
1569
1477
            return
1570
 
        transport.put_bytes('foo', 'bar')
 
1478
        transport.put_bytes('foo', b'bar')
1571
1479
        transport3 = self.get_transport()
1572
 
        self.check_transport_contents('bar', transport3, 'foo')
 
1480
        self.check_transport_contents(b'bar', transport3, 'foo')
1573
1481
 
1574
1482
        # now opening at a relative url should give use a sane result:
1575
1483
        transport.mkdir('newdir')
1576
1484
        transport5 = self.get_transport('newdir')
1577
1485
        transport6 = transport5.clone('..')
1578
 
        self.check_transport_contents('bar', transport6, 'foo')
 
1486
        self.check_transport_contents(b'bar', transport6, 'foo')
1579
1487
 
1580
1488
    def test_lock_write(self):
1581
1489
        """Test transport-level write locks.
1584
1492
        """
1585
1493
        transport = self.get_transport()
1586
1494
        if transport.is_readonly():
1587
 
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
 
1495
            self.assertRaises(TransportNotPossible,
 
1496
                              transport.lock_write, 'foo')
1588
1497
            return
1589
 
        transport.put_bytes('lock', '')
 
1498
        transport.put_bytes('lock', b'')
1590
1499
        try:
1591
1500
            lock = transport.lock_write('lock')
1592
1501
        except TransportNotPossible:
1602
1511
        """
1603
1512
        transport = self.get_transport()
1604
1513
        if transport.is_readonly():
1605
 
            file('lock', 'w').close()
 
1514
            open('lock', 'w').close()
1606
1515
        else:
1607
 
            transport.put_bytes('lock', '')
 
1516
            transport.put_bytes('lock', b'')
1608
1517
        try:
1609
1518
            lock = transport.lock_read('lock')
1610
1519
        except TransportNotPossible:
1616
1525
    def test_readv(self):
1617
1526
        transport = self.get_transport()
1618
1527
        if transport.is_readonly():
1619
 
            file('a', 'w').write('0123456789')
 
1528
            with open('a', 'w') as f:
 
1529
                f.write('0123456789')
1620
1530
        else:
1621
 
            transport.put_bytes('a', '0123456789')
 
1531
            transport.put_bytes('a', b'0123456789')
1622
1532
 
1623
1533
        d = list(transport.readv('a', ((0, 1),)))
1624
 
        self.assertEqual(d[0], (0, '0'))
 
1534
        self.assertEqual(d[0], (0, b'0'))
1625
1535
 
1626
1536
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1627
 
        self.assertEqual(d[0], (0, '0'))
1628
 
        self.assertEqual(d[1], (1, '1'))
1629
 
        self.assertEqual(d[2], (3, '34'))
1630
 
        self.assertEqual(d[3], (9, '9'))
 
1537
        self.assertEqual(d[0], (0, b'0'))
 
1538
        self.assertEqual(d[1], (1, b'1'))
 
1539
        self.assertEqual(d[2], (3, b'34'))
 
1540
        self.assertEqual(d[3], (9, b'9'))
1631
1541
 
1632
1542
    def test_readv_out_of_order(self):
1633
1543
        transport = self.get_transport()
1634
1544
        if transport.is_readonly():
1635
 
            file('a', 'w').write('0123456789')
 
1545
            with open('a', 'w') as f:
 
1546
                f.write('0123456789')
1636
1547
        else:
1637
 
            transport.put_bytes('a', '01234567890')
 
1548
            transport.put_bytes('a', b'01234567890')
1638
1549
 
1639
1550
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1640
 
        self.assertEqual(d[0], (1, '1'))
1641
 
        self.assertEqual(d[1], (9, '9'))
1642
 
        self.assertEqual(d[2], (0, '0'))
1643
 
        self.assertEqual(d[3], (3, '34'))
 
1551
        self.assertEqual(d[0], (1, b'1'))
 
1552
        self.assertEqual(d[1], (9, b'9'))
 
1553
        self.assertEqual(d[2], (0, b'0'))
 
1554
        self.assertEqual(d[3], (3, b'34'))
1644
1555
 
1645
1556
    def test_readv_with_adjust_for_latency(self):
1646
1557
        transport = self.get_transport()
1651
1562
        # reference the returned data with the random data. To avoid doing
1652
1563
        # multiple large random byte look ups we do several tests on the same
1653
1564
        # backing data.
1654
 
        content = osutils.rand_bytes(200*1024)
 
1565
        content = osutils.rand_bytes(200 * 1024)
1655
1566
        content_size = len(content)
1656
1567
        if transport.is_readonly():
1657
1568
            self.build_tree_contents([('a', content)])
1658
1569
        else:
1659
1570
            transport.put_bytes('a', content)
 
1571
 
1660
1572
        def check_result_data(result_vector):
1661
1573
            for item in result_vector:
1662
1574
                data_len = len(item[1])
1664
1576
 
1665
1577
        # start corner case
1666
1578
        result = list(transport.readv('a', ((0, 30),),
1667
 
            adjust_for_latency=True, upper_limit=content_size))
 
1579
                                      adjust_for_latency=True, upper_limit=content_size))
1668
1580
        # we expect 1 result, from 0, to something > 30
1669
1581
        self.assertEqual(1, len(result))
1670
1582
        self.assertEqual(0, result[0][0])
1672
1584
        check_result_data(result)
1673
1585
        # end of file corner case
1674
1586
        result = list(transport.readv('a', ((204700, 100),),
1675
 
            adjust_for_latency=True, upper_limit=content_size))
 
1587
                                      adjust_for_latency=True, upper_limit=content_size))
1676
1588
        # we expect 1 result, from 204800- its length, to the end
1677
1589
        self.assertEqual(1, len(result))
1678
1590
        data_len = len(result[0][1])
1679
 
        self.assertEqual(204800-data_len, result[0][0])
 
1591
        self.assertEqual(204800 - data_len, result[0][0])
1680
1592
        self.assertTrue(data_len >= 100)
1681
1593
        check_result_data(result)
1682
1594
        # out of order ranges are made in order
1683
1595
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
1684
 
            adjust_for_latency=True, upper_limit=content_size))
 
1596
                                      adjust_for_latency=True, upper_limit=content_size))
1685
1597
        # we expect 2 results, in order, start and end.
1686
1598
        self.assertEqual(2, len(result))
1687
1599
        # start
1690
1602
        self.assertTrue(data_len >= 30)
1691
1603
        # end
1692
1604
        data_len = len(result[1][1])
1693
 
        self.assertEqual(204800-data_len, result[1][0])
 
1605
        self.assertEqual(204800 - data_len, result[1][0])
1694
1606
        self.assertTrue(data_len >= 100)
1695
1607
        check_result_data(result)
1696
1608
        # close ranges get combined (even if out of order)
1697
 
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1609
        for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
1698
1610
            result = list(transport.readv('a', request_vector,
1699
 
                adjust_for_latency=True, upper_limit=content_size))
 
1611
                                          adjust_for_latency=True, upper_limit=content_size))
1700
1612
            self.assertEqual(1, len(result))
1701
1613
            data_len = len(result[0][1])
1702
1614
            # minimum length is from 400 to 1034 - 634
1710
1622
        transport = self.get_transport()
1711
1623
        # test from observed failure case.
1712
1624
        if transport.is_readonly():
1713
 
            file('a', 'w').write('a'*1024*1024)
 
1625
            with open('a', 'w') as f:
 
1626
                f.write('a' * 1024 * 1024)
1714
1627
        else:
1715
 
            transport.put_bytes('a', 'a'*1024*1024)
 
1628
            transport.put_bytes('a', b'a' * 1024 * 1024)
1716
1629
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1717
 
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
1718
 
            (465373, 800), (947422, 800)]
1719
 
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
1720
 
        found_items = [False]*9
 
1630
                         (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1631
                         (465373, 800), (947422, 800)]
 
1632
        results = list(transport.readv('a', broken_vector, True, 1024 * 1024))
 
1633
        found_items = [False] * 9
1721
1634
        for pos, (start, length) in enumerate(broken_vector):
1722
1635
            # check the range is covered by the result
1723
1636
            for offset, data in results:
1724
1637
                if offset <= start and start + length <= offset + len(data):
1725
1638
                    found_items[pos] = True
1726
 
        self.assertEqual([True]*9, found_items)
 
1639
        self.assertEqual([True] * 9, found_items)
1727
1640
 
1728
1641
    def test_get_with_open_write_stream_sees_all_content(self):
1729
1642
        t = self.get_transport()
1730
1643
        if t.is_readonly():
1731
1644
            return
1732
 
        handle = t.open_write_stream('foo')
1733
 
        try:
1734
 
            handle.write('bcd')
1735
 
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
1736
 
        finally:
1737
 
            handle.close()
 
1645
        with t.open_write_stream('foo') as handle:
 
1646
            handle.write(b'bcd')
 
1647
            self.assertEqual([(0, b'b'), (2, b'd')], list(
 
1648
                t.readv('foo', ((0, 1), (2, 1)))))
1738
1649
 
1739
1650
    def test_get_smart_medium(self):
1740
1651
        """All transports must either give a smart medium, or know they can't.
1742
1653
        transport = self.get_transport()
1743
1654
        try:
1744
1655
            client_medium = transport.get_smart_medium()
1745
 
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
1746
1656
        except errors.NoSmartMedium:
1747
1657
            # as long as we got it we're fine
1748
1658
            pass
 
1659
        else:
 
1660
            from ..bzr.smart import medium
 
1661
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
1749
1662
 
1750
1663
    def test_readv_short_read(self):
1751
1664
        transport = self.get_transport()
1752
1665
        if transport.is_readonly():
1753
 
            file('a', 'w').write('0123456789')
 
1666
            with open('a', 'w') as f:
 
1667
                f.write('0123456789')
1754
1668
        else:
1755
 
            transport.put_bytes('a', '01234567890')
 
1669
            transport.put_bytes('a', b'01234567890')
1756
1670
 
1757
1671
        # This is intentionally reading off the end of the file
1758
1672
        # since we are sure that it cannot get there
1759
1673
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1760
1674
                               # Can be raised by paramiko
1761
1675
                               AssertionError),
1762
 
                              transport.readv, 'a', [(1,1), (8,10)])
 
1676
                              transport.readv, 'a', [(1, 1), (8, 10)])
1763
1677
 
1764
1678
        # This is trying to seek past the end of the file, it should
1765
1679
        # also raise a special error
1766
1680
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1767
 
                              transport.readv, 'a', [(12,2)])
 
1681
                              transport.readv, 'a', [(12, 2)])
 
1682
 
 
1683
    def test_no_segment_parameters(self):
 
1684
        """Segment parameters should be stripped and stored in
 
1685
        transport.segment_parameters."""
 
1686
        transport = self.get_transport("foo")
 
1687
        self.assertEqual({}, transport.get_segment_parameters())
 
1688
 
 
1689
    def test_segment_parameters(self):
 
1690
        """Segment parameters should be stripped and stored in
 
1691
        transport.get_segment_parameters()."""
 
1692
        base_url = self._server.get_url()
 
1693
        parameters = {"key1": "val1", "key2": "val2"}
 
1694
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1695
        transport = _mod_transport.get_transport_from_url(url)
 
1696
        self.assertEqual(parameters, transport.get_segment_parameters())
 
1697
 
 
1698
    def test_set_segment_parameters(self):
 
1699
        """Segment parameters can be set and show up in base."""
 
1700
        transport = self.get_transport("foo")
 
1701
        orig_base = transport.base
 
1702
        transport.set_segment_parameter("arm", "board")
 
1703
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
 
1704
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
 
1705
        transport.set_segment_parameter("arm", None)
 
1706
        transport.set_segment_parameter("nonexistant", None)
 
1707
        self.assertEqual({}, transport.get_segment_parameters())
 
1708
        self.assertEqual(orig_base, transport.base)
 
1709
 
 
1710
    def test_stat_symlink(self):
 
1711
        # if a transport points directly to a symlink (and supports symlinks
 
1712
        # at all) you can tell this.  helps with bug 32669.
 
1713
        t = self.get_transport()
 
1714
        try:
 
1715
            t.symlink('target', 'link')
 
1716
        except TransportNotPossible:
 
1717
            raise TestSkipped("symlinks not supported")
 
1718
        t2 = t.clone('link')
 
1719
        st = t2.stat('')
 
1720
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1721
 
 
1722
    def test_abspath_url_unquote_unreserved(self):
 
1723
        """URLs from abspath should have unreserved characters unquoted
 
1724
 
 
1725
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1726
        """
 
1727
        t = self.get_transport()
 
1728
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1729
        self.assertEqual(t.base + "-.09AZ_az~",
 
1730
                         t.abspath(needlessly_escaped_dir))
 
1731
 
 
1732
    def test_clone_url_unquote_unreserved(self):
 
1733
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1734
 
 
1735
        Cloned transports should be prefix comparable for things like the
 
1736
        isolation checking of tests, see lp:842223 for more.
 
1737
        """
 
1738
        t1 = self.get_transport()
 
1739
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1740
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1741
        t2 = t1.clone(needlessly_escaped_dir)
 
1742
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1743
 
 
1744
    def test_hook_post_connection_one(self):
 
1745
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1746
        log = []
 
1747
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1748
        t = self.get_transport()
 
1749
        self.assertEqual([], log)
 
1750
        t.has("non-existant")
 
1751
        if isinstance(t, RemoteTransport):
 
1752
            self.assertEqual([t.get_smart_medium()], log)
 
1753
        elif isinstance(t, ConnectedTransport):
 
1754
            self.assertEqual([t], log)
 
1755
        else:
 
1756
            self.assertEqual([], log)
 
1757
 
 
1758
    def test_hook_post_connection_multi(self):
 
1759
        """Fire post_connect hook once per unshared underlying connection"""
 
1760
        log = []
 
1761
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1762
        t1 = self.get_transport()
 
1763
        t2 = t1.clone(".")
 
1764
        t3 = self.get_transport()
 
1765
        self.assertEqual([], log)
 
1766
        t1.has("x")
 
1767
        t2.has("x")
 
1768
        t3.has("x")
 
1769
        if isinstance(t1, RemoteTransport):
 
1770
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1771
        elif isinstance(t1, ConnectedTransport):
 
1772
            self.assertEqual([t1, t3], log)
 
1773
        else:
 
1774
            self.assertEqual([], log)