/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/per_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2018-07-26 19:15:27 UTC
  • mto: This revision was merged to the branch mainline in revision 7055.
  • Revision ID: jelmer@jelmer.uk-20180726191527-wniq205k6tzfo1xx
Install fastimport from git.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for Transport implementations.
 
18
 
 
19
Transport implementations tested here are supplied by
 
20
TransportTestProviderAdapter.
 
21
"""
 
22
 
 
23
import os
 
24
import stat
 
25
import sys
 
26
 
 
27
from .. import (
 
28
    errors,
 
29
    osutils,
 
30
    pyutils,
 
31
    tests,
 
32
    transport as _mod_transport,
 
33
    urlutils,
 
34
    )
 
35
from ..errors import (ConnectionError,
 
36
                      FileExists,
 
37
                      NoSuchFile,
 
38
                      PathError,
 
39
                      TransportNotPossible,
 
40
                      )
 
41
from ..osutils import getcwd
 
42
from ..sixish import (
 
43
    BytesIO,
 
44
    zip,
 
45
    )
 
46
from ..bzr.smart import medium
 
47
from . import (
 
48
    TestSkipped,
 
49
    TestNotApplicable,
 
50
    multiply_tests,
 
51
    )
 
52
from . import test_server
 
53
from .test_transport import TestTransportImplementation
 
54
from ..transport import (
 
55
    ConnectedTransport,
 
56
    Transport,
 
57
    _get_transport_modules,
 
58
    )
 
59
from ..transport.memory import MemoryTransport
 
60
from ..transport.remote import RemoteTransport
 
61
 
 
62
 
 
63
def get_transport_test_permutations(module):
 
64
    """Get the permutations module wants to have tested."""
 
65
    if getattr(module, 'get_test_permutations', None) is None:
 
66
        raise AssertionError(
 
67
            "transport module %s doesn't provide get_test_permutations()"
 
68
            % module.__name__)
 
69
        return []
 
70
    return module.get_test_permutations()
 
71
 
 
72
 
 
73
def transport_test_permutations():
 
74
    """Return a list of the klass, server_factory pairs to test."""
 
75
    result = []
 
76
    for module in _get_transport_modules():
 
77
        try:
 
78
            permutations = get_transport_test_permutations(
 
79
                pyutils.get_named_object(module))
 
80
            for (klass, server_factory) in permutations:
 
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
82
                    {"transport_class":klass,
 
83
                     "transport_server":server_factory})
 
84
                result.append(scenario)
 
85
        except errors.DependencyNotPresent as e:
 
86
            # Continue even if a dependency prevents us
 
87
            # from adding this test
 
88
            pass
 
89
    return result
 
90
 
 
91
 
 
92
def load_tests(loader, standard_tests, pattern):
 
93
    """Multiply tests for tranport implementations."""
 
94
    result = loader.suiteClass()
 
95
    scenarios = transport_test_permutations()
 
96
    return multiply_tests(standard_tests, scenarios, result)
 
97
 
 
98
 
 
99
class TransportTests(TestTransportImplementation):
 
100
 
 
101
    def setUp(self):
 
102
        super(TransportTests, self).setUp()
 
103
        self.overrideEnv('BRZ_NO_SMART_VFS', None)
 
104
 
 
105
    def check_transport_contents(self, content, transport, relpath):
 
106
        """Check that transport.get_bytes(relpath) == content."""
 
107
        self.assertEqualDiff(content, transport.get_bytes(relpath))
 
108
 
 
109
    def test_ensure_base_missing(self):
 
110
        """.ensure_base() should create the directory if it doesn't exist"""
 
111
        t = self.get_transport()
 
112
        t_a = t.clone('a')
 
113
        if t_a.is_readonly():
 
114
            self.assertRaises(TransportNotPossible,
 
115
                              t_a.ensure_base)
 
116
            return
 
117
        self.assertTrue(t_a.ensure_base())
 
118
        self.assertTrue(t.has('a'))
 
119
 
 
120
    def test_ensure_base_exists(self):
 
121
        """.ensure_base() should just be happy if it already exists"""
 
122
        t = self.get_transport()
 
123
        if t.is_readonly():
 
124
            return
 
125
 
 
126
        t.mkdir('a')
 
127
        t_a = t.clone('a')
 
128
        # ensure_base returns False if it didn't create the base
 
129
        self.assertFalse(t_a.ensure_base())
 
130
 
 
131
    def test_ensure_base_missing_parent(self):
 
132
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
133
        t = self.get_transport()
 
134
        if t.is_readonly():
 
135
            return
 
136
 
 
137
        t_a = t.clone('a')
 
138
        t_b = t_a.clone('b')
 
139
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
140
 
 
141
    def test_external_url(self):
 
142
        """.external_url either works or raises InProcessTransport."""
 
143
        t = self.get_transport()
 
144
        try:
 
145
            t.external_url()
 
146
        except errors.InProcessTransport:
 
147
            pass
 
148
 
 
149
    def test_has(self):
 
150
        t = self.get_transport()
 
151
 
 
152
        files = ['a', 'b', 'e', 'g', '%']
 
153
        self.build_tree(files, transport=t)
 
154
        self.assertEqual(True, t.has('a'))
 
155
        self.assertEqual(False, t.has('c'))
 
156
        self.assertEqual(True, t.has(urlutils.escape('%')))
 
157
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
158
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
159
                                           urlutils.escape('%%')]))
 
160
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
161
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
 
162
 
 
163
    def test_has_root_works(self):
 
164
        if self.transport_server is test_server.SmartTCPServer_for_testing:
 
165
            raise TestNotApplicable(
 
166
                "SmartTCPServer_for_testing intentionally does not allow "
 
167
                "access to /.")
 
168
        current_transport = self.get_transport()
 
169
        self.assertTrue(current_transport.has('/'))
 
170
        root = current_transport.clone('/')
 
171
        self.assertTrue(root.has(''))
 
172
 
 
173
    def test_get(self):
 
174
        t = self.get_transport()
 
175
 
 
176
        files = ['a']
 
177
        content = b'contents of a\n'
 
178
        self.build_tree(['a'], transport=t, line_endings='binary')
 
179
        self.check_transport_contents(b'contents of a\n', t, 'a')
 
180
        f = t.get('a')
 
181
        self.assertEqual(content, f.read())
 
182
 
 
183
    def test_get_unknown_file(self):
 
184
        t = self.get_transport()
 
185
        files = ['a', 'b']
 
186
        contents = [b'contents of a\n',
 
187
                    b'contents of b\n',
 
188
                    ]
 
189
        self.build_tree(files, transport=t, line_endings='binary')
 
190
        self.assertRaises(NoSuchFile, t.get, 'c')
 
191
        def iterate_and_close(func, *args):
 
192
            for f in func(*args):
 
193
                # We call f.read() here because things like paramiko actually
 
194
                # spawn a thread to prefetch the content, which we want to
 
195
                # consume before we close the handle.
 
196
                content = f.read()
 
197
                f.close()
 
198
 
 
199
    def test_get_directory_read_gives_ReadError(self):
 
200
        """consistent errors for read() on a file returned by get()."""
 
201
        t = self.get_transport()
 
202
        if t.is_readonly():
 
203
            self.build_tree(['a directory/'])
 
204
        else:
 
205
            t.mkdir('a%20directory')
 
206
        # getting the file must either work or fail with a PathError
 
207
        try:
 
208
            a_file = t.get('a%20directory')
 
209
        except (errors.PathError, errors.RedirectRequested):
 
210
            # early failure return immediately.
 
211
            return
 
212
        # having got a file, read() must either work (i.e. http reading a dir
 
213
        # listing) or fail with ReadError
 
214
        try:
 
215
            a_file.read()
 
216
        except errors.ReadError:
 
217
            pass
 
218
 
 
219
    def test_get_bytes(self):
 
220
        t = self.get_transport()
 
221
 
 
222
        files = ['a', 'b', 'e', 'g']
 
223
        contents = [b'contents of a\n',
 
224
                    b'contents of b\n',
 
225
                    b'contents of e\n',
 
226
                    b'contents of g\n',
 
227
                    ]
 
228
        self.build_tree(files, transport=t, line_endings='binary')
 
229
        self.check_transport_contents(b'contents of a\n', t, 'a')
 
230
 
 
231
        for content, fname in zip(contents, files):
 
232
            self.assertEqual(content, t.get_bytes(fname))
 
233
 
 
234
    def test_get_bytes_unknown_file(self):
 
235
        t = self.get_transport()
 
236
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
 
237
 
 
238
    def test_get_with_open_write_stream_sees_all_content(self):
 
239
        t = self.get_transport()
 
240
        if t.is_readonly():
 
241
            return
 
242
        with t.open_write_stream('foo') as handle:
 
243
            handle.write(b'b')
 
244
            self.assertEqual(b'b', t.get_bytes('foo'))
 
245
 
 
246
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
247
        t = self.get_transport()
 
248
        if t.is_readonly():
 
249
            return
 
250
        with t.open_write_stream('foo') as handle:
 
251
            handle.write(b'b')
 
252
            self.assertEqual(b'b', t.get_bytes('foo'))
 
253
            with t.get('foo') as f:
 
254
                self.assertEqual(b'b', f.read())
 
255
 
 
256
    def test_put_bytes(self):
 
257
        t = self.get_transport()
 
258
 
 
259
        if t.is_readonly():
 
260
            self.assertRaises(TransportNotPossible,
 
261
                    t.put_bytes, 'a', b'some text for a\n')
 
262
            return
 
263
 
 
264
        t.put_bytes('a', b'some text for a\n')
 
265
        self.assertTrue(t.has('a'))
 
266
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
267
 
 
268
        # The contents should be overwritten
 
269
        t.put_bytes('a', b'new text for a\n')
 
270
        self.check_transport_contents(b'new text for a\n', t, 'a')
 
271
 
 
272
        self.assertRaises(NoSuchFile,
 
273
                          t.put_bytes, 'path/doesnt/exist/c', b'contents')
 
274
 
 
275
    def test_put_bytes_non_atomic(self):
 
276
        t = self.get_transport()
 
277
 
 
278
        if t.is_readonly():
 
279
            self.assertRaises(TransportNotPossible,
 
280
                    t.put_bytes_non_atomic, 'a', b'some text for a\n')
 
281
            return
 
282
 
 
283
        self.assertFalse(t.has('a'))
 
284
        t.put_bytes_non_atomic('a', b'some text for a\n')
 
285
        self.assertTrue(t.has('a'))
 
286
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
287
        # Put also replaces contents
 
288
        t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
 
289
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
290
 
 
291
        # Make sure we can create another file
 
292
        t.put_bytes_non_atomic('d', b'contents for\nd\n')
 
293
        # And overwrite 'a' with empty contents
 
294
        t.put_bytes_non_atomic('a', b'')
 
295
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
296
        self.check_transport_contents(b'', t, 'a')
 
297
 
 
298
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
 
299
                                       b'contents\n')
 
300
        # Now test the create_parent flag
 
301
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
 
302
                                       b'contents\n')
 
303
        self.assertFalse(t.has('dir/a'))
 
304
        t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
 
305
                               create_parent_dir=True)
 
306
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
 
307
 
 
308
        # But we still get NoSuchFile if we can't make the parent dir
 
309
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
 
310
                                       b'contents\n',
 
311
                                       create_parent_dir=True)
 
312
 
 
313
    def test_put_bytes_permissions(self):
 
314
        t = self.get_transport()
 
315
 
 
316
        if t.is_readonly():
 
317
            return
 
318
        if not t._can_roundtrip_unix_modebits():
 
319
            # Can't roundtrip, so no need to run this test
 
320
            return
 
321
        t.put_bytes('mode644', b'test text\n', mode=0o644)
 
322
        self.assertTransportMode(t, 'mode644', 0o644)
 
323
        t.put_bytes('mode666', b'test text\n', mode=0o666)
 
324
        self.assertTransportMode(t, 'mode666', 0o666)
 
325
        t.put_bytes('mode600', b'test text\n', mode=0o600)
 
326
        self.assertTransportMode(t, 'mode600', 0o600)
 
327
        # Yes, you can put_bytes a file such that it becomes readonly
 
328
        t.put_bytes('mode400', b'test text\n', mode=0o400)
 
329
        self.assertTransportMode(t, 'mode400', 0o400)
 
330
 
 
331
        # The default permissions should be based on the current umask
 
332
        umask = osutils.get_umask()
 
333
        t.put_bytes('nomode', b'test text\n', mode=None)
 
334
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
335
 
 
336
    def test_put_bytes_non_atomic_permissions(self):
 
337
        t = self.get_transport()
 
338
 
 
339
        if t.is_readonly():
 
340
            return
 
341
        if not t._can_roundtrip_unix_modebits():
 
342
            # Can't roundtrip, so no need to run this test
 
343
            return
 
344
        t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
 
345
        self.assertTransportMode(t, 'mode644', 0o644)
 
346
        t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
 
347
        self.assertTransportMode(t, 'mode666', 0o666)
 
348
        t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
 
349
        self.assertTransportMode(t, 'mode600', 0o600)
 
350
        t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
 
351
        self.assertTransportMode(t, 'mode400', 0o400)
 
352
 
 
353
        # The default permissions should be based on the current umask
 
354
        umask = osutils.get_umask()
 
355
        t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
 
356
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
357
 
 
358
        # We should also be able to set the mode for a parent directory
 
359
        # when it is created
 
360
        t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
 
361
                               dir_mode=0o700, create_parent_dir=True)
 
362
        self.assertTransportMode(t, 'dir700', 0o700)
 
363
        t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
 
364
                               dir_mode=0o770, create_parent_dir=True)
 
365
        self.assertTransportMode(t, 'dir770', 0o770)
 
366
        t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
 
367
                               dir_mode=0o777, create_parent_dir=True)
 
368
        self.assertTransportMode(t, 'dir777', 0o777)
 
369
 
 
370
    def test_put_file(self):
 
371
        t = self.get_transport()
 
372
 
 
373
        if t.is_readonly():
 
374
            self.assertRaises(TransportNotPossible,
 
375
                    t.put_file, 'a', BytesIO(b'some text for a\n'))
 
376
            return
 
377
 
 
378
        result = t.put_file('a', BytesIO(b'some text for a\n'))
 
379
        # put_file returns the length of the data written
 
380
        self.assertEqual(16, result)
 
381
        self.assertTrue(t.has('a'))
 
382
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
383
        # Put also replaces contents
 
384
        result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
 
385
        self.assertEqual(19, result)
 
386
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
387
        self.assertRaises(NoSuchFile,
 
388
                          t.put_file, 'path/doesnt/exist/c',
 
389
                              BytesIO(b'contents'))
 
390
 
 
391
    def test_put_file_non_atomic(self):
 
392
        t = self.get_transport()
 
393
 
 
394
        if t.is_readonly():
 
395
            self.assertRaises(TransportNotPossible,
 
396
                    t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
 
397
            return
 
398
 
 
399
        self.assertFalse(t.has('a'))
 
400
        t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
 
401
        self.assertTrue(t.has('a'))
 
402
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
403
        # Put also replaces contents
 
404
        t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
 
405
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
406
 
 
407
        # Make sure we can create another file
 
408
        t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
 
409
        # And overwrite 'a' with empty contents
 
410
        t.put_file_non_atomic('a', BytesIO(b''))
 
411
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
412
        self.check_transport_contents(b'', t, 'a')
 
413
 
 
414
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
 
415
                                       BytesIO(b'contents\n'))
 
416
        # Now test the create_parent flag
 
417
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
 
418
                                       BytesIO(b'contents\n'))
 
419
        self.assertFalse(t.has('dir/a'))
 
420
        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
 
421
                              create_parent_dir=True)
 
422
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
 
423
 
 
424
        # But we still get NoSuchFile if we can't make the parent dir
 
425
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
 
426
                                       BytesIO(b'contents\n'),
 
427
                                       create_parent_dir=True)
 
428
 
 
429
    def test_put_file_permissions(self):
 
430
 
 
431
        t = self.get_transport()
 
432
 
 
433
        if t.is_readonly():
 
434
            return
 
435
        if not t._can_roundtrip_unix_modebits():
 
436
            # Can't roundtrip, so no need to run this test
 
437
            return
 
438
        t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
 
439
        self.assertTransportMode(t, 'mode644', 0o644)
 
440
        t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
 
441
        self.assertTransportMode(t, 'mode666', 0o666)
 
442
        t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
 
443
        self.assertTransportMode(t, 'mode600', 0o600)
 
444
        # Yes, you can put a file such that it becomes readonly
 
445
        t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
 
446
        self.assertTransportMode(t, 'mode400', 0o400)
 
447
        # The default permissions should be based on the current umask
 
448
        umask = osutils.get_umask()
 
449
        t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
 
450
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
451
 
 
452
    def test_put_file_non_atomic_permissions(self):
 
453
        t = self.get_transport()
 
454
 
 
455
        if t.is_readonly():
 
456
            return
 
457
        if not t._can_roundtrip_unix_modebits():
 
458
            # Can't roundtrip, so no need to run this test
 
459
            return
 
460
        t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
 
461
        self.assertTransportMode(t, 'mode644', 0o644)
 
462
        t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
 
463
        self.assertTransportMode(t, 'mode666', 0o666)
 
464
        t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
 
465
        self.assertTransportMode(t, 'mode600', 0o600)
 
466
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
 
467
        t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
 
468
        self.assertTransportMode(t, 'mode400', 0o400)
 
469
 
 
470
        # The default permissions should be based on the current umask
 
471
        umask = osutils.get_umask()
 
472
        t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
 
473
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
474
 
 
475
        # We should also be able to set the mode for a parent directory
 
476
        # when it is created
 
477
        sio = BytesIO()
 
478
        t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
 
479
                              dir_mode=0o700, create_parent_dir=True)
 
480
        self.assertTransportMode(t, 'dir700', 0o700)
 
481
        t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
 
482
                              dir_mode=0o770, create_parent_dir=True)
 
483
        self.assertTransportMode(t, 'dir770', 0o770)
 
484
        t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
 
485
                              dir_mode=0o777, create_parent_dir=True)
 
486
        self.assertTransportMode(t, 'dir777', 0o777)
 
487
 
 
488
    def test_put_bytes_unicode(self):
 
489
        t = self.get_transport()
 
490
        if t.is_readonly():
 
491
            return
 
492
        unicode_string = u'\u1234'
 
493
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
 
494
 
 
495
    def test_mkdir(self):
 
496
        t = self.get_transport()
 
497
 
 
498
        if t.is_readonly():
 
499
            # cannot mkdir on readonly transports. We're not testing for
 
500
            # cache coherency because cache behaviour is not currently
 
501
            # defined for the transport interface.
 
502
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
 
503
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
 
504
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
 
505
            return
 
506
        # Test mkdir
 
507
        t.mkdir('dir_a')
 
508
        self.assertEqual(t.has('dir_a'), True)
 
509
        self.assertEqual(t.has('dir_b'), False)
 
510
 
 
511
        t.mkdir('dir_b')
 
512
        self.assertEqual(t.has('dir_b'), True)
 
513
 
 
514
        self.assertEqual([t.has(n) for n in
 
515
            ['dir_a', 'dir_b', 'dir_q', 'dir_b']],
 
516
            [True, True, False, True])
 
517
 
 
518
        # we were testing that a local mkdir followed by a transport
 
519
        # mkdir failed thusly, but given that we * in one process * do not
 
520
        # concurrently fiddle with disk dirs and then use transport to do
 
521
        # things, the win here seems marginal compared to the constraint on
 
522
        # the interface. RBC 20051227
 
523
        t.mkdir('dir_g')
 
524
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
525
 
 
526
        # Test get/put in sub-directories
 
527
        t.put_bytes('dir_a/a', b'contents of dir_a/a')
 
528
        t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
 
529
        self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a')
 
530
        self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b')
 
531
 
 
532
        # mkdir of a dir with an absent parent
 
533
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
 
534
 
 
535
    def test_mkdir_permissions(self):
 
536
        t = self.get_transport()
 
537
        if t.is_readonly():
 
538
            return
 
539
        if not t._can_roundtrip_unix_modebits():
 
540
            # no sense testing on this transport
 
541
            return
 
542
        # Test mkdir with a mode
 
543
        t.mkdir('dmode755', mode=0o755)
 
544
        self.assertTransportMode(t, 'dmode755', 0o755)
 
545
        t.mkdir('dmode555', mode=0o555)
 
546
        self.assertTransportMode(t, 'dmode555', 0o555)
 
547
        t.mkdir('dmode777', mode=0o777)
 
548
        self.assertTransportMode(t, 'dmode777', 0o777)
 
549
        t.mkdir('dmode700', mode=0o700)
 
550
        self.assertTransportMode(t, 'dmode700', 0o700)
 
551
        t.mkdir('mdmode755', mode=0o755)
 
552
        self.assertTransportMode(t, 'mdmode755', 0o755)
 
553
 
 
554
        # Default mode should be based on umask
 
555
        umask = osutils.get_umask()
 
556
        t.mkdir('dnomode', mode=None)
 
557
        self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
 
558
 
 
559
    def test_opening_a_file_stream_creates_file(self):
 
560
        t = self.get_transport()
 
561
        if t.is_readonly():
 
562
            return
 
563
        handle = t.open_write_stream('foo')
 
564
        try:
 
565
            self.assertEqual(b'', t.get_bytes('foo'))
 
566
        finally:
 
567
            handle.close()
 
568
 
 
569
    def test_opening_a_file_stream_can_set_mode(self):
 
570
        t = self.get_transport()
 
571
        if t.is_readonly():
 
572
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
573
                              t.open_write_stream, 'foo')
 
574
            return
 
575
        if not t._can_roundtrip_unix_modebits():
 
576
            # Can't roundtrip, so no need to run this test
 
577
            return
 
578
 
 
579
        def check_mode(name, mode, expected):
 
580
            handle = t.open_write_stream(name, mode=mode)
 
581
            handle.close()
 
582
            self.assertTransportMode(t, name, expected)
 
583
        check_mode('mode644', 0o644, 0o644)
 
584
        check_mode('mode666', 0o666, 0o666)
 
585
        check_mode('mode600', 0o600, 0o600)
 
586
        # The default permissions should be based on the current umask
 
587
        check_mode('nomode', None, 0o666 & ~osutils.get_umask())
 
588
 
 
589
    def test_copy_to(self):
 
590
        # FIXME: test:   same server to same server (partly done)
 
591
        # same protocol two servers
 
592
        # and    different protocols (done for now except for MemoryTransport.
 
593
        # - RBC 20060122
 
594
 
 
595
        def simple_copy_files(transport_from, transport_to):
 
596
            files = ['a', 'b', 'c', 'd']
 
597
            self.build_tree(files, transport=transport_from)
 
598
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
 
599
            for f in files:
 
600
                self.check_transport_contents(transport_to.get_bytes(f),
 
601
                                              transport_from, f)
 
602
 
 
603
        t = self.get_transport()
 
604
        if t.__class__.__name__ == "SFTPTransport":
 
605
            self.skipTest("SFTP copy_to currently too flakey to use")
 
606
        temp_transport = MemoryTransport('memory:///')
 
607
        simple_copy_files(t, temp_transport)
 
608
        if not t.is_readonly():
 
609
            t.mkdir('copy_to_simple')
 
610
            t2 = t.clone('copy_to_simple')
 
611
            simple_copy_files(t, t2)
 
612
 
 
613
 
 
614
        # Test that copying into a missing directory raises
 
615
        # NoSuchFile
 
616
        if t.is_readonly():
 
617
            self.build_tree(['e/', 'e/f'])
 
618
        else:
 
619
            t.mkdir('e')
 
620
            t.put_bytes('e/f', b'contents of e')
 
621
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
 
622
        temp_transport.mkdir('e')
 
623
        t.copy_to(['e/f'], temp_transport)
 
624
 
 
625
        del temp_transport
 
626
        temp_transport = MemoryTransport('memory:///')
 
627
 
 
628
        files = ['a', 'b', 'c', 'd']
 
629
        t.copy_to(iter(files), temp_transport)
 
630
        for f in files:
 
631
            self.check_transport_contents(temp_transport.get_bytes(f),
 
632
                                          t, f)
 
633
        del temp_transport
 
634
 
 
635
        for mode in (0o666, 0o644, 0o600, 0o400):
 
636
            temp_transport = MemoryTransport("memory:///")
 
637
            t.copy_to(files, temp_transport, mode=mode)
 
638
            for f in files:
 
639
                self.assertTransportMode(temp_transport, f, mode)
 
640
 
 
641
    def test_create_prefix(self):
 
642
        t = self.get_transport()
 
643
        sub = t.clone('foo').clone('bar')
 
644
        try:
 
645
            sub.create_prefix()
 
646
        except TransportNotPossible:
 
647
            self.assertTrue(t.is_readonly())
 
648
        else:
 
649
            self.assertTrue(t.has('foo/bar'))
 
650
 
 
651
    def test_append_file(self):
 
652
        t = self.get_transport()
 
653
 
 
654
        if t.is_readonly():
 
655
            self.assertRaises(TransportNotPossible,
 
656
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
657
            return
 
658
        t.put_bytes('a', b'diff\ncontents for\na\n')
 
659
        t.put_bytes('b', b'contents\nfor b\n')
 
660
 
 
661
        self.assertEqual(20,
 
662
            t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
 
663
 
 
664
        self.check_transport_contents(
 
665
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
666
            t, 'a')
 
667
 
 
668
        # a file with no parent should fail..
 
669
        self.assertRaises(NoSuchFile,
 
670
                          t.append_file, 'missing/path', BytesIO(b'content'))
 
671
 
 
672
        # And we can create new files, too
 
673
        self.assertEqual(0,
 
674
            t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
 
675
        self.check_transport_contents(b'some text\nfor a missing file\n',
 
676
                                      t, 'c')
 
677
 
 
678
    def test_append_bytes(self):
 
679
        t = self.get_transport()
 
680
 
 
681
        if t.is_readonly():
 
682
            self.assertRaises(TransportNotPossible,
 
683
                    t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
 
684
            return
 
685
 
 
686
        self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
 
687
        self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
 
688
 
 
689
        self.assertEqual(20,
 
690
            t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
 
691
 
 
692
        self.check_transport_contents(
 
693
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
694
            t, 'a')
 
695
 
 
696
        # a file with no parent should fail..
 
697
        self.assertRaises(NoSuchFile,
 
698
                          t.append_bytes, 'missing/path', b'content')
 
699
 
 
700
    def test_append_file_mode(self):
 
701
        """Check that append accepts a mode parameter"""
 
702
        # check append accepts a mode
 
703
        t = self.get_transport()
 
704
        if t.is_readonly():
 
705
            self.assertRaises(TransportNotPossible,
 
706
                t.append_file, 'f', BytesIO(b'f'), mode=None)
 
707
            return
 
708
        t.append_file('f', BytesIO(b'f'), mode=None)
 
709
 
 
710
    def test_append_bytes_mode(self):
 
711
        # check append_bytes accepts a mode
 
712
        t = self.get_transport()
 
713
        if t.is_readonly():
 
714
            self.assertRaises(TransportNotPossible,
 
715
                t.append_bytes, 'f', b'f', mode=None)
 
716
            return
 
717
        t.append_bytes('f', b'f', mode=None)
 
718
 
 
719
    def test_delete(self):
 
720
        # TODO: Test Transport.delete
 
721
        t = self.get_transport()
 
722
 
 
723
        # Not much to do with a readonly transport
 
724
        if t.is_readonly():
 
725
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
 
726
            return
 
727
 
 
728
        t.put_bytes('a', b'a little bit of text\n')
 
729
        self.assertTrue(t.has('a'))
 
730
        t.delete('a')
 
731
        self.assertFalse(t.has('a'))
 
732
 
 
733
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
734
 
 
735
        t.put_bytes('a', b'a text\n')
 
736
        t.put_bytes('b', b'b text\n')
 
737
        t.put_bytes('c', b'c text\n')
 
738
        self.assertEqual([True, True, True],
 
739
                [t.has(n) for n in ['a', 'b', 'c']])
 
740
        t.delete('a')
 
741
        t.delete('c')
 
742
        self.assertEqual([False, True, False],
 
743
                [t.has(n) for n in ['a', 'b', 'c']])
 
744
        self.assertFalse(t.has('a'))
 
745
        self.assertTrue(t.has('b'))
 
746
        self.assertFalse(t.has('c'))
 
747
 
 
748
        for name in ['a', 'c', 'd']:
 
749
            self.assertRaises(NoSuchFile, t.delete, name)
 
750
 
 
751
        # We should have deleted everything
 
752
        # SftpServer creates control files in the
 
753
        # working directory, so we can just do a
 
754
        # plain "listdir".
 
755
        # self.assertEqual([], os.listdir('.'))
 
756
 
 
757
    def test_recommended_page_size(self):
 
758
        """Transports recommend a page size for partial access to files."""
 
759
        t = self.get_transport()
 
760
        self.assertIsInstance(t.recommended_page_size(), int)
 
761
 
 
762
    def test_rmdir(self):
 
763
        t = self.get_transport()
 
764
        # Not much to do with a readonly transport
 
765
        if t.is_readonly():
 
766
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
 
767
            return
 
768
        t.mkdir('adir')
 
769
        t.mkdir('adir/bdir')
 
770
        t.rmdir('adir/bdir')
 
771
        # ftp may not be able to raise NoSuchFile for lack of
 
772
        # details when failing
 
773
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
 
774
        t.rmdir('adir')
 
775
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
 
776
 
 
777
    def test_rmdir_not_empty(self):
 
778
        """Deleting a non-empty directory raises an exception
 
779
 
 
780
        sftp (and possibly others) don't give us a specific "directory not
 
781
        empty" exception -- we can just see that the operation failed.
 
782
        """
 
783
        t = self.get_transport()
 
784
        if t.is_readonly():
 
785
            return
 
786
        t.mkdir('adir')
 
787
        t.mkdir('adir/bdir')
 
788
        self.assertRaises(PathError, t.rmdir, 'adir')
 
789
 
 
790
    def test_rmdir_empty_but_similar_prefix(self):
 
791
        """rmdir does not get confused by sibling paths.
 
792
 
 
793
        A naive implementation of MemoryTransport would refuse to rmdir
 
794
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
795
        uses "path.startswith(dir)" on all file paths to determine if directory
 
796
        is empty.
 
797
        """
 
798
        t = self.get_transport()
 
799
        if t.is_readonly():
 
800
            return
 
801
        t.mkdir('foo')
 
802
        t.put_bytes('foo-bar', b'')
 
803
        t.mkdir('foo-baz')
 
804
        t.rmdir('foo')
 
805
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
806
        self.assertTrue(t.has('foo-bar'))
 
807
 
 
808
    def test_rename_dir_succeeds(self):
 
809
        t = self.get_transport()
 
810
        if t.is_readonly():
 
811
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
812
                              t.rename, 'foo', 'bar')
 
813
            return
 
814
        t.mkdir('adir')
 
815
        t.mkdir('adir/asubdir')
 
816
        t.rename('adir', 'bdir')
 
817
        self.assertTrue(t.has('bdir/asubdir'))
 
818
        self.assertFalse(t.has('adir'))
 
819
 
 
820
    def test_rename_dir_nonempty(self):
 
821
        """Attempting to replace a nonemtpy directory should fail"""
 
822
        t = self.get_transport()
 
823
        if t.is_readonly():
 
824
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
825
                              t.rename, 'foo', 'bar')
 
826
            return
 
827
        t.mkdir('adir')
 
828
        t.mkdir('adir/asubdir')
 
829
        t.mkdir('bdir')
 
830
        t.mkdir('bdir/bsubdir')
 
831
        # any kind of PathError would be OK, though we normally expect
 
832
        # DirectoryNotEmpty
 
833
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
 
834
        # nothing was changed so it should still be as before
 
835
        self.assertTrue(t.has('bdir/bsubdir'))
 
836
        self.assertFalse(t.has('adir/bdir'))
 
837
        self.assertFalse(t.has('adir/bsubdir'))
 
838
 
 
839
    def test_rename_across_subdirs(self):
 
840
        t = self.get_transport()
 
841
        if t.is_readonly():
 
842
            raise TestNotApplicable("transport is readonly")
 
843
        t.mkdir('a')
 
844
        t.mkdir('b')
 
845
        ta = t.clone('a')
 
846
        tb = t.clone('b')
 
847
        ta.put_bytes('f', b'aoeu')
 
848
        ta.rename('f', '../b/f')
 
849
        self.assertTrue(tb.has('f'))
 
850
        self.assertFalse(ta.has('f'))
 
851
        self.assertTrue(t.has('b/f'))
 
852
 
 
853
    def test_delete_tree(self):
 
854
        t = self.get_transport()
 
855
 
 
856
        # Not much to do with a readonly transport
 
857
        if t.is_readonly():
 
858
            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
 
859
            return
 
860
 
 
861
        # and does it like listing ?
 
862
        t.mkdir('adir')
 
863
        try:
 
864
            t.delete_tree('adir')
 
865
        except TransportNotPossible:
 
866
            # ok, this transport does not support delete_tree
 
867
            return
 
868
 
 
869
        # did it delete that trivial case?
 
870
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
871
 
 
872
        self.build_tree(['adir/',
 
873
                         'adir/file',
 
874
                         'adir/subdir/',
 
875
                         'adir/subdir/file',
 
876
                         'adir/subdir2/',
 
877
                         'adir/subdir2/file',
 
878
                         ], transport=t)
 
879
 
 
880
        t.delete_tree('adir')
 
881
        # adir should be gone now.
 
882
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
883
 
 
884
    def test_move(self):
 
885
        t = self.get_transport()
 
886
 
 
887
        if t.is_readonly():
 
888
            return
 
889
 
 
890
        # TODO: I would like to use os.listdir() to
 
891
        # make sure there are no extra files, but SftpServer
 
892
        # creates control files in the working directory
 
893
        # perhaps all of this could be done in a subdirectory
 
894
 
 
895
        t.put_bytes('a', b'a first file\n')
 
896
        self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
 
897
 
 
898
        t.move('a', 'b')
 
899
        self.assertTrue(t.has('b'))
 
900
        self.assertFalse(t.has('a'))
 
901
 
 
902
        self.check_transport_contents(b'a first file\n', t, 'b')
 
903
        self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
 
904
 
 
905
        # Overwrite a file
 
906
        t.put_bytes('c', b'c this file\n')
 
907
        t.move('c', 'b')
 
908
        self.assertFalse(t.has('c'))
 
909
        self.check_transport_contents(b'c this file\n', t, 'b')
 
910
 
 
911
        # TODO: Try to write a test for atomicity
 
912
        # TODO: Test moving into a non-existent subdirectory
 
913
 
 
914
    def test_copy(self):
 
915
        t = self.get_transport()
 
916
 
 
917
        if t.is_readonly():
 
918
            return
 
919
 
 
920
        t.put_bytes('a', b'a file\n')
 
921
        t.copy('a', 'b')
 
922
        self.check_transport_contents(b'a file\n', t, 'b')
 
923
 
 
924
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
925
        os.mkdir('c')
 
926
        # What should the assert be if you try to copy a
 
927
        # file over a directory?
 
928
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
929
        t.put_bytes('d', b'text in d\n')
 
930
        t.copy('d', 'b')
 
931
        self.check_transport_contents(b'text in d\n', t, 'b')
 
932
 
 
933
    def test_connection_error(self):
 
934
        """ConnectionError is raised when connection is impossible.
 
935
 
 
936
        The error should be raised from the first operation on the transport.
 
937
        """
 
938
        try:
 
939
            url = self._server.get_bogus_url()
 
940
        except NotImplementedError:
 
941
            raise TestSkipped("Transport %s has no bogus URL support." %
 
942
                              self._server.__class__)
 
943
        t = _mod_transport.get_transport_from_url(url)
 
944
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
 
945
 
 
946
    def test_stat(self):
 
947
        # TODO: Test stat, just try once, and if it throws, stop testing
 
948
        from stat import S_ISDIR, S_ISREG
 
949
 
 
950
        t = self.get_transport()
 
951
 
 
952
        try:
 
953
            st = t.stat('.')
 
954
        except TransportNotPossible as e:
 
955
            # This transport cannot stat
 
956
            return
 
957
 
 
958
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
959
        sizes = [14, 0, 16, 0, 18]
 
960
        self.build_tree(paths, transport=t, line_endings='binary')
 
961
 
 
962
        for path, size in zip(paths, sizes):
 
963
            st = t.stat(path)
 
964
            if path.endswith('/'):
 
965
                self.assertTrue(S_ISDIR(st.st_mode))
 
966
                # directory sizes are meaningless
 
967
            else:
 
968
                self.assertTrue(S_ISREG(st.st_mode))
 
969
                self.assertEqual(size, st.st_size)
 
970
 
 
971
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
972
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
973
 
 
974
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
 
975
        subdir = t.clone('subdir')
 
976
        st = subdir.stat('./file')
 
977
        st = subdir.stat('.')
 
978
 
 
979
    def test_hardlink(self):
 
980
        from stat import ST_NLINK
 
981
 
 
982
        t = self.get_transport()
 
983
 
 
984
        source_name = "original_target"
 
985
        link_name = "target_link"
 
986
 
 
987
        self.build_tree([source_name], transport=t)
 
988
 
 
989
        try:
 
990
            t.hardlink(source_name, link_name)
 
991
 
 
992
            self.assertTrue(t.has(source_name))
 
993
            self.assertTrue(t.has(link_name))
 
994
 
 
995
            st = t.stat(link_name)
 
996
            self.assertEqual(st[ST_NLINK], 2)
 
997
        except TransportNotPossible:
 
998
            raise TestSkipped("Transport %s does not support hardlinks." %
 
999
                              self._server.__class__)
 
1000
 
 
1001
    def test_symlink(self):
 
1002
        from stat import S_ISLNK
 
1003
 
 
1004
        t = self.get_transport()
 
1005
 
 
1006
        source_name = "original_target"
 
1007
        link_name = "target_link"
 
1008
 
 
1009
        self.build_tree([source_name], transport=t)
 
1010
 
 
1011
        try:
 
1012
            t.symlink(source_name, link_name)
 
1013
 
 
1014
            self.assertTrue(t.has(source_name))
 
1015
            self.assertTrue(t.has(link_name))
 
1016
 
 
1017
            st = t.stat(link_name)
 
1018
            self.assertTrue(S_ISLNK(st.st_mode),
 
1019
                "expected symlink, got mode %o" % st.st_mode)
 
1020
        except TransportNotPossible:
 
1021
            raise TestSkipped("Transport %s does not support symlinks." %
 
1022
                              self._server.__class__)
 
1023
        except IOError:
 
1024
            self.knownFailure("Paramiko fails to create symlinks during tests")
 
1025
 
 
1026
    def test_list_dir(self):
 
1027
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
1028
        t = self.get_transport()
 
1029
 
 
1030
        if not t.listable():
 
1031
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
1032
            return
 
1033
 
 
1034
        def sorted_list(d, transport):
 
1035
            l = sorted(transport.list_dir(d))
 
1036
            return l
 
1037
 
 
1038
        self.assertEqual([], sorted_list('.', t))
 
1039
        # c2 is precisely one letter longer than c here to test that
 
1040
        # suffixing is not confused.
 
1041
        # a%25b checks that quoting is done consistently across transports
 
1042
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
 
1043
 
 
1044
        if not t.is_readonly():
 
1045
            self.build_tree(tree_names, transport=t)
 
1046
        else:
 
1047
            self.build_tree(tree_names)
 
1048
 
 
1049
        self.assertEqual(
 
1050
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1051
        self.assertEqual(
 
1052
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1053
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1054
 
 
1055
        # Cloning the transport produces an equivalent listing
 
1056
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
 
1057
 
 
1058
        if not t.is_readonly():
 
1059
            t.delete('c/d')
 
1060
            t.delete('b')
 
1061
        else:
 
1062
            os.unlink('c/d')
 
1063
            os.unlink('b')
 
1064
 
 
1065
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1066
        self.assertEqual(['e'], sorted_list('c', t))
 
1067
 
 
1068
        self.assertListRaises(PathError, t.list_dir, 'q')
 
1069
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1070
        # 'a' is a file, list_dir should raise an error
 
1071
        self.assertListRaises(PathError, t.list_dir, 'a')
 
1072
 
 
1073
    def test_list_dir_result_is_url_escaped(self):
 
1074
        t = self.get_transport()
 
1075
        if not t.listable():
 
1076
            raise TestSkipped("transport not listable")
 
1077
 
 
1078
        if not t.is_readonly():
 
1079
            self.build_tree(['a/', 'a/%'], transport=t)
 
1080
        else:
 
1081
            self.build_tree(['a/', 'a/%'])
 
1082
 
 
1083
        names = list(t.list_dir('a'))
 
1084
        self.assertEqual(['%25'], names)
 
1085
        self.assertIsInstance(names[0], str)
 
1086
 
 
1087
    def test_clone_preserve_info(self):
 
1088
        t1 = self.get_transport()
 
1089
        if not isinstance(t1, ConnectedTransport):
 
1090
            raise TestSkipped("not a connected transport")
 
1091
 
 
1092
        t2 = t1.clone('subdir')
 
1093
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1094
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
 
1095
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
 
1096
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
 
1097
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
 
1098
 
 
1099
    def test__reuse_for(self):
 
1100
        t = self.get_transport()
 
1101
        if not isinstance(t, ConnectedTransport):
 
1102
            raise TestSkipped("not a connected transport")
 
1103
 
 
1104
        def new_url(scheme=None, user=None, password=None,
 
1105
                    host=None, port=None, path=None):
 
1106
            """Build a new url from t.base changing only parts of it.
 
1107
 
 
1108
            Only the parameters different from None will be changed.
 
1109
            """
 
1110
            if scheme   is None: scheme   = t._parsed_url.scheme
 
1111
            if user     is None: user     = t._parsed_url.user
 
1112
            if password is None: password = t._parsed_url.password
 
1113
            if user     is None: user     = t._parsed_url.user
 
1114
            if host     is None: host     = t._parsed_url.host
 
1115
            if port     is None: port     = t._parsed_url.port
 
1116
            if path     is None: path     = t._parsed_url.path
 
1117
            return str(urlutils.URL(scheme, user, password, host, port, path))
 
1118
 
 
1119
        if t._parsed_url.scheme == 'ftp':
 
1120
            scheme = 'sftp'
 
1121
        else:
 
1122
            scheme = 'ftp'
 
1123
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1124
        if t._parsed_url.user == 'me':
 
1125
            user = 'you'
 
1126
        else:
 
1127
            user = 'me'
 
1128
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1129
        # passwords are not taken into account because:
 
1130
        # - it makes no sense to have two different valid passwords for the
 
1131
        #   same user
 
1132
        # - _password in ConnectedTransport is intended to collect what the
 
1133
        #   user specified from the command-line and there are cases where the
 
1134
        #   new url can contain no password (if the url was built from an
 
1135
        #   existing transport.base for example)
 
1136
        # - password are considered part of the credentials provided at
 
1137
        #   connection creation time and as such may not be present in the url
 
1138
        #   (they may be typed by the user when prompted for example)
 
1139
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1140
        # We will not connect, we can use a invalid host
 
1141
        self.assertIsNot(t, t._reuse_for(new_url(host=t._parsed_url.host + 'bar')))
 
1142
        if t._parsed_url.port == 1234:
 
1143
            port = 4321
 
1144
        else:
 
1145
            port = 1234
 
1146
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1147
        # No point in trying to reuse a transport for a local URL
 
1148
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1149
 
 
1150
    def test_connection_sharing(self):
 
1151
        t = self.get_transport()
 
1152
        if not isinstance(t, ConnectedTransport):
 
1153
            raise TestSkipped("not a connected transport")
 
1154
 
 
1155
        c = t.clone('subdir')
 
1156
        # Some transports will create the connection  only when needed
 
1157
        t.has('surely_not') # Force connection
 
1158
        self.assertIs(t._get_connection(), c._get_connection())
 
1159
 
 
1160
        # Temporary failure, we need to create a new dummy connection
 
1161
        new_connection = None
 
1162
        t._set_connection(new_connection)
 
1163
        # Check that both transports use the same connection
 
1164
        self.assertIs(new_connection, t._get_connection())
 
1165
        self.assertIs(new_connection, c._get_connection())
 
1166
 
 
1167
    def test_reuse_connection_for_various_paths(self):
 
1168
        t = self.get_transport()
 
1169
        if not isinstance(t, ConnectedTransport):
 
1170
            raise TestSkipped("not a connected transport")
 
1171
 
 
1172
        t.has('surely_not') # Force connection
 
1173
        self.assertIsNot(None, t._get_connection())
 
1174
 
 
1175
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1176
        self.assertIsNot(t, subdir)
 
1177
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1178
 
 
1179
        home = subdir._reuse_for(t.base + 'home')
 
1180
        self.assertIs(t._get_connection(), home._get_connection())
 
1181
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1182
 
 
1183
    def test_clone(self):
 
1184
        # TODO: Test that clone moves up and down the filesystem
 
1185
        t1 = self.get_transport()
 
1186
 
 
1187
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
 
1188
 
 
1189
        self.assertTrue(t1.has('a'))
 
1190
        self.assertTrue(t1.has('b/c'))
 
1191
        self.assertFalse(t1.has('c'))
 
1192
 
 
1193
        t2 = t1.clone('b')
 
1194
        self.assertEqual(t1.base + 'b/', t2.base)
 
1195
 
 
1196
        self.assertTrue(t2.has('c'))
 
1197
        self.assertFalse(t2.has('a'))
 
1198
 
 
1199
        t3 = t2.clone('..')
 
1200
        self.assertTrue(t3.has('a'))
 
1201
        self.assertFalse(t3.has('c'))
 
1202
 
 
1203
        self.assertFalse(t1.has('b/d'))
 
1204
        self.assertFalse(t2.has('d'))
 
1205
        self.assertFalse(t3.has('b/d'))
 
1206
 
 
1207
        if t1.is_readonly():
 
1208
            self.build_tree_contents([('b/d', b'newfile\n')])
 
1209
        else:
 
1210
            t2.put_bytes('d', b'newfile\n')
 
1211
 
 
1212
        self.assertTrue(t1.has('b/d'))
 
1213
        self.assertTrue(t2.has('d'))
 
1214
        self.assertTrue(t3.has('b/d'))
 
1215
 
 
1216
    def test_clone_to_root(self):
 
1217
        orig_transport = self.get_transport()
 
1218
        # Repeatedly go up to a parent directory until we're at the root
 
1219
        # directory of this transport
 
1220
        root_transport = orig_transport
 
1221
        new_transport = root_transport.clone("..")
 
1222
        # as we are walking up directories, the path must be
 
1223
        # growing less, except at the top
 
1224
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1225
            or new_transport.base == root_transport.base)
 
1226
        while new_transport.base != root_transport.base:
 
1227
            root_transport = new_transport
 
1228
            new_transport = root_transport.clone("..")
 
1229
            # as we are walking up directories, the path must be
 
1230
            # growing less, except at the top
 
1231
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1232
                or new_transport.base == root_transport.base)
 
1233
 
 
1234
        # Cloning to "/" should take us to exactly the same location.
 
1235
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1236
        # the abspath of "/" from the original transport should be the same
 
1237
        # as the base at the root:
 
1238
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1239
 
 
1240
        # At the root, the URL must still end with / as its a directory
 
1241
        self.assertEqual(root_transport.base[-1], '/')
 
1242
 
 
1243
    def test_clone_from_root(self):
 
1244
        """At the root, cloning to a simple dir should just do string append."""
 
1245
        orig_transport = self.get_transport()
 
1246
        root_transport = orig_transport.clone('/')
 
1247
        self.assertEqual(root_transport.base + '.bzr/',
 
1248
            root_transport.clone('.bzr').base)
 
1249
 
 
1250
    def test_base_url(self):
 
1251
        t = self.get_transport()
 
1252
        self.assertEqual('/', t.base[-1])
 
1253
 
 
1254
    def test_relpath(self):
 
1255
        t = self.get_transport()
 
1256
        self.assertEqual('', t.relpath(t.base))
 
1257
        # base ends with /
 
1258
        self.assertEqual('', t.relpath(t.base[:-1]))
 
1259
        # subdirs which don't exist should still give relpaths.
 
1260
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
 
1261
        # trailing slash should be the same.
 
1262
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
 
1263
 
 
1264
    def test_relpath_at_root(self):
 
1265
        t = self.get_transport()
 
1266
        # clone all the way to the top
 
1267
        new_transport = t.clone('..')
 
1268
        while new_transport.base != t.base:
 
1269
            t = new_transport
 
1270
            new_transport = t.clone('..')
 
1271
        # we must be able to get a relpath below the root
 
1272
        self.assertEqual('', t.relpath(t.base))
 
1273
        # and a deeper one should work too
 
1274
        self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
 
1275
 
 
1276
    def test_abspath(self):
 
1277
        # smoke test for abspath. Corner cases for backends like unix fs's
 
1278
        # that have aliasing problems like symlinks should go in backend
 
1279
        # specific test cases.
 
1280
        transport = self.get_transport()
 
1281
 
 
1282
        self.assertEqual(transport.base + 'relpath',
 
1283
                         transport.abspath('relpath'))
 
1284
 
 
1285
        # This should work without raising an error.
 
1286
        transport.abspath("/")
 
1287
 
 
1288
        # the abspath of "/" and "/foo/.." should result in the same location
 
1289
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1290
 
 
1291
        self.assertEqual(transport.clone("/").abspath('foo'),
 
1292
                         transport.abspath("/foo"))
 
1293
 
 
1294
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
 
1295
    def test_win32_abspath(self):
 
1296
        # Note: we tried to set sys.platform='win32' so we could test on
 
1297
        # other platforms too, but then osutils does platform specific
 
1298
        # things at import time which defeated us...
 
1299
        if sys.platform != 'win32':
 
1300
            raise TestSkipped(
 
1301
                'Testing drive letters in abspath implemented only for win32')
 
1302
 
 
1303
        # smoke test for abspath on win32.
 
1304
        # a transport based on 'file:///' never fully qualifies the drive.
 
1305
        transport = _mod_transport.get_transport_from_url("file:///")
 
1306
        self.assertEqual(transport.abspath("/"), "file:///")
 
1307
 
 
1308
        # but a transport that starts with a drive spec must keep it.
 
1309
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1310
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1311
 
 
1312
    def test_local_abspath(self):
 
1313
        transport = self.get_transport()
 
1314
        try:
 
1315
            p = transport.local_abspath('.')
 
1316
        except (errors.NotLocalUrl, TransportNotPossible) as e:
 
1317
            # should be formattable
 
1318
            s = str(e)
 
1319
        else:
 
1320
            self.assertEqual(getcwd(), p)
 
1321
 
 
1322
    def test_abspath_at_root(self):
 
1323
        t = self.get_transport()
 
1324
        # clone all the way to the top
 
1325
        new_transport = t.clone('..')
 
1326
        while new_transport.base != t.base:
 
1327
            t = new_transport
 
1328
            new_transport = t.clone('..')
 
1329
        # we must be able to get a abspath of the root when we ask for
 
1330
        # t.abspath('..') - this due to our choice that clone('..')
 
1331
        # should return the root from the root, combined with the desire that
 
1332
        # the url from clone('..') and from abspath('..') should be the same.
 
1333
        self.assertEqual(t.base, t.abspath('..'))
 
1334
        # '' should give us the root
 
1335
        self.assertEqual(t.base, t.abspath(''))
 
1336
        # and a path should append to the url
 
1337
        self.assertEqual(t.base + 'foo', t.abspath('foo'))
 
1338
 
 
1339
    def test_iter_files_recursive(self):
 
1340
        transport = self.get_transport()
 
1341
        if not transport.listable():
 
1342
            self.assertRaises(TransportNotPossible,
 
1343
                              transport.iter_files_recursive)
 
1344
            return
 
1345
        self.build_tree(['isolated/',
 
1346
                         'isolated/dir/',
 
1347
                         'isolated/dir/foo',
 
1348
                         'isolated/dir/bar',
 
1349
                         'isolated/dir/b%25z', # make sure quoting is correct
 
1350
                         'isolated/bar'],
 
1351
                        transport=transport)
 
1352
        paths = set(transport.iter_files_recursive())
 
1353
        # nb the directories are not converted
 
1354
        self.assertEqual(paths,
 
1355
                    {'isolated/dir/foo',
 
1356
                         'isolated/dir/bar',
 
1357
                         'isolated/dir/b%2525z',
 
1358
                         'isolated/bar'})
 
1359
        sub_transport = transport.clone('isolated')
 
1360
        paths = set(sub_transport.iter_files_recursive())
 
1361
        self.assertEqual(paths,
 
1362
            {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
 
1363
 
 
1364
    def test_copy_tree(self):
 
1365
        # TODO: test file contents and permissions are preserved. This test was
 
1366
        # added just to ensure that quoting was handled correctly.
 
1367
        # -- David Allouche 2006-08-11
 
1368
        transport = self.get_transport()
 
1369
        if not transport.listable():
 
1370
            self.assertRaises(TransportNotPossible,
 
1371
                              transport.iter_files_recursive)
 
1372
            return
 
1373
        if transport.is_readonly():
 
1374
            return
 
1375
        self.build_tree(['from/',
 
1376
                         'from/dir/',
 
1377
                         'from/dir/foo',
 
1378
                         'from/dir/bar',
 
1379
                         'from/dir/b%25z', # make sure quoting is correct
 
1380
                         'from/bar'],
 
1381
                        transport=transport)
 
1382
        transport.copy_tree('from', 'to')
 
1383
        paths = set(transport.iter_files_recursive())
 
1384
        self.assertEqual(paths,
 
1385
                    {'from/dir/foo',
 
1386
                         'from/dir/bar',
 
1387
                         'from/dir/b%2525z',
 
1388
                         'from/bar',
 
1389
                         'to/dir/foo',
 
1390
                         'to/dir/bar',
 
1391
                         'to/dir/b%2525z',
 
1392
                         'to/bar',})
 
1393
 
 
1394
    def test_copy_tree_to_transport(self):
 
1395
        transport = self.get_transport()
 
1396
        if not transport.listable():
 
1397
            self.assertRaises(TransportNotPossible,
 
1398
                              transport.iter_files_recursive)
 
1399
            return
 
1400
        if transport.is_readonly():
 
1401
            return
 
1402
        self.build_tree(['from/',
 
1403
                         'from/dir/',
 
1404
                         'from/dir/foo',
 
1405
                         'from/dir/bar',
 
1406
                         'from/dir/b%25z', # make sure quoting is correct
 
1407
                         'from/bar'],
 
1408
                        transport=transport)
 
1409
        from_transport = transport.clone('from')
 
1410
        to_transport = transport.clone('to')
 
1411
        to_transport.ensure_base()
 
1412
        from_transport.copy_tree_to_transport(to_transport)
 
1413
        paths = set(transport.iter_files_recursive())
 
1414
        self.assertEqual(paths,
 
1415
                    {'from/dir/foo',
 
1416
                         'from/dir/bar',
 
1417
                         'from/dir/b%2525z',
 
1418
                         'from/bar',
 
1419
                         'to/dir/foo',
 
1420
                         'to/dir/bar',
 
1421
                         'to/dir/b%2525z',
 
1422
                         'to/bar',})
 
1423
 
 
1424
    def test_unicode_paths(self):
 
1425
        """Test that we can read/write files with Unicode names."""
 
1426
        t = self.get_transport()
 
1427
 
 
1428
        # With FAT32 and certain encodings on win32
 
1429
        # '\xe5' and '\xe4' actually map to the same file
 
1430
        # adding a suffix kicks in the 'preserving but insensitive'
 
1431
        # route, and maintains the right files
 
1432
        files = [u'\xe5.1', # a w/ circle iso-8859-1
 
1433
                 u'\xe4.2', # a w/ dots iso-8859-1
 
1434
                 u'\u017d', # Z with umlat iso-8859-2
 
1435
                 u'\u062c', # Arabic j
 
1436
                 u'\u0410', # Russian A
 
1437
                 u'\u65e5', # Kanji person
 
1438
                ]
 
1439
 
 
1440
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1441
        if no_unicode_support:
 
1442
            self.knownFailure("test server cannot handle unicode paths")
 
1443
 
 
1444
        try:
 
1445
            self.build_tree(files, transport=t, line_endings='binary')
 
1446
        except UnicodeError:
 
1447
            raise TestSkipped("cannot handle unicode paths in current encoding")
 
1448
 
 
1449
        # A plain unicode string is not a valid url
 
1450
        for fname in files:
 
1451
            self.assertRaises(urlutils.InvalidURL, t.get, fname)
 
1452
 
 
1453
        for fname in files:
 
1454
            fname_utf8 = fname.encode('utf-8')
 
1455
            contents = b'contents of %s\n' % (fname_utf8,)
 
1456
            self.check_transport_contents(contents, t, urlutils.escape(fname))
 
1457
 
 
1458
    def test_connect_twice_is_same_content(self):
 
1459
        # check that our server (whatever it is) is accessible reliably
 
1460
        # via get_transport and multiple connections share content.
 
1461
        transport = self.get_transport()
 
1462
        if transport.is_readonly():
 
1463
            return
 
1464
        transport.put_bytes('foo', b'bar')
 
1465
        transport3 = self.get_transport()
 
1466
        self.check_transport_contents(b'bar', transport3, 'foo')
 
1467
 
 
1468
        # now opening at a relative url should give use a sane result:
 
1469
        transport.mkdir('newdir')
 
1470
        transport5 = self.get_transport('newdir')
 
1471
        transport6 = transport5.clone('..')
 
1472
        self.check_transport_contents(b'bar', transport6, 'foo')
 
1473
 
 
1474
    def test_lock_write(self):
 
1475
        """Test transport-level write locks.
 
1476
 
 
1477
        These are deprecated and transports may decline to support them.
 
1478
        """
 
1479
        transport = self.get_transport()
 
1480
        if transport.is_readonly():
 
1481
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
 
1482
            return
 
1483
        transport.put_bytes('lock', b'')
 
1484
        try:
 
1485
            lock = transport.lock_write('lock')
 
1486
        except TransportNotPossible:
 
1487
            return
 
1488
        # TODO make this consistent on all platforms:
 
1489
        # self.assertRaises(LockError, transport.lock_write, 'lock')
 
1490
        lock.unlock()
 
1491
 
 
1492
    def test_lock_read(self):
 
1493
        """Test transport-level read locks.
 
1494
 
 
1495
        These are deprecated and transports may decline to support them.
 
1496
        """
 
1497
        transport = self.get_transport()
 
1498
        if transport.is_readonly():
 
1499
            open('lock', 'w').close()
 
1500
        else:
 
1501
            transport.put_bytes('lock', b'')
 
1502
        try:
 
1503
            lock = transport.lock_read('lock')
 
1504
        except TransportNotPossible:
 
1505
            return
 
1506
        # TODO make this consistent on all platforms:
 
1507
        # self.assertRaises(LockError, transport.lock_read, 'lock')
 
1508
        lock.unlock()
 
1509
 
 
1510
    def test_readv(self):
 
1511
        transport = self.get_transport()
 
1512
        if transport.is_readonly():
 
1513
            with open('a', 'w') as f: f.write('0123456789')
 
1514
        else:
 
1515
            transport.put_bytes('a', b'0123456789')
 
1516
 
 
1517
        d = list(transport.readv('a', ((0, 1),)))
 
1518
        self.assertEqual(d[0], (0, b'0'))
 
1519
 
 
1520
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
1521
        self.assertEqual(d[0], (0, b'0'))
 
1522
        self.assertEqual(d[1], (1, b'1'))
 
1523
        self.assertEqual(d[2], (3, b'34'))
 
1524
        self.assertEqual(d[3], (9, b'9'))
 
1525
 
 
1526
    def test_readv_out_of_order(self):
 
1527
        transport = self.get_transport()
 
1528
        if transport.is_readonly():
 
1529
            with open('a', 'w') as f: f.write('0123456789')
 
1530
        else:
 
1531
            transport.put_bytes('a', b'01234567890')
 
1532
 
 
1533
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
1534
        self.assertEqual(d[0], (1, b'1'))
 
1535
        self.assertEqual(d[1], (9, b'9'))
 
1536
        self.assertEqual(d[2], (0, b'0'))
 
1537
        self.assertEqual(d[3], (3, b'34'))
 
1538
 
 
1539
    def test_readv_with_adjust_for_latency(self):
 
1540
        transport = self.get_transport()
 
1541
        # the adjust for latency flag expands the data region returned
 
1542
        # according to a per-transport heuristic, so testing is a little
 
1543
        # tricky as we need more data than the largest combining that our
 
1544
        # transports do. To accomodate this we generate random data and cross
 
1545
        # reference the returned data with the random data. To avoid doing
 
1546
        # multiple large random byte look ups we do several tests on the same
 
1547
        # backing data.
 
1548
        content = osutils.rand_bytes(200*1024)
 
1549
        content_size = len(content)
 
1550
        if transport.is_readonly():
 
1551
            self.build_tree_contents([('a', content)])
 
1552
        else:
 
1553
            transport.put_bytes('a', content)
 
1554
        def check_result_data(result_vector):
 
1555
            for item in result_vector:
 
1556
                data_len = len(item[1])
 
1557
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1558
 
 
1559
        # start corner case
 
1560
        result = list(transport.readv('a', ((0, 30),),
 
1561
            adjust_for_latency=True, upper_limit=content_size))
 
1562
        # we expect 1 result, from 0, to something > 30
 
1563
        self.assertEqual(1, len(result))
 
1564
        self.assertEqual(0, result[0][0])
 
1565
        self.assertTrue(len(result[0][1]) >= 30)
 
1566
        check_result_data(result)
 
1567
        # end of file corner case
 
1568
        result = list(transport.readv('a', ((204700, 100),),
 
1569
            adjust_for_latency=True, upper_limit=content_size))
 
1570
        # we expect 1 result, from 204800- its length, to the end
 
1571
        self.assertEqual(1, len(result))
 
1572
        data_len = len(result[0][1])
 
1573
        self.assertEqual(204800-data_len, result[0][0])
 
1574
        self.assertTrue(data_len >= 100)
 
1575
        check_result_data(result)
 
1576
        # out of order ranges are made in order
 
1577
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1578
            adjust_for_latency=True, upper_limit=content_size))
 
1579
        # we expect 2 results, in order, start and end.
 
1580
        self.assertEqual(2, len(result))
 
1581
        # start
 
1582
        data_len = len(result[0][1])
 
1583
        self.assertEqual(0, result[0][0])
 
1584
        self.assertTrue(data_len >= 30)
 
1585
        # end
 
1586
        data_len = len(result[1][1])
 
1587
        self.assertEqual(204800-data_len, result[1][0])
 
1588
        self.assertTrue(data_len >= 100)
 
1589
        check_result_data(result)
 
1590
        # close ranges get combined (even if out of order)
 
1591
        for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
 
1592
            result = list(transport.readv('a', request_vector,
 
1593
                adjust_for_latency=True, upper_limit=content_size))
 
1594
            self.assertEqual(1, len(result))
 
1595
            data_len = len(result[0][1])
 
1596
            # minimum length is from 400 to 1034 - 634
 
1597
            self.assertTrue(data_len >= 634)
 
1598
            # must contain the region 400 to 1034
 
1599
            self.assertTrue(result[0][0] <= 400)
 
1600
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1601
            check_result_data(result)
 
1602
 
 
1603
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1604
        transport = self.get_transport()
 
1605
        # test from observed failure case.
 
1606
        if transport.is_readonly():
 
1607
            with open('a', 'w') as f: f.write('a'*1024*1024)
 
1608
        else:
 
1609
            transport.put_bytes('a', b'a'*1024*1024)
 
1610
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1611
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1612
            (465373, 800), (947422, 800)]
 
1613
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1614
        found_items = [False]*9
 
1615
        for pos, (start, length) in enumerate(broken_vector):
 
1616
            # check the range is covered by the result
 
1617
            for offset, data in results:
 
1618
                if offset <= start and start + length <= offset + len(data):
 
1619
                    found_items[pos] = True
 
1620
        self.assertEqual([True]*9, found_items)
 
1621
 
 
1622
    def test_get_with_open_write_stream_sees_all_content(self):
 
1623
        t = self.get_transport()
 
1624
        if t.is_readonly():
 
1625
            return
 
1626
        with t.open_write_stream('foo') as handle:
 
1627
            handle.write(b'bcd')
 
1628
            self.assertEqual([(0, b'b'), (2, b'd')], list(t.readv('foo', ((0, 1), (2, 1)))))
 
1629
 
 
1630
    def test_get_smart_medium(self):
 
1631
        """All transports must either give a smart medium, or know they can't.
 
1632
        """
 
1633
        transport = self.get_transport()
 
1634
        try:
 
1635
            client_medium = transport.get_smart_medium()
 
1636
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
 
1637
        except errors.NoSmartMedium:
 
1638
            # as long as we got it we're fine
 
1639
            pass
 
1640
 
 
1641
    def test_readv_short_read(self):
 
1642
        transport = self.get_transport()
 
1643
        if transport.is_readonly():
 
1644
            with open('a', 'w') as f: f.write('0123456789')
 
1645
        else:
 
1646
            transport.put_bytes('a', b'01234567890')
 
1647
 
 
1648
        # This is intentionally reading off the end of the file
 
1649
        # since we are sure that it cannot get there
 
1650
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
 
1651
                               # Can be raised by paramiko
 
1652
                               AssertionError),
 
1653
                              transport.readv, 'a', [(1, 1), (8, 10)])
 
1654
 
 
1655
        # This is trying to seek past the end of the file, it should
 
1656
        # also raise a special error
 
1657
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
 
1658
                              transport.readv, 'a', [(12, 2)])
 
1659
 
 
1660
    def test_no_segment_parameters(self):
 
1661
        """Segment parameters should be stripped and stored in
 
1662
        transport.segment_parameters."""
 
1663
        transport = self.get_transport("foo")
 
1664
        self.assertEqual({}, transport.get_segment_parameters())
 
1665
 
 
1666
    def test_segment_parameters(self):
 
1667
        """Segment parameters should be stripped and stored in
 
1668
        transport.get_segment_parameters()."""
 
1669
        base_url = self._server.get_url()
 
1670
        parameters = {"key1": "val1", "key2": "val2"}
 
1671
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1672
        transport = _mod_transport.get_transport_from_url(url)
 
1673
        self.assertEqual(parameters, transport.get_segment_parameters())
 
1674
 
 
1675
    def test_set_segment_parameters(self):
 
1676
        """Segment parameters can be set and show up in base."""
 
1677
        transport = self.get_transport("foo")
 
1678
        orig_base = transport.base
 
1679
        transport.set_segment_parameter("arm", "board")
 
1680
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
 
1681
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
 
1682
        transport.set_segment_parameter("arm", None)
 
1683
        transport.set_segment_parameter("nonexistant", None)
 
1684
        self.assertEqual({}, transport.get_segment_parameters())
 
1685
        self.assertEqual(orig_base, transport.base)
 
1686
 
 
1687
    def test_stat_symlink(self):
 
1688
        # if a transport points directly to a symlink (and supports symlinks
 
1689
        # at all) you can tell this.  helps with bug 32669.
 
1690
        t = self.get_transport()
 
1691
        try:
 
1692
            t.symlink('target', 'link')
 
1693
        except TransportNotPossible:
 
1694
            raise TestSkipped("symlinks not supported")
 
1695
        t2 = t.clone('link')
 
1696
        st = t2.stat('')
 
1697
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1698
 
 
1699
    def test_abspath_url_unquote_unreserved(self):
 
1700
        """URLs from abspath should have unreserved characters unquoted
 
1701
        
 
1702
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1703
        """
 
1704
        t = self.get_transport()
 
1705
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1706
        self.assertEqual(t.base + "-.09AZ_az~",
 
1707
            t.abspath(needlessly_escaped_dir))
 
1708
 
 
1709
    def test_clone_url_unquote_unreserved(self):
 
1710
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1711
        
 
1712
        Cloned transports should be prefix comparable for things like the
 
1713
        isolation checking of tests, see lp:842223 for more.
 
1714
        """
 
1715
        t1 = self.get_transport()
 
1716
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1717
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1718
        t2 = t1.clone(needlessly_escaped_dir)
 
1719
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1720
 
 
1721
    def test_hook_post_connection_one(self):
 
1722
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1723
        log = []
 
1724
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1725
        t = self.get_transport()
 
1726
        self.assertEqual([], log)
 
1727
        t.has("non-existant")
 
1728
        if isinstance(t, RemoteTransport):
 
1729
            self.assertEqual([t.get_smart_medium()], log)
 
1730
        elif isinstance(t, ConnectedTransport):
 
1731
            self.assertEqual([t], log)
 
1732
        else:
 
1733
            self.assertEqual([], log)
 
1734
 
 
1735
    def test_hook_post_connection_multi(self):
 
1736
        """Fire post_connect hook once per unshared underlying connection"""
 
1737
        log = []
 
1738
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1739
        t1 = self.get_transport()
 
1740
        t2 = t1.clone(".")
 
1741
        t3 = self.get_transport()
 
1742
        self.assertEqual([], log)
 
1743
        t1.has("x")
 
1744
        t2.has("x")
 
1745
        t3.has("x")
 
1746
        if isinstance(t1, RemoteTransport):
 
1747
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1748
        elif isinstance(t1, ConnectedTransport):
 
1749
            self.assertEqual([t1, t3], log)
 
1750
        else:
 
1751
            self.assertEqual([], log)