/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/per_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2018-11-21 03:39:28 UTC
  • mto: This revision was merged to the branch mainline in revision 7206.
  • Revision ID: jelmer@jelmer.uk-20181121033928-ck4sb5zfdwosw35b
Fix test.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for Transport implementations.
 
18
 
 
19
Transport implementations tested here are supplied by
 
20
TransportTestProviderAdapter.
 
21
"""
 
22
 
 
23
import os
 
24
import stat
 
25
import sys
 
26
 
 
27
from .. import (
 
28
    errors,
 
29
    osutils,
 
30
    pyutils,
 
31
    tests,
 
32
    transport as _mod_transport,
 
33
    urlutils,
 
34
    )
 
35
from ..errors import (ConnectionError,
 
36
                      FileExists,
 
37
                      NoSuchFile,
 
38
                      PathError,
 
39
                      TransportNotPossible,
 
40
                      )
 
41
from ..osutils import getcwd
 
42
from ..sixish import (
 
43
    BytesIO,
 
44
    zip,
 
45
    )
 
46
from ..bzr.smart import medium
 
47
from . import (
 
48
    TestSkipped,
 
49
    TestNotApplicable,
 
50
    multiply_tests,
 
51
    )
 
52
from . import test_server
 
53
from .test_transport import TestTransportImplementation
 
54
from ..transport import (
 
55
    ConnectedTransport,
 
56
    Transport,
 
57
    _get_transport_modules,
 
58
    )
 
59
from ..transport.memory import MemoryTransport
 
60
from ..transport.remote import RemoteTransport
 
61
 
 
62
 
 
63
def get_transport_test_permutations(module):
 
64
    """Get the permutations module wants to have tested."""
 
65
    if getattr(module, 'get_test_permutations', None) is None:
 
66
        raise AssertionError(
 
67
            "transport module %s doesn't provide get_test_permutations()"
 
68
            % module.__name__)
 
69
        return []
 
70
    return module.get_test_permutations()
 
71
 
 
72
 
 
73
def transport_test_permutations():
 
74
    """Return a list of the klass, server_factory pairs to test."""
 
75
    result = []
 
76
    for module in _get_transport_modules():
 
77
        try:
 
78
            permutations = get_transport_test_permutations(
 
79
                pyutils.get_named_object(module))
 
80
            for (klass, server_factory) in permutations:
 
81
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
82
                            {"transport_class": klass,
 
83
                             "transport_server": server_factory})
 
84
                result.append(scenario)
 
85
        except errors.DependencyNotPresent as e:
 
86
            # Continue even if a dependency prevents us
 
87
            # from adding this test
 
88
            pass
 
89
    return result
 
90
 
 
91
 
 
92
def load_tests(loader, standard_tests, pattern):
 
93
    """Multiply tests for tranport implementations."""
 
94
    result = loader.suiteClass()
 
95
    scenarios = transport_test_permutations()
 
96
    return multiply_tests(standard_tests, scenarios, result)
 
97
 
 
98
 
 
99
class TransportTests(TestTransportImplementation):
 
100
 
 
101
    def setUp(self):
 
102
        super(TransportTests, self).setUp()
 
103
        self.overrideEnv('BRZ_NO_SMART_VFS', None)
 
104
 
 
105
    def check_transport_contents(self, content, transport, relpath):
 
106
        """Check that transport.get_bytes(relpath) == content."""
 
107
        self.assertEqualDiff(content, transport.get_bytes(relpath))
 
108
 
 
109
    def test_ensure_base_missing(self):
 
110
        """.ensure_base() should create the directory if it doesn't exist"""
 
111
        t = self.get_transport()
 
112
        t_a = t.clone('a')
 
113
        if t_a.is_readonly():
 
114
            self.assertRaises(TransportNotPossible,
 
115
                              t_a.ensure_base)
 
116
            return
 
117
        self.assertTrue(t_a.ensure_base())
 
118
        self.assertTrue(t.has('a'))
 
119
 
 
120
    def test_ensure_base_exists(self):
 
121
        """.ensure_base() should just be happy if it already exists"""
 
122
        t = self.get_transport()
 
123
        if t.is_readonly():
 
124
            return
 
125
 
 
126
        t.mkdir('a')
 
127
        t_a = t.clone('a')
 
128
        # ensure_base returns False if it didn't create the base
 
129
        self.assertFalse(t_a.ensure_base())
 
130
 
 
131
    def test_ensure_base_missing_parent(self):
 
132
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
133
        t = self.get_transport()
 
134
        if t.is_readonly():
 
135
            return
 
136
 
 
137
        t_a = t.clone('a')
 
138
        t_b = t_a.clone('b')
 
139
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
140
 
 
141
    def test_external_url(self):
 
142
        """.external_url either works or raises InProcessTransport."""
 
143
        t = self.get_transport()
 
144
        try:
 
145
            t.external_url()
 
146
        except errors.InProcessTransport:
 
147
            pass
 
148
 
 
149
    def test_has(self):
 
150
        t = self.get_transport()
 
151
 
 
152
        files = ['a', 'b', 'e', 'g', '%']
 
153
        self.build_tree(files, transport=t)
 
154
        self.assertEqual(True, t.has('a'))
 
155
        self.assertEqual(False, t.has('c'))
 
156
        self.assertEqual(True, t.has(urlutils.escape('%')))
 
157
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
158
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
159
                                           urlutils.escape('%%')]))
 
160
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
161
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
 
162
 
 
163
    def test_has_root_works(self):
 
164
        if self.transport_server is test_server.SmartTCPServer_for_testing:
 
165
            raise TestNotApplicable(
 
166
                "SmartTCPServer_for_testing intentionally does not allow "
 
167
                "access to /.")
 
168
        current_transport = self.get_transport()
 
169
        self.assertTrue(current_transport.has('/'))
 
170
        root = current_transport.clone('/')
 
171
        self.assertTrue(root.has(''))
 
172
 
 
173
    def test_get(self):
 
174
        t = self.get_transport()
 
175
 
 
176
        files = ['a']
 
177
        content = b'contents of a\n'
 
178
        self.build_tree(['a'], transport=t, line_endings='binary')
 
179
        self.check_transport_contents(b'contents of a\n', t, 'a')
 
180
        f = t.get('a')
 
181
        self.assertEqual(content, f.read())
 
182
 
 
183
    def test_get_unknown_file(self):
 
184
        t = self.get_transport()
 
185
        files = ['a', 'b']
 
186
        contents = [b'contents of a\n',
 
187
                    b'contents of b\n',
 
188
                    ]
 
189
        self.build_tree(files, transport=t, line_endings='binary')
 
190
        self.assertRaises(NoSuchFile, t.get, 'c')
 
191
 
 
192
        def iterate_and_close(func, *args):
 
193
            for f in func(*args):
 
194
                # We call f.read() here because things like paramiko actually
 
195
                # spawn a thread to prefetch the content, which we want to
 
196
                # consume before we close the handle.
 
197
                content = f.read()
 
198
                f.close()
 
199
 
 
200
    def test_get_directory_read_gives_ReadError(self):
 
201
        """consistent errors for read() on a file returned by get()."""
 
202
        t = self.get_transport()
 
203
        if t.is_readonly():
 
204
            self.build_tree(['a directory/'])
 
205
        else:
 
206
            t.mkdir('a%20directory')
 
207
        # getting the file must either work or fail with a PathError
 
208
        try:
 
209
            a_file = t.get('a%20directory')
 
210
        except (errors.PathError, errors.RedirectRequested):
 
211
            # early failure return immediately.
 
212
            return
 
213
        # having got a file, read() must either work (i.e. http reading a dir
 
214
        # listing) or fail with ReadError
 
215
        try:
 
216
            a_file.read()
 
217
        except errors.ReadError:
 
218
            pass
 
219
 
 
220
    def test_get_bytes(self):
 
221
        t = self.get_transport()
 
222
 
 
223
        files = ['a', 'b', 'e', 'g']
 
224
        contents = [b'contents of a\n',
 
225
                    b'contents of b\n',
 
226
                    b'contents of e\n',
 
227
                    b'contents of g\n',
 
228
                    ]
 
229
        self.build_tree(files, transport=t, line_endings='binary')
 
230
        self.check_transport_contents(b'contents of a\n', t, 'a')
 
231
 
 
232
        for content, fname in zip(contents, files):
 
233
            self.assertEqual(content, t.get_bytes(fname))
 
234
 
 
235
    def test_get_bytes_unknown_file(self):
 
236
        t = self.get_transport()
 
237
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
 
238
 
 
239
    def test_get_with_open_write_stream_sees_all_content(self):
 
240
        t = self.get_transport()
 
241
        if t.is_readonly():
 
242
            return
 
243
        with t.open_write_stream('foo') as handle:
 
244
            handle.write(b'b')
 
245
            self.assertEqual(b'b', t.get_bytes('foo'))
 
246
 
 
247
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
248
        t = self.get_transport()
 
249
        if t.is_readonly():
 
250
            return
 
251
        with t.open_write_stream('foo') as handle:
 
252
            handle.write(b'b')
 
253
            self.assertEqual(b'b', t.get_bytes('foo'))
 
254
            with t.get('foo') as f:
 
255
                self.assertEqual(b'b', f.read())
 
256
 
 
257
    def test_put_bytes(self):
 
258
        t = self.get_transport()
 
259
 
 
260
        if t.is_readonly():
 
261
            self.assertRaises(TransportNotPossible,
 
262
                              t.put_bytes, 'a', b'some text for a\n')
 
263
            return
 
264
 
 
265
        t.put_bytes('a', b'some text for a\n')
 
266
        self.assertTrue(t.has('a'))
 
267
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
268
 
 
269
        # The contents should be overwritten
 
270
        t.put_bytes('a', b'new text for a\n')
 
271
        self.check_transport_contents(b'new text for a\n', t, 'a')
 
272
 
 
273
        self.assertRaises(NoSuchFile,
 
274
                          t.put_bytes, 'path/doesnt/exist/c', b'contents')
 
275
 
 
276
    def test_put_bytes_non_atomic(self):
 
277
        t = self.get_transport()
 
278
 
 
279
        if t.is_readonly():
 
280
            self.assertRaises(TransportNotPossible,
 
281
                              t.put_bytes_non_atomic, 'a', b'some text for a\n')
 
282
            return
 
283
 
 
284
        self.assertFalse(t.has('a'))
 
285
        t.put_bytes_non_atomic('a', b'some text for a\n')
 
286
        self.assertTrue(t.has('a'))
 
287
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
288
        # Put also replaces contents
 
289
        t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
 
290
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
291
 
 
292
        # Make sure we can create another file
 
293
        t.put_bytes_non_atomic('d', b'contents for\nd\n')
 
294
        # And overwrite 'a' with empty contents
 
295
        t.put_bytes_non_atomic('a', b'')
 
296
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
297
        self.check_transport_contents(b'', t, 'a')
 
298
 
 
299
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
 
300
                          b'contents\n')
 
301
        # Now test the create_parent flag
 
302
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
 
303
                          b'contents\n')
 
304
        self.assertFalse(t.has('dir/a'))
 
305
        t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
 
306
                               create_parent_dir=True)
 
307
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
 
308
 
 
309
        # But we still get NoSuchFile if we can't make the parent dir
 
310
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
 
311
                          b'contents\n',
 
312
                          create_parent_dir=True)
 
313
 
 
314
    def test_put_bytes_permissions(self):
 
315
        t = self.get_transport()
 
316
 
 
317
        if t.is_readonly():
 
318
            return
 
319
        if not t._can_roundtrip_unix_modebits():
 
320
            # Can't roundtrip, so no need to run this test
 
321
            return
 
322
        t.put_bytes('mode644', b'test text\n', mode=0o644)
 
323
        self.assertTransportMode(t, 'mode644', 0o644)
 
324
        t.put_bytes('mode666', b'test text\n', mode=0o666)
 
325
        self.assertTransportMode(t, 'mode666', 0o666)
 
326
        t.put_bytes('mode600', b'test text\n', mode=0o600)
 
327
        self.assertTransportMode(t, 'mode600', 0o600)
 
328
        # Yes, you can put_bytes a file such that it becomes readonly
 
329
        t.put_bytes('mode400', b'test text\n', mode=0o400)
 
330
        self.assertTransportMode(t, 'mode400', 0o400)
 
331
 
 
332
        # The default permissions should be based on the current umask
 
333
        umask = osutils.get_umask()
 
334
        t.put_bytes('nomode', b'test text\n', mode=None)
 
335
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
336
 
 
337
    def test_put_bytes_non_atomic_permissions(self):
 
338
        t = self.get_transport()
 
339
 
 
340
        if t.is_readonly():
 
341
            return
 
342
        if not t._can_roundtrip_unix_modebits():
 
343
            # Can't roundtrip, so no need to run this test
 
344
            return
 
345
        t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
 
346
        self.assertTransportMode(t, 'mode644', 0o644)
 
347
        t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
 
348
        self.assertTransportMode(t, 'mode666', 0o666)
 
349
        t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
 
350
        self.assertTransportMode(t, 'mode600', 0o600)
 
351
        t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
 
352
        self.assertTransportMode(t, 'mode400', 0o400)
 
353
 
 
354
        # The default permissions should be based on the current umask
 
355
        umask = osutils.get_umask()
 
356
        t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
 
357
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
358
 
 
359
        # We should also be able to set the mode for a parent directory
 
360
        # when it is created
 
361
        t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
 
362
                               dir_mode=0o700, create_parent_dir=True)
 
363
        self.assertTransportMode(t, 'dir700', 0o700)
 
364
        t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
 
365
                               dir_mode=0o770, create_parent_dir=True)
 
366
        self.assertTransportMode(t, 'dir770', 0o770)
 
367
        t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
 
368
                               dir_mode=0o777, create_parent_dir=True)
 
369
        self.assertTransportMode(t, 'dir777', 0o777)
 
370
 
 
371
    def test_put_file(self):
 
372
        t = self.get_transport()
 
373
 
 
374
        if t.is_readonly():
 
375
            self.assertRaises(TransportNotPossible,
 
376
                              t.put_file, 'a', BytesIO(b'some text for a\n'))
 
377
            return
 
378
 
 
379
        result = t.put_file('a', BytesIO(b'some text for a\n'))
 
380
        # put_file returns the length of the data written
 
381
        self.assertEqual(16, result)
 
382
        self.assertTrue(t.has('a'))
 
383
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
384
        # Put also replaces contents
 
385
        result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
 
386
        self.assertEqual(19, result)
 
387
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
388
        self.assertRaises(NoSuchFile,
 
389
                          t.put_file, 'path/doesnt/exist/c',
 
390
                          BytesIO(b'contents'))
 
391
 
 
392
    def test_put_file_non_atomic(self):
 
393
        t = self.get_transport()
 
394
 
 
395
        if t.is_readonly():
 
396
            self.assertRaises(TransportNotPossible,
 
397
                              t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
 
398
            return
 
399
 
 
400
        self.assertFalse(t.has('a'))
 
401
        t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
 
402
        self.assertTrue(t.has('a'))
 
403
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
404
        # Put also replaces contents
 
405
        t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
 
406
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
407
 
 
408
        # Make sure we can create another file
 
409
        t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
 
410
        # And overwrite 'a' with empty contents
 
411
        t.put_file_non_atomic('a', BytesIO(b''))
 
412
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
413
        self.check_transport_contents(b'', t, 'a')
 
414
 
 
415
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
 
416
                          BytesIO(b'contents\n'))
 
417
        # Now test the create_parent flag
 
418
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
 
419
                          BytesIO(b'contents\n'))
 
420
        self.assertFalse(t.has('dir/a'))
 
421
        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
 
422
                              create_parent_dir=True)
 
423
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
 
424
 
 
425
        # But we still get NoSuchFile if we can't make the parent dir
 
426
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
 
427
                          BytesIO(b'contents\n'),
 
428
                          create_parent_dir=True)
 
429
 
 
430
    def test_put_file_permissions(self):
 
431
 
 
432
        t = self.get_transport()
 
433
 
 
434
        if t.is_readonly():
 
435
            return
 
436
        if not t._can_roundtrip_unix_modebits():
 
437
            # Can't roundtrip, so no need to run this test
 
438
            return
 
439
        t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
 
440
        self.assertTransportMode(t, 'mode644', 0o644)
 
441
        t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
 
442
        self.assertTransportMode(t, 'mode666', 0o666)
 
443
        t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
 
444
        self.assertTransportMode(t, 'mode600', 0o600)
 
445
        # Yes, you can put a file such that it becomes readonly
 
446
        t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
 
447
        self.assertTransportMode(t, 'mode400', 0o400)
 
448
        # The default permissions should be based on the current umask
 
449
        umask = osutils.get_umask()
 
450
        t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
 
451
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
452
 
 
453
    def test_put_file_non_atomic_permissions(self):
 
454
        t = self.get_transport()
 
455
 
 
456
        if t.is_readonly():
 
457
            return
 
458
        if not t._can_roundtrip_unix_modebits():
 
459
            # Can't roundtrip, so no need to run this test
 
460
            return
 
461
        t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
 
462
        self.assertTransportMode(t, 'mode644', 0o644)
 
463
        t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
 
464
        self.assertTransportMode(t, 'mode666', 0o666)
 
465
        t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
 
466
        self.assertTransportMode(t, 'mode600', 0o600)
 
467
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
 
468
        t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
 
469
        self.assertTransportMode(t, 'mode400', 0o400)
 
470
 
 
471
        # The default permissions should be based on the current umask
 
472
        umask = osutils.get_umask()
 
473
        t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
 
474
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
475
 
 
476
        # We should also be able to set the mode for a parent directory
 
477
        # when it is created
 
478
        sio = BytesIO()
 
479
        t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
 
480
                              dir_mode=0o700, create_parent_dir=True)
 
481
        self.assertTransportMode(t, 'dir700', 0o700)
 
482
        t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
 
483
                              dir_mode=0o770, create_parent_dir=True)
 
484
        self.assertTransportMode(t, 'dir770', 0o770)
 
485
        t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
 
486
                              dir_mode=0o777, create_parent_dir=True)
 
487
        self.assertTransportMode(t, 'dir777', 0o777)
 
488
 
 
489
    def test_put_bytes_unicode(self):
 
490
        t = self.get_transport()
 
491
        if t.is_readonly():
 
492
            return
 
493
        unicode_string = u'\u1234'
 
494
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
 
495
 
 
496
    def test_mkdir(self):
 
497
        t = self.get_transport()
 
498
 
 
499
        if t.is_readonly():
 
500
            # cannot mkdir on readonly transports. We're not testing for
 
501
            # cache coherency because cache behaviour is not currently
 
502
            # defined for the transport interface.
 
503
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
 
504
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
 
505
            self.assertRaises(TransportNotPossible,
 
506
                              t.mkdir, 'path/doesnt/exist')
 
507
            return
 
508
        # Test mkdir
 
509
        t.mkdir('dir_a')
 
510
        self.assertEqual(t.has('dir_a'), True)
 
511
        self.assertEqual(t.has('dir_b'), False)
 
512
 
 
513
        t.mkdir('dir_b')
 
514
        self.assertEqual(t.has('dir_b'), True)
 
515
 
 
516
        self.assertEqual([t.has(n) for n in
 
517
                          ['dir_a', 'dir_b', 'dir_q', 'dir_b']],
 
518
                         [True, True, False, True])
 
519
 
 
520
        # we were testing that a local mkdir followed by a transport
 
521
        # mkdir failed thusly, but given that we * in one process * do not
 
522
        # concurrently fiddle with disk dirs and then use transport to do
 
523
        # things, the win here seems marginal compared to the constraint on
 
524
        # the interface. RBC 20051227
 
525
        t.mkdir('dir_g')
 
526
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
527
 
 
528
        # Test get/put in sub-directories
 
529
        t.put_bytes('dir_a/a', b'contents of dir_a/a')
 
530
        t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
 
531
        self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a')
 
532
        self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b')
 
533
 
 
534
        # mkdir of a dir with an absent parent
 
535
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
 
536
 
 
537
    def test_mkdir_permissions(self):
 
538
        t = self.get_transport()
 
539
        if t.is_readonly():
 
540
            return
 
541
        if not t._can_roundtrip_unix_modebits():
 
542
            # no sense testing on this transport
 
543
            return
 
544
        # Test mkdir with a mode
 
545
        t.mkdir('dmode755', mode=0o755)
 
546
        self.assertTransportMode(t, 'dmode755', 0o755)
 
547
        t.mkdir('dmode555', mode=0o555)
 
548
        self.assertTransportMode(t, 'dmode555', 0o555)
 
549
        t.mkdir('dmode777', mode=0o777)
 
550
        self.assertTransportMode(t, 'dmode777', 0o777)
 
551
        t.mkdir('dmode700', mode=0o700)
 
552
        self.assertTransportMode(t, 'dmode700', 0o700)
 
553
        t.mkdir('mdmode755', mode=0o755)
 
554
        self.assertTransportMode(t, 'mdmode755', 0o755)
 
555
 
 
556
        # Default mode should be based on umask
 
557
        umask = osutils.get_umask()
 
558
        t.mkdir('dnomode', mode=None)
 
559
        self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
 
560
 
 
561
    def test_opening_a_file_stream_creates_file(self):
 
562
        t = self.get_transport()
 
563
        if t.is_readonly():
 
564
            return
 
565
        handle = t.open_write_stream('foo')
 
566
        try:
 
567
            self.assertEqual(b'', t.get_bytes('foo'))
 
568
        finally:
 
569
            handle.close()
 
570
 
 
571
    def test_opening_a_file_stream_can_set_mode(self):
 
572
        t = self.get_transport()
 
573
        if t.is_readonly():
 
574
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
575
                              t.open_write_stream, 'foo')
 
576
            return
 
577
        if not t._can_roundtrip_unix_modebits():
 
578
            # Can't roundtrip, so no need to run this test
 
579
            return
 
580
 
 
581
        def check_mode(name, mode, expected):
 
582
            handle = t.open_write_stream(name, mode=mode)
 
583
            handle.close()
 
584
            self.assertTransportMode(t, name, expected)
 
585
        check_mode('mode644', 0o644, 0o644)
 
586
        check_mode('mode666', 0o666, 0o666)
 
587
        check_mode('mode600', 0o600, 0o600)
 
588
        # The default permissions should be based on the current umask
 
589
        check_mode('nomode', None, 0o666 & ~osutils.get_umask())
 
590
 
 
591
    def test_copy_to(self):
 
592
        # FIXME: test:   same server to same server (partly done)
 
593
        # same protocol two servers
 
594
        # and    different protocols (done for now except for MemoryTransport.
 
595
        # - RBC 20060122
 
596
 
 
597
        def simple_copy_files(transport_from, transport_to):
 
598
            files = ['a', 'b', 'c', 'd']
 
599
            self.build_tree(files, transport=transport_from)
 
600
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
 
601
            for f in files:
 
602
                self.check_transport_contents(transport_to.get_bytes(f),
 
603
                                              transport_from, f)
 
604
 
 
605
        t = self.get_transport()
 
606
        if t.__class__.__name__ == "SFTPTransport":
 
607
            self.skipTest("SFTP copy_to currently too flakey to use")
 
608
        temp_transport = MemoryTransport('memory:///')
 
609
        simple_copy_files(t, temp_transport)
 
610
        if not t.is_readonly():
 
611
            t.mkdir('copy_to_simple')
 
612
            t2 = t.clone('copy_to_simple')
 
613
            simple_copy_files(t, t2)
 
614
 
 
615
        # Test that copying into a missing directory raises
 
616
        # NoSuchFile
 
617
        if t.is_readonly():
 
618
            self.build_tree(['e/', 'e/f'])
 
619
        else:
 
620
            t.mkdir('e')
 
621
            t.put_bytes('e/f', b'contents of e')
 
622
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
 
623
        temp_transport.mkdir('e')
 
624
        t.copy_to(['e/f'], temp_transport)
 
625
 
 
626
        del temp_transport
 
627
        temp_transport = MemoryTransport('memory:///')
 
628
 
 
629
        files = ['a', 'b', 'c', 'd']
 
630
        t.copy_to(iter(files), temp_transport)
 
631
        for f in files:
 
632
            self.check_transport_contents(temp_transport.get_bytes(f),
 
633
                                          t, f)
 
634
        del temp_transport
 
635
 
 
636
        for mode in (0o666, 0o644, 0o600, 0o400):
 
637
            temp_transport = MemoryTransport("memory:///")
 
638
            t.copy_to(files, temp_transport, mode=mode)
 
639
            for f in files:
 
640
                self.assertTransportMode(temp_transport, f, mode)
 
641
 
 
642
    def test_create_prefix(self):
 
643
        t = self.get_transport()
 
644
        sub = t.clone('foo').clone('bar')
 
645
        try:
 
646
            sub.create_prefix()
 
647
        except TransportNotPossible:
 
648
            self.assertTrue(t.is_readonly())
 
649
        else:
 
650
            self.assertTrue(t.has('foo/bar'))
 
651
 
 
652
    def test_append_file(self):
 
653
        t = self.get_transport()
 
654
 
 
655
        if t.is_readonly():
 
656
            self.assertRaises(TransportNotPossible,
 
657
                              t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
658
            return
 
659
        t.put_bytes('a', b'diff\ncontents for\na\n')
 
660
        t.put_bytes('b', b'contents\nfor b\n')
 
661
 
 
662
        self.assertEqual(20,
 
663
                         t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
 
664
 
 
665
        self.check_transport_contents(
 
666
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
667
            t, 'a')
 
668
 
 
669
        # a file with no parent should fail..
 
670
        self.assertRaises(NoSuchFile,
 
671
                          t.append_file, 'missing/path', BytesIO(b'content'))
 
672
 
 
673
        # And we can create new files, too
 
674
        self.assertEqual(0,
 
675
                         t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
 
676
        self.check_transport_contents(b'some text\nfor a missing file\n',
 
677
                                      t, 'c')
 
678
 
 
679
    def test_append_bytes(self):
 
680
        t = self.get_transport()
 
681
 
 
682
        if t.is_readonly():
 
683
            self.assertRaises(TransportNotPossible,
 
684
                              t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
 
685
            return
 
686
 
 
687
        self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
 
688
        self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
 
689
 
 
690
        self.assertEqual(20,
 
691
                         t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
 
692
 
 
693
        self.check_transport_contents(
 
694
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
695
            t, 'a')
 
696
 
 
697
        # a file with no parent should fail..
 
698
        self.assertRaises(NoSuchFile,
 
699
                          t.append_bytes, 'missing/path', b'content')
 
700
 
 
701
    def test_append_file_mode(self):
 
702
        """Check that append accepts a mode parameter"""
 
703
        # check append accepts a mode
 
704
        t = self.get_transport()
 
705
        if t.is_readonly():
 
706
            self.assertRaises(TransportNotPossible,
 
707
                              t.append_file, 'f', BytesIO(b'f'), mode=None)
 
708
            return
 
709
        t.append_file('f', BytesIO(b'f'), mode=None)
 
710
 
 
711
    def test_append_bytes_mode(self):
 
712
        # check append_bytes accepts a mode
 
713
        t = self.get_transport()
 
714
        if t.is_readonly():
 
715
            self.assertRaises(TransportNotPossible,
 
716
                              t.append_bytes, 'f', b'f', mode=None)
 
717
            return
 
718
        t.append_bytes('f', b'f', mode=None)
 
719
 
 
720
    def test_delete(self):
 
721
        # TODO: Test Transport.delete
 
722
        t = self.get_transport()
 
723
 
 
724
        # Not much to do with a readonly transport
 
725
        if t.is_readonly():
 
726
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
 
727
            return
 
728
 
 
729
        t.put_bytes('a', b'a little bit of text\n')
 
730
        self.assertTrue(t.has('a'))
 
731
        t.delete('a')
 
732
        self.assertFalse(t.has('a'))
 
733
 
 
734
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
735
 
 
736
        t.put_bytes('a', b'a text\n')
 
737
        t.put_bytes('b', b'b text\n')
 
738
        t.put_bytes('c', b'c text\n')
 
739
        self.assertEqual([True, True, True],
 
740
                         [t.has(n) for n in ['a', 'b', 'c']])
 
741
        t.delete('a')
 
742
        t.delete('c')
 
743
        self.assertEqual([False, True, False],
 
744
                         [t.has(n) for n in ['a', 'b', 'c']])
 
745
        self.assertFalse(t.has('a'))
 
746
        self.assertTrue(t.has('b'))
 
747
        self.assertFalse(t.has('c'))
 
748
 
 
749
        for name in ['a', 'c', 'd']:
 
750
            self.assertRaises(NoSuchFile, t.delete, name)
 
751
 
 
752
        # We should have deleted everything
 
753
        # SftpServer creates control files in the
 
754
        # working directory, so we can just do a
 
755
        # plain "listdir".
 
756
        # self.assertEqual([], os.listdir('.'))
 
757
 
 
758
    def test_recommended_page_size(self):
 
759
        """Transports recommend a page size for partial access to files."""
 
760
        t = self.get_transport()
 
761
        self.assertIsInstance(t.recommended_page_size(), int)
 
762
 
 
763
    def test_rmdir(self):
 
764
        t = self.get_transport()
 
765
        # Not much to do with a readonly transport
 
766
        if t.is_readonly():
 
767
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
 
768
            return
 
769
        t.mkdir('adir')
 
770
        t.mkdir('adir/bdir')
 
771
        t.rmdir('adir/bdir')
 
772
        # ftp may not be able to raise NoSuchFile for lack of
 
773
        # details when failing
 
774
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
 
775
        t.rmdir('adir')
 
776
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
 
777
 
 
778
    def test_rmdir_not_empty(self):
 
779
        """Deleting a non-empty directory raises an exception
 
780
 
 
781
        sftp (and possibly others) don't give us a specific "directory not
 
782
        empty" exception -- we can just see that the operation failed.
 
783
        """
 
784
        t = self.get_transport()
 
785
        if t.is_readonly():
 
786
            return
 
787
        t.mkdir('adir')
 
788
        t.mkdir('adir/bdir')
 
789
        self.assertRaises(PathError, t.rmdir, 'adir')
 
790
 
 
791
    def test_rmdir_empty_but_similar_prefix(self):
 
792
        """rmdir does not get confused by sibling paths.
 
793
 
 
794
        A naive implementation of MemoryTransport would refuse to rmdir
 
795
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
796
        uses "path.startswith(dir)" on all file paths to determine if directory
 
797
        is empty.
 
798
        """
 
799
        t = self.get_transport()
 
800
        if t.is_readonly():
 
801
            return
 
802
        t.mkdir('foo')
 
803
        t.put_bytes('foo-bar', b'')
 
804
        t.mkdir('foo-baz')
 
805
        t.rmdir('foo')
 
806
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
807
        self.assertTrue(t.has('foo-bar'))
 
808
 
 
809
    def test_rename_dir_succeeds(self):
 
810
        t = self.get_transport()
 
811
        if t.is_readonly():
 
812
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
813
                              t.rename, 'foo', 'bar')
 
814
            return
 
815
        t.mkdir('adir')
 
816
        t.mkdir('adir/asubdir')
 
817
        t.rename('adir', 'bdir')
 
818
        self.assertTrue(t.has('bdir/asubdir'))
 
819
        self.assertFalse(t.has('adir'))
 
820
 
 
821
    def test_rename_dir_nonempty(self):
 
822
        """Attempting to replace a nonemtpy directory should fail"""
 
823
        t = self.get_transport()
 
824
        if t.is_readonly():
 
825
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
826
                              t.rename, 'foo', 'bar')
 
827
            return
 
828
        t.mkdir('adir')
 
829
        t.mkdir('adir/asubdir')
 
830
        t.mkdir('bdir')
 
831
        t.mkdir('bdir/bsubdir')
 
832
        # any kind of PathError would be OK, though we normally expect
 
833
        # DirectoryNotEmpty
 
834
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
 
835
        # nothing was changed so it should still be as before
 
836
        self.assertTrue(t.has('bdir/bsubdir'))
 
837
        self.assertFalse(t.has('adir/bdir'))
 
838
        self.assertFalse(t.has('adir/bsubdir'))
 
839
 
 
840
    def test_rename_across_subdirs(self):
 
841
        t = self.get_transport()
 
842
        if t.is_readonly():
 
843
            raise TestNotApplicable("transport is readonly")
 
844
        t.mkdir('a')
 
845
        t.mkdir('b')
 
846
        ta = t.clone('a')
 
847
        tb = t.clone('b')
 
848
        ta.put_bytes('f', b'aoeu')
 
849
        ta.rename('f', '../b/f')
 
850
        self.assertTrue(tb.has('f'))
 
851
        self.assertFalse(ta.has('f'))
 
852
        self.assertTrue(t.has('b/f'))
 
853
 
 
854
    def test_delete_tree(self):
 
855
        t = self.get_transport()
 
856
 
 
857
        # Not much to do with a readonly transport
 
858
        if t.is_readonly():
 
859
            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
 
860
            return
 
861
 
 
862
        # and does it like listing ?
 
863
        t.mkdir('adir')
 
864
        try:
 
865
            t.delete_tree('adir')
 
866
        except TransportNotPossible:
 
867
            # ok, this transport does not support delete_tree
 
868
            return
 
869
 
 
870
        # did it delete that trivial case?
 
871
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
872
 
 
873
        self.build_tree(['adir/',
 
874
                         'adir/file',
 
875
                         'adir/subdir/',
 
876
                         'adir/subdir/file',
 
877
                         'adir/subdir2/',
 
878
                         'adir/subdir2/file',
 
879
                         ], transport=t)
 
880
 
 
881
        t.delete_tree('adir')
 
882
        # adir should be gone now.
 
883
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
884
 
 
885
    def test_move(self):
 
886
        t = self.get_transport()
 
887
 
 
888
        if t.is_readonly():
 
889
            return
 
890
 
 
891
        # TODO: I would like to use os.listdir() to
 
892
        # make sure there are no extra files, but SftpServer
 
893
        # creates control files in the working directory
 
894
        # perhaps all of this could be done in a subdirectory
 
895
 
 
896
        t.put_bytes('a', b'a first file\n')
 
897
        self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
 
898
 
 
899
        t.move('a', 'b')
 
900
        self.assertTrue(t.has('b'))
 
901
        self.assertFalse(t.has('a'))
 
902
 
 
903
        self.check_transport_contents(b'a first file\n', t, 'b')
 
904
        self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
 
905
 
 
906
        # Overwrite a file
 
907
        t.put_bytes('c', b'c this file\n')
 
908
        t.move('c', 'b')
 
909
        self.assertFalse(t.has('c'))
 
910
        self.check_transport_contents(b'c this file\n', t, 'b')
 
911
 
 
912
        # TODO: Try to write a test for atomicity
 
913
        # TODO: Test moving into a non-existent subdirectory
 
914
 
 
915
    def test_copy(self):
 
916
        t = self.get_transport()
 
917
 
 
918
        if t.is_readonly():
 
919
            return
 
920
 
 
921
        t.put_bytes('a', b'a file\n')
 
922
        t.copy('a', 'b')
 
923
        self.check_transport_contents(b'a file\n', t, 'b')
 
924
 
 
925
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
926
        os.mkdir('c')
 
927
        # What should the assert be if you try to copy a
 
928
        # file over a directory?
 
929
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
930
        t.put_bytes('d', b'text in d\n')
 
931
        t.copy('d', 'b')
 
932
        self.check_transport_contents(b'text in d\n', t, 'b')
 
933
 
 
934
    def test_connection_error(self):
 
935
        """ConnectionError is raised when connection is impossible.
 
936
 
 
937
        The error should be raised from the first operation on the transport.
 
938
        """
 
939
        try:
 
940
            url = self._server.get_bogus_url()
 
941
        except NotImplementedError:
 
942
            raise TestSkipped("Transport %s has no bogus URL support." %
 
943
                              self._server.__class__)
 
944
        t = _mod_transport.get_transport_from_url(url)
 
945
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
 
946
 
 
947
    def test_stat(self):
 
948
        # TODO: Test stat, just try once, and if it throws, stop testing
 
949
        from stat import S_ISDIR, S_ISREG
 
950
 
 
951
        t = self.get_transport()
 
952
 
 
953
        try:
 
954
            st = t.stat('.')
 
955
        except TransportNotPossible as e:
 
956
            # This transport cannot stat
 
957
            return
 
958
 
 
959
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
960
        sizes = [14, 0, 16, 0, 18]
 
961
        self.build_tree(paths, transport=t, line_endings='binary')
 
962
 
 
963
        for path, size in zip(paths, sizes):
 
964
            st = t.stat(path)
 
965
            if path.endswith('/'):
 
966
                self.assertTrue(S_ISDIR(st.st_mode))
 
967
                # directory sizes are meaningless
 
968
            else:
 
969
                self.assertTrue(S_ISREG(st.st_mode))
 
970
                self.assertEqual(size, st.st_size)
 
971
 
 
972
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
973
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
974
 
 
975
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
 
976
        subdir = t.clone('subdir')
 
977
        st = subdir.stat('./file')
 
978
        st = subdir.stat('.')
 
979
 
 
980
    def test_hardlink(self):
 
981
        from stat import ST_NLINK
 
982
 
 
983
        t = self.get_transport()
 
984
 
 
985
        source_name = "original_target"
 
986
        link_name = "target_link"
 
987
 
 
988
        self.build_tree([source_name], transport=t)
 
989
 
 
990
        try:
 
991
            t.hardlink(source_name, link_name)
 
992
 
 
993
            self.assertTrue(t.has(source_name))
 
994
            self.assertTrue(t.has(link_name))
 
995
 
 
996
            st = t.stat(link_name)
 
997
            self.assertEqual(st[ST_NLINK], 2)
 
998
        except TransportNotPossible:
 
999
            raise TestSkipped("Transport %s does not support hardlinks." %
 
1000
                              self._server.__class__)
 
1001
 
 
1002
    def test_symlink(self):
 
1003
        from stat import S_ISLNK
 
1004
 
 
1005
        t = self.get_transport()
 
1006
 
 
1007
        source_name = "original_target"
 
1008
        link_name = "target_link"
 
1009
 
 
1010
        self.build_tree([source_name], transport=t)
 
1011
 
 
1012
        try:
 
1013
            t.symlink(source_name, link_name)
 
1014
 
 
1015
            self.assertTrue(t.has(source_name))
 
1016
            self.assertTrue(t.has(link_name))
 
1017
 
 
1018
            st = t.stat(link_name)
 
1019
            self.assertTrue(S_ISLNK(st.st_mode),
 
1020
                            "expected symlink, got mode %o" % st.st_mode)
 
1021
        except TransportNotPossible:
 
1022
            raise TestSkipped("Transport %s does not support symlinks." %
 
1023
                              self._server.__class__)
 
1024
 
 
1025
        self.assertEqual(source_name, t.readlink(link_name))
 
1026
 
 
1027
    def test_readlink_nonexistent(self):
 
1028
        t = self.get_transport()
 
1029
        try:
 
1030
            self.assertRaises(NoSuchFile, t.readlink, 'nonexistent')
 
1031
        except TransportNotPossible:
 
1032
            raise TestSkipped("Transport %s does not support symlinks." %
 
1033
                              self._server.__class__)
 
1034
 
 
1035
    def test_list_dir(self):
 
1036
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
1037
        t = self.get_transport()
 
1038
 
 
1039
        if not t.listable():
 
1040
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
1041
            return
 
1042
 
 
1043
        def sorted_list(d, transport):
 
1044
            l = sorted(transport.list_dir(d))
 
1045
            return l
 
1046
 
 
1047
        self.assertEqual([], sorted_list('.', t))
 
1048
        # c2 is precisely one letter longer than c here to test that
 
1049
        # suffixing is not confused.
 
1050
        # a%25b checks that quoting is done consistently across transports
 
1051
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
 
1052
 
 
1053
        if not t.is_readonly():
 
1054
            self.build_tree(tree_names, transport=t)
 
1055
        else:
 
1056
            self.build_tree(tree_names)
 
1057
 
 
1058
        self.assertEqual(
 
1059
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1060
        self.assertEqual(
 
1061
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1062
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1063
 
 
1064
        # Cloning the transport produces an equivalent listing
 
1065
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
 
1066
 
 
1067
        if not t.is_readonly():
 
1068
            t.delete('c/d')
 
1069
            t.delete('b')
 
1070
        else:
 
1071
            os.unlink('c/d')
 
1072
            os.unlink('b')
 
1073
 
 
1074
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1075
        self.assertEqual(['e'], sorted_list('c', t))
 
1076
 
 
1077
        self.assertListRaises(PathError, t.list_dir, 'q')
 
1078
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1079
        # 'a' is a file, list_dir should raise an error
 
1080
        self.assertListRaises(PathError, t.list_dir, 'a')
 
1081
 
 
1082
    def test_list_dir_result_is_url_escaped(self):
 
1083
        t = self.get_transport()
 
1084
        if not t.listable():
 
1085
            raise TestSkipped("transport not listable")
 
1086
 
 
1087
        if not t.is_readonly():
 
1088
            self.build_tree(['a/', 'a/%'], transport=t)
 
1089
        else:
 
1090
            self.build_tree(['a/', 'a/%'])
 
1091
 
 
1092
        names = list(t.list_dir('a'))
 
1093
        self.assertEqual(['%25'], names)
 
1094
        self.assertIsInstance(names[0], str)
 
1095
 
 
1096
    def test_clone_preserve_info(self):
 
1097
        t1 = self.get_transport()
 
1098
        if not isinstance(t1, ConnectedTransport):
 
1099
            raise TestSkipped("not a connected transport")
 
1100
 
 
1101
        t2 = t1.clone('subdir')
 
1102
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1103
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
 
1104
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
 
1105
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
 
1106
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
 
1107
 
 
1108
    def test__reuse_for(self):
 
1109
        t = self.get_transport()
 
1110
        if not isinstance(t, ConnectedTransport):
 
1111
            raise TestSkipped("not a connected transport")
 
1112
 
 
1113
        def new_url(scheme=None, user=None, password=None,
 
1114
                    host=None, port=None, path=None):
 
1115
            """Build a new url from t.base changing only parts of it.
 
1116
 
 
1117
            Only the parameters different from None will be changed.
 
1118
            """
 
1119
            if scheme is None:
 
1120
                scheme = t._parsed_url.scheme
 
1121
            if user is None:
 
1122
                user = t._parsed_url.user
 
1123
            if password is None:
 
1124
                password = t._parsed_url.password
 
1125
            if user is None:
 
1126
                user = t._parsed_url.user
 
1127
            if host is None:
 
1128
                host = t._parsed_url.host
 
1129
            if port is None:
 
1130
                port = t._parsed_url.port
 
1131
            if path is None:
 
1132
                path = t._parsed_url.path
 
1133
            return str(urlutils.URL(scheme, user, password, host, port, path))
 
1134
 
 
1135
        if t._parsed_url.scheme == 'ftp':
 
1136
            scheme = 'sftp'
 
1137
        else:
 
1138
            scheme = 'ftp'
 
1139
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1140
        if t._parsed_url.user == 'me':
 
1141
            user = 'you'
 
1142
        else:
 
1143
            user = 'me'
 
1144
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1145
        # passwords are not taken into account because:
 
1146
        # - it makes no sense to have two different valid passwords for the
 
1147
        #   same user
 
1148
        # - _password in ConnectedTransport is intended to collect what the
 
1149
        #   user specified from the command-line and there are cases where the
 
1150
        #   new url can contain no password (if the url was built from an
 
1151
        #   existing transport.base for example)
 
1152
        # - password are considered part of the credentials provided at
 
1153
        #   connection creation time and as such may not be present in the url
 
1154
        #   (they may be typed by the user when prompted for example)
 
1155
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1156
        # We will not connect, we can use a invalid host
 
1157
        self.assertIsNot(t, t._reuse_for(
 
1158
            new_url(host=t._parsed_url.host + 'bar')))
 
1159
        if t._parsed_url.port == 1234:
 
1160
            port = 4321
 
1161
        else:
 
1162
            port = 1234
 
1163
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1164
        # No point in trying to reuse a transport for a local URL
 
1165
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1166
 
 
1167
    def test_connection_sharing(self):
 
1168
        t = self.get_transport()
 
1169
        if not isinstance(t, ConnectedTransport):
 
1170
            raise TestSkipped("not a connected transport")
 
1171
 
 
1172
        c = t.clone('subdir')
 
1173
        # Some transports will create the connection  only when needed
 
1174
        t.has('surely_not')  # Force connection
 
1175
        self.assertIs(t._get_connection(), c._get_connection())
 
1176
 
 
1177
        # Temporary failure, we need to create a new dummy connection
 
1178
        new_connection = None
 
1179
        t._set_connection(new_connection)
 
1180
        # Check that both transports use the same connection
 
1181
        self.assertIs(new_connection, t._get_connection())
 
1182
        self.assertIs(new_connection, c._get_connection())
 
1183
 
 
1184
    def test_reuse_connection_for_various_paths(self):
 
1185
        t = self.get_transport()
 
1186
        if not isinstance(t, ConnectedTransport):
 
1187
            raise TestSkipped("not a connected transport")
 
1188
 
 
1189
        t.has('surely_not')  # Force connection
 
1190
        self.assertIsNot(None, t._get_connection())
 
1191
 
 
1192
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1193
        self.assertIsNot(t, subdir)
 
1194
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1195
 
 
1196
        home = subdir._reuse_for(t.base + 'home')
 
1197
        self.assertIs(t._get_connection(), home._get_connection())
 
1198
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1199
 
 
1200
    def test_clone(self):
 
1201
        # TODO: Test that clone moves up and down the filesystem
 
1202
        t1 = self.get_transport()
 
1203
 
 
1204
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
 
1205
 
 
1206
        self.assertTrue(t1.has('a'))
 
1207
        self.assertTrue(t1.has('b/c'))
 
1208
        self.assertFalse(t1.has('c'))
 
1209
 
 
1210
        t2 = t1.clone('b')
 
1211
        self.assertEqual(t1.base + 'b/', t2.base)
 
1212
 
 
1213
        self.assertTrue(t2.has('c'))
 
1214
        self.assertFalse(t2.has('a'))
 
1215
 
 
1216
        t3 = t2.clone('..')
 
1217
        self.assertTrue(t3.has('a'))
 
1218
        self.assertFalse(t3.has('c'))
 
1219
 
 
1220
        self.assertFalse(t1.has('b/d'))
 
1221
        self.assertFalse(t2.has('d'))
 
1222
        self.assertFalse(t3.has('b/d'))
 
1223
 
 
1224
        if t1.is_readonly():
 
1225
            self.build_tree_contents([('b/d', b'newfile\n')])
 
1226
        else:
 
1227
            t2.put_bytes('d', b'newfile\n')
 
1228
 
 
1229
        self.assertTrue(t1.has('b/d'))
 
1230
        self.assertTrue(t2.has('d'))
 
1231
        self.assertTrue(t3.has('b/d'))
 
1232
 
 
1233
    def test_clone_to_root(self):
 
1234
        orig_transport = self.get_transport()
 
1235
        # Repeatedly go up to a parent directory until we're at the root
 
1236
        # directory of this transport
 
1237
        root_transport = orig_transport
 
1238
        new_transport = root_transport.clone("..")
 
1239
        # as we are walking up directories, the path must be
 
1240
        # growing less, except at the top
 
1241
        self.assertTrue(len(new_transport.base) < len(root_transport.base) or
 
1242
                        new_transport.base == root_transport.base)
 
1243
        while new_transport.base != root_transport.base:
 
1244
            root_transport = new_transport
 
1245
            new_transport = root_transport.clone("..")
 
1246
            # as we are walking up directories, the path must be
 
1247
            # growing less, except at the top
 
1248
            self.assertTrue(len(new_transport.base) < len(root_transport.base) or
 
1249
                            new_transport.base == root_transport.base)
 
1250
 
 
1251
        # Cloning to "/" should take us to exactly the same location.
 
1252
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1253
        # the abspath of "/" from the original transport should be the same
 
1254
        # as the base at the root:
 
1255
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1256
 
 
1257
        # At the root, the URL must still end with / as its a directory
 
1258
        self.assertEqual(root_transport.base[-1], '/')
 
1259
 
 
1260
    def test_clone_from_root(self):
 
1261
        """At the root, cloning to a simple dir should just do string append."""
 
1262
        orig_transport = self.get_transport()
 
1263
        root_transport = orig_transport.clone('/')
 
1264
        self.assertEqual(root_transport.base + '.bzr/',
 
1265
                         root_transport.clone('.bzr').base)
 
1266
 
 
1267
    def test_base_url(self):
 
1268
        t = self.get_transport()
 
1269
        self.assertEqual('/', t.base[-1])
 
1270
 
 
1271
    def test_relpath(self):
 
1272
        t = self.get_transport()
 
1273
        self.assertEqual('', t.relpath(t.base))
 
1274
        # base ends with /
 
1275
        self.assertEqual('', t.relpath(t.base[:-1]))
 
1276
        # subdirs which don't exist should still give relpaths.
 
1277
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
 
1278
        # trailing slash should be the same.
 
1279
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
 
1280
 
 
1281
    def test_relpath_at_root(self):
 
1282
        t = self.get_transport()
 
1283
        # clone all the way to the top
 
1284
        new_transport = t.clone('..')
 
1285
        while new_transport.base != t.base:
 
1286
            t = new_transport
 
1287
            new_transport = t.clone('..')
 
1288
        # we must be able to get a relpath below the root
 
1289
        self.assertEqual('', t.relpath(t.base))
 
1290
        # and a deeper one should work too
 
1291
        self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
 
1292
 
 
1293
    def test_abspath(self):
 
1294
        # smoke test for abspath. Corner cases for backends like unix fs's
 
1295
        # that have aliasing problems like symlinks should go in backend
 
1296
        # specific test cases.
 
1297
        transport = self.get_transport()
 
1298
 
 
1299
        self.assertEqual(transport.base + 'relpath',
 
1300
                         transport.abspath('relpath'))
 
1301
 
 
1302
        # This should work without raising an error.
 
1303
        transport.abspath("/")
 
1304
 
 
1305
        # the abspath of "/" and "/foo/.." should result in the same location
 
1306
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1307
 
 
1308
        self.assertEqual(transport.clone("/").abspath('foo'),
 
1309
                         transport.abspath("/foo"))
 
1310
 
 
1311
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
 
1312
    def test_win32_abspath(self):
 
1313
        # Note: we tried to set sys.platform='win32' so we could test on
 
1314
        # other platforms too, but then osutils does platform specific
 
1315
        # things at import time which defeated us...
 
1316
        if sys.platform != 'win32':
 
1317
            raise TestSkipped(
 
1318
                'Testing drive letters in abspath implemented only for win32')
 
1319
 
 
1320
        # smoke test for abspath on win32.
 
1321
        # a transport based on 'file:///' never fully qualifies the drive.
 
1322
        transport = _mod_transport.get_transport_from_url("file:///")
 
1323
        self.assertEqual(transport.abspath("/"), "file:///")
 
1324
 
 
1325
        # but a transport that starts with a drive spec must keep it.
 
1326
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1327
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1328
 
 
1329
    def test_local_abspath(self):
 
1330
        transport = self.get_transport()
 
1331
        try:
 
1332
            p = transport.local_abspath('.')
 
1333
        except (errors.NotLocalUrl, TransportNotPossible) as e:
 
1334
            # should be formattable
 
1335
            s = str(e)
 
1336
        else:
 
1337
            self.assertEqual(getcwd(), p)
 
1338
 
 
1339
    def test_abspath_at_root(self):
 
1340
        t = self.get_transport()
 
1341
        # clone all the way to the top
 
1342
        new_transport = t.clone('..')
 
1343
        while new_transport.base != t.base:
 
1344
            t = new_transport
 
1345
            new_transport = t.clone('..')
 
1346
        # we must be able to get a abspath of the root when we ask for
 
1347
        # t.abspath('..') - this due to our choice that clone('..')
 
1348
        # should return the root from the root, combined with the desire that
 
1349
        # the url from clone('..') and from abspath('..') should be the same.
 
1350
        self.assertEqual(t.base, t.abspath('..'))
 
1351
        # '' should give us the root
 
1352
        self.assertEqual(t.base, t.abspath(''))
 
1353
        # and a path should append to the url
 
1354
        self.assertEqual(t.base + 'foo', t.abspath('foo'))
 
1355
 
 
1356
    def test_iter_files_recursive(self):
 
1357
        transport = self.get_transport()
 
1358
        if not transport.listable():
 
1359
            self.assertRaises(TransportNotPossible,
 
1360
                              transport.iter_files_recursive)
 
1361
            return
 
1362
        self.build_tree(['isolated/',
 
1363
                         'isolated/dir/',
 
1364
                         'isolated/dir/foo',
 
1365
                         'isolated/dir/bar',
 
1366
                         'isolated/dir/b%25z',  # make sure quoting is correct
 
1367
                         'isolated/bar'],
 
1368
                        transport=transport)
 
1369
        paths = set(transport.iter_files_recursive())
 
1370
        # nb the directories are not converted
 
1371
        self.assertEqual(paths,
 
1372
                         {'isolated/dir/foo',
 
1373
                          'isolated/dir/bar',
 
1374
                          'isolated/dir/b%2525z',
 
1375
                          'isolated/bar'})
 
1376
        sub_transport = transport.clone('isolated')
 
1377
        paths = set(sub_transport.iter_files_recursive())
 
1378
        self.assertEqual(paths,
 
1379
                         {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
 
1380
 
 
1381
    def test_copy_tree(self):
 
1382
        # TODO: test file contents and permissions are preserved. This test was
 
1383
        # added just to ensure that quoting was handled correctly.
 
1384
        # -- David Allouche 2006-08-11
 
1385
        transport = self.get_transport()
 
1386
        if not transport.listable():
 
1387
            self.assertRaises(TransportNotPossible,
 
1388
                              transport.iter_files_recursive)
 
1389
            return
 
1390
        if transport.is_readonly():
 
1391
            return
 
1392
        self.build_tree(['from/',
 
1393
                         'from/dir/',
 
1394
                         'from/dir/foo',
 
1395
                         'from/dir/bar',
 
1396
                         'from/dir/b%25z',  # make sure quoting is correct
 
1397
                         'from/bar'],
 
1398
                        transport=transport)
 
1399
        transport.copy_tree('from', 'to')
 
1400
        paths = set(transport.iter_files_recursive())
 
1401
        self.assertEqual(paths,
 
1402
                         {'from/dir/foo',
 
1403
                          'from/dir/bar',
 
1404
                          'from/dir/b%2525z',
 
1405
                          'from/bar',
 
1406
                          'to/dir/foo',
 
1407
                          'to/dir/bar',
 
1408
                          'to/dir/b%2525z',
 
1409
                          'to/bar', })
 
1410
 
 
1411
    def test_copy_tree_to_transport(self):
 
1412
        transport = self.get_transport()
 
1413
        if not transport.listable():
 
1414
            self.assertRaises(TransportNotPossible,
 
1415
                              transport.iter_files_recursive)
 
1416
            return
 
1417
        if transport.is_readonly():
 
1418
            return
 
1419
        self.build_tree(['from/',
 
1420
                         'from/dir/',
 
1421
                         'from/dir/foo',
 
1422
                         'from/dir/bar',
 
1423
                         'from/dir/b%25z',  # make sure quoting is correct
 
1424
                         'from/bar'],
 
1425
                        transport=transport)
 
1426
        from_transport = transport.clone('from')
 
1427
        to_transport = transport.clone('to')
 
1428
        to_transport.ensure_base()
 
1429
        from_transport.copy_tree_to_transport(to_transport)
 
1430
        paths = set(transport.iter_files_recursive())
 
1431
        self.assertEqual(paths,
 
1432
                         {'from/dir/foo',
 
1433
                          'from/dir/bar',
 
1434
                          'from/dir/b%2525z',
 
1435
                          'from/bar',
 
1436
                          'to/dir/foo',
 
1437
                          'to/dir/bar',
 
1438
                          'to/dir/b%2525z',
 
1439
                          'to/bar', })
 
1440
 
 
1441
    def test_unicode_paths(self):
 
1442
        """Test that we can read/write files with Unicode names."""
 
1443
        t = self.get_transport()
 
1444
 
 
1445
        # With FAT32 and certain encodings on win32
 
1446
        # '\xe5' and '\xe4' actually map to the same file
 
1447
        # adding a suffix kicks in the 'preserving but insensitive'
 
1448
        # route, and maintains the right files
 
1449
        files = [u'\xe5.1',  # a w/ circle iso-8859-1
 
1450
                 u'\xe4.2',  # a w/ dots iso-8859-1
 
1451
                 u'\u017d',  # Z with umlat iso-8859-2
 
1452
                 u'\u062c',  # Arabic j
 
1453
                 u'\u0410',  # Russian A
 
1454
                 u'\u65e5',  # Kanji person
 
1455
                 ]
 
1456
 
 
1457
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1458
        if no_unicode_support:
 
1459
            self.knownFailure("test server cannot handle unicode paths")
 
1460
 
 
1461
        try:
 
1462
            self.build_tree(files, transport=t, line_endings='binary')
 
1463
        except UnicodeError:
 
1464
            raise TestSkipped(
 
1465
                "cannot handle unicode paths in current encoding")
 
1466
 
 
1467
        # A plain unicode string is not a valid url
 
1468
        for fname in files:
 
1469
            self.assertRaises(urlutils.InvalidURL, t.get, fname)
 
1470
 
 
1471
        for fname in files:
 
1472
            fname_utf8 = fname.encode('utf-8')
 
1473
            contents = b'contents of %s\n' % (fname_utf8,)
 
1474
            self.check_transport_contents(contents, t, urlutils.escape(fname))
 
1475
 
 
1476
    def test_connect_twice_is_same_content(self):
 
1477
        # check that our server (whatever it is) is accessible reliably
 
1478
        # via get_transport and multiple connections share content.
 
1479
        transport = self.get_transport()
 
1480
        if transport.is_readonly():
 
1481
            return
 
1482
        transport.put_bytes('foo', b'bar')
 
1483
        transport3 = self.get_transport()
 
1484
        self.check_transport_contents(b'bar', transport3, 'foo')
 
1485
 
 
1486
        # now opening at a relative url should give use a sane result:
 
1487
        transport.mkdir('newdir')
 
1488
        transport5 = self.get_transport('newdir')
 
1489
        transport6 = transport5.clone('..')
 
1490
        self.check_transport_contents(b'bar', transport6, 'foo')
 
1491
 
 
1492
    def test_lock_write(self):
 
1493
        """Test transport-level write locks.
 
1494
 
 
1495
        These are deprecated and transports may decline to support them.
 
1496
        """
 
1497
        transport = self.get_transport()
 
1498
        if transport.is_readonly():
 
1499
            self.assertRaises(TransportNotPossible,
 
1500
                              transport.lock_write, 'foo')
 
1501
            return
 
1502
        transport.put_bytes('lock', b'')
 
1503
        try:
 
1504
            lock = transport.lock_write('lock')
 
1505
        except TransportNotPossible:
 
1506
            return
 
1507
        # TODO make this consistent on all platforms:
 
1508
        # self.assertRaises(LockError, transport.lock_write, 'lock')
 
1509
        lock.unlock()
 
1510
 
 
1511
    def test_lock_read(self):
 
1512
        """Test transport-level read locks.
 
1513
 
 
1514
        These are deprecated and transports may decline to support them.
 
1515
        """
 
1516
        transport = self.get_transport()
 
1517
        if transport.is_readonly():
 
1518
            open('lock', 'w').close()
 
1519
        else:
 
1520
            transport.put_bytes('lock', b'')
 
1521
        try:
 
1522
            lock = transport.lock_read('lock')
 
1523
        except TransportNotPossible:
 
1524
            return
 
1525
        # TODO make this consistent on all platforms:
 
1526
        # self.assertRaises(LockError, transport.lock_read, 'lock')
 
1527
        lock.unlock()
 
1528
 
 
1529
    def test_readv(self):
 
1530
        transport = self.get_transport()
 
1531
        if transport.is_readonly():
 
1532
            with open('a', 'w') as f:
 
1533
                f.write('0123456789')
 
1534
        else:
 
1535
            transport.put_bytes('a', b'0123456789')
 
1536
 
 
1537
        d = list(transport.readv('a', ((0, 1),)))
 
1538
        self.assertEqual(d[0], (0, b'0'))
 
1539
 
 
1540
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
1541
        self.assertEqual(d[0], (0, b'0'))
 
1542
        self.assertEqual(d[1], (1, b'1'))
 
1543
        self.assertEqual(d[2], (3, b'34'))
 
1544
        self.assertEqual(d[3], (9, b'9'))
 
1545
 
 
1546
    def test_readv_out_of_order(self):
 
1547
        transport = self.get_transport()
 
1548
        if transport.is_readonly():
 
1549
            with open('a', 'w') as f:
 
1550
                f.write('0123456789')
 
1551
        else:
 
1552
            transport.put_bytes('a', b'01234567890')
 
1553
 
 
1554
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
1555
        self.assertEqual(d[0], (1, b'1'))
 
1556
        self.assertEqual(d[1], (9, b'9'))
 
1557
        self.assertEqual(d[2], (0, b'0'))
 
1558
        self.assertEqual(d[3], (3, b'34'))
 
1559
 
 
1560
    def test_readv_with_adjust_for_latency(self):
 
1561
        transport = self.get_transport()
 
1562
        # the adjust for latency flag expands the data region returned
 
1563
        # according to a per-transport heuristic, so testing is a little
 
1564
        # tricky as we need more data than the largest combining that our
 
1565
        # transports do. To accomodate this we generate random data and cross
 
1566
        # reference the returned data with the random data. To avoid doing
 
1567
        # multiple large random byte look ups we do several tests on the same
 
1568
        # backing data.
 
1569
        content = osutils.rand_bytes(200 * 1024)
 
1570
        content_size = len(content)
 
1571
        if transport.is_readonly():
 
1572
            self.build_tree_contents([('a', content)])
 
1573
        else:
 
1574
            transport.put_bytes('a', content)
 
1575
 
 
1576
        def check_result_data(result_vector):
 
1577
            for item in result_vector:
 
1578
                data_len = len(item[1])
 
1579
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1580
 
 
1581
        # start corner case
 
1582
        result = list(transport.readv('a', ((0, 30),),
 
1583
                                      adjust_for_latency=True, upper_limit=content_size))
 
1584
        # we expect 1 result, from 0, to something > 30
 
1585
        self.assertEqual(1, len(result))
 
1586
        self.assertEqual(0, result[0][0])
 
1587
        self.assertTrue(len(result[0][1]) >= 30)
 
1588
        check_result_data(result)
 
1589
        # end of file corner case
 
1590
        result = list(transport.readv('a', ((204700, 100),),
 
1591
                                      adjust_for_latency=True, upper_limit=content_size))
 
1592
        # we expect 1 result, from 204800- its length, to the end
 
1593
        self.assertEqual(1, len(result))
 
1594
        data_len = len(result[0][1])
 
1595
        self.assertEqual(204800 - data_len, result[0][0])
 
1596
        self.assertTrue(data_len >= 100)
 
1597
        check_result_data(result)
 
1598
        # out of order ranges are made in order
 
1599
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1600
                                      adjust_for_latency=True, upper_limit=content_size))
 
1601
        # we expect 2 results, in order, start and end.
 
1602
        self.assertEqual(2, len(result))
 
1603
        # start
 
1604
        data_len = len(result[0][1])
 
1605
        self.assertEqual(0, result[0][0])
 
1606
        self.assertTrue(data_len >= 30)
 
1607
        # end
 
1608
        data_len = len(result[1][1])
 
1609
        self.assertEqual(204800 - data_len, result[1][0])
 
1610
        self.assertTrue(data_len >= 100)
 
1611
        check_result_data(result)
 
1612
        # close ranges get combined (even if out of order)
 
1613
        for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
 
1614
            result = list(transport.readv('a', request_vector,
 
1615
                                          adjust_for_latency=True, upper_limit=content_size))
 
1616
            self.assertEqual(1, len(result))
 
1617
            data_len = len(result[0][1])
 
1618
            # minimum length is from 400 to 1034 - 634
 
1619
            self.assertTrue(data_len >= 634)
 
1620
            # must contain the region 400 to 1034
 
1621
            self.assertTrue(result[0][0] <= 400)
 
1622
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1623
            check_result_data(result)
 
1624
 
 
1625
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1626
        transport = self.get_transport()
 
1627
        # test from observed failure case.
 
1628
        if transport.is_readonly():
 
1629
            with open('a', 'w') as f:
 
1630
                f.write('a' * 1024 * 1024)
 
1631
        else:
 
1632
            transport.put_bytes('a', b'a' * 1024 * 1024)
 
1633
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1634
                         (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1635
                         (465373, 800), (947422, 800)]
 
1636
        results = list(transport.readv('a', broken_vector, True, 1024 * 1024))
 
1637
        found_items = [False] * 9
 
1638
        for pos, (start, length) in enumerate(broken_vector):
 
1639
            # check the range is covered by the result
 
1640
            for offset, data in results:
 
1641
                if offset <= start and start + length <= offset + len(data):
 
1642
                    found_items[pos] = True
 
1643
        self.assertEqual([True] * 9, found_items)
 
1644
 
 
1645
    def test_get_with_open_write_stream_sees_all_content(self):
 
1646
        t = self.get_transport()
 
1647
        if t.is_readonly():
 
1648
            return
 
1649
        with t.open_write_stream('foo') as handle:
 
1650
            handle.write(b'bcd')
 
1651
            self.assertEqual([(0, b'b'), (2, b'd')], list(
 
1652
                t.readv('foo', ((0, 1), (2, 1)))))
 
1653
 
 
1654
    def test_get_smart_medium(self):
 
1655
        """All transports must either give a smart medium, or know they can't.
 
1656
        """
 
1657
        transport = self.get_transport()
 
1658
        try:
 
1659
            client_medium = transport.get_smart_medium()
 
1660
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
 
1661
        except errors.NoSmartMedium:
 
1662
            # as long as we got it we're fine
 
1663
            pass
 
1664
 
 
1665
    def test_readv_short_read(self):
 
1666
        transport = self.get_transport()
 
1667
        if transport.is_readonly():
 
1668
            with open('a', 'w') as f:
 
1669
                f.write('0123456789')
 
1670
        else:
 
1671
            transport.put_bytes('a', b'01234567890')
 
1672
 
 
1673
        # This is intentionally reading off the end of the file
 
1674
        # since we are sure that it cannot get there
 
1675
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
 
1676
                               # Can be raised by paramiko
 
1677
                               AssertionError),
 
1678
                              transport.readv, 'a', [(1, 1), (8, 10)])
 
1679
 
 
1680
        # This is trying to seek past the end of the file, it should
 
1681
        # also raise a special error
 
1682
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
 
1683
                              transport.readv, 'a', [(12, 2)])
 
1684
 
 
1685
    def test_no_segment_parameters(self):
 
1686
        """Segment parameters should be stripped and stored in
 
1687
        transport.segment_parameters."""
 
1688
        transport = self.get_transport("foo")
 
1689
        self.assertEqual({}, transport.get_segment_parameters())
 
1690
 
 
1691
    def test_segment_parameters(self):
 
1692
        """Segment parameters should be stripped and stored in
 
1693
        transport.get_segment_parameters()."""
 
1694
        base_url = self._server.get_url()
 
1695
        parameters = {"key1": "val1", "key2": "val2"}
 
1696
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1697
        transport = _mod_transport.get_transport_from_url(url)
 
1698
        self.assertEqual(parameters, transport.get_segment_parameters())
 
1699
 
 
1700
    def test_set_segment_parameters(self):
 
1701
        """Segment parameters can be set and show up in base."""
 
1702
        transport = self.get_transport("foo")
 
1703
        orig_base = transport.base
 
1704
        transport.set_segment_parameter("arm", "board")
 
1705
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
 
1706
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
 
1707
        transport.set_segment_parameter("arm", None)
 
1708
        transport.set_segment_parameter("nonexistant", None)
 
1709
        self.assertEqual({}, transport.get_segment_parameters())
 
1710
        self.assertEqual(orig_base, transport.base)
 
1711
 
 
1712
    def test_stat_symlink(self):
 
1713
        # if a transport points directly to a symlink (and supports symlinks
 
1714
        # at all) you can tell this.  helps with bug 32669.
 
1715
        t = self.get_transport()
 
1716
        try:
 
1717
            t.symlink('target', 'link')
 
1718
        except TransportNotPossible:
 
1719
            raise TestSkipped("symlinks not supported")
 
1720
        t2 = t.clone('link')
 
1721
        st = t2.stat('')
 
1722
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1723
 
 
1724
    def test_abspath_url_unquote_unreserved(self):
 
1725
        """URLs from abspath should have unreserved characters unquoted
 
1726
 
 
1727
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1728
        """
 
1729
        t = self.get_transport()
 
1730
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1731
        self.assertEqual(t.base + "-.09AZ_az~",
 
1732
                         t.abspath(needlessly_escaped_dir))
 
1733
 
 
1734
    def test_clone_url_unquote_unreserved(self):
 
1735
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1736
 
 
1737
        Cloned transports should be prefix comparable for things like the
 
1738
        isolation checking of tests, see lp:842223 for more.
 
1739
        """
 
1740
        t1 = self.get_transport()
 
1741
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1742
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1743
        t2 = t1.clone(needlessly_escaped_dir)
 
1744
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1745
 
 
1746
    def test_hook_post_connection_one(self):
 
1747
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1748
        log = []
 
1749
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1750
        t = self.get_transport()
 
1751
        self.assertEqual([], log)
 
1752
        t.has("non-existant")
 
1753
        if isinstance(t, RemoteTransport):
 
1754
            self.assertEqual([t.get_smart_medium()], log)
 
1755
        elif isinstance(t, ConnectedTransport):
 
1756
            self.assertEqual([t], log)
 
1757
        else:
 
1758
            self.assertEqual([], log)
 
1759
 
 
1760
    def test_hook_post_connection_multi(self):
 
1761
        """Fire post_connect hook once per unshared underlying connection"""
 
1762
        log = []
 
1763
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1764
        t1 = self.get_transport()
 
1765
        t2 = t1.clone(".")
 
1766
        t3 = self.get_transport()
 
1767
        self.assertEqual([], log)
 
1768
        t1.has("x")
 
1769
        t2.has("x")
 
1770
        t3.has("x")
 
1771
        if isinstance(t1, RemoteTransport):
 
1772
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1773
        elif isinstance(t1, ConnectedTransport):
 
1774
            self.assertEqual([t1, t3], log)
 
1775
        else:
 
1776
            self.assertEqual([], log)