/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar
1553.5.10 by Martin Pool
New DirectoryNotEmpty exception, and raise this from local and memory
1
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
1887.1.1 by Adeodato Simó
Do not separate paragraphs in the copyright statement with blank lines,
2
#
1530.1.3 by Robert Collins
transport implementations now tested consistently.
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
1887.1.1 by Adeodato Simó
Do not separate paragraphs in the copyright statement with blank lines,
7
#
1530.1.3 by Robert Collins
transport implementations now tested consistently.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU General Public License for more details.
1887.1.1 by Adeodato Simó
Do not separate paragraphs in the copyright statement with blank lines,
12
#
1530.1.3 by Robert Collins
transport implementations now tested consistently.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
17
"""Tests for Transport implementations.
18
19
Transport implementations tested here are supplied by
20
TransportTestProviderAdapter.
21
"""
22
23
import os
24
from cStringIO import StringIO
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
25
import stat
26
import sys
1530.1.3 by Robert Collins
transport implementations now tested consistently.
27
1755.3.7 by John Arbash Meinel
Clean up and write tests for permissions. Now we use fstat which should be cheap, and lets us check the permissions and the file size
28
from bzrlib import (
29
    osutils,
30
    urlutils,
31
    )
1553.5.10 by Martin Pool
New DirectoryNotEmpty exception, and raise this from local and memory
32
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
1185.85.84 by John Arbash Meinel
[merge] bzr.dev 1573, lots of updates
33
                           LockError, PathError,
1185.85.76 by John Arbash Meinel
Adding an InvalidURL so transports can report they expect utf-8 quoted paths. Updated tests
34
                           TransportNotPossible, ConnectionError,
35
                           InvalidURL)
1685.1.9 by John Arbash Meinel
Updated LocalTransport so that it's base is now a URL rather than a local path. This helps consistency with all other functions. To do so, I added local_abspath() which returns the local path, and local_path_to/from_url
36
from bzrlib.osutils import getcwd
1530.1.9 by Robert Collins
Test bogus urls with http in the new infrastructure.
37
from bzrlib.tests import TestCaseInTempDir, TestSkipped
1871.1.2 by Robert Collins
Reduce code duplication in transport-parameterised tests.
38
from bzrlib.tests.test_transport import TestTransportImplementation
1685.1.45 by John Arbash Meinel
Moved url functions into bzrlib.urlutils
39
from bzrlib.transport import memory
1530.1.3 by Robert Collins
transport implementations now tested consistently.
40
import bzrlib.transport
41
42
43
def _append(fn, txt):
44
    """Append the given text (file-like object) to the supplied filename."""
45
    f = open(fn, 'ab')
1530.1.21 by Robert Collins
Review feedback fixes.
46
    try:
47
        f.write(txt.read())
48
    finally:
49
        f.close()
1530.1.3 by Robert Collins
transport implementations now tested consistently.
50
51
1871.1.2 by Robert Collins
Reduce code duplication in transport-parameterised tests.
52
class TransportTests(TestTransportImplementation):
53
1530.1.3 by Robert Collins
transport implementations now tested consistently.
54
    def check_transport_contents(self, content, transport, relpath):
        """Assert that reading relpath from transport yields content.

        The file is fetched with transport.get(relpath), read in full and
        compared against content with assertEqualDiff.
        """
        actual = transport.get(relpath).read()
        self.assertEqualDiff(content, actual)
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
57
1530.1.3 by Robert Collins
transport implementations now tested consistently.
58
    def assertListRaises(self, excClass, func, *args, **kwargs):
        """Fail unless excClass is raised when the iterator from func is used.

        Many transport functions can return generators; the result of
        func(*args, **kwargs) is wrapped in a list() call so that the whole
        generator is run, and the proper exception is checked for.

        :param excClass: the exception class (or tuple of classes) expected.
        :param func: callable whose result is iterated to exhaustion.
        :param args: positional arguments passed through to func.
        :param kwargs: keyword arguments passed through to func.
        :raises self.failureException: if iterating completes without
            excClass being raised.
        """
        try:
            list(func(*args, **kwargs))
        except excClass:
            return
        # Objects without a __name__ (e.g. a tuple of exception classes) are
        # described by their str() instead.
        excName = getattr(excClass, '__name__', str(excClass))
        # Call form of raise: valid on both Python 2 and Python 3, unlike
        # the old "raise cls, message" statement form.
        raise self.failureException("%s not raised" % excName)
73
74
    def test_has(self):
        """has(), has_multi() and has_any() report which paths exist."""
        t = self.get_transport()

        self.build_tree(['a', 'b', 'e', 'g', '%'], transport=t)
        probes = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
        expected = [True, True, False, False, True, False, True, False]

        self.assertEqual(True, t.has('a'))
        self.assertEqual(False, t.has('c'))
        # The file named '%' is looked up through its escaped form.
        self.assertEqual(True, t.has(urlutils.escape('%')))
        self.assertEqual(list(t.has_multi(probes)), expected)
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
        self.assertEqual(False,
                         t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
        # has_multi is also given a plain iterator rather than a list.
        self.assertEqual(list(t.has_multi(iter(probes))), expected)
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
90
91
    def test_get(self):
        """get() and get_multi() return readable files, or raise NoSuchFile."""
        t = self.get_transport()

        files = ['a', 'b', 'e', 'g']
        contents = ['contents of %s\n' % name for name in files]
        self.build_tree(files, transport=t, line_endings='binary')
        self.check_transport_contents('contents of a\n', t, 'a')

        # get_multi is exercised first with a list, then with an iterator.
        for source in (files, iter(files)):
            content_f = t.get_multi(source)
            for content, f in zip(contents, content_f):
                self.assertEqual(content, f.read())

        self.assertRaises(NoSuchFile, t.get, 'c')
        missing = ['a', 'b', 'c']
        self.assertListRaises(NoSuchFile, t.get_multi, missing)
        self.assertListRaises(NoSuchFile, t.get_multi, iter(missing))
113
114
    def test_put(self):
        """put() and put_multi() create and overwrite files from file-likes."""
        t = self.get_transport()

        if t.is_readonly():
            self.assertRaises(TransportNotPossible,
                              t.put, 'a', StringIO('some text for a\n'))
            return

        probes = ['a', 'b', 'c', 'd', 'e']

        t.put('a', StringIO('some text for a\n'))
        self.failUnless(t.has('a'))
        self.check_transport_contents('some text for a\n', t, 'a')
        # Make sure 'has' is updated
        self.assertEqual(list(t.has_multi(probes)),
                         [True, False, False, False, False])

        # Put also replaces contents
        count = t.put_multi([('a', StringIO('new\ncontents for\na\n')),
                             ('d', StringIO('contents\nfor d\n'))])
        self.assertEqual(count, 2)
        self.assertEqual(list(t.has_multi(probes)),
                         [True, False, False, True, False])
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
        self.check_transport_contents('contents\nfor d\n', t, 'd')

        # The same again, feeding put_multi from an iterator.
        count = t.put_multi(
            iter([('a', StringIO('diff\ncontents for\na\n')),
                  ('d', StringIO('another contents\nfor d\n'))]))
        self.assertEqual(count, 2)
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
        self.check_transport_contents('another contents\nfor d\n', t, 'd')

        # Putting below a directory that does not exist must fail.
        self.assertRaises(NoSuchFile,
                          t.put, 'path/doesnt/exist/c', StringIO('contents'))
146
147
    def test_put_bytes(self):
        """put_bytes() creates and overwrites files from plain strings."""
        t = self.get_transport()
        first = 'some text for a\n'

        if t.is_readonly():
            self.assertRaises(TransportNotPossible, t.put_bytes, 'a', first)
            return

        t.put_bytes('a', first)
        self.failUnless(t.has('a'))
        self.check_transport_contents(first, t, 'a')

        # The contents should be overwritten
        replacement = 'new text for a\n'
        t.put_bytes('a', replacement)
        self.check_transport_contents(replacement, t, 'a')

        # Putting below a directory that does not exist must fail.
        self.assertRaises(NoSuchFile,
                          t.put_bytes, 'path/doesnt/exist/c', 'contents')
1530.1.3 by Robert Collins
transport implementations now tested consistently.
165
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
166
    def test_put_permissions(self):
167
        t = self.get_transport()
168
169
        if t.is_readonly():
170
            return
1711.4.32 by John Arbash Meinel
Skip permission tests on win32 no modebits
171
        if not t._can_roundtrip_unix_modebits():
172
            # Can't roundtrip, so no need to run this test
173
            return
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
174
        t.put('mode644', StringIO('test text\n'), mode=0644)
1530.1.21 by Robert Collins
Review feedback fixes.
175
        self.assertTransportMode(t, 'mode644', 0644)
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
176
        t.put('mode666', StringIO('test text\n'), mode=0666)
1530.1.21 by Robert Collins
Review feedback fixes.
177
        self.assertTransportMode(t, 'mode666', 0666)
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
178
        t.put('mode600', StringIO('test text\n'), mode=0600)
1530.1.21 by Robert Collins
Review feedback fixes.
179
        self.assertTransportMode(t, 'mode600', 0600)
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
180
        # Yes, you can put a file such that it becomes readonly
181
        t.put('mode400', StringIO('test text\n'), mode=0400)
1530.1.21 by Robert Collins
Review feedback fixes.
182
        self.assertTransportMode(t, 'mode400', 0400)
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
183
        t.put_multi([('mmode644', StringIO('text\n'))], mode=0644)
1530.1.21 by Robert Collins
Review feedback fixes.
184
        self.assertTransportMode(t, 'mmode644', 0644)
1755.3.7 by John Arbash Meinel
Clean up and write tests for permissions. Now we use fstat which should be cheap, and lets us check the permissions and the file size
185
186
        # The default permissions should be based on the current umask
187
        umask = osutils.get_umask()
188
        t.put('nomode', StringIO('test text\n'), mode=None)
189
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
190
        
1955.3.1 by John Arbash Meinel
Add put_bytes() and a base-level implementation for it
191
    def test_put_bytes_permissions(self):
192
        t = self.get_transport()
193
194
        if t.is_readonly():
195
            return
196
        if not t._can_roundtrip_unix_modebits():
197
            # Can't roundtrip, so no need to run this test
198
            return
199
        t.put_bytes('mode644', 'test text\n', mode=0644)
200
        self.assertTransportMode(t, 'mode644', 0644)
201
        t.put_bytes('mode666', 'test text\n', mode=0666)
202
        self.assertTransportMode(t, 'mode666', 0666)
203
        t.put_bytes('mode600', 'test text\n', mode=0600)
204
        self.assertTransportMode(t, 'mode600', 0600)
205
        # Yes, you can put_bytes a file such that it becomes readonly
206
        t.put_bytes('mode400', 'test text\n', mode=0400)
207
        self.assertTransportMode(t, 'mode400', 0400)
208
209
        # The default permissions should be based on the current umask
210
        umask = osutils.get_umask()
211
        t.put_bytes('nomode', 'test text\n', mode=None)
212
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
213
        
1530.1.3 by Robert Collins
transport implementations now tested consistently.
214
    def test_mkdir(self):
        """mkdir() and mkdir_multi() create directories usable by get/put."""
        t = self.get_transport()

        if t.is_readonly():
            # cannot mkdir on readonly transports. We're not testing for
            # cache coherency because cache behaviour is not currently
            # defined for the transport interface.
            for path in ('.', 'new_dir'):
                self.assertRaises(TransportNotPossible, t.mkdir, path)
            self.assertRaises(TransportNotPossible,
                              t.mkdir_multi, ['new_dir'])
            self.assertRaises(TransportNotPossible,
                              t.mkdir, 'path/doesnt/exist')
            return

        # Test mkdir
        t.mkdir('dir_a')
        self.assertEqual(t.has('dir_a'), True)
        self.assertEqual(t.has('dir_b'), False)

        t.mkdir('dir_b')
        self.assertEqual(t.has('dir_b'), True)

        # mkdir_multi from a list, then from an iterator
        t.mkdir_multi(['dir_c', 'dir_d'])
        t.mkdir_multi(iter(['dir_e', 'dir_f']))
        present = list(t.has_multi(['dir_a', 'dir_b', 'dir_c', 'dir_q',
                                    'dir_d', 'dir_e', 'dir_f', 'dir_b']))
        self.assertEqual(present,
                         [True, True, True, False,
                          True, True, True, True])

        # we were testing that a local mkdir followed by a transport
        # mkdir failed thusly, but given that we * in one process * do not
        # concurrently fiddle with disk dirs and then use transport to do
        # things, the win here seems marginal compared to the constraint on
        # the interface. RBC 20051227
        t.mkdir('dir_g')
        self.assertRaises(FileExists, t.mkdir, 'dir_g')

        # Test get/put in sub-directories
        written = t.put_multi(
            [('dir_a/a', StringIO('contents of dir_a/a')),
             ('dir_b/b', StringIO('contents of dir_b/b'))])
        self.assertEqual(2, written)
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')

        # mkdir of a dir with an absent parent
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
260
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
261
    def test_mkdir_permissions(self):
262
        t = self.get_transport()
263
        if t.is_readonly():
264
            return
1608.2.7 by Martin Pool
Rename supports_unix_modebits to _can_roundtrip_unix_modebits for clarity
265
        if not t._can_roundtrip_unix_modebits():
1608.2.5 by Martin Pool
Add Transport.supports_unix_modebits, so tests can
266
            # no sense testing on this transport
267
            return
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
268
        # Test mkdir with a mode
269
        t.mkdir('dmode755', mode=0755)
1530.1.21 by Robert Collins
Review feedback fixes.
270
        self.assertTransportMode(t, 'dmode755', 0755)
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
271
        t.mkdir('dmode555', mode=0555)
1530.1.21 by Robert Collins
Review feedback fixes.
272
        self.assertTransportMode(t, 'dmode555', 0555)
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
273
        t.mkdir('dmode777', mode=0777)
1530.1.21 by Robert Collins
Review feedback fixes.
274
        self.assertTransportMode(t, 'dmode777', 0777)
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
275
        t.mkdir('dmode700', mode=0700)
1530.1.21 by Robert Collins
Review feedback fixes.
276
        self.assertTransportMode(t, 'dmode700', 0700)
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
277
        t.mkdir_multi(['mdmode755'], mode=0755)
1530.1.21 by Robert Collins
Review feedback fixes.
278
        self.assertTransportMode(t, 'mdmode755', 0755)
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
279
1755.3.7 by John Arbash Meinel
Clean up and write tests for permissions. Now we use fstat which should be cheap, and lets us check the permissions and the file size
280
        # Default mode should be based on umask
281
        umask = osutils.get_umask()
282
        t.mkdir('dnomode', mode=None)
283
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
284
1530.1.3 by Robert Collins
transport implementations now tested consistently.
285
    def test_copy_to(self):
1534.4.21 by Robert Collins
Extend the copy_to tests to smoke test server-to-same-server copies to catch optimised code paths, and fix sftps optimised code path by removing dead code.
286
        # FIXME: test:   same server to same server (partly done)
287
        # same protocol two servers
288
        # and    different protocols (done for now except for MemoryTransport.
289
        # - RBC 20060122
1530.1.3 by Robert Collins
transport implementations now tested consistently.
290
        from bzrlib.transport.memory import MemoryTransport
1534.4.21 by Robert Collins
Extend the copy_to tests to smoke test server-to-same-server copies to catch optimised code paths, and fix sftps optimised code path by removing dead code.
291
292
        def simple_copy_files(transport_from, transport_to):
293
            files = ['a', 'b', 'c', 'd']
294
            self.build_tree(files, transport=transport_from)
1563.2.3 by Robert Collins
Change the return signature of transport.append and append_multi to return the length of the pre-append content.
295
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
1534.4.21 by Robert Collins
Extend the copy_to tests to smoke test server-to-same-server copies to catch optimised code paths, and fix sftps optimised code path by removing dead code.
296
            for f in files:
297
                self.check_transport_contents(transport_to.get(f).read(),
298
                                              transport_from, f)
299
1530.1.3 by Robert Collins
transport implementations now tested consistently.
300
        t = self.get_transport()
1685.1.42 by John Arbash Meinel
A couple more fixes to make sure memory:/// works correctly.
301
        temp_transport = MemoryTransport('memory:///')
1534.4.21 by Robert Collins
Extend the copy_to tests to smoke test server-to-same-server copies to catch optimised code paths, and fix sftps optimised code path by removing dead code.
302
        simple_copy_files(t, temp_transport)
303
        if not t.is_readonly():
304
            t.mkdir('copy_to_simple')
305
            t2 = t.clone('copy_to_simple')
306
            simple_copy_files(t, t2)
1530.1.3 by Robert Collins
transport implementations now tested consistently.
307
308
309
        # Test that copying into a missing directory raises
310
        # NoSuchFile
311
        if t.is_readonly():
1530.1.21 by Robert Collins
Review feedback fixes.
312
            self.build_tree(['e/', 'e/f'])
1530.1.3 by Robert Collins
transport implementations now tested consistently.
313
        else:
314
            t.mkdir('e')
315
            t.put('e/f', StringIO('contents of e'))
316
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
317
        temp_transport.mkdir('e')
318
        t.copy_to(['e/f'], temp_transport)
319
320
        del temp_transport
1685.1.42 by John Arbash Meinel
A couple more fixes to make sure memory:/// works correctly.
321
        temp_transport = MemoryTransport('memory:///')
1530.1.3 by Robert Collins
transport implementations now tested consistently.
322
323
        files = ['a', 'b', 'c', 'd']
324
        t.copy_to(iter(files), temp_transport)
325
        for f in files:
326
            self.check_transport_contents(temp_transport.get(f).read(),
327
                                          t, f)
328
        del temp_transport
329
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
330
        for mode in (0666, 0644, 0600, 0400):
1685.1.42 by John Arbash Meinel
A couple more fixes to make sure memory:/// works correctly.
331
            temp_transport = MemoryTransport("memory:///")
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
332
            t.copy_to(files, temp_transport, mode=mode)
333
            for f in files:
1530.1.21 by Robert Collins
Review feedback fixes.
334
                self.assertTransportMode(temp_transport, f, mode)
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
335
1530.1.3 by Robert Collins
transport implementations now tested consistently.
336
    def test_append(self):
        """append() and append_multi() extend files and return old lengths.

        Each append call is expected to return the length of the file
        before the append (0 for a file that did not exist yet);
        append_multi returns one such length per entry.
        """
        t = self.get_transport()

        if t.is_readonly():
            # Pass a file-like object, as every other append() call here
            # does, so the readonly check is made with the real call shape.
            self.assertRaises(TransportNotPossible,
                    t.append, 'a', StringIO('add\nsome\nmore\ncontents\n'))
            return
        t.put_multi([
                ('a', StringIO('diff\ncontents for\na\n')),
                ('b', StringIO('contents\nfor b\n'))
                ])

        self.assertEqual(20,
            t.append('a', StringIO('add\nsome\nmore\ncontents\n')))

        self.check_transport_contents(
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
            t, 'a')

        self.assertEqual((43, 15),
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
                            ('b', StringIO('some\nmore\nfor\nb\n'))]))

        self.check_transport_contents(
            'diff\ncontents for\na\n'
            'add\nsome\nmore\ncontents\n'
            'and\nthen\nsome\nmore\n',
            t, 'a')
        self.check_transport_contents(
                'contents\nfor b\n'
                'some\nmore\nfor\nb\n',
                t, 'b')

        # append_multi fed from an iterator
        self.assertEqual((62, 31),
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
                                 ('b', StringIO('from an iterator\n'))])))
        self.check_transport_contents(
            'diff\ncontents for\na\n'
            'add\nsome\nmore\ncontents\n'
            'and\nthen\nsome\nmore\n'
            'a little bit more\n',
            t, 'a')
        self.check_transport_contents(
                'contents\nfor b\n'
                'some\nmore\nfor\nb\n'
                'from an iterator\n',
                t, 'b')

        # Appending to files that do not exist yet creates them.
        self.assertEqual(0,
            t.append('c', StringIO('some text\nfor a missing file\n')))
        self.assertEqual((80, 0),
            t.append_multi([('a', StringIO('some text in a\n')),
                            ('d', StringIO('missing file r\n'))]))

        self.check_transport_contents(
            'diff\ncontents for\na\n'
            'add\nsome\nmore\ncontents\n'
            'and\nthen\nsome\nmore\n'
            'a little bit more\n'
            'some text in a\n',
            t, 'a')
        self.check_transport_contents('some text\nfor a missing file\n',
                                      t, 'c')
        self.check_transport_contents('missing file r\n', t, 'd')

        # a file with no parent should fail..
        self.assertRaises(NoSuchFile,
                          t.append, 'missing/path',
                          StringIO('content'))
1530.1.3 by Robert Collins
transport implementations now tested consistently.
405
1955.3.2 by John Arbash Meinel
Implement and test 'Transport.append_bytes', cleanup the tests of plain append
406
    def test_append_bytes(self):
        """append_bytes() extends files and returns the previous length.

        The return value is expected to be the length of the file before
        the append, which is 0 for a file that did not exist yet.
        """
        t = self.get_transport()

        if t.is_readonly():
            # Exercise append_bytes itself here (not append), so readonly
            # transports are checked for the method under test.
            self.assertRaises(TransportNotPossible,
                    t.append_bytes, 'a', 'add\nsome\nmore\ncontents\n')
            return

        self.assertEqual(0, t.append_bytes('a', 'diff\ncontents for\na\n'))
        self.assertEqual(0, t.append_bytes('b', 'contents\nfor b\n'))

        self.assertEqual(20,
            t.append_bytes('a', 'add\nsome\nmore\ncontents\n'))

        self.check_transport_contents(
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
            t, 'a')

        # a file with no parent should fail..
        self.assertRaises(NoSuchFile,
                          t.append_bytes, 'missing/path', 'content')
1530.1.3 by Robert Collins
transport implementations now tested consistently.
427
1666.1.6 by Robert Collins
Make knit the default format.
428
    def test_append_mode(self):
        """append() accepts a mode keyword argument."""
        t = self.get_transport()
        # Readonly transports cannot append at all, so there is nothing
        # to check for them.
        if not t.is_readonly():
            t.append('f', StringIO('f'), mode=None)
434
        
1955.3.2 by John Arbash Meinel
Implement and test 'Transport.append_bytes', cleanup the tests of plain append
435
    def test_append_bytes_mode(self):
        """append_bytes() accepts a mode keyword argument."""
        t = self.get_transport()
        if t.is_readonly():
            # Readonly transports cannot append; nothing to check.
            return
        # Call append_bytes with a plain string: this test is about
        # append_bytes, not the file-like append() API.
        t.append_bytes('f', 'f', mode=None)
441
        
1530.1.3 by Robert Collins
transport implementations now tested consistently.
442
    def test_delete(self):
        """delete() and delete_multi() remove files, or raise NoSuchFile."""
        # TODO: Test Transport.delete
        t = self.get_transport()

        # Not much to do with a readonly transport
        if t.is_readonly():
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
            return

        t.put('a', StringIO('a little bit of text\n'))
        self.failUnless(t.has('a'))
        t.delete('a')
        self.failIf(t.has('a'))

        # Deleting it a second time must fail.
        self.assertRaises(NoSuchFile, t.delete, 'a')

        for name in ('a', 'b', 'c'):
            t.put(name, StringIO('%s text\n' % name))
        self.assertEqual([True, True, True],
                         list(t.has_multi(['a', 'b', 'c'])))
        t.delete_multi(['a', 'c'])
        self.assertEqual([False, True, False],
                         list(t.has_multi(['a', 'b', 'c'])))
        self.failIf(t.has('a'))
        self.failUnless(t.has('b'))
        self.failIf(t.has('c'))

        # 'a' and 'c' are already gone, so both forms must raise.
        self.assertRaises(NoSuchFile, t.delete_multi, ['a', 'b', 'c'])
        self.assertRaises(NoSuchFile, t.delete_multi, iter(['a', 'b', 'c']))

        t.put('a', StringIO('another a text\n'))
        t.put('c', StringIO('another c text\n'))
        t.delete_multi(iter(['a', 'b', 'c']))

        # We should have deleted everything
        # SftpServer creates control files in the
        # working directory, so we can just do a
        # plain "listdir".
        # NOTE(review): the disabled check below suggests the comment above
        # was meant to read "can't just do" -- confirm.
        # self.assertEqual([], os.listdir('.'))
485
1534.4.15 by Robert Collins
Remove shutil dependency in upgrade - create a delete_tree method for transports.
486
    def test_rmdir(self):
        """rmdir() removes empty directories, innermost first."""
        t = self.get_transport()
        # Not much to do with a readonly transport
        if t.is_readonly():
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
            return

        t.mkdir('adir')
        t.mkdir('adir/bdir')
        # Remove the child first, then the parent; stat must fail on each
        # directory once it has been removed.
        for path in ('adir/bdir', 'adir'):
            t.rmdir(path)
            self.assertRaises(NoSuchFile, t.stat, path)
498
1553.5.10 by Martin Pool
New DirectoryNotEmpty exception, and raise this from local and memory
499
    def test_rmdir_not_empty(self):
        """Deleting a non-empty directory raises an exception.

        sftp (and possibly others) don't give us a specific "directory not
        empty" exception -- we can just see that the operation failed, so
        only the PathError base class is checked for.
        """
        t = self.get_transport()
        if t.is_readonly():
            return
        for path in ('adir', 'adir/bdir'):
            t.mkdir(path)
        self.assertRaises(PathError, t.rmdir, 'adir')
511
1553.5.13 by Martin Pool
New Transport.rename that mustn't overwrite
512
    def test_rename_dir_succeeds(self):
        """Renaming a directory moves it together with its contents.

        Skipped on readonly transports.
        """
        transport = self.get_transport()
        if transport.is_readonly():
            raise TestSkipped("transport is readonly")

        for new_dir in ('adir', 'adir/asubdir'):
            transport.mkdir(new_dir)
        transport.rename('adir', 'bdir')

        # The subdirectory is reachable under the new name and the old
        # name no longer exists.
        moved_child_present = transport.has('bdir/asubdir')
        self.assertTrue(moved_child_present)
        old_name_present = transport.has('adir')
        self.assertFalse(old_name_present)
521
522
    def test_rename_dir_nonempty(self):
        """Attempting to replace a nonempty directory should fail"""
        transport = self.get_transport()
        if transport.is_readonly():
            raise TestSkipped("transport is readonly")

        # Two directories, each holding one subdirectory.
        for new_dir in ('adir', 'adir/asubdir', 'bdir', 'bdir/bsubdir'):
            transport.mkdir(new_dir)

        self.assertRaises(PathError, transport.rename, 'bdir', 'adir')

        # The failed rename must leave both trees exactly as they were.
        self.assertTrue(transport.has('bdir/bsubdir'))
        for unexpected in ('adir/bdir', 'adir/bsubdir'):
            self.assertFalse(transport.has(unexpected))
536
1534.4.15 by Robert Collins
Remove shutil dependency in upgrade - create a delete_tree method for transports.
537
    def test_delete_tree(self):
        """Check delete_tree removes an empty and then a populated directory.

        On a readonly transport only the TransportNotPossible refusal is
        checked.  If a writable transport raises TransportNotPossible for
        delete_tree, it is treated as unsupported and the test ends early.
        """
        transport = self.get_transport()

        if transport.is_readonly():
            self.assertRaises(TransportNotPossible,
                              transport.delete_tree, 'missing')
            return

        # Trivial case first: a directory with nothing in it.
        transport.mkdir('adir')
        try:
            transport.delete_tree('adir')
        except TransportNotPossible:
            # ok, this transport does not support delete_tree
            return
        self.assertRaises(NoSuchFile, transport.stat, 'adir')

        # Now a directory holding files and nested subdirectories.
        tree_shape = [
            'adir/',
            'adir/file',
            'adir/subdir/',
            'adir/subdir/file',
            'adir/subdir2/',
            'adir/subdir2/file',
            ]
        self.build_tree(tree_shape, transport=transport)

        transport.delete_tree('adir')
        # adir should be gone now.
        self.assertRaises(NoSuchFile, transport.stat, 'adir')
567
1530.1.3 by Robert Collins
transport implementations now tested consistently.
568
    def test_move(self):
        """Check move relocates a file and overwrites an existing target.

        Nothing is checked on a readonly transport.
        """
        transport = self.get_transport()
        if transport.is_readonly():
            return

        # TODO: I would like to use os.listdir() to
        # make sure there are no extra files, but SftpServer
        # creates control files in the working directory
        # perhaps all of this could be done in a subdirectory

        transport.put('a', StringIO('a first file\n'))
        present = list(transport.has_multi(['a', 'b']))
        self.assertEqual([True, False], present)

        # After the move only the destination exists, with the same content.
        transport.move('a', 'b')
        self.assertTrue(transport.has('b'))
        self.assertFalse(transport.has('a'))
        self.check_transport_contents('a first file\n', transport, 'b')
        present = list(transport.has_multi(['a', 'b']))
        self.assertEqual([False, True], present)

        # Moving onto an existing file replaces its content.
        transport.put('c', StringIO('c this file\n'))
        transport.move('c', 'b')
        self.assertFalse(transport.has('c'))
        self.check_transport_contents('c this file\n', transport, 'b')

        # TODO: Try to write a test for atomicity
        # TODO: Test moving into a non-existent subdirectory
        # TODO: Test Transport.move_multi
598
599
    def test_copy(self):
        """Check copy duplicates a file and overwrites an existing target.

        Copying from a missing source must raise NoSuchFile.  Nothing is
        checked on a readonly transport.
        """
        transport = self.get_transport()
        if transport.is_readonly():
            return

        transport.put('a', StringIO('a file\n'))
        transport.copy('a', 'b')
        self.check_transport_contents('a file\n', transport, 'b')

        # 'c' does not exist yet, so it cannot be a copy source.
        self.assertRaises(NoSuchFile, transport.copy, 'c', 'd')

        # NOTE(review): this creates 'c' in the local working directory, not
        # through the transport, and nothing below uses it while the
        # assertion it was meant for stays commented out.
        os.mkdir('c')
        # What should the assert be if you try to copy a
        # file over a directory?
        #self.assertRaises(Something, t.copy, 'a', 'c')

        # Copying onto an existing file replaces its content.
        transport.put('d', StringIO('text in d\n'))
        transport.copy('d', 'b')
        self.check_transport_contents('text in d\n', transport, 'b')

        # TODO: test copy_multi
619
620
    def test_connection_error(self):
        """ConnectionError is raised when connection is impossible

        NoSuchFile is accepted as well.  Any other exception, or no
        exception at all, fails the test.  The test is skipped when the
        server under test cannot supply a bogus URL.
        """
        try:
            url = self._server.get_bogus_url()
        except NotImplementedError:
            raise TestSkipped("Transport %s has no bogus URL support." %
                              self._server.__class__)
        try:
            t = bzrlib.transport.get_transport(url)
            t.get('.bzr/branch')
        except (ConnectionError, NoSuchFile):
            # Either of these is an acceptable way to report the failure.
            pass
        except Exception:
            # Fetch the active exception through sys.exc_info() so that no
            # version-specific "except ..., e" / "except ... as e" binding
            # syntax is needed.
            e = sys.exc_info()[1]
            exc_class = e.__class__
            self.fail('Wrong exception thrown (%s.%s): %s'
                      % (exc_class.__module__, exc_class.__name__, e))
        else:
            self.fail('Did not get the expected ConnectionError or NoSuchFile.')
1530.1.3 by Robert Collins
transport implementations now tested consistently.
637
638
    def test_stat(self):
        """Check stat on files, directories, missing paths and clones.

        Ends early when the transport raises TransportNotPossible for stat.
        """
        # TODO: Test stat, just try once, and if it throws, stop testing
        from stat import S_ISDIR, S_ISREG

        transport = self.get_transport()
        try:
            transport.stat('.')
        except TransportNotPossible:
            # This transport cannot stat
            return

        # (path, expected size) pairs.  Paths ending in '/' are directories,
        # whose size entry is never compared.
        # NOTE(review): the file sizes presumably equal the length of the
        # "contents of <path>\n" text written by build_tree -- confirm.
        expected = [('a', 14), ('b/', 0), ('b/c', 16), ('b/d/', 0),
                    ('b/d/e', 18)]
        paths = [path for path, size in expected]
        self.build_tree(paths, transport=transport, line_endings='binary')

        for path, size in expected:
            info = transport.stat(path)
            if not path.endswith('/'):
                self.assertTrue(S_ISREG(info.st_mode))
                self.assertEqual(size, info.st_size)
            else:
                # directory sizes are meaningless
                self.assertTrue(S_ISDIR(info.st_mode))

        # stat_multi is exercised with both a list and an iterator; the
        # results are consumed but not inspected.
        list(transport.stat_multi(paths))
        list(transport.stat_multi(iter(paths)))

        for missing in ('q', 'b/a'):
            self.assertRaises(NoSuchFile, transport.stat, missing)

        partly_missing = ['a', 'c', 'd']
        self.assertListRaises(NoSuchFile, transport.stat_multi,
                              partly_missing)
        self.assertListRaises(NoSuchFile, transport.stat_multi,
                              iter(partly_missing))

        # stat must also work relative to a cloned transport.
        self.build_tree(['subdir/', 'subdir/file'], transport=transport)
        subdir = transport.clone('subdir')
        subdir.stat('./file')
        subdir.stat('.')
1530.1.3 by Robert Collins
transport implementations now tested consistently.
675
676
    def test_list_dir(self):
        """Check list_dir output before and after entries are removed.

        Unlistable transports must raise TransportNotPossible.  On readonly
        transports the fixture is created and changed through the local
        filesystem instead of through the transport.
        """
        # TODO: Test list_dir, just try once, and if it throws, stop testing
        root = self.get_transport()
        if not root.listable():
            self.assertRaises(TransportNotPossible, root.list_dir, '.')
            return

        # SftpServer creates control files in the working directory
        # so lets move down a directory to avoid those.
        if root.is_readonly():
            os.mkdir('wd')
        else:
            root.mkdir('wd')
        t = root.clone('wd')

        def sorted_list(d):
            # Listing order is not relied upon, so compare sorted results.
            return sorted(t.list_dir(d))

        self.assertEqual([], sorted_list(u'.'))

        # c2 is precisely one letter longer than c here to test that
        # suffixing is not confused.
        if t.is_readonly():
            self.build_tree(['wd/a', 'wd/b', 'wd/c/', 'wd/c/d', 'wd/c/e',
                             'wd/c2/'])
        else:
            self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e', 'c2/'],
                            transport=t)

        self.assertEqual([u'a', u'b', u'c', u'c2'], sorted_list(u'.'))
        self.assertEqual([u'd', u'e'], sorted_list(u'c'))

        # Remove one nested file and one top-level file, then list again.
        if t.is_readonly():
            os.unlink('wd/c/d')
            os.unlink('wd/b')
        else:
            t.delete('c/d')
            t.delete('b')

        self.assertEqual([u'a', u'c', u'c2'], sorted_list('.'))
        self.assertEqual([u'e'], sorted_list(u'c'))

        # Missing paths ('q', 'c/f') and a plain file ('a') cannot be listed.
        for unlistable in ('q', 'c/f', 'a'):
            self.assertListRaises(PathError, t.list_dir, unlistable)
1530.1.3 by Robert Collins
transport implementations now tested consistently.
721
722
    def test_clone(self):
        """Check cloned transports resolve paths against their own base.

        Clones down into 'b' and back up with '..', then verifies that a
        file created after cloning is visible through every transport.
        """
        # TODO: Test that clone moves up and down the filesystem
        t1 = self.get_transport()

        self.build_tree(['a', 'b/', 'b/c'], transport=t1)

        self.assertTrue(t1.has('a'))
        self.assertTrue(t1.has('b/c'))
        self.assertFalse(t1.has('c'))

        # Cloning into 'b' extends the base and changes what is visible.
        t2 = t1.clone('b')
        self.assertEqual(t1.base + 'b/', t2.base)

        self.assertTrue(t2.has('c'))
        self.assertFalse(t2.has('a'))

        # Cloning back up with '..' sees the original directory again.
        t3 = t2.clone('..')
        self.assertTrue(t3.has('a'))
        self.assertFalse(t3.has('c'))

        self.assertFalse(t1.has('b/d'))
        self.assertFalse(t2.has('d'))
        self.assertFalse(t3.has('b/d'))

        if t1.is_readonly():
            # Write through the local filesystem.  Close the file explicitly
            # so the data is flushed before the has() checks below, instead
            # of relying on garbage collection to close the handle.
            new_file = open('b/d', 'wb')
            try:
                new_file.write('newfile\n')
            finally:
                new_file.close()
        else:
            t2.put('d', StringIO('newfile\n'))

        # The new file is visible through all three transports.
        self.assertTrue(t1.has('b/d'))
        self.assertTrue(t2.has('d'))
        self.assertTrue(t3.has('b/d'))
754
755
    def test_relpath(self):
        """Check relpath for the base itself and for paths below it."""
        t = self.get_transport()
        base = t.base
        cases = [
            ('', base),
            # base ends with /
            ('', base[:-1]),
            # subdirs which dont exist should still give relpaths.
            ('foo', base + 'foo'),
            # trailing slash should be the same.
            ('foo', base + 'foo/'),
            ]
        for expected, url in cases:
            self.assertEqual(expected, t.relpath(url))
764
1636.1.1 by Robert Collins
Fix calling relpath() and abspath() on transports at their root.
765
    def test_relpath_at_root(self):
        """Check relpath still works on the topmost transport."""
        root = self.get_transport()
        # clone all the way to the top: stop once cloning '..' no longer
        # changes the base.
        while True:
            parent = root.clone('..')
            if parent.base == root.base:
                break
            root = parent

        root_url = root.base
        # we must be able to get a relpath below the root
        self.assertEqual('', root.relpath(root_url))
        # and a deeper one should work too
        self.assertEqual('foo/bar', root.relpath(root_url + 'foo/bar'))
776
1530.1.3 by Robert Collins
transport implementations now tested consistently.
777
    def test_abspath(self):
        """Smoke-test that abspath appends a relative path to the base."""
        # Corner cases for backends like unix fs's that have aliasing
        # problems like symlinks should go in backend specific test cases.
        t = self.get_transport()

        # NOTE(review): an older comment here (mbp 20060308) described this
        # check as disabled because some transports might normalize urls in
        # generating the abspath - eg http+pycurl-> just http.  The assertion
        # is active, so confirm whether it is meant to stay enabled.
        expected_url = t.base + 'relpath'
        self.assertEqual(expected_url, t.abspath('relpath'))
787
1685.1.9 by John Arbash Meinel
Updated LocalTransport so that it's base is now a URL rather than a local path. This helps consistency with all other functions. To do so, I added local_abspath() which returns the local path, and local_path_to/from_url
788
    def test_local_abspath(self):
        """Check local_abspath('.') equals the current working directory.

        Transports that raise TransportNotPossible are not checked.
        """
        t = self.get_transport()
        try:
            local_path = t.local_abspath('.')
        except TransportNotPossible:
            # This is not a local transport
            return
        self.assertEqual(getcwd(), local_path)
796
1636.1.1 by Robert Collins
Fix calling relpath() and abspath() on transports at their root.
797
    def test_abspath_at_root(self):
        """Check abspath of '..', '' and a child on the topmost transport."""
        root = self.get_transport()
        # clone all the way to the top: stop once cloning '..' no longer
        # changes the base.
        while True:
            parent = root.clone('..')
            if parent.base == root.base:
                break
            root = parent

        # we must be able to get a abspath of the root when we ask for
        # t.abspath('..') - this due to our choice that clone('..')
        # should return the root from the root, combined with the desire that
        # the url from clone('..') and from abspath('..') should be the same.
        self.assertEqual(root.base, root.abspath('..'))
        # '' should give us the root
        self.assertEqual(root.base, root.abspath(''))
        # and a path should append to the url
        self.assertEqual(root.base + 'foo', root.abspath('foo'))
813
1530.1.3 by Robert Collins
transport implementations now tested consistently.
814
    def test_iter_files_recursive(self):
        """Check iter_files_recursive yields files only, relative to base.

        Unlistable transports must raise TransportNotPossible.
        """
        t = self.get_transport()
        if not t.listable():
            self.assertRaises(TransportNotPossible, t.iter_files_recursive)
            return

        tree_shape = [
            'isolated/',
            'isolated/dir/',
            'isolated/dir/foo',
            'isolated/dir/bar',
            'isolated/bar',
            ]
        self.build_tree(tree_shape, transport=t)

        found = set(t.iter_files_recursive())
        # nb the directories are not converted
        files_from_top = set(['isolated/dir/foo',
                              'isolated/dir/bar',
                              'isolated/bar'])
        self.assertEqual(found, files_from_top)

        # From a clone of 'isolated' the same files come back relative to
        # the clone.
        sub_transport = t.clone('isolated')
        found = set(sub_transport.iter_files_recursive())
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), found)
1185.85.76 by John Arbash Meinel
Adding an InvalidURL so transports can report they expect utf-8 quoted paths. Updated tests
835
836
    def test_unicode_paths(self):
        """Test that we can read/write files with Unicode names."""
        transport = self.get_transport()

        # With FAT32 and certain encodings on win32
        # '\xe5' and '\xe4' actually map to the same file
        # adding a suffix kicks in the 'preserving but insensitive'
        # route, and maintains the right files
        unicode_names = [
            u'\xe5.1',  # a with ring above (iso-8859-1)
            u'\xe4.2',  # a with diaeresis (iso-8859-1)
            u'\u017d',  # Z with caron (iso-8859-2)
            u'\u062c',  # Arabic letter jeem
            u'\u0410',  # Cyrillic capital A
            u'\u65e5',  # CJK character for sun/day
            ]

        try:
            self.build_tree(unicode_names, transport=transport,
                            line_endings='binary')
        except UnicodeError:
            raise TestSkipped("cannot handle unicode paths in current encoding")

        # A plain unicode string is not a valid url
        for name in unicode_names:
            self.assertRaises(InvalidURL, transport.get, name)

        # The escaped form of the same name must return the file content.
        for name in unicode_names:
            utf8_name = name.encode('utf-8')
            expected_text = 'contents of %s\n' % (utf8_name,)
            escaped = urlutils.escape(name)
            self.check_transport_contents(expected_text, transport, escaped)
1185.85.76 by John Arbash Meinel
Adding an InvalidURL so transports can report they expect utf-8 quoted paths. Updated tests
865
1534.4.26 by Robert Collins
Move working tree initialisation out from Branch.initialize, deprecated Branch.initialize to Branch.create.
866
    def test_connect_twice_is_same_content(self):
        """Check separately opened transports see the same content.

        Nothing is checked on a readonly transport.
        """
        # check that our server (whatever it is) is accessible reliably
        # via get_transport and multiple connections share content.
        first = self.get_transport()
        if first.is_readonly():
            return
        first.put('foo', StringIO('bar'))

        other = self.get_transport()
        self.check_transport_contents('bar', other, 'foo')

        # its base should be usable.
        other = bzrlib.transport.get_transport(first.base)
        self.check_transport_contents('bar', other, 'foo')

        # now opening at a relative url should give us a sane result:
        first.mkdir('newdir')
        other = bzrlib.transport.get_transport(first.base + "newdir")
        other = other.clone('..')
        self.check_transport_contents('bar', other, 'foo')
884
885
    def test_lock_write(self):
        """Check a write lock can be taken and released.

        A readonly transport must raise TransportNotPossible instead.
        """
        t = self.get_transport()
        if t.is_readonly():
            self.assertRaises(TransportNotPossible, t.lock_write, 'foo')
            return

        # Create an empty file to lock.
        t.put('lock', StringIO())
        write_lock = t.lock_write('lock')
        # TODO make this consistent on all platforms:
        # self.assertRaises(LockError, transport.lock_write, 'lock')
        write_lock.unlock()
895
896
    def test_lock_read(self):
        """Check a read lock can be taken and released.

        On a readonly transport the empty file to lock is created through
        the local filesystem instead of through the transport.
        """
        t = self.get_transport()
        if not t.is_readonly():
            t.put('lock', StringIO())
        else:
            open('lock', 'w').close()

        read_lock = t.lock_read('lock')
        # TODO make this consistent on all platforms:
        # self.assertRaises(LockError, transport.lock_read, 'lock')
        read_lock.unlock()
1185.85.80 by John Arbash Meinel
[merge] jam-integration 1527, including branch-formats, help text, misc bug fixes.
906
1594.2.5 by Robert Collins
Readv patch from Johan Rydberg giving knits partial download support.
907
    def test_readv(self):
        """Check readv yields (offset, data) for (offset, length) requests.

        The ranges are requested in ascending order.  On a readonly
        transport the file is created through the local filesystem instead
        of through the transport.
        """
        transport = self.get_transport()
        if transport.is_readonly():
            # Close the file explicitly so the data is on disk before it is
            # read back, instead of relying on garbage collection.
            local_file = open('a', 'w')
            try:
                local_file.write('0123456789')
            finally:
                local_file.close()
        else:
            transport.put('a', StringIO('0123456789'))

        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
        expected = [(0, '0'), (1, '1'), (3, '34'), (9, '9')]
        for index, wanted in enumerate(expected):
            self.assertEqual(d[index], wanted)
1786.1.8 by John Arbash Meinel
[merge] Johan Rydberg test updates
919
1864.5.2 by John Arbash Meinel
always read in sorted order, and return in requested order, but only cache what is currently out of order
920
    def test_readv_out_of_order(self):
        """Check readv returns results in the order they were requested.

        The (offset, length) ranges are deliberately not sorted by offset.
        On a readonly transport the file is created through the local
        filesystem instead of through the transport.
        """
        transport = self.get_transport()
        # NOTE(review): the two branches write different payloads (10 bytes
        # locally, 11 bytes through the transport).  Every range read below
        # fits inside the first 10 bytes, so both satisfy the assertions --
        # confirm whether the extra trailing '0' is intentional.
        if transport.is_readonly():
            # Close the file explicitly so the data is on disk before it is
            # read back, instead of relying on garbage collection.
            local_file = open('a', 'w')
            try:
                local_file.write('0123456789')
            finally:
                local_file.close()
        else:
            transport.put('a', StringIO('01234567890'))

        d = list(transport.readv('a', ((1, 1), (9, 1), (0, 1), (3, 2))))
        expected = [(1, '1'), (9, '9'), (0, '0'), (3, '34')]
        for index, wanted in enumerate(expected):
            self.assertEqual(d[index], wanted)