/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport_implementations.py

  • Committer: Robert Collins
  • Date: 2008-08-20 02:07:36 UTC
  • mfrom: (3640 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3682.
  • Revision ID: robertc@robertcollins.net-20080820020736-g2xe4921zzxtymle
Merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
 
23
import itertools
23
24
import os
24
25
from cStringIO import StringIO
 
26
from StringIO import StringIO as pyStringIO
25
27
import stat
26
28
import sys
 
29
import unittest
27
30
 
28
 
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
29
 
                           LockError, PathError,
30
 
                           TransportNotPossible, ConnectionError,
31
 
                           InvalidURL)
 
31
from bzrlib import (
 
32
    errors,
 
33
    osutils,
 
34
    tests,
 
35
    urlutils,
 
36
    )
 
37
from bzrlib.errors import (ConnectionError,
 
38
                           DirectoryNotEmpty,
 
39
                           FileExists,
 
40
                           InvalidURL,
 
41
                           LockError,
 
42
                           NoSuchFile,
 
43
                           NotLocalUrl,
 
44
                           PathError,
 
45
                           TransportNotPossible,
 
46
                           )
32
47
from bzrlib.osutils import getcwd
33
 
from bzrlib.tests import TestCaseInTempDir, TestSkipped
34
 
from bzrlib.transport import memory
35
 
import bzrlib.transport
36
 
import bzrlib.urlutils as urlutils
37
 
 
38
 
 
39
 
def _append(fn, txt):
40
 
    """Append the given text (file-like object) to the supplied filename."""
41
 
    f = open(fn, 'ab')
42
 
    try:
43
 
        f.write(txt.read())
44
 
    finally:
45
 
        f.close()
46
 
 
47
 
 
48
 
class TestTransportImplementation(TestCaseInTempDir):
49
 
    """Implementation verification for transports.
50
 
    
51
 
    To verify a transport we need a server factory, which is a callable
52
 
    that accepts no parameters and returns an implementation of
53
 
    bzrlib.transport.Server.
54
 
    
55
 
    That Server is then used to construct transport instances and test
56
 
    the transport via loopback activity.
57
 
 
58
 
    Currently this assumes that the Transport object is connected to the 
59
 
    current working directory.  So that whatever is done 
60
 
    through the transport, should show up in the working 
61
 
    directory, and vice-versa. This is a bug, because its possible to have
62
 
    URL schemes which provide access to something that may not be 
63
 
    result in storage on the local disk, i.e. due to file system limits, or 
64
 
    due to it being a database or some other non-filesystem tool.
65
 
 
66
 
    This also tests to make sure that the functions work with both
67
 
    generators and lists (assuming iter(list) is effectively a generator)
 
48
from bzrlib.smart import medium
 
49
from bzrlib.tests import (
 
50
    TestCaseInTempDir,
 
51
    TestScenarioApplier,
 
52
    TestSkipped,
 
53
    TestNotApplicable,
 
54
    )
 
55
from bzrlib.tests.test_transport import TestTransportImplementation
 
56
from bzrlib.transport import (
 
57
    ConnectedTransport,
 
58
    get_transport,
 
59
    _get_transport_modules,
 
60
    )
 
61
from bzrlib.transport.memory import MemoryTransport
 
62
 
 
63
 
 
64
class TransportTestProviderAdapter(TestScenarioApplier):
 
65
    """A tool to generate a suite testing all transports for a single test.
 
66
 
 
67
    This is done by copying the test once for each transport and injecting
 
68
    the transport_class and transport_server classes into each copy. Each copy
 
69
    is also given a new id() to make it easy to identify.
68
70
    """
69
 
    
 
71
 
 
72
    def __init__(self):
 
73
        self.scenarios = self._test_permutations()
 
74
 
 
75
    def get_transport_test_permutations(self, module):
 
76
        """Get the permutations module wants to have tested."""
 
77
        if getattr(module, 'get_test_permutations', None) is None:
 
78
            raise AssertionError(
 
79
                "transport module %s doesn't provide get_test_permutations()"
 
80
                % module.__name__)
 
81
            return []
 
82
        return module.get_test_permutations()
 
83
 
 
84
    def _test_permutations(self):
 
85
        """Return a list of the klass, server_factory pairs to test."""
 
86
        result = []
 
87
        for module in _get_transport_modules():
 
88
            try:
 
89
                permutations = self.get_transport_test_permutations(
 
90
                    reduce(getattr, (module).split('.')[1:], __import__(module)))
 
91
                for (klass, server_factory) in permutations:
 
92
                    scenario = (server_factory.__name__,
 
93
                        {"transport_class":klass,
 
94
                         "transport_server":server_factory})
 
95
                    result.append(scenario)
 
96
            except errors.DependencyNotPresent, e:
 
97
                # Continue even if a dependency prevents us 
 
98
                # from adding this test
 
99
                pass
 
100
        return result
 
101
 
 
102
 
 
103
def load_tests(standard_tests, module, loader):
 
104
    """Multiply tests for tranport implementations."""
 
105
    result = loader.suiteClass()
 
106
    adapter = TransportTestProviderAdapter()
 
107
    for test in tests.iter_suite_tests(standard_tests):
 
108
        result.addTests(adapter.adapt(test))
 
109
    return result
 
110
 
 
111
 
 
112
class TransportTests(TestTransportImplementation):
 
113
 
70
114
    def setUp(self):
71
 
        super(TestTransportImplementation, self).setUp()
72
 
        self._server = self.transport_server()
73
 
        self._server.setUp()
 
115
        super(TransportTests, self).setUp()
 
116
        self._captureVar('BZR_NO_SMART_VFS', None)
74
117
 
75
 
    def tearDown(self):
76
 
        super(TestTransportImplementation, self).tearDown()
77
 
        self._server.tearDown()
78
 
        
79
118
    def check_transport_contents(self, content, transport, relpath):
80
119
        """Check that transport.get(relpath).read() == content."""
81
120
        self.assertEqualDiff(content, transport.get(relpath).read())
82
121
 
83
 
    def get_transport(self):
84
 
        """Return a connected transport to the local directory."""
85
 
        base_url = self._server.get_url()
86
 
        t = bzrlib.transport.get_transport(base_url)
87
 
        if not isinstance(t, self.transport_class):
88
 
            # we want to make sure to construct one particular class, even if
89
 
            # there are several available implementations of this transport;
90
 
            # therefore construct it by hand rather than through the regular
91
 
            # get_transport method
92
 
            t = self.transport_class(base_url)
93
 
        return t
94
 
 
95
 
    def assertListRaises(self, excClass, func, *args, **kwargs):
96
 
        """Fail unless excClass is raised when the iterator from func is used.
97
 
        
98
 
        Many transport functions can return generators this makes sure
99
 
        to wrap them in a list() call to make sure the whole generator
100
 
        is run, and that the proper exception is raised.
101
 
        """
 
122
    def test_ensure_base_missing(self):
 
123
        """.ensure_base() should create the directory if it doesn't exist"""
 
124
        t = self.get_transport()
 
125
        t_a = t.clone('a')
 
126
        if t_a.is_readonly():
 
127
            self.assertRaises(TransportNotPossible,
 
128
                              t_a.ensure_base)
 
129
            return
 
130
        self.assertTrue(t_a.ensure_base())
 
131
        self.assertTrue(t.has('a'))
 
132
 
 
133
    def test_ensure_base_exists(self):
 
134
        """.ensure_base() should just be happy if it already exists"""
 
135
        t = self.get_transport()
 
136
        if t.is_readonly():
 
137
            return
 
138
 
 
139
        t.mkdir('a')
 
140
        t_a = t.clone('a')
 
141
        # ensure_base returns False if it didn't create the base
 
142
        self.assertFalse(t_a.ensure_base())
 
143
 
 
144
    def test_ensure_base_missing_parent(self):
 
145
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
146
        t = self.get_transport()
 
147
        if t.is_readonly():
 
148
            return
 
149
 
 
150
        t_a = t.clone('a')
 
151
        t_b = t_a.clone('b')
 
152
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
153
 
 
154
    def test_external_url(self):
 
155
        """.external_url either works or raises InProcessTransport."""
 
156
        t = self.get_transport()
102
157
        try:
103
 
            list(func(*args, **kwargs))
104
 
        except excClass:
105
 
            return
106
 
        else:
107
 
            if hasattr(excClass,'__name__'): excName = excClass.__name__
108
 
            else: excName = str(excClass)
109
 
            raise self.failureException, "%s not raised" % excName
 
158
            t.external_url()
 
159
        except errors.InProcessTransport:
 
160
            pass
110
161
 
111
162
    def test_has(self):
112
163
        t = self.get_transport()
125
176
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
126
177
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
127
178
 
 
179
    def test_has_root_works(self):
 
180
        from bzrlib.smart import server
 
181
        if self.transport_server is server.SmartTCPServer_for_testing:
 
182
            raise TestNotApplicable(
 
183
                "SmartTCPServer_for_testing intentionally does not allow "
 
184
                "access to /.")
 
185
        current_transport = self.get_transport()
 
186
        self.assertTrue(current_transport.has('/'))
 
187
        root = current_transport.clone('/')
 
188
        self.assertTrue(root.has(''))
 
189
 
128
190
    def test_get(self):
129
191
        t = self.get_transport()
130
192
 
137
199
        self.build_tree(files, transport=t, line_endings='binary')
138
200
        self.check_transport_contents('contents of a\n', t, 'a')
139
201
        content_f = t.get_multi(files)
140
 
        for content, f in zip(contents, content_f):
 
202
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
203
        # evaluate their inputs, the transport requests should be issued and
 
204
        # handled sequentially (we don't want to force transport to buffer).
 
205
        for content, f in itertools.izip(contents, content_f):
141
206
            self.assertEqual(content, f.read())
142
207
 
143
208
        content_f = t.get_multi(iter(files))
144
 
        for content, f in zip(contents, content_f):
 
209
        # Use itertools.izip() for the same reason
 
210
        for content, f in itertools.izip(contents, content_f):
145
211
            self.assertEqual(content, f.read())
146
212
 
147
213
        self.assertRaises(NoSuchFile, t.get, 'c')
148
214
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
149
215
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
150
216
 
151
 
    def test_put(self):
152
 
        t = self.get_transport()
153
 
 
154
 
        if t.is_readonly():
155
 
            self.assertRaises(TransportNotPossible,
156
 
                    t.put, 'a', 'some text for a\n')
157
 
            return
158
 
 
159
 
        t.put('a', StringIO('some text for a\n'))
160
 
        self.failUnless(t.has('a'))
161
 
        self.check_transport_contents('some text for a\n', t, 'a')
162
 
        # Make sure 'has' is updated
163
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
164
 
                [True, False, False, False, False])
165
 
        # Put also replaces contents
166
 
        self.assertEqual(t.put_multi([('a', StringIO('new\ncontents for\na\n')),
167
 
                                      ('d', StringIO('contents\nfor d\n'))]),
168
 
                         2)
169
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
170
 
                [True, False, False, True, False])
171
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
172
 
        self.check_transport_contents('contents\nfor d\n', t, 'd')
173
 
 
174
 
        self.assertEqual(
175
 
            t.put_multi(iter([('a', StringIO('diff\ncontents for\na\n')),
176
 
                              ('d', StringIO('another contents\nfor d\n'))])),
177
 
                        2)
178
 
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
179
 
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
180
 
 
181
 
        self.assertRaises(NoSuchFile,
182
 
                          t.put, 'path/doesnt/exist/c', 'contents')
183
 
 
184
 
    def test_put_permissions(self):
185
 
        t = self.get_transport()
186
 
 
187
 
        if t.is_readonly():
188
 
            return
189
 
        t.put('mode644', StringIO('test text\n'), mode=0644)
190
 
        self.assertTransportMode(t, 'mode644', 0644)
191
 
        t.put('mode666', StringIO('test text\n'), mode=0666)
192
 
        self.assertTransportMode(t, 'mode666', 0666)
193
 
        t.put('mode600', StringIO('test text\n'), mode=0600)
 
217
    def test_get_directory_read_gives_ReadError(self):
 
218
        """consistent errors for read() on a file returned by get()."""
 
219
        t = self.get_transport()
 
220
        if t.is_readonly():
 
221
            self.build_tree(['a directory/'])
 
222
        else:
 
223
            t.mkdir('a%20directory')
 
224
        # getting the file must either work or fail with a PathError
 
225
        try:
 
226
            a_file = t.get('a%20directory')
 
227
        except (errors.PathError, errors.RedirectRequested):
 
228
            # early failure return immediately.
 
229
            return
 
230
        # having got a file, read() must either work (i.e. http reading a dir
 
231
        # listing) or fail with ReadError
 
232
        try:
 
233
            a_file.read()
 
234
        except errors.ReadError:
 
235
            pass
 
236
 
 
237
    def test_get_bytes(self):
 
238
        t = self.get_transport()
 
239
 
 
240
        files = ['a', 'b', 'e', 'g']
 
241
        contents = ['contents of a\n',
 
242
                    'contents of b\n',
 
243
                    'contents of e\n',
 
244
                    'contents of g\n',
 
245
                    ]
 
246
        self.build_tree(files, transport=t, line_endings='binary')
 
247
        self.check_transport_contents('contents of a\n', t, 'a')
 
248
 
 
249
        for content, fname in zip(contents, files):
 
250
            self.assertEqual(content, t.get_bytes(fname))
 
251
 
 
252
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
 
253
 
 
254
    def test_get_with_open_write_stream_sees_all_content(self):
 
255
        t = self.get_transport()
 
256
        if t.is_readonly():
 
257
            return
 
258
        handle = t.open_write_stream('foo')
 
259
        try:
 
260
            handle.write('b')
 
261
            self.assertEqual('b', t.get('foo').read())
 
262
        finally:
 
263
            handle.close()
 
264
 
 
265
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
266
        t = self.get_transport()
 
267
        if t.is_readonly():
 
268
            return
 
269
        handle = t.open_write_stream('foo')
 
270
        try:
 
271
            handle.write('b')
 
272
            self.assertEqual('b', t.get_bytes('foo'))
 
273
            self.assertEqual('b', t.get('foo').read())
 
274
        finally:
 
275
            handle.close()
 
276
 
 
277
    def test_put_bytes(self):
 
278
        t = self.get_transport()
 
279
 
 
280
        if t.is_readonly():
 
281
            self.assertRaises(TransportNotPossible,
 
282
                    t.put_bytes, 'a', 'some text for a\n')
 
283
            return
 
284
 
 
285
        t.put_bytes('a', 'some text for a\n')
 
286
        self.failUnless(t.has('a'))
 
287
        self.check_transport_contents('some text for a\n', t, 'a')
 
288
 
 
289
        # The contents should be overwritten
 
290
        t.put_bytes('a', 'new text for a\n')
 
291
        self.check_transport_contents('new text for a\n', t, 'a')
 
292
 
 
293
        self.assertRaises(NoSuchFile,
 
294
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
 
295
 
 
296
    def test_put_bytes_non_atomic(self):
 
297
        t = self.get_transport()
 
298
 
 
299
        if t.is_readonly():
 
300
            self.assertRaises(TransportNotPossible,
 
301
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
 
302
            return
 
303
 
 
304
        self.failIf(t.has('a'))
 
305
        t.put_bytes_non_atomic('a', 'some text for a\n')
 
306
        self.failUnless(t.has('a'))
 
307
        self.check_transport_contents('some text for a\n', t, 'a')
 
308
        # Put also replaces contents
 
309
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
 
310
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
311
 
 
312
        # Make sure we can create another file
 
313
        t.put_bytes_non_atomic('d', 'contents for\nd\n')
 
314
        # And overwrite 'a' with empty contents
 
315
        t.put_bytes_non_atomic('a', '')
 
316
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
317
        self.check_transport_contents('', t, 'a')
 
318
 
 
319
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'no/such/path',
 
320
                                       'contents\n')
 
321
        # Now test the create_parent flag
 
322
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
 
323
                                       'contents\n')
 
324
        self.failIf(t.has('dir/a'))
 
325
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
 
326
                               create_parent_dir=True)
 
327
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
328
        
 
329
        # But we still get NoSuchFile if we can't make the parent dir
 
330
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
 
331
                                       'contents\n',
 
332
                                       create_parent_dir=True)
 
333
 
 
334
    def test_put_bytes_permissions(self):
 
335
        t = self.get_transport()
 
336
 
 
337
        if t.is_readonly():
 
338
            return
 
339
        if not t._can_roundtrip_unix_modebits():
 
340
            # Can't roundtrip, so no need to run this test
 
341
            return
 
342
        t.put_bytes('mode644', 'test text\n', mode=0644)
 
343
        self.assertTransportMode(t, 'mode644', 0644)
 
344
        t.put_bytes('mode666', 'test text\n', mode=0666)
 
345
        self.assertTransportMode(t, 'mode666', 0666)
 
346
        t.put_bytes('mode600', 'test text\n', mode=0600)
 
347
        self.assertTransportMode(t, 'mode600', 0600)
 
348
        # Yes, you can put_bytes a file such that it becomes readonly
 
349
        t.put_bytes('mode400', 'test text\n', mode=0400)
 
350
        self.assertTransportMode(t, 'mode400', 0400)
 
351
 
 
352
        # The default permissions should be based on the current umask
 
353
        umask = osutils.get_umask()
 
354
        t.put_bytes('nomode', 'test text\n', mode=None)
 
355
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
356
        
 
357
    def test_put_bytes_non_atomic_permissions(self):
 
358
        t = self.get_transport()
 
359
 
 
360
        if t.is_readonly():
 
361
            return
 
362
        if not t._can_roundtrip_unix_modebits():
 
363
            # Can't roundtrip, so no need to run this test
 
364
            return
 
365
        t.put_bytes_non_atomic('mode644', 'test text\n', mode=0644)
 
366
        self.assertTransportMode(t, 'mode644', 0644)
 
367
        t.put_bytes_non_atomic('mode666', 'test text\n', mode=0666)
 
368
        self.assertTransportMode(t, 'mode666', 0666)
 
369
        t.put_bytes_non_atomic('mode600', 'test text\n', mode=0600)
 
370
        self.assertTransportMode(t, 'mode600', 0600)
 
371
        t.put_bytes_non_atomic('mode400', 'test text\n', mode=0400)
 
372
        self.assertTransportMode(t, 'mode400', 0400)
 
373
 
 
374
        # The default permissions should be based on the current umask
 
375
        umask = osutils.get_umask()
 
376
        t.put_bytes_non_atomic('nomode', 'test text\n', mode=None)
 
377
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
378
 
 
379
        # We should also be able to set the mode for a parent directory
 
380
        # when it is created
 
381
        t.put_bytes_non_atomic('dir700/mode664', 'test text\n', mode=0664,
 
382
                               dir_mode=0700, create_parent_dir=True)
 
383
        self.assertTransportMode(t, 'dir700', 0700)
 
384
        t.put_bytes_non_atomic('dir770/mode664', 'test text\n', mode=0664,
 
385
                               dir_mode=0770, create_parent_dir=True)
 
386
        self.assertTransportMode(t, 'dir770', 0770)
 
387
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
 
388
                               dir_mode=0777, create_parent_dir=True)
 
389
        self.assertTransportMode(t, 'dir777', 0777)
 
390
        
 
391
    def test_put_file(self):
 
392
        t = self.get_transport()
 
393
 
 
394
        if t.is_readonly():
 
395
            self.assertRaises(TransportNotPossible,
 
396
                    t.put_file, 'a', StringIO('some text for a\n'))
 
397
            return
 
398
 
 
399
        result = t.put_file('a', StringIO('some text for a\n'))
 
400
        # put_file returns the length of the data written
 
401
        self.assertEqual(16, result)
 
402
        self.failUnless(t.has('a'))
 
403
        self.check_transport_contents('some text for a\n', t, 'a')
 
404
        # Put also replaces contents
 
405
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
406
        self.assertEqual(19, result)
 
407
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
408
        self.assertRaises(NoSuchFile,
 
409
                          t.put_file, 'path/doesnt/exist/c',
 
410
                              StringIO('contents'))
 
411
 
 
412
    def test_put_file_non_atomic(self):
 
413
        t = self.get_transport()
 
414
 
 
415
        if t.is_readonly():
 
416
            self.assertRaises(TransportNotPossible,
 
417
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
 
418
            return
 
419
 
 
420
        self.failIf(t.has('a'))
 
421
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
 
422
        self.failUnless(t.has('a'))
 
423
        self.check_transport_contents('some text for a\n', t, 'a')
 
424
        # Put also replaces contents
 
425
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
 
426
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
 
427
 
 
428
        # Make sure we can create another file
 
429
        t.put_file_non_atomic('d', StringIO('contents for\nd\n'))
 
430
        # And overwrite 'a' with empty contents
 
431
        t.put_file_non_atomic('a', StringIO(''))
 
432
        self.check_transport_contents('contents for\nd\n', t, 'd')
 
433
        self.check_transport_contents('', t, 'a')
 
434
 
 
435
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'no/such/path',
 
436
                                       StringIO('contents\n'))
 
437
        # Now test the create_parent flag
 
438
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
 
439
                                       StringIO('contents\n'))
 
440
        self.failIf(t.has('dir/a'))
 
441
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
 
442
                              create_parent_dir=True)
 
443
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
 
444
        
 
445
        # But we still get NoSuchFile if we can't make the parent dir
 
446
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
 
447
                                       StringIO('contents\n'),
 
448
                                       create_parent_dir=True)
 
449
 
 
450
    def test_put_file_permissions(self):
 
451
 
 
452
        t = self.get_transport()
 
453
 
 
454
        if t.is_readonly():
 
455
            return
 
456
        if not t._can_roundtrip_unix_modebits():
 
457
            # Can't roundtrip, so no need to run this test
 
458
            return
 
459
        t.put_file('mode644', StringIO('test text\n'), mode=0644)
 
460
        self.assertTransportMode(t, 'mode644', 0644)
 
461
        t.put_file('mode666', StringIO('test text\n'), mode=0666)
 
462
        self.assertTransportMode(t, 'mode666', 0666)
 
463
        t.put_file('mode600', StringIO('test text\n'), mode=0600)
194
464
        self.assertTransportMode(t, 'mode600', 0600)
195
465
        # Yes, you can put a file such that it becomes readonly
196
 
        t.put('mode400', StringIO('test text\n'), mode=0400)
197
 
        self.assertTransportMode(t, 'mode400', 0400)
198
 
        t.put_multi([('mmode644', StringIO('text\n'))], mode=0644)
199
 
        self.assertTransportMode(t, 'mmode644', 0644)
200
 
        
 
466
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
 
467
        self.assertTransportMode(t, 'mode400', 0400)
 
468
        # The default permissions should be based on the current umask
 
469
        umask = osutils.get_umask()
 
470
        t.put_file('nomode', StringIO('test text\n'), mode=None)
 
471
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
472
        
 
473
    def test_put_file_non_atomic_permissions(self):
 
474
        t = self.get_transport()
 
475
 
 
476
        if t.is_readonly():
 
477
            return
 
478
        if not t._can_roundtrip_unix_modebits():
 
479
            # Can't roundtrip, so no need to run this test
 
480
            return
 
481
        t.put_file_non_atomic('mode644', StringIO('test text\n'), mode=0644)
 
482
        self.assertTransportMode(t, 'mode644', 0644)
 
483
        t.put_file_non_atomic('mode666', StringIO('test text\n'), mode=0666)
 
484
        self.assertTransportMode(t, 'mode666', 0666)
 
485
        t.put_file_non_atomic('mode600', StringIO('test text\n'), mode=0600)
 
486
        self.assertTransportMode(t, 'mode600', 0600)
 
487
        # Yes, you can put_file_non_atomic a file such that it becomes readonly
 
488
        t.put_file_non_atomic('mode400', StringIO('test text\n'), mode=0400)
 
489
        self.assertTransportMode(t, 'mode400', 0400)
 
490
 
 
491
        # The default permissions should be based on the current umask
 
492
        umask = osutils.get_umask()
 
493
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
 
494
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
 
495
        
 
496
        # We should also be able to set the mode for a parent directory
 
497
        # when it is created
 
498
        sio = StringIO()
 
499
        t.put_file_non_atomic('dir700/mode664', sio, mode=0664,
 
500
                              dir_mode=0700, create_parent_dir=True)
 
501
        self.assertTransportMode(t, 'dir700', 0700)
 
502
        t.put_file_non_atomic('dir770/mode664', sio, mode=0664,
 
503
                              dir_mode=0770, create_parent_dir=True)
 
504
        self.assertTransportMode(t, 'dir770', 0770)
 
505
        t.put_file_non_atomic('dir777/mode664', sio, mode=0664,
 
506
                              dir_mode=0777, create_parent_dir=True)
 
507
        self.assertTransportMode(t, 'dir777', 0777)
 
508
 
 
509
    def test_put_bytes_unicode(self):
 
510
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
 
511
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
 
512
        # (we don't want to encode unicode here at all, callers should be
 
513
        # strictly passing bytes to put_bytes), but we allow it for backwards
 
514
        # compatibility.  At some point we should use a specific exception.
 
515
        # See https://bugs.launchpad.net/bzr/+bug/106898.
 
516
        t = self.get_transport()
 
517
        if t.is_readonly():
 
518
            return
 
519
        unicode_string = u'\u1234'
 
520
        self.assertRaises(
 
521
            (AssertionError, UnicodeEncodeError),
 
522
            t.put_bytes, 'foo', unicode_string)
 
523
 
 
524
    def test_put_file_unicode(self):
 
525
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
 
526
        # This situation can happen (and has) if code is careless about the type
 
527
        # of "string" they initialise/write to a StringIO with.  We cannot use
 
528
        # cStringIO, because it never returns unicode from read.
 
529
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
 
530
        # raise, but we raise it for hysterical raisins.
 
531
        t = self.get_transport()
 
532
        if t.is_readonly():
 
533
            return
 
534
        unicode_file = pyStringIO(u'\u1234')
 
535
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
 
536
 
201
537
    def test_mkdir(self):
202
538
        t = self.get_transport()
203
539
 
236
572
        self.assertRaises(FileExists, t.mkdir, 'dir_g')
237
573
 
238
574
        # Test get/put in sub-directories
239
 
        self.assertEqual(2, 
240
 
            t.put_multi([('dir_a/a', StringIO('contents of dir_a/a')),
241
 
                         ('dir_b/b', StringIO('contents of dir_b/b'))]))
 
575
        t.put_bytes('dir_a/a', 'contents of dir_a/a')
 
576
        t.put_file('dir_b/b', StringIO('contents of dir_b/b'))
242
577
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
243
578
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')
244
579
 
261
596
        self.assertTransportMode(t, 'dmode777', 0777)
262
597
        t.mkdir('dmode700', mode=0700)
263
598
        self.assertTransportMode(t, 'dmode700', 0700)
264
 
        # TODO: jam 20051215 test mkdir_multi with a mode
265
599
        t.mkdir_multi(['mdmode755'], mode=0755)
266
600
        self.assertTransportMode(t, 'mdmode755', 0755)
267
601
 
 
602
        # Default mode should be based on umask
 
603
        umask = osutils.get_umask()
 
604
        t.mkdir('dnomode', mode=None)
 
605
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
 
606
 
 
607
    def test_opening_a_file_stream_creates_file(self):
 
608
        t = self.get_transport()
 
609
        if t.is_readonly():
 
610
            return
 
611
        handle = t.open_write_stream('foo')
 
612
        try:
 
613
            self.assertEqual('', t.get_bytes('foo'))
 
614
        finally:
 
615
            handle.close()
 
616
 
 
617
    def test_opening_a_file_stream_can_set_mode(self):
 
618
        t = self.get_transport()
 
619
        if t.is_readonly():
 
620
            return
 
621
        if not t._can_roundtrip_unix_modebits():
 
622
            # Can't roundtrip, so no need to run this test
 
623
            return
 
624
        def check_mode(name, mode, expected):
 
625
            handle = t.open_write_stream(name, mode=mode)
 
626
            handle.close()
 
627
            self.assertTransportMode(t, name, expected)
 
628
        check_mode('mode644', 0644, 0644)
 
629
        check_mode('mode666', 0666, 0666)
 
630
        check_mode('mode600', 0600, 0600)
 
631
        # The default permissions should be based on the current umask
 
632
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
633
 
268
634
    def test_copy_to(self):
269
635
        # FIXME: test:   same server to same server (partly done)
270
636
        # same protocol two servers
271
637
        # and    different protocols (done for now except for MemoryTransport.
272
638
        # - RBC 20060122
273
 
        from bzrlib.transport.memory import MemoryTransport
274
639
 
275
640
        def simple_copy_files(transport_from, transport_to):
276
641
            files = ['a', 'b', 'c', 'd']
295
660
            self.build_tree(['e/', 'e/f'])
296
661
        else:
297
662
            t.mkdir('e')
298
 
            t.put('e/f', StringIO('contents of e'))
 
663
            t.put_bytes('e/f', 'contents of e')
299
664
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
300
665
        temp_transport.mkdir('e')
301
666
        t.copy_to(['e/f'], temp_transport)
316
681
            for f in files:
317
682
                self.assertTransportMode(temp_transport, f, mode)
318
683
 
319
 
    def test_append(self):
320
 
        t = self.get_transport()
321
 
 
322
 
        if t.is_readonly():
323
 
            open('a', 'wb').write('diff\ncontents for\na\n')
324
 
            open('b', 'wb').write('contents\nfor b\n')
325
 
        else:
326
 
            t.put_multi([
327
 
                    ('a', StringIO('diff\ncontents for\na\n')),
328
 
                    ('b', StringIO('contents\nfor b\n'))
329
 
                    ])
330
 
 
331
 
        if t.is_readonly():
332
 
            self.assertRaises(TransportNotPossible,
333
 
                    t.append, 'a', 'add\nsome\nmore\ncontents\n')
334
 
            _append('a', StringIO('add\nsome\nmore\ncontents\n'))
335
 
        else:
336
 
            self.assertEqual(20,
337
 
                t.append('a', StringIO('add\nsome\nmore\ncontents\n')))
338
 
 
339
 
        self.check_transport_contents(
340
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
341
 
            t, 'a')
342
 
 
343
 
        if t.is_readonly():
344
 
            self.assertRaises(TransportNotPossible,
345
 
                    t.append_multi,
346
 
                        [('a', 'and\nthen\nsome\nmore\n'),
347
 
                         ('b', 'some\nmore\nfor\nb\n')])
348
 
            _append('a', StringIO('and\nthen\nsome\nmore\n'))
349
 
            _append('b', StringIO('some\nmore\nfor\nb\n'))
350
 
        else:
351
 
            self.assertEqual((43, 15), 
352
 
                t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
353
 
                                ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
684
    def test_append_file(self):
 
685
        t = self.get_transport()
 
686
 
 
687
        if t.is_readonly():
 
688
            self.assertRaises(TransportNotPossible,
 
689
                    t.append_file, 'a', 'add\nsome\nmore\ncontents\n')
 
690
            return
 
691
        t.put_bytes('a', 'diff\ncontents for\na\n')
 
692
        t.put_bytes('b', 'contents\nfor b\n')
 
693
 
 
694
        self.assertEqual(20,
 
695
            t.append_file('a', StringIO('add\nsome\nmore\ncontents\n')))
 
696
 
 
697
        self.check_transport_contents(
 
698
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
699
            t, 'a')
 
700
 
 
701
        # a file with no parent should fail..
 
702
        self.assertRaises(NoSuchFile,
 
703
                          t.append_file, 'missing/path', StringIO('content'))
 
704
 
 
705
        # And we can create new files, too
 
706
        self.assertEqual(0,
 
707
            t.append_file('c', StringIO('some text\nfor a missing file\n')))
 
708
        self.check_transport_contents('some text\nfor a missing file\n',
 
709
                                      t, 'c')
 
710
 
 
711
    def test_append_bytes(self):
 
712
        t = self.get_transport()
 
713
 
 
714
        if t.is_readonly():
 
715
            self.assertRaises(TransportNotPossible,
 
716
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
 
717
            return
 
718
 
 
719
        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
 
720
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))
 
721
 
 
722
        self.assertEqual(20,
 
723
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))
 
724
 
 
725
        self.check_transport_contents(
 
726
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
 
727
            t, 'a')
 
728
 
 
729
        # a file with no parent should fail..
 
730
        self.assertRaises(NoSuchFile,
 
731
                          t.append_bytes, 'missing/path', 'content')
 
732
 
 
733
    def test_append_multi(self):
 
734
        t = self.get_transport()
 
735
 
 
736
        if t.is_readonly():
 
737
            return
 
738
        t.put_bytes('a', 'diff\ncontents for\na\n'
 
739
                         'add\nsome\nmore\ncontents\n')
 
740
        t.put_bytes('b', 'contents\nfor b\n')
 
741
 
 
742
        self.assertEqual((43, 15),
 
743
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
 
744
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))
 
745
 
354
746
        self.check_transport_contents(
355
747
            'diff\ncontents for\na\n'
356
748
            'add\nsome\nmore\ncontents\n'
361
753
                'some\nmore\nfor\nb\n',
362
754
                t, 'b')
363
755
 
364
 
        if t.is_readonly():
365
 
            _append('a', StringIO('a little bit more\n'))
366
 
            _append('b', StringIO('from an iterator\n'))
367
 
        else:
368
 
            self.assertEqual((62, 31),
369
 
                t.append_multi(iter([('a', StringIO('a little bit more\n')),
370
 
                                     ('b', StringIO('from an iterator\n'))])))
 
756
        self.assertEqual((62, 31),
 
757
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
 
758
                                 ('b', StringIO('from an iterator\n'))])))
371
759
        self.check_transport_contents(
372
760
            'diff\ncontents for\na\n'
373
761
            'add\nsome\nmore\ncontents\n'
380
768
                'from an iterator\n',
381
769
                t, 'b')
382
770
 
383
 
        if t.is_readonly():
384
 
            _append('c', StringIO('some text\nfor a missing file\n'))
385
 
            _append('a', StringIO('some text in a\n'))
386
 
            _append('d', StringIO('missing file r\n'))
387
 
        else:
388
 
            self.assertEqual(0,
389
 
                t.append('c', StringIO('some text\nfor a missing file\n')))
390
 
            self.assertEqual((80, 0),
391
 
                t.append_multi([('a', StringIO('some text in a\n')),
392
 
                                ('d', StringIO('missing file r\n'))]))
 
771
        self.assertEqual((80, 0),
 
772
            t.append_multi([('a', StringIO('some text in a\n')),
 
773
                            ('d', StringIO('missing file r\n'))]))
 
774
 
393
775
        self.check_transport_contents(
394
776
            'diff\ncontents for\na\n'
395
777
            'add\nsome\nmore\ncontents\n'
397
779
            'a little bit more\n'
398
780
            'some text in a\n',
399
781
            t, 'a')
400
 
        self.check_transport_contents('some text\nfor a missing file\n',
401
 
                                      t, 'c')
402
782
        self.check_transport_contents('missing file r\n', t, 'd')
403
 
        
404
 
        # a file with no parent should fail..
405
 
        if not t.is_readonly():
406
 
            self.assertRaises(NoSuchFile,
407
 
                              t.append, 'missing/path', 
408
 
                              StringIO('content'))
409
 
 
410
 
    def test_append_file(self):
411
 
        t = self.get_transport()
412
 
 
413
 
        contents = [
414
 
            ('f1', StringIO('this is a string\nand some more stuff\n')),
415
 
            ('f2', StringIO('here is some text\nand a bit more\n')),
416
 
            ('f3', StringIO('some text for the\nthird file created\n')),
417
 
            ('f4', StringIO('this is a string\nand some more stuff\n')),
418
 
            ('f5', StringIO('here is some text\nand a bit more\n')),
419
 
            ('f6', StringIO('some text for the\nthird file created\n'))
420
 
        ]
421
 
        
422
 
        if t.is_readonly():
423
 
            for f, val in contents:
424
 
                open(f, 'wb').write(val.read())
425
 
        else:
426
 
            t.put_multi(contents)
427
 
 
428
 
        a1 = StringIO('appending to\none\n')
429
 
        if t.is_readonly():
430
 
            _append('f1', a1)
431
 
        else:
432
 
            t.append('f1', a1)
433
 
 
434
 
        del a1
435
 
 
436
 
        self.check_transport_contents(
437
 
                'this is a string\nand some more stuff\n'
438
 
                'appending to\none\n',
439
 
                t, 'f1')
440
 
 
441
 
        a2 = StringIO('adding more\ntext to two\n')
442
 
        a3 = StringIO('some garbage\nto put in three\n')
443
 
 
444
 
        if t.is_readonly():
445
 
            _append('f2', a2)
446
 
            _append('f3', a3)
447
 
        else:
448
 
            t.append_multi([('f2', a2), ('f3', a3)])
449
 
 
450
 
        del a2, a3
451
 
 
452
 
        self.check_transport_contents(
453
 
                'here is some text\nand a bit more\n'
454
 
                'adding more\ntext to two\n',
455
 
                t, 'f2')
456
 
        self.check_transport_contents( 
457
 
                'some text for the\nthird file created\n'
458
 
                'some garbage\nto put in three\n',
459
 
                t, 'f3')
460
 
 
461
 
        # Test that an actual file object can be used with put
462
 
        a4 = t.get('f1')
463
 
        if t.is_readonly():
464
 
            _append('f4', a4)
465
 
        else:
466
 
            t.append('f4', a4)
467
 
 
468
 
        del a4
469
 
 
470
 
        self.check_transport_contents(
471
 
                'this is a string\nand some more stuff\n'
472
 
                'this is a string\nand some more stuff\n'
473
 
                'appending to\none\n',
474
 
                t, 'f4')
475
 
 
476
 
        a5 = t.get('f2')
477
 
        a6 = t.get('f3')
478
 
        if t.is_readonly():
479
 
            _append('f5', a5)
480
 
            _append('f6', a6)
481
 
        else:
482
 
            t.append_multi([('f5', a5), ('f6', a6)])
483
 
 
484
 
        del a5, a6
485
 
 
486
 
        self.check_transport_contents(
487
 
                'here is some text\nand a bit more\n'
488
 
                'here is some text\nand a bit more\n'
489
 
                'adding more\ntext to two\n',
490
 
                t, 'f5')
491
 
        self.check_transport_contents(
492
 
                'some text for the\nthird file created\n'
493
 
                'some text for the\nthird file created\n'
494
 
                'some garbage\nto put in three\n',
495
 
                t, 'f6')
496
 
 
497
 
        a5 = t.get('f2')
498
 
        a6 = t.get('f2')
499
 
        a7 = t.get('f3')
500
 
        if t.is_readonly():
501
 
            _append('c', a5)
502
 
            _append('a', a6)
503
 
            _append('d', a7)
504
 
        else:
505
 
            t.append('c', a5)
506
 
            t.append_multi([('a', a6), ('d', a7)])
507
 
        del a5, a6, a7
508
 
        self.check_transport_contents(t.get('f2').read(), t, 'c')
509
 
        self.check_transport_contents(t.get('f3').read(), t, 'd')
510
 
 
511
 
    def test_append_mode(self):
 
783
 
 
784
    def test_append_file_mode(self):
 
785
        """Check that append accepts a mode parameter"""
512
786
        # check append accepts a mode
513
787
        t = self.get_transport()
514
788
        if t.is_readonly():
515
 
            return
516
 
        t.append('f', StringIO('f'), mode=None)
 
789
            self.assertRaises(TransportNotPossible,
 
790
                t.append_file, 'f', StringIO('f'), mode=None)
 
791
            return
 
792
        t.append_file('f', StringIO('f'), mode=None)
 
793
        
 
794
    def test_append_bytes_mode(self):
 
795
        # check append_bytes accepts a mode
 
796
        t = self.get_transport()
 
797
        if t.is_readonly():
 
798
            self.assertRaises(TransportNotPossible,
 
799
                t.append_bytes, 'f', 'f', mode=None)
 
800
            return
 
801
        t.append_bytes('f', 'f', mode=None)
517
802
        
518
803
    def test_delete(self):
519
804
        # TODO: Test Transport.delete
524
809
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
525
810
            return
526
811
 
527
 
        t.put('a', StringIO('a little bit of text\n'))
 
812
        t.put_bytes('a', 'a little bit of text\n')
528
813
        self.failUnless(t.has('a'))
529
814
        t.delete('a')
530
815
        self.failIf(t.has('a'))
531
816
 
532
817
        self.assertRaises(NoSuchFile, t.delete, 'a')
533
818
 
534
 
        t.put('a', StringIO('a text\n'))
535
 
        t.put('b', StringIO('b text\n'))
536
 
        t.put('c', StringIO('c text\n'))
 
819
        t.put_bytes('a', 'a text\n')
 
820
        t.put_bytes('b', 'b text\n')
 
821
        t.put_bytes('c', 'c text\n')
537
822
        self.assertEqual([True, True, True],
538
823
                list(t.has_multi(['a', 'b', 'c'])))
539
824
        t.delete_multi(['a', 'c'])
549
834
        self.assertRaises(NoSuchFile,
550
835
                t.delete_multi, iter(['a', 'b', 'c']))
551
836
 
552
 
        t.put('a', StringIO('another a text\n'))
553
 
        t.put('c', StringIO('another c text\n'))
 
837
        t.put_bytes('a', 'another a text\n')
 
838
        t.put_bytes('c', 'another c text\n')
554
839
        t.delete_multi(iter(['a', 'b', 'c']))
555
840
 
556
841
        # We should have deleted everything
559
844
        # plain "listdir".
560
845
        # self.assertEqual([], os.listdir('.'))
561
846
 
 
847
    def test_recommended_page_size(self):
 
848
        """Transports recommend a page size for partial access to files."""
 
849
        t = self.get_transport()
 
850
        self.assertIsInstance(t.recommended_page_size(), int)
 
851
 
562
852
    def test_rmdir(self):
563
853
        t = self.get_transport()
564
854
        # Not much to do with a readonly transport
568
858
        t.mkdir('adir')
569
859
        t.mkdir('adir/bdir')
570
860
        t.rmdir('adir/bdir')
571
 
        self.assertRaises(NoSuchFile, t.stat, 'adir/bdir')
 
861
        # ftp may not be able to raise NoSuchFile for lack of
 
862
        # details when failing
 
863
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir/bdir')
572
864
        t.rmdir('adir')
573
 
        self.assertRaises(NoSuchFile, t.stat, 'adir')
 
865
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'adir')
574
866
 
575
867
    def test_rmdir_not_empty(self):
576
868
        """Deleting a non-empty directory raises an exception
585
877
        t.mkdir('adir/bdir')
586
878
        self.assertRaises(PathError, t.rmdir, 'adir')
587
879
 
 
880
    def test_rmdir_empty_but_similar_prefix(self):
 
881
        """rmdir does not get confused by sibling paths.
 
882
        
 
883
        A naive implementation of MemoryTransport would refuse to rmdir
 
884
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
885
        uses "path.startswith(dir)" on all file paths to determine if directory
 
886
        is empty.
 
887
        """
 
888
        t = self.get_transport()
 
889
        if t.is_readonly():
 
890
            return
 
891
        t.mkdir('foo')
 
892
        t.put_bytes('foo-bar', '')
 
893
        t.mkdir('foo-baz')
 
894
        t.rmdir('foo')
 
895
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
896
        self.failUnless(t.has('foo-bar'))
 
897
 
588
898
    def test_rename_dir_succeeds(self):
589
899
        t = self.get_transport()
590
900
        if t.is_readonly():
604
914
        t.mkdir('adir/asubdir')
605
915
        t.mkdir('bdir')
606
916
        t.mkdir('bdir/bsubdir')
 
917
        # any kind of PathError would be OK, though we normally expect
 
918
        # DirectoryNotEmpty
607
919
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
608
920
        # nothing was changed so it should still be as before
609
921
        self.assertTrue(t.has('bdir/bsubdir'))
610
922
        self.assertFalse(t.has('adir/bdir'))
611
923
        self.assertFalse(t.has('adir/bsubdir'))
612
924
 
 
925
    def test_rename_across_subdirs(self):
 
926
        t = self.get_transport()
 
927
        if t.is_readonly():
 
928
            raise TestNotApplicable("transport is readonly")
 
929
        t.mkdir('a')
 
930
        t.mkdir('b')
 
931
        ta = t.clone('a')
 
932
        tb = t.clone('b')
 
933
        ta.put_bytes('f', 'aoeu')
 
934
        ta.rename('f', '../b/f')
 
935
        self.assertTrue(tb.has('f'))
 
936
        self.assertFalse(ta.has('f'))
 
937
        self.assertTrue(t.has('b/f'))
 
938
 
613
939
    def test_delete_tree(self):
614
940
        t = self.get_transport()
615
941
 
625
951
        except TransportNotPossible:
626
952
            # ok, this transport does not support delete_tree
627
953
            return
628
 
        
 
954
 
629
955
        # did it delete that trivial case?
630
956
        self.assertRaises(NoSuchFile, t.stat, 'adir')
631
957
 
632
958
        self.build_tree(['adir/',
633
 
                         'adir/file', 
634
 
                         'adir/subdir/', 
635
 
                         'adir/subdir/file', 
 
959
                         'adir/file',
 
960
                         'adir/subdir/',
 
961
                         'adir/subdir/file',
636
962
                         'adir/subdir2/',
637
963
                         'adir/subdir2/file',
638
964
                         ], transport=t)
652
978
        # creates control files in the working directory
653
979
        # perhaps all of this could be done in a subdirectory
654
980
 
655
 
        t.put('a', StringIO('a first file\n'))
 
981
        t.put_bytes('a', 'a first file\n')
656
982
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
657
983
 
658
984
        t.move('a', 'b')
663
989
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
664
990
 
665
991
        # Overwrite a file
666
 
        t.put('c', StringIO('c this file\n'))
 
992
        t.put_bytes('c', 'c this file\n')
667
993
        t.move('c', 'b')
668
994
        self.failIf(t.has('c'))
669
995
        self.check_transport_contents('c this file\n', t, 'b')
670
996
 
671
997
        # TODO: Try to write a test for atomicity
672
 
        # TODO: Test moving into a non-existant subdirectory
 
998
        # TODO: Test moving into a non-existent subdirectory
673
999
        # TODO: Test Transport.move_multi
674
1000
 
675
1001
    def test_copy(self):
678
1004
        if t.is_readonly():
679
1005
            return
680
1006
 
681
 
        t.put('a', StringIO('a file\n'))
 
1007
        t.put_bytes('a', 'a file\n')
682
1008
        t.copy('a', 'b')
683
1009
        self.check_transport_contents('a file\n', t, 'b')
684
1010
 
687
1013
        # What should the assert be if you try to copy a
688
1014
        # file over a directory?
689
1015
        #self.assertRaises(Something, t.copy, 'a', 'c')
690
 
        t.put('d', StringIO('text in d\n'))
 
1016
        t.put_bytes('d', 'text in d\n')
691
1017
        t.copy('d', 'b')
692
1018
        self.check_transport_contents('text in d\n', t, 'b')
693
1019
 
694
1020
        # TODO: test copy_multi
695
1021
 
696
1022
    def test_connection_error(self):
697
 
        """ConnectionError is raised when connection is impossible"""
 
1023
        """ConnectionError is raised when connection is impossible.
 
1024
        
 
1025
        The error should be raised from the first operation on the transport.
 
1026
        """
698
1027
        try:
699
1028
            url = self._server.get_bogus_url()
700
1029
        except NotImplementedError:
701
1030
            raise TestSkipped("Transport %s has no bogus URL support." %
702
1031
                              self._server.__class__)
703
 
        t = bzrlib.transport.get_transport(url)
704
 
        try:
705
 
            t.get('.bzr/branch')
706
 
        except (ConnectionError, NoSuchFile), e:
707
 
            pass
708
 
        except (Exception), e:
709
 
            self.fail('Wrong exception thrown (%s): %s' 
710
 
                        % (e.__class__.__name__, e))
711
 
        else:
712
 
            self.fail('Did not get the expected ConnectionError or NoSuchFile.')
 
1032
        t = get_transport(url)
 
1033
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
713
1034
 
714
1035
    def test_stat(self):
715
1036
        # TODO: Test stat, just try once, and if it throws, stop testing
724
1045
            return
725
1046
 
726
1047
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
727
 
        sizes = [14, 0, 16, 0, 18] 
 
1048
        sizes = [14, 0, 16, 0, 18]
728
1049
        self.build_tree(paths, transport=t, line_endings='binary')
729
1050
 
730
1051
        for path, size in zip(paths, sizes):
752
1073
    def test_list_dir(self):
753
1074
        # TODO: Test list_dir, just try once, and if it throws, stop testing
754
1075
        t = self.get_transport()
755
 
        
 
1076
 
756
1077
        if not t.listable():
757
1078
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
758
1079
            return
759
1080
 
760
 
        def sorted_list(d):
761
 
            l = list(t.list_dir(d))
 
1081
        def sorted_list(d, transport):
 
1082
            l = list(transport.list_dir(d))
762
1083
            l.sort()
763
1084
            return l
764
1085
 
765
 
        # SftpServer creates control files in the working directory
766
 
        # so lets move down a directory to avoid those.
767
 
        if not t.is_readonly():
768
 
            t.mkdir('wd')
769
 
        else:
770
 
            os.mkdir('wd')
771
 
        t = t.clone('wd')
772
 
 
773
 
        self.assertEqual([], sorted_list(u'.'))
 
1086
        self.assertEqual([], sorted_list('.', t))
774
1087
        # c2 is precisely one letter longer than c here to test that
775
1088
        # suffixing is not confused.
 
1089
        # a%25b checks that quoting is done consistently across transports
 
1090
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
 
1091
 
776
1092
        if not t.is_readonly():
777
 
            self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e', 'c2/'], transport=t)
 
1093
            self.build_tree(tree_names, transport=t)
778
1094
        else:
779
 
            self.build_tree(['wd/a', 'wd/b', 'wd/c/', 'wd/c/d', 'wd/c/e', 'wd/c2/'])
780
 
 
781
 
        self.assertEqual([u'a', u'b', u'c', u'c2'], sorted_list(u'.'))
782
 
        self.assertEqual([u'd', u'e'], sorted_list(u'c'))
 
1095
            self.build_tree(tree_names)
 
1096
 
 
1097
        self.assertEqual(
 
1098
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1099
        self.assertEqual(
 
1100
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1101
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1102
 
 
1103
        # Cloning the transport produces an equivalent listing
 
1104
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
783
1105
 
784
1106
        if not t.is_readonly():
785
1107
            t.delete('c/d')
786
1108
            t.delete('b')
787
1109
        else:
788
 
            os.unlink('wd/c/d')
789
 
            os.unlink('wd/b')
790
 
            
791
 
        self.assertEqual([u'a', u'c', u'c2'], sorted_list('.'))
792
 
        self.assertEqual([u'e'], sorted_list(u'c'))
 
1110
            os.unlink('c/d')
 
1111
            os.unlink('b')
 
1112
 
 
1113
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1114
        self.assertEqual(['e'], sorted_list('c', t))
793
1115
 
794
1116
        self.assertListRaises(PathError, t.list_dir, 'q')
795
1117
        self.assertListRaises(PathError, t.list_dir, 'c/f')
796
1118
        self.assertListRaises(PathError, t.list_dir, 'a')
797
1119
 
 
1120
    def test_list_dir_result_is_url_escaped(self):
 
1121
        t = self.get_transport()
 
1122
        if not t.listable():
 
1123
            raise TestSkipped("transport not listable")
 
1124
 
 
1125
        if not t.is_readonly():
 
1126
            self.build_tree(['a/', 'a/%'], transport=t)
 
1127
        else:
 
1128
            self.build_tree(['a/', 'a/%'])
 
1129
 
 
1130
        names = list(t.list_dir('a'))
 
1131
        self.assertEqual(['%25'], names)
 
1132
        self.assertIsInstance(names[0], str)
 
1133
 
 
1134
    def test_clone_preserve_info(self):
 
1135
        t1 = self.get_transport()
 
1136
        if not isinstance(t1, ConnectedTransport):
 
1137
            raise TestSkipped("not a connected transport")
 
1138
 
 
1139
        t2 = t1.clone('subdir')
 
1140
        self.assertEquals(t1._scheme, t2._scheme)
 
1141
        self.assertEquals(t1._user, t2._user)
 
1142
        self.assertEquals(t1._password, t2._password)
 
1143
        self.assertEquals(t1._host, t2._host)
 
1144
        self.assertEquals(t1._port, t2._port)
 
1145
 
 
1146
    def test__reuse_for(self):
 
1147
        t = self.get_transport()
 
1148
        if not isinstance(t, ConnectedTransport):
 
1149
            raise TestSkipped("not a connected transport")
 
1150
 
 
1151
        def new_url(scheme=None, user=None, password=None,
 
1152
                    host=None, port=None, path=None):
 
1153
            """Build a new url from t.base changing only parts of it.
 
1154
 
 
1155
            Only the parameters different from None will be changed.
 
1156
            """
 
1157
            if scheme   is None: scheme   = t._scheme
 
1158
            if user     is None: user     = t._user
 
1159
            if password is None: password = t._password
 
1160
            if user     is None: user     = t._user
 
1161
            if host     is None: host     = t._host
 
1162
            if port     is None: port     = t._port
 
1163
            if path     is None: path     = t._path
 
1164
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1165
 
 
1166
        if t._scheme == 'ftp':
 
1167
            scheme = 'sftp'
 
1168
        else:
 
1169
            scheme = 'ftp'
 
1170
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1171
        if t._user == 'me':
 
1172
            user = 'you'
 
1173
        else:
 
1174
            user = 'me'
 
1175
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1176
        # passwords are not taken into account because:
 
1177
        # - it makes no sense to have two different valid passwords for the
 
1178
        #   same user
 
1179
        # - _password in ConnectedTransport is intended to collect what the
 
1180
        #   user specified from the command-line and there are cases where the
 
1181
        #   new url can contain no password (if the url was built from an
 
1182
        #   existing transport.base for example)
 
1183
        # - password are considered part of the credentials provided at
 
1184
        #   connection creation time and as such may not be present in the url
 
1185
        #   (they may be typed by the user when prompted for example)
 
1186
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1187
        # We will not connect, we can use a invalid host
 
1188
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
 
1189
        if t._port == 1234:
 
1190
            port = 4321
 
1191
        else:
 
1192
            port = 1234
 
1193
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1194
        # No point in trying to reuse a transport for a local URL
 
1195
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1196
 
 
1197
    def test_connection_sharing(self):
 
1198
        t = self.get_transport()
 
1199
        if not isinstance(t, ConnectedTransport):
 
1200
            raise TestSkipped("not a connected transport")
 
1201
 
 
1202
        c = t.clone('subdir')
 
1203
        # Some transports will create the connection  only when needed
 
1204
        t.has('surely_not') # Force connection
 
1205
        self.assertIs(t._get_connection(), c._get_connection())
 
1206
 
 
1207
        # Temporary failure, we need to create a new dummy connection
 
1208
        new_connection = object()
 
1209
        t._set_connection(new_connection)
 
1210
        # Check that both transports use the same connection
 
1211
        self.assertIs(new_connection, t._get_connection())
 
1212
        self.assertIs(new_connection, c._get_connection())
 
1213
 
 
1214
    def test_reuse_connection_for_various_paths(self):
 
1215
        t = self.get_transport()
 
1216
        if not isinstance(t, ConnectedTransport):
 
1217
            raise TestSkipped("not a connected transport")
 
1218
 
 
1219
        t.has('surely_not') # Force connection
 
1220
        self.assertIsNot(None, t._get_connection())
 
1221
 
 
1222
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1223
        self.assertIsNot(t, subdir)
 
1224
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1225
 
 
1226
        home = subdir._reuse_for(t.base + 'home')
 
1227
        self.assertIs(t._get_connection(), home._get_connection())
 
1228
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1229
 
798
1230
    def test_clone(self):
799
1231
        # TODO: Test that clone moves up and down the filesystem
800
1232
        t1 = self.get_transport()
822
1254
        if t1.is_readonly():
823
1255
            open('b/d', 'wb').write('newfile\n')
824
1256
        else:
825
 
            t2.put('d', StringIO('newfile\n'))
 
1257
            t2.put_bytes('d', 'newfile\n')
826
1258
 
827
1259
        self.failUnless(t1.has('b/d'))
828
1260
        self.failUnless(t2.has('d'))
829
1261
        self.failUnless(t3.has('b/d'))
830
1262
 
 
1263
    def test_clone_to_root(self):
 
1264
        orig_transport = self.get_transport()
 
1265
        # Repeatedly go up to a parent directory until we're at the root
 
1266
        # directory of this transport
 
1267
        root_transport = orig_transport
 
1268
        new_transport = root_transport.clone("..")
 
1269
        # as we are walking up directories, the path must be
 
1270
        # growing less, except at the top
 
1271
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1272
            or new_transport.base == root_transport.base)
 
1273
        while new_transport.base != root_transport.base:
 
1274
            root_transport = new_transport
 
1275
            new_transport = root_transport.clone("..")
 
1276
            # as we are walking up directories, the path must be
 
1277
            # growing less, except at the top
 
1278
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1279
                or new_transport.base == root_transport.base)
 
1280
 
 
1281
        # Cloning to "/" should take us to exactly the same location.
 
1282
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1283
        # the abspath of "/" from the original transport should be the same
 
1284
        # as the base at the root:
 
1285
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1286
 
 
1287
        # At the root, the URL must still end with / as its a directory
 
1288
        self.assertEqual(root_transport.base[-1], '/')
 
1289
 
 
1290
    def test_clone_from_root(self):
 
1291
        """At the root, cloning to a simple dir should just do string append."""
 
1292
        orig_transport = self.get_transport()
 
1293
        root_transport = orig_transport.clone('/')
 
1294
        self.assertEqual(root_transport.base + '.bzr/',
 
1295
            root_transport.clone('.bzr').base)
 
1296
 
 
1297
    def test_base_url(self):
 
1298
        t = self.get_transport()
 
1299
        self.assertEqual('/', t.base[-1])
 
1300
 
831
1301
    def test_relpath(self):
832
1302
        t = self.get_transport()
833
1303
        self.assertEqual('', t.relpath(t.base))
834
1304
        # base ends with /
835
1305
        self.assertEqual('', t.relpath(t.base[:-1]))
836
 
        # subdirs which dont exist should still give relpaths.
 
1306
        # subdirs which don't exist should still give relpaths.
837
1307
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
838
1308
        # trailing slash should be the same.
839
1309
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
855
1325
        # that have aliasing problems like symlinks should go in backend
856
1326
        # specific test cases.
857
1327
        transport = self.get_transport()
858
 
        
859
 
        # disabled because some transports might normalize urls in generating
860
 
        # the abspath - eg http+pycurl-> just http -- mbp 20060308 
 
1328
 
861
1329
        self.assertEqual(transport.base + 'relpath',
862
1330
                         transport.abspath('relpath'))
863
1331
 
 
1332
        # This should work without raising an error.
 
1333
        transport.abspath("/")
 
1334
 
 
1335
        # the abspath of "/" and "/foo/.." should result in the same location
 
1336
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1337
 
 
1338
        self.assertEqual(transport.clone("/").abspath('foo'),
 
1339
                         transport.abspath("/foo"))
 
1340
 
864
1341
    def test_local_abspath(self):
865
1342
        transport = self.get_transport()
866
1343
        try:
867
1344
            p = transport.local_abspath('.')
868
 
        except TransportNotPossible:
869
 
            pass # This is not a local transport
 
1345
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1346
            # should be formattable
 
1347
            s = str(e)
870
1348
        else:
871
1349
            self.assertEqual(getcwd(), p)
872
1350
 
897
1375
                         'isolated/dir/',
898
1376
                         'isolated/dir/foo',
899
1377
                         'isolated/dir/bar',
 
1378
                         'isolated/dir/b%25z', # make sure quoting is correct
900
1379
                         'isolated/bar'],
901
1380
                        transport=transport)
902
1381
        paths = set(transport.iter_files_recursive())
904
1383
        self.assertEqual(paths,
905
1384
                    set(['isolated/dir/foo',
906
1385
                         'isolated/dir/bar',
 
1386
                         'isolated/dir/b%2525z',
907
1387
                         'isolated/bar']))
908
1388
        sub_transport = transport.clone('isolated')
909
1389
        paths = set(sub_transport.iter_files_recursive())
910
 
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
1390
        self.assertEqual(paths,
 
1391
            set(['dir/foo', 'dir/bar', 'dir/b%2525z', 'bar']))
 
1392
 
 
1393
    def test_copy_tree(self):
 
1394
        # TODO: test file contents and permissions are preserved. This test was
 
1395
        # added just to ensure that quoting was handled correctly.
 
1396
        # -- David Allouche 2006-08-11
 
1397
        transport = self.get_transport()
 
1398
        if not transport.listable():
 
1399
            self.assertRaises(TransportNotPossible,
 
1400
                              transport.iter_files_recursive)
 
1401
            return
 
1402
        if transport.is_readonly():
 
1403
            return
 
1404
        self.build_tree(['from/',
 
1405
                         'from/dir/',
 
1406
                         'from/dir/foo',
 
1407
                         'from/dir/bar',
 
1408
                         'from/dir/b%25z', # make sure quoting is correct
 
1409
                         'from/bar'],
 
1410
                        transport=transport)
 
1411
        transport.copy_tree('from', 'to')
 
1412
        paths = set(transport.iter_files_recursive())
 
1413
        self.assertEqual(paths,
 
1414
                    set(['from/dir/foo',
 
1415
                         'from/dir/bar',
 
1416
                         'from/dir/b%2525z',
 
1417
                         'from/bar',
 
1418
                         'to/dir/foo',
 
1419
                         'to/dir/bar',
 
1420
                         'to/dir/b%2525z',
 
1421
                         'to/bar',]))
911
1422
 
912
1423
    def test_unicode_paths(self):
913
1424
        """Test that we can read/write files with Unicode names."""
914
1425
        t = self.get_transport()
915
1426
 
916
 
        files = [u'\xe5', # a w/ circle iso-8859-1
917
 
                 u'\xe4', # a w/ dots iso-8859-1
 
1427
        # With FAT32 and certain encodings on win32
 
1428
        # '\xe5' and '\xe4' actually map to the same file
 
1429
        # adding a suffix kicks in the 'preserving but insensitive'
 
1430
        # route, and maintains the right files
 
1431
        files = [u'\xe5.1', # a w/ circle iso-8859-1
 
1432
                 u'\xe4.2', # a w/ dots iso-8859-1
918
1433
                 u'\u017d', # Z with umlat iso-8859-2
919
1434
                 u'\u062c', # Arabic j
920
1435
                 u'\u0410', # Russian A
922
1437
                ]
923
1438
 
924
1439
        try:
925
 
            self.build_tree(files, transport=t)
 
1440
            self.build_tree(files, transport=t, line_endings='binary')
926
1441
        except UnicodeError:
927
1442
            raise TestSkipped("cannot handle unicode paths in current encoding")
928
1443
 
936
1451
            self.check_transport_contents(contents, t, urlutils.escape(fname))
937
1452
 
938
1453
    def test_connect_twice_is_same_content(self):
939
 
        # check that our server (whatever it is) is accessable reliably
 
1454
        # check that our server (whatever it is) is accessible reliably
940
1455
        # via get_transport and multiple connections share content.
941
1456
        transport = self.get_transport()
942
1457
        if transport.is_readonly():
943
1458
            return
944
 
        transport.put('foo', StringIO('bar'))
945
 
        transport2 = self.get_transport()
946
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1459
        transport.put_bytes('foo', 'bar')
 
1460
        transport3 = self.get_transport()
 
1461
        self.check_transport_contents('bar', transport3, 'foo')
947
1462
        # its base should be usable.
948
 
        transport2 = bzrlib.transport.get_transport(transport.base)
949
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1463
        transport4 = get_transport(transport.base)
 
1464
        self.check_transport_contents('bar', transport4, 'foo')
950
1465
 
951
1466
        # now opening at a relative url should give use a sane result:
952
1467
        transport.mkdir('newdir')
953
 
        transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
954
 
        transport2 = transport2.clone('..')
955
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1468
        transport5 = get_transport(transport.base + "newdir")
 
1469
        transport6 = transport5.clone('..')
 
1470
        self.check_transport_contents('bar', transport6, 'foo')
956
1471
 
957
1472
    def test_lock_write(self):
 
1473
        """Test transport-level write locks.
 
1474
 
 
1475
        These are deprecated and transports may decline to support them.
 
1476
        """
958
1477
        transport = self.get_transport()
959
1478
        if transport.is_readonly():
960
1479
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
961
1480
            return
962
 
        transport.put('lock', StringIO())
963
 
        lock = transport.lock_write('lock')
 
1481
        transport.put_bytes('lock', '')
 
1482
        try:
 
1483
            lock = transport.lock_write('lock')
 
1484
        except TransportNotPossible:
 
1485
            return
964
1486
        # TODO make this consistent on all platforms:
965
1487
        # self.assertRaises(LockError, transport.lock_write, 'lock')
966
1488
        lock.unlock()
967
1489
 
968
1490
    def test_lock_read(self):
 
1491
        """Test transport-level read locks.
 
1492
 
 
1493
        These are deprecated and transports may decline to support them.
 
1494
        """
969
1495
        transport = self.get_transport()
970
1496
        if transport.is_readonly():
971
1497
            file('lock', 'w').close()
972
1498
        else:
973
 
            transport.put('lock', StringIO())
974
 
        lock = transport.lock_read('lock')
 
1499
            transport.put_bytes('lock', '')
 
1500
        try:
 
1501
            lock = transport.lock_read('lock')
 
1502
        except TransportNotPossible:
 
1503
            return
975
1504
        # TODO make this consistent on all platforms:
976
1505
        # self.assertRaises(LockError, transport.lock_read, 'lock')
977
1506
        lock.unlock()
981
1510
        if transport.is_readonly():
982
1511
            file('a', 'w').write('0123456789')
983
1512
        else:
984
 
            transport.put('a', StringIO('01234567890'))
 
1513
            transport.put_bytes('a', '0123456789')
 
1514
 
 
1515
        d = list(transport.readv('a', ((0, 1),)))
 
1516
        self.assertEqual(d[0], (0, '0'))
985
1517
 
986
1518
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
987
1519
        self.assertEqual(d[0], (0, '0'))
988
1520
        self.assertEqual(d[1], (1, '1'))
989
1521
        self.assertEqual(d[2], (3, '34'))
990
1522
        self.assertEqual(d[3], (9, '9'))
 
1523
 
 
1524
    def test_readv_out_of_order(self):
 
1525
        transport = self.get_transport()
 
1526
        if transport.is_readonly():
 
1527
            file('a', 'w').write('0123456789')
 
1528
        else:
 
1529
            transport.put_bytes('a', '01234567890')
 
1530
 
 
1531
        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
 
1532
        self.assertEqual(d[0], (1, '1'))
 
1533
        self.assertEqual(d[1], (9, '9'))
 
1534
        self.assertEqual(d[2], (0, '0'))
 
1535
        self.assertEqual(d[3], (3, '34'))
 
1536
 
 
1537
    def test_readv_with_adjust_for_latency(self):
 
1538
        transport = self.get_transport()
 
1539
        # the adjust for latency flag expands the data region returned
 
1540
        # according to a per-transport heuristic, so testing is a little
 
1541
        # tricky as we need more data than the largest combining that our
 
1542
        # transports do. To accomodate this we generate random data and cross
 
1543
        # reference the returned data with the random data. To avoid doing
 
1544
        # multiple large random byte look ups we do several tests on the same
 
1545
        # backing data.
 
1546
        content = osutils.rand_bytes(200*1024)
 
1547
        content_size = len(content)
 
1548
        if transport.is_readonly():
 
1549
            file('a', 'w').write(content)
 
1550
        else:
 
1551
            transport.put_bytes('a', content)
 
1552
        def check_result_data(result_vector):
 
1553
            for item in result_vector:
 
1554
                data_len = len(item[1])
 
1555
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1556
 
 
1557
        # start corner case
 
1558
        result = list(transport.readv('a', ((0, 30),),
 
1559
            adjust_for_latency=True, upper_limit=content_size))
 
1560
        # we expect 1 result, from 0, to something > 30
 
1561
        self.assertEqual(1, len(result))
 
1562
        self.assertEqual(0, result[0][0])
 
1563
        self.assertTrue(len(result[0][1]) >= 30)
 
1564
        check_result_data(result)
 
1565
        # end of file corner case
 
1566
        result = list(transport.readv('a', ((204700, 100),),
 
1567
            adjust_for_latency=True, upper_limit=content_size))
 
1568
        # we expect 1 result, from 204800- its length, to the end
 
1569
        self.assertEqual(1, len(result))
 
1570
        data_len = len(result[0][1])
 
1571
        self.assertEqual(204800-data_len, result[0][0])
 
1572
        self.assertTrue(data_len >= 100)
 
1573
        check_result_data(result)
 
1574
        # out of order ranges are made in order
 
1575
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1576
            adjust_for_latency=True, upper_limit=content_size))
 
1577
        # we expect 2 results, in order, start and end.
 
1578
        self.assertEqual(2, len(result))
 
1579
        # start
 
1580
        data_len = len(result[0][1])
 
1581
        self.assertEqual(0, result[0][0])
 
1582
        self.assertTrue(data_len >= 30)
 
1583
        # end
 
1584
        data_len = len(result[1][1])
 
1585
        self.assertEqual(204800-data_len, result[1][0])
 
1586
        self.assertTrue(data_len >= 100)
 
1587
        check_result_data(result)
 
1588
        # close ranges get combined (even if out of order)
 
1589
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1590
            result = list(transport.readv('a', request_vector,
 
1591
                adjust_for_latency=True, upper_limit=content_size))
 
1592
            self.assertEqual(1, len(result))
 
1593
            data_len = len(result[0][1])
 
1594
            # minimum length is from 400 to 1034 - 634
 
1595
            self.assertTrue(data_len >= 634)
 
1596
            # must contain the region 400 to 1034
 
1597
            self.assertTrue(result[0][0] <= 400)
 
1598
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1599
            check_result_data(result)
 
1600
 
 
1601
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1602
        transport = self.get_transport()
 
1603
        # test from observed failure case.
 
1604
        if transport.is_readonly():
 
1605
            file('a', 'w').write('a'*1024*1024)
 
1606
        else:
 
1607
            transport.put_bytes('a', 'a'*1024*1024)
 
1608
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1609
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1610
            (465373, 800), (947422, 800)]
 
1611
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1612
        found_items = [False]*9
 
1613
        for pos, (start, length) in enumerate(broken_vector):
 
1614
            # check the range is covered by the result
 
1615
            for offset, data in results:
 
1616
                if offset <= start and start + length <= offset + len(data):
 
1617
                    found_items[pos] = True
 
1618
        self.assertEqual([True]*9, found_items)
 
1619
 
 
1620
    def test_get_with_open_write_stream_sees_all_content(self):
 
1621
        t = self.get_transport()
 
1622
        if t.is_readonly():
 
1623
            return
 
1624
        handle = t.open_write_stream('foo')
 
1625
        try:
 
1626
            handle.write('bcd')
 
1627
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1628
        finally:
 
1629
            handle.close()
 
1630
 
 
1631
    def test_get_smart_medium(self):
 
1632
        """All transports must either give a smart medium, or know they can't.
 
1633
        """
 
1634
        transport = self.get_transport()
 
1635
        try:
 
1636
            client_medium = transport.get_smart_medium()
 
1637
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
 
1638
        except errors.NoSmartMedium:
 
1639
            # as long as we got it we're fine
 
1640
            pass
 
1641
 
 
1642
    def test_readv_short_read(self):
 
1643
        transport = self.get_transport()
 
1644
        if transport.is_readonly():
 
1645
            file('a', 'w').write('0123456789')
 
1646
        else:
 
1647
            transport.put_bytes('a', '01234567890')
 
1648
 
 
1649
        # This is intentionally reading off the end of the file
 
1650
        # since we are sure that it cannot get there
 
1651
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
 
1652
                               # Can be raised by paramiko
 
1653
                               AssertionError),
 
1654
                              transport.readv, 'a', [(1,1), (8,10)])
 
1655
 
 
1656
        # This is trying to seek past the end of the file, it should
 
1657
        # also raise a special error
 
1658
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
 
1659
                              transport.readv, 'a', [(12,2)])