24
24
Tests for low-level protocol encoding are found in test_smart_transport.
27
28
from StringIO import StringIO
31
32
from bzrlib import bzrdir, errors, pack, smart, tests
32
from bzrlib.smart.request import SmartServerResponse
33
from bzrlib.smart.request import (
34
FailedSmartServerResponse,
36
SuccessfulSmartServerResponse,
33
38
import bzrlib.smart.bzrdir
34
39
import bzrlib.smart.branch
35
40
import bzrlib.smart.repository
41
from bzrlib.tests import (
36
46
from bzrlib.util import bencode
49
def load_tests(standard_tests, module, loader):
50
"""Multiply tests version and protocol consistency."""
51
# FindRepository tests.
52
bzrdir_mod = bzrlib.smart.bzrdir
53
applier = TestScenarioApplier()
56
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
57
("find_repositoryV2", {
58
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
60
to_adapt, result = split_suite_by_re(standard_tests,
61
"TestSmartServerRequestFindRepository")
62
v2_only, v1_and_2 = split_suite_by_re(to_adapt,
64
for test in iter_suite_tests(v1_and_2):
65
result.addTests(applier.adapt(test))
66
del applier.scenarios[0]
67
for test in iter_suite_tests(v2_only):
68
result.addTests(applier.adapt(test))
39
72
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
72
105
def test_no_repository(self):
73
106
"""When there is no repository to be found, ('norepository', ) is returned."""
74
107
backing = self.get_transport()
75
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
108
request = self._request_class(backing)
76
109
self.make_bzrdir('.')
77
110
self.assertEqual(SmartServerResponse(('norepository', )),
78
111
request.execute(backing.local_abspath('')))
82
115
# path the repository is being searched on is the same as that that
83
116
# the repository is at.
84
117
backing = self.get_transport()
85
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
118
request = self._request_class(backing)
86
119
result = self._make_repository_and_result()
87
120
self.assertEqual(result, request.execute(backing.local_abspath('')))
88
121
self.make_bzrdir('subdir')
106
return SmartServerResponse(('ok', '', rich_root, subtrees))
139
if (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
140
self._request_class):
141
# All tests so far are on formats, and for non-external
143
return SuccessfulSmartServerResponse(
144
('ok', '', rich_root, subtrees, 'no'))
146
return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
108
148
def test_shared_repository(self):
109
149
"""When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
110
150
backing = self.get_transport()
111
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
151
request = self._request_class(backing)
112
152
result = self._make_repository_and_result(shared=True)
113
153
self.assertEqual(result, request.execute(backing.local_abspath('')))
114
154
self.make_bzrdir('subdir')
123
163
def test_rich_root_and_subtree_encoding(self):
124
164
"""Test for the format attributes for rich root and subtree support."""
125
165
backing = self.get_transport()
126
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
166
request = self._request_class(backing)
127
167
result = self._make_repository_and_result(format='dirstate-with-subtree')
128
168
# check the test will be valid
129
169
self.assertEqual('yes', result.args[2])
130
170
self.assertEqual('yes', result.args[3])
131
171
self.assertEqual(result, request.execute(backing.local_abspath('')))
173
def test_supports_external_lookups_no_v2(self):
174
"""Test for the supports_external_lookups attribute."""
175
backing = self.get_transport()
176
request = self._request_class(backing)
177
result = self._make_repository_and_result(format='dirstate-with-subtree')
178
# check the test will be valid
179
self.assertEqual('no', result.args[4])
180
self.assertEqual(result, request.execute(backing.local_abspath('')))
134
183
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithTransport):
525
574
request.execute, backing.local_abspath('subdir'))
577
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithTransport):
579
def test_trivial_bzipped(self):
580
# This tests that the wire encoding is actually bzipped
581
backing = self.get_transport()
582
request = smart.repository.SmartServerRepositoryGetParentMap(backing)
583
tree = self.make_branch_and_memory_tree('.')
585
self.assertEqual(None,
586
request.execute(backing.local_abspath(''), 'missing-id'))
587
# Note that it returns a body (of '' bzipped).
589
SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
590
request.do_body('\n\n0\n'))
528
593
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithTransport):
530
595
def test_none_argument(self):
882
class TestSmartServerRepositoryStreamRevisionsChunked(tests.TestCaseWithTransport):
884
def test_fetch_revisions(self):
885
backing = self.get_transport()
886
request = smart.repository.SmartServerRepositoryStreamRevisionsChunked(
888
tree = self.make_branch_and_memory_tree('.')
891
rev_id1_utf8 = u'\xc8'.encode('utf-8')
892
rev_id2_utf8 = u'\xc9'.encode('utf-8')
893
tree.commit('1st commit', rev_id=rev_id1_utf8)
894
tree.commit('2nd commit', rev_id=rev_id2_utf8)
897
response = request.execute(backing.local_abspath(''))
898
self.assertEqual(None, response)
899
response = request.do_body("%s\n%s\n1" % (rev_id2_utf8, rev_id1_utf8))
900
self.assertEqual(('ok',), response.args)
901
from cStringIO import StringIO
902
parser = pack.ContainerPushParser()
904
for stream_bytes in response.body_stream:
905
parser.accept_bytes(stream_bytes)
906
for [name], record_bytes in parser.read_pending_records():
908
# The bytes should be a valid bencoded string.
909
bencode.bdecode(record_bytes)
910
# XXX: assert that the bencoded knit records have the right
913
def test_no_such_revision_error(self):
914
backing = self.get_transport()
915
request = smart.repository.SmartServerRepositoryStreamRevisionsChunked(
917
repo = self.make_repository('.')
918
rev_id1_utf8 = u'\xc8'.encode('utf-8')
919
response = request.execute(backing.local_abspath(''))
920
self.assertEqual(None, response)
921
response = request.do_body("%s\n\n1" % (rev_id1_utf8,))
923
FailedSmartServerResponse(('NoSuchRevision', )),
817
927
class TestSmartServerIsReadonly(tests.TestCaseWithTransport):
819
929
def test_is_readonly_no(self):
856
966
smart.branch.SmartServerBranchRequestUnlock)
857
967
self.assertEqual(
858
968
smart.request.request_handlers.get('BzrDir.find_repository'),
859
smart.bzrdir.SmartServerRequestFindRepository)
969
smart.bzrdir.SmartServerRequestFindRepositoryV1)
971
smart.request.request_handlers.get('BzrDir.find_repositoryV2'),
972
smart.bzrdir.SmartServerRequestFindRepositoryV2)
860
973
self.assertEqual(
861
974
smart.request.request_handlers.get('BzrDirFormat.initialize'),
862
975
smart.bzrdir.SmartServerRequestInitializeBzrDir)
867
980
smart.request.request_handlers.get('Repository.gather_stats'),
868
981
smart.repository.SmartServerRepositoryGatherStats)
869
982
self.assertEqual(
983
smart.request.request_handlers.get('Repository.get_parent_map'),
984
smart.repository.SmartServerRepositoryGetParentMap)
870
986
smart.request.request_handlers.get(
871
987
'Repository.get_revision_graph'),
872
988
smart.repository.SmartServerRepositoryGetRevisionGraph)
880
996
smart.request.request_handlers.get('Repository.lock_write'),
881
997
smart.repository.SmartServerRepositoryLockWrite)
882
998
self.assertEqual(
883
smart.request.request_handlers.get(
884
'Repository.stream_knit_data_for_revisions'),
885
smart.repository.SmartServerRepositoryStreamKnitDataForRevisions)
887
999
smart.request.request_handlers.get('Repository.tarball'),
888
1000
smart.repository.SmartServerRepositoryTarball)
889
1001
self.assertEqual(