/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/per_transport.py

  • Committer: Jelmer Vernooij
  • Date: 2020-02-07 02:14:30 UTC
  • mto: This revision was merged to the branch mainline in revision 7492.
  • Revision ID: jelmer@jelmer.uk-20200207021430-m49iq3x4x8xlib6x
Drop python2 support.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011, 2015, 2016 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for Transport implementations.
 
18
 
 
19
Transport implementations tested here are supplied by
 
20
TransportTestProviderAdapter.
 
21
"""
 
22
 
 
23
from io import BytesIO
 
24
import os
 
25
import stat
 
26
import sys
 
27
 
 
28
from .. import (
 
29
    errors,
 
30
    osutils,
 
31
    pyutils,
 
32
    tests,
 
33
    transport as _mod_transport,
 
34
    urlutils,
 
35
    )
 
36
from ..errors import (ConnectionError,
 
37
                      FileExists,
 
38
                      NoSuchFile,
 
39
                      PathError,
 
40
                      TransportNotPossible,
 
41
                      )
 
42
from ..osutils import getcwd
 
43
from ..bzr.smart import medium
 
44
from . import (
 
45
    TestSkipped,
 
46
    TestNotApplicable,
 
47
    multiply_tests,
 
48
    )
 
49
from . import test_server
 
50
from .test_transport import TestTransportImplementation
 
51
from ..transport import (
 
52
    ConnectedTransport,
 
53
    Transport,
 
54
    _get_transport_modules,
 
55
    )
 
56
from ..transport.memory import MemoryTransport
 
57
from ..transport.remote import RemoteTransport
 
58
 
 
59
 
 
60
def get_transport_test_permutations(module):
 
61
    """Get the permutations module wants to have tested."""
 
62
    if getattr(module, 'get_test_permutations', None) is None:
 
63
        raise AssertionError(
 
64
            "transport module %s doesn't provide get_test_permutations()"
 
65
            % module.__name__)
 
66
        return []
 
67
    return module.get_test_permutations()
 
68
 
 
69
 
 
70
def transport_test_permutations():
 
71
    """Return a list of the klass, server_factory pairs to test."""
 
72
    result = []
 
73
    for module in _get_transport_modules():
 
74
        try:
 
75
            permutations = get_transport_test_permutations(
 
76
                pyutils.get_named_object(module))
 
77
            for (klass, server_factory) in permutations:
 
78
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
79
                            {"transport_class": klass,
 
80
                             "transport_server": server_factory})
 
81
                result.append(scenario)
 
82
        except errors.DependencyNotPresent as e:
 
83
            # Continue even if a dependency prevents us
 
84
            # from adding this test
 
85
            pass
 
86
    return result
 
87
 
 
88
 
 
89
def load_tests(loader, standard_tests, pattern):
 
90
    """Multiply tests for tranport implementations."""
 
91
    result = loader.suiteClass()
 
92
    scenarios = transport_test_permutations()
 
93
    return multiply_tests(standard_tests, scenarios, result)
 
94
 
 
95
 
 
96
class TransportTests(TestTransportImplementation):
 
97
 
 
98
    def setUp(self):
 
99
        super(TransportTests, self).setUp()
 
100
        self.overrideEnv('BRZ_NO_SMART_VFS', None)
 
101
 
 
102
    def check_transport_contents(self, content, transport, relpath):
 
103
        """Check that transport.get_bytes(relpath) == content."""
 
104
        self.assertEqualDiff(content, transport.get_bytes(relpath))
 
105
 
 
106
    def test_ensure_base_missing(self):
 
107
        """.ensure_base() should create the directory if it doesn't exist"""
 
108
        t = self.get_transport()
 
109
        t_a = t.clone('a')
 
110
        if t_a.is_readonly():
 
111
            self.assertRaises(TransportNotPossible,
 
112
                              t_a.ensure_base)
 
113
            return
 
114
        self.assertTrue(t_a.ensure_base())
 
115
        self.assertTrue(t.has('a'))
 
116
 
 
117
    def test_ensure_base_exists(self):
 
118
        """.ensure_base() should just be happy if it already exists"""
 
119
        t = self.get_transport()
 
120
        if t.is_readonly():
 
121
            return
 
122
 
 
123
        t.mkdir('a')
 
124
        t_a = t.clone('a')
 
125
        # ensure_base returns False if it didn't create the base
 
126
        self.assertFalse(t_a.ensure_base())
 
127
 
 
128
    def test_ensure_base_missing_parent(self):
 
129
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
130
        t = self.get_transport()
 
131
        if t.is_readonly():
 
132
            return
 
133
 
 
134
        t_a = t.clone('a')
 
135
        t_b = t_a.clone('b')
 
136
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
137
 
 
138
    def test_external_url(self):
 
139
        """.external_url either works or raises InProcessTransport."""
 
140
        t = self.get_transport()
 
141
        try:
 
142
            t.external_url()
 
143
        except errors.InProcessTransport:
 
144
            pass
 
145
 
 
146
    def test_has(self):
 
147
        t = self.get_transport()
 
148
 
 
149
        files = ['a', 'b', 'e', 'g', '%']
 
150
        self.build_tree(files, transport=t)
 
151
        self.assertEqual(True, t.has('a'))
 
152
        self.assertEqual(False, t.has('c'))
 
153
        self.assertEqual(True, t.has(urlutils.escape('%')))
 
154
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
155
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
156
                                           urlutils.escape('%%')]))
 
157
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
158
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
 
159
 
 
160
    def test_has_root_works(self):
 
161
        if self.transport_server is test_server.SmartTCPServer_for_testing:
 
162
            raise TestNotApplicable(
 
163
                "SmartTCPServer_for_testing intentionally does not allow "
 
164
                "access to /.")
 
165
        current_transport = self.get_transport()
 
166
        self.assertTrue(current_transport.has('/'))
 
167
        root = current_transport.clone('/')
 
168
        self.assertTrue(root.has(''))
 
169
 
 
170
    def test_get(self):
 
171
        t = self.get_transport()
 
172
 
 
173
        files = ['a']
 
174
        content = b'contents of a\n'
 
175
        self.build_tree(['a'], transport=t, line_endings='binary')
 
176
        self.check_transport_contents(b'contents of a\n', t, 'a')
 
177
        f = t.get('a')
 
178
        self.assertEqual(content, f.read())
 
179
 
 
180
    def test_get_unknown_file(self):
 
181
        t = self.get_transport()
 
182
        files = ['a', 'b']
 
183
        contents = [b'contents of a\n',
 
184
                    b'contents of b\n',
 
185
                    ]
 
186
        self.build_tree(files, transport=t, line_endings='binary')
 
187
        self.assertRaises(NoSuchFile, t.get, 'c')
 
188
 
 
189
        def iterate_and_close(func, *args):
 
190
            for f in func(*args):
 
191
                # We call f.read() here because things like paramiko actually
 
192
                # spawn a thread to prefetch the content, which we want to
 
193
                # consume before we close the handle.
 
194
                content = f.read()
 
195
                f.close()
 
196
 
 
197
    def test_get_directory_read_gives_ReadError(self):
 
198
        """consistent errors for read() on a file returned by get()."""
 
199
        t = self.get_transport()
 
200
        if t.is_readonly():
 
201
            self.build_tree(['a directory/'])
 
202
        else:
 
203
            t.mkdir('a%20directory')
 
204
        # getting the file must either work or fail with a PathError
 
205
        try:
 
206
            a_file = t.get('a%20directory')
 
207
        except (errors.PathError, errors.RedirectRequested):
 
208
            # early failure return immediately.
 
209
            return
 
210
        # having got a file, read() must either work (i.e. http reading a dir
 
211
        # listing) or fail with ReadError
 
212
        try:
 
213
            a_file.read()
 
214
        except errors.ReadError:
 
215
            pass
 
216
 
 
217
    def test_get_bytes(self):
 
218
        t = self.get_transport()
 
219
 
 
220
        files = ['a', 'b', 'e', 'g']
 
221
        contents = [b'contents of a\n',
 
222
                    b'contents of b\n',
 
223
                    b'contents of e\n',
 
224
                    b'contents of g\n',
 
225
                    ]
 
226
        self.build_tree(files, transport=t, line_endings='binary')
 
227
        self.check_transport_contents(b'contents of a\n', t, 'a')
 
228
 
 
229
        for content, fname in zip(contents, files):
 
230
            self.assertEqual(content, t.get_bytes(fname))
 
231
 
 
232
    def test_get_bytes_unknown_file(self):
 
233
        t = self.get_transport()
 
234
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
 
235
 
 
236
    def test_get_with_open_write_stream_sees_all_content(self):
 
237
        t = self.get_transport()
 
238
        if t.is_readonly():
 
239
            return
 
240
        with t.open_write_stream('foo') as handle:
 
241
            handle.write(b'b')
 
242
            self.assertEqual(b'b', t.get_bytes('foo'))
 
243
 
 
244
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
245
        t = self.get_transport()
 
246
        if t.is_readonly():
 
247
            return
 
248
        with t.open_write_stream('foo') as handle:
 
249
            handle.write(b'b')
 
250
            self.assertEqual(b'b', t.get_bytes('foo'))
 
251
            with t.get('foo') as f:
 
252
                self.assertEqual(b'b', f.read())
 
253
 
 
254
    def test_put_bytes(self):
 
255
        t = self.get_transport()
 
256
 
 
257
        if t.is_readonly():
 
258
            self.assertRaises(TransportNotPossible,
 
259
                              t.put_bytes, 'a', b'some text for a\n')
 
260
            return
 
261
 
 
262
        t.put_bytes('a', b'some text for a\n')
 
263
        self.assertTrue(t.has('a'))
 
264
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
265
 
 
266
        # The contents should be overwritten
 
267
        t.put_bytes('a', b'new text for a\n')
 
268
        self.check_transport_contents(b'new text for a\n', t, 'a')
 
269
 
 
270
        self.assertRaises(NoSuchFile,
 
271
                          t.put_bytes, 'path/doesnt/exist/c', b'contents')
 
272
 
 
273
    def test_put_bytes_non_atomic(self):
 
274
        t = self.get_transport()
 
275
 
 
276
        if t.is_readonly():
 
277
            self.assertRaises(TransportNotPossible,
 
278
                              t.put_bytes_non_atomic, 'a', b'some text for a\n')
 
279
            return
 
280
 
 
281
        self.assertFalse(t.has('a'))
 
282
        t.put_bytes_non_atomic('a', b'some text for a\n')
 
283
        self.assertTrue(t.has('a'))
 
284
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
285
        # Put also replaces contents
 
286
        t.put_bytes_non_atomic('a', b'new\ncontents for\na\n')
 
287
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
288
 
 
289
        # Make sure we can create another file
 
290
        t.put_bytes_non_atomic('d', b'contents for\nd\n')
 
291
        # And overwrite 'a' with empty contents
 
292
        t.put_bytes_non_atomic('a', b'')
 
293
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
294
        self.check_transport_contents(b'', t, 'a')
 
295
 
 
296
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
 
297
                          b'contents\n')
 
298
        # Now test the create_parent flag
 
299
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
 
300
                          b'contents\n')
 
301
        self.assertFalse(t.has('dir/a'))
 
302
        t.put_bytes_non_atomic('dir/a', b'contents for dir/a\n',
 
303
                               create_parent_dir=True)
 
304
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
 
305
 
 
306
        # But we still get NoSuchFile if we can't make the parent dir
 
307
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
 
308
                          b'contents\n',
 
309
                          create_parent_dir=True)
 
310
 
 
311
    def test_put_bytes_permissions(self):
 
312
        t = self.get_transport()
 
313
 
 
314
        if t.is_readonly():
 
315
            return
 
316
        if not t._can_roundtrip_unix_modebits():
 
317
            # Can't roundtrip, so no need to run this test
 
318
            return
 
319
        t.put_bytes('mode644', b'test text\n', mode=0o644)
 
320
        self.assertTransportMode(t, 'mode644', 0o644)
 
321
        t.put_bytes('mode666', b'test text\n', mode=0o666)
 
322
        self.assertTransportMode(t, 'mode666', 0o666)
 
323
        t.put_bytes('mode600', b'test text\n', mode=0o600)
 
324
        self.assertTransportMode(t, 'mode600', 0o600)
 
325
        # Yes, you can put_bytes a file such that it becomes readonly
 
326
        t.put_bytes('mode400', b'test text\n', mode=0o400)
 
327
        self.assertTransportMode(t, 'mode400', 0o400)
 
328
 
 
329
        # The default permissions should be based on the current umask
 
330
        umask = osutils.get_umask()
 
331
        t.put_bytes('nomode', b'test text\n', mode=None)
 
332
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
333
 
 
334
    def test_put_bytes_non_atomic_permissions(self):
 
335
        t = self.get_transport()
 
336
 
 
337
        if t.is_readonly():
 
338
            return
 
339
        if not t._can_roundtrip_unix_modebits():
 
340
            # Can't roundtrip, so no need to run this test
 
341
            return
 
342
        t.put_bytes_non_atomic('mode644', b'test text\n', mode=0o644)
 
343
        self.assertTransportMode(t, 'mode644', 0o644)
 
344
        t.put_bytes_non_atomic('mode666', b'test text\n', mode=0o666)
 
345
        self.assertTransportMode(t, 'mode666', 0o666)
 
346
        t.put_bytes_non_atomic('mode600', b'test text\n', mode=0o600)
 
347
        self.assertTransportMode(t, 'mode600', 0o600)
 
348
        t.put_bytes_non_atomic('mode400', b'test text\n', mode=0o400)
 
349
        self.assertTransportMode(t, 'mode400', 0o400)
 
350
 
 
351
        # The default permissions should be based on the current umask
 
352
        umask = osutils.get_umask()
 
353
        t.put_bytes_non_atomic('nomode', b'test text\n', mode=None)
 
354
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
355
 
 
356
        # We should also be able to set the mode for a parent directory
 
357
        # when it is created
 
358
        t.put_bytes_non_atomic('dir700/mode664', b'test text\n', mode=0o664,
 
359
                               dir_mode=0o700, create_parent_dir=True)
 
360
        self.assertTransportMode(t, 'dir700', 0o700)
 
361
        t.put_bytes_non_atomic('dir770/mode664', b'test text\n', mode=0o664,
 
362
                               dir_mode=0o770, create_parent_dir=True)
 
363
        self.assertTransportMode(t, 'dir770', 0o770)
 
364
        t.put_bytes_non_atomic('dir777/mode664', b'test text\n', mode=0o664,
 
365
                               dir_mode=0o777, create_parent_dir=True)
 
366
        self.assertTransportMode(t, 'dir777', 0o777)
 
367
 
 
368
    def test_put_file(self):
 
369
        t = self.get_transport()
 
370
 
 
371
        if t.is_readonly():
 
372
            self.assertRaises(TransportNotPossible,
 
373
                              t.put_file, 'a', BytesIO(b'some text for a\n'))
 
374
            return
 
375
 
 
376
        result = t.put_file('a', BytesIO(b'some text for a\n'))
 
377
        # put_file returns the length of the data written
 
378
        self.assertEqual(16, result)
 
379
        self.assertTrue(t.has('a'))
 
380
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
381
        # Put also replaces contents
 
382
        result = t.put_file('a', BytesIO(b'new\ncontents for\na\n'))
 
383
        self.assertEqual(19, result)
 
384
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
385
        self.assertRaises(NoSuchFile,
 
386
                          t.put_file, 'path/doesnt/exist/c',
 
387
                          BytesIO(b'contents'))
 
388
 
 
389
    def test_put_file_non_atomic(self):
 
390
        t = self.get_transport()
 
391
 
 
392
        if t.is_readonly():
 
393
            self.assertRaises(TransportNotPossible,
 
394
                              t.put_file_non_atomic, 'a', BytesIO(b'some text for a\n'))
 
395
            return
 
396
 
 
397
        self.assertFalse(t.has('a'))
 
398
        t.put_file_non_atomic('a', BytesIO(b'some text for a\n'))
 
399
        self.assertTrue(t.has('a'))
 
400
        self.check_transport_contents(b'some text for a\n', t, 'a')
 
401
        # Put also replaces contents
 
402
        t.put_file_non_atomic('a', BytesIO(b'new\ncontents for\na\n'))
 
403
        self.check_transport_contents(b'new\ncontents for\na\n', t, 'a')
 
404
 
 
405
        # Make sure we can create another file
 
406
        t.put_file_non_atomic('d', BytesIO(b'contents for\nd\n'))
 
407
        # And overwrite 'a' with empty contents
 
408
        t.put_file_non_atomic('a', BytesIO(b''))
 
409
        self.check_transport_contents(b'contents for\nd\n', t, 'd')
 
410
        self.check_transport_contents(b'', t, 'a')
 
411
 
 
412
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
 
413
                          BytesIO(b'contents\n'))
 
414
        # Now test the create_parent flag
 
415
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
 
416
                          BytesIO(b'contents\n'))
 
417
        self.assertFalse(t.has('dir/a'))
 
418
        t.put_file_non_atomic('dir/a', BytesIO(b'contents for dir/a\n'),
 
419
                              create_parent_dir=True)
 
420
        self.check_transport_contents(b'contents for dir/a\n', t, 'dir/a')
 
421
 
 
422
        # But we still get NoSuchFile if we can't make the parent dir
 
423
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
 
424
                          BytesIO(b'contents\n'),
 
425
                          create_parent_dir=True)
 
426
 
 
427
    def test_put_file_permissions(self):
 
428
 
 
429
        t = self.get_transport()
 
430
 
 
431
        if t.is_readonly():
 
432
            return
 
433
        if not t._can_roundtrip_unix_modebits():
 
434
            # Can't roundtrip, so no need to run this test
 
435
            return
 
436
        t.put_file('mode644', BytesIO(b'test text\n'), mode=0o644)
 
437
        self.assertTransportMode(t, 'mode644', 0o644)
 
438
        t.put_file('mode666', BytesIO(b'test text\n'), mode=0o666)
 
439
        self.assertTransportMode(t, 'mode666', 0o666)
 
440
        t.put_file('mode600', BytesIO(b'test text\n'), mode=0o600)
 
441
        self.assertTransportMode(t, 'mode600', 0o600)
 
442
        # Yes, you can put a file such that it becomes readonly
 
443
        t.put_file('mode400', BytesIO(b'test text\n'), mode=0o400)
 
444
        self.assertTransportMode(t, 'mode400', 0o400)
 
445
        # The default permissions should be based on the current umask
 
446
        umask = osutils.get_umask()
 
447
        t.put_file('nomode', BytesIO(b'test text\n'), mode=None)
 
448
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
449
 
 
450
    def test_put_file_non_atomic_permissions(self):
 
451
        t = self.get_transport()
 
452
 
 
453
        if t.is_readonly():
 
454
            return
 
455
        if not t._can_roundtrip_unix_modebits():
 
456
            # Can't roundtrip, so no need to run this test
 
457
            return
 
458
        t.put_file_non_atomic('mode644', BytesIO(b'test text\n'), mode=0o644)
 
459
        self.assertTransportMode(t, 'mode644', 0o644)
 
460
        t.put_file_non_atomic('mode666', BytesIO(b'test text\n'), mode=0o666)
 
461
        self.assertTransportMode(t, 'mode666', 0o666)
 
462
        t.put_file_non_atomic('mode600', BytesIO(b'test text\n'), mode=0o600)
 
463
        self.assertTransportMode(t, 'mode600', 0o600)
 
464
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
 
465
        t.put_file_non_atomic('mode400', BytesIO(b'test text\n'), mode=0o400)
 
466
        self.assertTransportMode(t, 'mode400', 0o400)
 
467
 
 
468
        # The default permissions should be based on the current umask
 
469
        umask = osutils.get_umask()
 
470
        t.put_file_non_atomic('nomode', BytesIO(b'test text\n'), mode=None)
 
471
        self.assertTransportMode(t, 'nomode', 0o666 & ~umask)
 
472
 
 
473
        # We should also be able to set the mode for a parent directory
 
474
        # when it is created
 
475
        sio = BytesIO()
 
476
        t.put_file_non_atomic('dir700/mode664', sio, mode=0o664,
 
477
                              dir_mode=0o700, create_parent_dir=True)
 
478
        self.assertTransportMode(t, 'dir700', 0o700)
 
479
        t.put_file_non_atomic('dir770/mode664', sio, mode=0o664,
 
480
                              dir_mode=0o770, create_parent_dir=True)
 
481
        self.assertTransportMode(t, 'dir770', 0o770)
 
482
        t.put_file_non_atomic('dir777/mode664', sio, mode=0o664,
 
483
                              dir_mode=0o777, create_parent_dir=True)
 
484
        self.assertTransportMode(t, 'dir777', 0o777)
 
485
 
 
486
    def test_put_bytes_unicode(self):
 
487
        t = self.get_transport()
 
488
        if t.is_readonly():
 
489
            return
 
490
        unicode_string = u'\u1234'
 
491
        self.assertRaises(TypeError, t.put_bytes, 'foo', unicode_string)
 
492
 
 
493
    def test_mkdir(self):
 
494
        t = self.get_transport()
 
495
 
 
496
        if t.is_readonly():
 
497
            # cannot mkdir on readonly transports. We're not testing for
 
498
            # cache coherency because cache behaviour is not currently
 
499
            # defined for the transport interface.
 
500
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
 
501
            self.assertRaises(TransportNotPossible, t.mkdir, 'new_dir')
 
502
            self.assertRaises(TransportNotPossible,
 
503
                              t.mkdir, 'path/doesnt/exist')
 
504
            return
 
505
        # Test mkdir
 
506
        t.mkdir('dir_a')
 
507
        self.assertEqual(t.has('dir_a'), True)
 
508
        self.assertEqual(t.has('dir_b'), False)
 
509
 
 
510
        t.mkdir('dir_b')
 
511
        self.assertEqual(t.has('dir_b'), True)
 
512
 
 
513
        self.assertEqual([t.has(n) for n in
 
514
                          ['dir_a', 'dir_b', 'dir_q', 'dir_b']],
 
515
                         [True, True, False, True])
 
516
 
 
517
        # we were testing that a local mkdir followed by a transport
 
518
        # mkdir failed thusly, but given that we * in one process * do not
 
519
        # concurrently fiddle with disk dirs and then use transport to do
 
520
        # things, the win here seems marginal compared to the constraint on
 
521
        # the interface. RBC 20051227
 
522
        t.mkdir('dir_g')
 
523
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
524
 
 
525
        # Test get/put in sub-directories
 
526
        t.put_bytes('dir_a/a', b'contents of dir_a/a')
 
527
        t.put_file('dir_b/b', BytesIO(b'contents of dir_b/b'))
 
528
        self.check_transport_contents(b'contents of dir_a/a', t, 'dir_a/a')
 
529
        self.check_transport_contents(b'contents of dir_b/b', t, 'dir_b/b')
 
530
 
 
531
        # mkdir of a dir with an absent parent
 
532
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
 
533
 
 
534
    def test_mkdir_permissions(self):
 
535
        t = self.get_transport()
 
536
        if t.is_readonly():
 
537
            return
 
538
        if not t._can_roundtrip_unix_modebits():
 
539
            # no sense testing on this transport
 
540
            return
 
541
        # Test mkdir with a mode
 
542
        t.mkdir('dmode755', mode=0o755)
 
543
        self.assertTransportMode(t, 'dmode755', 0o755)
 
544
        t.mkdir('dmode555', mode=0o555)
 
545
        self.assertTransportMode(t, 'dmode555', 0o555)
 
546
        t.mkdir('dmode777', mode=0o777)
 
547
        self.assertTransportMode(t, 'dmode777', 0o777)
 
548
        t.mkdir('dmode700', mode=0o700)
 
549
        self.assertTransportMode(t, 'dmode700', 0o700)
 
550
        t.mkdir('mdmode755', mode=0o755)
 
551
        self.assertTransportMode(t, 'mdmode755', 0o755)
 
552
 
 
553
        # Default mode should be based on umask
 
554
        umask = osutils.get_umask()
 
555
        t.mkdir('dnomode', mode=None)
 
556
        self.assertTransportMode(t, 'dnomode', 0o777 & ~umask)
 
557
 
 
558
    def test_opening_a_file_stream_creates_file(self):
 
559
        t = self.get_transport()
 
560
        if t.is_readonly():
 
561
            return
 
562
        handle = t.open_write_stream('foo')
 
563
        try:
 
564
            self.assertEqual(b'', t.get_bytes('foo'))
 
565
        finally:
 
566
            handle.close()
 
567
 
 
568
    def test_opening_a_file_stream_can_set_mode(self):
 
569
        t = self.get_transport()
 
570
        if t.is_readonly():
 
571
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
572
                              t.open_write_stream, 'foo')
 
573
            return
 
574
        if not t._can_roundtrip_unix_modebits():
 
575
            # Can't roundtrip, so no need to run this test
 
576
            return
 
577
 
 
578
        def check_mode(name, mode, expected):
 
579
            handle = t.open_write_stream(name, mode=mode)
 
580
            handle.close()
 
581
            self.assertTransportMode(t, name, expected)
 
582
        check_mode('mode644', 0o644, 0o644)
 
583
        check_mode('mode666', 0o666, 0o666)
 
584
        check_mode('mode600', 0o600, 0o600)
 
585
        # The default permissions should be based on the current umask
 
586
        check_mode('nomode', None, 0o666 & ~osutils.get_umask())
 
587
 
 
588
    def test_copy_to(self):
 
589
        # FIXME: test:   same server to same server (partly done)
 
590
        # same protocol two servers
 
591
        # and    different protocols (done for now except for MemoryTransport.
 
592
        # - RBC 20060122
 
593
 
 
594
        def simple_copy_files(transport_from, transport_to):
 
595
            files = ['a', 'b', 'c', 'd']
 
596
            self.build_tree(files, transport=transport_from)
 
597
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
 
598
            for f in files:
 
599
                self.check_transport_contents(transport_to.get_bytes(f),
 
600
                                              transport_from, f)
 
601
 
 
602
        t = self.get_transport()
 
603
        if t.__class__.__name__ == "SFTPTransport":
 
604
            self.skipTest("SFTP copy_to currently too flakey to use")
 
605
        temp_transport = MemoryTransport('memory:///')
 
606
        simple_copy_files(t, temp_transport)
 
607
        if not t.is_readonly():
 
608
            t.mkdir('copy_to_simple')
 
609
            t2 = t.clone('copy_to_simple')
 
610
            simple_copy_files(t, t2)
 
611
 
 
612
        # Test that copying into a missing directory raises
 
613
        # NoSuchFile
 
614
        if t.is_readonly():
 
615
            self.build_tree(['e/', 'e/f'])
 
616
        else:
 
617
            t.mkdir('e')
 
618
            t.put_bytes('e/f', b'contents of e')
 
619
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
 
620
        temp_transport.mkdir('e')
 
621
        t.copy_to(['e/f'], temp_transport)
 
622
 
 
623
        del temp_transport
 
624
        temp_transport = MemoryTransport('memory:///')
 
625
 
 
626
        files = ['a', 'b', 'c', 'd']
 
627
        t.copy_to(iter(files), temp_transport)
 
628
        for f in files:
 
629
            self.check_transport_contents(temp_transport.get_bytes(f),
 
630
                                          t, f)
 
631
        del temp_transport
 
632
 
 
633
        for mode in (0o666, 0o644, 0o600, 0o400):
 
634
            temp_transport = MemoryTransport("memory:///")
 
635
            t.copy_to(files, temp_transport, mode=mode)
 
636
            for f in files:
 
637
                self.assertTransportMode(temp_transport, f, mode)
 
638
 
 
639
    def test_create_prefix(self):
 
640
        t = self.get_transport()
 
641
        sub = t.clone('foo').clone('bar')
 
642
        try:
 
643
            sub.create_prefix()
 
644
        except TransportNotPossible:
 
645
            self.assertTrue(t.is_readonly())
 
646
        else:
 
647
            self.assertTrue(t.has('foo/bar'))
 
648
 
 
649
    def test_append_file(self):
 
650
        t = self.get_transport()
 
651
 
 
652
        if t.is_readonly():
 
653
            self.assertRaises(TransportNotPossible,
 
654
                              t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
655
            return
 
656
        t.put_bytes('a', b'diff\ncontents for\na\n')
 
657
        t.put_bytes('b', b'contents\nfor b\n')
 
658
 
 
659
        self.assertEqual(20,
 
660
                         t.append_file('a', BytesIO(b'add\nsome\nmore\ncontents\n')))
 
661
 
 
662
        self.check_transport_contents(
 
663
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
664
            t, 'a')
 
665
 
 
666
        # a file with no parent should fail..
 
667
        self.assertRaises(NoSuchFile,
 
668
                          t.append_file, 'missing/path', BytesIO(b'content'))
 
669
 
 
670
        # And we can create new files, too
 
671
        self.assertEqual(0,
 
672
                         t.append_file('c', BytesIO(b'some text\nfor a missing file\n')))
 
673
        self.check_transport_contents(b'some text\nfor a missing file\n',
 
674
                                      t, 'c')
 
675
 
 
676
    def test_append_bytes(self):
 
677
        t = self.get_transport()
 
678
 
 
679
        if t.is_readonly():
 
680
            self.assertRaises(TransportNotPossible,
 
681
                              t.append_bytes, 'a', b'add\nsome\nmore\ncontents\n')
 
682
            return
 
683
 
 
684
        self.assertEqual(0, t.append_bytes('a', b'diff\ncontents for\na\n'))
 
685
        self.assertEqual(0, t.append_bytes('b', b'contents\nfor b\n'))
 
686
 
 
687
        self.assertEqual(20,
 
688
                         t.append_bytes('a', b'add\nsome\nmore\ncontents\n'))
 
689
 
 
690
        self.check_transport_contents(
 
691
            b'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
692
            t, 'a')
 
693
 
 
694
        # a file with no parent should fail..
 
695
        self.assertRaises(NoSuchFile,
 
696
                          t.append_bytes, 'missing/path', b'content')
 
697
 
 
698
    def test_append_file_mode(self):
 
699
        """Check that append accepts a mode parameter"""
 
700
        # check append accepts a mode
 
701
        t = self.get_transport()
 
702
        if t.is_readonly():
 
703
            self.assertRaises(TransportNotPossible,
 
704
                              t.append_file, 'f', BytesIO(b'f'), mode=None)
 
705
            return
 
706
        t.append_file('f', BytesIO(b'f'), mode=None)
 
707
 
 
708
    def test_append_bytes_mode(self):
 
709
        # check append_bytes accepts a mode
 
710
        t = self.get_transport()
 
711
        if t.is_readonly():
 
712
            self.assertRaises(TransportNotPossible,
 
713
                              t.append_bytes, 'f', b'f', mode=None)
 
714
            return
 
715
        t.append_bytes('f', b'f', mode=None)
 
716
 
 
717
    def test_delete(self):
 
718
        # TODO: Test Transport.delete
 
719
        t = self.get_transport()
 
720
 
 
721
        # Not much to do with a readonly transport
 
722
        if t.is_readonly():
 
723
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
 
724
            return
 
725
 
 
726
        t.put_bytes('a', b'a little bit of text\n')
 
727
        self.assertTrue(t.has('a'))
 
728
        t.delete('a')
 
729
        self.assertFalse(t.has('a'))
 
730
 
 
731
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
732
 
 
733
        t.put_bytes('a', b'a text\n')
 
734
        t.put_bytes('b', b'b text\n')
 
735
        t.put_bytes('c', b'c text\n')
 
736
        self.assertEqual([True, True, True],
 
737
                         [t.has(n) for n in ['a', 'b', 'c']])
 
738
        t.delete('a')
 
739
        t.delete('c')
 
740
        self.assertEqual([False, True, False],
 
741
                         [t.has(n) for n in ['a', 'b', 'c']])
 
742
        self.assertFalse(t.has('a'))
 
743
        self.assertTrue(t.has('b'))
 
744
        self.assertFalse(t.has('c'))
 
745
 
 
746
        for name in ['a', 'c', 'd']:
 
747
            self.assertRaises(NoSuchFile, t.delete, name)
 
748
 
 
749
        # We should have deleted everything
 
750
        # SftpServer creates control files in the
 
751
        # working directory, so we can just do a
 
752
        # plain "listdir".
 
753
        # self.assertEqual([], os.listdir('.'))
 
754
 
 
755
    def test_recommended_page_size(self):
 
756
        """Transports recommend a page size for partial access to files."""
 
757
        t = self.get_transport()
 
758
        self.assertIsInstance(t.recommended_page_size(), int)
 
759
 
 
760
    def test_rmdir(self):
 
761
        t = self.get_transport()
 
762
        # Not much to do with a readonly transport
 
763
        if t.is_readonly():
 
764
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
 
765
            return
 
766
        t.mkdir('adir')
 
767
        t.mkdir('adir/bdir')
 
768
        t.rmdir('adir/bdir')
 
769
        # ftp may not be able to raise NoSuchFile for lack of
 
770
        # details when failing
 
771
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
 
772
        t.rmdir('adir')
 
773
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
 
774
 
 
775
    def test_rmdir_not_empty(self):
 
776
        """Deleting a non-empty directory raises an exception
 
777
 
 
778
        sftp (and possibly others) don't give us a specific "directory not
 
779
        empty" exception -- we can just see that the operation failed.
 
780
        """
 
781
        t = self.get_transport()
 
782
        if t.is_readonly():
 
783
            return
 
784
        t.mkdir('adir')
 
785
        t.mkdir('adir/bdir')
 
786
        self.assertRaises(PathError, t.rmdir, 'adir')
 
787
 
 
788
    def test_rmdir_empty_but_similar_prefix(self):
 
789
        """rmdir does not get confused by sibling paths.
 
790
 
 
791
        A naive implementation of MemoryTransport would refuse to rmdir
 
792
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
793
        uses "path.startswith(dir)" on all file paths to determine if directory
 
794
        is empty.
 
795
        """
 
796
        t = self.get_transport()
 
797
        if t.is_readonly():
 
798
            return
 
799
        t.mkdir('foo')
 
800
        t.put_bytes('foo-bar', b'')
 
801
        t.mkdir('foo-baz')
 
802
        t.rmdir('foo')
 
803
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
804
        self.assertTrue(t.has('foo-bar'))
 
805
 
 
806
    def test_rename_dir_succeeds(self):
 
807
        t = self.get_transport()
 
808
        if t.is_readonly():
 
809
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
810
                              t.rename, 'foo', 'bar')
 
811
            return
 
812
        t.mkdir('adir')
 
813
        t.mkdir('adir/asubdir')
 
814
        t.rename('adir', 'bdir')
 
815
        self.assertTrue(t.has('bdir/asubdir'))
 
816
        self.assertFalse(t.has('adir'))
 
817
 
 
818
    def test_rename_dir_nonempty(self):
 
819
        """Attempting to replace a nonemtpy directory should fail"""
 
820
        t = self.get_transport()
 
821
        if t.is_readonly():
 
822
            self.assertRaises((TransportNotPossible, NotImplementedError),
 
823
                              t.rename, 'foo', 'bar')
 
824
            return
 
825
        t.mkdir('adir')
 
826
        t.mkdir('adir/asubdir')
 
827
        t.mkdir('bdir')
 
828
        t.mkdir('bdir/bsubdir')
 
829
        # any kind of PathError would be OK, though we normally expect
 
830
        # DirectoryNotEmpty
 
831
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
 
832
        # nothing was changed so it should still be as before
 
833
        self.assertTrue(t.has('bdir/bsubdir'))
 
834
        self.assertFalse(t.has('adir/bdir'))
 
835
        self.assertFalse(t.has('adir/bsubdir'))
 
836
 
 
837
    def test_rename_across_subdirs(self):
 
838
        t = self.get_transport()
 
839
        if t.is_readonly():
 
840
            raise TestNotApplicable("transport is readonly")
 
841
        t.mkdir('a')
 
842
        t.mkdir('b')
 
843
        ta = t.clone('a')
 
844
        tb = t.clone('b')
 
845
        ta.put_bytes('f', b'aoeu')
 
846
        ta.rename('f', '../b/f')
 
847
        self.assertTrue(tb.has('f'))
 
848
        self.assertFalse(ta.has('f'))
 
849
        self.assertTrue(t.has('b/f'))
 
850
 
 
851
    def test_delete_tree(self):
 
852
        t = self.get_transport()
 
853
 
 
854
        # Not much to do with a readonly transport
 
855
        if t.is_readonly():
 
856
            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
 
857
            return
 
858
 
 
859
        # and does it like listing ?
 
860
        t.mkdir('adir')
 
861
        try:
 
862
            t.delete_tree('adir')
 
863
        except TransportNotPossible:
 
864
            # ok, this transport does not support delete_tree
 
865
            return
 
866
 
 
867
        # did it delete that trivial case?
 
868
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
869
 
 
870
        self.build_tree(['adir/',
 
871
                         'adir/file',
 
872
                         'adir/subdir/',
 
873
                         'adir/subdir/file',
 
874
                         'adir/subdir2/',
 
875
                         'adir/subdir2/file',
 
876
                         ], transport=t)
 
877
 
 
878
        t.delete_tree('adir')
 
879
        # adir should be gone now.
 
880
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
881
 
 
882
    def test_move(self):
 
883
        t = self.get_transport()
 
884
 
 
885
        if t.is_readonly():
 
886
            return
 
887
 
 
888
        # TODO: I would like to use os.listdir() to
 
889
        # make sure there are no extra files, but SftpServer
 
890
        # creates control files in the working directory
 
891
        # perhaps all of this could be done in a subdirectory
 
892
 
 
893
        t.put_bytes('a', b'a first file\n')
 
894
        self.assertEqual([True, False], [t.has(n) for n in ['a', 'b']])
 
895
 
 
896
        t.move('a', 'b')
 
897
        self.assertTrue(t.has('b'))
 
898
        self.assertFalse(t.has('a'))
 
899
 
 
900
        self.check_transport_contents(b'a first file\n', t, 'b')
 
901
        self.assertEqual([False, True], [t.has(n) for n in ['a', 'b']])
 
902
 
 
903
        # Overwrite a file
 
904
        t.put_bytes('c', b'c this file\n')
 
905
        t.move('c', 'b')
 
906
        self.assertFalse(t.has('c'))
 
907
        self.check_transport_contents(b'c this file\n', t, 'b')
 
908
 
 
909
        # TODO: Try to write a test for atomicity
 
910
        # TODO: Test moving into a non-existent subdirectory
 
911
 
 
912
    def test_copy(self):
 
913
        t = self.get_transport()
 
914
 
 
915
        if t.is_readonly():
 
916
            return
 
917
 
 
918
        t.put_bytes('a', b'a file\n')
 
919
        t.copy('a', 'b')
 
920
        self.check_transport_contents(b'a file\n', t, 'b')
 
921
 
 
922
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
923
        os.mkdir('c')
 
924
        # What should the assert be if you try to copy a
 
925
        # file over a directory?
 
926
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
927
        t.put_bytes('d', b'text in d\n')
 
928
        t.copy('d', 'b')
 
929
        self.check_transport_contents(b'text in d\n', t, 'b')
 
930
 
 
931
    def test_connection_error(self):
 
932
        """ConnectionError is raised when connection is impossible.
 
933
 
 
934
        The error should be raised from the first operation on the transport.
 
935
        """
 
936
        try:
 
937
            url = self._server.get_bogus_url()
 
938
        except NotImplementedError:
 
939
            raise TestSkipped("Transport %s has no bogus URL support." %
 
940
                              self._server.__class__)
 
941
        t = _mod_transport.get_transport_from_url(url)
 
942
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
 
943
 
 
944
    def test_stat(self):
 
945
        # TODO: Test stat, just try once, and if it throws, stop testing
 
946
        from stat import S_ISDIR, S_ISREG
 
947
 
 
948
        t = self.get_transport()
 
949
 
 
950
        try:
 
951
            st = t.stat('.')
 
952
        except TransportNotPossible as e:
 
953
            # This transport cannot stat
 
954
            return
 
955
 
 
956
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
957
        sizes = [14, 0, 16, 0, 18]
 
958
        self.build_tree(paths, transport=t, line_endings='binary')
 
959
 
 
960
        for path, size in zip(paths, sizes):
 
961
            st = t.stat(path)
 
962
            if path.endswith('/'):
 
963
                self.assertTrue(S_ISDIR(st.st_mode))
 
964
                # directory sizes are meaningless
 
965
            else:
 
966
                self.assertTrue(S_ISREG(st.st_mode))
 
967
                self.assertEqual(size, st.st_size)
 
968
 
 
969
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
970
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
971
 
 
972
        self.build_tree(['subdir/', 'subdir/file'], transport=t)
 
973
        subdir = t.clone('subdir')
 
974
        st = subdir.stat('./file')
 
975
        st = subdir.stat('.')
 
976
 
 
977
    def test_hardlink(self):
 
978
        from stat import ST_NLINK
 
979
 
 
980
        t = self.get_transport()
 
981
 
 
982
        source_name = "original_target"
 
983
        link_name = "target_link"
 
984
 
 
985
        self.build_tree([source_name], transport=t)
 
986
 
 
987
        try:
 
988
            t.hardlink(source_name, link_name)
 
989
 
 
990
            self.assertTrue(t.has(source_name))
 
991
            self.assertTrue(t.has(link_name))
 
992
 
 
993
            st = t.stat(link_name)
 
994
            self.assertEqual(st[ST_NLINK], 2)
 
995
        except TransportNotPossible:
 
996
            raise TestSkipped("Transport %s does not support hardlinks." %
 
997
                              self._server.__class__)
 
998
 
 
999
    def test_symlink(self):
 
1000
        from stat import S_ISLNK
 
1001
 
 
1002
        t = self.get_transport()
 
1003
 
 
1004
        source_name = "original_target"
 
1005
        link_name = "target_link"
 
1006
 
 
1007
        self.build_tree([source_name], transport=t)
 
1008
 
 
1009
        try:
 
1010
            t.symlink(source_name, link_name)
 
1011
 
 
1012
            self.assertTrue(t.has(source_name))
 
1013
            self.assertTrue(t.has(link_name))
 
1014
 
 
1015
            st = t.stat(link_name)
 
1016
            self.assertTrue(S_ISLNK(st.st_mode),
 
1017
                            "expected symlink, got mode %o" % st.st_mode)
 
1018
        except TransportNotPossible:
 
1019
            raise TestSkipped("Transport %s does not support symlinks." %
 
1020
                              self._server.__class__)
 
1021
 
 
1022
        self.assertEqual(source_name, t.readlink(link_name))
 
1023
 
 
1024
    def test_readlink_nonexistent(self):
 
1025
        t = self.get_transport()
 
1026
        try:
 
1027
            self.assertRaises(NoSuchFile, t.readlink, 'nonexistent')
 
1028
        except TransportNotPossible:
 
1029
            raise TestSkipped("Transport %s does not support symlinks." %
 
1030
                              self._server.__class__)
 
1031
 
 
1032
    def test_list_dir(self):
 
1033
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
1034
        t = self.get_transport()
 
1035
 
 
1036
        if not t.listable():
 
1037
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
1038
            return
 
1039
 
 
1040
        def sorted_list(d, transport):
 
1041
            l = sorted(transport.list_dir(d))
 
1042
            return l
 
1043
 
 
1044
        self.assertEqual([], sorted_list('.', t))
 
1045
        # c2 is precisely one letter longer than c here to test that
 
1046
        # suffixing is not confused.
 
1047
        # a%25b checks that quoting is done consistently across transports
 
1048
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
 
1049
 
 
1050
        if not t.is_readonly():
 
1051
            self.build_tree(tree_names, transport=t)
 
1052
        else:
 
1053
            self.build_tree(tree_names)
 
1054
 
 
1055
        self.assertEqual(
 
1056
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1057
        self.assertEqual(
 
1058
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1059
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1060
 
 
1061
        # Cloning the transport produces an equivalent listing
 
1062
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
 
1063
 
 
1064
        if not t.is_readonly():
 
1065
            t.delete('c/d')
 
1066
            t.delete('b')
 
1067
        else:
 
1068
            os.unlink('c/d')
 
1069
            os.unlink('b')
 
1070
 
 
1071
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1072
        self.assertEqual(['e'], sorted_list('c', t))
 
1073
 
 
1074
        self.assertListRaises(PathError, t.list_dir, 'q')
 
1075
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1076
        # 'a' is a file, list_dir should raise an error
 
1077
        self.assertListRaises(PathError, t.list_dir, 'a')
 
1078
 
 
1079
    def test_list_dir_result_is_url_escaped(self):
 
1080
        t = self.get_transport()
 
1081
        if not t.listable():
 
1082
            raise TestSkipped("transport not listable")
 
1083
 
 
1084
        if not t.is_readonly():
 
1085
            self.build_tree(['a/', 'a/%'], transport=t)
 
1086
        else:
 
1087
            self.build_tree(['a/', 'a/%'])
 
1088
 
 
1089
        names = list(t.list_dir('a'))
 
1090
        self.assertEqual(['%25'], names)
 
1091
        self.assertIsInstance(names[0], str)
 
1092
 
 
1093
    def test_clone_preserve_info(self):
 
1094
        t1 = self.get_transport()
 
1095
        if not isinstance(t1, ConnectedTransport):
 
1096
            raise TestSkipped("not a connected transport")
 
1097
 
 
1098
        t2 = t1.clone('subdir')
 
1099
        self.assertEqual(t1._parsed_url.scheme, t2._parsed_url.scheme)
 
1100
        self.assertEqual(t1._parsed_url.user, t2._parsed_url.user)
 
1101
        self.assertEqual(t1._parsed_url.password, t2._parsed_url.password)
 
1102
        self.assertEqual(t1._parsed_url.host, t2._parsed_url.host)
 
1103
        self.assertEqual(t1._parsed_url.port, t2._parsed_url.port)
 
1104
 
 
1105
    def test__reuse_for(self):
 
1106
        t = self.get_transport()
 
1107
        if not isinstance(t, ConnectedTransport):
 
1108
            raise TestSkipped("not a connected transport")
 
1109
 
 
1110
        def new_url(scheme=None, user=None, password=None,
 
1111
                    host=None, port=None, path=None):
 
1112
            """Build a new url from t.base changing only parts of it.
 
1113
 
 
1114
            Only the parameters different from None will be changed.
 
1115
            """
 
1116
            if scheme is None:
 
1117
                scheme = t._parsed_url.scheme
 
1118
            if user is None:
 
1119
                user = t._parsed_url.user
 
1120
            if password is None:
 
1121
                password = t._parsed_url.password
 
1122
            if user is None:
 
1123
                user = t._parsed_url.user
 
1124
            if host is None:
 
1125
                host = t._parsed_url.host
 
1126
            if port is None:
 
1127
                port = t._parsed_url.port
 
1128
            if path is None:
 
1129
                path = t._parsed_url.path
 
1130
            return str(urlutils.URL(scheme, user, password, host, port, path))
 
1131
 
 
1132
        if t._parsed_url.scheme == 'ftp':
 
1133
            scheme = 'sftp'
 
1134
        else:
 
1135
            scheme = 'ftp'
 
1136
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1137
        if t._parsed_url.user == 'me':
 
1138
            user = 'you'
 
1139
        else:
 
1140
            user = 'me'
 
1141
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1142
        # passwords are not taken into account because:
 
1143
        # - it makes no sense to have two different valid passwords for the
 
1144
        #   same user
 
1145
        # - _password in ConnectedTransport is intended to collect what the
 
1146
        #   user specified from the command-line and there are cases where the
 
1147
        #   new url can contain no password (if the url was built from an
 
1148
        #   existing transport.base for example)
 
1149
        # - password are considered part of the credentials provided at
 
1150
        #   connection creation time and as such may not be present in the url
 
1151
        #   (they may be typed by the user when prompted for example)
 
1152
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1153
        # We will not connect, we can use a invalid host
 
1154
        self.assertIsNot(t, t._reuse_for(
 
1155
            new_url(host=t._parsed_url.host + 'bar')))
 
1156
        if t._parsed_url.port == 1234:
 
1157
            port = 4321
 
1158
        else:
 
1159
            port = 1234
 
1160
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1161
        # No point in trying to reuse a transport for a local URL
 
1162
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1163
 
 
1164
    def test_connection_sharing(self):
 
1165
        t = self.get_transport()
 
1166
        if not isinstance(t, ConnectedTransport):
 
1167
            raise TestSkipped("not a connected transport")
 
1168
 
 
1169
        c = t.clone('subdir')
 
1170
        # Some transports will create the connection  only when needed
 
1171
        t.has('surely_not')  # Force connection
 
1172
        self.assertIs(t._get_connection(), c._get_connection())
 
1173
 
 
1174
        # Temporary failure, we need to create a new dummy connection
 
1175
        new_connection = None
 
1176
        t._set_connection(new_connection)
 
1177
        # Check that both transports use the same connection
 
1178
        self.assertIs(new_connection, t._get_connection())
 
1179
        self.assertIs(new_connection, c._get_connection())
 
1180
 
 
1181
    def test_reuse_connection_for_various_paths(self):
 
1182
        t = self.get_transport()
 
1183
        if not isinstance(t, ConnectedTransport):
 
1184
            raise TestSkipped("not a connected transport")
 
1185
 
 
1186
        t.has('surely_not')  # Force connection
 
1187
        self.assertIsNot(None, t._get_connection())
 
1188
 
 
1189
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1190
        self.assertIsNot(t, subdir)
 
1191
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1192
 
 
1193
        home = subdir._reuse_for(t.base + 'home')
 
1194
        self.assertIs(t._get_connection(), home._get_connection())
 
1195
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1196
 
 
1197
    def test_clone(self):
 
1198
        # TODO: Test that clone moves up and down the filesystem
 
1199
        t1 = self.get_transport()
 
1200
 
 
1201
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
 
1202
 
 
1203
        self.assertTrue(t1.has('a'))
 
1204
        self.assertTrue(t1.has('b/c'))
 
1205
        self.assertFalse(t1.has('c'))
 
1206
 
 
1207
        t2 = t1.clone('b')
 
1208
        self.assertEqual(t1.base + 'b/', t2.base)
 
1209
 
 
1210
        self.assertTrue(t2.has('c'))
 
1211
        self.assertFalse(t2.has('a'))
 
1212
 
 
1213
        t3 = t2.clone('..')
 
1214
        self.assertTrue(t3.has('a'))
 
1215
        self.assertFalse(t3.has('c'))
 
1216
 
 
1217
        self.assertFalse(t1.has('b/d'))
 
1218
        self.assertFalse(t2.has('d'))
 
1219
        self.assertFalse(t3.has('b/d'))
 
1220
 
 
1221
        if t1.is_readonly():
 
1222
            self.build_tree_contents([('b/d', b'newfile\n')])
 
1223
        else:
 
1224
            t2.put_bytes('d', b'newfile\n')
 
1225
 
 
1226
        self.assertTrue(t1.has('b/d'))
 
1227
        self.assertTrue(t2.has('d'))
 
1228
        self.assertTrue(t3.has('b/d'))
 
1229
 
 
1230
    def test_clone_to_root(self):
 
1231
        orig_transport = self.get_transport()
 
1232
        # Repeatedly go up to a parent directory until we're at the root
 
1233
        # directory of this transport
 
1234
        root_transport = orig_transport
 
1235
        new_transport = root_transport.clone("..")
 
1236
        # as we are walking up directories, the path must be
 
1237
        # growing less, except at the top
 
1238
        self.assertTrue(len(new_transport.base) < len(root_transport.base) or
 
1239
                        new_transport.base == root_transport.base)
 
1240
        while new_transport.base != root_transport.base:
 
1241
            root_transport = new_transport
 
1242
            new_transport = root_transport.clone("..")
 
1243
            # as we are walking up directories, the path must be
 
1244
            # growing less, except at the top
 
1245
            self.assertTrue(len(new_transport.base) < len(root_transport.base) or
 
1246
                            new_transport.base == root_transport.base)
 
1247
 
 
1248
        # Cloning to "/" should take us to exactly the same location.
 
1249
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1250
        # the abspath of "/" from the original transport should be the same
 
1251
        # as the base at the root:
 
1252
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1253
 
 
1254
        # At the root, the URL must still end with / as its a directory
 
1255
        self.assertEqual(root_transport.base[-1], '/')
 
1256
 
 
1257
    def test_clone_from_root(self):
 
1258
        """At the root, cloning to a simple dir should just do string append."""
 
1259
        orig_transport = self.get_transport()
 
1260
        root_transport = orig_transport.clone('/')
 
1261
        self.assertEqual(root_transport.base + '.bzr/',
 
1262
                         root_transport.clone('.bzr').base)
 
1263
 
 
1264
    def test_base_url(self):
 
1265
        t = self.get_transport()
 
1266
        self.assertEqual('/', t.base[-1])
 
1267
 
 
1268
    def test_relpath(self):
 
1269
        t = self.get_transport()
 
1270
        self.assertEqual('', t.relpath(t.base))
 
1271
        # base ends with /
 
1272
        self.assertEqual('', t.relpath(t.base[:-1]))
 
1273
        # subdirs which don't exist should still give relpaths.
 
1274
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
 
1275
        # trailing slash should be the same.
 
1276
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
 
1277
 
 
1278
    def test_relpath_at_root(self):
 
1279
        t = self.get_transport()
 
1280
        # clone all the way to the top
 
1281
        new_transport = t.clone('..')
 
1282
        while new_transport.base != t.base:
 
1283
            t = new_transport
 
1284
            new_transport = t.clone('..')
 
1285
        # we must be able to get a relpath below the root
 
1286
        self.assertEqual('', t.relpath(t.base))
 
1287
        # and a deeper one should work too
 
1288
        self.assertEqual('foo/bar', t.relpath(t.base + 'foo/bar'))
 
1289
 
 
1290
    def test_abspath(self):
 
1291
        # smoke test for abspath. Corner cases for backends like unix fs's
 
1292
        # that have aliasing problems like symlinks should go in backend
 
1293
        # specific test cases.
 
1294
        transport = self.get_transport()
 
1295
 
 
1296
        self.assertEqual(transport.base + 'relpath',
 
1297
                         transport.abspath('relpath'))
 
1298
 
 
1299
        # This should work without raising an error.
 
1300
        transport.abspath("/")
 
1301
 
 
1302
        # the abspath of "/" and "/foo/.." should result in the same location
 
1303
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1304
 
 
1305
        self.assertEqual(transport.clone("/").abspath('foo'),
 
1306
                         transport.abspath("/foo"))
 
1307
 
 
1308
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
 
1309
    def test_win32_abspath(self):
 
1310
        # Note: we tried to set sys.platform='win32' so we could test on
 
1311
        # other platforms too, but then osutils does platform specific
 
1312
        # things at import time which defeated us...
 
1313
        if sys.platform != 'win32':
 
1314
            raise TestSkipped(
 
1315
                'Testing drive letters in abspath implemented only for win32')
 
1316
 
 
1317
        # smoke test for abspath on win32.
 
1318
        # a transport based on 'file:///' never fully qualifies the drive.
 
1319
        transport = _mod_transport.get_transport_from_url("file:///")
 
1320
        self.assertEqual(transport.abspath("/"), "file:///")
 
1321
 
 
1322
        # but a transport that starts with a drive spec must keep it.
 
1323
        transport = _mod_transport.get_transport_from_url("file:///C:/")
 
1324
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1325
 
 
1326
    def test_local_abspath(self):
 
1327
        transport = self.get_transport()
 
1328
        try:
 
1329
            p = transport.local_abspath('.')
 
1330
        except (errors.NotLocalUrl, TransportNotPossible) as e:
 
1331
            # should be formattable
 
1332
            s = str(e)
 
1333
        else:
 
1334
            self.assertEqual(getcwd(), p)
 
1335
 
 
1336
    def test_abspath_at_root(self):
 
1337
        t = self.get_transport()
 
1338
        # clone all the way to the top
 
1339
        new_transport = t.clone('..')
 
1340
        while new_transport.base != t.base:
 
1341
            t = new_transport
 
1342
            new_transport = t.clone('..')
 
1343
        # we must be able to get a abspath of the root when we ask for
 
1344
        # t.abspath('..') - this due to our choice that clone('..')
 
1345
        # should return the root from the root, combined with the desire that
 
1346
        # the url from clone('..') and from abspath('..') should be the same.
 
1347
        self.assertEqual(t.base, t.abspath('..'))
 
1348
        # '' should give us the root
 
1349
        self.assertEqual(t.base, t.abspath(''))
 
1350
        # and a path should append to the url
 
1351
        self.assertEqual(t.base + 'foo', t.abspath('foo'))
 
1352
 
 
1353
    def test_iter_files_recursive(self):
 
1354
        transport = self.get_transport()
 
1355
        if not transport.listable():
 
1356
            self.assertRaises(TransportNotPossible,
 
1357
                              transport.iter_files_recursive)
 
1358
            return
 
1359
        self.build_tree(['isolated/',
 
1360
                         'isolated/dir/',
 
1361
                         'isolated/dir/foo',
 
1362
                         'isolated/dir/bar',
 
1363
                         'isolated/dir/b%25z',  # make sure quoting is correct
 
1364
                         'isolated/bar'],
 
1365
                        transport=transport)
 
1366
        paths = set(transport.iter_files_recursive())
 
1367
        # nb the directories are not converted
 
1368
        self.assertEqual(paths,
 
1369
                         {'isolated/dir/foo',
 
1370
                          'isolated/dir/bar',
 
1371
                          'isolated/dir/b%2525z',
 
1372
                          'isolated/bar'})
 
1373
        sub_transport = transport.clone('isolated')
 
1374
        paths = set(sub_transport.iter_files_recursive())
 
1375
        self.assertEqual(paths,
 
1376
                         {'dir/foo', 'dir/bar', 'dir/b%2525z', 'bar'})
 
1377
 
 
1378
    def test_copy_tree(self):
 
1379
        # TODO: test file contents and permissions are preserved. This test was
 
1380
        # added just to ensure that quoting was handled correctly.
 
1381
        # -- David Allouche 2006-08-11
 
1382
        transport = self.get_transport()
 
1383
        if not transport.listable():
 
1384
            self.assertRaises(TransportNotPossible,
 
1385
                              transport.iter_files_recursive)
 
1386
            return
 
1387
        if transport.is_readonly():
 
1388
            return
 
1389
        self.build_tree(['from/',
 
1390
                         'from/dir/',
 
1391
                         'from/dir/foo',
 
1392
                         'from/dir/bar',
 
1393
                         'from/dir/b%25z',  # make sure quoting is correct
 
1394
                         'from/bar'],
 
1395
                        transport=transport)
 
1396
        transport.copy_tree('from', 'to')
 
1397
        paths = set(transport.iter_files_recursive())
 
1398
        self.assertEqual(paths,
 
1399
                         {'from/dir/foo',
 
1400
                          'from/dir/bar',
 
1401
                          'from/dir/b%2525z',
 
1402
                          'from/bar',
 
1403
                          'to/dir/foo',
 
1404
                          'to/dir/bar',
 
1405
                          'to/dir/b%2525z',
 
1406
                          'to/bar', })
 
1407
 
 
1408
    def test_copy_tree_to_transport(self):
 
1409
        transport = self.get_transport()
 
1410
        if not transport.listable():
 
1411
            self.assertRaises(TransportNotPossible,
 
1412
                              transport.iter_files_recursive)
 
1413
            return
 
1414
        if transport.is_readonly():
 
1415
            return
 
1416
        self.build_tree(['from/',
 
1417
                         'from/dir/',
 
1418
                         'from/dir/foo',
 
1419
                         'from/dir/bar',
 
1420
                         'from/dir/b%25z',  # make sure quoting is correct
 
1421
                         'from/bar'],
 
1422
                        transport=transport)
 
1423
        from_transport = transport.clone('from')
 
1424
        to_transport = transport.clone('to')
 
1425
        to_transport.ensure_base()
 
1426
        from_transport.copy_tree_to_transport(to_transport)
 
1427
        paths = set(transport.iter_files_recursive())
 
1428
        self.assertEqual(paths,
 
1429
                         {'from/dir/foo',
 
1430
                          'from/dir/bar',
 
1431
                          'from/dir/b%2525z',
 
1432
                          'from/bar',
 
1433
                          'to/dir/foo',
 
1434
                          'to/dir/bar',
 
1435
                          'to/dir/b%2525z',
 
1436
                          'to/bar', })
 
1437
 
 
1438
    def test_unicode_paths(self):
 
1439
        """Test that we can read/write files with Unicode names."""
 
1440
        t = self.get_transport()
 
1441
 
 
1442
        # With FAT32 and certain encodings on win32
 
1443
        # '\xe5' and '\xe4' actually map to the same file
 
1444
        # adding a suffix kicks in the 'preserving but insensitive'
 
1445
        # route, and maintains the right files
 
1446
        files = [u'\xe5.1',  # a w/ circle iso-8859-1
 
1447
                 u'\xe4.2',  # a w/ dots iso-8859-1
 
1448
                 u'\u017d',  # Z with umlat iso-8859-2
 
1449
                 u'\u062c',  # Arabic j
 
1450
                 u'\u0410',  # Russian A
 
1451
                 u'\u65e5',  # Kanji person
 
1452
                 ]
 
1453
 
 
1454
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1455
        if no_unicode_support:
 
1456
            self.knownFailure("test server cannot handle unicode paths")
 
1457
 
 
1458
        try:
 
1459
            self.build_tree(files, transport=t, line_endings='binary')
 
1460
        except UnicodeError:
 
1461
            raise TestSkipped(
 
1462
                "cannot handle unicode paths in current encoding")
 
1463
 
 
1464
        # A plain unicode string is not a valid url
 
1465
        for fname in files:
 
1466
            self.assertRaises(urlutils.InvalidURL, t.get, fname)
 
1467
 
 
1468
        for fname in files:
 
1469
            fname_utf8 = fname.encode('utf-8')
 
1470
            contents = b'contents of %s\n' % (fname_utf8,)
 
1471
            self.check_transport_contents(contents, t, urlutils.escape(fname))
 
1472
 
 
1473
    def test_connect_twice_is_same_content(self):
 
1474
        # check that our server (whatever it is) is accessible reliably
 
1475
        # via get_transport and multiple connections share content.
 
1476
        transport = self.get_transport()
 
1477
        if transport.is_readonly():
 
1478
            return
 
1479
        transport.put_bytes('foo', b'bar')
 
1480
        transport3 = self.get_transport()
 
1481
        self.check_transport_contents(b'bar', transport3, 'foo')
 
1482
 
 
1483
        # now opening at a relative url should give use a sane result:
 
1484
        transport.mkdir('newdir')
 
1485
        transport5 = self.get_transport('newdir')
 
1486
        transport6 = transport5.clone('..')
 
1487
        self.check_transport_contents(b'bar', transport6, 'foo')
 
1488
 
 
1489
    def test_lock_write(self):
 
1490
        """Test transport-level write locks.
 
1491
 
 
1492
        These are deprecated and transports may decline to support them.
 
1493
        """
 
1494
        transport = self.get_transport()
 
1495
        if transport.is_readonly():
 
1496
            self.assertRaises(TransportNotPossible,
 
1497
                              transport.lock_write, 'foo')
 
1498
            return
 
1499
        transport.put_bytes('lock', b'')
 
1500
        try:
 
1501
            lock = transport.lock_write('lock')
 
1502
        except TransportNotPossible:
 
1503
            return
 
1504
        # TODO make this consistent on all platforms:
 
1505
        # self.assertRaises(LockError, transport.lock_write, 'lock')
 
1506
        lock.unlock()
 
1507
 
 
1508
    def test_lock_read(self):
 
1509
        """Test transport-level read locks.
 
1510
 
 
1511
        These are deprecated and transports may decline to support them.
 
1512
        """
 
1513
        transport = self.get_transport()
 
1514
        if transport.is_readonly():
 
1515
            open('lock', 'w').close()
 
1516
        else:
 
1517
            transport.put_bytes('lock', b'')
 
1518
        try:
 
1519
            lock = transport.lock_read('lock')
 
1520
        except TransportNotPossible:
 
1521
            return
 
1522
        # TODO make this consistent on all platforms:
 
1523
        # self.assertRaises(LockError, transport.lock_read, 'lock')
 
1524
        lock.unlock()
 
1525
 
 
1526
    def test_readv(self):
 
1527
        transport = self.get_transport()
 
1528
        if transport.is_readonly():
 
1529
            with open('a', 'w') as f:
 
1530
                f.write('0123456789')
 
1531
        else:
 
1532
            transport.put_bytes('a', b'0123456789')
 
1533
 
 
1534
        d = list(transport.readv('a', ((0, 1),)))
 
1535
        self.assertEqual(d[0], (0, b'0'))
 
1536
 
 
1537
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
 
1538
        self.assertEqual(d[0], (0, b'0'))
 
1539
        self.assertEqual(d[1], (1, b'1'))
 
1540
        self.assertEqual(d[2], (3, b'34'))
 
1541
        self.assertEqual(d[3], (9, b'9'))
 
1542
 
 
1543
    def test_readv_out_of_order(self):
 
1544
        transport = self.get_transport()
 
1545
        if transport.is_readonly():
 
1546
            with open('a', 'w') as f:
 
1547
                f.write('0123456789')
 
1548
        else:
 
1549
            transport.put_bytes('a', b'01234567890')
 
1550
 
 
1551
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
1552
        self.assertEqual(d[0], (1, b'1'))
 
1553
        self.assertEqual(d[1], (9, b'9'))
 
1554
        self.assertEqual(d[2], (0, b'0'))
 
1555
        self.assertEqual(d[3], (3, b'34'))
 
1556
 
 
1557
    def test_readv_with_adjust_for_latency(self):
 
1558
        transport = self.get_transport()
 
1559
        # the adjust for latency flag expands the data region returned
 
1560
        # according to a per-transport heuristic, so testing is a little
 
1561
        # tricky as we need more data than the largest combining that our
 
1562
        # transports do. To accomodate this we generate random data and cross
 
1563
        # reference the returned data with the random data. To avoid doing
 
1564
        # multiple large random byte look ups we do several tests on the same
 
1565
        # backing data.
 
1566
        content = osutils.rand_bytes(200 * 1024)
 
1567
        content_size = len(content)
 
1568
        if transport.is_readonly():
 
1569
            self.build_tree_contents([('a', content)])
 
1570
        else:
 
1571
            transport.put_bytes('a', content)
 
1572
 
 
1573
        def check_result_data(result_vector):
 
1574
            for item in result_vector:
 
1575
                data_len = len(item[1])
 
1576
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1577
 
 
1578
        # start corner case
 
1579
        result = list(transport.readv('a', ((0, 30),),
 
1580
                                      adjust_for_latency=True, upper_limit=content_size))
 
1581
        # we expect 1 result, from 0, to something > 30
 
1582
        self.assertEqual(1, len(result))
 
1583
        self.assertEqual(0, result[0][0])
 
1584
        self.assertTrue(len(result[0][1]) >= 30)
 
1585
        check_result_data(result)
 
1586
        # end of file corner case
 
1587
        result = list(transport.readv('a', ((204700, 100),),
 
1588
                                      adjust_for_latency=True, upper_limit=content_size))
 
1589
        # we expect 1 result, from 204800- its length, to the end
 
1590
        self.assertEqual(1, len(result))
 
1591
        data_len = len(result[0][1])
 
1592
        self.assertEqual(204800 - data_len, result[0][0])
 
1593
        self.assertTrue(data_len >= 100)
 
1594
        check_result_data(result)
 
1595
        # out of order ranges are made in order
 
1596
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1597
                                      adjust_for_latency=True, upper_limit=content_size))
 
1598
        # we expect 2 results, in order, start and end.
 
1599
        self.assertEqual(2, len(result))
 
1600
        # start
 
1601
        data_len = len(result[0][1])
 
1602
        self.assertEqual(0, result[0][0])
 
1603
        self.assertTrue(data_len >= 30)
 
1604
        # end
 
1605
        data_len = len(result[1][1])
 
1606
        self.assertEqual(204800 - data_len, result[1][0])
 
1607
        self.assertTrue(data_len >= 100)
 
1608
        check_result_data(result)
 
1609
        # close ranges get combined (even if out of order)
 
1610
        for request_vector in [((400, 50), (800, 234)), ((800, 234), (400, 50))]:
 
1611
            result = list(transport.readv('a', request_vector,
 
1612
                                          adjust_for_latency=True, upper_limit=content_size))
 
1613
            self.assertEqual(1, len(result))
 
1614
            data_len = len(result[0][1])
 
1615
            # minimum length is from 400 to 1034 - 634
 
1616
            self.assertTrue(data_len >= 634)
 
1617
            # must contain the region 400 to 1034
 
1618
            self.assertTrue(result[0][0] <= 400)
 
1619
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1620
            check_result_data(result)
 
1621
 
 
1622
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1623
        transport = self.get_transport()
 
1624
        # test from observed failure case.
 
1625
        if transport.is_readonly():
 
1626
            with open('a', 'w') as f:
 
1627
                f.write('a' * 1024 * 1024)
 
1628
        else:
 
1629
            transport.put_bytes('a', b'a' * 1024 * 1024)
 
1630
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1631
                         (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1632
                         (465373, 800), (947422, 800)]
 
1633
        results = list(transport.readv('a', broken_vector, True, 1024 * 1024))
 
1634
        found_items = [False] * 9
 
1635
        for pos, (start, length) in enumerate(broken_vector):
 
1636
            # check the range is covered by the result
 
1637
            for offset, data in results:
 
1638
                if offset <= start and start + length <= offset + len(data):
 
1639
                    found_items[pos] = True
 
1640
        self.assertEqual([True] * 9, found_items)
 
1641
 
 
1642
    def test_get_with_open_write_stream_sees_all_content(self):
 
1643
        t = self.get_transport()
 
1644
        if t.is_readonly():
 
1645
            return
 
1646
        with t.open_write_stream('foo') as handle:
 
1647
            handle.write(b'bcd')
 
1648
            self.assertEqual([(0, b'b'), (2, b'd')], list(
 
1649
                t.readv('foo', ((0, 1), (2, 1)))))
 
1650
 
 
1651
    def test_get_smart_medium(self):
 
1652
        """All transports must either give a smart medium, or know they can't.
 
1653
        """
 
1654
        transport = self.get_transport()
 
1655
        try:
 
1656
            client_medium = transport.get_smart_medium()
 
1657
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
 
1658
        except errors.NoSmartMedium:
 
1659
            # as long as we got it we're fine
 
1660
            pass
 
1661
 
 
1662
    def test_readv_short_read(self):
 
1663
        transport = self.get_transport()
 
1664
        if transport.is_readonly():
 
1665
            with open('a', 'w') as f:
 
1666
                f.write('0123456789')
 
1667
        else:
 
1668
            transport.put_bytes('a', b'01234567890')
 
1669
 
 
1670
        # This is intentionally reading off the end of the file
 
1671
        # since we are sure that it cannot get there
 
1672
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
 
1673
                               # Can be raised by paramiko
 
1674
                               AssertionError),
 
1675
                              transport.readv, 'a', [(1, 1), (8, 10)])
 
1676
 
 
1677
        # This is trying to seek past the end of the file, it should
 
1678
        # also raise a special error
 
1679
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
 
1680
                              transport.readv, 'a', [(12, 2)])
 
1681
 
 
1682
    def test_no_segment_parameters(self):
 
1683
        """Segment parameters should be stripped and stored in
 
1684
        transport.segment_parameters."""
 
1685
        transport = self.get_transport("foo")
 
1686
        self.assertEqual({}, transport.get_segment_parameters())
 
1687
 
 
1688
    def test_segment_parameters(self):
 
1689
        """Segment parameters should be stripped and stored in
 
1690
        transport.get_segment_parameters()."""
 
1691
        base_url = self._server.get_url()
 
1692
        parameters = {"key1": "val1", "key2": "val2"}
 
1693
        url = urlutils.join_segment_parameters(base_url, parameters)
 
1694
        transport = _mod_transport.get_transport_from_url(url)
 
1695
        self.assertEqual(parameters, transport.get_segment_parameters())
 
1696
 
 
1697
    def test_set_segment_parameters(self):
 
1698
        """Segment parameters can be set and show up in base."""
 
1699
        transport = self.get_transport("foo")
 
1700
        orig_base = transport.base
 
1701
        transport.set_segment_parameter("arm", "board")
 
1702
        self.assertEqual("%s,arm=board" % orig_base, transport.base)
 
1703
        self.assertEqual({"arm": "board"}, transport.get_segment_parameters())
 
1704
        transport.set_segment_parameter("arm", None)
 
1705
        transport.set_segment_parameter("nonexistant", None)
 
1706
        self.assertEqual({}, transport.get_segment_parameters())
 
1707
        self.assertEqual(orig_base, transport.base)
 
1708
 
 
1709
    def test_stat_symlink(self):
 
1710
        # if a transport points directly to a symlink (and supports symlinks
 
1711
        # at all) you can tell this.  helps with bug 32669.
 
1712
        t = self.get_transport()
 
1713
        try:
 
1714
            t.symlink('target', 'link')
 
1715
        except TransportNotPossible:
 
1716
            raise TestSkipped("symlinks not supported")
 
1717
        t2 = t.clone('link')
 
1718
        st = t2.stat('')
 
1719
        self.assertTrue(stat.S_ISLNK(st.st_mode))
 
1720
 
 
1721
    def test_abspath_url_unquote_unreserved(self):
 
1722
        """URLs from abspath should have unreserved characters unquoted
 
1723
 
 
1724
        Need consistent quoting notably for tildes, see lp:842223 for more.
 
1725
        """
 
1726
        t = self.get_transport()
 
1727
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1728
        self.assertEqual(t.base + "-.09AZ_az~",
 
1729
                         t.abspath(needlessly_escaped_dir))
 
1730
 
 
1731
    def test_clone_url_unquote_unreserved(self):
 
1732
        """Base URL of a cloned branch needs unreserved characters unquoted
 
1733
 
 
1734
        Cloned transports should be prefix comparable for things like the
 
1735
        isolation checking of tests, see lp:842223 for more.
 
1736
        """
 
1737
        t1 = self.get_transport()
 
1738
        needlessly_escaped_dir = "%2D%2E%30%39%41%5A%5F%61%7A%7E/"
 
1739
        self.build_tree([needlessly_escaped_dir], transport=t1)
 
1740
        t2 = t1.clone(needlessly_escaped_dir)
 
1741
        self.assertEqual(t1.base + "-.09AZ_az~/", t2.base)
 
1742
 
 
1743
    def test_hook_post_connection_one(self):
 
1744
        """Fire post_connect hook after a ConnectedTransport is first used"""
 
1745
        log = []
 
1746
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1747
        t = self.get_transport()
 
1748
        self.assertEqual([], log)
 
1749
        t.has("non-existant")
 
1750
        if isinstance(t, RemoteTransport):
 
1751
            self.assertEqual([t.get_smart_medium()], log)
 
1752
        elif isinstance(t, ConnectedTransport):
 
1753
            self.assertEqual([t], log)
 
1754
        else:
 
1755
            self.assertEqual([], log)
 
1756
 
 
1757
    def test_hook_post_connection_multi(self):
 
1758
        """Fire post_connect hook once per unshared underlying connection"""
 
1759
        log = []
 
1760
        Transport.hooks.install_named_hook("post_connect", log.append, None)
 
1761
        t1 = self.get_transport()
 
1762
        t2 = t1.clone(".")
 
1763
        t3 = self.get_transport()
 
1764
        self.assertEqual([], log)
 
1765
        t1.has("x")
 
1766
        t2.has("x")
 
1767
        t3.has("x")
 
1768
        if isinstance(t1, RemoteTransport):
 
1769
            self.assertEqual([t.get_smart_medium() for t in [t1, t3]], log)
 
1770
        elif isinstance(t1, ConnectedTransport):
 
1771
            self.assertEqual([t1, t3], log)
 
1772
        else:
 
1773
            self.assertEqual([], log)