/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport_implementations.py

Merge with serialize-transform

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for Transport implementations.
 
18
 
 
19
Transport implementations tested here are supplied by
 
20
TransportTestProviderAdapter.
 
21
"""
 
22
 
 
23
import itertools
 
24
import os
 
25
from cStringIO import StringIO
 
26
from StringIO import StringIO as pyStringIO
 
27
import stat
 
28
import sys
 
29
import unittest
 
30
 
 
31
from bzrlib import (
 
32
    errors,
 
33
    osutils,
 
34
    tests,
 
35
    urlutils,
 
36
    )
 
37
from bzrlib.errors import (ConnectionError,
 
38
                           DirectoryNotEmpty,
 
39
                           FileExists,
 
40
                           InvalidURL,
 
41
                           LockError,
 
42
                           NoSuchFile,
 
43
                           NotLocalUrl,
 
44
                           PathError,
 
45
                           TransportNotPossible,
 
46
                           )
 
47
from bzrlib.osutils import getcwd
 
48
from bzrlib.smart import medium
 
49
from bzrlib.tests import (
 
50
    TestCaseInTempDir,
 
51
    TestScenarioApplier,
 
52
    TestSkipped,
 
53
    TestNotApplicable,
 
54
    )
 
55
from bzrlib.tests.test_transport import TestTransportImplementation
 
56
from bzrlib.transport import (
 
57
    ConnectedTransport,
 
58
    get_transport,
 
59
    _get_transport_modules,
 
60
    )
 
61
from bzrlib.transport.memory import MemoryTransport
 
62
 
 
63
 
 
64
class TransportTestProviderAdapter(TestScenarioApplier):
 
65
    """A tool to generate a suite testing all transports for a single test.
 
66
 
 
67
    This is done by copying the test once for each transport and injecting
 
68
    the transport_class and transport_server classes into each copy. Each copy
 
69
    is also given a new id() to make it easy to identify.
 
70
    """
 
71
 
 
72
    def __init__(self):
 
73
        self.scenarios = self._test_permutations()
 
74
 
 
75
    def get_transport_test_permutations(self, module):
 
76
        """Get the permutations module wants to have tested."""
 
77
        if getattr(module, 'get_test_permutations', None) is None:
 
78
            raise AssertionError(
 
79
                "transport module %s doesn't provide get_test_permutations()"
 
80
                % module.__name__)
 
81
            return []
 
82
        return module.get_test_permutations()
 
83
 
 
84
    def _test_permutations(self):
 
85
        """Return a list of the klass, server_factory pairs to test."""
 
86
        result = []
 
87
        for module in _get_transport_modules():
 
88
            try:
 
89
                permutations = self.get_transport_test_permutations(
 
90
                    reduce(getattr, (module).split('.')[1:], __import__(module)))
 
91
                for (klass, server_factory) in permutations:
 
92
                    scenario = (server_factory.__name__,
 
93
                        {"transport_class":klass,
 
94
                         "transport_server":server_factory})
 
95
                    result.append(scenario)
 
96
            except errors.DependencyNotPresent, e:
 
97
                # Continue even if a dependency prevents us 
 
98
                # from adding this test
 
99
                pass
 
100
        return result
 
101
 
 
102
 
 
103
def load_tests(standard_tests, module, loader):
 
104
    """Multiply tests for tranport implementations."""
 
105
    result = loader.suiteClass()
 
106
    adapter = TransportTestProviderAdapter()
 
107
    for test in tests.iter_suite_tests(standard_tests):
 
108
        result.addTests(adapter.adapt(test))
 
109
    return result
 
110
 
 
111
 
 
112
class TransportTests(TestTransportImplementation):
 
113
 
 
114
    def setUp(self):
 
115
        super(TransportTests, self).setUp()
 
116
        self._captureVar('BZR_NO_SMART_VFS', None)
 
117
 
 
118
    def check_transport_contents(self, content, transport, relpath):
 
119
        """Check that transport.get(relpath).read() == content."""
 
120
        self.assertEqualDiff(content, transport.get(relpath).read())
 
121
 
 
122
    def test_ensure_base_missing(self):
 
123
        """.ensure_base() should create the directory if it doesn't exist"""
 
124
        t = self.get_transport()
 
125
        t_a = t.clone('a')
 
126
        if t_a.is_readonly():
 
127
            self.assertRaises(TransportNotPossible,
 
128
                              t_a.ensure_base)
 
129
            return
 
130
        self.assertTrue(t_a.ensure_base())
 
131
        self.assertTrue(t.has('a'))
 
132
 
 
133
    def test_ensure_base_exists(self):
 
134
        """.ensure_base() should just be happy if it already exists"""
 
135
        t = self.get_transport()
 
136
        if t.is_readonly():
 
137
            return
 
138
 
 
139
        t.mkdir('a')
 
140
        t_a = t.clone('a')
 
141
        # ensure_base returns False if it didn't create the base
 
142
        self.assertFalse(t_a.ensure_base())
 
143
 
 
144
    def test_ensure_base_missing_parent(self):
 
145
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
146
        t = self.get_transport()
 
147
        if t.is_readonly():
 
148
            return
 
149
 
 
150
        t_a = t.clone('a')
 
151
        t_b = t_a.clone('b')
 
152
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
153
 
 
154
    def test_external_url(self):
 
155
        """.external_url either works or raises InProcessTransport."""
 
156
        t = self.get_transport()
 
157
        try:
 
158
            t.external_url()
 
159
        except errors.InProcessTransport:
 
160
            pass
 
161
 
 
162
    def test_has(self):
 
163
        t = self.get_transport()
 
164
 
 
165
        files = ['a', 'b', 'e', 'g', '%']
 
166
        self.build_tree(files, transport=t)
 
167
        self.assertEqual(True, t.has('a'))
 
168
        self.assertEqual(False, t.has('c'))
 
169
        self.assertEqual(True, t.has(urlutils.escape('%')))
 
170
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
 
171
                [True, True, False, False, True, False, True, False])
 
172
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
173
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
 
174
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
 
175
                [True, True, False, False, True, False, True, False])
 
176
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
177
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
 
178
 
 
179
    def test_has_root_works(self):
 
180
        from bzrlib.smart import server
 
181
        if self.transport_server is server.SmartTCPServer_for_testing:
 
182
            raise TestNotApplicable(
 
183
                "SmartTCPServer_for_testing intentionally does not allow "
 
184
                "access to /.")
 
185
        current_transport = self.get_transport()
 
186
        self.assertTrue(current_transport.has('/'))
 
187
        root = current_transport.clone('/')
 
188
        self.assertTrue(root.has(''))
 
189
 
 
190
    def test_get(self):
 
191
        t = self.get_transport()
 
192
 
 
193
        files = ['a', 'b', 'e', 'g']
 
194
        contents = ['contents of a\n',
 
195
                    'contents of b\n',
 
196
                    'contents of e\n',
 
197
                    'contents of g\n',
 
198
                    ]
 
199
        self.build_tree(files, transport=t, line_endings='binary')
 
200
        self.check_transport_contents('contents of a\n', t, 'a')
 
201
        content_f = t.get_multi(files)
 
202
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
203
        # evaluate their inputs, the transport requests should be issued and
 
204
        # handled sequentially (we don't want to force transport to buffer).
 
205
        for content, f in itertools.izip(contents, content_f):
 
206
            self.assertEqual(content, f.read())
 
207
 
 
208
        content_f = t.get_multi(iter(files))
 
209
        # Use itertools.izip() for the same reason
 
210
        for content, f in itertools.izip(contents, content_f):
 
211
            self.assertEqual(content, f.read())
 
212
 
 
213
        self.assertRaises(NoSuchFile, t.get, 'c')
 
214
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
 
215
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
 
216
 
 
217
    def test_get_directory_read_gives_ReadError(self):
 
218
        """consistent errors for read() on a file returned by get()."""
 
219
        t = self.get_transport()
 
220
        if t.is_readonly():
 
221
            self.build_tree(['a directory/'])
 
222
        else:
 
223
            t.mkdir('a%20directory')
 
224
        # getting the file must either work or fail with a PathError
 
225
        try:
 
226
            a_file = t.get('a%20directory')
 
227
        except (errors.PathError, errors.RedirectRequested):
 
228
            # early failure return immediately.
 
229
            return
 
230
        # having got a file, read() must either work (i.e. http reading a dir
 
231
        # listing) or fail with ReadError
 
232
        try:
 
233
            a_file.read()
 
234
        except errors.ReadError:
 
235
            pass
 
236
 
 
237
    def test_get_bytes(self):
 
238
        t = self.get_transport()
 
239
 
 
240
        files = ['a', 'b', 'e', 'g']
 
241
        contents = ['contents of a\n',
 
242
                    'contents of b\n',
 
243
                    'contents of e\n',
 
244
                    'contents of g\n',
 
245
                    ]
 
246
        self.build_tree(files, transport=t, line_endings='binary')
 
247
        self.check_transport_contents('contents of a\n', t, 'a')
 
248
 
 
249
        for content, fname in zip(contents, files):
 
250
            self.assertEqual(content, t.get_bytes(fname))
 
251
 
 
252
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
 
253
 
 
254
    def test_get_with_open_write_stream_sees_all_content(self):
 
255
        t = self.get_transport()
 
256
        if t.is_readonly():
 
257
            return
 
258
        handle = t.open_write_stream('foo')
 
259
        try:
 
260
            handle.write('b')
 
261
            self.assertEqual('b', t.get('foo').read())
 
262
        finally:
 
263
            handle.close()
 
264
 
 
265
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
266
        t = self.get_transport()
 
267
        if t.is_readonly():
 
268
            return
 
269
        handle = t.open_write_stream('foo')
 
270
        try:
 
271
            handle.write('b')
 
272
            self.assertEqual('b', t.get_bytes('foo'))
 
273
            self.assertEqual('b', t.get('foo').read())
 
274
        finally:
 
275
            handle.close()
 
276
 
 
277
    def test_put_bytes(self):
 
278
        t = self.get_transport()
 
279
 
 
280
        if t.is_readonly():
 
281
            self.assertRaises(TransportNotPossible,
 
282
                    t.put_bytes, 'a', 'some text for a\n')
 
283
            return
 
284
 
 
285
        t.put_bytes('a', 'some text for a\n')
 
286
        self.failUnless(t.has('a'))
 
287
        self.check_transport_contents('some text for a\n', t, 'a')
 
288
 
 
289
        # The contents should be overwritten
 
290
        t.put_bytes('a', 'new text for a\n')
 
291
        self.check_transport_contents('new text for a\n', t, 'a')
 
292
 
 
293
        self.assertRaises(NoSuchFile,
 
294
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
 
295
 
 
296
    def test_put_bytes_non_atomic(self):
 
297
        t = self.get_transport()
 
298
 
 
299
        if t.is_readonly():
 
300
            self.assertRaises(TransportNotPossible,
 
301
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
 
302
            return
 
303
 
 
304
        self.failIf(t.has('a'))
 
305
        t.put_bytes_non_atomic('a', 'some text for a\n')
 
306
        self.failUnless(t.has('a'))
 
307
        self.check_transport_contents('some text for a\n', t, 'a')
 
308
        # Put also replaces contents
 
309
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
 
310
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
311
 
 
312
        # Make sure we can create another file
 
313
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
 
314
        # And overwrite 'a' with empty contents
 
315
        t.put_bytes_non_atomic('a', '')
 
316
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
317
        self.check_transport_contents('', t, 'a')
 
318
 
 
319
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
 
320
                                       'contents\n')
 
321
        # Now test the create_parent flag
 
322
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
 
323
                                       'contents\n')
 
324
        self.failIf(t.has('dir/a'))
 
325
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
 
326
                               create_parent_dir=True)
 
327
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
328
        
 
329
        # But we still get NoSuchFile if we can't make the parent dir
 
330
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
 
331
                                       'contents\n',
 
332
                                       create_parent_dir=True)
 
333
 
 
334
    def test_put_bytes_permissions(self):
 
335
        t = self.get_transport()
 
336
 
 
337
        if t.is_readonly():
 
338
            return
 
339
        if not t._can_roundtrip_unix_modebits():
 
340
            # Can't roundtrip, so no need to run this test
 
341
            return
 
342
        t.put_bytes('mode644', 'test text\n', mode=0644)
 
343
        self.assertTransportMode(t, 'mode644', 0644)
 
344
        t.put_bytes('mode666', 'test text\n', mode=0666)
 
345
        self.assertTransportMode(t, 'mode666', 0666)
 
346
        t.put_bytes('mode600', 'test text\n', mode=0600)
 
347
        self.assertTransportMode(t, 'mode600', 0600)
 
348
        # Yes, you can put_bytes a file such that it becomes readonly
 
349
        t.put_bytes('mode400', 'test text\n', mode=0400)
 
350
        self.assertTransportMode(t, 'mode400', 0400)
 
351
 
 
352
        # The default permissions should be based on the current umask
 
353
        umask = osutils.get_umask()
 
354
        t.put_bytes('nomode', 'test text\n', mode=None)
 
355
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
356
        
 
357
    def test_put_bytes_non_atomic_permissions(self):
 
358
        t = self.get_transport()
 
359
 
 
360
        if t.is_readonly():
 
361
            return
 
362
        if not t._can_roundtrip_unix_modebits():
 
363
            # Can't roundtrip, so no need to run this test
 
364
            return
 
365
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
 
366
        self.assertTransportMode(t, 'mode644', 0644)
 
367
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
 
368
        self.assertTransportMode(t, 'mode666', 0666)
 
369
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
 
370
        self.assertTransportMode(t, 'mode600', 0600)
 
371
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
 
372
        self.assertTransportMode(t, 'mode400', 0400)
 
373
 
 
374
        # The default permissions should be based on the current umask
 
375
        umask = osutils.get_umask()
 
376
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
 
377
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
378
 
 
379
        # We should also be able to set the mode for a parent directory
 
380
        # when it is created
 
381
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
 
382
                               dir_mode=0700, create_parent_dir=True)
 
383
        self.assertTransportMode(t, 'dir700', 0700)
 
384
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
 
385
                               dir_mode=0770, create_parent_dir=True)
 
386
        self.assertTransportMode(t, 'dir770', 0770)
 
387
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
 
388
                               dir_mode=0777, create_parent_dir=True)
 
389
        self.assertTransportMode(t, 'dir777', 0777)
 
390
        
 
391
    def test_put_file(self):
 
392
        t = self.get_transport()
 
393
 
 
394
        if t.is_readonly():
 
395
            self.assertRaises(TransportNotPossible,
 
396
                    t.put_file, 'a', StringIO('some text for a\n'))
 
397
            return
 
398
 
 
399
        result = t.put_file('a', StringIO('some text for a\n'))
 
400
        # put_file returns the length of the data written
 
401
        self.assertEqual(16, result)
 
402
        self.failUnless(t.has('a'))
 
403
        self.check_transport_contents('some text for a\n', t, 'a')
 
404
        # Put also replaces contents
 
405
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
406
        self.assertEqual(19, result)
 
407
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
408
        self.assertRaises(NoSuchFile,
 
409
                          t.put_file, 'path/doesnt/exist/c',
 
410
                              StringIO('contents'))
 
411
 
 
412
    def test_put_file_non_atomic(self):
 
413
        t = self.get_transport()
 
414
 
 
415
        if t.is_readonly():
 
416
            self.assertRaises(TransportNotPossible,
 
417
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
418
            return
 
419
 
 
420
        self.failIf(t.has('a'))
 
421
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
 
422
        self.failUnless(t.has('a'))
 
423
        self.check_transport_contents('some text for a\n', t, 'a')
 
424
        # Put also replaces contents
 
425
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
 
426
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
427
 
 
428
        # Make sure we can create another file
 
429
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
430
        # And overwrite 'a' with empty contents
 
431
        t.put_file_non_atomic('a', StringIO(''))
 
432
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
433
        self.check_transport_contents('', t, 'a')
 
434
 
 
435
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
 
436
                                       StringIO('contents\n'))
 
437
        # Now test the create_parent flag
 
438
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
 
439
                                       StringIO('contents\n'))
 
440
        self.failIf(t.has('dir/a'))
 
441
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
442
                              create_parent_dir=True)
 
443
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
444
        
 
445
        # But we still get NoSuchFile if we can't make the parent dir
 
446
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
 
447
                                       StringIO('contents\n'),
 
448
                                       create_parent_dir=True)
 
449
 
 
450
    def test_put_file_permissions(self):
 
451
 
 
452
        t = self.get_transport()
 
453
 
 
454
        if t.is_readonly():
 
455
            return
 
456
        if not t._can_roundtrip_unix_modebits():
 
457
            # Can't roundtrip, so no need to run this test
 
458
            return
 
459
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
 
460
        self.assertTransportMode(t, 'mode644', 0644)
 
461
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
 
462
        self.assertTransportMode(t, 'mode666', 0666)
 
463
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
 
464
        self.assertTransportMode(t, 'mode600', 0600)
 
465
        # Yes, you can put a file such that it becomes readonly
 
466
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
 
467
        self.assertTransportMode(t, 'mode400', 0400)
 
468
        # The default permissions should be based on the current umask
 
469
        umask = osutils.get_umask()
 
470
        t.put_file('nomode', StringIO('test text\n'), mode=None)
 
471
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
472
        
 
473
    def test_put_file_non_atomic_permissions(self):
 
474
        t = self.get_transport()
 
475
 
 
476
        if t.is_readonly():
 
477
            return
 
478
        if not t._can_roundtrip_unix_modebits():
 
479
            # Can't roundtrip, so no need to run this test
 
480
            return
 
481
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
 
482
        self.assertTransportMode(t, 'mode644', 0644)
 
483
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
 
484
        self.assertTransportMode(t, 'mode666', 0666)
 
485
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
 
486
        self.assertTransportMode(t, 'mode600', 0600)
 
487
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
 
488
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
 
489
        self.assertTransportMode(t, 'mode400', 0400)
 
490
 
 
491
        # The default permissions should be based on the current umask
 
492
        umask = osutils.get_umask()
 
493
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
 
494
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
495
        
 
496
        # We should also be able to set the mode for a parent directory
 
497
        # when it is created
 
498
        sio = StringIO()
 
499
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
 
500
                              dir_mode=0700, create_parent_dir=True)
 
501
        self.assertTransportMode(t, 'dir700', 0700)
 
502
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
 
503
                              dir_mode=0770, create_parent_dir=True)
 
504
        self.assertTransportMode(t, 'dir770', 0770)
 
505
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
 
506
                              dir_mode=0777, create_parent_dir=True)
 
507
        self.assertTransportMode(t, 'dir777', 0777)
 
508
 
 
509
    def test_put_bytes_unicode(self):
 
510
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
 
511
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
 
512
        # (we don't want to encode unicode here at all, callers should be
 
513
        # strictly passing bytes to put_bytes), but we allow it for backwards
 
514
        # compatibility.  At some point we should use a specific exception.
 
515
        # See https://bugs.launchpad.net/bzr/+bug/106898.
 
516
        t = self.get_transport()
 
517
        if t.is_readonly():
 
518
            return
 
519
        unicode_string = u'\u1234'
 
520
        self.assertRaises(
 
521
            (AssertionError, UnicodeEncodeError),
 
522
            t.put_bytes, 'foo', unicode_string)
 
523
 
 
524
    def test_put_file_unicode(self):
 
525
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
 
526
        # This situation can happen (and has) if code is careless about the type
 
527
        # of "string" they initialise/write to a StringIO with.  We cannot use
 
528
        # cStringIO, because it never returns unicode from read.
 
529
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
 
530
        # raise, but we raise it for hysterical raisins.
 
531
        t = self.get_transport()
 
532
        if t.is_readonly():
 
533
            return
 
534
        unicode_file = pyStringIO(u'\u1234')
 
535
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
536
 
 
537
    def test_mkdir(self):
 
538
        t = self.get_transport()
 
539
 
 
540
        if t.is_readonly():
 
541
            # cannot mkdir on readonly transports. We're not testing for 
 
542
            # cache coherency because cache behaviour is not currently
 
543
            # defined for the transport interface.
 
544
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
 
545
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
 
546
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
 
547
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
 
548
            return
 
549
        # Test mkdir
 
550
        t.mkdir('dir_a')
 
551
        self.assertEqual(t.has('dir_a'), True)
 
552
        self.assertEqual(t.has('dir_b'), False)
 
553
 
 
554
        t.mkdir('dir_b')
 
555
        self.assertEqual(t.has('dir_b'), True)
 
556
 
 
557
        t.mkdir_multi(['dir_c', 'dir_d'])
 
558
 
 
559
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
 
560
        self.assertEqual(list(t.has_multi(
 
561
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
 
562
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
 
563
            [True, True, True, False,
 
564
             True, True, True, True])
 
565
 
 
566
        # we were testing that a local mkdir followed by a transport
 
567
        # mkdir failed thusly, but given that we * in one process * do not
 
568
        # concurrently fiddle with disk dirs and then use transport to do 
 
569
        # things, the win here seems marginal compared to the constraint on
 
570
        # the interface. RBC 20051227
 
571
        t.mkdir('dir_g')
 
572
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
573
 
 
574
        # Test get/put in sub-directories
 
575
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
 
576
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
 
577
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
 
578
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
 
579
 
 
580
        # mkdir of a dir with an absent parent
 
581
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
 
582
 
 
583
    def test_mkdir_permissions(self):
 
584
        t = self.get_transport()
 
585
        if t.is_readonly():
 
586
            return
 
587
        if not t._can_roundtrip_unix_modebits():
 
588
            # no sense testing on this transport
 
589
            return
 
590
        # Test mkdir with a mode
 
591
        t.mkdir('dmode755', mode=0755)
 
592
        self.assertTransportMode(t, 'dmode755', 0755)
 
593
        t.mkdir('dmode555', mode=0555)
 
594
        self.assertTransportMode(t, 'dmode555', 0555)
 
595
        t.mkdir('dmode777', mode=0777)
 
596
        self.assertTransportMode(t, 'dmode777', 0777)
 
597
        t.mkdir('dmode700', mode=0700)
 
598
        self.assertTransportMode(t, 'dmode700', 0700)
 
599
        t.mkdir_multi(['mdmode755'], mode=0755)
 
600
        self.assertTransportMode(t, 'mdmode755', 0755)
 
601
 
 
602
        # Default mode should be based on umask
 
603
        umask = osutils.get_umask()
 
604
        t.mkdir('dnomode', mode=None)
 
605
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
606
 
 
607
    def test_opening_a_file_stream_creates_file(self):
 
608
        t = self.get_transport()
 
609
        if t.is_readonly():
 
610
            return
 
611
        handle = t.open_write_stream('foo')
 
612
        try:
 
613
            self.assertEqual('', t.get_bytes('foo'))
 
614
        finally:
 
615
            handle.close()
 
616
 
 
617
    def test_opening_a_file_stream_can_set_mode(self):
 
618
        t = self.get_transport()
 
619
        if t.is_readonly():
 
620
            return
 
621
        if not t._can_roundtrip_unix_modebits():
 
622
            # Can't roundtrip, so no need to run this test
 
623
            return
 
624
        def check_mode(name, mode, expected):
 
625
            handle = t.open_write_stream(name, mode=mode)
 
626
            handle.close()
 
627
            self.assertTransportMode(t, name, expected)
 
628
        check_mode('mode644', 0644, 0644)
 
629
        check_mode('mode666', 0666, 0666)
 
630
        check_mode('mode600', 0600, 0600)
 
631
        # The default permissions should be based on the current umask
 
632
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
633
 
 
634
    def test_copy_to(self):
 
635
        # FIXME: test:   same server to same server (partly done)
 
636
        # same protocol two servers
 
637
        # and    different protocols (done for now except for MemoryTransport.
 
638
        # - RBC 20060122
 
639
 
 
640
        def simple_copy_files(transport_from, transport_to):
 
641
            files = ['a', 'b', 'c', 'd']
 
642
            self.build_tree(files, transport=transport_from)
 
643
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
 
644
            for f in files:
 
645
                self.check_transport_contents(transport_to.get(f).read(),
 
646
                                              transport_from, f)
 
647
 
 
648
        t = self.get_transport()
 
649
        temp_transport = MemoryTransport('memory:///')
 
650
        simple_copy_files(t, temp_transport)
 
651
        if not t.is_readonly():
 
652
            t.mkdir('copy_to_simple')
 
653
            t2 = t.clone('copy_to_simple')
 
654
            simple_copy_files(t, t2)
 
655
 
 
656
 
 
657
        # Test that copying into a missing directory raises
 
658
        # NoSuchFile
 
659
        if t.is_readonly():
 
660
            self.build_tree(['e/', 'e/f'])
 
661
        else:
 
662
            t.mkdir('e')
 
663
            t.put_bytes('e/f', 'contents of e')
 
664
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
 
665
        temp_transport.mkdir('e')
 
666
        t.copy_to(['e/f'], temp_transport)
 
667
 
 
668
        del temp_transport
 
669
        temp_transport = MemoryTransport('memory:///')
 
670
 
 
671
        files = ['a', 'b', 'c', 'd']
 
672
        t.copy_to(iter(files), temp_transport)
 
673
        for f in files:
 
674
            self.check_transport_contents(temp_transport.get(f).read(),
 
675
                                          t, f)
 
676
        del temp_transport
 
677
 
 
678
        for mode in (0666, 0644, 0600, 0400):
 
679
            temp_transport = MemoryTransport("memory:///")
 
680
            t.copy_to(files, temp_transport, mode=mode)
 
681
            for f in files:
 
682
                self.assertTransportMode(temp_transport, f, mode)
 
683
 
 
684
    def test_append_file(self):
 
685
        t = self.get_transport()
 
686
 
 
687
        if t.is_readonly():
 
688
            self.assertRaises(TransportNotPossible,
 
689
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
690
            return
 
691
        t.put_bytes('a', 'diff\ncontents for\na\n')
 
692
        t.put_bytes('b', 'contents\nfor b\n')
 
693
 
 
694
        self.assertEqual(20,
 
695
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
696
 
 
697
        self.check_transport_contents(
 
698
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
699
            t, 'a')
 
700
 
 
701
        # a file with no parent should fail..
 
702
        self.assertRaises(NoSuchFile,
 
703
                          t.append_file, 'missing/path', StringIO('content'))
 
704
 
 
705
        # And we can create new files, too
 
706
        self.assertEqual(0,
 
707
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
 
708
        self.check_transport_contents('some text\nfor a missing file\n',
 
709
                                      t, 'c')
 
710
 
 
711
    def test_append_bytes(self):
 
712
        t = self.get_transport()
 
713
 
 
714
        if t.is_readonly():
 
715
            self.assertRaises(TransportNotPossible,
 
716
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
 
717
            return
 
718
 
 
719
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
 
720
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
 
721
 
 
722
        self.assertEqual(20,
 
723
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
 
724
 
 
725
        self.check_transport_contents(
 
726
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
727
            t, 'a')
 
728
 
 
729
        # a file with no parent should fail..
 
730
        self.assertRaises(NoSuchFile,
 
731
                          t.append_bytes, 'missing/path', 'content')
 
732
 
 
733
    def test_append_multi(self):
 
734
        t = self.get_transport()
 
735
 
 
736
        if t.is_readonly():
 
737
            return
 
738
        t.put_bytes('a', 'diff\ncontents for\na\n'
 
739
                         'add\nsome\nmore\ncontents\n')
 
740
        t.put_bytes('b', 'contents\nfor b\n')
 
741
 
 
742
        self.assertEqual((43, 15),
 
743
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
 
744
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
745
 
 
746
        self.check_transport_contents(
 
747
            'diff\ncontents for\na\n'
 
748
            'add\nsome\nmore\ncontents\n'
 
749
            'and\nthen\nsome\nmore\n',
 
750
            t, 'a')
 
751
        self.check_transport_contents(
 
752
                'contents\nfor b\n'
 
753
                'some\nmore\nfor\nb\n',
 
754
                t, 'b')
 
755
 
 
756
        self.assertEqual((62, 31),
 
757
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
 
758
                                 ('b', StringIO('from an iterator\n'))])))
 
759
        self.check_transport_contents(
 
760
            'diff\ncontents for\na\n'
 
761
            'add\nsome\nmore\ncontents\n'
 
762
            'and\nthen\nsome\nmore\n'
 
763
            'a little bit more\n',
 
764
            t, 'a')
 
765
        self.check_transport_contents(
 
766
                'contents\nfor b\n'
 
767
                'some\nmore\nfor\nb\n'
 
768
                'from an iterator\n',
 
769
                t, 'b')
 
770
 
 
771
        self.assertEqual((80, 0),
 
772
            t.append_multi([('a', StringIO('some text in a\n')),
 
773
                            ('d', StringIO('missing file r\n'))]))
 
774
 
 
775
        self.check_transport_contents(
 
776
            'diff\ncontents for\na\n'
 
777
            'add\nsome\nmore\ncontents\n'
 
778
            'and\nthen\nsome\nmore\n'
 
779
            'a little bit more\n'
 
780
            'some text in a\n',
 
781
            t, 'a')
 
782
        self.check_transport_contents('missing file r\n', t, 'd')
 
783
 
 
784
    def test_append_file_mode(self):
 
785
        """Check that append accepts a mode parameter"""
 
786
        # check append accepts a mode
 
787
        t = self.get_transport()
 
788
        if t.is_readonly():
 
789
            self.assertRaises(TransportNotPossible,
 
790
                t.append_file, 'f', StringIO('f'), mode=None)
 
791
            return
 
792
        t.append_file('f', StringIO('f'), mode=None)
 
793
        
 
794
    def test_append_bytes_mode(self):
 
795
        # check append_bytes accepts a mode
 
796
        t = self.get_transport()
 
797
        if t.is_readonly():
 
798
            self.assertRaises(TransportNotPossible,
 
799
                t.append_bytes, 'f', 'f', mode=None)
 
800
            return
 
801
        t.append_bytes('f', 'f', mode=None)
 
802
        
 
803
    def test_delete(self):
 
804
        # TODO: Test Transport.delete
 
805
        t = self.get_transport()
 
806
 
 
807
        # Not much to do with a readonly transport
 
808
        if t.is_readonly():
 
809
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
 
810
            return
 
811
 
 
812
        t.put_bytes('a', 'a little bit of text\n')
 
813
        self.failUnless(t.has('a'))
 
814
        t.delete('a')
 
815
        self.failIf(t.has('a'))
 
816
 
 
817
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
818
 
 
819
        t.put_bytes('a', 'a text\n')
 
820
        t.put_bytes('b', 'b text\n')
 
821
        t.put_bytes('c', 'c text\n')
 
822
        self.assertEqual([True, True, True],
 
823
                list(t.has_multi(['a', 'b', 'c'])))
 
824
        t.delete_multi(['a', 'c'])
 
825
        self.assertEqual([False, True, False],
 
826
                list(t.has_multi(['a', 'b', 'c'])))
 
827
        self.failIf(t.has('a'))
 
828
        self.failUnless(t.has('b'))
 
829
        self.failIf(t.has('c'))
 
830
 
 
831
        self.assertRaises(NoSuchFile,
 
832
                t.delete_multi, ['a', 'b', 'c'])
 
833
 
 
834
        self.assertRaises(NoSuchFile,
 
835
                t.delete_multi, iter(['a', 'b', 'c']))
 
836
 
 
837
        t.put_bytes('a', 'another a text\n')
 
838
        t.put_bytes('c', 'another c text\n')
 
839
        t.delete_multi(iter(['a', 'b', 'c']))
 
840
 
 
841
        # We should have deleted everything
 
842
        # SftpServer creates control files in the
 
843
        # working directory, so we can just do a
 
844
        # plain "listdir".
 
845
        # self.assertEqual([], os.listdir('.'))
 
846
 
 
847
    def test_recommended_page_size(self):
 
848
        """Transports recommend a page size for partial access to files."""
 
849
        t = self.get_transport()
 
850
        self.assertIsInstance(t.recommended_page_size(), int)
 
851
 
 
852
    def test_rmdir(self):
 
853
        t = self.get_transport()
 
854
        # Not much to do with a readonly transport
 
855
        if t.is_readonly():
 
856
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
 
857
            return
 
858
        t.mkdir('adir')
 
859
        t.mkdir('adir/bdir')
 
860
        t.rmdir('adir/bdir')
 
861
        # ftp may not be able to raise NoSuchFile for lack of
 
862
        # details when failing
 
863
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
 
864
        t.rmdir('adir')
 
865
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
 
866
 
 
867
    def test_rmdir_not_empty(self):
 
868
        """Deleting a non-empty directory raises an exception
 
869
        
 
870
        sftp (and possibly others) don't give us a specific "directory not
 
871
        empty" exception -- we can just see that the operation failed.
 
872
        """
 
873
        t = self.get_transport()
 
874
        if t.is_readonly():
 
875
            return
 
876
        t.mkdir('adir')
 
877
        t.mkdir('adir/bdir')
 
878
        self.assertRaises(PathError, t.rmdir, 'adir')
 
879
 
 
880
    def test_rmdir_empty_but_similar_prefix(self):
 
881
        """rmdir does not get confused by sibling paths.
 
882
        
 
883
        A naive implementation of MemoryTransport would refuse to rmdir
 
884
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
885
        uses "path.startswith(dir)" on all file paths to determine if directory
 
886
        is empty.
 
887
        """
 
888
        t = self.get_transport()
 
889
        if t.is_readonly():
 
890
            return
 
891
        t.mkdir('foo')
 
892
        t.put_bytes('foo-bar', '')
 
893
        t.mkdir('foo-baz')
 
894
        t.rmdir('foo')
 
895
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
896
        self.failUnless(t.has('foo-bar'))
 
897
 
 
898
    def test_rename_dir_succeeds(self):
 
899
        t = self.get_transport()
 
900
        if t.is_readonly():
 
901
            raise TestSkipped("transport is readonly")
 
902
        t.mkdir('adir')
 
903
        t.mkdir('adir/asubdir')
 
904
        t.rename('adir', 'bdir')
 
905
        self.assertTrue(t.has('bdir/asubdir'))
 
906
        self.assertFalse(t.has('adir'))
 
907
 
 
908
    def test_rename_dir_nonempty(self):
 
909
        """Attempting to replace a nonemtpy directory should fail"""
 
910
        t = self.get_transport()
 
911
        if t.is_readonly():
 
912
            raise TestSkipped("transport is readonly")
 
913
        t.mkdir('adir')
 
914
        t.mkdir('adir/asubdir')
 
915
        t.mkdir('bdir')
 
916
        t.mkdir('bdir/bsubdir')
 
917
        # any kind of PathError would be OK, though we normally expect
 
918
        # DirectoryNotEmpty
 
919
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
 
920
        # nothing was changed so it should still be as before
 
921
        self.assertTrue(t.has('bdir/bsubdir'))
 
922
        self.assertFalse(t.has('adir/bdir'))
 
923
        self.assertFalse(t.has('adir/bsubdir'))
 
924
 
 
925
    def test_rename_across_subdirs(self):
 
926
        t = self.get_transport()
 
927
        if t.is_readonly():
 
928
            raise TestNotApplicable("transport is readonly")
 
929
        t.mkdir('a')
 
930
        t.mkdir('b')
 
931
        ta = t.clone('a')
 
932
        tb = t.clone('b')
 
933
        ta.put_bytes('f', 'aoeu')
 
934
        ta.rename('f', '../b/f')
 
935
        self.assertTrue(tb.has('f'))
 
936
        self.assertFalse(ta.has('f'))
 
937
        self.assertTrue(t.has('b/f'))
 
938
 
 
939
    def test_delete_tree(self):
 
940
        t = self.get_transport()
 
941
 
 
942
        # Not much to do with a readonly transport
 
943
        if t.is_readonly():
 
944
            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
 
945
            return
 
946
 
 
947
        # and does it like listing ?
 
948
        t.mkdir('adir')
 
949
        try:
 
950
            t.delete_tree('adir')
 
951
        except TransportNotPossible:
 
952
            # ok, this transport does not support delete_tree
 
953
            return
 
954
 
 
955
        # did it delete that trivial case?
 
956
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
957
 
 
958
        self.build_tree(['adir/',
 
959
                         'adir/file',
 
960
                         'adir/subdir/',
 
961
                         'adir/subdir/file',
 
962
                         'adir/subdir2/',
 
963
                         'adir/subdir2/file',
 
964
                         ], transport=t)
 
965
 
 
966
        t.delete_tree('adir')
 
967
        # adir should be gone now.
 
968
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
969
 
 
970
    def test_move(self):
 
971
        t = self.get_transport()
 
972
 
 
973
        if t.is_readonly():
 
974
            return
 
975
 
 
976
        # TODO: I would like to use os.listdir() to
 
977
        # make sure there are no extra files, but SftpServer
 
978
        # creates control files in the working directory
 
979
        # perhaps all of this could be done in a subdirectory
 
980
 
 
981
        t.put_bytes('a', 'a first file\n')
 
982
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
983
 
 
984
        t.move('a', 'b')
 
985
        self.failUnless(t.has('b'))
 
986
        self.failIf(t.has('a'))
 
987
 
 
988
        self.check_transport_contents('a first file\n', t, 'b')
 
989
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
990
 
 
991
        # Overwrite a file
 
992
        t.put_bytes('c', 'c this file\n')
 
993
        t.move('c', 'b')
 
994
        self.failIf(t.has('c'))
 
995
        self.check_transport_contents('c this file\n', t, 'b')
 
996
 
 
997
        # TODO: Try to write a test for atomicity
 
998
        # TODO: Test moving into a non-existent subdirectory
 
999
        # TODO: Test Transport.move_multi
 
1000
 
 
1001
    def test_copy(self):
 
1002
        t = self.get_transport()
 
1003
 
 
1004
        if t.is_readonly():
 
1005
            return
 
1006
 
 
1007
        t.put_bytes('a', 'a file\n')
 
1008
        t.copy('a', 'b')
 
1009
        self.check_transport_contents('a file\n', t, 'b')
 
1010
 
 
1011
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
1012
        os.mkdir('c')
 
1013
        # What should the assert be if you try to copy a
 
1014
        # file over a directory?
 
1015
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
1016
        t.put_bytes('d', 'text in d\n')
 
1017
        t.copy('d', 'b')
 
1018
        self.check_transport_contents('text in d\n', t, 'b')
 
1019
 
 
1020
        # TODO: test copy_multi
 
1021
 
 
1022
    def test_connection_error(self):
 
1023
        """ConnectionError is raised when connection is impossible.
 
1024
        
 
1025
        The error should be raised from the first operation on the transport.
 
1026
        """
 
1027
        try:
 
1028
            url = self._server.get_bogus_url()
 
1029
        except NotImplementedError:
 
1030
            raise TestSkipped("Transport %s has no bogus URL support." %
 
1031
                              self._server.__class__)
 
1032
        t = get_transport(url)
 
1033
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
 
1034
 
 
1035
    def test_stat(self):
 
1036
        # TODO: Test stat, just try once, and if it throws, stop testing
 
1037
        from stat import S_ISDIR, S_ISREG
 
1038
 
 
1039
        t = self.get_transport()
 
1040
 
 
1041
        try:
 
1042
            st = t.stat('.')
 
1043
        except TransportNotPossible, e:
 
1044
            # This transport cannot stat
 
1045
            return
 
1046
 
 
1047
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
1048
        sizes = [14, 0, 16, 0, 18]
 
1049
        self.build_tree(paths, transport=t, line_endings='binary')
 
1050
 
 
1051
        for path, size in zip(paths, sizes):
 
1052
            st = t.stat(path)
 
1053
            if path.endswith('/'):
 
1054
                self.failUnless(S_ISDIR(st.st_mode))
 
1055
                # directory sizes are meaningless
 
1056
            else:
 
1057
                self.failUnless(S_ISREG(st.st_mode))
 
1058
                self.assertEqual(size, st.st_size)
 
1059
 
 
1060
        remote_stats = list(t.stat_multi(paths))
 
1061
        remote_iter_stats = list(t.stat_multi(iter(paths)))
 
1062
 
 
1063
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
1064
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
1065
 
 
1066
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
 
1067
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
 
1068
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
 
1069
        subdir = t.clone('subdir')
 
1070
        subdir.stat('./file')
 
1071
        subdir.stat('.')
 
1072
 
 
1073
    def test_list_dir(self):
 
1074
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
1075
        t = self.get_transport()
 
1076
 
 
1077
        if not t.listable():
 
1078
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
1079
            return
 
1080
 
 
1081
        def sorted_list(d, transport):
 
1082
            l = list(transport.list_dir(d))
 
1083
            l.sort()
 
1084
            return l
 
1085
 
 
1086
        self.assertEqual([], sorted_list('.', t))
 
1087
        # c2 is precisely one letter longer than c here to test that
 
1088
        # suffixing is not confused.
 
1089
        # a%25b checks that quoting is done consistently across transports
 
1090
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
 
1091
 
 
1092
        if not t.is_readonly():
 
1093
            self.build_tree(tree_names, transport=t)
 
1094
        else:
 
1095
            self.build_tree(tree_names)
 
1096
 
 
1097
        self.assertEqual(
 
1098
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1099
        self.assertEqual(
 
1100
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1101
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1102
 
 
1103
        # Cloning the transport produces an equivalent listing
 
1104
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
 
1105
 
 
1106
        if not t.is_readonly():
 
1107
            t.delete('c/d')
 
1108
            t.delete('b')
 
1109
        else:
 
1110
            os.unlink('c/d')
 
1111
            os.unlink('b')
 
1112
 
 
1113
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1114
        self.assertEqual(['e'], sorted_list('c', t))
 
1115
 
 
1116
        self.assertListRaises(PathError, t.list_dir, 'q')
 
1117
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1118
        self.assertListRaises(PathError, t.list_dir, 'a')
 
1119
 
 
1120
    def test_list_dir_result_is_url_escaped(self):
 
1121
        t = self.get_transport()
 
1122
        if not t.listable():
 
1123
            raise TestSkipped("transport not listable")
 
1124
 
 
1125
        if not t.is_readonly():
 
1126
            self.build_tree(['a/', 'a/%'], transport=t)
 
1127
        else:
 
1128
            self.build_tree(['a/', 'a/%'])
 
1129
 
 
1130
        names = list(t.list_dir('a'))
 
1131
        self.assertEqual(['%25'], names)
 
1132
        self.assertIsInstance(names[0], str)
 
1133
 
 
1134
    def test_clone_preserve_info(self):
 
1135
        t1 = self.get_transport()
 
1136
        if not isinstance(t1, ConnectedTransport):
 
1137
            raise TestSkipped("not a connected transport")
 
1138
 
 
1139
        t2 = t1.clone('subdir')
 
1140
        self.assertEquals(t1._scheme, t2._scheme)
 
1141
        self.assertEquals(t1._user, t2._user)
 
1142
        self.assertEquals(t1._password, t2._password)
 
1143
        self.assertEquals(t1._host, t2._host)
 
1144
        self.assertEquals(t1._port, t2._port)
 
1145
 
 
1146
    def test__reuse_for(self):
 
1147
        t = self.get_transport()
 
1148
        if not isinstance(t, ConnectedTransport):
 
1149
            raise TestSkipped("not a connected transport")
 
1150
 
 
1151
        def new_url(scheme=None, user=None, password=None,
 
1152
                    host=None, port=None, path=None):
 
1153
            """Build a new url from t.base changing only parts of it.
 
1154
 
 
1155
            Only the parameters different from None will be changed.
 
1156
            """
 
1157
            if scheme   is None: scheme   = t._scheme
 
1158
            if user     is None: user     = t._user
 
1159
            if password is None: password = t._password
 
1160
            if user     is None: user     = t._user
 
1161
            if host     is None: host     = t._host
 
1162
            if port     is None: port     = t._port
 
1163
            if path     is None: path     = t._path
 
1164
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1165
 
 
1166
        if t._scheme == 'ftp':
 
1167
            scheme = 'sftp'
 
1168
        else:
 
1169
            scheme = 'ftp'
 
1170
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1171
        if t._user == 'me':
 
1172
            user = 'you'
 
1173
        else:
 
1174
            user = 'me'
 
1175
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1176
        # passwords are not taken into account because:
 
1177
        # - it makes no sense to have two different valid passwords for the
 
1178
        #   same user
 
1179
        # - _password in ConnectedTransport is intended to collect what the
 
1180
        #   user specified from the command-line and there are cases where the
 
1181
        #   new url can contain no password (if the url was built from an
 
1182
        #   existing transport.base for example)
 
1183
        # - password are considered part of the credentials provided at
 
1184
        #   connection creation time and as such may not be present in the url
 
1185
        #   (they may be typed by the user when prompted for example)
 
1186
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1187
        # We will not connect, we can use a invalid host
 
1188
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
 
1189
        if t._port == 1234:
 
1190
            port = 4321
 
1191
        else:
 
1192
            port = 1234
 
1193
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1194
        # No point in trying to reuse a transport for a local URL
 
1195
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1196
 
 
1197
    def test_connection_sharing(self):
 
1198
        t = self.get_transport()
 
1199
        if not isinstance(t, ConnectedTransport):
 
1200
            raise TestSkipped("not a connected transport")
 
1201
 
 
1202
        c = t.clone('subdir')
 
1203
        # Some transports will create the connection  only when needed
 
1204
        t.has('surely_not') # Force connection
 
1205
        self.assertIs(t._get_connection(), c._get_connection())
 
1206
 
 
1207
        # Temporary failure, we need to create a new dummy connection
 
1208
        new_connection = object()
 
1209
        t._set_connection(new_connection)
 
1210
        # Check that both transports use the same connection
 
1211
        self.assertIs(new_connection, t._get_connection())
 
1212
        self.assertIs(new_connection, c._get_connection())
 
1213
 
 
1214
    def test_reuse_connection_for_various_paths(self):
 
1215
        t = self.get_transport()
 
1216
        if not isinstance(t, ConnectedTransport):
 
1217
            raise TestSkipped("not a connected transport")
 
1218
 
 
1219
        t.has('surely_not') # Force connection
 
1220
        self.assertIsNot(None, t._get_connection())
 
1221
 
 
1222
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1223
        self.assertIsNot(t, subdir)
 
1224
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1225
 
 
1226
        home = subdir._reuse_for(t.base + 'home')
 
1227
        self.assertIs(t._get_connection(), home._get_connection())
 
1228
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1229
 
 
1230
    def test_clone(self):
 
1231
        # TODO: Test that clone moves up and down the filesystem
 
1232
        t1 = self.get_transport()
 
1233
 
 
1234
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
 
1235
 
 
1236
        self.failUnless(t1.has('a'))
 
1237
        self.failUnless(t1.has('b/c'))
 
1238
        self.failIf(t1.has('c'))
 
1239
 
 
1240
        t2 = t1.clone('b')
 
1241
        self.assertEqual(t1.base + 'b/', t2.base)
 
1242
 
 
1243
        self.failUnless(t2.has('c'))
 
1244
        self.failIf(t2.has('a'))
 
1245
 
 
1246
        t3 = t2.clone('..')
 
1247
        self.failUnless(t3.has('a'))
 
1248
        self.failIf(t3.has('c'))
 
1249
 
 
1250
        self.failIf(t1.has('b/d'))
 
1251
        self.failIf(t2.has('d'))
 
1252
        self.failIf(t3.has('b/d'))
 
1253
 
 
1254
        if t1.is_readonly():
 
1255
            self.build_tree_contents([('b/d', 'newfile\n')])
 
1256
        else:
 
1257
            t2.put_bytes('d', 'newfile\n')
 
1258
 
 
1259
        self.failUnless(t1.has('b/d'))
 
1260
        self.failUnless(t2.has('d'))
 
1261
        self.failUnless(t3.has('b/d'))
 
1262
 
 
1263
    def test_clone_to_root(self):
 
1264
        orig_transport = self.get_transport()
 
1265
        # Repeatedly go up to a parent directory until we're at the root
 
1266
        # directory of this transport
 
1267
        root_transport = orig_transport
 
1268
        new_transport = root_transport.clone("..")
 
1269
        # as we are walking up directories, the path must be
 
1270
        # growing less, except at the top
 
1271
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1272
            or new_transport.base == root_transport.base)
 
1273
        while new_transport.base != root_transport.base:
 
1274
            root_transport = new_transport
 
1275
            new_transport = root_transport.clone("..")
 
1276
            # as we are walking up directories, the path must be
 
1277
            # growing less, except at the top
 
1278
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1279
                or new_transport.base == root_transport.base)
 
1280
 
 
1281
        # Cloning to "/" should take us to exactly the same location.
 
1282
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1283
        # the abspath of "/" from the original transport should be the same
 
1284
        # as the base at the root:
 
1285
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1286
 
 
1287
        # At the root, the URL must still end with / as its a directory
 
1288
        self.assertEqual(root_transport.base[-1], '/')
 
1289
 
 
1290
    def test_clone_from_root(self):
 
1291
        """At the root, cloning to a simple dir should just do string append."""
 
1292
        orig_transport = self.get_transport()
 
1293
        root_transport = orig_transport.clone('/')
 
1294
        self.assertEqual(root_transport.base + '.bzr/',
 
1295
            root_transport.clone('.bzr').base)
 
1296
 
 
1297
    def test_base_url(self):
 
1298
        t = self.get_transport()
 
1299
        self.assertEqual('/', t.base[-1])
 
1300
 
 
1301
    def test_relpath(self):
 
1302
        t = self.get_transport()
 
1303
        self.assertEqual('', t.relpath(t.base))
 
1304
        # base ends with /
 
1305
        self.assertEqual('', t.relpath(t.base[:-1]))
 
1306
        # subdirs which don't exist should still give relpaths.
 
1307
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
 
1308
        # trailing slash should be the same.
 
1309
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
 
1310
 
 
1311
    def test_relpath_at_root(self):
 
1312
        t = self.get_transport()
 
1313
        # clone all the way to the top
 
1314
        new_transport = t.clone('..')
 
1315
        while new_transport.base != t.base:
 
1316
            t = new_transport
 
1317
            new_transport = t.clone('..')
 
1318
        # we must be able to get a relpath below the root
 
1319
        self.assertEqual('', t.relpath(t.base))
 
1320
        # and a deeper one should work too
 
1321
        self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
 
1322
 
 
1323
    def test_abspath(self):
 
1324
        # smoke test for abspath. Corner cases for backends like unix fs's
 
1325
        # that have aliasing problems like symlinks should go in backend
 
1326
        # specific test cases.
 
1327
        transport = self.get_transport()
 
1328
 
 
1329
        self.assertEqual(transport.base + 'relpath',
 
1330
                         transport.abspath('relpath'))
 
1331
 
 
1332
        # This should work without raising an error.
 
1333
        transport.abspath("/")
 
1334
 
 
1335
        # the abspath of "/" and "/foo/.." should result in the same location
 
1336
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1337
 
 
1338
        self.assertEqual(transport.clone("/").abspath('foo'),
 
1339
                         transport.abspath("/foo"))
 
1340
 
 
1341
    def test_win32_abspath(self):
 
1342
        # Note: we tried to set sys.platform='win32' so we could test on
 
1343
        # other platforms too, but then osutils does platform specific
 
1344
        # things at import time which defeated us...
 
1345
        if sys.platform != 'win32':
 
1346
            raise TestSkipped(
 
1347
                'Testing drive letters in abspath implemented only for win32')
 
1348
 
 
1349
        # smoke test for abspath on win32.
 
1350
        # a transport based on 'file:///' never fully qualifies the drive.
 
1351
        transport = get_transport("file:///")
 
1352
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1353
 
 
1354
        # but a transport that starts with a drive spec must keep it.
 
1355
        transport = get_transport("file:///C:/")
 
1356
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1357
 
 
1358
    def test_local_abspath(self):
 
1359
        transport = self.get_transport()
 
1360
        try:
 
1361
            p = transport.local_abspath('.')
 
1362
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1363
            # should be formattable
 
1364
            s = str(e)
 
1365
        else:
 
1366
            self.assertEqual(getcwd(), p)
 
1367
 
 
1368
    def test_abspath_at_root(self):
 
1369
        t = self.get_transport()
 
1370
        # clone all the way to the top
 
1371
        new_transport = t.clone('..')
 
1372
        while new_transport.base != t.base:
 
1373
            t = new_transport
 
1374
            new_transport = t.clone('..')
 
1375
        # we must be able to get a abspath of the root when we ask for
 
1376
        # t.abspath('..') - this due to our choice that clone('..')
 
1377
        # should return the root from the root, combined with the desire that
 
1378
        # the url from clone('..') and from abspath('..') should be the same.
 
1379
        self.assertEqual(t.base, t.abspath('..'))
 
1380
        # '' should give us the root
 
1381
        self.assertEqual(t.base, t.abspath(''))
 
1382
        # and a path should append to the url
 
1383
        self.assertEqual(t.base + 'foo', t.abspath('foo'))
 
1384
 
 
1385
    def test_iter_files_recursive(self):
 
1386
        transport = self.get_transport()
 
1387
        if not transport.listable():
 
1388
            self.assertRaises(TransportNotPossible,
 
1389
                              transport.iter_files_recursive)
 
1390
            return
 
1391
        self.build_tree(['isolated/',
 
1392
                         'isolated/dir/',
 
1393
                         'isolated/dir/foo',
 
1394
                         'isolated/dir/bar',
 
1395
                         'isolated/dir/b%25z', # make sure quoting is correct
 
1396
                         'isolated/bar'],
 
1397
                        transport=transport)
 
1398
        paths = set(transport.iter_files_recursive())
 
1399
        # nb the directories are not converted
 
1400
        self.assertEqual(paths,
 
1401
                    set(['isolated/dir/foo',
 
1402
                         'isolated/dir/bar',
 
1403
                         'isolated/dir/b%2525z',
 
1404
                         'isolated/bar']))
 
1405
        sub_transport = transport.clone('isolated')
 
1406
        paths = set(sub_transport.iter_files_recursive())
 
1407
        self.assertEqual(paths,
 
1408
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1409
 
 
1410
    def test_copy_tree(self):
 
1411
        # TODO: test file contents and permissions are preserved. This test was
 
1412
        # added just to ensure that quoting was handled correctly.
 
1413
        # -- David Allouche 2006-08-11
 
1414
        transport = self.get_transport()
 
1415
        if not transport.listable():
 
1416
            self.assertRaises(TransportNotPossible,
 
1417
                              transport.iter_files_recursive)
 
1418
            return
 
1419
        if transport.is_readonly():
 
1420
            return
 
1421
        self.build_tree(['from/',
 
1422
                         'from/dir/',
 
1423
                         'from/dir/foo',
 
1424
                         'from/dir/bar',
 
1425
                         'from/dir/b%25z', # make sure quoting is correct
 
1426
                         'from/bar'],
 
1427
                        transport=transport)
 
1428
        transport.copy_tree('from', 'to')
 
1429
        paths = set(transport.iter_files_recursive())
 
1430
        self.assertEqual(paths,
 
1431
                    set(['from/dir/foo',
 
1432
                         'from/dir/bar',
 
1433
                         'from/dir/b%2525z',
 
1434
                         'from/bar',
 
1435
                         'to/dir/foo',
 
1436
                         'to/dir/bar',
 
1437
                         'to/dir/b%2525z',
 
1438
                         'to/bar',]))
 
1439
 
 
1440
    def test_unicode_paths(self):
 
1441
        """Test that we can read/write files with Unicode names."""
 
1442
        t = self.get_transport()
 
1443
 
 
1444
        # With FAT32 and certain encodings on win32
 
1445
        # '\xe5' and '\xe4' actually map to the same file
 
1446
        # adding a suffix kicks in the 'preserving but insensitive'
 
1447
        # route, and maintains the right files
 
1448
        files = [u'\xe5.1', # a w/ circle iso-8859-1
 
1449
                 u'\xe4.2', # a w/ dots iso-8859-1
 
1450
                 u'\u017d', # Z with umlat iso-8859-2
 
1451
                 u'\u062c', # Arabic j
 
1452
                 u'\u0410', # Russian A
 
1453
                 u'\u65e5', # Kanji person
 
1454
                ]
 
1455
 
 
1456
        try:
 
1457
            self.build_tree(files, transport=t, line_endings='binary')
 
1458
        except UnicodeError:
 
1459
            raise TestSkipped("cannot handle unicode paths in current encoding")
 
1460
 
 
1461
        # A plain unicode string is not a valid url
 
1462
        for fname in files:
 
1463
            self.assertRaises(InvalidURL, t.get, fname)
 
1464
 
 
1465
        for fname in files:
 
1466
            fname_utf8 = fname.encode('utf-8')
 
1467
            contents = 'contents of %s\n' % (fname_utf8,)
 
1468
            self.check_transport_contents(contents, t, urlutils.escape(fname))
 
1469
 
 
1470
    def test_connect_twice_is_same_content(self):
 
1471
        # check that our server (whatever it is) is accessible reliably
 
1472
        # via get_transport and multiple connections share content.
 
1473
        transport = self.get_transport()
 
1474
        if transport.is_readonly():
 
1475
            return
 
1476
        transport.put_bytes('foo', 'bar')
 
1477
        transport3 = self.get_transport()
 
1478
        self.check_transport_contents('bar', transport3, 'foo')
 
1479
        # its base should be usable.
 
1480
        transport4 = get_transport(transport.base)
 
1481
        self.check_transport_contents('bar', transport4, 'foo')
 
1482
 
 
1483
        # now opening at a relative url should give use a sane result:
 
1484
        transport.mkdir('newdir')
 
1485
        transport5 = get_transport(transport.base + "newdir")
 
1486
        transport6 = transport5.clone('..')
 
1487
        self.check_transport_contents('bar', transport6, 'foo')
 
1488
 
 
1489
    def test_lock_write(self):
 
1490
        """Test transport-level write locks.
 
1491
 
 
1492
        These are deprecated and transports may decline to support them.
 
1493
        """
 
1494
        transport = self.get_transport()
 
1495
        if transport.is_readonly():
 
1496
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
 
1497
            return
 
1498
        transport.put_bytes('lock', '')
 
1499
        try:
 
1500
            lock = transport.lock_write('lock')
 
1501
        except TransportNotPossible:
 
1502
            return
 
1503
        # TODO make this consistent on all platforms:
 
1504
        # self.assertRaises(LockError, transport.lock_write, 'lock')
 
1505
        lock.unlock()
 
1506
 
 
1507
    def test_lock_read(self):
 
1508
        """Test transport-level read locks.
 
1509
 
 
1510
        These are deprecated and transports may decline to support them.
 
1511
        """
 
1512
        transport = self.get_transport()
 
1513
        if transport.is_readonly():
 
1514
            file('lock', 'w').close()
 
1515
        else:
 
1516
            transport.put_bytes('lock', '')
 
1517
        try:
 
1518
            lock = transport.lock_read('lock')
 
1519
        except TransportNotPossible:
 
1520
            return
 
1521
        # TODO make this consistent on all platforms:
 
1522
        # self.assertRaises(LockError, transport.lock_read, 'lock')
 
1523
        lock.unlock()
 
1524
 
 
1525
    def test_readv(self):
 
1526
        transport = self.get_transport()
 
1527
        if transport.is_readonly():
 
1528
            file('a', 'w').write('0123456789')
 
1529
        else:
 
1530
            transport.put_bytes('a', '0123456789')
 
1531
 
 
1532
        d = list(transport.readv('a', ((0, 1),)))
 
1533
        self.assertEqual(d[0], (0, '0'))
 
1534
 
 
1535
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
1536
        self.assertEqual(d[0], (0, '0'))
 
1537
        self.assertEqual(d[1], (1, '1'))
 
1538
        self.assertEqual(d[2], (3, '34'))
 
1539
        self.assertEqual(d[3], (9, '9'))
 
1540
 
 
1541
    def test_readv_out_of_order(self):
 
1542
        transport = self.get_transport()
 
1543
        if transport.is_readonly():
 
1544
            file('a', 'w').write('0123456789')
 
1545
        else:
 
1546
            transport.put_bytes('a', '01234567890')
 
1547
 
 
1548
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
1549
        self.assertEqual(d[0], (1, '1'))
 
1550
        self.assertEqual(d[1], (9, '9'))
 
1551
        self.assertEqual(d[2], (0, '0'))
 
1552
        self.assertEqual(d[3], (3, '34'))
 
1553
 
 
1554
    def test_readv_with_adjust_for_latency(self):
 
1555
        transport = self.get_transport()
 
1556
        # the adjust for latency flag expands the data region returned
 
1557
        # according to a per-transport heuristic, so testing is a little
 
1558
        # tricky as we need more data than the largest combining that our
 
1559
        # transports do. To accomodate this we generate random data and cross
 
1560
        # reference the returned data with the random data. To avoid doing
 
1561
        # multiple large random byte look ups we do several tests on the same
 
1562
        # backing data.
 
1563
        content = osutils.rand_bytes(200*1024)
 
1564
        content_size = len(content)
 
1565
        if transport.is_readonly():
 
1566
            self.build_tree_contents([('a', content)])
 
1567
        else:
 
1568
            transport.put_bytes('a', content)
 
1569
        def check_result_data(result_vector):
 
1570
            for item in result_vector:
 
1571
                data_len = len(item[1])
 
1572
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1573
 
 
1574
        # start corner case
 
1575
        result = list(transport.readv('a', ((0, 30),),
 
1576
            adjust_for_latency=True, upper_limit=content_size))
 
1577
        # we expect 1 result, from 0, to something > 30
 
1578
        self.assertEqual(1, len(result))
 
1579
        self.assertEqual(0, result[0][0])
 
1580
        self.assertTrue(len(result[0][1]) >= 30)
 
1581
        check_result_data(result)
 
1582
        # end of file corner case
 
1583
        result = list(transport.readv('a', ((204700, 100),),
 
1584
            adjust_for_latency=True, upper_limit=content_size))
 
1585
        # we expect 1 result, from 204800- its length, to the end
 
1586
        self.assertEqual(1, len(result))
 
1587
        data_len = len(result[0][1])
 
1588
        self.assertEqual(204800-data_len, result[0][0])
 
1589
        self.assertTrue(data_len >= 100)
 
1590
        check_result_data(result)
 
1591
        # out of order ranges are made in order
 
1592
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1593
            adjust_for_latency=True, upper_limit=content_size))
 
1594
        # we expect 2 results, in order, start and end.
 
1595
        self.assertEqual(2, len(result))
 
1596
        # start
 
1597
        data_len = len(result[0][1])
 
1598
        self.assertEqual(0, result[0][0])
 
1599
        self.assertTrue(data_len >= 30)
 
1600
        # end
 
1601
        data_len = len(result[1][1])
 
1602
        self.assertEqual(204800-data_len, result[1][0])
 
1603
        self.assertTrue(data_len >= 100)
 
1604
        check_result_data(result)
 
1605
        # close ranges get combined (even if out of order)
 
1606
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1607
            result = list(transport.readv('a', request_vector,
 
1608
                adjust_for_latency=True, upper_limit=content_size))
 
1609
            self.assertEqual(1, len(result))
 
1610
            data_len = len(result[0][1])
 
1611
            # minimum length is from 400 to 1034 - 634
 
1612
            self.assertTrue(data_len >= 634)
 
1613
            # must contain the region 400 to 1034
 
1614
            self.assertTrue(result[0][0] <= 400)
 
1615
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1616
            check_result_data(result)
 
1617
 
 
1618
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1619
        transport = self.get_transport()
 
1620
        # test from observed failure case.
 
1621
        if transport.is_readonly():
 
1622
            file('a', 'w').write('a'*1024*1024)
 
1623
        else:
 
1624
            transport.put_bytes('a', 'a'*1024*1024)
 
1625
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1626
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1627
            (465373, 800), (947422, 800)]
 
1628
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1629
        found_items = [False]*9
 
1630
        for pos, (start, length) in enumerate(broken_vector):
 
1631
            # check the range is covered by the result
 
1632
            for offset, data in results:
 
1633
                if offset <= start and start + length <= offset + len(data):
 
1634
                    found_items[pos] = True
 
1635
        self.assertEqual([True]*9, found_items)
 
1636
 
 
1637
    def test_get_with_open_write_stream_sees_all_content(self):
 
1638
        t = self.get_transport()
 
1639
        if t.is_readonly():
 
1640
            return
 
1641
        handle = t.open_write_stream('foo')
 
1642
        try:
 
1643
            handle.write('bcd')
 
1644
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1645
        finally:
 
1646
            handle.close()
 
1647
 
 
1648
    def test_get_smart_medium(self):
 
1649
        """All transports must either give a smart medium, or know they can't.
 
1650
        """
 
1651
        transport = self.get_transport()
 
1652
        try:
 
1653
            client_medium = transport.get_smart_medium()
 
1654
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
 
1655
        except errors.NoSmartMedium:
 
1656
            # as long as we got it we're fine
 
1657
            pass
 
1658
 
 
1659
    def test_readv_short_read(self):
 
1660
        transport = self.get_transport()
 
1661
        if transport.is_readonly():
 
1662
            file('a', 'w').write('0123456789')
 
1663
        else:
 
1664
            transport.put_bytes('a', '01234567890')
 
1665
 
 
1666
        # This is intentionally reading off the end of the file
 
1667
        # since we are sure that it cannot get there
 
1668
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
 
1669
                               # Can be raised by paramiko
 
1670
                               AssertionError),
 
1671
                              transport.readv, 'a', [(1,1), (8,10)])
 
1672
 
 
1673
        # This is trying to seek past the end of the file, it should
 
1674
        # also raise a special error
 
1675
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
 
1676
                              transport.readv, 'a', [(12,2)])