/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport_implementations.py

merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for Transport implementations.
18
18
 
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
 
23
import itertools
23
24
import os
24
25
from cStringIO import StringIO
25
26
from StringIO import StringIO as pyStringIO
30
31
from bzrlib import (
31
32
    errors,
32
33
    osutils,
 
34
    tests,
33
35
    urlutils,
34
36
    )
35
37
from bzrlib.errors import (ConnectionError,
37
39
                           FileExists,
38
40
                           InvalidURL,
39
41
                           LockError,
40
 
                           NoSmartServer,
41
42
                           NoSuchFile,
42
43
                           NotLocalUrl,
43
44
                           PathError,
45
46
                           )
46
47
from bzrlib.osutils import getcwd
47
48
from bzrlib.smart import medium
48
 
from bzrlib.symbol_versioning import zero_eleven
49
 
from bzrlib.tests import TestCaseInTempDir, TestScenarioApplier, TestSkipped
 
49
from bzrlib.tests import (
 
50
    TestCaseInTempDir,
 
51
    TestSkipped,
 
52
    TestNotApplicable,
 
53
    multiply_tests,
 
54
    )
50
55
from bzrlib.tests.test_transport import TestTransportImplementation
51
56
from bzrlib.transport import (
52
57
    ConnectedTransport,
56
61
from bzrlib.transport.memory import MemoryTransport
57
62
 
58
63
 
59
 
class TransportTestProviderAdapter(TestScenarioApplier):
60
 
    """A tool to generate a suite testing all transports for a single test.
61
 
 
62
 
    This is done by copying the test once for each transport and injecting
63
 
    the transport_class and transport_server classes into each copy. Each copy
64
 
    is also given a new id() to make it easy to identify.
65
 
    """
66
 
 
67
 
    def __init__(self):
68
 
        self.scenarios = self._test_permutations()
69
 
 
70
 
    def get_transport_test_permutations(self, module):
71
 
        """Get the permutations module wants to have tested."""
72
 
        if getattr(module, 'get_test_permutations', None) is None:
73
 
            raise AssertionError("transport module %s doesn't provide get_test_permutations()"
74
 
                    % module.__name__)
75
 
            ##warning("transport module %s doesn't provide get_test_permutations()"
76
 
            ##       % module.__name__)
77
 
            return []
78
 
        return module.get_test_permutations()
79
 
 
80
 
    def _test_permutations(self):
81
 
        """Return a list of the klass, server_factory pairs to test."""
82
 
        result = []
83
 
        for module in _get_transport_modules():
84
 
            try:
85
 
                permutations = self.get_transport_test_permutations(
86
 
                    reduce(getattr, (module).split('.')[1:], __import__(module)))
87
 
                for (klass, server_factory) in permutations:
88
 
                    scenario = (server_factory.__name__,
89
 
                        {"transport_class":klass,
90
 
                         "transport_server":server_factory})
91
 
                    result.append(scenario)
92
 
            except errors.DependencyNotPresent, e:
93
 
                # Continue even if a dependency prevents us 
94
 
                # from running this test
95
 
                pass
96
 
        return result
97
 
 
 
64
def get_transport_test_permutations(module):
 
65
    """Get the permutations module wants to have tested."""
 
66
    if getattr(module, 'get_test_permutations', None) is None:
 
67
        raise AssertionError(
 
68
            "transport module %s doesn't provide get_test_permutations()"
 
69
            % module.__name__)
 
70
        return []
 
71
    return module.get_test_permutations()
 
72
 
 
73
 
 
74
def transport_test_permutations():
 
75
    """Return a list of the klass, server_factory pairs to test."""
 
76
    result = []
 
77
    for module in _get_transport_modules():
 
78
        try:
 
79
            permutations = get_transport_test_permutations(
 
80
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
81
            for (klass, server_factory) in permutations:
 
82
                scenario = (server_factory.__name__,
 
83
                    {"transport_class":klass,
 
84
                     "transport_server":server_factory})
 
85
                result.append(scenario)
 
86
        except errors.DependencyNotPresent, e:
 
87
            # Continue even if a dependency prevents us
 
88
            # from adding this test
 
89
            pass
 
90
    return result
 
91
 
 
92
 
 
93
def load_tests(standard_tests, module, loader):
 
94
    """Multiply tests for tranport implementations."""
 
95
    result = loader.suiteClass()
 
96
    scenarios = transport_test_permutations()
 
97
    return multiply_tests(standard_tests, scenarios, result)
98
98
 
99
99
 
100
100
class TransportTests(TestTransportImplementation):
155
155
        self.assertEqual(True, t.has('a'))
156
156
        self.assertEqual(False, t.has('c'))
157
157
        self.assertEqual(True, t.has(urlutils.escape('%')))
158
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
159
 
                [True, True, False, False, True, False, True, False])
 
158
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
159
                                           'e', 'f', 'g', 'h'])),
 
160
                         [True, True, False, False,
 
161
                          True, False, True, False])
160
162
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
161
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
162
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
163
 
                [True, True, False, False, True, False, True, False])
 
163
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
164
                                           urlutils.escape('%%')]))
 
165
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
166
                                                'e', 'f', 'g', 'h']))),
 
167
                         [True, True, False, False,
 
168
                          True, False, True, False])
164
169
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
165
170
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
166
171
 
167
172
    def test_has_root_works(self):
 
173
        from bzrlib.smart import server
 
174
        if self.transport_server is server.SmartTCPServer_for_testing:
 
175
            raise TestNotApplicable(
 
176
                "SmartTCPServer_for_testing intentionally does not allow "
 
177
                "access to /.")
168
178
        current_transport = self.get_transport()
169
179
        self.assertTrue(current_transport.has('/'))
170
180
        root = current_transport.clone('/')
182
192
        self.build_tree(files, transport=t, line_endings='binary')
183
193
        self.check_transport_contents('contents of a\n', t, 'a')
184
194
        content_f = t.get_multi(files)
185
 
        for content, f in zip(contents, content_f):
 
195
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
196
        # evaluate their inputs, the transport requests should be issued and
 
197
        # handled sequentially (we don't want to force transport to buffer).
 
198
        for content, f in itertools.izip(contents, content_f):
186
199
            self.assertEqual(content, f.read())
187
200
 
188
201
        content_f = t.get_multi(iter(files))
189
 
        for content, f in zip(contents, content_f):
 
202
        # Use itertools.izip() for the same reason
 
203
        for content, f in itertools.izip(contents, content_f):
190
204
            self.assertEqual(content, f.read())
191
205
 
192
206
        self.assertRaises(NoSuchFile, t.get, 'c')
206
220
        except (errors.PathError, errors.RedirectRequested):
207
221
            # early failure return immediately.
208
222
            return
209
 
        # having got a file, read() must either work (i.e. http reading a dir listing) or
210
 
        # fail with ReadError
 
223
        # having got a file, read() must either work (i.e. http reading a dir
 
224
        # listing) or fail with ReadError
211
225
        try:
212
226
            a_file.read()
213
227
        except errors.ReadError:
230
244
 
231
245
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
232
246
 
233
 
    def test_put(self):
234
 
        t = self.get_transport()
235
 
 
236
 
        if t.is_readonly():
237
 
            return
238
 
 
239
 
        self.applyDeprecated(zero_eleven, t.put, 'a', 'string\ncontents\n')
240
 
        self.check_transport_contents('string\ncontents\n', t, 'a')
241
 
 
242
 
        self.applyDeprecated(zero_eleven,
243
 
                             t.put, 'b', StringIO('file-like\ncontents\n'))
244
 
        self.check_transport_contents('file-like\ncontents\n', t, 'b')
245
 
 
246
 
        self.assertRaises(NoSuchFile,
247
 
            self.applyDeprecated,
248
 
            zero_eleven,
249
 
            t.put, 'path/doesnt/exist/c', StringIO('contents'))
 
247
    def test_get_with_open_write_stream_sees_all_content(self):
 
248
        t = self.get_transport()
 
249
        if t.is_readonly():
 
250
            return
 
251
        handle = t.open_write_stream('foo')
 
252
        try:
 
253
            handle.write('b')
 
254
            self.assertEqual('b', t.get('foo').read())
 
255
        finally:
 
256
            handle.close()
 
257
 
 
258
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
259
        t = self.get_transport()
 
260
        if t.is_readonly():
 
261
            return
 
262
        handle = t.open_write_stream('foo')
 
263
        try:
 
264
            handle.write('b')
 
265
            self.assertEqual('b', t.get_bytes('foo'))
 
266
            self.assertEqual('b', t.get('foo').read())
 
267
        finally:
 
268
            handle.close()
250
269
 
251
270
    def test_put_bytes(self):
252
271
        t = self.get_transport()
299
318
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
300
319
                               create_parent_dir=True)
301
320
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
302
 
        
 
321
 
303
322
        # But we still get NoSuchFile if we can't make the parent dir
304
323
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
305
324
                                       'contents\n',
327
346
        umask = osutils.get_umask()
328
347
        t.put_bytes('nomode', 'test text\n', mode=None)
329
348
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
330
 
        
 
349
 
331
350
    def test_put_bytes_non_atomic_permissions(self):
332
351
        t = self.get_transport()
333
352
 
361
380
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
362
381
                               dir_mode=0777, create_parent_dir=True)
363
382
        self.assertTransportMode(t, 'dir777', 0777)
364
 
        
 
383
 
365
384
    def test_put_file(self):
366
385
        t = self.get_transport()
367
386
 
370
389
                    t.put_file, 'a', StringIO('some text for a\n'))
371
390
            return
372
391
 
373
 
        t.put_file('a', StringIO('some text for a\n'))
 
392
        result = t.put_file('a', StringIO('some text for a\n'))
 
393
        # put_file returns the length of the data written
 
394
        self.assertEqual(16, result)
374
395
        self.failUnless(t.has('a'))
375
396
        self.check_transport_contents('some text for a\n', t, 'a')
376
397
        # Put also replaces contents
377
 
        t.put_file('a', StringIO('new\ncontents for\na\n'))
 
398
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
399
        self.assertEqual(19, result)
378
400
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
379
401
        self.assertRaises(NoSuchFile,
380
402
                          t.put_file, 'path/doesnt/exist/c',
412
434
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
413
435
                              create_parent_dir=True)
414
436
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
415
 
        
 
437
 
416
438
        # But we still get NoSuchFile if we can't make the parent dir
417
439
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
418
440
                                       StringIO('contents\n'),
436
458
        # Yes, you can put a file such that it becomes readonly
437
459
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
438
460
        self.assertTransportMode(t, 'mode400', 0400)
439
 
 
440
 
        # XXX: put_multi is deprecated, so do we really care anymore?
441
 
        self.applyDeprecated(zero_eleven, t.put_multi,
442
 
                             [('mmode644', StringIO('text\n'))], mode=0644)
443
 
        self.assertTransportMode(t, 'mmode644', 0644)
444
 
 
445
461
        # The default permissions should be based on the current umask
446
462
        umask = osutils.get_umask()
447
463
        t.put_file('nomode', StringIO('test text\n'), mode=None)
448
464
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
449
 
        
 
465
 
450
466
    def test_put_file_non_atomic_permissions(self):
451
467
        t = self.get_transport()
452
468
 
469
485
        umask = osutils.get_umask()
470
486
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
471
487
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
472
 
        
 
488
 
473
489
        # We should also be able to set the mode for a parent directory
474
490
        # when it is created
475
491
        sio = StringIO()
511
527
        unicode_file = pyStringIO(u'\u1234')
512
528
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
513
529
 
514
 
    def test_put_multi(self):
515
 
        t = self.get_transport()
516
 
 
517
 
        if t.is_readonly():
518
 
            return
519
 
        self.assertEqual(2, self.applyDeprecated(zero_eleven,
520
 
            t.put_multi, [('a', StringIO('new\ncontents for\na\n')),
521
 
                          ('d', StringIO('contents\nfor d\n'))]
522
 
            ))
523
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd'])),
524
 
                [True, False, False, True])
525
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
526
 
        self.check_transport_contents('contents\nfor d\n', t, 'd')
527
 
 
528
 
        self.assertEqual(2, self.applyDeprecated(zero_eleven,
529
 
            t.put_multi, iter([('a', StringIO('diff\ncontents for\na\n')),
530
 
                              ('d', StringIO('another contents\nfor d\n'))])
531
 
            ))
532
 
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
533
 
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
534
 
 
535
 
    def test_put_permissions(self):
536
 
        t = self.get_transport()
537
 
 
538
 
        if t.is_readonly():
539
 
            return
540
 
        if not t._can_roundtrip_unix_modebits():
541
 
            # Can't roundtrip, so no need to run this test
542
 
            return
543
 
        self.applyDeprecated(zero_eleven, t.put, 'mode644',
544
 
                             StringIO('test text\n'), mode=0644)
545
 
        self.assertTransportMode(t, 'mode644', 0644)
546
 
        self.applyDeprecated(zero_eleven, t.put, 'mode666',
547
 
                             StringIO('test text\n'), mode=0666)
548
 
        self.assertTransportMode(t, 'mode666', 0666)
549
 
        self.applyDeprecated(zero_eleven, t.put, 'mode600',
550
 
                             StringIO('test text\n'), mode=0600)
551
 
        self.assertTransportMode(t, 'mode600', 0600)
552
 
        # Yes, you can put a file such that it becomes readonly
553
 
        self.applyDeprecated(zero_eleven, t.put, 'mode400',
554
 
                             StringIO('test text\n'), mode=0400)
555
 
        self.assertTransportMode(t, 'mode400', 0400)
556
 
        self.applyDeprecated(zero_eleven, t.put_multi,
557
 
                             [('mmode644', StringIO('text\n'))], mode=0644)
558
 
        self.assertTransportMode(t, 'mmode644', 0644)
559
 
 
560
 
        # The default permissions should be based on the current umask
561
 
        umask = osutils.get_umask()
562
 
        self.applyDeprecated(zero_eleven, t.put, 'nomode',
563
 
                             StringIO('test text\n'), mode=None)
564
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
565
 
        
566
530
    def test_mkdir(self):
567
531
        t = self.get_transport()
568
532
 
569
533
        if t.is_readonly():
570
 
            # cannot mkdir on readonly transports. We're not testing for 
 
534
            # cannot mkdir on readonly transports. We're not testing for
571
535
            # cache coherency because cache behaviour is not currently
572
536
            # defined for the transport interface.
573
537
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
594
558
 
595
559
        # we were testing that a local mkdir followed by a transport
596
560
        # mkdir failed thusly, but given that we * in one process * do not
597
 
        # concurrently fiddle with disk dirs and then use transport to do 
 
561
        # concurrently fiddle with disk dirs and then use transport to do
598
562
        # things, the win here seems marginal compared to the constraint on
599
563
        # the interface. RBC 20051227
600
564
        t.mkdir('dir_g')
633
597
        t.mkdir('dnomode', mode=None)
634
598
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
635
599
 
 
600
    def test_opening_a_file_stream_creates_file(self):
 
601
        t = self.get_transport()
 
602
        if t.is_readonly():
 
603
            return
 
604
        handle = t.open_write_stream('foo')
 
605
        try:
 
606
            self.assertEqual('', t.get_bytes('foo'))
 
607
        finally:
 
608
            handle.close()
 
609
 
 
610
    def test_opening_a_file_stream_can_set_mode(self):
 
611
        t = self.get_transport()
 
612
        if t.is_readonly():
 
613
            return
 
614
        if not t._can_roundtrip_unix_modebits():
 
615
            # Can't roundtrip, so no need to run this test
 
616
            return
 
617
        def check_mode(name, mode, expected):
 
618
            handle = t.open_write_stream(name, mode=mode)
 
619
            handle.close()
 
620
            self.assertTransportMode(t, name, expected)
 
621
        check_mode('mode644', 0644, 0644)
 
622
        check_mode('mode666', 0666, 0666)
 
623
        check_mode('mode600', 0600, 0600)
 
624
        # The default permissions should be based on the current umask
 
625
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
626
 
636
627
    def test_copy_to(self):
637
628
        # FIXME: test:   same server to same server (partly done)
638
629
        # same protocol two servers
683
674
            for f in files:
684
675
                self.assertTransportMode(temp_transport, f, mode)
685
676
 
686
 
    def test_append(self):
 
677
    def test_create_prefix(self):
687
678
        t = self.get_transport()
688
 
 
689
 
        if t.is_readonly():
690
 
            return
691
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
692
 
        t.put_bytes('b', 'contents\nfor b\n')
693
 
 
694
 
        self.assertEqual(20, self.applyDeprecated(zero_eleven,
695
 
            t.append, 'a', StringIO('add\nsome\nmore\ncontents\n')))
696
 
 
697
 
        self.check_transport_contents(
698
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
699
 
            t, 'a')
700
 
 
701
 
        # And we can create new files, too
702
 
        self.assertEqual(0, self.applyDeprecated(zero_eleven,
703
 
            t.append, 'c', StringIO('some text\nfor a missing file\n')))
704
 
        self.check_transport_contents('some text\nfor a missing file\n',
705
 
                                      t, 'c')
 
679
        sub = t.clone('foo').clone('bar')
 
680
        try:
 
681
            sub.create_prefix()
 
682
        except TransportNotPossible:
 
683
            self.assertTrue(t.is_readonly())
 
684
        else:
 
685
            self.assertTrue(t.has('foo/bar'))
 
686
 
706
687
    def test_append_file(self):
707
688
        t = self.get_transport()
708
689
 
812
793
                t.append_file, 'f', StringIO('f'), mode=None)
813
794
            return
814
795
        t.append_file('f', StringIO('f'), mode=None)
815
 
        
 
796
 
816
797
    def test_append_bytes_mode(self):
817
798
        # check append_bytes accepts a mode
818
799
        t = self.get_transport()
821
802
                t.append_bytes, 'f', 'f', mode=None)
822
803
            return
823
804
        t.append_bytes('f', 'f', mode=None)
824
 
        
 
805
 
825
806
    def test_delete(self):
826
807
        # TODO: Test Transport.delete
827
808
        t = self.get_transport()
866
847
        # plain "listdir".
867
848
        # self.assertEqual([], os.listdir('.'))
868
849
 
 
850
    def test_recommended_page_size(self):
 
851
        """Transports recommend a page size for partial access to files."""
 
852
        t = self.get_transport()
 
853
        self.assertIsInstance(t.recommended_page_size(), int)
 
854
 
869
855
    def test_rmdir(self):
870
856
        t = self.get_transport()
871
857
        # Not much to do with a readonly transport
883
869
 
884
870
    def test_rmdir_not_empty(self):
885
871
        """Deleting a non-empty directory raises an exception
886
 
        
 
872
 
887
873
        sftp (and possibly others) don't give us a specific "directory not
888
874
        empty" exception -- we can just see that the operation failed.
889
875
        """
896
882
 
897
883
    def test_rmdir_empty_but_similar_prefix(self):
898
884
        """rmdir does not get confused by sibling paths.
899
 
        
 
885
 
900
886
        A naive implementation of MemoryTransport would refuse to rmdir
901
887
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
902
888
        uses "path.startswith(dir)" on all file paths to determine if directory
939
925
        self.assertFalse(t.has('adir/bdir'))
940
926
        self.assertFalse(t.has('adir/bsubdir'))
941
927
 
 
928
    def test_rename_across_subdirs(self):
 
929
        t = self.get_transport()
 
930
        if t.is_readonly():
 
931
            raise TestNotApplicable("transport is readonly")
 
932
        t.mkdir('a')
 
933
        t.mkdir('b')
 
934
        ta = t.clone('a')
 
935
        tb = t.clone('b')
 
936
        ta.put_bytes('f', 'aoeu')
 
937
        ta.rename('f', '../b/f')
 
938
        self.assertTrue(tb.has('f'))
 
939
        self.assertFalse(ta.has('f'))
 
940
        self.assertTrue(t.has('b/f'))
 
941
 
942
942
    def test_delete_tree(self):
943
943
        t = self.get_transport()
944
944
 
954
954
        except TransportNotPossible:
955
955
            # ok, this transport does not support delete_tree
956
956
            return
957
 
        
 
957
 
958
958
        # did it delete that trivial case?
959
959
        self.assertRaises(NoSuchFile, t.stat, 'adir')
960
960
 
961
961
        self.build_tree(['adir/',
962
 
                         'adir/file', 
963
 
                         'adir/subdir/', 
964
 
                         'adir/subdir/file', 
 
962
                         'adir/file',
 
963
                         'adir/subdir/',
 
964
                         'adir/subdir/file',
965
965
                         'adir/subdir2/',
966
966
                         'adir/subdir2/file',
967
967
                         ], transport=t)
998
998
        self.check_transport_contents('c this file\n', t, 'b')
999
999
 
1000
1000
        # TODO: Try to write a test for atomicity
1001
 
        # TODO: Test moving into a non-existant subdirectory
 
1001
        # TODO: Test moving into a non-existent subdirectory
1002
1002
        # TODO: Test Transport.move_multi
1003
1003
 
1004
1004
    def test_copy(self):
1024
1024
 
1025
1025
    def test_connection_error(self):
1026
1026
        """ConnectionError is raised when connection is impossible.
1027
 
        
1028
 
        The error may be raised from either the constructor or the first
1029
 
        operation on the transport.
 
1027
 
 
1028
        The error should be raised from the first operation on the transport.
1030
1029
        """
1031
1030
        try:
1032
1031
            url = self._server.get_bogus_url()
1049
1048
            return
1050
1049
 
1051
1050
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
1052
 
        sizes = [14, 0, 16, 0, 18] 
 
1051
        sizes = [14, 0, 16, 0, 18]
1053
1052
        self.build_tree(paths, transport=t, line_endings='binary')
1054
1053
 
1055
1054
        for path, size in zip(paths, sizes):
1077
1076
    def test_list_dir(self):
1078
1077
        # TODO: Test list_dir, just try once, and if it throws, stop testing
1079
1078
        t = self.get_transport()
1080
 
        
 
1079
 
1081
1080
        if not t.listable():
1082
1081
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
1083
1082
            return
1084
1083
 
1085
 
        def sorted_list(d):
1086
 
            l = list(t.list_dir(d))
 
1084
        def sorted_list(d, transport):
 
1085
            l = list(transport.list_dir(d))
1087
1086
            l.sort()
1088
1087
            return l
1089
1088
 
1090
 
        self.assertEqual([], sorted_list('.'))
 
1089
        self.assertEqual([], sorted_list('.', t))
1091
1090
        # c2 is precisely one letter longer than c here to test that
1092
1091
        # suffixing is not confused.
1093
1092
        # a%25b checks that quoting is done consistently across transports
1099
1098
            self.build_tree(tree_names)
1100
1099
 
1101
1100
        self.assertEqual(
1102
 
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.'))
1103
 
        self.assertEqual(['d', 'e'], sorted_list('c'))
 
1101
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1102
        self.assertEqual(
 
1103
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1104
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1105
 
 
1106
        # Cloning the transport produces an equivalent listing
 
1107
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1104
1108
 
1105
1109
        if not t.is_readonly():
1106
1110
            t.delete('c/d')
1108
1112
        else:
1109
1113
            os.unlink('c/d')
1110
1114
            os.unlink('b')
1111
 
            
1112
 
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.'))
1113
 
        self.assertEqual(['e'], sorted_list('c'))
 
1115
 
 
1116
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1117
        self.assertEqual(['e'], sorted_list('c', t))
1114
1118
 
1115
1119
        self.assertListRaises(PathError, t.list_dir, 'q')
1116
1120
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1121
        # 'a' is a file, list_dir should raise an error
1117
1122
        self.assertListRaises(PathError, t.list_dir, 'a')
1118
1123
 
1119
1124
    def test_list_dir_result_is_url_escaped(self):
1125
1130
            self.build_tree(['a/', 'a/%'], transport=t)
1126
1131
        else:
1127
1132
            self.build_tree(['a/', 'a/%'])
1128
 
        
 
1133
 
1129
1134
        names = list(t.list_dir('a'))
1130
1135
        self.assertEqual(['%25'], names)
1131
1136
        self.assertIsInstance(names[0], str)
1149
1154
 
1150
1155
        def new_url(scheme=None, user=None, password=None,
1151
1156
                    host=None, port=None, path=None):
1152
 
            """Build a new url from t.base chaging only parts of it.
 
1157
            """Build a new url from t.base changing only parts of it.
1153
1158
 
1154
1159
            Only the parameters different from None will be changed.
1155
1160
            """
1162
1167
            if path     is None: path     = t._path
1163
1168
            return t._unsplit_url(scheme, user, password, host, port, path)
1164
1169
 
1165
 
        self.assertIsNot(t, t._reuse_for(new_url(scheme='foo')))
 
1170
        if t._scheme == 'ftp':
 
1171
            scheme = 'sftp'
 
1172
        else:
 
1173
            scheme = 'ftp'
 
1174
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
1166
1175
        if t._user == 'me':
1167
1176
            user = 'you'
1168
1177
        else:
1186
1195
        else:
1187
1196
            port = 1234
1188
1197
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1198
        # No point in trying to reuse a transport for a local URL
 
1199
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
1189
1200
 
1190
1201
    def test_connection_sharing(self):
1191
1202
        t = self.get_transport()
1245
1256
        self.failIf(t3.has('b/d'))
1246
1257
 
1247
1258
        if t1.is_readonly():
1248
 
            open('b/d', 'wb').write('newfile\n')
 
1259
            self.build_tree_contents([('b/d', 'newfile\n')])
1249
1260
        else:
1250
1261
            t2.put_bytes('d', 'newfile\n')
1251
1262
 
1296
1307
        self.assertEqual('', t.relpath(t.base))
1297
1308
        # base ends with /
1298
1309
        self.assertEqual('', t.relpath(t.base[:-1]))
1299
 
        # subdirs which dont exist should still give relpaths.
 
1310
        # subdirs which don't exist should still give relpaths.
1300
1311
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
1301
1312
        # trailing slash should be the same.
1302
1313
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1331
1342
        self.assertEqual(transport.clone("/").abspath('foo'),
1332
1343
                         transport.abspath("/foo"))
1333
1344
 
 
1345
    def test_win32_abspath(self):
 
1346
        # Note: we tried to set sys.platform='win32' so we could test on
 
1347
        # other platforms too, but then osutils does platform specific
 
1348
        # things at import time which defeated us...
 
1349
        if sys.platform != 'win32':
 
1350
            raise TestSkipped(
 
1351
                'Testing drive letters in abspath implemented only for win32')
 
1352
 
 
1353
        # smoke test for abspath on win32.
 
1354
        # a transport based on 'file:///' never fully qualifies the drive.
 
1355
        transport = get_transport("file:///")
 
1356
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1357
 
 
1358
        # but a transport that starts with a drive spec must keep it.
 
1359
        transport = get_transport("file:///C:/")
 
1360
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1361
 
1334
1362
    def test_local_abspath(self):
1335
1363
        transport = self.get_transport()
1336
1364
        try:
1413
1441
                         'to/dir/b%2525z',
1414
1442
                         'to/bar',]))
1415
1443
 
 
1444
    def test_copy_tree_to_transport(self):
 
1445
        transport = self.get_transport()
 
1446
        if not transport.listable():
 
1447
            self.assertRaises(TransportNotPossible,
 
1448
                              transport.iter_files_recursive)
 
1449
            return
 
1450
        if transport.is_readonly():
 
1451
            return
 
1452
        self.build_tree(['from/',
 
1453
                         'from/dir/',
 
1454
                         'from/dir/foo',
 
1455
                         'from/dir/bar',
 
1456
                         'from/dir/b%25z', # make sure quoting is correct
 
1457
                         'from/bar'],
 
1458
                        transport=transport)
 
1459
        from_transport = transport.clone('from')
 
1460
        to_transport = transport.clone('to')
 
1461
        to_transport.ensure_base()
 
1462
        from_transport.copy_tree_to_transport(to_transport)
 
1463
        paths = set(transport.iter_files_recursive())
 
1464
        self.assertEqual(paths,
 
1465
                    set(['from/dir/foo',
 
1466
                         'from/dir/bar',
 
1467
                         'from/dir/b%2525z',
 
1468
                         'from/bar',
 
1469
                         'to/dir/foo',
 
1470
                         'to/dir/bar',
 
1471
                         'to/dir/b%2525z',
 
1472
                         'to/bar',]))
 
1473
 
1416
1474
    def test_unicode_paths(self):
1417
1475
        """Test that we can read/write files with Unicode names."""
1418
1476
        t = self.get_transport()
1452
1510
        transport.put_bytes('foo', 'bar')
1453
1511
        transport3 = self.get_transport()
1454
1512
        self.check_transport_contents('bar', transport3, 'foo')
1455
 
        # its base should be usable.
1456
 
        transport4 = get_transport(transport.base)
1457
 
        self.check_transport_contents('bar', transport4, 'foo')
1458
1513
 
1459
1514
        # now opening at a relative url should give use a sane result:
1460
1515
        transport.mkdir('newdir')
1461
 
        transport5 = get_transport(transport.base + "newdir")
 
1516
        transport5 = self.get_transport('newdir')
1462
1517
        transport6 = transport5.clone('..')
1463
1518
        self.check_transport_contents('bar', transport6, 'foo')
1464
1519
 
1527
1582
        self.assertEqual(d[2], (0, '0'))
1528
1583
        self.assertEqual(d[3], (3, '34'))
1529
1584
 
 
1585
    def test_readv_with_adjust_for_latency(self):
 
1586
        transport = self.get_transport()
 
1587
        # the adjust for latency flag expands the data region returned
 
1588
        # according to a per-transport heuristic, so testing is a little
 
1589
        # tricky as we need more data than the largest combining that our
 
1590
        # transports do. To accomodate this we generate random data and cross
 
1591
        # reference the returned data with the random data. To avoid doing
 
1592
        # multiple large random byte look ups we do several tests on the same
 
1593
        # backing data.
 
1594
        content = osutils.rand_bytes(200*1024)
 
1595
        content_size = len(content)
 
1596
        if transport.is_readonly():
 
1597
            self.build_tree_contents([('a', content)])
 
1598
        else:
 
1599
            transport.put_bytes('a', content)
 
1600
        def check_result_data(result_vector):
 
1601
            for item in result_vector:
 
1602
                data_len = len(item[1])
 
1603
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1604
 
 
1605
        # start corner case
 
1606
        result = list(transport.readv('a', ((0, 30),),
 
1607
            adjust_for_latency=True, upper_limit=content_size))
 
1608
        # we expect 1 result, from 0, to something > 30
 
1609
        self.assertEqual(1, len(result))
 
1610
        self.assertEqual(0, result[0][0])
 
1611
        self.assertTrue(len(result[0][1]) >= 30)
 
1612
        check_result_data(result)
 
1613
        # end of file corner case
 
1614
        result = list(transport.readv('a', ((204700, 100),),
 
1615
            adjust_for_latency=True, upper_limit=content_size))
 
1616
        # we expect 1 result, from 204800- its length, to the end
 
1617
        self.assertEqual(1, len(result))
 
1618
        data_len = len(result[0][1])
 
1619
        self.assertEqual(204800-data_len, result[0][0])
 
1620
        self.assertTrue(data_len >= 100)
 
1621
        check_result_data(result)
 
1622
        # out of order ranges are made in order
 
1623
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1624
            adjust_for_latency=True, upper_limit=content_size))
 
1625
        # we expect 2 results, in order, start and end.
 
1626
        self.assertEqual(2, len(result))
 
1627
        # start
 
1628
        data_len = len(result[0][1])
 
1629
        self.assertEqual(0, result[0][0])
 
1630
        self.assertTrue(data_len >= 30)
 
1631
        # end
 
1632
        data_len = len(result[1][1])
 
1633
        self.assertEqual(204800-data_len, result[1][0])
 
1634
        self.assertTrue(data_len >= 100)
 
1635
        check_result_data(result)
 
1636
        # close ranges get combined (even if out of order)
 
1637
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1638
            result = list(transport.readv('a', request_vector,
 
1639
                adjust_for_latency=True, upper_limit=content_size))
 
1640
            self.assertEqual(1, len(result))
 
1641
            data_len = len(result[0][1])
 
1642
            # minimum length is from 400 to 1034 - 634
 
1643
            self.assertTrue(data_len >= 634)
 
1644
            # must contain the region 400 to 1034
 
1645
            self.assertTrue(result[0][0] <= 400)
 
1646
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1647
            check_result_data(result)
 
1648
 
 
1649
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1650
        transport = self.get_transport()
 
1651
        # test from observed failure case.
 
1652
        if transport.is_readonly():
 
1653
            file('a', 'w').write('a'*1024*1024)
 
1654
        else:
 
1655
            transport.put_bytes('a', 'a'*1024*1024)
 
1656
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1657
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1658
            (465373, 800), (947422, 800)]
 
1659
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1660
        found_items = [False]*9
 
1661
        for pos, (start, length) in enumerate(broken_vector):
 
1662
            # check the range is covered by the result
 
1663
            for offset, data in results:
 
1664
                if offset <= start and start + length <= offset + len(data):
 
1665
                    found_items[pos] = True
 
1666
        self.assertEqual([True]*9, found_items)
 
1667
 
 
1668
    def test_get_with_open_write_stream_sees_all_content(self):
 
1669
        t = self.get_transport()
 
1670
        if t.is_readonly():
 
1671
            return
 
1672
        handle = t.open_write_stream('foo')
 
1673
        try:
 
1674
            handle.write('bcd')
 
1675
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1676
        finally:
 
1677
            handle.close()
 
1678
 
1530
1679
    def test_get_smart_medium(self):
1531
1680
        """All transports must either give a smart medium, or know they can't.
1532
1681
        """