1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the smart wire/domain protococl."""
19
from bzrlib import errors, smart, tests
20
from bzrlib.smart.request import SmartServerResponse
21
import bzrlib.smart.bzrdir
22
import bzrlib.smart.branch
23
import bzrlib.smart.repository
26
class TestSmartServerResponse(tests.TestCase):
29
self.assertEqual(SmartServerResponse(('ok', )),
30
SmartServerResponse(('ok', )))
31
self.assertEqual(SmartServerResponse(('ok', ), 'body'),
32
SmartServerResponse(('ok', ), 'body'))
33
self.assertNotEqual(SmartServerResponse(('ok', )),
34
SmartServerResponse(('notok', )))
35
self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
36
SmartServerResponse(('ok', )))
39
class TestSmartServerRequestFindRepository(tests.TestCaseWithTransport):
41
def test_no_repository(self):
42
"""When there is no repository to be found, ('norepository', ) is returned."""
43
backing = self.get_transport()
44
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
46
self.assertEqual(SmartServerResponse(('norepository', )),
47
request.execute(backing.local_abspath('')))
49
def test_nonshared_repository(self):
50
# nonshared repositorys only allow 'find' to return a handle when the
51
# path the repository is being searched on is the same as that that
52
# the repository is at.
53
backing = self.get_transport()
54
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
55
self.make_repository('.')
56
self.assertEqual(SmartServerResponse(('ok', '')),
57
request.execute(backing.local_abspath('')))
58
self.make_bzrdir('subdir')
59
self.assertEqual(SmartServerResponse(('norepository', )),
60
request.execute(backing.local_abspath('subdir')))
62
def test_shared_repository(self):
63
"""When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
64
backing = self.get_transport()
65
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
66
self.make_repository('.', shared=True)
67
self.assertEqual(SmartServerResponse(('ok', '')),
68
request.execute(backing.local_abspath('')))
69
self.make_bzrdir('subdir')
70
self.assertEqual(SmartServerResponse(('ok', '..')),
71
request.execute(backing.local_abspath('subdir')))
72
self.make_bzrdir('subdir/deeper')
73
self.assertEqual(SmartServerResponse(('ok', '../..')),
74
request.execute(backing.local_abspath('subdir/deeper')))
77
class TestSmartServerRequestHasRevision(tests.TestCaseWithTransport):
79
def test_no_repository(self):
80
"""NoRepositoryPresent is raised when there is no repository."""
81
# we test this using a shared repository above the named path,
82
# thus checking the right search logic is used.
83
backing = self.get_transport()
84
request = smart.repository.SmartServerRequestHasRevision(backing)
85
self.make_repository('.', shared=True)
86
self.make_bzrdir('subdir')
87
self.assertRaises(errors.NoRepositoryPresent,
88
request.execute, backing.local_abspath('subdir'), 'revid')
90
def test_missing_revision(self):
91
"""For a missing revision, ('no', ) is returned."""
92
backing = self.get_transport()
93
request = smart.repository.SmartServerRequestHasRevision(backing)
94
self.make_repository('.')
95
self.assertEqual(SmartServerResponse(('no', )),
96
request.execute(backing.local_abspath(''), 'revid'))
98
def test_present_revision(self):
99
"""For a present revision, ('ok', ) is returned."""
100
backing = self.get_transport()
101
request = smart.repository.SmartServerRequestHasRevision(backing)
102
tree = self.make_branch_and_memory_tree('.')
105
r1 = tree.commit('a commit', rev_id=u'\xc8abc')
107
self.assertTrue(tree.branch.repository.has_revision(u'\xc8abc'))
108
self.assertEqual(SmartServerResponse(('ok', )),
109
request.execute(backing.local_abspath(''),
110
u'\xc8abc'.encode('utf8')))
113
class TestSmartServerRequestOpenBranch(tests.TestCaseWithTransport):
115
def test_no_branch(self):
116
"""When there is no branch, ('nobranch', ) is returned."""
117
backing = self.get_transport()
118
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
119
self.make_bzrdir('.')
120
self.assertEqual(SmartServerResponse(('nobranch', )),
121
request.execute(backing.local_abspath('')))
123
def test_branch(self):
124
"""When there is a branch, 'ok' is returned."""
125
backing = self.get_transport()
126
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
127
self.make_branch('.')
128
self.assertEqual(SmartServerResponse(('ok', '')),
129
request.execute(backing.local_abspath('')))
131
def test_branch_reference(self):
132
"""When there is a branch reference, the reference URL is returned."""
133
backing = self.get_transport()
134
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
135
branch = self.make_branch('branch')
136
checkout = branch.create_checkout('reference',lightweight=True)
137
# TODO: once we have an API to probe for references of any sort, we
139
reference_url = backing.abspath('branch') + '/'
140
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
141
self.assertEqual(SmartServerResponse(('ok', reference_url)),
142
request.execute(backing.local_abspath('reference')))
145
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithTransport):
147
def test_no_branch(self):
148
"""When there is a bzrdir and no branch, NotBranchError is raised."""
149
backing = self.get_transport()
150
request = smart.branch.SmartServerRequestRevisionHistory(backing)
151
self.make_bzrdir('.')
152
self.assertRaises(errors.NotBranchError,
153
request.execute, backing.local_abspath(''))
155
def test_empty(self):
156
"""For an empty branch, the body is empty."""
157
backing = self.get_transport()
158
request = smart.branch.SmartServerRequestRevisionHistory(backing)
159
self.make_branch('.')
160
self.assertEqual(SmartServerResponse(('ok', ), ''),
161
request.execute(backing.local_abspath('')))
163
def test_not_empty(self):
164
"""For a non-empty branch, the body is empty."""
165
backing = self.get_transport()
166
request = smart.branch.SmartServerRequestRevisionHistory(backing)
167
tree = self.make_branch_and_memory_tree('.')
170
r1 = tree.commit('1st commit')
171
r2 = tree.commit('2nd commit', rev_id=u'\xc8')
173
self.assertEqual(SmartServerResponse(('ok', ),
174
('\x00'.join([r1, r2])).encode('utf8')),
175
request.execute(backing.local_abspath('')))
177
def test_branch_reference(self):
178
"""When there is a branch reference, NotBranchError is raised."""
179
backing = self.get_transport()
180
request = smart.branch.SmartServerRequestRevisionHistory(backing)
181
branch = self.make_branch('branch')
182
checkout = branch.create_checkout('reference',lightweight=True)
183
self.assertRaises(errors.NotBranchError,
184
request.execute, backing.local_abspath('checkout'))
187
class TestHandlers(tests.TestCase):
188
"""Tests for the request.request_handlers object."""
190
def test_registered_methods(self):
191
"""Test that known methods are registered to the correct object."""
193
smart.request.request_handlers.get('Branch.revision_history'),
194
smart.branch.SmartServerRequestRevisionHistory)
196
smart.request.request_handlers.get('BzrDir.find_repository'),
197
smart.bzrdir.SmartServerRequestFindRepository)
199
smart.request.request_handlers.get('BzrDir.open_branch'),
200
smart.bzrdir.SmartServerRequestOpenBranch)
202
smart.request.request_handlers.get('Repository.has_revision'),
203
smart.repository.SmartServerRequestHasRevision)