1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the smart wire/domain protococl."""
19
from bzrlib import bzrdir, errors, smart, tests
20
from bzrlib.smart.request import SmartServerResponse
21
import bzrlib.smart.bzrdir
22
import bzrlib.smart.branch
23
import bzrlib.smart.repository
26
class TestSmartServerResponse(tests.TestCase):
29
self.assertEqual(SmartServerResponse(('ok', )),
30
SmartServerResponse(('ok', )))
31
self.assertEqual(SmartServerResponse(('ok', ), 'body'),
32
SmartServerResponse(('ok', ), 'body'))
33
self.assertNotEqual(SmartServerResponse(('ok', )),
34
SmartServerResponse(('notok', )))
35
self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
36
SmartServerResponse(('ok', )))
37
self.assertNotEqual(None,
38
SmartServerResponse(('ok', )))
41
class TestSmartServerRequestFindRepository(tests.TestCaseWithTransport):
43
def test_no_repository(self):
44
"""When there is no repository to be found, ('norepository', ) is returned."""
45
backing = self.get_transport()
46
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
48
self.assertEqual(SmartServerResponse(('norepository', )),
49
request.execute(backing.local_abspath('')))
51
def test_nonshared_repository(self):
52
# nonshared repositorys only allow 'find' to return a handle when the
53
# path the repository is being searched on is the same as that that
54
# the repository is at.
55
backing = self.get_transport()
56
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
57
self.make_repository('.')
58
self.assertEqual(SmartServerResponse(('ok', '')),
59
request.execute(backing.local_abspath('')))
60
self.make_bzrdir('subdir')
61
self.assertEqual(SmartServerResponse(('norepository', )),
62
request.execute(backing.local_abspath('subdir')))
64
def test_shared_repository(self):
65
"""When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
66
backing = self.get_transport()
67
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
68
self.make_repository('.', shared=True)
69
self.assertEqual(SmartServerResponse(('ok', '')),
70
request.execute(backing.local_abspath('')))
71
self.make_bzrdir('subdir')
72
self.assertEqual(SmartServerResponse(('ok', '..')),
73
request.execute(backing.local_abspath('subdir')))
74
self.make_bzrdir('subdir/deeper')
75
self.assertEqual(SmartServerResponse(('ok', '../..')),
76
request.execute(backing.local_abspath('subdir/deeper')))
79
class TestSmartServerRequestHasRevision(tests.TestCaseWithTransport):
81
def test_no_repository(self):
82
"""NoRepositoryPresent is raised when there is no repository."""
83
# we test this using a shared repository above the named path,
84
# thus checking the right search logic is used.
85
backing = self.get_transport()
86
request = smart.repository.SmartServerRequestHasRevision(backing)
87
self.make_repository('.', shared=True)
88
self.make_bzrdir('subdir')
89
self.assertRaises(errors.NoRepositoryPresent,
90
request.execute, backing.local_abspath('subdir'), 'revid')
92
def test_missing_revision(self):
93
"""For a missing revision, ('no', ) is returned."""
94
backing = self.get_transport()
95
request = smart.repository.SmartServerRequestHasRevision(backing)
96
self.make_repository('.')
97
self.assertEqual(SmartServerResponse(('no', )),
98
request.execute(backing.local_abspath(''), 'revid'))
100
def test_present_revision(self):
101
"""For a present revision, ('ok', ) is returned."""
102
backing = self.get_transport()
103
request = smart.repository.SmartServerRequestHasRevision(backing)
104
tree = self.make_branch_and_memory_tree('.')
107
r1 = tree.commit('a commit', rev_id=u'\xc8abc')
109
self.assertTrue(tree.branch.repository.has_revision(u'\xc8abc'))
110
self.assertEqual(SmartServerResponse(('ok', )),
111
request.execute(backing.local_abspath(''),
112
u'\xc8abc'.encode('utf8')))
115
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithTransport):
117
def test_empty_dir(self):
118
"""Initializing an empty dir should succeed and do it."""
119
backing = self.get_transport()
120
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
121
self.assertEqual(SmartServerResponse(('ok', )),
122
request.execute(backing.local_abspath('.')))
123
made_dir = bzrdir.BzrDir.open_from_transport(backing)
124
# no branch, tree or repository is expected with the current
126
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
127
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
128
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
130
def test_missing_dir(self):
131
"""Initializing a missing directory should fail like the bzrdir api."""
132
backing = self.get_transport()
133
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
134
self.assertRaises(errors.NoSuchFile,
135
request.execute, backing.local_abspath('subdir'))
137
def test_initialized_dir(self):
138
"""Initializing an extant bzrdir should fail like the bzrdir api."""
139
backing = self.get_transport()
140
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
141
self.make_bzrdir('subdir')
142
self.assertRaises(errors.FileExists,
143
request.execute, backing.local_abspath('subdir'))
146
class TestSmartServerRequestOpenBranch(tests.TestCaseWithTransport):
148
def test_no_branch(self):
149
"""When there is no branch, ('nobranch', ) is returned."""
150
backing = self.get_transport()
151
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
152
self.make_bzrdir('.')
153
self.assertEqual(SmartServerResponse(('nobranch', )),
154
request.execute(backing.local_abspath('')))
156
def test_branch(self):
157
"""When there is a branch, 'ok' is returned."""
158
backing = self.get_transport()
159
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
160
self.make_branch('.')
161
self.assertEqual(SmartServerResponse(('ok', '')),
162
request.execute(backing.local_abspath('')))
164
def test_branch_reference(self):
165
"""When there is a branch reference, the reference URL is returned."""
166
backing = self.get_transport()
167
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
168
branch = self.make_branch('branch')
169
checkout = branch.create_checkout('reference',lightweight=True)
170
# TODO: once we have an API to probe for references of any sort, we
172
reference_url = backing.abspath('branch') + '/'
173
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
174
self.assertEqual(SmartServerResponse(('ok', reference_url)),
175
request.execute(backing.local_abspath('reference')))
178
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithTransport):
180
def test_no_branch(self):
181
"""When there is a bzrdir and no branch, NotBranchError is raised."""
182
backing = self.get_transport()
183
request = smart.branch.SmartServerRequestRevisionHistory(backing)
184
self.make_bzrdir('.')
185
self.assertRaises(errors.NotBranchError,
186
request.execute, backing.local_abspath(''))
188
def test_empty(self):
189
"""For an empty branch, the body is empty."""
190
backing = self.get_transport()
191
request = smart.branch.SmartServerRequestRevisionHistory(backing)
192
self.make_branch('.')
193
self.assertEqual(SmartServerResponse(('ok', ), ''),
194
request.execute(backing.local_abspath('')))
196
def test_not_empty(self):
197
"""For a non-empty branch, the body is empty."""
198
backing = self.get_transport()
199
request = smart.branch.SmartServerRequestRevisionHistory(backing)
200
tree = self.make_branch_and_memory_tree('.')
203
r1 = tree.commit('1st commit')
204
r2 = tree.commit('2nd commit', rev_id=u'\xc8')
206
self.assertEqual(SmartServerResponse(('ok', ),
207
('\x00'.join([r1, r2])).encode('utf8')),
208
request.execute(backing.local_abspath('')))
210
def test_branch_reference(self):
211
"""When there is a branch reference, NotBranchError is raised."""
212
backing = self.get_transport()
213
request = smart.branch.SmartServerRequestRevisionHistory(backing)
214
branch = self.make_branch('branch')
215
checkout = branch.create_checkout('reference',lightweight=True)
216
self.assertRaises(errors.NotBranchError,
217
request.execute, backing.local_abspath('checkout'))
220
class TestHandlers(tests.TestCase):
221
"""Tests for the request.request_handlers object."""
223
def test_registered_methods(self):
224
"""Test that known methods are registered to the correct object."""
226
smart.request.request_handlers.get('Branch.revision_history'),
227
smart.branch.SmartServerRequestRevisionHistory)
229
smart.request.request_handlers.get('BzrDir.find_repository'),
230
smart.bzrdir.SmartServerRequestFindRepository)
232
smart.request.request_handlers.get('BzrDirFormat.initialize'),
233
smart.bzrdir.SmartServerRequestInitializeBzrDir)
235
smart.request.request_handlers.get('BzrDir.open_branch'),
236
smart.bzrdir.SmartServerRequestOpenBranch)
238
smart.request.request_handlers.get('Repository.has_revision'),
239
smart.repository.SmartServerRequestHasRevision)