/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Robert Collins
  • Date: 2010-05-06 23:41:35 UTC
  • mto: This revision was merged to the branch mainline in revision 5223.
  • Revision ID: robertc@robertcollins.net-20100506234135-yivbzczw1sejxnxc
Lock methods on ``Tree``, ``Branch`` and ``Repository`` are now
expected to return an object which can be used to unlock them. This reduces
duplicate code when using cleanups. The tokens previously returned by
``Branch.lock_write`` and ``Repository.lock_write`` are now attributes
on the result of ``lock_write``. ``repository.RepositoryWriteLockResult``
and ``branch.BranchWriteLockResult`` document this. (Robert Collins)

``log._get_info_for_log_files`` now takes an add_cleanup callable.
(Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
 
23
import itertools
23
24
import os
 
25
from cStringIO import StringIO
 
26
from StringIO import StringIO as pyStringIO
24
27
import stat
25
28
import sys
 
29
import unittest
26
30
 
27
 
from .. import (
 
31
from bzrlib import (
28
32
    errors,
29
33
    osutils,
30
 
    pyutils,
31
34
    tests,
32
 
    transport as _mod_transport,
33
35
    urlutils,
34
36
    )
35
 
from ..errors import (ConnectionError,
36
 
                      FileExists,
37
 
                      NoSuchFile,
38
 
                      PathError,
39
 
                      TransportNotPossible,
40
 
                      )
41
 
from ..osutils import getcwd
42
 
from ..sixish import (
43
 
    BytesIO,
44
 
    zip,
45
 
    )
46
 
from ..bzr.smart import medium
47
 
from . import (
 
37
from bzrlib.errors import (ConnectionError,
 
38
                           DirectoryNotEmpty,
 
39
                           FileExists,
 
40
                           InvalidURL,
 
41
                           LockError,
 
42
                           NoSuchFile,
 
43
                           NotLocalUrl,
 
44
                           PathError,
 
45
                           TransportNotPossible,
 
46
                           )
 
47
from bzrlib.osutils import getcwd
 
48
from bzrlib.smart import medium
 
49
from bzrlib.tests import (
 
50
    TestCaseInTempDir,
48
51
    TestSkipped,
49
52
    TestNotApplicable,
50
53
    multiply_tests,
51
54
    )
52
 
from . import test_server
53
 
from .test_transport import TestTransportImplementation
54
 
from ..transport import (
 
55
from bzrlib.tests import test_server
 
56
from bzrlib.tests.test_transport import TestTransportImplementation
 
57
from bzrlib.transport import (
55
58
    ConnectedTransport,
56
 
    Transport,
 
59
    get_transport,
57
60
    _get_transport_modules,
58
61
    )
59
 
from ..transport.memory import MemoryTransport
60
 
from ..transport.remote import RemoteTransport
 
62
from bzrlib.transport.memory import MemoryTransport
61
63
 
62
64
 
63
65
def get_transport_test_permutations(module):
76
78
    for module in _get_transport_modules():
77
79
        try:
78
80
            permutations = get_transport_test_permutations(
79
 
                pyutils.get_named_object(module))
 
81
                reduce(getattr, (module).split('.')[1:], __import__(module)))
80
82
            for (klass, server_factory) in permutations:
81
83
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
82
 
                            {"transport_class": klass,
83
 
                             "transport_server": server_factory})
 
84
                    {"transport_class":klass,
 
85
                     "transport_server":server_factory})
84
86
                result.append(scenario)
85
 
        except errors.DependencyNotPresent as e:
 
87
        except errors.DependencyNotPresent, e:
86
88
            # Continue even if a dependency prevents us
87
89
            # from adding this test
88
90
            pass
89
91
    return result
90
92
 
91
93
 
92
 
def load_tests(loader, standard_tests, pattern):
 
94
def load_tests(standard_tests, module, loader):
93
95
    """Multiply tests for tranport implementations."""
94
96
    result = loader.suiteClass()
95
97
    scenarios = transport_test_permutations()
100
102
 
101
103
    def setUp(self):
102
104
        super(TransportTests, self).setUp()
103
 
        self.overrideEnv('BRZ_NO_SMART_VFS', None)
 
105
        self._captureVar('BZR_NO_SMART_VFS', None)
104
106
 
105
107
    def check_transport_contents(self, content, transport, relpath):
106
 
        """Check that transport.get_bytes(relpath) == content."""
107
 
        self.assertEqualDiff(content, transport.get_bytes(relpath))
 
108
        """Check that transport.get(relpath).read() == content."""
 
109
        self.assertEqualDiff(content, transport.get(relpath).read())
108
110
 
109
111
    def test_ensure_base_missing(self):
110
112
        """.ensure_base() should create the directory if it doesn't exist"""
154
156
        self.assertEqual(True, t.has('a'))
155
157
        self.assertEqual(False, t.has('c'))
156
158
        self.assertEqual(True, t.has(urlutils.escape('%')))
 
159
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
160
                                           'e', 'f', 'g', 'h'])),
 
161
                         [True, True, False, False,
 
162
                          True, False, True, False])
157
163
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
158
164
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
159
165
                                           urlutils.escape('%%')]))
 
166
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
167
                                                'e', 'f', 'g', 'h']))),
 
168
                         [True, True, False, False,
 
169
                          True, False, True, False])
160
170
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
161
171
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
162
172
 
173
183
    def test_get(self):
174
184
        t = self.get_transport()
175
185
 
176
 
        files = ['a']
177
 
        content = b'contents of a\n'
178
 
        self.build_tree(['a'], transport=t, line_endings='binary')
179
 
        self.check_transport_contents(b'contents of a\n', t, 'a')
180
 
        f = t.get('a')
181
 
        self.assertEqual(content, f.read())
 
186
        files = ['a', 'b', 'e', 'g']
 
187
        contents = ['contents of a\n',
 
188
                    'contents of b\n',
 
189
                    'contents of e\n',
 
190
                    'contents of g\n',
 
191
                    ]
 
192
        self.build_tree(files, transport=t, line_endings='binary')
 
193
        self.check_transport_contents('contents of a\n', t, 'a')
 
194
        content_f = t.get_multi(files)
 
195
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
196
        # evaluate their inputs, the transport requests should be issued and
 
197
        # handled sequentially (we don't want to force transport to buffer).
 
198
        for content, f in itertools.izip(contents, content_f):
 
199
            self.assertEqual(content, f.read())
 
200
 
 
201
        content_f = t.get_multi(iter(files))
 
202
        # Use itertools.izip() for the same reason
 
203
        for content, f in itertools.izip(contents, content_f):
 
204
            self.assertEqual(content, f.read())
182
205
 
183
206
    def test_get_unknown_file(self):
184
207
        t = self.get_transport()
185
208
        files = ['a', 'b']
186
 
        contents = [b'contents of a\n',
187
 
                    b'contents of b\n',
 
209
        contents = ['contents of a\n',
 
210
                    'contents of b\n',
188
211
                    ]
189
212
        self.build_tree(files, transport=t, line_endings='binary')
190
213
        self.assertRaises(NoSuchFile, t.get, 'c')
191
 
 
192
 
        def iterate_and_close(func, *args):
193
 
            for f in func(*args):
194
 
                # We call f.read() here because things like paramiko actually
195
 
                # spawn a thread to prefetch the content, which we want to
196
 
                # consume before we close the handle.
197
 
                content = f.read()
198
 
                f.close()
 
214
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
 
215
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
199
216
 
200
217
    def test_get_directory_read_gives_ReadError(self):
201
218
        """consistent errors for read() on a file returned by get()."""
221
238
        t = self.get_transport()
222
239
 
223
240
        files = ['a', 'b', 'e', 'g']
224
 
        contents = [b'contents of a\n',
225
 
                    b'contents of b\n',
226
 
                    b'contents of e\n',
227
 
                    b'contents of g\n',
 
241
        contents = ['contents of a\n',
 
242
                    'contents of b\n',
 
243
                    'contents of e\n',
 
244
                    'contents of g\n',
228
245
                    ]
229
246
        self.build_tree(files, transport=t, line_endings='binary')
230
 
        self.check_transport_contents(b'contents of a\n', t, 'a')
 
247
        self.check_transport_contents('contents of a\n', t, 'a')
231
248
 
232
249
        for content, fname in zip(contents, files):
233
250
            self.assertEqual(content, t.get_bytes(fname))
234
251
 
235
252
    def test_get_bytes_unknown_file(self):
236
253
        t = self.get_transport()
 
254
 
237
255
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
238
256
 
239
257
    def test_get_with_open_write_stream_sees_all_content(self):
240
258
        t = self.get_transport()
241
259
        if t.is_readonly():
242
260
            return
243
 
        with t.open_write_stream('foo') as handle:
244
 
            handle.write(b'b')
245
 
            self.assertEqual(b'b', t.get_bytes('foo'))
 
261
        handle = t.open_write_stream('foo')
 
262
        try:
 
263
            handle.write('b')
 
264
            self.assertEqual('b', t.get('foo').read())
 
265
        finally:
 
266
            handle.close()
246
267
 
247
268
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
248
269
        t = self.get_transport()
249
270
        if t.is_readonly():
250
271
            return
251
 
        with t.open_write_stream('foo') as handle:
252
 
            handle.write(b'b')
253
 
            self.assertEqual(b'b', t.get_bytes('foo'))
254
 
            with t.get('foo') as f:
255
 
                self.assertEqual(b'b', f.read())
 
272
        handle = t.open_write_stream('foo')
 
273
        try:
 
274
            handle.write('b')
 
275
            self.assertEqual('b', t.get_bytes('foo'))
 
276
            self.assertEqual('b', t.get('foo').read())
 
277
        finally:
 
278
            handle.close()
256
279
 
257
280
    def test_put_bytes(self):
258
281
        t = self.get_transport()
259
282
 
260
283
        if t.is_readonly():
261
284
            self.assertRaises(TransportNotPossible,
262
 
                              t.put_bytes, 'a', b'some text for a\n')
 
285
                    t.put_bytes, 'a', 'some text for a\n')
263
286
            return
264
287
 
265
 
        t.put_bytes('a', b'some text for a\n')
266
 
        self.assertTrue(t.has('a'))
267
 
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
288
        t.put_bytes('a', 'some text for a\n')
 
289
        self.failUnless(t.has('a'))
 
290
        self.check_transport_contents('some text for a\n', t, 'a')
268
291
 
269
292
        # The contents should be overwritten
270
 
        t.put_bytes('a', b'new text for a\n')
271
 
        self.check_transport_contents(b'new text for a\n', t, 'a')
 
293
        t.put_bytes('a', 'new text for a\n')
 
294
        self.check_transport_contents('new text for a\n', t, 'a')
272
295
 
273
296
        self.assertRaises(NoSuchFile,
274
 
                          t.put_bytes, 'path/doesnt/exist/c', b'contents')
 
297
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
275
298
 
276
299
    def test_put_bytes_non_atomic(self):
277
300
        t = self.get_transport()
278
301
 
279
302
        if t.is_readonly():
280
303
            self.assertRaises(TransportNotPossible,
281
 
                              t.put_bytes_non_atomic, 'a', b'some text for a\n')
 
304
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
282
305
            return
283
306
 
284
 
        self.assertFalse(t.has('a'))
285
 
        t.put_bytes_non_atomic('a', b'some text for a\n')
286
 
        self.assertTrue(t.has('a'))
287
 
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
307
        self.failIf(t.has('a'))
 
308
        t.put_bytes_non_atomic('a', 'some text for a\n')
 
309
        self.failUnless(t.has('a'))
 
310
        self.check_transport_contents('some text for a\n', t, 'a')
288
311
        # Put also replaces contents
289
 
        t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
290
 
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
312
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
 
313
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
291
314
 
292
315
        # Make sure we can create another file
293
 
        t.put_bytes_non_atomic('d', b'contents for\nd\n')
 
316
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
294
317
        # And overwrite 'a' with empty contents
295
 
        t.put_bytes_non_atomic('a', b'')
296
 
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
297
 
        self.check_transport_contents(b'', t, 'a')
 
318
        t.put_bytes_non_atomic('a', '')
 
319
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
320
        self.check_transport_contents('', t, 'a')
298
321
 
299
322
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
300
 
                          b'contents\n')
 
323
                                       'contents\n')
301
324
        # Now test the create_parent flag
302
325
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
303
 
                          b'contents\n')
304
 
        self.assertFalse(t.has('dir/a'))
305
 
        t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
 
326
                                       'contents\n')
 
327
        self.failIf(t.has('dir/a'))
 
328
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
306
329
                               create_parent_dir=True)
307
 
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
 
330
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
308
331
 
309
332
        # But we still get NoSuchFile if we can't make the parent dir
310
333
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
311
 
                          b'contents\n',
312
 
                          create_parent_dir=True)
 
334
                                       'contents\n',
 
335
                                       create_parent_dir=True)
313
336
 
314
337
    def test_put_bytes_permissions(self):
315
338
        t = self.get_transport()
319
342
        if not t._can_roundtrip_unix_modebits():
320
343
            # Can't roundtrip, so no need to run this test
321
344
            return
322
 
        t.put_bytes('mode644', b'test text\n', mode=0o644)
323
 
        self.assertTransportMode(t, 'mode644', 0o644)
324
 
        t.put_bytes('mode666', b'test text\n', mode=0o666)
325
 
        self.assertTransportMode(t, 'mode666', 0o666)
326
 
        t.put_bytes('mode600', b'test text\n', mode=0o600)
327
 
        self.assertTransportMode(t, 'mode600', 0o600)
 
345
        t.put_bytes('mode644', 'test text\n', mode=0644)
 
346
        self.assertTransportMode(t, 'mode644', 0644)
 
347
        t.put_bytes('mode666', 'test text\n', mode=0666)
 
348
        self.assertTransportMode(t, 'mode666', 0666)
 
349
        t.put_bytes('mode600', 'test text\n', mode=0600)
 
350
        self.assertTransportMode(t, 'mode600', 0600)
328
351
        # Yes, you can put_bytes a file such that it becomes readonly
329
 
        t.put_bytes('mode400', b'test text\n', mode=0o400)
330
 
        self.assertTransportMode(t, 'mode400', 0o400)
 
352
        t.put_bytes('mode400', 'test text\n', mode=0400)
 
353
        self.assertTransportMode(t, 'mode400', 0400)
331
354
 
332
355
        # The default permissions should be based on the current umask
333
356
        umask = osutils.get_umask()
334
 
        t.put_bytes('nomode', b'test text\n', mode=None)
335
 
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
357
        t.put_bytes('nomode', 'test text\n', mode=None)
 
358
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
336
359
 
337
360
    def test_put_bytes_non_atomic_permissions(self):
338
361
        t = self.get_transport()
342
365
        if not t._can_roundtrip_unix_modebits():
343
366
            # Can't roundtrip, so no need to run this test
344
367
            return
345
 
        t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
346
 
        self.assertTransportMode(t, 'mode644', 0o644)
347
 
        t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
348
 
        self.assertTransportMode(t, 'mode666', 0o666)
349
 
        t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
350
 
        self.assertTransportMode(t, 'mode600', 0o600)
351
 
        t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
352
 
        self.assertTransportMode(t, 'mode400', 0o400)
 
368
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
 
369
        self.assertTransportMode(t, 'mode644', 0644)
 
370
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
 
371
        self.assertTransportMode(t, 'mode666', 0666)
 
372
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
 
373
        self.assertTransportMode(t, 'mode600', 0600)
 
374
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
 
375
        self.assertTransportMode(t, 'mode400', 0400)
353
376
 
354
377
        # The default permissions should be based on the current umask
355
378
        umask = osutils.get_umask()
356
 
        t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
357
 
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
379
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
 
380
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
358
381
 
359
382
        # We should also be able to set the mode for a parent directory
360
383
        # when it is created
361
 
        t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
362
 
                               dir_mode=0o700, create_parent_dir=True)
363
 
        self.assertTransportMode(t, 'dir700', 0o700)
364
 
        t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
365
 
                               dir_mode=0o770, create_parent_dir=True)
366
 
        self.assertTransportMode(t, 'dir770', 0o770)
367
 
        t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
368
 
                               dir_mode=0o777, create_parent_dir=True)
369
 
        self.assertTransportMode(t, 'dir777', 0o777)
 
384
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
 
385
                               dir_mode=0700, create_parent_dir=True)
 
386
        self.assertTransportMode(t, 'dir700', 0700)
 
387
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
 
388
                               dir_mode=0770, create_parent_dir=True)
 
389
        self.assertTransportMode(t, 'dir770', 0770)
 
390
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
 
391
                               dir_mode=0777, create_parent_dir=True)
 
392
        self.assertTransportMode(t, 'dir777', 0777)
370
393
 
371
394
    def test_put_file(self):
372
395
        t = self.get_transport()
373
396
 
374
397
        if t.is_readonly():
375
398
            self.assertRaises(TransportNotPossible,
376
 
                              t.put_file, 'a', BytesIO(b'some text for a\n'))
 
399
                    t.put_file, 'a', StringIO('some text for a\n'))
377
400
            return
378
401
 
379
 
        result = t.put_file('a', BytesIO(b'some text for a\n'))
 
402
        result = t.put_file('a', StringIO('some text for a\n'))
380
403
        # put_file returns the length of the data written
381
404
        self.assertEqual(16, result)
382
 
        self.assertTrue(t.has('a'))
383
 
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
405
        self.failUnless(t.has('a'))
 
406
        self.check_transport_contents('some text for a\n', t, 'a')
384
407
        # Put also replaces contents
385
 
        result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
 
408
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
386
409
        self.assertEqual(19, result)
387
 
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
410
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
388
411
        self.assertRaises(NoSuchFile,
389
412
                          t.put_file, 'path/doesnt/exist/c',
390
 
                          BytesIO(b'contents'))
 
413
                              StringIO('contents'))
391
414
 
392
415
    def test_put_file_non_atomic(self):
393
416
        t = self.get_transport()
394
417
 
395
418
        if t.is_readonly():
396
419
            self.assertRaises(TransportNotPossible,
397
 
                              t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
 
420
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
398
421
            return
399
422
 
400
 
        self.assertFalse(t.has('a'))
401
 
        t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
402
 
        self.assertTrue(t.has('a'))
403
 
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
423
        self.failIf(t.has('a'))
 
424
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
 
425
        self.failUnless(t.has('a'))
 
426
        self.check_transport_contents('some text for a\n', t, 'a')
404
427
        # Put also replaces contents
405
 
        t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
406
 
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
428
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
 
429
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
407
430
 
408
431
        # Make sure we can create another file
409
 
        t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
 
432
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
410
433
        # And overwrite 'a' with empty contents
411
 
        t.put_file_non_atomic('a', BytesIO(b''))
412
 
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
413
 
        self.check_transport_contents(b'', t, 'a')
 
434
        t.put_file_non_atomic('a', StringIO(''))
 
435
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
436
        self.check_transport_contents('', t, 'a')
414
437
 
415
438
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
416
 
                          BytesIO(b'contents\n'))
 
439
                                       StringIO('contents\n'))
417
440
        # Now test the create_parent flag
418
441
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
419
 
                          BytesIO(b'contents\n'))
420
 
        self.assertFalse(t.has('dir/a'))
421
 
        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
 
442
                                       StringIO('contents\n'))
 
443
        self.failIf(t.has('dir/a'))
 
444
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
422
445
                              create_parent_dir=True)
423
 
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
 
446
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
424
447
 
425
448
        # But we still get NoSuchFile if we can't make the parent dir
426
449
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
427
 
                          BytesIO(b'contents\n'),
428
 
                          create_parent_dir=True)
 
450
                                       StringIO('contents\n'),
 
451
                                       create_parent_dir=True)
429
452
 
430
453
    def test_put_file_permissions(self):
431
454
 
436
459
        if not t._can_roundtrip_unix_modebits():
437
460
            # Can't roundtrip, so no need to run this test
438
461
            return
439
 
        t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
440
 
        self.assertTransportMode(t, 'mode644', 0o644)
441
 
        t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
442
 
        self.assertTransportMode(t, 'mode666', 0o666)
443
 
        t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
444
 
        self.assertTransportMode(t, 'mode600', 0o600)
 
462
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
 
463
        self.assertTransportMode(t, 'mode644', 0644)
 
464
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
 
465
        self.assertTransportMode(t, 'mode666', 0666)
 
466
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
 
467
        self.assertTransportMode(t, 'mode600', 0600)
445
468
        # Yes, you can put a file such that it becomes readonly
446
 
        t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
447
 
        self.assertTransportMode(t, 'mode400', 0o400)
 
469
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
 
470
        self.assertTransportMode(t, 'mode400', 0400)
448
471
        # The default permissions should be based on the current umask
449
472
        umask = osutils.get_umask()
450
 
        t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
451
 
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
473
        t.put_file('nomode', StringIO('test text\n'), mode=None)
 
474
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
452
475
 
453
476
    def test_put_file_non_atomic_permissions(self):
454
477
        t = self.get_transport()
458
481
        if not t._can_roundtrip_unix_modebits():
459
482
            # Can't roundtrip, so no need to run this test
460
483
            return
461
 
        t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
462
 
        self.assertTransportMode(t, 'mode644', 0o644)
463
 
        t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
464
 
        self.assertTransportMode(t, 'mode666', 0o666)
465
 
        t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
466
 
        self.assertTransportMode(t, 'mode600', 0o600)
 
484
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
 
485
        self.assertTransportMode(t, 'mode644', 0644)
 
486
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
 
487
        self.assertTransportMode(t, 'mode666', 0666)
 
488
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
 
489
        self.assertTransportMode(t, 'mode600', 0600)
467
490
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
468
 
        t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
469
 
        self.assertTransportMode(t, 'mode400', 0o400)
 
491
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
 
492
        self.assertTransportMode(t, 'mode400', 0400)
470
493
 
471
494
        # The default permissions should be based on the current umask
472
495
        umask = osutils.get_umask()
473
 
        t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
474
 
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
496
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
 
497
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
475
498
 
476
499
        # We should also be able to set the mode for a parent directory
477
500
        # when it is created
478
 
        sio = BytesIO()
479
 
        t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
480
 
                              dir_mode=0o700, create_parent_dir=True)
481
 
        self.assertTransportMode(t, 'dir700', 0o700)
482
 
        t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
483
 
                              dir_mode=0o770, create_parent_dir=True)
484
 
        self.assertTransportMode(t, 'dir770', 0o770)
485
 
        t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
486
 
                              dir_mode=0o777, create_parent_dir=True)
487
 
        self.assertTransportMode(t, 'dir777', 0o777)
 
501
        sio = StringIO()
 
502
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
 
503
                              dir_mode=0700, create_parent_dir=True)
 
504
        self.assertTransportMode(t, 'dir700', 0700)
 
505
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
 
506
                              dir_mode=0770, create_parent_dir=True)
 
507
        self.assertTransportMode(t, 'dir770', 0770)
 
508
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
 
509
                              dir_mode=0777, create_parent_dir=True)
 
510
        self.assertTransportMode(t, 'dir777', 0777)
488
511
 
489
512
    def test_put_bytes_unicode(self):
 
513
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
 
514
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
 
515
        # (we don't want to encode unicode here at all, callers should be
 
516
        # strictly passing bytes to put_bytes), but we allow it for backwards
 
517
        # compatibility.  At some point we should use a specific exception.
 
518
        # See https://bugs.launchpad.net/bzr/+bug/106898.
490
519
        t = self.get_transport()
491
520
        if t.is_readonly():
492
521
            return
493
522
        unicode_string = u'\u1234'
494
 
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
 
523
        self.assertRaises(
 
524
            (AssertionError, UnicodeEncodeError),
 
525
            t.put_bytes, 'foo', unicode_string)
 
526
 
 
527
    def test_put_file_unicode(self):
 
528
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
 
529
        # This situation can happen (and has) if code is careless about the type
 
530
        # of "string" they initialise/write to a StringIO with.  We cannot use
 
531
        # cStringIO, because it never returns unicode from read.
 
532
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
 
533
        # raise, but we raise it for hysterical raisins.
 
534
        t = self.get_transport()
 
535
        if t.is_readonly():
 
536
            return
 
537
        unicode_file = pyStringIO(u'\u1234')
 
538
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
495
539
 
496
540
    def test_mkdir(self):
497
541
        t = self.get_transport()
502
546
            # defined for the transport interface.
503
547
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
504
548
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
505
 
            self.assertRaises(TransportNotPossible,
506
 
                              t.mkdir, 'path/doesnt/exist')
 
549
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
 
550
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
507
551
            return
508
552
        # Test mkdir
509
553
        t.mkdir('dir_a')
513
557
        t.mkdir('dir_b')
514
558
        self.assertEqual(t.has('dir_b'), True)
515
559
 
516
 
        self.assertEqual([t.has(n) for n in
517
 
                          ['dir_a', 'dir_b', 'dir_q', 'dir_b']],
518
 
                         [True, True, False, True])
 
560
        t.mkdir_multi(['dir_c', 'dir_d'])
 
561
 
 
562
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
 
563
        self.assertEqual(list(t.has_multi(
 
564
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
 
565
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
 
566
            [True, True, True, False,
 
567
             True, True, True, True])
519
568
 
520
569
        # we were testing that a local mkdir followed by a transport
521
570
        # mkdir failed thusly, but given that we * in one process * do not
526
575
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
527
576
 
528
577
        # Test get/put in sub-directories
529
 
        t.put_bytes('dir_a/a', b'contents of dir_a/a')
530
 
        t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
531
 
        self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a')
532
 
        self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b')
 
578
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
 
579
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
 
580
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
 
581
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
533
582
 
534
583
        # mkdir of a dir with an absent parent
535
584
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
542
591
            # no sense testing on this transport
543
592
            return
544
593
        # Test mkdir with a mode
545
 
        t.mkdir('dmode755', mode=0o755)
546
 
        self.assertTransportMode(t, 'dmode755', 0o755)
547
 
        t.mkdir('dmode555', mode=0o555)
548
 
        self.assertTransportMode(t, 'dmode555', 0o555)
549
 
        t.mkdir('dmode777', mode=0o777)
550
 
        self.assertTransportMode(t, 'dmode777', 0o777)
551
 
        t.mkdir('dmode700', mode=0o700)
552
 
        self.assertTransportMode(t, 'dmode700', 0o700)
553
 
        t.mkdir('mdmode755', mode=0o755)
554
 
        self.assertTransportMode(t, 'mdmode755', 0o755)
 
594
        t.mkdir('dmode755', mode=0755)
 
595
        self.assertTransportMode(t, 'dmode755', 0755)
 
596
        t.mkdir('dmode555', mode=0555)
 
597
        self.assertTransportMode(t, 'dmode555', 0555)
 
598
        t.mkdir('dmode777', mode=0777)
 
599
        self.assertTransportMode(t, 'dmode777', 0777)
 
600
        t.mkdir('dmode700', mode=0700)
 
601
        self.assertTransportMode(t, 'dmode700', 0700)
 
602
        t.mkdir_multi(['mdmode755'], mode=0755)
 
603
        self.assertTransportMode(t, 'mdmode755', 0755)
555
604
 
556
605
        # Default mode should be based on umask
557
606
        umask = osutils.get_umask()
558
607
        t.mkdir('dnomode', mode=None)
559
 
        self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
 
608
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
560
609
 
561
610
    def test_opening_a_file_stream_creates_file(self):
562
611
        t = self.get_transport()
564
613
            return
565
614
        handle = t.open_write_stream('foo')
566
615
        try:
567
 
            self.assertEqual(b'', t.get_bytes('foo'))
 
616
            self.assertEqual('', t.get_bytes('foo'))
568
617
        finally:
569
618
            handle.close()
570
619
 
571
620
    def test_opening_a_file_stream_can_set_mode(self):
572
621
        t = self.get_transport()
573
622
        if t.is_readonly():
574
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
575
 
                              t.open_write_stream, 'foo')
576
623
            return
577
624
        if not t._can_roundtrip_unix_modebits():
578
625
            # Can't roundtrip, so no need to run this test
579
626
            return
580
 
 
581
627
        def check_mode(name, mode, expected):
582
628
            handle = t.open_write_stream(name, mode=mode)
583
629
            handle.close()
584
630
            self.assertTransportMode(t, name, expected)
585
 
        check_mode('mode644', 0o644, 0o644)
586
 
        check_mode('mode666', 0o666, 0o666)
587
 
        check_mode('mode600', 0o600, 0o600)
 
631
        check_mode('mode644', 0644, 0644)
 
632
        check_mode('mode666', 0666, 0666)
 
633
        check_mode('mode600', 0600, 0600)
588
634
        # The default permissions should be based on the current umask
589
 
        check_mode('nomode', None, 0o666 & ~osutils.get_umask())
 
635
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
590
636
 
591
637
    def test_copy_to(self):
592
638
        # FIXME: test:   same server to same server (partly done)
599
645
            self.build_tree(files, transport=transport_from)
600
646
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
601
647
            for f in files:
602
 
                self.check_transport_contents(transport_to.get_bytes(f),
 
648
                self.check_transport_contents(transport_to.get(f).read(),
603
649
                                              transport_from, f)
604
650
 
605
651
        t = self.get_transport()
606
 
        if t.__class__.__name__ == "SFTPTransport":
607
 
            self.skipTest("SFTP copy_to currently too flakey to use")
608
652
        temp_transport = MemoryTransport('memory:///')
609
653
        simple_copy_files(t, temp_transport)
610
654
        if not t.is_readonly():
612
656
            t2 = t.clone('copy_to_simple')
613
657
            simple_copy_files(t, t2)
614
658
 
 
659
 
615
660
        # Test that copying into a missing directory raises
616
661
        # NoSuchFile
617
662
        if t.is_readonly():
618
663
            self.build_tree(['e/', 'e/f'])
619
664
        else:
620
665
            t.mkdir('e')
621
 
            t.put_bytes('e/f', b'contents of e')
 
666
            t.put_bytes('e/f', 'contents of e')
622
667
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
623
668
        temp_transport.mkdir('e')
624
669
        t.copy_to(['e/f'], temp_transport)
629
674
        files = ['a', 'b', 'c', 'd']
630
675
        t.copy_to(iter(files), temp_transport)
631
676
        for f in files:
632
 
            self.check_transport_contents(temp_transport.get_bytes(f),
 
677
            self.check_transport_contents(temp_transport.get(f).read(),
633
678
                                          t, f)
634
679
        del temp_transport
635
680
 
636
 
        for mode in (0o666, 0o644, 0o600, 0o400):
 
681
        for mode in (0666, 0644, 0600, 0400):
637
682
            temp_transport = MemoryTransport("memory:///")
638
683
            t.copy_to(files, temp_transport, mode=mode)
639
684
            for f in files:
654
699
 
655
700
        if t.is_readonly():
656
701
            self.assertRaises(TransportNotPossible,
657
 
                              t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
702
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
658
703
            return
659
 
        t.put_bytes('a', b'diff\ncontents for\na\n')
660
 
        t.put_bytes('b', b'contents\nfor b\n')
 
704
        t.put_bytes('a', 'diff\ncontents for\na\n')
 
705
        t.put_bytes('b', 'contents\nfor b\n')
661
706
 
662
707
        self.assertEqual(20,
663
 
                         t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
 
708
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
664
709
 
665
710
        self.check_transport_contents(
666
 
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
711
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
667
712
            t, 'a')
668
713
 
669
714
        # a file with no parent should fail..
670
715
        self.assertRaises(NoSuchFile,
671
 
                          t.append_file, 'missing/path', BytesIO(b'content'))
 
716
                          t.append_file, 'missing/path', StringIO('content'))
672
717
 
673
718
        # And we can create new files, too
674
719
        self.assertEqual(0,
675
 
                         t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
676
 
        self.check_transport_contents(b'some text\nfor a missing file\n',
 
720
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
 
721
        self.check_transport_contents('some text\nfor a missing file\n',
677
722
                                      t, 'c')
678
723
 
679
724
    def test_append_bytes(self):
681
726
 
682
727
        if t.is_readonly():
683
728
            self.assertRaises(TransportNotPossible,
684
 
                              t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
 
729
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
685
730
            return
686
731
 
687
 
        self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
688
 
        self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
 
732
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
 
733
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
689
734
 
690
735
        self.assertEqual(20,
691
 
                         t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
 
736
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
692
737
 
693
738
        self.check_transport_contents(
694
 
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
739
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
695
740
            t, 'a')
696
741
 
697
742
        # a file with no parent should fail..
698
743
        self.assertRaises(NoSuchFile,
699
 
                          t.append_bytes, 'missing/path', b'content')
 
744
                          t.append_bytes, 'missing/path', 'content')
 
745
 
 
746
    def test_append_multi(self):
 
747
        t = self.get_transport()
 
748
 
 
749
        if t.is_readonly():
 
750
            return
 
751
        t.put_bytes('a', 'diff\ncontents for\na\n'
 
752
                         'add\nsome\nmore\ncontents\n')
 
753
        t.put_bytes('b', 'contents\nfor b\n')
 
754
 
 
755
        self.assertEqual((43, 15),
 
756
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
 
757
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
758
 
 
759
        self.check_transport_contents(
 
760
            'diff\ncontents for\na\n'
 
761
            'add\nsome\nmore\ncontents\n'
 
762
            'and\nthen\nsome\nmore\n',
 
763
            t, 'a')
 
764
        self.check_transport_contents(
 
765
                'contents\nfor b\n'
 
766
                'some\nmore\nfor\nb\n',
 
767
                t, 'b')
 
768
 
 
769
        self.assertEqual((62, 31),
 
770
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
 
771
                                 ('b', StringIO('from an iterator\n'))])))
 
772
        self.check_transport_contents(
 
773
            'diff\ncontents for\na\n'
 
774
            'add\nsome\nmore\ncontents\n'
 
775
            'and\nthen\nsome\nmore\n'
 
776
            'a little bit more\n',
 
777
            t, 'a')
 
778
        self.check_transport_contents(
 
779
                'contents\nfor b\n'
 
780
                'some\nmore\nfor\nb\n'
 
781
                'from an iterator\n',
 
782
                t, 'b')
 
783
 
 
784
        self.assertEqual((80, 0),
 
785
            t.append_multi([('a', StringIO('some text in a\n')),
 
786
                            ('d', StringIO('missing file r\n'))]))
 
787
 
 
788
        self.check_transport_contents(
 
789
            'diff\ncontents for\na\n'
 
790
            'add\nsome\nmore\ncontents\n'
 
791
            'and\nthen\nsome\nmore\n'
 
792
            'a little bit more\n'
 
793
            'some text in a\n',
 
794
            t, 'a')
 
795
        self.check_transport_contents('missing file r\n', t, 'd')
700
796
 
701
797
    def test_append_file_mode(self):
702
798
        """Check that append accepts a mode parameter"""
704
800
        t = self.get_transport()
705
801
        if t.is_readonly():
706
802
            self.assertRaises(TransportNotPossible,
707
 
                              t.append_file, 'f', BytesIO(b'f'), mode=None)
 
803
                t.append_file, 'f', StringIO('f'), mode=None)
708
804
            return
709
 
        t.append_file('f', BytesIO(b'f'), mode=None)
 
805
        t.append_file('f', StringIO('f'), mode=None)
710
806
 
711
807
    def test_append_bytes_mode(self):
712
808
        # check append_bytes accepts a mode
713
809
        t = self.get_transport()
714
810
        if t.is_readonly():
715
811
            self.assertRaises(TransportNotPossible,
716
 
                              t.append_bytes, 'f', b'f', mode=None)
 
812
                t.append_bytes, 'f', 'f', mode=None)
717
813
            return
718
 
        t.append_bytes('f', b'f', mode=None)
 
814
        t.append_bytes('f', 'f', mode=None)
719
815
 
720
816
    def test_delete(self):
721
817
        # TODO: Test Transport.delete
726
822
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
727
823
            return
728
824
 
729
 
        t.put_bytes('a', b'a little bit of text\n')
730
 
        self.assertTrue(t.has('a'))
 
825
        t.put_bytes('a', 'a little bit of text\n')
 
826
        self.failUnless(t.has('a'))
731
827
        t.delete('a')
732
 
        self.assertFalse(t.has('a'))
 
828
        self.failIf(t.has('a'))
733
829
 
734
830
        self.assertRaises(NoSuchFile, t.delete, 'a')
735
831
 
736
 
        t.put_bytes('a', b'a text\n')
737
 
        t.put_bytes('b', b'b text\n')
738
 
        t.put_bytes('c', b'c text\n')
 
832
        t.put_bytes('a', 'a text\n')
 
833
        t.put_bytes('b', 'b text\n')
 
834
        t.put_bytes('c', 'c text\n')
739
835
        self.assertEqual([True, True, True],
740
 
                         [t.has(n) for n in ['a', 'b', 'c']])
741
 
        t.delete('a')
742
 
        t.delete('c')
 
836
                list(t.has_multi(['a', 'b', 'c'])))
 
837
        t.delete_multi(['a', 'c'])
743
838
        self.assertEqual([False, True, False],
744
 
                         [t.has(n) for n in ['a', 'b', 'c']])
745
 
        self.assertFalse(t.has('a'))
746
 
        self.assertTrue(t.has('b'))
747
 
        self.assertFalse(t.has('c'))
748
 
 
749
 
        for name in ['a', 'c', 'd']:
750
 
            self.assertRaises(NoSuchFile, t.delete, name)
 
839
                list(t.has_multi(['a', 'b', 'c'])))
 
840
        self.failIf(t.has('a'))
 
841
        self.failUnless(t.has('b'))
 
842
        self.failIf(t.has('c'))
 
843
 
 
844
        self.assertRaises(NoSuchFile,
 
845
                t.delete_multi, ['a', 'b', 'c'])
 
846
 
 
847
        self.assertRaises(NoSuchFile,
 
848
                t.delete_multi, iter(['a', 'b', 'c']))
 
849
 
 
850
        t.put_bytes('a', 'another a text\n')
 
851
        t.put_bytes('c', 'another c text\n')
 
852
        t.delete_multi(iter(['a', 'b', 'c']))
751
853
 
752
854
        # We should have deleted everything
753
855
        # SftpServer creates control files in the
800
902
        if t.is_readonly():
801
903
            return
802
904
        t.mkdir('foo')
803
 
        t.put_bytes('foo-bar', b'')
 
905
        t.put_bytes('foo-bar', '')
804
906
        t.mkdir('foo-baz')
805
907
        t.rmdir('foo')
806
908
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
807
 
        self.assertTrue(t.has('foo-bar'))
 
909
        self.failUnless(t.has('foo-bar'))
808
910
 
809
911
    def test_rename_dir_succeeds(self):
810
912
        t = self.get_transport()
811
913
        if t.is_readonly():
812
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
813
 
                              t.rename, 'foo', 'bar')
814
 
            return
 
914
            raise TestSkipped("transport is readonly")
815
915
        t.mkdir('adir')
816
916
        t.mkdir('adir/asubdir')
817
917
        t.rename('adir', 'bdir')
822
922
        """Attempting to replace a nonemtpy directory should fail"""
823
923
        t = self.get_transport()
824
924
        if t.is_readonly():
825
 
            self.assertRaises((TransportNotPossible, NotImplementedError),
826
 
                              t.rename, 'foo', 'bar')
827
 
            return
 
925
            raise TestSkipped("transport is readonly")
828
926
        t.mkdir('adir')
829
927
        t.mkdir('adir/asubdir')
830
928
        t.mkdir('bdir')
845
943
        t.mkdir('b')
846
944
        ta = t.clone('a')
847
945
        tb = t.clone('b')
848
 
        ta.put_bytes('f', b'aoeu')
 
946
        ta.put_bytes('f', 'aoeu')
849
947
        ta.rename('f', '../b/f')
850
948
        self.assertTrue(tb.has('f'))
851
949
        self.assertFalse(ta.has('f'))
893
991
        # creates control files in the working directory
894
992
        # perhaps all of this could be done in a subdirectory
895
993
 
896
 
        t.put_bytes('a', b'a first file\n')
897
 
        self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
 
994
        t.put_bytes('a', 'a first file\n')
 
995
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
898
996
 
899
997
        t.move('a', 'b')
900
 
        self.assertTrue(t.has('b'))
901
 
        self.assertFalse(t.has('a'))
 
998
        self.failUnless(t.has('b'))
 
999
        self.failIf(t.has('a'))
902
1000
 
903
 
        self.check_transport_contents(b'a first file\n', t, 'b')
904
 
        self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
 
1001
        self.check_transport_contents('a first file\n', t, 'b')
 
1002
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
905
1003
 
906
1004
        # Overwrite a file
907
 
        t.put_bytes('c', b'c this file\n')
 
1005
        t.put_bytes('c', 'c this file\n')
908
1006
        t.move('c', 'b')
909
 
        self.assertFalse(t.has('c'))
910
 
        self.check_transport_contents(b'c this file\n', t, 'b')
 
1007
        self.failIf(t.has('c'))
 
1008
        self.check_transport_contents('c this file\n', t, 'b')
911
1009
 
912
1010
        # TODO: Try to write a test for atomicity
913
1011
        # TODO: Test moving into a non-existent subdirectory
 
1012
        # TODO: Test Transport.move_multi
914
1013
 
915
1014
    def test_copy(self):
916
1015
        t = self.get_transport()
918
1017
        if t.is_readonly():
919
1018
            return
920
1019
 
921
 
        t.put_bytes('a', b'a file\n')
 
1020
        t.put_bytes('a', 'a file\n')
922
1021
        t.copy('a', 'b')
923
 
        self.check_transport_contents(b'a file\n', t, 'b')
 
1022
        self.check_transport_contents('a file\n', t, 'b')
924
1023
 
925
1024
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
926
1025
        os.mkdir('c')
927
1026
        # What should the assert be if you try to copy a
928
1027
        # file over a directory?
929
1028
        #self.assertRaises(Something, t.copy, 'a', 'c')
930
 
        t.put_bytes('d', b'text in d\n')
 
1029
        t.put_bytes('d', 'text in d\n')
931
1030
        t.copy('d', 'b')
932
 
        self.check_transport_contents(b'text in d\n', t, 'b')
 
1031
        self.check_transport_contents('text in d\n', t, 'b')
 
1032
 
 
1033
        # TODO: test copy_multi
933
1034
 
934
1035
    def test_connection_error(self):
935
1036
        """ConnectionError is raised when connection is impossible.
941
1042
        except NotImplementedError:
942
1043
            raise TestSkipped("Transport %s has no bogus URL support." %
943
1044
                              self._server.__class__)
944
 
        t = _mod_transport.get_transport_from_url(url)
 
1045
        t = get_transport(url)
945
1046
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
946
1047
 
947
1048
    def test_stat(self):
952
1053
 
953
1054
        try:
954
1055
            st = t.stat('.')
955
 
        except TransportNotPossible as e:
 
1056
        except TransportNotPossible, e:
956
1057
            # This transport cannot stat
957
1058
            return
958
1059
 
963
1064
        for path, size in zip(paths, sizes):
964
1065
            st = t.stat(path)
965
1066
            if path.endswith('/'):
966
 
                self.assertTrue(S_ISDIR(st.st_mode))
 
1067
                self.failUnless(S_ISDIR(st.st_mode))
967
1068
                # directory sizes are meaningless
968
1069
            else:
969
 
                self.assertTrue(S_ISREG(st.st_mode))
 
1070
                self.failUnless(S_ISREG(st.st_mode))
970
1071
                self.assertEqual(size, st.st_size)
971
1072
 
 
1073
        remote_stats = list(t.stat_multi(paths))
 
1074
        remote_iter_stats = list(t.stat_multi(iter(paths)))
 
1075
 
972
1076
        self.assertRaises(NoSuchFile, t.stat, 'q')
973
1077
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
974
1078
 
 
1079
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
 
1080
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
975
1081
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
976
1082
        subdir = t.clone('subdir')
977
 
        st = subdir.stat('./file')
978
 
        st = subdir.stat('.')
 
1083
        subdir.stat('./file')
 
1084
        subdir.stat('.')
979
1085
 
980
1086
    def test_hardlink(self):
981
1087
        from stat import ST_NLINK
990
1096
        try:
991
1097
            t.hardlink(source_name, link_name)
992
1098
 
993
 
            self.assertTrue(t.has(source_name))
994
 
            self.assertTrue(t.has(link_name))
 
1099
            self.failUnless(t.has(source_name))
 
1100
            self.failUnless(t.has(link_name))
995
1101
 
996
1102
            st = t.stat(link_name)
997
 
            self.assertEqual(st[ST_NLINK], 2)
 
1103
            self.failUnlessEqual(st[ST_NLINK], 2)
998
1104
        except TransportNotPossible:
999
1105
            raise TestSkipped("Transport %s does not support hardlinks." %
1000
1106
                              self._server.__class__)
1012
1118
        try:
1013
1119
            t.symlink(source_name, link_name)
1014
1120
 
1015
 
            self.assertTrue(t.has(source_name))
1016
 
            self.assertTrue(t.has(link_name))
 
1121
            self.failUnless(t.has(source_name))
 
1122
            self.failUnless(t.has(link_name))
1017
1123
 
1018
1124
            st = t.stat(link_name)
1019
 
            self.assertTrue(S_ISLNK(st.st_mode),
1020
 
                            "expected symlink, got mode %o" % st.st_mode)
1021
 
        except TransportNotPossible:
1022
 
            raise TestSkipped("Transport %s does not support symlinks." %
1023
 
                              self._server.__class__)
1024
 
 
1025
 
        self.assertEqual(source_name, t.readlink(link_name))
1026
 
 
1027
 
    def test_readlink_nonexistent(self):
1028
 
        t = self.get_transport()
1029
 
        try:
1030
 
            self.assertRaises(NoSuchFile, t.readlink, 'nonexistent')
1031
 
        except TransportNotPossible:
1032
 
            raise TestSkipped("Transport %s does not support symlinks." %
1033
 
                              self._server.__class__)
 
1125
            self.failUnless(S_ISLNK(st.st_mode))
 
1126
        except TransportNotPossible:
 
1127
            raise TestSkipped("Transport %s does not support symlinks." %
 
1128
                              self._server.__class__)
 
1129
        except IOError:
 
1130
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
1034
1131
 
1035
1132
    def test_list_dir(self):
1036
1133
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1041
1138
            return
1042
1139
 
1043
1140
        def sorted_list(d, transport):
1044
 
            l = sorted(transport.list_dir(d))
 
1141
            l = list(transport.list_dir(d))
 
1142
            l.sort()
1045
1143
            return l
1046
1144
 
1047
1145
        self.assertEqual([], sorted_list('.', t))
1099
1197
            raise TestSkipped("not a connected transport")
1100
1198
 
1101
1199
        t2 = t1.clone('subdir')
1102
 
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
1103
 
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
1104
 
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
1105
 
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
1106
 
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
 
1200
        self.assertEquals(t1._scheme, t2._scheme)
 
1201
        self.assertEquals(t1._user, t2._user)
 
1202
        self.assertEquals(t1._password, t2._password)
 
1203
        self.assertEquals(t1._host, t2._host)
 
1204
        self.assertEquals(t1._port, t2._port)
1107
1205
 
1108
1206
    def test__reuse_for(self):
1109
1207
        t = self.get_transport()
1116
1214
 
1117
1215
            Only the parameters different from None will be changed.
1118
1216
            """
1119
 
            if scheme is None:
1120
 
                scheme = t._parsed_url.scheme
1121
 
            if user is None:
1122
 
                user = t._parsed_url.user
1123
 
            if password is None:
1124
 
                password = t._parsed_url.password
1125
 
            if user is None:
1126
 
                user = t._parsed_url.user
1127
 
            if host is None:
1128
 
                host = t._parsed_url.host
1129
 
            if port is None:
1130
 
                port = t._parsed_url.port
1131
 
            if path is None:
1132
 
                path = t._parsed_url.path
1133
 
            return str(urlutils.URL(scheme, user, password, host, port, path))
 
1217
            if scheme   is None: scheme   = t._scheme
 
1218
            if user     is None: user     = t._user
 
1219
            if password is None: password = t._password
 
1220
            if user     is None: user     = t._user
 
1221
            if host     is None: host     = t._host
 
1222
            if port     is None: port     = t._port
 
1223
            if path     is None: path     = t._path
 
1224
            return t._unsplit_url(scheme, user, password, host, port, path)
1134
1225
 
1135
 
        if t._parsed_url.scheme == 'ftp':
 
1226
        if t._scheme == 'ftp':
1136
1227
            scheme = 'sftp'
1137
1228
        else:
1138
1229
            scheme = 'ftp'
1139
1230
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1140
 
        if t._parsed_url.user == 'me':
 
1231
        if t._user == 'me':
1141
1232
            user = 'you'
1142
1233
        else:
1143
1234
            user = 'me'
1154
1245
        #   (they may be typed by the user when prompted for example)
1155
1246
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
1156
1247
        # We will not connect, we can use an invalid host
1157
 
        self.assertIsNot(t, t._reuse_for(
1158
 
            new_url(host=t._parsed_url.host + 'bar')))
1159
 
        if t._parsed_url.port == 1234:
 
1248
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
 
1249
        if t._port == 1234:
1160
1250
            port = 4321
1161
1251
        else:
1162
1252
            port = 1234
1171
1261
 
1172
1262
        c = t.clone('subdir')
1173
1263
        # Some transports will create the connection only when needed
1174
 
        t.has('surely_not')  # Force connection
 
1264
        t.has('surely_not') # Force connection
1175
1265
        self.assertIs(t._get_connection(), c._get_connection())
1176
1266
 
1177
1267
        # Temporary failure, we need to create a new dummy connection
1178
 
        new_connection = None
 
1268
        new_connection = object()
1179
1269
        t._set_connection(new_connection)
1180
1270
        # Check that both transports use the same connection
1181
1271
        self.assertIs(new_connection, t._get_connection())
1186
1276
        if not isinstance(t, ConnectedTransport):
1187
1277
            raise TestSkipped("not a connected transport")
1188
1278
 
1189
 
        t.has('surely_not')  # Force connection
 
1279
        t.has('surely_not') # Force connection
1190
1280
        self.assertIsNot(None, t._get_connection())
1191
1281
 
1192
1282
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
1203
1293
 
1204
1294
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1205
1295
 
1206
 
        self.assertTrue(t1.has('a'))
1207
 
        self.assertTrue(t1.has('b/c'))
1208
 
        self.assertFalse(t1.has('c'))
 
1296
        self.failUnless(t1.has('a'))
 
1297
        self.failUnless(t1.has('b/c'))
 
1298
        self.failIf(t1.has('c'))
1209
1299
 
1210
1300
        t2 = t1.clone('b')
1211
1301
        self.assertEqual(t1.base + 'b/', t2.base)
1212
1302
 
1213
 
        self.assertTrue(t2.has('c'))
1214
 
        self.assertFalse(t2.has('a'))
 
1303
        self.failUnless(t2.has('c'))
 
1304
        self.failIf(t2.has('a'))
1215
1305
 
1216
1306
        t3 = t2.clone('..')
1217
 
        self.assertTrue(t3.has('a'))
1218
 
        self.assertFalse(t3.has('c'))
 
1307
        self.failUnless(t3.has('a'))
 
1308
        self.failIf(t3.has('c'))
1219
1309
 
1220
 
        self.assertFalse(t1.has('b/d'))
1221
 
        self.assertFalse(t2.has('d'))
1222
 
        self.assertFalse(t3.has('b/d'))
 
1310
        self.failIf(t1.has('b/d'))
 
1311
        self.failIf(t2.has('d'))
 
1312
        self.failIf(t3.has('b/d'))
1223
1313
 
1224
1314
        if t1.is_readonly():
1225
 
            self.build_tree_contents([('b/d', b'newfile\n')])
 
1315
            self.build_tree_contents([('b/d', 'newfile\n')])
1226
1316
        else:
1227
 
            t2.put_bytes('d', b'newfile\n')
 
1317
            t2.put_bytes('d', 'newfile\n')
1228
1318
 
1229
 
        self.assertTrue(t1.has('b/d'))
1230
 
        self.assertTrue(t2.has('d'))
1231
 
        self.assertTrue(t3.has('b/d'))
 
1319
        self.failUnless(t1.has('b/d'))
 
1320
        self.failUnless(t2.has('d'))
 
1321
        self.failUnless(t3.has('b/d'))
1232
1322
 
1233
1323
    def test_clone_to_root(self):
1234
1324
        orig_transport = self.get_transport()
1238
1328
        new_transport = root_transport.clone("..")
1239
1329
        # as we are walking up directories, the path must be
1240
1330
        # growing less, except at the top
1241
 
        self.assertTrue(len(new_transport.base) < len(root_transport.base) or
1242
 
                        new_transport.base == root_transport.base)
 
1331
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1332
            or new_transport.base == root_transport.base)
1243
1333
        while new_transport.base != root_transport.base:
1244
1334
            root_transport = new_transport
1245
1335
            new_transport = root_transport.clone("..")
1246
1336
            # as we are walking up directories, the path must be
1247
1337
            # growing less, except at the top
1248
 
            self.assertTrue(len(new_transport.base) < len(root_transport.base) or
1249
 
                            new_transport.base == root_transport.base)
 
1338
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1339
                or new_transport.base == root_transport.base)
1250
1340
 
1251
1341
        # Cloning to "/" should take us to exactly the same location.
1252
1342
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
1262
1352
        orig_transport = self.get_transport()
1263
1353
        root_transport = orig_transport.clone('/')
1264
1354
        self.assertEqual(root_transport.base + '.bzr/',
1265
 
                         root_transport.clone('.bzr').base)
 
1355
            root_transport.clone('.bzr').base)
1266
1356
 
1267
1357
    def test_base_url(self):
1268
1358
        t = self.get_transport()
1308
1398
        self.assertEqual(transport.clone("/").abspath('foo'),
1309
1399
                         transport.abspath("/foo"))
1310
1400
 
1311
 
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1312
1401
    def test_win32_abspath(self):
1313
1402
        # Note: we tried to set sys.platform='win32' so we could test on
1314
1403
        # other platforms too, but then osutils does platform specific
1319
1408
 
1320
1409
        # smoke test for abspath on win32.
1321
1410
        # a transport based on 'file:///' never fully qualifies the drive.
1322
 
        transport = _mod_transport.get_transport_from_url("file:///")
1323
 
        self.assertEqual(transport.abspath("/"), "file:///")
 
1411
        transport = get_transport("file:///")
 
1412
        self.failUnlessEqual(transport.abspath("/"), "file:///")
1324
1413
 
1325
1414
        # but a transport that starts with a drive spec must keep it.
1326
 
        transport = _mod_transport.get_transport_from_url("file:///C:/")
1327
 
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1415
        transport = get_transport("file:///C:/")
 
1416
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1328
1417
 
1329
1418
    def test_local_abspath(self):
1330
1419
        transport = self.get_transport()
1331
1420
        try:
1332
1421
            p = transport.local_abspath('.')
1333
 
        except (errors.NotLocalUrl, TransportNotPossible) as e:
 
1422
        except (errors.NotLocalUrl, TransportNotPossible), e:
1334
1423
            # should be formattable
1335
1424
            s = str(e)
1336
1425
        else:
1363
1452
                         'isolated/dir/',
1364
1453
                         'isolated/dir/foo',
1365
1454
                         'isolated/dir/bar',
1366
 
                         'isolated/dir/b%25z',  # make sure quoting is correct
 
1455
                         'isolated/dir/b%25z', # make sure quoting is correct
1367
1456
                         'isolated/bar'],
1368
1457
                        transport=transport)
1369
1458
        paths = set(transport.iter_files_recursive())
1370
1459
        # nb the directories are not converted
1371
1460
        self.assertEqual(paths,
1372
 
                         {'isolated/dir/foo',
1373
 
                          'isolated/dir/bar',
1374
 
                          'isolated/dir/b%2525z',
1375
 
                          'isolated/bar'})
 
1461
                    set(['isolated/dir/foo',
 
1462
                         'isolated/dir/bar',
 
1463
                         'isolated/dir/b%2525z',
 
1464
                         'isolated/bar']))
1376
1465
        sub_transport = transport.clone('isolated')
1377
1466
        paths = set(sub_transport.iter_files_recursive())
1378
1467
        self.assertEqual(paths,
1379
 
                         {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
 
1468
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
1380
1469
 
1381
1470
    def test_copy_tree(self):
1382
1471
        # TODO: test file contents and permissions are preserved. This test was
1393
1482
                         'from/dir/',
1394
1483
                         'from/dir/foo',
1395
1484
                         'from/dir/bar',
1396
 
                         'from/dir/b%25z',  # make sure quoting is correct
 
1485
                         'from/dir/b%25z', # make sure quoting is correct
1397
1486
                         'from/bar'],
1398
1487
                        transport=transport)
1399
1488
        transport.copy_tree('from', 'to')
1400
1489
        paths = set(transport.iter_files_recursive())
1401
1490
        self.assertEqual(paths,
1402
 
                         {'from/dir/foo',
1403
 
                          'from/dir/bar',
1404
 
                          'from/dir/b%2525z',
1405
 
                          'from/bar',
1406
 
                          'to/dir/foo',
1407
 
                          'to/dir/bar',
1408
 
                          'to/dir/b%2525z',
1409
 
                          'to/bar', })
 
1491
                    set(['from/dir/foo',
 
1492
                         'from/dir/bar',
 
1493
                         'from/dir/b%2525z',
 
1494
                         'from/bar',
 
1495
                         'to/dir/foo',
 
1496
                         'to/dir/bar',
 
1497
                         'to/dir/b%2525z',
 
1498
                         'to/bar',]))
1410
1499
 
1411
1500
    def test_copy_tree_to_transport(self):
1412
1501
        transport = self.get_transport()
1420
1509
                         'from/dir/',
1421
1510
                         'from/dir/foo',
1422
1511
                         'from/dir/bar',
1423
 
                         'from/dir/b%25z',  # make sure quoting is correct
 
1512
                         'from/dir/b%25z', # make sure quoting is correct
1424
1513
                         'from/bar'],
1425
1514
                        transport=transport)
1426
1515
        from_transport = transport.clone('from')
1429
1518
        from_transport.copy_tree_to_transport(to_transport)
1430
1519
        paths = set(transport.iter_files_recursive())
1431
1520
        self.assertEqual(paths,
1432
 
                         {'from/dir/foo',
1433
 
                          'from/dir/bar',
1434
 
                          'from/dir/b%2525z',
1435
 
                          'from/bar',
1436
 
                          'to/dir/foo',
1437
 
                          'to/dir/bar',
1438
 
                          'to/dir/b%2525z',
1439
 
                          'to/bar', })
 
1521
                    set(['from/dir/foo',
 
1522
                         'from/dir/bar',
 
1523
                         'from/dir/b%2525z',
 
1524
                         'from/bar',
 
1525
                         'to/dir/foo',
 
1526
                         'to/dir/bar',
 
1527
                         'to/dir/b%2525z',
 
1528
                         'to/bar',]))
1440
1529
 
1441
1530
    def test_unicode_paths(self):
1442
1531
        """Test that we can read/write files with Unicode names."""
1446
1535
        # '\xe5' and '\xe4' actually map to the same file
1447
1536
        # adding a suffix kicks in the 'preserving but insensitive'
1448
1537
        # route, and maintains the right files
1449
 
        files = [u'\xe5.1',  # a w/ circle iso-8859-1
1450
 
                 u'\xe4.2',  # a w/ dots iso-8859-1
1451
 
                 u'\u017d',  # Z with caron iso-8859-2
1452
 
                 u'\u062c',  # Arabic j
1453
 
                 u'\u0410',  # Russian A
1454
 
                 u'\u65e5',  # Kanji sun/day
1455
 
                 ]
 
1538
        files = [u'\xe5.1', # a w/ circle iso-8859-1
 
1539
                 u'\xe4.2', # a w/ dots iso-8859-1
 
1540
                 u'\u017d', # Z with caron iso-8859-2
 
1541
                 u'\u062c', # Arabic j
 
1542
                 u'\u0410', # Russian A
 
1543
                 u'\u65e5', # Kanji sun/day
 
1544
                ]
1456
1545
 
1457
1546
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
1458
1547
        if no_unicode_support:
1459
 
            self.knownFailure("test server cannot handle unicode paths")
 
1548
            raise tests.KnownFailure("test server cannot handle unicode paths")
1460
1549
 
1461
1550
        try:
1462
1551
            self.build_tree(files, transport=t, line_endings='binary')
1463
1552
        except UnicodeError:
1464
 
            raise TestSkipped(
1465
 
                "cannot handle unicode paths in current encoding")
 
1553
            raise TestSkipped("cannot handle unicode paths in current encoding")
1466
1554
 
1467
1555
        # A plain unicode string is not a valid url
1468
1556
        for fname in files:
1469
 
            self.assertRaises(urlutils.InvalidURL, t.get, fname)
 
1557
            self.assertRaises(InvalidURL, t.get, fname)
1470
1558
 
1471
1559
        for fname in files:
1472
1560
            fname_utf8 = fname.encode('utf-8')
1473
 
            contents = b'contents of %s\n' % (fname_utf8,)
 
1561
            contents = 'contents of %s\n' % (fname_utf8,)
1474
1562
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1475
1563
 
1476
1564
    def test_connect_twice_is_same_content(self):
1479
1567
        transport = self.get_transport()
1480
1568
        if transport.is_readonly():
1481
1569
            return
1482
 
        transport.put_bytes('foo', b'bar')
 
1570
        transport.put_bytes('foo', 'bar')
1483
1571
        transport3 = self.get_transport()
1484
 
        self.check_transport_contents(b'bar', transport3, 'foo')
 
1572
        self.check_transport_contents('bar', transport3, 'foo')
1485
1573
 
1486
1574
        # now opening at a relative url should give us a sane result:
1487
1575
        transport.mkdir('newdir')
1488
1576
        transport5 = self.get_transport('newdir')
1489
1577
        transport6 = transport5.clone('..')
1490
 
        self.check_transport_contents(b'bar', transport6, 'foo')
 
1578
        self.check_transport_contents('bar', transport6, 'foo')
1491
1579
 
1492
1580
    def test_lock_write(self):
1493
1581
        """Test transport-level write locks.
1496
1584
        """
1497
1585
        transport = self.get_transport()
1498
1586
        if transport.is_readonly():
1499
 
            self.assertRaises(TransportNotPossible,
1500
 
                              transport.lock_write, 'foo')
 
1587
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1501
1588
            return
1502
 
        transport.put_bytes('lock', b'')
 
1589
        transport.put_bytes('lock', '')
1503
1590
        try:
1504
1591
            lock = transport.lock_write('lock')
1505
1592
        except TransportNotPossible:
1515
1602
        """
1516
1603
        transport = self.get_transport()
1517
1604
        if transport.is_readonly():
1518
 
            open('lock', 'w').close()
 
1605
            file('lock', 'w').close()
1519
1606
        else:
1520
 
            transport.put_bytes('lock', b'')
 
1607
            transport.put_bytes('lock', '')
1521
1608
        try:
1522
1609
            lock = transport.lock_read('lock')
1523
1610
        except TransportNotPossible:
1529
1616
    def test_readv(self):
1530
1617
        transport = self.get_transport()
1531
1618
        if transport.is_readonly():
1532
 
            with open('a', 'w') as f:
1533
 
                f.write('0123456789')
 
1619
            file('a', 'w').write('0123456789')
1534
1620
        else:
1535
 
            transport.put_bytes('a', b'0123456789')
 
1621
            transport.put_bytes('a', '0123456789')
1536
1622
 
1537
1623
        d = list(transport.readv('a', ((0, 1),)))
1538
 
        self.assertEqual(d[0], (0, b'0'))
 
1624
        self.assertEqual(d[0], (0, '0'))
1539
1625
 
1540
1626
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1541
 
        self.assertEqual(d[0], (0, b'0'))
1542
 
        self.assertEqual(d[1], (1, b'1'))
1543
 
        self.assertEqual(d[2], (3, b'34'))
1544
 
        self.assertEqual(d[3], (9, b'9'))
 
1627
        self.assertEqual(d[0], (0, '0'))
 
1628
        self.assertEqual(d[1], (1, '1'))
 
1629
        self.assertEqual(d[2], (3, '34'))
 
1630
        self.assertEqual(d[3], (9, '9'))
1545
1631
 
1546
1632
    def test_readv_out_of_order(self):
1547
1633
        transport = self.get_transport()
1548
1634
        if transport.is_readonly():
1549
 
            with open('a', 'w') as f:
1550
 
                f.write('0123456789')
 
1635
            file('a', 'w').write('0123456789')
1551
1636
        else:
1552
 
            transport.put_bytes('a', b'01234567890')
 
1637
            transport.put_bytes('a', '01234567890')
1553
1638
 
1554
1639
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
1555
 
        self.assertEqual(d[0], (1, b'1'))
1556
 
        self.assertEqual(d[1], (9, b'9'))
1557
 
        self.assertEqual(d[2], (0, b'0'))
1558
 
        self.assertEqual(d[3], (3, b'34'))
 
1640
        self.assertEqual(d[0], (1, '1'))
 
1641
        self.assertEqual(d[1], (9, '9'))
 
1642
        self.assertEqual(d[2], (0, '0'))
 
1643
        self.assertEqual(d[3], (3, '34'))
1559
1644
 
1560
1645
    def test_readv_with_adjust_for_latency(self):
1561
1646
        transport = self.get_transport()
1566
1651
        # reference the returned data with the random data. To avoid doing
1567
1652
        # multiple large random byte look ups we do several tests on the same
1568
1653
        # backing data.
1569
 
        content = osutils.rand_bytes(200 * 1024)
 
1654
        content = osutils.rand_bytes(200*1024)
1570
1655
        content_size = len(content)
1571
1656
        if transport.is_readonly():
1572
1657
            self.build_tree_contents([('a', content)])
1573
1658
        else:
1574
1659
            transport.put_bytes('a', content)
1575
 
 
1576
1660
        def check_result_data(result_vector):
1577
1661
            for item in result_vector:
1578
1662
                data_len = len(item[1])
1580
1664
 
1581
1665
        # start corner case
1582
1666
        result = list(transport.readv('a', ((0, 30),),
1583
 
                                      adjust_for_latency=True, upper_limit=content_size))
 
1667
            adjust_for_latency=True, upper_limit=content_size))
1584
1668
        # we expect 1 result, from 0, to something > 30
1585
1669
        self.assertEqual(1, len(result))
1586
1670
        self.assertEqual(0, result[0][0])
1588
1672
        check_result_data(result)
1589
1673
        # end of file corner case
1590
1674
        result = list(transport.readv('a', ((204700, 100),),
1591
 
                                      adjust_for_latency=True, upper_limit=content_size))
 
1675
            adjust_for_latency=True, upper_limit=content_size))
1592
1676
        # we expect 1 result, from 204800- its length, to the end
1593
1677
        self.assertEqual(1, len(result))
1594
1678
        data_len = len(result[0][1])
1595
 
        self.assertEqual(204800 - data_len, result[0][0])
 
1679
        self.assertEqual(204800-data_len, result[0][0])
1596
1680
        self.assertTrue(data_len >= 100)
1597
1681
        check_result_data(result)
1598
1682
        # out of order ranges are made in order
1599
1683
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
1600
 
                                      adjust_for_latency=True, upper_limit=content_size))
 
1684
            adjust_for_latency=True, upper_limit=content_size))
1601
1685
        # we expect 2 results, in order, start and end.
1602
1686
        self.assertEqual(2, len(result))
1603
1687
        # start
1606
1690
        self.assertTrue(data_len >= 30)
1607
1691
        # end
1608
1692
        data_len = len(result[1][1])
1609
 
        self.assertEqual(204800 - data_len, result[1][0])
 
1693
        self.assertEqual(204800-data_len, result[1][0])
1610
1694
        self.assertTrue(data_len >= 100)
1611
1695
        check_result_data(result)
1612
1696
        # close ranges get combined (even if out of order)
1613
 
        for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
 
1697
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
1614
1698
            result = list(transport.readv('a', request_vector,
1615
 
                                          adjust_for_latency=True, upper_limit=content_size))
 
1699
                adjust_for_latency=True, upper_limit=content_size))
1616
1700
            self.assertEqual(1, len(result))
1617
1701
            data_len = len(result[0][1])
1618
1702
            # minimum length is from 400 to 1034 - 634
1626
1710
        transport = self.get_transport()
1627
1711
        # test from observed failure case.
1628
1712
        if transport.is_readonly():
1629
 
            with open('a', 'w') as f:
1630
 
                f.write('a' * 1024 * 1024)
 
1713
            file('a', 'w').write('a'*1024*1024)
1631
1714
        else:
1632
 
            transport.put_bytes('a', b'a' * 1024 * 1024)
 
1715
            transport.put_bytes('a', 'a'*1024*1024)
1633
1716
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
1634
 
                         (225037, 800), (221357, 800), (437077, 800), (947670, 800),
1635
 
                         (465373, 800), (947422, 800)]
1636
 
        results = list(transport.readv('a', broken_vector, True, 1024 * 1024))
1637
 
        found_items = [False] * 9
 
1717
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1718
            (465373, 800), (947422, 800)]
 
1719
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1720
        found_items = [False]*9
1638
1721
        for pos, (start, length) in enumerate(broken_vector):
1639
1722
            # check the range is covered by the result
1640
1723
            for offset, data in results:
1641
1724
                if offset <= start and start + length <= offset + len(data):
1642
1725
                    found_items[pos] = True
1643
 
        self.assertEqual([True] * 9, found_items)
 
1726
        self.assertEqual([True]*9, found_items)
1644
1727
 
1645
1728
    def test_get_with_open_write_stream_sees_all_content(self):
1646
1729
        t = self.get_transport()
1647
1730
        if t.is_readonly():
1648
1731
            return
1649
 
        with t.open_write_stream('foo') as handle:
1650
 
            handle.write(b'bcd')
1651
 
            self.assertEqual([(0, b'b'), (2, b'd')], list(
1652
 
                t.readv('foo', ((0, 1), (2, 1)))))
 
1732
        handle = t.open_write_stream('foo')
 
1733
        try:
 
1734
            handle.write('bcd')
 
1735
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1736
        finally:
 
1737
            handle.close()
1653
1738
 
1654
1739
    def test_get_smart_medium(self):
1655
1740
        """All transports must either give a smart medium, or know they can't.
1665
1750
    def test_readv_short_read(self):
1666
1751
        transport = self.get_transport()
1667
1752
        if transport.is_readonly():
1668
 
            with open('a', 'w') as f:
1669
 
                f.write('0123456789')
 
1753
            file('a', 'w').write('0123456789')
1670
1754
        else:
1671
 
            transport.put_bytes('a', b'01234567890')
 
1755
            transport.put_bytes('a', '01234567890')
1672
1756
 
1673
1757
        # This is intentionally reading off the end of the file
1674
1758
        # since we are sure that it cannot get there
1675
1759
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
1676
1760
                               # Can be raised by paramiko
1677
1761
                               AssertionError),
1678
 
                              transport.readv, 'a', [(1, 1), (8, 10)])
 
1762
                              transport.readv, 'a', [(1,1), (8,10)])
1679
1763
 
1680
1764
        # This is trying to seek past the end of the file, it should
1681
1765
        # also raise a special error
1682
1766
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
1683
 
                              transport.readv, 'a', [(12, 2)])
1684
 
 
1685
 
    def test_no_segment_parameters(self):
1686
 
        """Segment parameters should be stripped and stored in
1687
 
        transport.segment_parameters."""
1688
 
        transport = self.get_transport("foo")
1689
 
        self.assertEqual({}, transport.get_segment_parameters())
1690
 
 
1691
 
    def test_segment_parameters(self):
1692
 
        """Segment parameters should be stripped and stored in
1693
 
        transport.get_segment_parameters()."""
1694
 
        base_url = self._server.get_url()
1695
 
        parameters = {"key1": "val1", "key2": "val2"}
1696
 
        url = urlutils.join_segment_parameters(base_url, parameters)
1697
 
        transport = _mod_transport.get_transport_from_url(url)
1698
 
        self.assertEqual(parameters, transport.get_segment_parameters())
1699
 
 
1700
 
    def test_set_segment_parameters(self):
1701
 
        """Segment parameters can be set and show up in base."""
1702
 
        transport = self.get_transport("foo")
1703
 
        orig_base = transport.base
1704
 
        transport.set_segment_parameter("arm", "board")
1705
 
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
1706
 
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
1707
 
        transport.set_segment_parameter("arm", None)
1708
 
        transport.set_segment_parameter("nonexistant", None)
1709
 
        self.assertEqual({}, transport.get_segment_parameters())
1710
 
        self.assertEqual(orig_base, transport.base)
1711
 
 
1712
 
    def test_stat_symlink(self):
1713
 
        # if a transport points directly to a symlink (and supports symlinks
1714
 
        # at all) you can tell this.  helps with bug 32669.
1715
 
        t = self.get_transport()
1716
 
        try:
1717
 
            t.symlink('target', 'link')
1718
 
        except TransportNotPossible:
1719
 
            raise TestSkipped("symlinks not supported")
1720
 
        t2 = t.clone('link')
1721
 
        st = t2.stat('')
1722
 
        self.assertTrue(stat.S_ISLNK(st.st_mode))
1723
 
 
1724
 
    def test_abspath_url_unquote_unreserved(self):
1725
 
        """URLs from abspath should have unreserved characters unquoted
1726
 
 
1727
 
        Need consistent quoting notably for tildes, see lp:842223 for more.
1728
 
        """
1729
 
        t = self.get_transport()
1730
 
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1731
 
        self.assertEqual(t.base + "-.09AZ_az~",
1732
 
                         t.abspath(needlessly_escaped_dir))
1733
 
 
1734
 
    def test_clone_url_unquote_unreserved(self):
1735
 
        """Base URL of a cloned branch needs unreserved characters unquoted
1736
 
 
1737
 
        Cloned transports should be prefix comparable for things like the
1738
 
        isolation checking of tests, see lp:842223 for more.
1739
 
        """
1740
 
        t1 = self.get_transport()
1741
 
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
1742
 
        self.build_tree([needlessly_escaped_dir], transport=t1)
1743
 
        t2 = t1.clone(needlessly_escaped_dir)
1744
 
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
1745
 
 
1746
 
    def test_hook_post_connection_one(self):
1747
 
        """Fire post_connect hook after a ConnectedTransport is first used"""
1748
 
        log = []
1749
 
        Transport.hooks.install_named_hook("post_connect", log.append, None)
1750
 
        t = self.get_transport()
1751
 
        self.assertEqual([], log)
1752
 
        t.has("non-existant")
1753
 
        if isinstance(t, RemoteTransport):
1754
 
            self.assertEqual([t.get_smart_medium()], log)
1755
 
        elif isinstance(t, ConnectedTransport):
1756
 
            self.assertEqual([t], log)
1757
 
        else:
1758
 
            self.assertEqual([], log)
1759
 
 
1760
 
    def test_hook_post_connection_multi(self):
1761
 
        """Fire post_connect hook once per unshared underlying connection"""
1762
 
        log = []
1763
 
        Transport.hooks.install_named_hook("post_connect", log.append, None)
1764
 
        t1 = self.get_transport()
1765
 
        t2 = t1.clone(".")
1766
 
        t3 = self.get_transport()
1767
 
        self.assertEqual([], log)
1768
 
        t1.has("x")
1769
 
        t2.has("x")
1770
 
        t3.has("x")
1771
 
        if isinstance(t1, RemoteTransport):
1772
 
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
1773
 
        elif isinstance(t1, ConnectedTransport):
1774
 
            self.assertEqual([t1, t3], log)
1775
 
        else:
1776
 
            self.assertEqual([], log)
 
1767
                              transport.readv, 'a', [(12,2)])