/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar
1553.5.10 by Martin Pool
New DirectoryNotEmpty exception, and raise this from local and memory
1
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
1530.1.3 by Robert Collins
transport implementations now tested consistently.
2
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
7
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU General Public License for more details.
12
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
17
"""Tests for Transport implementations.
18
19
Transport implementations tested here are supplied by
20
TransportTestProviderAdapter.
21
"""
22
23
import os
24
from cStringIO import StringIO
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
25
import stat
26
import sys
1530.1.3 by Robert Collins
transport implementations now tested consistently.
27
1553.5.10 by Martin Pool
New DirectoryNotEmpty exception, and raise this from local and memory
28
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
1534.4.26 by Robert Collins
Move working tree initialisation out from Branch.initialize, deprecated Branch.initialize to Branch.create.
29
                           LockError,
1553.5.10 by Martin Pool
New DirectoryNotEmpty exception, and raise this from local and memory
30
                           PathError,
1530.1.3 by Robert Collins
transport implementations now tested consistently.
31
                           TransportNotPossible, ConnectionError)
1530.1.9 by Robert Collins
Test bogus urls with http in the new infrastructure.
32
from bzrlib.tests import TestCaseInTempDir, TestSkipped
1530.1.3 by Robert Collins
transport implementations now tested consistently.
33
from bzrlib.transport import memory, urlescape
34
import bzrlib.transport
35
36
37
def _append(fn, txt):
38
    """Append the given text (file-like object) to the supplied filename."""
39
    f = open(fn, 'ab')
1530.1.21 by Robert Collins
Review feedback fixes.
40
    try:
41
        f.write(txt.read())
42
    finally:
43
        f.close()
1530.1.3 by Robert Collins
transport implementations now tested consistently.
44
45
46
class TestTransportImplementation(TestCaseInTempDir):
47
    """Implementation verification for transports.
48
    
49
    To verify a transport we need a server factory, which is a callable
50
    that accepts no parameters and returns an implementation of
51
    bzrlib.transport.Server.
52
    
53
    That Server is then used to construct transport instances and test
54
    the transport via loopback activity.
55
56
    Currently this assumes that the Transport object is connected to the 
57
    current working directory.  So that whatever is done 
58
    through the transport, should show up in the working 
59
    directory, and vice-versa. This is a bug, because it's possible to have
60
    URL schemes which provide access to something that may not
61
    result in storage on the local disk, i.e. due to file system limits, or 
62
    due to it being a database or some other non-filesystem tool.
63
64
    This also tests to make sure that the functions work with both
65
    generators and lists (assuming iter(list) is effectively a generator)
66
    """
67
    
68
    def setUp(self):
        """Prepare the base fixture, then create and start the test server.

        ``self.transport_server`` is called with no arguments; the object it
        returns is stored as ``self._server`` before its ``setUp`` is run.
        """
        super(TestTransportImplementation, self).setUp()
        server = self.transport_server()
        self._server = server
        server.setUp()
72
73
    def tearDown(self):
        """Tear down the base fixture and always stop the test server.

        The server is stopped from a ``finally`` clause so that a failure in
        the base-class tearDown cannot leave the server running.
        """
        try:
            super(TestTransportImplementation, self).tearDown()
        finally:
            self._server.tearDown()
76
        
77
    def check_transport_contents(self, content, transport, relpath):
        """Assert that reading ``relpath`` via ``transport`` yields ``content``.

        :param content: The expected text.
        :param transport: Object whose ``get(relpath)`` returns something
            with a ``read()`` method.
        :param relpath: Path handed to ``transport.get``.
        """
        actual = transport.get(relpath).read()
        self.assertEqualDiff(content, actual)
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
80
1530.1.3 by Robert Collins
transport implementations now tested consistently.
81
    def get_transport(self):
        """Return a transport for the URL of the running test server.

        The transport comes from ``bzrlib.transport.get_transport`` and the
        test fails if it is not an instance of ``self.transport_class``.
        """
        t = bzrlib.transport.get_transport(self._server.get_url())
        # The two literals are concatenated, so the first needs a trailing
        # space to keep the message readable.
        self.failUnless(isinstance(t, self.transport_class),
                        "Got the wrong class from get_transport "
                        "(%r, expected %r)" % (t.__class__,
                                               self.transport_class))
        return t
89
90
    def assertListRaises(self, excClass, func, *args, **kwargs):
        """Fail unless excClass is raised when the iterator from func is used.

        Many transport functions can return generators; this wraps the result
        in a list() call to make sure the whole generator is run, and that
        the proper exception is raised.

        :param excClass: Exception class (or tuple of classes) expected.
        :param func: Callable returning an iterable; it is called with
            ``*args`` and ``**kwargs``.
        :raises self.failureException: If no ``excClass`` was raised.
        """
        try:
            list(func(*args, **kwargs))
        except excClass:
            return
        # A tuple of classes has no __name__; fall back to its str() form.
        if hasattr(excClass, '__name__'):
            excName = excClass.__name__
        else:
            excName = str(excClass)
        # Call form instead of "raise X, msg": same result, and not a
        # syntax error on newer interpreters.
        raise self.failureException("%s not raised" % excName)
105
106
    def test_has(self):
        """Check has(), has_multi() and has_any() against a small tree."""
        t = self.get_transport()

        self.build_tree(['a', 'b', 'e', 'g', '%'], transport=t)

        # Single-path queries; the '%' file is asked for by its
        # urlescape()d name.
        self.assertEqual(True, t.has('a'))
        self.assertEqual(False, t.has('c'))
        self.assertEqual(True, t.has(urlescape('%')))

        probes = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
        presence = [True, True, False, False, True, False, True, False]

        self.assertEqual(list(t.has_multi(probes)), presence)
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlescape('%%')]))
        # The same query must also work when handed an iterator.
        self.assertEqual(list(t.has_multi(iter(probes))), presence)
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
122
123
    def test_get(self):
        """get() and get_multi() return readable files; missing paths raise."""
        t = self.get_transport()

        files = ['a', 'b', 'e', 'g']
        contents = ['contents of a\n',
                    'contents of b\n',
                    'contents of e\n',
                    'contents of g\n',
                    ]
        self.build_tree(files, transport=t)
        self.check_transport_contents('contents of a\n', t, 'a')

        def check_streams(streams):
            # Each stream must hold the text at the same position in
            # ``contents``.
            for expected, stream in zip(contents, streams):
                self.assertEqual(expected, stream.read())

        # Once with a list of names, once with an iterator over them.
        check_streams(t.get_multi(files))
        check_streams(t.get_multi(iter(files)))

        self.assertRaises(NoSuchFile, t.get, 'c')
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
145
146
    def test_put(self):
        """put() and put_multi() create files and replace existing ones."""
        t = self.get_transport()

        if t.is_readonly():
            # Writing through a readonly transport must be refused.
            self.assertRaises(TransportNotPossible,
                              t.put, 'a', 'some text for a\n')
            return

        def present(names):
            return list(t.has_multi(names))

        probes = ['a', 'b', 'c', 'd', 'e']

        t.put('a', StringIO('some text for a\n'))
        self.failUnless(t.has('a'))
        self.check_transport_contents('some text for a\n', t, 'a')
        # Make sure 'has' is updated
        self.assertEqual(present(probes),
                         [True, False, False, False, False])

        # Put also replaces contents
        written = t.put_multi([('a', StringIO('new\ncontents for\na\n')),
                               ('d', StringIO('contents\nfor d\n'))])
        self.assertEqual(written, 2)
        self.assertEqual(present(probes),
                         [True, False, False, True, False])
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
        self.check_transport_contents('contents\nfor d\n', t, 'd')

        # Same again, but feeding put_multi an iterator.
        written = t.put_multi(
            iter([('a', StringIO('diff\ncontents for\na\n')),
                  ('d', StringIO('another contents\nfor d\n'))]))
        self.assertEqual(written, 2)
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
        self.check_transport_contents('another contents\nfor d\n', t, 'd')

        # Putting below a directory that does not exist must fail.
        self.assertRaises(NoSuchFile,
                          t.put, 'path/doesnt/exist/c', 'contents')
178
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
179
    def test_put_permissions(self):
180
        t = self.get_transport()
181
182
        if t.is_readonly():
183
            return
184
        t.put('mode644', StringIO('test text\n'), mode=0644)
1530.1.21 by Robert Collins
Review feedback fixes.
185
        self.assertTransportMode(t, 'mode644', 0644)
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
186
        t.put('mode666', StringIO('test text\n'), mode=0666)
1530.1.21 by Robert Collins
Review feedback fixes.
187
        self.assertTransportMode(t, 'mode666', 0666)
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
188
        t.put('mode600', StringIO('test text\n'), mode=0600)
1530.1.21 by Robert Collins
Review feedback fixes.
189
        self.assertTransportMode(t, 'mode600', 0600)
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
190
        # Yes, you can put a file such that it becomes readonly
191
        t.put('mode400', StringIO('test text\n'), mode=0400)
1530.1.21 by Robert Collins
Review feedback fixes.
192
        self.assertTransportMode(t, 'mode400', 0400)
1530.1.15 by Robert Collins
Move put mode tests into test_transport_implementation.
193
        t.put_multi([('mmode644', StringIO('text\n'))], mode=0644)
1530.1.21 by Robert Collins
Review feedback fixes.
194
        self.assertTransportMode(t, 'mmode644', 0644)
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
195
        
1530.1.3 by Robert Collins
transport implementations now tested consistently.
196
    def test_mkdir(self):
        """Exercise mkdir() and mkdir_multi(), then put/get inside the results."""
        t = self.get_transport()

        if t.is_readonly():
            # cannot mkdir on readonly transports. We're not testing for
            # cache coherency because cache behaviour is not currently
            # defined for the transport interface.
            for target in ('.', 'new_dir'):
                self.assertRaises(TransportNotPossible, t.mkdir, target)
            self.assertRaises(TransportNotPossible, t.mkdir_multi, ['new_dir'])
            self.assertRaises(TransportNotPossible, t.mkdir, 'path/doesnt/exist')
            return

        # Single-directory creation.
        t.mkdir('dir_a')
        self.assertEqual(t.has('dir_a'), True)
        self.assertEqual(t.has('dir_b'), False)

        t.mkdir('dir_b')
        self.assertEqual(t.has('dir_b'), True)

        # Batch creation, from a list and from an iterator.
        t.mkdir_multi(['dir_c', 'dir_d'])
        t.mkdir_multi(iter(['dir_e', 'dir_f']))

        queried = ['dir_a', 'dir_b', 'dir_c', 'dir_q',
                   'dir_d', 'dir_e', 'dir_f', 'dir_b']
        expected = [True, True, True, False,
                    True, True, True, True]
        self.assertEqual(list(t.has_multi(queried)), expected)

        # we were testing that a local mkdir followed by a transport
        # mkdir failed thusly, but given that we * in one process * do not
        # concurrently fiddle with disk dirs and then use transport to do
        # things, the win here seems marginal compared to the constraint on
        # the interface. RBC 20051227
        t.mkdir('dir_g')
        self.assertRaises(FileExists, t.mkdir, 'dir_g')

        # Test get/put in sub-directories
        written = t.put_multi(
            [('dir_a/a', StringIO('contents of dir_a/a')),
             ('dir_b/b', StringIO('contents of dir_b/b'))])
        self.assertEqual(written, 2)
        self.check_transport_contents('contents of dir_a/a', t, 'dir_a/a')
        self.check_transport_contents('contents of dir_b/b', t, 'dir_b/b')

        # mkdir of a dir with an absent parent
        self.assertRaises(NoSuchFile, t.mkdir, 'missing/dir')
243
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
244
    def test_mkdir_permissions(self):
245
        t = self.get_transport()
246
        if t.is_readonly():
247
            return
248
        # Test mkdir with a mode
249
        t.mkdir('dmode755', mode=0755)
1530.1.21 by Robert Collins
Review feedback fixes.
250
        self.assertTransportMode(t, 'dmode755', 0755)
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
251
        t.mkdir('dmode555', mode=0555)
1530.1.21 by Robert Collins
Review feedback fixes.
252
        self.assertTransportMode(t, 'dmode555', 0555)
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
253
        t.mkdir('dmode777', mode=0777)
1530.1.21 by Robert Collins
Review feedback fixes.
254
        self.assertTransportMode(t, 'dmode777', 0777)
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
255
        t.mkdir('dmode700', mode=0700)
1530.1.21 by Robert Collins
Review feedback fixes.
256
        self.assertTransportMode(t, 'dmode700', 0700)
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
257
        # TODO: jam 20051215 test mkdir_multi with a mode
258
        t.mkdir_multi(['mdmode755'], mode=0755)
1530.1.21 by Robert Collins
Review feedback fixes.
259
        self.assertTransportMode(t, 'mdmode755', 0755)
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
260
1530.1.3 by Robert Collins
transport implementations now tested consistently.
261
    def test_copy_to(self):
1534.4.21 by Robert Collins
Extend the copy_to tests to smoke test server-to-same-server copies to catch optimised code paths, and fix sftps optimised code path by removing dead code.
262
        # FIXME: test:   same server to same server (partly done)
263
        # same protocol two servers
264
        # and    different protocols (done for now except for MemoryTransport.
265
        # - RBC 20060122
1530.1.3 by Robert Collins
transport implementations now tested consistently.
266
        from bzrlib.transport.memory import MemoryTransport
1534.4.21 by Robert Collins
Extend the copy_to tests to smoke test server-to-same-server copies to catch optimised code paths, and fix sftps optimised code path by removing dead code.
267
268
        def simple_copy_files(transport_from, transport_to):
269
            files = ['a', 'b', 'c', 'd']
270
            self.build_tree(files, transport=transport_from)
271
            transport_from.copy_to(files, transport_to)
272
            for f in files:
273
                self.check_transport_contents(transport_to.get(f).read(),
274
                                              transport_from, f)
275
1530.1.3 by Robert Collins
transport implementations now tested consistently.
276
        t = self.get_transport()
277
        temp_transport = MemoryTransport('memory:/')
1534.4.21 by Robert Collins
Extend the copy_to tests to smoke test server-to-same-server copies to catch optimised code paths, and fix sftps optimised code path by removing dead code.
278
        simple_copy_files(t, temp_transport)
279
        if not t.is_readonly():
280
            t.mkdir('copy_to_simple')
281
            t2 = t.clone('copy_to_simple')
282
            simple_copy_files(t, t2)
1530.1.3 by Robert Collins
transport implementations now tested consistently.
283
284
285
        # Test that copying into a missing directory raises
286
        # NoSuchFile
287
        if t.is_readonly():
1530.1.21 by Robert Collins
Review feedback fixes.
288
            self.build_tree(['e/', 'e/f'])
1530.1.3 by Robert Collins
transport implementations now tested consistently.
289
        else:
290
            t.mkdir('e')
291
            t.put('e/f', StringIO('contents of e'))
292
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], temp_transport)
293
        temp_transport.mkdir('e')
294
        t.copy_to(['e/f'], temp_transport)
295
296
        del temp_transport
297
        temp_transport = MemoryTransport('memory:/')
298
299
        files = ['a', 'b', 'c', 'd']
300
        t.copy_to(iter(files), temp_transport)
301
        for f in files:
302
            self.check_transport_contents(temp_transport.get(f).read(),
303
                                          t, f)
304
        del temp_transport
305
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
306
        for mode in (0666, 0644, 0600, 0400):
307
            temp_transport = MemoryTransport("memory:/")
308
            t.copy_to(files, temp_transport, mode=mode)
309
            for f in files:
1530.1.21 by Robert Collins
Review feedback fixes.
310
                self.assertTransportMode(temp_transport, f, mode)
1530.1.16 by Robert Collins
Move mkdir and copy_to permissions tests to test_transport_impleentation.
311
1530.1.3 by Robert Collins
transport implementations now tested consistently.
312
    def test_append(self):
        """append() and append_multi() extend existing files and create new ones.

        On a readonly transport the append calls must raise
        TransportNotPossible; the data is then written to local files with
        ``_append`` so that the read-side checks still run.
        """
        t = self.get_transport()

        if t.is_readonly():
            # Seed the files on disk directly.  Close each handle explicitly
            # so the data is flushed before it is read through the transport.
            for name, text in (('a', 'diff\ncontents for\na\n'),
                               ('b', 'contents\nfor b\n')):
                f = open(name, 'wb')
                try:
                    f.write(text)
                finally:
                    f.close()
        else:
            t.put_multi([
                    ('a', StringIO('diff\ncontents for\na\n')),
                    ('b', StringIO('contents\nfor b\n'))
                    ])

        if t.is_readonly():
            self.assertRaises(TransportNotPossible,
                    t.append, 'a', 'add\nsome\nmore\ncontents\n')
            _append('a', StringIO('add\nsome\nmore\ncontents\n'))
        else:
            t.append('a', StringIO('add\nsome\nmore\ncontents\n'))

        self.check_transport_contents(
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
            t, 'a')

        if t.is_readonly():
            self.assertRaises(TransportNotPossible,
                    t.append_multi,
                        [('a', 'and\nthen\nsome\nmore\n'),
                         ('b', 'some\nmore\nfor\nb\n')])
            _append('a', StringIO('and\nthen\nsome\nmore\n'))
            _append('b', StringIO('some\nmore\nfor\nb\n'))
        else:
            t.append_multi([('a', StringIO('and\nthen\nsome\nmore\n')),
                    ('b', StringIO('some\nmore\nfor\nb\n'))])
        self.check_transport_contents(
            'diff\ncontents for\na\n'
            'add\nsome\nmore\ncontents\n'
            'and\nthen\nsome\nmore\n',
            t, 'a')
        self.check_transport_contents(
                'contents\nfor b\n'
                'some\nmore\nfor\nb\n',
                t, 'b')

        # append_multi fed from an iterator rather than a list.
        if t.is_readonly():
            _append('a', StringIO('a little bit more\n'))
            _append('b', StringIO('from an iterator\n'))
        else:
            t.append_multi(iter([('a', StringIO('a little bit more\n')),
                    ('b', StringIO('from an iterator\n'))]))
        self.check_transport_contents(
            'diff\ncontents for\na\n'
            'add\nsome\nmore\ncontents\n'
            'and\nthen\nsome\nmore\n'
            'a little bit more\n',
            t, 'a')
        self.check_transport_contents(
                'contents\nfor b\n'
                'some\nmore\nfor\nb\n'
                'from an iterator\n',
                t, 'b')

        # Appending to files that do not exist yet ('c' and 'd').
        if t.is_readonly():
            _append('c', StringIO('some text\nfor a missing file\n'))
            _append('a', StringIO('some text in a\n'))
            _append('d', StringIO('missing file r\n'))
        else:
            t.append('c', StringIO('some text\nfor a missing file\n'))
            t.append_multi([('a', StringIO('some text in a\n')),
                            ('d', StringIO('missing file r\n'))])
        self.check_transport_contents(
            'diff\ncontents for\na\n'
            'add\nsome\nmore\ncontents\n'
            'and\nthen\nsome\nmore\n'
            'a little bit more\n'
            'some text in a\n',
            t, 'a')
        self.check_transport_contents('some text\nfor a missing file\n',
                                      t, 'c')
        self.check_transport_contents('missing file r\n', t, 'd')

        # a file with no parent should fail..
        if not t.is_readonly():
            self.assertRaises(NoSuchFile,
                              t.append, 'missing/path',
                              StringIO('content'))
1530.1.3 by Robert Collins
transport implementations now tested consistently.
397
398
    def test_append_file(self):
        """append() and append_multi() accept file-like objects as the source.

        Sources are StringIO objects first, then files obtained from
        ``t.get``.  On a readonly transport the data is written to local
        files with ``_append`` instead of through the transport.
        """
        t = self.get_transport()

        contents = [
            ('f1', StringIO('this is a string\nand some more stuff\n')),
            ('f2', StringIO('here is some text\nand a bit more\n')),
            ('f3', StringIO('some text for the\nthird file created\n')),
            ('f4', StringIO('this is a string\nand some more stuff\n')),
            ('f5', StringIO('here is some text\nand a bit more\n')),
            ('f6', StringIO('some text for the\nthird file created\n'))
        ]

        if t.is_readonly():
            # Seed the files on disk directly.  Close each handle explicitly
            # so the data is flushed before it is read through the transport.
            for f, val in contents:
                out = open(f, 'wb')
                try:
                    out.write(val.read())
                finally:
                    out.close()
        else:
            t.put_multi(contents)

        a1 = StringIO('appending to\none\n')
        if t.is_readonly():
            _append('f1', a1)
        else:
            t.append('f1', a1)

        del a1

        self.check_transport_contents(
                'this is a string\nand some more stuff\n'
                'appending to\none\n',
                t, 'f1')

        a2 = StringIO('adding more\ntext to two\n')
        a3 = StringIO('some garbage\nto put in three\n')

        if t.is_readonly():
            _append('f2', a2)
            _append('f3', a3)
        else:
            t.append_multi([('f2', a2), ('f3', a3)])

        del a2, a3

        self.check_transport_contents(
                'here is some text\nand a bit more\n'
                'adding more\ntext to two\n',
                t, 'f2')
        self.check_transport_contents(
                'some text for the\nthird file created\n'
                'some garbage\nto put in three\n',
                t, 'f3')

        # Test that a file object returned by get can be used with append
        a4 = t.get('f1')
        if t.is_readonly():
            _append('f4', a4)
        else:
            t.append('f4', a4)

        del a4

        self.check_transport_contents(
                'this is a string\nand some more stuff\n'
                'this is a string\nand some more stuff\n'
                'appending to\none\n',
                t, 'f4')

        a5 = t.get('f2')
        a6 = t.get('f3')
        if t.is_readonly():
            _append('f5', a5)
            _append('f6', a6)
        else:
            t.append_multi([('f5', a5), ('f6', a6)])

        del a5, a6

        self.check_transport_contents(
                'here is some text\nand a bit more\n'
                'here is some text\nand a bit more\n'
                'adding more\ntext to two\n',
                t, 'f5')
        self.check_transport_contents(
                'some text for the\nthird file created\n'
                'some text for the\nthird file created\n'
                'some garbage\nto put in three\n',
                t, 'f6')

        # Appending to names that were never created above ('c', 'a', 'd').
        a5 = t.get('f2')
        a6 = t.get('f2')
        a7 = t.get('f3')
        if t.is_readonly():
            _append('c', a5)
            _append('a', a6)
            _append('d', a7)
        else:
            t.append('c', a5)
            t.append_multi([('a', a6), ('d', a7)])
        del a5, a6, a7
        self.check_transport_contents(t.get('f2').read(), t, 'c')
        self.check_transport_contents(t.get('f3').read(), t, 'd')
498
499
    def test_delete(self):
        """delete() and delete_multi() remove files; missing files raise."""
        t = self.get_transport()

        # Not much to do with a readonly transport
        if t.is_readonly():
            self.assertRaises(TransportNotPossible, t.delete, 'missing')
            return

        def present():
            return list(t.has_multi(['a', 'b', 'c']))

        t.put('a', StringIO('a little bit of text\n'))
        self.failUnless(t.has('a'))
        t.delete('a')
        self.failIf(t.has('a'))

        # Deleting it a second time must fail.
        self.assertRaises(NoSuchFile, t.delete, 'a')

        for name, text in (('a', 'a text\n'),
                           ('b', 'b text\n'),
                           ('c', 'c text\n')):
            t.put(name, StringIO(text))
        self.assertEqual([True, True, True], present())
        t.delete_multi(['a', 'c'])
        self.assertEqual([False, True, False], present())
        self.failIf(t.has('a'))
        self.failUnless(t.has('b'))
        self.failIf(t.has('c'))

        # 'a' is already gone, so both forms must raise.
        self.assertRaises(NoSuchFile,
                          t.delete_multi, ['a', 'b', 'c'])

        self.assertRaises(NoSuchFile,
                          t.delete_multi, iter(['a', 'b', 'c']))

        t.put('a', StringIO('another a text\n'))
        t.put('c', StringIO('another c text\n'))
        t.delete_multi(iter(['a', 'b', 'c']))

        # We should have deleted everything, but SftpServer creates
        # control files in the working directory, so we cannot just do a
        # plain "listdir".
        # self.assertEqual([], os.listdir('.'))
542
1534.4.15 by Robert Collins
Remove shutil dependency in upgrade - create a delete_tree method for transports.
543
    def test_rmdir(self):
        """rmdir() removes empty directories, innermost first."""
        t = self.get_transport()
        # Not much to do with a readonly transport
        if t.is_readonly():
            self.assertRaises(TransportNotPossible, t.rmdir, 'missing')
            return
        t.mkdir('adir')
        t.mkdir('adir/bdir')
        # After each removal, stat on that path must report it missing.
        for doomed in ('adir/bdir', 'adir'):
            t.rmdir(doomed)
            self.assertRaises(NoSuchFile, t.stat, doomed)
555
1553.5.10 by Martin Pool
New DirectoryNotEmpty exception, and raise this from local and memory
556
    def test_rmdir_not_empty(self):
        """Deleting a non-empty directory raises an exception

        sftp (and possibly others) don't give us a specific "directory not
        empty" exception -- we can just see that the operation failed.
        """
        t = self.get_transport()
        if not t.is_readonly():
            for path in ('adir', 'adir/bdir'):
                t.mkdir(path)
            # Only the general PathError is required here.
            self.assertRaises(PathError, t.rmdir, 'adir')
568
1553.5.13 by Martin Pool
New Transport.rename that mustn't overwrite
569
    def test_rename_dir_succeeds(self):
        """Renaming a directory moves its contents to the new name."""
        t = self.get_transport()
        if t.is_readonly():
            raise TestSkipped("transport is readonly")
        for path in ('adir', 'adir/asubdir'):
            t.mkdir(path)
        t.rename('adir', 'bdir')
        # The subdirectory is reachable under the new name only.
        self.assertTrue(t.has('bdir/asubdir'))
        self.assertFalse(t.has('adir'))
578
579
    def test_rename_dir_nonempty(self):
        """Attempting to replace a nonempty directory should fail"""
        t = self.get_transport()
        if t.is_readonly():
            raise TestSkipped("transport is readonly")
        for path in ('adir', 'adir/asubdir', 'bdir', 'bdir/bsubdir'):
            t.mkdir(path)
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
        # nothing was changed so it should still be as before
        self.assertTrue(t.has('bdir/bsubdir'))
        for unexpected in ('adir/bdir', 'adir/bsubdir'):
            self.assertFalse(t.has(unexpected))
593
1534.4.15 by Robert Collins
Remove shutil dependency in upgrade - create a delete_tree method for transports.
594
    def test_delete_tree(self):
        """delete_tree() removes a directory and everything below it.

        Transports that raise TransportNotPossible for delete_tree on an
        empty directory are treated as not supporting it, and the rest of
        the test is not run.
        """
        t = self.get_transport()

        # Not much to do with a readonly transport
        if t.is_readonly():
            self.assertRaises(TransportNotPossible, t.delete_tree, 'missing')
            return

        # and does it like listing ?
        t.mkdir('adir')
        try:
            t.delete_tree('adir')
        except TransportNotPossible:
            # ok, this transport does not support delete_tree
            return

        # did it delete that trivial case?
        self.assertRaises(NoSuchFile, t.stat, 'adir')

        nested = ['adir/',
                  'adir/file',
                  'adir/subdir/',
                  'adir/subdir/file',
                  'adir/subdir2/',
                  'adir/subdir2/file',
                  ]
        self.build_tree(nested, transport=t)

        t.delete_tree('adir')
        # adir should be gone now.
        self.assertRaises(NoSuchFile, t.stat, 'adir')
624
1530.1.3 by Robert Collins
transport implementations now tested consistently.
625
    def test_move(self):
        """move() renames a file and overwrites an existing destination."""
        t = self.get_transport()

        if t.is_readonly():
            return

        # TODO: I would like to use os.listdir() to
        # make sure there are no extra files, but SftpServer
        # creates control files in the working directory
        # perhaps all of this could be done in a subdirectory

        def a_and_b():
            return list(t.has_multi(['a', 'b']))

        t.put('a', StringIO('a first file\n'))
        self.assertEquals([True, False], a_and_b())

        t.move('a', 'b')
        self.failUnless(t.has('b'))
        self.failIf(t.has('a'))

        self.check_transport_contents('a first file\n', t, 'b')
        self.assertEquals([False, True], a_and_b())

        # Overwrite a file
        t.put('c', StringIO('c this file\n'))
        t.move('c', 'b')
        self.failIf(t.has('c'))
        self.check_transport_contents('c this file\n', t, 'b')

        # TODO: Try to write a test for atomicity
        # TODO: Test moving into a non-existent subdirectory
        # TODO: Test Transport.move_multi
655
656
    def test_copy(self):
        # Check Transport.copy: copying to a new name, copying from a
        # missing source (NoSuchFile), and copying over an existing file.
        # Nothing is checked for readonly transports.
        transport = self.get_transport()
        if transport.is_readonly():
            return

        transport.put('a', StringIO('a file\n'))
        transport.copy('a', 'b')
        self.check_transport_contents('a file\n', transport, 'b')

        # 'c' does not exist yet, so it cannot be copied.
        self.assertRaises(NoSuchFile, transport.copy, 'c', 'd')
        # NOTE(review): this creates 'c' in the local working directory,
        # not through the transport - confirm that is intended.
        os.mkdir('c')
        # What should the assert be if you try to copy a
        # file over a directory?
        #self.assertRaises(Something, transport.copy, 'a', 'c')

        # Copying onto an existing file replaces its content.
        transport.put('d', StringIO('text in d\n'))
        transport.copy('d', 'b')
        self.check_transport_contents('text in d\n', transport, 'b')

        # TODO: test copy_multi
676
677
    def test_connection_error(self):
678
        """ConnectionError is raised when connection is impossible"""
1530.1.9 by Robert Collins
Test bogus urls with http in the new infrastructure.
679
        try:
680
            url = self._server.get_bogus_url()
681
        except NotImplementedError:
682
            raise TestSkipped("Transport %s has no bogus URL support." %
683
                              self._server.__class__)
684
        t = bzrlib.transport.get_transport(url)
1530.1.3 by Robert Collins
transport implementations now tested consistently.
685
        try:
686
            t.get('.bzr/branch')
687
        except (ConnectionError, NoSuchFile), e:
688
            pass
689
        except (Exception), e:
690
            self.failIf(True, 'Wrong exception thrown: %s' % e)
691
        else:
692
            self.failIf(True, 'Did not get the expected exception.')
693
694
    def test_stat(self):
        # TODO: Test stat, just try once, and if it throws, stop testing
        from stat import S_ISDIR, S_ISREG

        transport = self.get_transport()

        # Probe once: a transport that cannot stat has nothing to test.
        try:
            transport.stat('.')
        except TransportNotPossible:
            return

        # (path, expected st_size) pairs; a trailing slash marks a directory.
        # The file sizes presumably match what build_tree writes - TODO
        # confirm against build_tree.
        expected = [('a', 14), ('b/', 0), ('b/c', 16), ('b/d/', 0),
                    ('b/d/e', 18)]
        paths = [path for path, size in expected]
        self.build_tree(paths, transport=transport)

        for path, size in expected:
            info = transport.stat(path)
            if not path.endswith('/'):
                self.failUnless(S_ISREG(info.st_mode))
                self.assertEqual(size, info.st_size)
            else:
                # directory sizes are meaningless
                self.failUnless(S_ISDIR(info.st_mode))

        # Consume stat_multi for both a list and an iterator of paths; the
        # results themselves are not inspected.
        list(transport.stat_multi(paths))
        list(transport.stat_multi(iter(paths)))

        # Missing paths raise NoSuchFile, singly and through stat_multi.
        self.assertRaises(NoSuchFile, transport.stat, 'q')
        self.assertRaises(NoSuchFile, transport.stat, 'b/a')

        missing_mix = ['a', 'c', 'd']
        self.assertListRaises(NoSuchFile, transport.stat_multi, missing_mix)
        self.assertListRaises(NoSuchFile, transport.stat_multi,
                              iter(['a', 'c', 'd']))

        # stat must also work relative to a cloned sub-transport.
        self.build_tree(['subdir/', 'subdir/file'], transport=transport)
        subdir = transport.clone('subdir')
        subdir.stat('./file')
        subdir.stat('.')
1530.1.3 by Robert Collins
transport implementations now tested consistently.
731
732
    def test_list_dir(self):
        # TODO: Test list_dir, just try once, and if it throws, stop testing
        root = self.get_transport()

        if not root.listable():
            self.assertRaises(TransportNotPossible, root.list_dir, '.')
            return

        # SftpServer creates control files in the working directory
        # so let's move down a directory to avoid those.
        if root.is_readonly():
            os.mkdir('wd')
        else:
            root.mkdir('wd')
        wd = root.clone('wd')

        def sorted_list(relpath):
            # Listing of relpath on the 'wd' transport, sorted.
            return sorted(wd.list_dir(relpath))

        self.assertEqual([], sorted_list(u'.'))
        # c2 is precisely one letter longer than c here to test that
        # suffixing is not confused.
        if wd.is_readonly():
            # Cannot write through the transport; build_tree is called
            # without one, using paths under 'wd'.
            self.build_tree(['wd/a', 'wd/b', 'wd/c/', 'wd/c/d', 'wd/c/e',
                             'wd/c2/'])
        else:
            self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e', 'c2/'],
                            transport=wd)

        self.assertEqual([u'a', u'b', u'c', u'c2'], sorted_list(u'.'))
        self.assertEqual([u'd', u'e'], sorted_list(u'c'))

        # Remove two files and make sure the listings follow.
        if wd.is_readonly():
            os.unlink('wd/c/d')
            os.unlink('wd/b')
        else:
            wd.delete('c/d')
            wd.delete('b')

        self.assertEqual([u'a', u'c', u'c2'], sorted_list('.'))
        self.assertEqual([u'e'], sorted_list(u'c'))

        # A missing path, a missing nested path and a plain file all raise
        # NoSuchFile when listed.
        self.assertListRaises(NoSuchFile, wd.list_dir, 'q')
        self.assertListRaises(NoSuchFile, wd.list_dir, 'c/f')
        self.assertListRaises(NoSuchFile, wd.list_dir, 'a')
777
778
    def test_clone(self):
        # TODO: Test that clone moves up and down the filesystem
        #
        # Check that cloned transports address the same content: t2 is t1
        # moved down into 'b', t3 is t2 moved back up with '..'.
        t1 = self.get_transport()

        self.build_tree(['a', 'b/', 'b/c'], transport=t1)

        self.failUnless(t1.has('a'))
        self.failUnless(t1.has('b/c'))
        self.failIf(t1.has('c'))

        t2 = t1.clone('b')
        self.assertEqual(t1.base + 'b/', t2.base)

        self.failUnless(t2.has('c'))
        self.failIf(t2.has('a'))

        t3 = t2.clone('..')
        self.failUnless(t3.has('a'))
        self.failIf(t3.has('c'))

        # 'b/d' does not exist yet, whichever transport is asked.
        self.failIf(t1.has('b/d'))
        self.failIf(t2.has('d'))
        self.failIf(t3.has('b/d'))

        if t1.is_readonly():
            # Cannot write through the transport, so write the file
            # directly; close it explicitly rather than relying on
            # garbage collection to flush and release the handle.
            f = open('b/d', 'wb')
            try:
                f.write('newfile\n')
            finally:
                f.close()
        else:
            t2.put('d', StringIO('newfile\n'))

        # The new file must be visible through all three transports.
        self.failUnless(t1.has('b/d'))
        self.failUnless(t2.has('d'))
        self.failUnless(t3.has('b/d'))
810
811
    def test_relpath(self):
        # Check relpath() for URLs at and below the transport's base.
        transport = self.get_transport()
        base = transport.base
        self.assertEqual('', transport.relpath(base))
        # base ends with /
        without_slash = base[:-1]
        self.assertEqual('', transport.relpath(without_slash))
        # subdirs which don't exist should still give relpaths.
        self.assertEqual('foo', transport.relpath(base + 'foo'))
        # trailing slash should be the same.
        self.assertEqual('foo', transport.relpath(base + 'foo/'))
820
821
    def test_abspath(self):
        # smoke test for abspath. Corner cases for backends like unix fs's
        # that have aliasing problems like symlinks should go in backend
        # specific test cases.
        t = self.get_transport()
        expected = t.base + 'relpath'
        self.assertEqual(expected, t.abspath('relpath'))
828
829
    def test_iter_files_recursive(self):
        # Check iter_files_recursive from the transport root and from a
        # cloned subdirectory.  Transports that are not listable must
        # raise TransportNotPossible instead.
        t = self.get_transport()
        if not t.listable():
            self.assertRaises(TransportNotPossible, t.iter_files_recursive)
            return

        tree = ['isolated/',
                'isolated/dir/',
                'isolated/dir/foo',
                'isolated/dir/bar',
                'isolated/bar']
        self.build_tree(tree, transport=t)

        found = set(t.iter_files_recursive())
        # nb the directories are not converted
        files_from_root = set(['isolated/dir/foo',
                               'isolated/dir/bar',
                               'isolated/bar'])
        self.assertEqual(found, files_from_root)

        # From inside 'isolated' the same files appear relative to it.
        isolated = t.clone('isolated')
        found = set(isolated.iter_files_recursive())
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), found)
1534.4.26 by Robert Collins
Move working tree initialisation out from Branch.initialize, deprecated Branch.initialize to Branch.create.
850
851
    def test_connect_twice_is_same_content(self):
        # check that our server (whatever it is) is accessible reliably
        # via get_transport and multiple connections share content.
        origin = self.get_transport()
        if origin.is_readonly():
            return

        origin.put('foo', StringIO('bar'))
        other = self.get_transport()
        self.check_transport_contents('bar', other, 'foo')

        # its base should be usable.
        other = bzrlib.transport.get_transport(origin.base)
        self.check_transport_contents('bar', other, 'foo')

        # now opening at a relative url should give us a sane result:
        origin.mkdir('newdir')
        newdir_url = origin.base + "newdir"
        other = bzrlib.transport.get_transport(newdir_url)
        other = other.clone('..')
        self.check_transport_contents('bar', other, 'foo')
869
870
    def test_lock_write(self):
        # A writable transport must hand out a write lock that can be
        # unlocked; a readonly one must raise TransportNotPossible.
        t = self.get_transport()
        if t.is_readonly():
            self.assertRaises(TransportNotPossible, t.lock_write, 'foo')
            return

        # Create an empty file to lock.
        t.put('lock', StringIO())
        write_lock = t.lock_write('lock')
        # TODO make this consistent on all platforms:
        # self.assertRaises(LockError, t.lock_write, 'lock')
        write_lock.unlock()
880
881
    def test_lock_read(self):
        # Take a read lock on an empty file and release it again.  This
        # runs for readonly transports as well as writable ones.
        t = self.get_transport()
        if not t.is_readonly():
            t.put('lock', StringIO())
        else:
            # Cannot write through the transport; create the empty file
            # directly instead.
            open('lock', 'w').close()
        read_lock = t.lock_read('lock')
        # TODO make this consistent on all platforms:
        # self.assertRaises(LockError, t.lock_read, 'lock')
        read_lock.unlock()