/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

[merge] (Goffredo) faster merge/fetch by peeking into weave

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
 
18
18
import os
 
19
import sys
 
20
import stat
19
21
from cStringIO import StringIO
20
22
 
21
 
from bzrlib.errors import (NoSuchFile, FileExists, TransportNotPossible,
22
 
                           ConnectionError)
23
 
from bzrlib.selftest import TestCase, TestCaseInTempDir
24
 
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
 
23
from bzrlib.errors import (NoSuchFile, FileExists,
 
24
                           TransportNotPossible, ConnectionError)
 
25
from bzrlib.tests import TestCase, TestCaseInTempDir
 
26
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
25
27
from bzrlib.transport import memory, urlescape
 
28
from bzrlib.osutils import pathjoin
26
29
 
27
30
 
28
31
def _append(fn, txt):
33
36
    f.close()
34
37
    del f
35
38
 
 
39
 
 
40
if sys.platform != 'win32':
 
41
    def check_mode(test, path, mode):
 
42
        """Check that a particular path has the correct mode."""
 
43
        actual_mode = stat.S_IMODE(os.stat(path).st_mode)
 
44
        test.assertEqual(mode, actual_mode,
 
45
            'mode of %r incorrect (%o != %o)' % (path, mode, actual_mode))
 
46
else:
 
47
    def check_mode(test, path, mode):
 
48
        """On win32 chmod doesn't have any effect, 
 
49
        so don't actually check anything
 
50
        """
 
51
        return
 
52
 
 
53
 
36
54
class TestTransport(TestCase):
37
55
    """Test the non transport-concrete class functionality."""
38
56
 
56
74
        """
57
75
        raise NotImplementedError
58
76
 
 
77
    def assertListRaises(self, excClass, func, *args, **kwargs):
 
78
        """Many transport functions can return generators this makes sure
 
79
        to wrap them in a list() call to make sure the whole generator
 
80
        is run, and that the proper exception is raised.
 
81
        """
 
82
        try:
 
83
            list(func(*args, **kwargs))
 
84
        except excClass:
 
85
            return
 
86
        else:
 
87
            if hasattr(excClass,'__name__'): excName = excClass.__name__
 
88
            else: excName = str(excClass)
 
89
            raise self.failureException, "%s not raised" % excName
 
90
 
59
91
    def test_has(self):
60
92
        t = self.get_transport()
61
93
 
62
94
        files = ['a', 'b', 'e', 'g', '%']
63
95
        self.build_tree(files)
64
 
        self.assertEqual(t.has('a'), True)
65
 
        self.assertEqual(t.has('c'), False)
66
 
        self.assertEqual(t.has(urlescape('%')), True)
 
96
        self.assertEqual(True, t.has('a'))
 
97
        self.assertEqual(False, t.has('c'))
 
98
        self.assertEqual(True, t.has(urlescape('%')))
67
99
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
68
100
                [True, True, False, False, True, False, True, False])
69
 
        self.assertEqual(t.has_any(['a', 'b', 'c']), True)
70
 
        self.assertEqual(t.has_any(['c', 'd', 'f', urlescape('%%')]), False)
 
101
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
 
102
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlescape('%%')]))
71
103
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
72
104
                [True, True, False, False, True, False, True, False])
73
 
        self.assertEqual(t.has_any(['c', 'c', 'c']), False)
74
 
        self.assertEqual(t.has_any(['b', 'b', 'b']), True)
 
105
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
 
106
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
75
107
 
76
108
    def test_get(self):
77
109
        t = self.get_transport()
78
110
 
79
111
        files = ['a', 'b', 'e', 'g']
80
112
        self.build_tree(files)
81
 
        self.assertEqual(t.get('a').read(), open('a').read())
 
113
        self.assertEqual(open('a', 'rb').read(), t.get('a').read())
82
114
        content_f = t.get_multi(files)
83
115
        for path,f in zip(files, content_f):
84
 
            self.assertEqual(open(path).read(), f.read())
 
116
            self.assertEqual(f.read(), open(path, 'rb').read())
85
117
 
86
118
        content_f = t.get_multi(iter(files))
87
119
        for path,f in zip(files, content_f):
88
 
            self.assertEqual(open(path).read(), f.read())
 
120
            self.assertEqual(f.read(), open(path, 'rb').read())
89
121
 
90
122
        self.assertRaises(NoSuchFile, t.get, 'c')
91
 
        try:
92
 
            files = list(t.get_multi(['a', 'b', 'c']))
93
 
        except NoSuchFile:
94
 
            pass
95
 
        else:
96
 
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
97
 
        try:
98
 
            files = list(t.get_multi(iter(['a', 'b', 'c', 'e'])))
99
 
        except NoSuchFile:
100
 
            pass
101
 
        else:
102
 
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
123
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
 
124
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
103
125
 
104
126
    def test_put(self):
105
127
        t = self.get_transport()
106
128
 
 
129
        # TODO: jam 20051215 No need to do anything if the test is readonly
 
130
        #                    origininally it was thought that it would give
 
131
        #                    more of a workout to readonly tests. By now the
 
132
        #                    suite is probably thorough enough without testing
 
133
        #                    readonly protocols in write sections
 
134
        #                    The only thing that needs to be tested is that the
 
135
        #                    right error is raised
 
136
 
107
137
        if self.readonly:
108
138
            self.assertRaises(TransportNotPossible,
109
139
                    t.put, 'a', 'some text for a\n')
154
184
            self.assertRaises(NoSuchFile,
155
185
                    t.put, 'path/doesnt/exist/c', 'contents')
156
186
 
 
187
        if not self.readonly:
 
188
            t.put('mode644', 'test text\n', mode=0644)
 
189
            check_mode(self, 'mode644', 0644)
 
190
 
 
191
            t.put('mode666', 'test text\n', mode=0666)
 
192
            check_mode(self, 'mode666', 0666)
 
193
 
 
194
            t.put('mode600', 'test text\n', mode=0600)
 
195
            check_mode(self, 'mode600', 0600)
 
196
 
 
197
            # Yes, you can put a file such that it becomes readonly
 
198
            t.put('mode400', 'test text\n', mode=0400)
 
199
            check_mode(self, 'mode400', 0400)
 
200
 
 
201
            t.put_multi([('mmode644', 'text\n')], mode=0644)
 
202
            check_mode(self, 'mmode644', 0644)
 
203
 
 
204
        # TODO: jam 20051215 test put_multi with a mode. I didn't bother because
 
205
        #                    it seems most people don't like the _multi functions
 
206
 
157
207
    def test_put_file(self):
158
208
        t = self.get_transport()
159
209
 
208
258
        self.check_file_contents('f5', 'here is some text\nand a bit more\n')
209
259
        self.check_file_contents('f6', 'some text for the\nthird file created\n')
210
260
 
211
 
 
 
261
        if not self.readonly:
 
262
            sio = StringIO('test text\n')
 
263
            t.put('mode644', sio, mode=0644)
 
264
            check_mode(self, 'mode644', 0644)
 
265
 
 
266
            a = open('mode644', 'rb')
 
267
            t.put('mode666', a, mode=0666)
 
268
            check_mode(self, 'mode666', 0666)
 
269
 
 
270
            a = open('mode644', 'rb')
 
271
            t.put('mode600', a, mode=0600)
 
272
            check_mode(self, 'mode600', 0600)
 
273
 
 
274
            # Yes, you can put a file such that it becomes readonly
 
275
            a = open('mode644', 'rb')
 
276
            t.put('mode400', a, mode=0400)
 
277
            check_mode(self, 'mode400', 0400)
212
278
 
213
279
    def test_mkdir(self):
214
280
        t = self.get_transport()
276
342
                             ('dir_b/b', 'contents of dir_b/b')])
277
343
                          , 2)
278
344
        for f in ('dir_a/a', 'dir_b/b'):
279
 
            self.assertEqual(t.get(f).read(), open(f).read())
 
345
            self.assertEqual(t.get(f).read(), open(f, 'rb').read())
 
346
 
 
347
        if not self.readonly:
 
348
            # Test mkdir with a mode
 
349
            t.mkdir('dmode755', mode=0755)
 
350
            check_mode(self, 'dmode755', 0755)
 
351
 
 
352
            t.mkdir('dmode555', mode=0555)
 
353
            check_mode(self, 'dmode555', 0555)
 
354
 
 
355
            t.mkdir('dmode777', mode=0777)
 
356
            check_mode(self, 'dmode777', 0777)
 
357
 
 
358
            t.mkdir('dmode700', mode=0700)
 
359
            check_mode(self, 'dmode700', 0700)
 
360
 
 
361
            # TODO: jam 20051215 test mkdir_multi with a mode
 
362
            t.mkdir_multi(['mdmode755'], mode=0755)
 
363
            check_mode(self, 'mdmode755', 0755)
 
364
 
280
365
 
281
366
    def test_copy_to(self):
282
367
        import tempfile
287
372
        files = ['a', 'b', 'c', 'd']
288
373
        self.build_tree(files)
289
374
 
290
 
        dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
291
 
        dtmp_base = os.path.basename(dtmp)
292
 
        local_t = LocalTransport(dtmp)
 
375
        def get_temp_local():
 
376
            dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
 
377
            dtmp_base = os.path.basename(dtmp)
 
378
            return dtmp_base, LocalTransport(dtmp)
 
379
        dtmp_base, local_t = get_temp_local()
293
380
 
294
381
        t.copy_to(files, local_t)
295
382
        for f in files:
296
 
            self.assertEquals(open(f).read(),
297
 
                    open(os.path.join(dtmp_base, f)).read())
 
383
            self.assertEquals(open(f, 'rb').read(),
 
384
                    open(pathjoin(dtmp_base, f), 'rb').read())
298
385
 
299
386
        # Test that copying into a missing directory raises
300
387
        # NoSuchFile
302
389
        open('e/f', 'wb').write('contents of e')
303
390
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], local_t)
304
391
 
305
 
        os.mkdir(os.path.join(dtmp_base, 'e'))
 
392
        os.mkdir(pathjoin(dtmp_base, 'e'))
306
393
        t.copy_to(['e/f'], local_t)
307
394
 
308
 
        del dtmp, dtmp_base, local_t
 
395
        del dtmp_base, local_t
309
396
 
310
 
        dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
311
 
        dtmp_base = os.path.basename(dtmp)
312
 
        local_t = LocalTransport(dtmp)
 
397
        dtmp_base, local_t = get_temp_local()
313
398
 
314
399
        files = ['a', 'b', 'c', 'd']
315
400
        t.copy_to(iter(files), local_t)
316
401
        for f in files:
317
 
            self.assertEquals(open(f).read(),
318
 
                    open(os.path.join(dtmp_base, f)).read())
319
 
 
320
 
        del dtmp, dtmp_base, local_t
 
402
            self.assertEquals(open(f, 'rb').read(),
 
403
                    open(pathjoin(dtmp_base, f), 'rb').read())
 
404
 
 
405
        del dtmp_base, local_t
 
406
 
 
407
        for mode in (0666, 0644, 0600, 0400):
 
408
            dtmp_base, local_t = get_temp_local()
 
409
            t.copy_to(files, local_t, mode=mode)
 
410
            for f in files:
 
411
                check_mode(self, os.path.join(dtmp_base, f), mode)
321
412
 
322
413
    def test_append(self):
323
414
        t = self.get_transport()
375
466
                'some\nmore\nfor\nb\n'
376
467
                'from an iterator\n')
377
468
 
 
469
        if self.readonly:
 
470
            _append('c', 'some text\nfor a missing file\n')
 
471
            _append('a', 'some text in a\n')
 
472
            _append('d', 'missing file r\n')
 
473
        else:
 
474
            t.append('c', 'some text\nfor a missing file\n')
 
475
            t.append_multi([('a', 'some text in a\n'),
 
476
                            ('d', 'missing file r\n')])
 
477
        self.check_file_contents('a', 
 
478
            'diff\ncontents for\na\n'
 
479
            'add\nsome\nmore\ncontents\n'
 
480
            'and\nthen\nsome\nmore\n'
 
481
            'a little bit more\n'
 
482
            'some text in a\n')
 
483
        self.check_file_contents('c', 'some text\nfor a missing file\n')
 
484
        self.check_file_contents('d', 'missing file r\n')
 
485
 
378
486
    def test_append_file(self):
379
487
        t = self.get_transport()
380
488
 
456
564
                'some text for the\nthird file created\n'
457
565
                'some garbage\nto put in three\n')
458
566
 
 
567
        a5 = open('f2', 'rb')
 
568
        a6 = open('f2', 'rb')
 
569
        a7 = open('f3', 'rb')
 
570
        if self.readonly:
 
571
            _append('c', a5.read())
 
572
            _append('a', a6.read())
 
573
            _append('d', a7.read())
 
574
        else:
 
575
            t.append('c', a5)
 
576
            t.append_multi([('a', a6), ('d', a7)])
 
577
        del a5, a6, a7
 
578
        self.check_file_contents('c', open('f2', 'rb').read())
 
579
        self.check_file_contents('d', open('f3', 'rb').read())
 
580
 
 
581
 
459
582
    def test_delete(self):
460
583
        # TODO: Test Transport.delete
461
 
        pass
 
584
        t = self.get_transport()
 
585
 
 
586
        # Not much to do with a readonly transport
 
587
        if self.readonly:
 
588
            return
 
589
 
 
590
        open('a', 'wb').write('a little bit of text\n')
 
591
        self.failUnless(t.has('a'))
 
592
        self.failUnlessExists('a')
 
593
        t.delete('a')
 
594
        self.failIf(os.path.lexists('a'))
 
595
 
 
596
        self.assertRaises(NoSuchFile, t.delete, 'a')
 
597
 
 
598
        open('a', 'wb').write('a text\n')
 
599
        open('b', 'wb').write('b text\n')
 
600
        open('c', 'wb').write('c text\n')
 
601
        self.assertEqual([True, True, True],
 
602
                list(t.has_multi(['a', 'b', 'c'])))
 
603
        t.delete_multi(['a', 'c'])
 
604
        self.assertEqual([False, True, False],
 
605
                list(t.has_multi(['a', 'b', 'c'])))
 
606
        self.failIf(os.path.lexists('a'))
 
607
        self.failUnlessExists('b')
 
608
        self.failIf(os.path.lexists('c'))
 
609
 
 
610
        self.assertRaises(NoSuchFile,
 
611
                t.delete_multi, ['a', 'b', 'c'])
 
612
 
 
613
        self.assertRaises(NoSuchFile,
 
614
                t.delete_multi, iter(['a', 'b', 'c']))
 
615
 
 
616
        open('a', 'wb').write('another a text\n')
 
617
        open('c', 'wb').write('another c text\n')
 
618
        t.delete_multi(iter(['a', 'b', 'c']))
 
619
 
 
620
        # We should have deleted everything
 
621
        # SftpServer creates control files in the
 
622
        # working directory, so we can just do a
 
623
        # plain "listdir".
 
624
        # self.assertEqual([], os.listdir('.'))
462
625
 
463
626
    def test_move(self):
464
 
        # TODO: Test Transport.move
465
 
        pass
 
627
        t = self.get_transport()
 
628
 
 
629
        if self.readonly:
 
630
            return
 
631
 
 
632
        # TODO: I would like to use os.listdir() to
 
633
        # make sure there are no extra files, but SftpServer
 
634
        # creates control files in the working directory
 
635
        # perhaps all of this could be done in a subdirectory
 
636
 
 
637
        open('a', 'wb').write('a first file\n')
 
638
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
 
639
 
 
640
        t.move('a', 'b')
 
641
        self.failUnlessExists('b')
 
642
        self.failIf(os.path.lexists('a'))
 
643
 
 
644
        self.check_file_contents('b', 'a first file\n')
 
645
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
 
646
 
 
647
        # Overwrite a file
 
648
        open('c', 'wb').write('c this file\n')
 
649
        t.move('c', 'b')
 
650
        self.failIf(os.path.lexists('c'))
 
651
        self.check_file_contents('b', 'c this file\n')
 
652
 
 
653
        # TODO: Try to write a test for atomicity
 
654
        # TODO: Test moving into a non-existant subdirectory
 
655
        # TODO: Test Transport.move_multi
 
656
 
 
657
    def test_copy(self):
 
658
        t = self.get_transport()
 
659
 
 
660
        if self.readonly:
 
661
            return
 
662
 
 
663
        open('a', 'wb').write('a file\n')
 
664
        t.copy('a', 'b')
 
665
        self.check_file_contents('b', 'a file\n')
 
666
 
 
667
        self.assertRaises(NoSuchFile, t.copy, 'c', 'd')
 
668
        os.mkdir('c')
 
669
        # What should the assert be if you try to copy a
 
670
        # file over a directory?
 
671
        #self.assertRaises(Something, t.copy, 'a', 'c')
 
672
        open('d', 'wb').write('text in d\n')
 
673
        t.copy('d', 'b')
 
674
        self.check_file_contents('b', 'text in d\n')
 
675
 
 
676
        # TODO: test copy_multi
466
677
 
467
678
    def test_connection_error(self):
468
679
        """ConnectionError is raised when connection is impossible"""
469
680
        if not hasattr(self, "get_bogus_transport"):
470
681
            return
471
682
        t = self.get_bogus_transport()
472
 
        self.assertRaises(ConnectionError, t.get, '.bzr/branch')
 
683
        try:
 
684
            t.get('.bzr/branch')
 
685
        except (ConnectionError, NoSuchFile), e:
 
686
            pass
 
687
        except (Exception), e:
 
688
            self.failIf(True, 'Wrong exception thrown: %s' % e)
 
689
        else:
 
690
            self.failIf(True, 'Did not get the expected exception.')
 
691
 
 
692
    def test_stat(self):
 
693
        # TODO: Test stat, just try once, and if it throws, stop testing
 
694
        from stat import S_ISDIR, S_ISREG
 
695
 
 
696
        t = self.get_transport()
 
697
 
 
698
        try:
 
699
            st = t.stat('.')
 
700
        except TransportNotPossible, e:
 
701
            # This transport cannot stat
 
702
            return
 
703
 
 
704
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
 
705
        self.build_tree(paths)
 
706
 
 
707
        local_stats = []
 
708
 
 
709
        for p in paths:
 
710
            st = t.stat(p)
 
711
            local_st = os.stat(p)
 
712
            if p.endswith('/'):
 
713
                self.failUnless(S_ISDIR(st.st_mode))
 
714
            else:
 
715
                self.failUnless(S_ISREG(st.st_mode))
 
716
            self.assertEqual(local_st.st_size, st.st_size)
 
717
            self.assertEqual(local_st.st_mode, st.st_mode)
 
718
            local_stats.append(local_st)
 
719
 
 
720
        remote_stats = list(t.stat_multi(paths))
 
721
        remote_iter_stats = list(t.stat_multi(iter(paths)))
 
722
 
 
723
        for local, remote, remote_iter in \
 
724
            zip(local_stats, remote_stats, remote_iter_stats):
 
725
            self.assertEqual(local.st_mode, remote.st_mode)
 
726
            self.assertEqual(local.st_mode, remote_iter.st_mode)
 
727
 
 
728
            self.assertEqual(local.st_size, remote.st_size)
 
729
            self.assertEqual(local.st_size, remote_iter.st_size)
 
730
            # Should we test UID/GID?
 
731
 
 
732
        self.assertRaises(NoSuchFile, t.stat, 'q')
 
733
        self.assertRaises(NoSuchFile, t.stat, 'b/a')
 
734
 
 
735
        self.assertListRaises(NoSuchFile, t.stat_multi, ['a', 'c', 'd'])
 
736
        self.assertListRaises(NoSuchFile, t.stat_multi, iter(['a', 'c', 'd']))
 
737
 
 
738
    def test_list_dir(self):
 
739
        # TODO: Test list_dir, just try once, and if it throws, stop testing
 
740
        t = self.get_transport()
 
741
        
 
742
        if not t.listable():
 
743
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
 
744
            return
 
745
 
 
746
        def sorted_list(d):
 
747
            l = list(t.list_dir(d))
 
748
            l.sort()
 
749
            return l
 
750
 
 
751
        # SftpServer creates control files in the working directory
 
752
        # so lets move down a directory to be safe
 
753
        os.mkdir('wd')
 
754
        os.chdir('wd')
 
755
        t = t.clone('wd')
 
756
 
 
757
        self.assertEqual([], sorted_list(u'.'))
 
758
        self.build_tree(['a', 'b', 'c/', 'c/d', 'c/e'])
 
759
 
 
760
        self.assertEqual([u'a', u'b', u'c'], sorted_list(u'.'))
 
761
        self.assertEqual([u'd', u'e'], sorted_list(u'c'))
 
762
 
 
763
        os.remove('c/d')
 
764
        os.remove('b')
 
765
        self.assertEqual([u'a', u'c'], sorted_list('.'))
 
766
        self.assertEqual([u'e'], sorted_list(u'c'))
 
767
 
 
768
        self.assertListRaises(NoSuchFile, t.list_dir, 'q')
 
769
        self.assertListRaises(NoSuchFile, t.list_dir, 'c/f')
 
770
        self.assertListRaises(NoSuchFile, t.list_dir, 'a')
 
771
 
 
772
    def test_clone(self):
 
773
        # TODO: Test that clone moves up and down the filesystem
 
774
        t1 = self.get_transport()
 
775
 
 
776
        self.build_tree(['a', 'b/', 'b/c'])
 
777
 
 
778
        self.failUnless(t1.has('a'))
 
779
        self.failUnless(t1.has('b/c'))
 
780
        self.failIf(t1.has('c'))
 
781
 
 
782
        t2 = t1.clone('b')
 
783
        self.failUnless(t2.has('c'))
 
784
        self.failIf(t2.has('a'))
 
785
 
 
786
        t3 = t2.clone('..')
 
787
        self.failUnless(t3.has('a'))
 
788
        self.failIf(t3.has('c'))
 
789
 
 
790
        self.failIf(t1.has('b/d'))
 
791
        self.failIf(t2.has('d'))
 
792
        self.failIf(t3.has('b/d'))
 
793
 
 
794
        if self.readonly:
 
795
            open('b/d', 'wb').write('newfile\n')
 
796
        else:
 
797
            t2.put('d', 'newfile\n')
 
798
 
 
799
        self.failUnless(t1.has('b/d'))
 
800
        self.failUnless(t2.has('d'))
 
801
        self.failUnless(t3.has('b/d'))
473
802
 
474
803
        
475
804
class LocalTransportTest(TestCaseInTempDir, TestTransportMixIn):
476
805
    def get_transport(self):
477
806
        from bzrlib.transport.local import LocalTransport
478
 
        return LocalTransport('.')
 
807
        return LocalTransport(u'.')
479
808
 
480
809
 
481
810
class HttpTransportTest(TestCaseWithWebserver, TestTransportMixIn):
484
813
 
485
814
    def get_transport(self):
486
815
        from bzrlib.transport.http import HttpTransport
487
 
        url = self.get_remote_url('.')
 
816
        url = self.get_remote_url(u'.')
488
817
        return HttpTransport(url)
489
818
 
490
819
    def get_bogus_transport(self):