/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Marius Kruger
  • Date: 2010-07-10 21:28:56 UTC
  • mto: (5384.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 5385.
  • Revision ID: marius.kruger@enerweb.co.za-20100710212856-uq4ji3go0u5se7hx
* Update documentation
* add NEWS

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
23
 
from io import BytesIO
 
23
import itertools
24
24
import os
 
25
from cStringIO import StringIO
 
26
from StringIO import StringIO as pyStringIO
25
27
import stat
26
28
import sys
 
29
import unittest
27
30
 
28
 
from .. import (
 
31
from bzrlib import (
29
32
    errors,
30
33
    osutils,
31
 
    pyutils,
32
34
    tests,
33
 
    transport as _mod_transport,
34
35
    urlutils,
35
36
    )
36
 
from ..errors import (ConnectionError,
37
 
                      FileExists,
38
 
                      NoSuchFile,
39
 
                      PathError,
40
 
                      TransportNotPossible,
41
 
                      )
42
 
from ..osutils import getcwd
43
 
from . import (
 
37
from bzrlib.errors import (ConnectionError,
 
38
                           DirectoryNotEmpty,
 
39
                           FileExists,
 
40
                           InvalidURL,
 
41
                           LockError,
 
42
                           NoSuchFile,
 
43
                           NotLocalUrl,
 
44
                           PathError,
 
45
                           TransportNotPossible,
 
46
                           )
 
47
from bzrlib.osutils import getcwd
 
48
from bzrlib.smart import medium
 
49
from bzrlib.tests import (
 
50
    TestCaseInTempDir,
44
51
    TestSkipped,
45
52
    TestNotApplicable,
46
53
    multiply_tests,
47
54
    )
48
 
from . import test_server
49
 
from .test_transport import TestTransportImplementation
50
 
from ..transport import (
 
55
from bzrlib.tests import test_server
 
56
from bzrlib.tests.test_transport import TestTransportImplementation
 
57
from bzrlib.transport import (
51
58
    ConnectedTransport,
52
 
    Transport,
 
59
    get_transport,
53
60
    _get_transport_modules,
54
61
    )
55
 
from ..transport.memory import MemoryTransport
56
 
from ..transport.remote import RemoteTransport
 
62
from bzrlib.transport.memory import MemoryTransport
57
63
 
58
64
 
59
65
def get_transport_test_permutations(module):
72
78
    for module in _get_transport_modules():
73
79
        try:
74
80
            permutations = get_transport_test_permutations(
75
 
                pyutils.get_named_object(module))
 
81
                reduce(getattr, (module).split('.')[1:], __import__(module)))
76
82
            for (klass, server_factory) in permutations:
77
83
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
78
 
                            {"transport_class": klass,
79
 
                             "transport_server": server_factory})
 
84
                    {"transport_class":klass,
 
85
                     "transport_server":server_factory})
80
86
                result.append(scenario)
81
 
        except errors.DependencyNotPresent as e:
 
87
        except errors.DependencyNotPresent, e:
82
88
            # Continue even if a dependency prevents us
83
89
            # from adding this test
84
90
            pass
85
91
    return result
86
92
 
87
93
 
88
 
def load_tests(loader, standard_tests, pattern):
 
94
def load_tests(standard_tests, module, loader):
89
95
    """Multiply tests for tranport implementations."""
90
96
    result = loader.suiteClass()
91
97
    scenarios = transport_test_permutations()
96
102
 
97
103
    def setUp(self):
98
104
        super(TransportTests, self).setUp()
99
 
        self.overrideEnv('BRZ_NO_SMART_VFS', None)
 
105
        self._captureVar('BZR_NO_SMART_VFS', None)
100
106
 
101
107
    def check_transport_contents(self, content, transport, relpath):
102
 
        """Check that transport.get_bytes(relpath) == content."""
103
 
        self.assertEqualDiff(content, transport.get_bytes(relpath))
 
108
        """Check that transport.get(relpath).read() == content."""
 
109
        self.assertEqualDiff(content, transport.get(relpath).read())
104
110
 
105
111
    def test_ensure_base_missing(self):
106
112
        """.ensure_base() should create the directory if it doesn't exist"""
150
156
        self.assertEqual(True, t.has('a'))
151
157
        self.assertEqual(False, t.has('c'))
152
158
        self.assertEqual(True, t.has(urlutils.escape('%')))
 
159
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
160
                                           'e', 'f', 'g', 'h'])),
 
161
                         [True, True, False, False,
 
162
                          True, False, True, False])
153
163
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
154
164
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
155
165
                                           urlutils.escape('%%')]))
 
166
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
167
                                                'e', 'f', 'g', 'h']))),
 
168
                         [True, True, False, False,
 
169
                          True, False, True, False])
156
170
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
157
171
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
158
172
 
169
183
    def test_get(self):
170
184
        t = self.get_transport()
171
185
 
172
 
        files = ['a']
173
 
        content = b'contents of a\n'
174
 
        self.build_tree(['a'], transport=t, line_endings='binary')
175
 
        self.check_transport_contents(b'contents of a\n', t, 'a')
176
 
        f = t.get('a')
177
 
        self.assertEqual(content, f.read())
 
186
        files = ['a', 'b', 'e', 'g']
 
187
        contents = ['contents of a\n',
 
188
                    'contents of b\n',
 
189
                    'contents of e\n',
 
190
                    'contents of g\n',
 
191
                    ]
 
192
        self.build_tree(files, transport=t, line_endings='binary')
 
193
        self.check_transport_contents('contents of a\n', t, 'a')
 
194
        content_f = t.get_multi(files)
 
195
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
196
        # evaluate their inputs, the transport requests should be issued and
 
197
        # handled sequentially (we don't want to force transport to buffer).
 
198
        for content, f in itertools.izip(contents, content_f):
 
199
            self.assertEqual(content, f.read())
 
200
 
 
201
        content_f = t.get_multi(iter(files))
 
202
        # Use itertools.izip() for the same reason
 
203
        for content, f in itertools.izip(contents, content_f):
 
204
            self.assertEqual(content, f.read())
178
205
 
179
206
    def test_get_unknown_file(self):
180
207
        t = self.get_transport()
181
208
        files = ['a', 'b']
182
 
        contents = [b'contents of a\n',
183
 
                    b'contents of b\n',
 
209
        contents = ['contents of a\n',
 
210
                    'contents of b\n',
184
211
                    ]
185
212
        self.build_tree(files, transport=t, line_endings='binary')
186
213
        self.assertRaises(NoSuchFile, t.get, 'c')
187
 
 
188
 
        def iterate_and_close(func, *args):
189
 
            for f in func(*args):
190
 
                # We call f.read() here because things like paramiko actually
191
 
                # spawn a thread to prefetch the content, which we want to
192
 
                # consume before we close the handle.
193
 
                content = f.read()
194
 
                f.close()
 
214
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
 
215
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
195
216
 
196
217
    def test_get_directory_read_gives_ReadError(self):
197
218
        """consistent errors for read() on a file returned by get()."""
217
238
        t = self.get_transport()
218
239
 
219
240
        files = ['a', 'b', 'e', 'g']
220
 
        contents = [b'contents of a\n',
221
 
                    b'contents of b\n',
222
 
                    b'contents of e\n',
223
 
                    b'contents of g\n',
 
241
        contents = ['contents of a\n',
 
242
                    'contents of b\n',
 
243
                    'contents of e\n',
 
244
                    'contents of g\n',
224
245
                    ]
225
246
        self.build_tree(files, transport=t, line_endings='binary')
226
 
        self.check_transport_contents(b'contents of a\n', t, 'a')
 
247
        self.check_transport_contents('contents of a\n', t, 'a')
227
248
 
228
249
        for content, fname in zip(contents, files):
229
250
            self.assertEqual(content, t.get_bytes(fname))
230
251
 
231
252
    def test_get_bytes_unknown_file(self):
232
253
        t = self.get_transport()
 
254
 
233
255
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
234
256
 
235
257
    def test_get_with_open_write_stream_sees_all_content(self):
236
258
        t = self.get_transport()
237
259
        if t.is_readonly():
238
260
            return
239
 
        with t.open_write_stream('foo') as handle:
240
 
            handle.write(b'b')
241
 
            self.assertEqual(b'b', t.get_bytes('foo'))
 
261
        handle = t.open_write_stream('foo')
 
262
        try:
 
263
            handle.write('b')
 
264
            self.assertEqual('b', t.get('foo').read())
 
265
        finally:
 
266
            handle.close()
242
267
 
243
268
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
244
269
        t = self.get_transport()
245
270
        if t.is_readonly():
246
271
            return
247
 
        with t.open_write_stream('foo') as handle:
248
 
            handle.write(b'b')
249
 
            self.assertEqual(b'b', t.get_bytes('foo'))
250
 
            with t.get('foo') as f:
251
 
                self.assertEqual(b'b', f.read())
 
272
        handle = t.open_write_stream('foo')
 
273
        try:
 
274
            handle.write('b')
 
275
            self.assertEqual('b', t.get_bytes('foo'))
 
276
            self.assertEqual('b', t.get('foo').read())
 
277
        finally:
 
278
            handle.close()
252
279
 
253
280
    def test_put_bytes(self):
254
281
        t = self.get_transport()
255
282
 
256
283
        if t.is_readonly():
257
284
            self.assertRaises(TransportNotPossible,
258
 
                              t.put_bytes, 'a', b'some text for a\n')
 
285
                    t.put_bytes, 'a', 'some text for a\n')
259
286
            return
260
287
 
261
 
        t.put_bytes('a', b'some text for a\n')
262
 
        self.assertTrue(t.has('a'))
263
 
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
288
        t.put_bytes('a', 'some text for a\n')
 
289
        self.failUnless(t.has('a'))
 
290
        self.check_transport_contents('some text for a\n', t, 'a')
264
291
 
265
292
        # The contents should be overwritten
266
 
        t.put_bytes('a', b'new text for a\n')
267
 
        self.check_transport_contents(b'new text for a\n', t, 'a')
 
293
        t.put_bytes('a', 'new text for a\n')
 
294
        self.check_transport_contents('new text for a\n', t, 'a')
268
295
 
269
296
        self.assertRaises(NoSuchFile,
270
 
                          t.put_bytes, 'path/doesnt/exist/c', b'contents')
 
297
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
271
298
 
272
299
    def test_put_bytes_non_atomic(self):
273
300
        t = self.get_transport()
274
301
 
275
302
        if t.is_readonly():
276
303
            self.assertRaises(TransportNotPossible,
277
 
                              t.put_bytes_non_atomic, 'a', b'some text for a\n')
 
304
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
278
305
            return
279
306
 
280
 
        self.assertFalse(t.has('a'))
281
 
        t.put_bytes_non_atomic('a', b'some text for a\n')
282
 
        self.assertTrue(t.has('a'))
283
 
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
307
        self.failIf(t.has('a'))
 
308
        t.put_bytes_non_atomic('a', 'some text for a\n')
 
309
        self.failUnless(t.has('a'))
 
310
        self.check_transport_contents('some text for a\n', t, 'a')
284
311
        # Put also replaces contents
285
 
        t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
286
 
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
312
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
 
313
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
287
314
 
288
315
        # Make sure we can create another file
289
 
        t.put_bytes_non_atomic('d', b'contents for\nd\n')
 
316
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
290
317
        # And overwrite 'a' with empty contents
291
 
        t.put_bytes_non_atomic('a', b'')
292
 
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
293
 
        self.check_transport_contents(b'', t, 'a')
 
318
        t.put_bytes_non_atomic('a', '')
 
319
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
320
        self.check_transport_contents('', t, 'a')
294
321
 
295
322
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
296
 
                          b'contents\n')
 
323
                                       'contents\n')
297
324
        # Now test the create_parent flag
298
325
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
299
 
                          b'contents\n')
300
 
        self.assertFalse(t.has('dir/a'))
301
 
        t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
 
326
                                       'contents\n')
 
327
        self.failIf(t.has('dir/a'))
 
328
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
302
329
                               create_parent_dir=True)
303
 
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
 
330
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
304
331
 
305
332
        # But we still get NoSuchFile if we can't make the parent dir
306
333
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
307
 
                          b'contents\n',
308
 
                          create_parent_dir=True)
 
334
                                       'contents\n',
 
335
                                       create_parent_dir=True)
309
336
 
310
337
    def test_put_bytes_permissions(self):
311
338
        t = self.get_transport()
315
342
        if not t._can_roundtrip_unix_modebits():
316
343
            # Can't roundtrip, so no need to run this test
317
344
            return
318
 
        t.put_bytes('mode644', b'test text\n', mode=0o644)
319
 
        self.assertTransportMode(t, 'mode644', 0o644)
320
 
        t.put_bytes('mode666', b'test text\n', mode=0o666)
321
 
        self.assertTransportMode(t, 'mode666', 0o666)
322
 
        t.put_bytes('mode600', b'test text\n', mode=0o600)
323
 
        self.assertTransportMode(t, 'mode600', 0o600)
 
345
        t.put_bytes('mode644', 'test text\n', mode=0644)
 
346
        self.assertTransportMode(t, 'mode644', 0644)
 
347
        t.put_bytes('mode666', 'test text\n', mode=0666)
 
348
        self.assertTransportMode(t, 'mode666', 0666)
 
349
        t.put_bytes('mode600', 'test text\n', mode=0600)
 
350
        self.assertTransportMode(t, 'mode600', 0600)
324
351
        # Yes, you can put_bytes a file such that it becomes readonly
325
 
        t.put_bytes('mode400', b'test text\n', mode=0o400)
326
 
        self.assertTransportMode(t, 'mode400', 0o400)
 
352
        t.put_bytes('mode400', 'test text\n', mode=0400)
 
353
        self.assertTransportMode(t, 'mode400', 0400)
327
354
 
328
355
        # The default permissions should be based on the current umask
329
356
        umask = osutils.get_umask()
330
 
        t.put_bytes('nomode', b'test text\n', mode=None)
331
 
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
357
        t.put_bytes('nomode', 'test text\n', mode=None)
 
358
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
332
359
 
333
360
    def test_put_bytes_non_atomic_permissions(self):
334
361
        t = self.get_transport()
338
365
        if not t._can_roundtrip_unix_modebits():
339
366
            # Can't roundtrip, so no need to run this test
340
367
            return
341
 
        t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
342
 
        self.assertTransportMode(t, 'mode644', 0o644)
343
 
        t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
344
 
        self.assertTransportMode(t, 'mode666', 0o666)
345
 
        t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
346
 
        self.assertTransportMode(t, 'mode600', 0o600)
347
 
        t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
348
 
        self.assertTransportMode(t, 'mode400', 0o400)
 
368
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
 
369
        self.assertTransportMode(t, 'mode644', 0644)
 
370
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
 
371
        self.assertTransportMode(t, 'mode666', 0666)
 
372
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
 
373
        self.assertTransportMode(t, 'mode600', 0600)
 
374
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
 
375
        self.assertTransportMode(t, 'mode400', 0400)
349
376
 
350
377
        # The default permissions should be based on the current umask
351
378
        umask = osutils.get_umask()
352
 
        t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
353
 
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
379
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
 
380
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
354
381
 
355
382
        # We should also be able to set the mode for a parent directory
356
383
        # when it is created
357
 
        t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
358
 
                               dir_mode=0o700, create_parent_dir=True)
359
 
        self.assertTransportMode(t, 'dir700', 0o700)
360
 
        t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
361
 
                               dir_mode=0o770, create_parent_dir=True)
362
 
        self.assertTransportMode(t, 'dir770', 0o770)
363
 
        t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
364
 
                               dir_mode=0o777, create_parent_dir=True)
365
 
        self.assertTransportMode(t, 'dir777', 0o777)
 
384
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
 
385
                               dir_mode=0700, create_parent_dir=True)
 
386
        self.assertTransportMode(t, 'dir700', 0700)
 
387
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
 
388
                               dir_mode=0770, create_parent_dir=True)
 
389
        self.assertTransportMode(t, 'dir770', 0770)
 
390
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
 
391
                               dir_mode=0777, create_parent_dir=True)
 
392
        self.assertTransportMode(t, 'dir777', 0777)
366
393
 
367
394
    def test_put_file(self):
368
395
        t = self.get_transport()
369
396
 
370
397
        if t.is_readonly():
371
398
            self.assertRaises(TransportNotPossible,
372
 
                              t.put_file, 'a', BytesIO(b'some text for a\n'))
 
399
                    t.put_file, 'a', StringIO('some text for a\n'))
373
400
            return
374
401
 
375
 
        result = t.put_file('a', BytesIO(b'some text for a\n'))
 
402
        result = t.put_file('a', StringIO('some text for a\n'))
376
403
        # put_file returns the length of the data written
377
404
        self.assertEqual(16, result)
378
 
        self.assertTrue(t.has('a'))
379
 
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
405
        self.failUnless(t.has('a'))
 
406
        self.check_transport_contents('some text for a\n', t, 'a')
380
407
        # Put also replaces contents
381
 
        result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
 
408
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
382
409
        self.assertEqual(19, result)
383
 
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
410
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
384
411
        self.assertRaises(NoSuchFile,
385
412
                          t.put_file, 'path/doesnt/exist/c',
386
 
                          BytesIO(b'contents'))
 
413
                              StringIO('contents'))
387
414
 
388
415
    def test_put_file_non_atomic(self):
389
416
        t = self.get_transport()
390
417
 
391
418
        if t.is_readonly():
392
419
            self.assertRaises(TransportNotPossible,
393
 
                              t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
 
420
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
394
421
            return
395
422
 
396
 
        self.assertFalse(t.has('a'))
397
 
        t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
398
 
        self.assertTrue(t.has('a'))
399
 
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
423
        self.failIf(t.has('a'))
 
424
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
 
425
        self.failUnless(t.has('a'))
 
426
        self.check_transport_contents('some text for a\n', t, 'a')
400
427
        # Put also replaces contents
401
 
        t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
402
 
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
428
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
 
429
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
403
430
 
404
431
        # Make sure we can create another file
405
 
        t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
 
432
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
406
433
        # And overwrite 'a' with empty contents
407
 
        t.put_file_non_atomic('a', BytesIO(b''))
408
 
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
409
 
        self.check_transport_contents(b'', t, 'a')
 
434
        t.put_file_non_atomic('a', StringIO(''))
 
435
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
436
        self.check_transport_contents('', t, 'a')
410
437
 
411
438
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
412
 
                          BytesIO(b'contents\n'))
 
439
                                       StringIO('contents\n'))
413
440
        # Now test the create_parent flag
414
441
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
415
 
                          BytesIO(b'contents\n'))
416
 
        self.assertFalse(t.has('dir/a'))
417
 
        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
 
442
                                       StringIO('contents\n'))
 
443
        self.failIf(t.has('dir/a'))
 
444
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
418
445
                              create_parent_dir=True)
419
 
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
 
446
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
420
447
 
421
448
        # But we still get NoSuchFile if we can't make the parent dir
422
449
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
423
 
                          BytesIO(b'contents\n'),
424
 
                          create_parent_dir=True)
 
450
                                       StringIO('contents\n'),
 
451
                                       create_parent_dir=True)
425
452
 
426
453
    def test_put_file_permissions(self):
427
454
 
432
459
        if not t._can_roundtrip_unix_modebits():
433
460
            # Can't roundtrip, so no need to run this test
434
461
            return
435
 
        t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
436
 
        self.assertTransportMode(t, 'mode644', 0o644)
437
 
        t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
438
 
        self.assertTransportMode(t, 'mode666', 0o666)
439
 
        t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
440
 
        self.assertTransportMode(t, 'mode600', 0o600)
 
462
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
 
463
        self.assertTransportMode(t, 'mode644', 0644)
 
464
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
 
465
        self.assertTransportMode(t, 'mode666', 0666)
 
466
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
 
467
        self.assertTransportMode(t, 'mode600', 0600)
441
468
        # Yes, you can put a file such that it becomes readonly
442
 
        t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
443
 
        self.assertTransportMode(t, 'mode400', 0o400)
 
469
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
 
470
        self.assertTransportMode(t, 'mode400', 0400)
444
471
        # The default permissions should be based on the current umask
445
472
        umask = osutils.get_umask()
446
 
        t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
447
 
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
473
        t.put_file('nomode', StringIO('test text\n'), mode=None)
 
474
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
448
475
 
449
476
    def test_put_file_non_atomic_permissions(self):
450
477
        t = self.get_transport()
454
481
        if not t._can_roundtrip_unix_modebits():
455
482
            # Can't roundtrip, so no need to run this test
456
483
            return
457
 
        t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
458
 
        self.assertTransportMode(t, 'mode644', 0o644)
459
 
        t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
460
 
        self.assertTransportMode(t, 'mode666', 0o666)
461
 
        t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
462
 
        self.assertTransportMode(t, 'mode600', 0o600)
 
484
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
 
485
        self.assertTransportMode(t, 'mode644', 0644)
 
486
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
 
487
        self.assertTransportMode(t, 'mode666', 0666)
 
488
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
 
489
        self.assertTransportMode(t, 'mode600', 0600)
463
490
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
464
 
        t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
465
 
        self.assertTransportMode(t, 'mode400', 0o400)
 
491
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
 
492
        self.assertTransportMode(t, 'mode400', 0400)
466
493
 
467
494
        # The default permissions should be based on the current umask
468
495
        umask = osutils.get_umask()
469
 
        t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
470
 
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
496
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
 
497
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
471
498
 
472
499
        # We should also be able to set the mode for a parent directory
473
500
        # when it is created
474
 
        sio = BytesIO()
475
 
        t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
476
 
                              dir_mode=0o700, create_parent_dir=True)
477
 
        self.assertTransportMode(t, 'dir700', 0o700)
478
 
        t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
479
 
                              dir_mode=0o770, create_parent_dir=True)
480
 
        self.assertTransportMode(t, 'dir770', 0o770)
481
 
        t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
482
 
                              dir_mode=0o777, create_parent_dir=True)
483
 
        self.assertTransportMode(t, 'dir777', 0o777)
 
501
        sio = StringIO()
 
502
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
 
503
                              dir_mode=0700, create_parent_dir=True)
 
504
        self.assertTransportMode(t, 'dir700', 0700)
 
505
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
 
506
                              dir_mode=0770, create_parent_dir=True)
 
507
        self.assertTransportMode(t, 'dir770', 0770)
 
508
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
 
509
                              dir_mode=0777, create_parent_dir=True)
 
510
        self.assertTransportMode(t, 'dir777', 0777)
484
511
 
485
512
    def test_put_bytes_unicode(self):
 
513
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
 
514
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
 
515
        # (we don't want to encode unicode here at all, callers should be
 
516
        # strictly passing bytes to put_bytes), but we allow it for backwards
 
517
        # compatibility.  At some point we should use a specific exception.
 
518
        # See https://bugs.launchpad.net/bzr/+bug/106898.
486
519
        t = self.get_transport()
487
520
        if t.is_readonly():
488
521
            return
489
522
        unicode_string = u'\u1234'
490
 
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
 
523
        self.assertRaises(
 
524
            (AssertionError, UnicodeEncodeError),
 
525
            t.put_bytes, 'foo', unicode_string)
 
526
 
 
527
    def test_put_file_unicode(self):
 
528
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
 
529
        # This situation can happen (and has) if code is careless about the type
 
530
        # of "string" they initialise/write to a StringIO with.  We cannot use
 
531
        # cStringIO, because it never returns unicode from read.
 
532
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
 
533
        # raise, but we raise it for hysterical raisins.
 
534
        t = self.get_transport()
 
535
        if t.is_readonly():
 
536
            return
 
537
        unicode_file = pyStringIO(u'\u1234')
 
538
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
491
539
 
492
540
    def test_mkdir(self):
493
541
        t = self.get_transport()
498
546
            # defined for the transport interface.
499
547
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
500
548
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
501
 
            self.assertRaises(TransportNotPossible,
502
 
                              t.mkdir, 'path/doesnt/exist')
 
549
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
 
550
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
503
551
            return
504
552
        # Test mkdir
505
553
        t.mkdir('dir_a')
509
557
        t.mkdir('dir_b')
510
558
        self.assertEqual(t.has('dir_b'), True)
511
559
 
512
 
        self.assertEqual([t.has(n) for n in
513
 
                          ['dir_a', 'dir_b', 'dir_q', 'dir_b']],
514
 
                         [True, True, False, True])
 
560
        t.mkdir_multi(['dir_c', 'dir_d'])
 
561
 
 
562
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
 
563
        self.assertEqual(list(t.has_multi(
 
564
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
 
565
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
 
566
            [True, True, True, False,
 
567
             True, True, True, True])
515
568
 
516
569
        # we were testing that a local mkdir followed by a transport
517
570
        # mkdir failed thusly, but given that we * in one process * do not
522
575
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
523
576
 
524
577
        # Test get/put in sub-directories
525
 
        t.put_bytes('dir_a/a', b'contents of dir_a/a')
526
 
        t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
527
 
        self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a')
528
 
        self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b')
 
578
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
 
579
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
 
580
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
 
581
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
529
582
 
530
583
        # mkdir of a dir with an absent parent
531
584
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
538
591
            # no sense testing on this transport
539
592
            return
540
593
        # Test mkdir with a mode
541
 
        t.mkdir('dmode755', mode=0o755)
542
 
        self.assertTransportMode(t, 'dmode755', 0o755)
543
 
        t.mkdir('dmode555', mode=0o555)
544
 
        self.assertTransportMode(t, 'dmode555', 0o555)
545
 
        t.mkdir('dmode777', mode=0o777)
546
 
        self.assertTransportMode(t, 'dmode777', 0o777)
547
 
        t.mkdir('dmode700', mode=0o700)
548
 
        self.assertTransportMode(t, 'dmode700', 0o700)
549
 
        t.mkdir('mdmode755', mode=0o755)
550
 
        self.assertTransportMode(t, 'mdmode755', 0o755)
 
594
        t.mkdir('dmode755', mode=0755)
 
595
        self.assertTransportMode(t, 'dmode755', 0755)
 
596
        t.mkdir('dmode555', mode=0555)
 
597
        self.assertTransportMode(t, 'dmode555', 0555)
 
598
        t.mkdir('dmode777', mode=0777)
 
599
        self.assertTransportMode(t, 'dmode777', 0777)
 
600
        t.mkdir('dmode700', mode=0700)
 
601
        self.assertTransportMode(t, 'dmode700', 0700)
 
602
        t.mkdir_multi(['mdmode755'], mode=0755)
 
603
        self.assertTransportMode(t, 'mdmode755', 0755)
551
604
 
552
605
        # Default mode should be based on umask
553
606
        umask = osutils.get_umask()
554
607
        t.mkdir('dnomode', mode=None)
555
 
        self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
 
608
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
556
609
 
557
610
    def test_opening_a_file_stream_creates_file(self):
558
611
        t = self.get_transport()
560
613
            return
561
614
        handle = t.open_write_stream('foo')
562
615
        try:
563
 
            self.assertEqual(b'', t.get_bytes('foo'))
 
616
            self.assertEqual('', t.get_bytes('foo'))
564
617
        finally:
565
618
            handle.close()
566
619
 
567
620
    def test_opening_a_file_stream_can_set_mode(self):
568
621
        t = self.get_transport()
569
622
        if t.is_readonly():
570
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
571
 
                              t.open_write_stream, 'foo')
572
623
            return
573
624
        if not t._can_roundtrip_unix_modebits():
574
625
            # Can't roundtrip, so no need to run this test
575
626
            return
576
 
 
577
627
        def check_mode(name, mode, expected):
578
628
            handle = t.open_write_stream(name, mode=mode)
579
629
            handle.close()
580
630
            self.assertTransportMode(t, name, expected)
581
 
        check_mode('mode644', 0o644, 0o644)
582
 
        check_mode('mode666', 0o666, 0o666)
583
 
        check_mode('mode600', 0o600, 0o600)
 
631
        check_mode('mode644', 0644, 0644)
 
632
        check_mode('mode666', 0666, 0666)
 
633
        check_mode('mode600', 0600, 0600)
584
634
        # The default permissions should be based on the current umask
585
 
        check_mode('nomode', None, 0o666 & ~osutils.get_umask())
 
635
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
586
636
 
587
637
    def test_copy_to(self):
588
638
        # FIXME: test:   same server to same server (partly done)
595
645
            self.build_tree(files, transport=transport_from)
596
646
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
597
647
            for f in files:
598
 
                self.check_transport_contents(transport_to.get_bytes(f),
 
648
                self.check_transport_contents(transport_to.get(f).read(),
599
649
                                              transport_from, f)
600
650
 
601
651
        t = self.get_transport()
602
 
        if t.__class__.__name__ == "SFTPTransport":
603
 
            self.skipTest("SFTP copy_to currently too flakey to use")
604
652
        temp_transport = MemoryTransport('memory:///')
605
653
        simple_copy_files(t, temp_transport)
606
654
        if not t.is_readonly():
608
656
            t2 = t.clone('copy_to_simple')
609
657
            simple_copy_files(t, t2)
610
658
 
 
659
 
611
660
        # Test that copying into a missing directory raises
612
661
        # NoSuchFile
613
662
        if t.is_readonly():
614
663
            self.build_tree(['e/', 'e/f'])
615
664
        else:
616
665
            t.mkdir('e')
617
 
            t.put_bytes('e/f', b'contents of e')
 
666
            t.put_bytes('e/f', 'contents of e')
618
667
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
619
668
        temp_transport.mkdir('e')
620
669
        t.copy_to(['e/f'], temp_transport)
625
674
        files = ['a', 'b', 'c', 'd']
626
675
        t.copy_to(iter(files), temp_transport)
627
676
        for f in files:
628
 
            self.check_transport_contents(temp_transport.get_bytes(f),
 
677
            self.check_transport_contents(temp_transport.get(f).read(),
629
678
                                          t, f)
630
679
        del temp_transport
631
680
 
632
 
        for mode in (0o666, 0o644, 0o600, 0o400):
 
681
        for mode in (0666, 0644, 0600, 0400):
633
682
            temp_transport = MemoryTransport("memory:///")
634
683
            t.copy_to(files, temp_transport, mode=mode)
635
684
            for f in files:
650
699
 
651
700
        if t.is_readonly():
652
701
            self.assertRaises(TransportNotPossible,
653
 
                              t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
702
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
654
703
            return
655
 
        t.put_bytes('a', b'diff\ncontents for\na\n')
656
 
        t.put_bytes('b', b'contents\nfor b\n')
 
704
        t.put_bytes('a', 'diff\ncontents for\na\n')
 
705
        t.put_bytes('b', 'contents\nfor b\n')
657
706
 
658
707
        self.assertEqual(20,
659
 
                         t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
 
708
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
660
709
 
661
710
        self.check_transport_contents(
662
 
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
711
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
663
712
            t, 'a')
664
713
 
665
714
        # a file with no parent should fail..
666
715
        self.assertRaises(NoSuchFile,
667
 
                          t.append_file, 'missing/path', BytesIO(b'content'))
 
716
                          t.append_file, 'missing/path', StringIO('content'))
668
717
 
669
718
        # And we can create new files, too
670
719
        self.assertEqual(0,
671
 
                         t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
672
 
        self.check_transport_contents(b'some text\nfor a missing file\n',
 
720
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
 
721
        self.check_transport_contents('some text\nfor a missing file\n',
673
722
                                      t, 'c')
674
723
 
675
724
    def test_append_bytes(self):
677
726
 
678
727
        if t.is_readonly():
679
728
            self.assertRaises(TransportNotPossible,
680
 
                              t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
 
729
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
681
730
            return
682
731
 
683
 
        self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
684
 
        self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
 
732
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
 
733
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
685
734
 
686
735
        self.assertEqual(20,
687
 
                         t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
 
736
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
688
737
 
689
738
        self.check_transport_contents(
690
 
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
739
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
691
740
            t, 'a')
692
741
 
693
742
        # a file with no parent should fail..
694
743
        self.assertRaises(NoSuchFile,
695
 
                          t.append_bytes, 'missing/path', b'content')
 
744
                          t.append_bytes, 'missing/path', 'content')
 
745
 
 
746
    def test_append_multi(self):
 
747
        t = self.get_transport()
 
748
 
 
749
        if t.is_readonly():
 
750
            return
 
751
        t.put_bytes('a', 'diff\ncontents for\na\n'
 
752
                         'add\nsome\nmore\ncontents\n')
 
753
        t.put_bytes('b', 'contents\nfor b\n')
 
754
 
 
755
        self.assertEqual((43, 15),
 
756
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
 
757
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
758
 
 
759
        self.check_transport_contents(
 
760
            'diff\ncontents for\na\n'
 
761
            'add\nsome\nmore\ncontents\n'
 
762
            'and\nthen\nsome\nmore\n',
 
763
            t, 'a')
 
764
        self.check_transport_contents(
 
765
                'contents\nfor b\n'
 
766
                'some\nmore\nfor\nb\n',
 
767
                t, 'b')
 
768
 
 
769
        self.assertEqual((62, 31),
 
770
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
 
771
                                 ('b', StringIO('from an iterator\n'))])))
 
772
        self.check_transport_contents(
 
773
            'diff\ncontents for\na\n'
 
774
            'add\nsome\nmore\ncontents\n'
 
775
            'and\nthen\nsome\nmore\n'
 
776
            'a little bit more\n',
 
777
            t, 'a')
 
778
        self.check_transport_contents(
 
779
                'contents\nfor b\n'
 
780
                'some\nmore\nfor\nb\n'
 
781
                'from an iterator\n',
 
782
                t, 'b')
 
783
 
 
784
        self.assertEqual((80, 0),
 
785
            t.append_multi([('a', StringIO('some text in a\n')),
 
786
                            ('d', StringIO('missing file r\n'))]))
 
787
 
 
788
        self.check_transport_contents(
 
789
            'diff\ncontents for\na\n'
 
790
            'add\nsome\nmore\ncontents\n'
 
791
            'and\nthen\nsome\nmore\n'
 
792
            'a little bit more\n'
 
793
            'some text in a\n',
 
794
            t, 'a')
 
795
        self.check_transport_contents('missing file r\n', t, 'd')
696
796
 
697
797
    def test_append_file_mode(self):
698
798
        """Check that append accepts a mode parameter"""
700
800
        t = self.get_transport()
701
801
        if t.is_readonly():
702
802
            self.assertRaises(TransportNotPossible,
703
 
                              t.append_file, 'f', BytesIO(b'f'), mode=None)
 
803
                t.append_file, 'f', StringIO('f'), mode=None)
704
804
            return
705
 
        t.append_file('f', BytesIO(b'f'), mode=None)
 
805
        t.append_file('f', StringIO('f'), mode=None)
706
806
 
707
807
    def test_append_bytes_mode(self):
708
808
        # check append_bytes accepts a mode
709
809
        t = self.get_transport()
710
810
        if t.is_readonly():
711
811
            self.assertRaises(TransportNotPossible,
712
 
                              t.append_bytes, 'f', b'f', mode=None)
 
812
                t.append_bytes, 'f', 'f', mode=None)
713
813
            return
714
 
        t.append_bytes('f', b'f', mode=None)
 
814
        t.append_bytes('f', 'f', mode=None)
715
815
 
716
816
    def test_delete(self):
717
817
        # TODO: Test Transport.delete
722
822
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
723
823
            return
724
824
 
725
 
        t.put_bytes('a', b'a little bit of text\n')
726
 
        self.assertTrue(t.has('a'))
 
825
        t.put_bytes('a', 'a little bit of text\n')
 
826
        self.failUnless(t.has('a'))
727
827
        t.delete('a')
728
 
        self.assertFalse(t.has('a'))
 
828
        self.failIf(t.has('a'))
729
829
 
730
830
        self.assertRaises(NoSuchFile, t.delete, 'a')
731
831
 
732
 
        t.put_bytes('a', b'a text\n')
733
 
        t.put_bytes('b', b'b text\n')
734
 
        t.put_bytes('c', b'c text\n')
 
832
        t.put_bytes('a', 'a text\n')
 
833
        t.put_bytes('b', 'b text\n')
 
834
        t.put_bytes('c', 'c text\n')
735
835
        self.assertEqual([True, True, True],
736
 
                         [t.has(n) for n in ['a', 'b', 'c']])
737
 
        t.delete('a')
738
 
        t.delete('c')
 
836
                list(t.has_multi(['a', 'b', 'c'])))
 
837
        t.delete_multi(['a', 'c'])
739
838
        self.assertEqual([False, True, False],
740
 
                         [t.has(n) for n in ['a', 'b', 'c']])
741
 
        self.assertFalse(t.has('a'))
742
 
        self.assertTrue(t.has('b'))
743
 
        self.assertFalse(t.has('c'))
744
 
 
745
 
        for name in ['a', 'c', 'd']:
746
 
            self.assertRaises(NoSuchFile, t.delete, name)
 
839
                list(t.has_multi(['a', 'b', 'c'])))
 
840
        self.failIf(t.has('a'))
 
841
        self.failUnless(t.has('b'))
 
842
        self.failIf(t.has('c'))
 
843
 
 
844
        self.assertRaises(NoSuchFile,
 
845
                t.delete_multi, ['a', 'b', 'c'])
 
846
 
 
847
        self.assertRaises(NoSuchFile,
 
848
                t.delete_multi, iter(['a', 'b', 'c']))
 
849
 
 
850
        t.put_bytes('a', 'another a text\n')
 
851
        t.put_bytes('c', 'another c text\n')
 
852
        t.delete_multi(iter(['a', 'b', 'c']))
747
853
 
748
854
        # We should have deleted everything
749
855
        # SftpServer creates control files in the
796
902
        if t.is_readonly():
797
903
            return
798
904
        t.mkdir('foo')
799
 
        t.put_bytes('foo-bar', b'')
 
905
        t.put_bytes('foo-bar', '')
800
906
        t.mkdir('foo-baz')
801
907
        t.rmdir('foo')
802
908
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
803
 
        self.assertTrue(t.has('foo-bar'))
 
909
        self.failUnless(t.has('foo-bar'))
804
910
 
805
911
    def test_rename_dir_succeeds(self):
806
912
        t = self.get_transport()
807
913
        if t.is_readonly():
808
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
809
 
                              t.rename, 'foo', 'bar')
810
 
            return
 
914
            raise TestSkipped("transport is readonly")
811
915
        t.mkdir('adir')
812
916
        t.mkdir('adir/asubdir')
813
917
        t.rename('adir', 'bdir')
818
922
        """Attempting to replace a nonempty directory should fail"""
819
923
        t = self.get_transport()
820
924
        if t.is_readonly():
821
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
822
 
                              t.rename, 'foo', 'bar')
823
 
            return
 
925
            raise TestSkipped("transport is readonly")
824
926
        t.mkdir('adir')
825
927
        t.mkdir('adir/asubdir')
826
928
        t.mkdir('bdir')
841
943
        t.mkdir('b')
842
944
        ta = t.clone('a')
843
945
        tb = t.clone('b')
844
 
        ta.put_bytes('f', b'aoeu')
 
946
        ta.put_bytes('f', 'aoeu')
845
947
        ta.rename('f', '../b/f')
846
948
        self.assertTrue(tb.has('f'))
847
949
        self.assertFalse(ta.has('f'))
889
991
        # creates control files in the working directory
890
992
        # perhaps all of this could be done in a subdirectory
891
993
 
892
 
        t.put_bytes('a', b'a first file\n')
893
 
        self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
 
994
        t.put_bytes('a', 'a first file\n')
 
995
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
894
996
 
895
997
        t.move('a', 'b')
896
 
        self.assertTrue(t.has('b'))
897
 
        self.assertFalse(t.has('a'))
 
998
        self.failUnless(t.has('b'))
 
999
        self.failIf(t.has('a'))
898
1000
 
899
 
        self.check_transport_contents(b'a first file\n', t, 'b')
900
 
        self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
 
1001
        self.check_transport_contents('a first file\n', t, 'b')
 
1002
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
901
1003
 
902
1004
        # Overwrite a file
903
 
        t.put_bytes('c', b'c this file\n')
 
1005
        t.put_bytes('c', 'c this file\n')
904
1006
        t.move('c', 'b')
905
 
        self.assertFalse(t.has('c'))
906
 
        self.check_transport_contents(b'c this file\n', t, 'b')
 
1007
        self.failIf(t.has('c'))
 
1008
        self.check_transport_contents('c this file\n', t, 'b')
907
1009
 
908
1010
        # TODO: Try to write a test for atomicity
909
1011
        # TODO: Test moving into a non-existent subdirectory
 
1012
        # TODO: Test Transport.move_multi
910
1013
 
911
1014
    def test_copy(self):
912
1015
        t = self.get_transport()
914
1017
        if t.is_readonly():
915
1018
            return
916
1019
 
917
 
        t.put_bytes('a', b'a file\n')
 
1020
        t.put_bytes('a', 'a file\n')
918
1021
        t.copy('a', 'b')
919
 
        self.check_transport_contents(b'a file\n', t, 'b')
 
1022
        self.check_transport_contents('a file\n', t, 'b')
920
1023
 
921
1024
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
922
1025
        os.mkdir('c')
923
1026
        # What should the assert be if you try to copy a
924
1027
        # file over a directory?
925
1028
        #self.assertRaises(Something, t.copy, 'a', 'c')
926
 
        t.put_bytes('d', b'text in d\n')
 
1029
        t.put_bytes('d', 'text in d\n')
927
1030
        t.copy('d', 'b')
928
 
        self.check_transport_contents(b'text in d\n', t, 'b')
 
1031
        self.check_transport_contents('text in d\n', t, 'b')
 
1032
 
 
1033
        # TODO: test copy_multi
929
1034
 
930
1035
    def test_connection_error(self):
931
1036
        """ConnectionError is raised when connection is impossible.
937
1042
        except NotImplementedError:
938
1043
            raise TestSkipped("Transport %s has no bogus URL support." %
939
1044
                              self._server.__class__)
940
 
        t = _mod_transport.get_transport_from_url(url)
 
1045
        t = get_transport(url)
941
1046
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
942
1047
 
943
1048
    def test_stat(self):
948
1053
 
949
1054
        try:
950
1055
            st = t.stat('.')
951
 
        except TransportNotPossible as e:
 
1056
        except TransportNotPossible, e:
952
1057
            # This transport cannot stat
953
1058
            return
954
1059
 
959
1064
        for path, size in zip(paths, sizes):
960
1065
            st = t.stat(path)
961
1066
            if path.endswith('/'):
962
 
                self.assertTrue(S_ISDIR(st.st_mode))
 
1067
                self.failUnless(S_ISDIR(st.st_mode))
963
1068
                # directory sizes are meaningless
964
1069
            else:
965
 
                self.assertTrue(S_ISREG(st.st_mode))
 
1070
                self.failUnless(S_ISREG(st.st_mode))
966
1071
                self.assertEqual(size, st.st_size)
967
1072
 
 
1073
        remote_stats = list(t.stat_multi(paths))
 
1074
        remote_iter_stats = list(t.stat_multi(iter(paths)))
 
1075
 
968
1076
        self.assertRaises(NoSuchFile, t.stat, 'q')
969
1077
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
970
1078
 
 
1079
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
 
1080
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
971
1081
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
972
1082
        subdir = t.clone('subdir')
973
 
        st = subdir.stat('./file')
974
 
        st = subdir.stat('.')
 
1083
        subdir.stat('./file')
 
1084
        subdir.stat('.')
975
1085
 
976
1086
    def test_hardlink(self):
977
1087
        from stat import ST_NLINK
986
1096
        try:
987
1097
            t.hardlink(source_name, link_name)
988
1098
 
989
 
            self.assertTrue(t.has(source_name))
990
 
            self.assertTrue(t.has(link_name))
 
1099
            self.failUnless(t.has(source_name))
 
1100
            self.failUnless(t.has(link_name))
991
1101
 
992
1102
            st = t.stat(link_name)
993
 
            self.assertEqual(st[ST_NLINK], 2)
 
1103
            self.failUnlessEqual(st[ST_NLINK], 2)
994
1104
        except TransportNotPossible:
995
1105
            raise TestSkipped("Transport %s does not support hardlinks." %
996
1106
                              self._server.__class__)
1008
1118
        try:
1009
1119
            t.symlink(source_name, link_name)
1010
1120
 
1011
 
            self.assertTrue(t.has(source_name))
1012
 
            self.assertTrue(t.has(link_name))
 
1121
            self.failUnless(t.has(source_name))
 
1122
            self.failUnless(t.has(link_name))
1013
1123
 
1014
1124
            st = t.stat(link_name)
1015
 
            self.assertTrue(S_ISLNK(st.st_mode),
1016
 
                            "expected symlink, got mode %o" % st.st_mode)
1017
 
        except TransportNotPossible:
1018
 
            raise TestSkipped("Transport %s does not support symlinks." %
1019
 
                              self._server.__class__)
1020
 
 
1021
 
        self.assertEqual(source_name, t.readlink(link_name))
1022
 
 
1023
 
    def test_readlink_nonexistent(self):
1024
 
        t = self.get_transport()
1025
 
        try:
1026
 
            self.assertRaises(NoSuchFile, t.readlink, 'nonexistent')
1027
 
        except TransportNotPossible:
1028
 
            raise TestSkipped("Transport %s does not support symlinks." %
1029
 
                              self._server.__class__)
 
1125
            self.failUnless(S_ISLNK(st.st_mode))
 
1126
        except TransportNotPossible:
 
1127
            raise TestSkipped("Transport %s does not support symlinks." %
 
1128
                              self._server.__class__)
 
1129
        except IOError:
 
1130
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
1030
1131
 
1031
1132
    def test_list_dir(self):
1032
1133
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1037
1138
            return
1038
1139
 
1039
1140
        def sorted_list(d, transport):
1040
 
            l = sorted(transport.list_dir(d))
 
1141
            l = list(transport.list_dir(d))
 
1142
            l.sort()
1041
1143
            return l
1042
1144
 
1043
1145
        self.assertEqual([], sorted_list('.', t))
1095
1197
            raise TestSkipped("not a connected transport")
1096
1198
 
1097
1199
        t2 = t1.clone('subdir')
1098
 
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
1099
 
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
1100
 
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
1101
 
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
1102
 
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
 
1200
        self.assertEquals(t1._scheme, t2._scheme)
 
1201
        self.assertEquals(t1._user, t2._user)
 
1202
        self.assertEquals(t1._password, t2._password)
 
1203
        self.assertEquals(t1._host, t2._host)
 
1204
        self.assertEquals(t1._port, t2._port)
1103
1205
 
1104
1206
    def test__reuse_for(self):
1105
1207
        t = self.get_transport()
1112
1214
 
1113
1215
            Only the parameters different from None will be changed.
1114
1216
            """
1115
 
            if scheme is None:
1116
 
                scheme = t._parsed_url.scheme
1117
 
            if user is None:
1118
 
                user = t._parsed_url.user
1119
 
            if password is None:
1120
 
                password = t._parsed_url.password
1121
 
            if user is None:
1122
 
                user = t._parsed_url.user
1123
 
            if host is None:
1124
 
                host = t._parsed_url.host
1125
 
            if port is None:
1126
 
                port = t._parsed_url.port
1127
 
            if path is None:
1128
 
                path = t._parsed_url.path
1129
 
            return str(urlutils.URL(scheme, user, password, host, port, path))
 
1217
            if scheme   is None: scheme   = t._scheme
 
1218
            if user     is None: user     = t._user
 
1219
            if password is None: password = t._password
 
1220
            if user     is None: user     = t._user
 
1221
            if host     is None: host     = t._host
 
1222
            if port     is None: port     = t._port
 
1223
            if path     is None: path     = t._path
 
1224
            return t._unsplit_url(scheme, user, password, host, port, path)
1130
1225
 
1131
 
        if t._parsed_url.scheme == 'ftp':
 
1226
        if t._scheme == 'ftp':
1132
1227
            scheme = 'sftp'
1133
1228
        else:
1134
1229
            scheme = 'ftp'
1135
1230
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1136
 
        if t._parsed_url.user == 'me':
 
1231
        if t._user == 'me':
1137
1232
            user = 'you'
1138
1233
        else:
1139
1234
            user = 'me'
1150
1245
        #   (they may be typed by the user when prompted for example)
1151
1246
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1152
1247
        # We will not connect, we can use an invalid host
1153
 
        self.assertIsNot(t, t._reuse_for(
1154
 
            new_url(host=t._parsed_url.host + 'bar')))
1155
 
        if t._parsed_url.port == 1234:
 
1248
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
 
1249
        if t._port == 1234:
1156
1250
            port = 4321
1157
1251
        else:
1158
1252
            port = 1234
1167
1261
 
1168
1262
        c = t.clone('subdir')
1169
1263
        # Some transports will create the connection only when needed
1170
 
        t.has('surely_not')  # Force connection
 
1264
        t.has('surely_not') # Force connection
1171
1265
        self.assertIs(t._get_connection(), c._get_connection())
1172
1266
 
1173
1267
        # Temporary failure, we need to create a new dummy connection
1174
 
        new_connection = None
 
1268
        new_connection = object()
1175
1269
        t._set_connection(new_connection)
1176
1270
        # Check that both transports use the same connection
1177
1271
        self.assertIs(new_connection, t._get_connection())
1182
1276
        if not isinstance(t, ConnectedTransport):
1183
1277
            raise TestSkipped("not a connected transport")
1184
1278
 
1185
 
        t.has('surely_not')  # Force connection
 
1279
        t.has('surely_not') # Force connection
1186
1280
        self.assertIsNot(None, t._get_connection())
1187
1281
 
1188
1282
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1199
1293
 
1200
1294
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1201
1295
 
1202
 
        self.assertTrue(t1.has('a'))
1203
 
        self.assertTrue(t1.has('b/c'))
1204
 
        self.assertFalse(t1.has('c'))
 
1296
        self.failUnless(t1.has('a'))
 
1297
        self.failUnless(t1.has('b/c'))
 
1298
        self.failIf(t1.has('c'))
1205
1299
 
1206
1300
        t2 = t1.clone('b')
1207
1301
        self.assertEqual(t1.base + 'b/', t2.base)
1208
1302
 
1209
 
        self.assertTrue(t2.has('c'))
1210
 
        self.assertFalse(t2.has('a'))
 
1303
        self.failUnless(t2.has('c'))
 
1304
        self.failIf(t2.has('a'))
1211
1305
 
1212
1306
        t3 = t2.clone('..')
1213
 
        self.assertTrue(t3.has('a'))
1214
 
        self.assertFalse(t3.has('c'))
 
1307
        self.failUnless(t3.has('a'))
 
1308
        self.failIf(t3.has('c'))
1215
1309
 
1216
 
        self.assertFalse(t1.has('b/d'))
1217
 
        self.assertFalse(t2.has('d'))
1218
 
        self.assertFalse(t3.has('b/d'))
 
1310
        self.failIf(t1.has('b/d'))
 
1311
        self.failIf(t2.has('d'))
 
1312
        self.failIf(t3.has('b/d'))
1219
1313
 
1220
1314
        if t1.is_readonly():
1221
 
            self.build_tree_contents([('b/d', b'newfile\n')])
 
1315
            self.build_tree_contents([('b/d', 'newfile\n')])
1222
1316
        else:
1223
 
            t2.put_bytes('d', b'newfile\n')
 
1317
            t2.put_bytes('d', 'newfile\n')
1224
1318
 
1225
 
        self.assertTrue(t1.has('b/d'))
1226
 
        self.assertTrue(t2.has('d'))
1227
 
        self.assertTrue(t3.has('b/d'))
 
1319
        self.failUnless(t1.has('b/d'))
 
1320
        self.failUnless(t2.has('d'))
 
1321
        self.failUnless(t3.has('b/d'))
1228
1322
 
1229
1323
    def test_clone_to_root(self):
1230
1324
        orig_transport = self.get_transport()
1234
1328
        new_transport = root_transport.clone("..")
1235
1329
        # as we are walking up directories, the path must be
1236
1330
        # growing less, except at the top
1237
 
        self.assertTrue(len(new_transport.base) < len(root_transport.base) or
1238
 
                        new_transport.base == root_transport.base)
 
1331
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1332
            or new_transport.base == root_transport.base)
1239
1333
        while new_transport.base != root_transport.base:
1240
1334
            root_transport = new_transport
1241
1335
            new_transport = root_transport.clone("..")
1242
1336
            # as we are walking up directories, the path must be
1243
1337
            # growing less, except at the top
1244
 
            self.assertTrue(len(new_transport.base) < len(root_transport.base) or
1245
 
                            new_transport.base == root_transport.base)
 
1338
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1339
                or new_transport.base == root_transport.base)
1246
1340
 
1247
1341
        # Cloning to "/" should take us to exactly the same location.
1248
1342
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1258
1352
        orig_transport = self.get_transport()
1259
1353
        root_transport = orig_transport.clone('/')
1260
1354
        self.assertEqual(root_transport.base + '.bzr/',
1261
 
                         root_transport.clone('.bzr').base)
 
1355
            root_transport.clone('.bzr').base)
1262
1356
 
1263
1357
    def test_base_url(self):
1264
1358
        t = self.get_transport()
1304
1398
        self.assertEqual(transport.clone("/").abspath('foo'),
1305
1399
                         transport.abspath("/foo"))
1306
1400
 
1307
 
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1308
1401
    def test_win32_abspath(self):
1309
1402
        # Note: we tried to set sys.platform='win32' so we could test on
1310
1403
        # other platforms too, but then osutils does platform specific
1315
1408
 
1316
1409
        # smoke test for abspath on win32.
1317
1410
        # a transport based on 'file:///' never fully qualifies the drive.
1318
 
        transport = _mod_transport.get_transport_from_url("file:///")
1319
 
        self.assertEqual(transport.abspath("/"), "file:///")
 
1411
        transport = get_transport("file:///")
 
1412
        self.failUnlessEqual(transport.abspath("/"), "file:///")
1320
1413
 
1321
1414
        # but a transport that starts with a drive spec must keep it.
1322
 
        transport = _mod_transport.get_transport_from_url("file:///C:/")
1323
 
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1415
        transport = get_transport("file:///C:/")
 
1416
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1324
1417
 
1325
1418
    def test_local_abspath(self):
1326
1419
        transport = self.get_transport()
1327
1420
        try:
1328
1421
            p = transport.local_abspath('.')
1329
 
        except (errors.NotLocalUrl, TransportNotPossible) as e:
 
1422
        except (errors.NotLocalUrl, TransportNotPossible), e:
1330
1423
            # should be formattable
1331
1424
            s = str(e)
1332
1425
        else:
1359
1452
                         'isolated/dir/',
1360
1453
                         'isolated/dir/foo',
1361
1454
                         'isolated/dir/bar',
1362
 
                         'isolated/dir/b%25z',  # make sure quoting is correct
 
1455
                         'isolated/dir/b%25z', # make sure quoting is correct
1363
1456
                         'isolated/bar'],
1364
1457
                        transport=transport)
1365
1458
        paths = set(transport.iter_files_recursive())
1366
1459
        # nb the directories are not converted
1367
1460
        self.assertEqual(paths,
1368
 
                         {'isolated/dir/foo',
1369
 
                          'isolated/dir/bar',
1370
 
                          'isolated/dir/b%2525z',
1371
 
                          'isolated/bar'})
 
1461
                    set(['isolated/dir/foo',
 
1462
                         'isolated/dir/bar',
 
1463
                         'isolated/dir/b%2525z',
 
1464
                         'isolated/bar']))
1372
1465
        sub_transport = transport.clone('isolated')
1373
1466
        paths = set(sub_transport.iter_files_recursive())
1374
1467
        self.assertEqual(paths,
1375
 
                         {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
 
1468
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1376
1469
 
1377
1470
    def test_copy_tree(self):
1378
1471
        # TODO: test file contents and permissions are preserved. This test was
1389
1482
                         'from/dir/',
1390
1483
                         'from/dir/foo',
1391
1484
                         'from/dir/bar',
1392
 
                         'from/dir/b%25z',  # make sure quoting is correct
 
1485
                         'from/dir/b%25z', # make sure quoting is correct
1393
1486
                         'from/bar'],
1394
1487
                        transport=transport)
1395
1488
        transport.copy_tree('from', 'to')
1396
1489
        paths = set(transport.iter_files_recursive())
1397
1490
        self.assertEqual(paths,
1398
 
                         {'from/dir/foo',
1399
 
                          'from/dir/bar',
1400
 
                          'from/dir/b%2525z',
1401
 
                          'from/bar',
1402
 
                          'to/dir/foo',
1403
 
                          'to/dir/bar',
1404
 
                          'to/dir/b%2525z',
1405
 
                          'to/bar', })
 
1491
                    set(['from/dir/foo',
 
1492
                         'from/dir/bar',
 
1493
                         'from/dir/b%2525z',
 
1494
                         'from/bar',
 
1495
                         'to/dir/foo',
 
1496
                         'to/dir/bar',
 
1497
                         'to/dir/b%2525z',
 
1498
                         'to/bar',]))
1406
1499
 
1407
1500
    def test_copy_tree_to_transport(self):
1408
1501
        transport = self.get_transport()
1416
1509
                         'from/dir/',
1417
1510
                         'from/dir/foo',
1418
1511
                         'from/dir/bar',
1419
 
                         'from/dir/b%25z',  # make sure quoting is correct
 
1512
                         'from/dir/b%25z', # make sure quoting is correct
1420
1513
                         'from/bar'],
1421
1514
                        transport=transport)
1422
1515
        from_transport = transport.clone('from')
1425
1518
        from_transport.copy_tree_to_transport(to_transport)
1426
1519
        paths = set(transport.iter_files_recursive())
1427
1520
        self.assertEqual(paths,
1428
 
                         {'from/dir/foo',
1429
 
                          'from/dir/bar',
1430
 
                          'from/dir/b%2525z',
1431
 
                          'from/bar',
1432
 
                          'to/dir/foo',
1433
 
                          'to/dir/bar',
1434
 
                          'to/dir/b%2525z',
1435
 
                          'to/bar', })
 
1521
                    set(['from/dir/foo',
 
1522
                         'from/dir/bar',
 
1523
                         'from/dir/b%2525z',
 
1524
                         'from/bar',
 
1525
                         'to/dir/foo',
 
1526
                         'to/dir/bar',
 
1527
                         'to/dir/b%2525z',
 
1528
                         'to/bar',]))
1436
1529
 
1437
1530
    def test_unicode_paths(self):
1438
1531
        """Test that we can read/write files with Unicode names."""
1442
1535
        # '\xe5' and '\xe4' actually map to the same file
1443
1536
        # adding a suffix kicks in the 'preserving but insensitive'
1444
1537
        # route, and maintains the right files
1445
 
        files = [u'\xe5.1',  # a w/ circle iso-8859-1
1446
 
                 u'\xe4.2',  # a w/ dots iso-8859-1
1447
 
                 u'\u017d',  # Z with umlat iso-8859-2
1448
 
                 u'\u062c',  # Arabic j
1449
 
                 u'\u0410',  # Russian A
1450
 
                 u'\u65e5',  # Kanji person
1451
 
                 ]
 
1538
        files = [u'\xe5.1', # a w/ circle iso-8859-1
 
1539
                 u'\xe4.2', # a w/ dots iso-8859-1
 
1540
                 u'\u017d', # Z with umlat iso-8859-2
 
1541
                 u'\u062c', # Arabic j
 
1542
                 u'\u0410', # Russian A
 
1543
                 u'\u65e5', # Kanji person
 
1544
                ]
1452
1545
 
1453
1546
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1454
1547
        if no_unicode_support:
1455
 
            self.knownFailure("test server cannot handle unicode paths")
 
1548
            raise tests.KnownFailure("test server cannot handle unicode paths")
1456
1549
 
1457
1550
        try:
1458
1551
            self.build_tree(files, transport=t, line_endings='binary')
1459
1552
        except UnicodeError:
1460
 
            raise TestSkipped(
1461
 
                "cannot handle unicode paths in current encoding")
 
1553
            raise TestSkipped("cannot handle unicode paths in current encoding")
1462
1554
 
1463
1555
        # A plain unicode string is not a valid url
1464
1556
        for fname in files:
1465
 
            self.assertRaises(urlutils.InvalidURL, t.get, fname)
 
1557
            self.assertRaises(InvalidURL, t.get, fname)
1466
1558
 
1467
1559
        for fname in files:
1468
1560
            fname_utf8 = fname.encode('utf-8')
1469
 
            contents = b'contents of %s\n' % (fname_utf8,)
 
1561
            contents = 'contents of %s\n' % (fname_utf8,)
1470
1562
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1471
1563
 
1472
1564
    def test_connect_twice_is_same_content(self):
1475
1567
        transport = self.get_transport()
1476
1568
        if transport.is_readonly():
1477
1569
            return
1478
 
        transport.put_bytes('foo', b'bar')
 
1570
        transport.put_bytes('foo', 'bar')
1479
1571
        transport3 = self.get_transport()
1480
 
        self.check_transport_contents(b'bar', transport3, 'foo')
 
1572
        self.check_transport_contents('bar', transport3, 'foo')
1481
1573
 
1482
1574
        # now opening at a relative url should give use a sane result:
1483
1575
        transport.mkdir('newdir')
1484
1576
        transport5 = self.get_transport('newdir')
1485
1577
        transport6 = transport5.clone('..')
1486
 
        self.check_transport_contents(b'bar', transport6, 'foo')
 
1578
        self.check_transport_contents('bar', transport6, 'foo')
1487
1579
 
1488
1580
    def test_lock_write(self):
1489
1581
        """Test transport-level write locks.
1492
1584
        """
1493
1585
        transport = self.get_transport()
1494
1586
        if transport.is_readonly():
1495
 
            self.assertRaises(TransportNotPossible,
1496
 
                              transport.lock_write, 'foo')
 
1587
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1497
1588
            return
1498
 
        transport.put_bytes('lock', b'')
 
1589
        transport.put_bytes('lock', '')
1499
1590
        try:
1500
1591
            lock = transport.lock_write('lock')
1501
1592
        except TransportNotPossible:
1511
1602
        """
1512
1603
        transport = self.get_transport()
1513
1604
        if transport.is_readonly():
1514
 
            open('lock', 'w').close()
 
1605
            file('lock', 'w').close()
1515
1606
        else:
1516
 
            transport.put_bytes('lock', b'')
 
1607
            transport.put_bytes('lock', '')
1517
1608
        try:
1518
1609
            lock = transport.lock_read('lock')
1519
1610
        except TransportNotPossible:
1525
1616
    def test_readv(self):
1526
1617
        transport = self.get_transport()
1527
1618
        if transport.is_readonly():
1528
 
            with open('a', 'w') as f:
1529
 
                f.write('0123456789')
 
1619
            file('a', 'w').write('0123456789')
1530
1620
        else:
1531
 
            transport.put_bytes('a', b'0123456789')
 
1621
            transport.put_bytes('a', '0123456789')
1532
1622
 
1533
1623
        d = list(transport.readv('a', ((0, 1),)))
1534
 
        self.assertEqual(d[0], (0, b'0'))
 
1624
        self.assertEqual(d[0], (0, '0'))
1535
1625
 
1536
1626
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1537
 
        self.assertEqual(d[0], (0, b'0'))
1538
 
        self.assertEqual(d[1], (1, b'1'))
1539
 
        self.assertEqual(d[2], (3, b'34'))
1540
 
        self.assertEqual(d[3], (9, b'9'))
 
1627
        self.assertEqual(d[0], (0, '0'))
 
1628
        self.assertEqual(d[1], (1, '1'))
 
1629
        self.assertEqual(d[2], (3, '34'))
 
1630
        self.assertEqual(d[3], (9, '9'))
1541
1631
 
1542
1632
    def test_readv_out_of_order(self):
1543
1633
        transport = self.get_transport()
1544
1634
        if transport.is_readonly():
1545
 
            with open('a', 'w') as f:
1546
 
                f.write('0123456789')
 
1635
            file('a', 'w').write('0123456789')
1547
1636
        else:
1548
 
            transport.put_bytes('a', b'01234567890')
 
1637
            transport.put_bytes('a', '01234567890')
1549
1638
 
1550
1639
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1551
 
        self.assertEqual(d[0], (1, b'1'))
1552
 
        self.assertEqual(d[1], (9, b'9'))
1553
 
        self.assertEqual(d[2], (0, b'0'))
1554
 
        self.assertEqual(d[3], (3, b'34'))
 
1640
        self.assertEqual(d[0], (1, '1'))
 
1641
        self.assertEqual(d[1], (9, '9'))
 
1642
        self.assertEqual(d[2], (0, '0'))
 
1643
        self.assertEqual(d[3], (3, '34'))
1555
1644
 
1556
1645
    def test_readv_with_adjust_for_latency(self):
1557
1646
        transport = self.get_transport()
1562
1651
        # reference the returned data with the random data. To avoid doing
1563
1652
        # multiple large random byte look ups we do several tests on the same
1564
1653
        # backing data.
1565
 
        content = osutils.rand_bytes(200 * 1024)
 
1654
        content = osutils.rand_bytes(200*1024)
1566
1655
        content_size = len(content)
1567
1656
        if transport.is_readonly():
1568
1657
            self.build_tree_contents([('a', content)])
1569
1658
        else:
1570
1659
            transport.put_bytes('a', content)
1571
 
 
1572
1660
        def check_result_data(result_vector):
1573
1661
            for item in result_vector:
1574
1662
                data_len = len(item[1])
1576
1664
 
1577
1665
        # start corner case
1578
1666
        result = list(transport.readv('a', ((0, 30),),
1579
 
                                      adjust_for_latency=True, upper_limit=content_size))
 
1667
            adjust_for_latency=True, upper_limit=content_size))
1580
1668
        # we expect 1 result, from 0, to something > 30
1581
1669
        self.assertEqual(1, len(result))
1582
1670
        self.assertEqual(0, result[0][0])
1584
1672
        check_result_data(result)
1585
1673
        # end of file corner case
1586
1674
        result = list(transport.readv('a', ((204700, 100),),
1587
 
                                      adjust_for_latency=True, upper_limit=content_size))
 
1675
            adjust_for_latency=True, upper_limit=content_size))
1588
1676
        # we expect 1 result, from 204800- its length, to the end
1589
1677
        self.assertEqual(1, len(result))
1590
1678
        data_len = len(result[0][1])
1591
 
        self.assertEqual(204800 - data_len, result[0][0])
 
1679
        self.assertEqual(204800-data_len, result[0][0])
1592
1680
        self.assertTrue(data_len >= 100)
1593
1681
        check_result_data(result)
1594
1682
        # out of order ranges are made in order
1595
1683
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
1596
 
                                      adjust_for_latency=True, upper_limit=content_size))
 
1684
            adjust_for_latency=True, upper_limit=content_size))
1597
1685
        # we expect 2 results, in order, start and end.
1598
1686
        self.assertEqual(2, len(result))
1599
1687
        # start
1602
1690
        self.assertTrue(data_len >= 30)
1603
1691
        # end
1604
1692
        data_len = len(result[1][1])
1605
 
        self.assertEqual(204800 - data_len, result[1][0])
 
1693
        self.assertEqual(204800-data_len, result[1][0])
1606
1694
        self.assertTrue(data_len >= 100)
1607
1695
        check_result_data(result)
1608
1696
        # close ranges get combined (even if out of order)
1609
 
        for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
 
1697
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1610
1698
            result = list(transport.readv('a', request_vector,
1611
 
                                          adjust_for_latency=True, upper_limit=content_size))
 
1699
                adjust_for_latency=True, upper_limit=content_size))
1612
1700
            self.assertEqual(1, len(result))
1613
1701
            data_len = len(result[0][1])
1614
1702
            # minimum length is from 400 to 1034 - 634
1622
1710
        transport = self.get_transport()
1623
1711
        # test from observed failure case.
1624
1712
        if transport.is_readonly():
1625
 
            with open('a', 'w') as f:
1626
 
                f.write('a' * 1024 * 1024)
 
1713
            file('a', 'w').write('a'*1024*1024)
1627
1714
        else:
1628
 
            transport.put_bytes('a', b'a' * 1024 * 1024)
 
1715
            transport.put_bytes('a', 'a'*1024*1024)
1629
1716
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1630
 
                         (225037, 800), (221357, 800), (437077, 800), (947670, 800),
1631
 
                         (465373, 800), (947422, 800)]
1632
 
        results = list(transport.readv('a', broken_vector, True, 1024 * 1024))
1633
 
        found_items = [False] * 9
 
1717
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1718
            (465373, 800), (947422, 800)]
 
1719
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1720
        found_items = [False]*9
1634
1721
        for pos, (start, length) in enumerate(broken_vector):
1635
1722
            # check the range is covered by the result
1636
1723
            for offset, data in results:
1637
1724
                if offset <= start and start + length <= offset + len(data):
1638
1725
                    found_items[pos] = True
1639
 
        self.assertEqual([True] * 9, found_items)
 
1726
        self.assertEqual([True]*9, found_items)
1640
1727
 
1641
1728
    def test_get_with_open_write_stream_sees_all_content(self):
1642
1729
        t = self.get_transport()
1643
1730
        if t.is_readonly():
1644
1731
            return
1645
 
        with t.open_write_stream('foo') as handle:
1646
 
            handle.write(b'bcd')
1647
 
            self.assertEqual([(0, b'b'), (2, b'd')], list(
1648
 
                t.readv('foo', ((0, 1), (2, 1)))))
 
1732
        handle = t.open_write_stream('foo')
 
1733
        try:
 
1734
            handle.write('bcd')
 
1735
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1736
        finally:
 
1737
            handle.close()
1649
1738
 
1650
1739
    def test_get_smart_medium(self):
1651
1740
        """All transports must either give a smart medium, or know they can't.
1653
1742
        transport = self.get_transport()
1654
1743
        try:
1655
1744
            client_medium = transport.get_smart_medium()
 
1745
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
1656
1746
        except errors.NoSmartMedium:
1657
1747
            # as long as we got it we're fine
1658
1748
            pass
1659
 
        else:
1660
 
            from ..bzr.smart import medium
1661
 
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
1662
1749
 
1663
1750
    def test_readv_short_read(self):
1664
1751
        transport = self.get_transport()
1665
1752
        if transport.is_readonly():
1666
 
            with open('a', 'w') as f:
1667
 
                f.write('0123456789')
 
1753
            file('a', 'w').write('0123456789')
1668
1754
        else:
1669
 
            transport.put_bytes('a', b'01234567890')
 
1755
            transport.put_bytes('a', '01234567890')
1670
1756
 
1671
1757
        # This is intentionally reading off the end of the file
1672
1758
        # since we are sure that it cannot get there
1673
1759
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1674
1760
                               # Can be raised by paramiko
1675
1761
                               AssertionError),
1676
 
                              transport.readv, 'a', [(1, 1), (8, 10)])
 
1762
                              transport.readv, 'a', [(1,1), (8,10)])
1677
1763
 
1678
1764
        # This is trying to seek past the end of the file, it should
1679
1765
        # also raise a special error
1680
1766
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1681
 
                              transport.readv, 'a', [(12, 2)])
1682
 
 
1683
 
    def test_no_segment_parameters(self):
1684
 
        """Segment parameters should be stripped and stored in
1685
 
        transport.segment_parameters."""
1686
 
        transport = self.get_transport("foo")
1687
 
        self.assertEqual({}, transport.get_segment_parameters())
1688
 
 
1689
 
    def test_segment_parameters(self):
1690
 
        """Segment parameters should be stripped and stored in
1691
 
        transport.get_segment_parameters()."""
1692
 
        base_url = self._server.get_url()
1693
 
        parameters = {"key1": "val1", "key2": "val2"}
1694
 
        url = urlutils.join_segment_parameters(base_url, parameters)
1695
 
        transport = _mod_transport.get_transport_from_url(url)
1696
 
        self.assertEqual(parameters, transport.get_segment_parameters())
1697
 
 
1698
 
    def test_set_segment_parameters(self):
1699
 
        """Segment parameters can be set and show up in base."""
1700
 
        transport = self.get_transport("foo")
1701
 
        orig_base = transport.base
1702
 
        transport.set_segment_parameter("arm", "board")
1703
 
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
1704
 
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
1705
 
        transport.set_segment_parameter("arm", None)
1706
 
        transport.set_segment_parameter("nonexistant", None)
1707
 
        self.assertEqual({}, transport.get_segment_parameters())
1708
 
        self.assertEqual(orig_base, transport.base)
1709
 
 
1710
 
    def test_stat_symlink(self):
1711
 
        # if a transport points directly to a symlink (and supports symlinks
1712
 
        # at all) you can tell this.  helps with bug 32669.
1713
 
        t = self.get_transport()
1714
 
        try:
1715
 
            t.symlink('target', 'link')
1716
 
        except TransportNotPossible:
1717
 
            raise TestSkipped("symlinks not supported")
1718
 
        t2 = t.clone('link')
1719
 
        st = t2.stat('')
1720
 
        self.assertTrue(stat.S_ISLNK(st.st_mode))
1721
 
 
1722
 
    def test_abspath_url_unquote_unreserved(self):
1723
 
        """URLs from abspath should have unreserved characters unquoted
1724
 
 
1725
 
        Need consistent quoting notably for tildes, see lp:842223 for more.
1726
 
        """
1727
 
        t = self.get_transport()
1728
 
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1729
 
        self.assertEqual(t.base + "-.09AZ_az~",
1730
 
                         t.abspath(needlessly_escaped_dir))
1731
 
 
1732
 
    def test_clone_url_unquote_unreserved(self):
1733
 
        """Base URL of a cloned branch needs unreserved characters unquoted
1734
 
 
1735
 
        Cloned transports should be prefix comparable for things like the
1736
 
        isolation checking of tests, see lp:842223 for more.
1737
 
        """
1738
 
        t1 = self.get_transport()
1739
 
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1740
 
        self.build_tree([needlessly_escaped_dir], transport=t1)
1741
 
        t2 = t1.clone(needlessly_escaped_dir)
1742
 
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1743
 
 
1744
 
    def test_hook_post_connection_one(self):
1745
 
        """Fire post_connect hook after a ConnectedTransport is first used"""
1746
 
        log = []
1747
 
        Transport.hooks.install_named_hook("post_connect", log.append, None)
1748
 
        t = self.get_transport()
1749
 
        self.assertEqual([], log)
1750
 
        t.has("non-existant")
1751
 
        if isinstance(t, RemoteTransport):
1752
 
            self.assertEqual([t.get_smart_medium()], log)
1753
 
        elif isinstance(t, ConnectedTransport):
1754
 
            self.assertEqual([t], log)
1755
 
        else:
1756
 
            self.assertEqual([], log)
1757
 
 
1758
 
    def test_hook_post_connection_multi(self):
1759
 
        """Fire post_connect hook once per unshared underlying connection"""
1760
 
        log = []
1761
 
        Transport.hooks.install_named_hook("post_connect", log.append, None)
1762
 
        t1 = self.get_transport()
1763
 
        t2 = t1.clone(".")
1764
 
        t3 = self.get_transport()
1765
 
        self.assertEqual([], log)
1766
 
        t1.has("x")
1767
 
        t2.has("x")
1768
 
        t3.has("x")
1769
 
        if isinstance(t1, RemoteTransport):
1770
 
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1771
 
        elif isinstance(t1, ConnectedTransport):
1772
 
            self.assertEqual([t1, t3], log)
1773
 
        else:
1774
 
            self.assertEqual([], log)
 
1767
                              transport.readv, 'a', [(12,2)])