/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smart.py

  • Committer: Andrew Bennetts
  • Date: 2008-03-27 06:10:18 UTC
  • mfrom: (3309 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3320.
  • Revision ID: andrew.bennetts@canonical.com-20080327061018-dxztpxyv6yoeg3am
Merge from bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
Tests for low-level protocol encoding are found in test_smart_transport.
25
25
"""
26
26
 
 
27
import bz2
27
28
from cStringIO import StringIO
28
29
import tarfile
29
30
 
45
46
    SmartServerResponse,
46
47
    SuccessfulSmartServerResponse,
47
48
    )
 
49
from bzrlib.tests import (
 
50
    iter_suite_tests,
 
51
    split_suite_by_re,
 
52
    TestScenarioApplier,
 
53
    )
48
54
from bzrlib.transport import chroot, get_transport
49
55
from bzrlib.util import bencode
50
56
 
51
57
 
 
58
def load_tests(standard_tests, module, loader):
 
59
    """Multiply tests version and protocol consistency."""
 
60
    # FindRepository tests.
 
61
    bzrdir_mod = bzrlib.smart.bzrdir
 
62
    applier = TestScenarioApplier()
 
63
    applier.scenarios = [
 
64
        ("find_repository", {
 
65
            "_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
 
66
        ("find_repositoryV2", {
 
67
            "_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
 
68
        ]
 
69
    to_adapt, result = split_suite_by_re(standard_tests,
 
70
        "TestSmartServerRequestFindRepository")
 
71
    v2_only, v1_and_2 = split_suite_by_re(to_adapt,
 
72
        "_v2")
 
73
    for test in iter_suite_tests(v1_and_2):
 
74
        result.addTests(applier.adapt(test))
 
75
    del applier.scenarios[0]
 
76
    for test in iter_suite_tests(v2_only):
 
77
        result.addTests(applier.adapt(test))
 
78
    return result
 
79
 
 
80
 
52
81
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
53
82
 
54
83
    def setUp(self):
134
163
    def test_no_repository(self):
135
164
        """When there is no repository to be found, ('norepository', ) is returned."""
136
165
        backing = self.get_transport()
137
 
        request = smart.bzrdir.SmartServerRequestFindRepository(backing)
 
166
        request = self._request_class(backing)
138
167
        self.make_bzrdir('.')
139
168
        self.assertEqual(SmartServerResponse(('norepository', )),
140
169
            request.execute(''))
144
173
        # path the repository is being searched on is the same as that that 
145
174
        # the repository is at.
146
175
        backing = self.get_transport()
147
 
        request = smart.bzrdir.SmartServerRequestFindRepository(backing)
 
176
        request = self._request_class(backing)
148
177
        result = self._make_repository_and_result()
149
178
        self.assertEqual(result, request.execute(''))
150
179
        self.make_bzrdir('subdir')
165
194
            subtrees = 'yes'
166
195
        else:
167
196
            subtrees = 'no'
168
 
        return SmartServerResponse(('ok', '', rich_root, subtrees))
 
197
        if (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
 
198
            self._request_class):
 
199
            # All tests so far are on formats, and for non-external
 
200
            # repositories.
 
201
            return SuccessfulSmartServerResponse(
 
202
                ('ok', '', rich_root, subtrees, 'no'))
 
203
        else:
 
204
            return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
169
205
 
170
206
    def test_shared_repository(self):
171
207
        """When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
172
208
        backing = self.get_transport()
173
 
        request = smart.bzrdir.SmartServerRequestFindRepository(backing)
 
209
        request = self._request_class(backing)
174
210
        result = self._make_repository_and_result(shared=True)
175
211
        self.assertEqual(result, request.execute(''))
176
212
        self.make_bzrdir('subdir')
185
221
    def test_rich_root_and_subtree_encoding(self):
186
222
        """Test for the format attributes for rich root and subtree support."""
187
223
        backing = self.get_transport()
188
 
        request = smart.bzrdir.SmartServerRequestFindRepository(backing)
 
224
        request = self._request_class(backing)
189
225
        result = self._make_repository_and_result(format='dirstate-with-subtree')
190
226
        # check the test will be valid
191
227
        self.assertEqual('yes', result.args[2])
192
228
        self.assertEqual('yes', result.args[3])
193
229
        self.assertEqual(result, request.execute(''))
194
230
 
 
231
    def test_supports_external_lookups_no_v2(self):
 
232
        """Test for the supports_external_lookups attribute."""
 
233
        backing = self.get_transport()
 
234
        request = self._request_class(backing)
 
235
        result = self._make_repository_and_result(format='dirstate-with-subtree')
 
236
        # check the test will be valid
 
237
        self.assertEqual('no', result.args[4])
 
238
        self.assertEqual(result, request.execute(''))
 
239
 
195
240
 
196
241
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
197
242
 
435
480
 
436
481
    def setUp(self):
437
482
        tests.TestCaseWithMemoryTransport.setUp(self)
438
 
        self.reduceLockdirTimeout()
439
483
 
440
484
    def test_lock_write_on_unlocked_branch(self):
441
485
        backing = self.get_transport()
521
565
 
522
566
    def setUp(self):
523
567
        tests.TestCaseWithMemoryTransport.setUp(self)
524
 
        self.reduceLockdirTimeout()
525
568
 
526
569
    def test_unlock_on_locked_branch_and_repo(self):
527
570
        backing = self.get_transport()
587
630
            request.execute, 'subdir')
588
631
 
589
632
 
 
633
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithTransport):
 
634
 
 
635
    def test_trivial_bzipped(self):
 
636
        # This tests that the wire encoding is actually bzipped
 
637
        backing = self.get_transport()
 
638
        request = smart.repository.SmartServerRepositoryGetParentMap(backing)
 
639
        tree = self.make_branch_and_memory_tree('.')
 
640
 
 
641
        self.assertEqual(None,
 
642
            request.execute('', 'missing-id'))
 
643
        # Note that it returns a body (of '' bzipped).
 
644
        self.assertEqual(
 
645
            SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
 
646
            request.do_body('\n\n0\n'))
 
647
 
 
648
 
590
649
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithMemoryTransport):
591
650
 
592
651
    def test_none_argument(self):
749
808
 
750
809
    def setUp(self):
751
810
        tests.TestCaseWithMemoryTransport.setUp(self)
752
 
        self.reduceLockdirTimeout()
753
811
 
754
812
    def test_lock_write_on_unlocked_repo(self):
755
813
        backing = self.get_transport()
787
845
 
788
846
    def setUp(self):
789
847
        tests.TestCaseWithMemoryTransport.setUp(self)
790
 
        self.reduceLockdirTimeout()
791
848
 
792
849
    def test_unlock_on_locked_repo(self):
793
850
        backing = self.get_transport()
886
943
        tree.add('')
887
944
        rev_id1_utf8 = u'\xc8'.encode('utf-8')
888
945
        rev_id2_utf8 = u'\xc9'.encode('utf-8')
889
 
        r1 = tree.commit('1st commit', rev_id=rev_id1_utf8)
890
 
        r1 = tree.commit('2nd commit', rev_id=rev_id2_utf8)
 
946
        tree.commit('1st commit', rev_id=rev_id1_utf8)
 
947
        tree.commit('2nd commit', rev_id=rev_id2_utf8)
891
948
        tree.unlock()
892
949
 
893
 
        response = request.execute('', rev_id2_utf8)
 
950
        response = request.execute('')
 
951
        self.assertEqual(None, response)
 
952
        response = request.do_body("%s\n%s\n1" % (rev_id2_utf8, rev_id1_utf8))
894
953
        self.assertEqual(('ok',), response.args)
895
954
        parser = pack.ContainerPushParser()
896
955
        names = []
909
968
            backing)
910
969
        repo = self.make_repository('.')
911
970
        rev_id1_utf8 = u'\xc8'.encode('utf-8')
912
 
        response = request.execute('', rev_id1_utf8)
913
 
        # There's no error initially.
914
 
        self.assertTrue(response.is_successful())
915
 
        self.assertEqual(('ok',), response.args)
916
 
        # We only get an error while streaming the body.
917
 
        body = list(response.body_stream)
918
 
        last_chunk = body[-1]
919
 
        self.assertIsInstance(last_chunk, FailedSmartServerResponse)
 
971
        response = request.execute('')
 
972
        self.assertEqual(None, response)
 
973
        response = request.do_body("%s\n\n1" % (rev_id1_utf8,))
920
974
        self.assertEqual(
921
 
            last_chunk,
922
 
            FailedSmartServerResponse(('NoSuchRevision', rev_id1_utf8)))
 
975
            FailedSmartServerResponse(('NoSuchRevision', )),
 
976
            response)
923
977
 
924
978
 
925
979
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
964
1018
            smart.branch.SmartServerBranchRequestUnlock)
965
1019
        self.assertEqual(
966
1020
            smart.request.request_handlers.get('BzrDir.find_repository'),
967
 
            smart.bzrdir.SmartServerRequestFindRepository)
 
1021
            smart.bzrdir.SmartServerRequestFindRepositoryV1)
 
1022
        self.assertEqual(
 
1023
            smart.request.request_handlers.get('BzrDir.find_repositoryV2'),
 
1024
            smart.bzrdir.SmartServerRequestFindRepositoryV2)
968
1025
        self.assertEqual(
969
1026
            smart.request.request_handlers.get('BzrDirFormat.initialize'),
970
1027
            smart.bzrdir.SmartServerRequestInitializeBzrDir)
975
1032
            smart.request.request_handlers.get('Repository.gather_stats'),
976
1033
            smart.repository.SmartServerRepositoryGatherStats)
977
1034
        self.assertEqual(
 
1035
            smart.request.request_handlers.get('Repository.get_parent_map'),
 
1036
            smart.repository.SmartServerRepositoryGetParentMap)
 
1037
        self.assertEqual(
978
1038
            smart.request.request_handlers.get(
979
1039
                'Repository.get_revision_graph'),
980
1040
            smart.repository.SmartServerRepositoryGetRevisionGraph)
988
1048
            smart.request.request_handlers.get('Repository.lock_write'),
989
1049
            smart.repository.SmartServerRepositoryLockWrite)
990
1050
        self.assertEqual(
991
 
            smart.request.request_handlers.get(
992
 
                'Repository.chunked_stream_knit_data_for_revisions'),
993
 
            smart.repository.SmartServerRepositoryStreamKnitDataForRevisions)
994
 
        self.assertEqual(
995
1051
            smart.request.request_handlers.get('Repository.tarball'),
996
1052
            smart.repository.SmartServerRepositoryTarball)
997
1053
        self.assertEqual(