# Copyright (C) 2006, 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Tests for remote bzrdir/branch/repo/etc

These are proxy objects which act on remote objects by sending messages
through a smart client. The proxies are to be created when attempting to open
the object given a transport that supports smartserver rpc operations.
"""

from cStringIO import StringIO

from bzrlib import bzrdir, remote, tests
from bzrlib.branch import Branch
from bzrlib.bzrdir import BzrDir, BzrDirFormat
from bzrlib.errors import NoSuchRevision
from bzrlib.remote import (
    RemoteBranch,
    RemoteBzrDir,
    RemoteBzrDirFormat,
    RemoteRepository,
    )
from bzrlib.revision import NULL_REVISION
from bzrlib.smart import server
from bzrlib.smart.client import SmartClient
from bzrlib.transport import remote as remote_transport
from bzrlib.transport.memory import MemoryTransport


class BasicRemoteObjectTests(tests.TestCaseWithTransport):
    """Open the remote proxy objects through a smart-server transport.

    Every test works against a standalone working tree that setUp creates in
    the test's current directory and that is opened via ``self.transport``.
    """

    def setUp(self):
        """Select the smart TCP test server and create a local tree."""
        super(BasicRemoteObjectTests, self).setUp()
        self.transport_server = server.SmartTCPServer_for_testing
        self.transport = self.get_transport()
        self.client = self.transport.get_smart_client()
        # make a branch that can be opened over the smart transport
        self.local_wt = BzrDir.create_standalone_workingtree('.')

    def test_create_remote_bzrdir(self):
        b = remote.RemoteBzrDir(self.transport)
        self.assertIsInstance(b, BzrDir)

    def test_open_remote_branch(self):
        # open a standalone branch in the working directory; the test passes
        # as long as opening does not raise.
        b = remote.RemoteBzrDir(self.transport)
        b.open_branch()

    def test_remote_repository(self):
        b = BzrDir.open_from_transport(self.transport)
        repo = b.open_repository()
        revid = u'\xc823123123'
        # The revision must only become visible remotely after the local
        # commit has created it.
        self.assertFalse(repo.has_revision(revid))
        self.local_wt.commit(message='test commit', rev_id=revid)
        self.assertTrue(repo.has_revision(revid))

    def test_remote_branch_revision_history(self):
        b = BzrDir.open_from_transport(self.transport).open_branch()
        self.assertEqual([], b.revision_history())
        r1 = self.local_wt.commit('1st commit')
        r2 = self.local_wt.commit('1st commit', rev_id=u'\xc8')
        self.assertEqual([r1, r2], b.revision_history())

    def test_find_correct_format(self):
        """Should open a RemoteBzrDir over a RemoteTransport"""
        fmt = BzrDirFormat.find_format(self.transport)
        self.assertTrue(RemoteBzrDirFormat in BzrDirFormat._control_formats)
        self.assertIsInstance(fmt, remote.RemoteBzrDirFormat)

    def test_open_detected_smart_format(self):
        fmt = BzrDirFormat.find_format(self.transport)
        d = fmt.open(self.transport)
        self.assertIsInstance(d, BzrDir)


class FakeProtocol(object):
    """Lookalike SmartClientRequestProtocolOne allowing body reading tests.

    The canned body passed to the constructor is served back through
    read_body_bytes.
    """

    def __init__(self, body):
        # Wrap the canned body in a file-like object so that it can be
        # consumed piecemeal by read_body_bytes.
        self._body_buffer = StringIO(body)

    def read_body_bytes(self, count=-1):
        """Return up to count bytes of the body; everything left if -1."""
        stream = self._body_buffer
        return stream.read(count)


class FakeClient(SmartClient):
    """Lookalike for SmartClient allowing testing.

    Canned responses are handed out in order, one per call, and every call
    is appended to ``self._calls`` so tests can assert on the traffic.
    """

    def __init__(self, responses):
        """Create a FakeClient.

        :param responses: A list of response-tuple, body-data pairs to be sent
            back to callers.
        """
        # We don't call the super init because there is no medium.
        self.responses = responses
        # Log of ('call' | 'call2', method, args) tuples, in call order.
        self._calls = []

    def call(self, method, *args):
        """Record the call and return the next canned response tuple."""
        self._calls.append(('call', method, args))
        return self.responses.pop(0)[0]

    def call2(self, method, *args):
        """Record the call; return the next response tuple and its body.

        The body data is wrapped in a FakeProtocol so that callers can read
        it with read_body_bytes.
        """
        self._calls.append(('call2', method, args))
        result = self.responses.pop(0)
        return result[0], FakeProtocol(result[1])


class TestBranchLastRevisionInfo(tests.TestCase):
    """RemoteBranch.last_revision_info decoding, driven by a FakeClient."""

    def test_empty_branch(self):
        # in an empty branch we decode the response properly
        client = FakeClient([(('ok', '0', ''), )])
        transport = MemoryTransport()
        transport.mkdir('quack')
        transport = transport.clone('quack')
        # we do not want bzrdir to make any remote calls
        remote_dir = RemoteBzrDir(transport, _client=False)
        branch = RemoteBranch(remote_dir, None, _client=client)
        result = branch.last_revision_info()

        self.assertEqual(
            [('call', 'Branch.last_revision_info', ('///quack/',))],
            client._calls)
        self.assertEqual((0, NULL_REVISION), result)

    def test_non_empty_branch(self):
        # in a non-empty branch we also decode the response properly

        client = FakeClient([(('ok', '2', u'\xc8'.encode('utf8')), )])
        transport = MemoryTransport()
        transport.mkdir('kwaak')
        transport = transport.clone('kwaak')
        # we do not want bzrdir to make any remote calls
        remote_dir = RemoteBzrDir(transport, _client=False)
        branch = RemoteBranch(remote_dir, None, _client=client)
        result = branch.last_revision_info()

        self.assertEqual(
            [('call', 'Branch.last_revision_info', ('///kwaak/',))],
            client._calls)
        # The utf8 revision id in the response comes back as unicode.
        self.assertEqual((2, u'\xc8'), result)


class TestBranchControlGetBranchConf(tests.TestCase):
    """Test branch.control_files api munging...

    we special case RemoteBranch.control_files.get('branch.conf') to
    call a specific API so that RemoteBranch's can intercept configuration
    file reading, allowing them to signal to the client about things like
    'email is configured for commits'.
    """

    def test_get_branch_conf(self):
        # reading branch.conf goes through Branch.get_config_file and the
        # response body is handed back as a readable file object
        client = FakeClient([(('ok', ), 'config file body')])
        transport = MemoryTransport()
        transport.mkdir('quack')
        transport = transport.clone('quack')
        # we do not want bzrdir to make any remote calls
        remote_dir = RemoteBzrDir(transport, _client=False)
        branch = RemoteBranch(remote_dir, None, _client=client)
        result = branch.control_files.get('branch.conf')
        self.assertEqual(
            [('call2', 'Branch.get_config_file', ('///quack/',))],
            client._calls)
        self.assertEqual('config file body', result.read())


class TestRemoteRepository(tests.TestCase):
    """Base class providing a RemoteRepository wired to a FakeClient."""

    def setup_fake_client_and_repository(self, responses, transport_path):
        """Create the fake client and repository for testing with.

        :param responses: The canned response-tuple, body-data pairs that the
            FakeClient hands back, one per call.
        :param transport_path: Directory created in a fresh MemoryTransport
            and used as the location of the repository.
        :return: A (repository, client) tuple.
        """
        client = FakeClient(responses)
        transport = MemoryTransport()
        transport.mkdir(transport_path)
        transport = transport.clone(transport_path)
        # we do not want bzrdir to make any remote calls
        remote_dir = RemoteBzrDir(transport, _client=False)
        repo = RemoteRepository(remote_dir, None, _client=client)
        return repo, client


class TestRepositoryGatherStats(tests.TestCase):
    """RemoteRepository.gather_stats request encoding and body decoding."""

    def setup_fake_client_and_repository(self, responses, transport_path):
        """Create the fake client and repository for testing with.

        :param responses: The canned response-tuple, body-data pairs that the
            FakeClient hands back, one per call.
        :param transport_path: Directory created in a fresh MemoryTransport
            and used as the location of the repository.
        :return: A (repository, client) tuple.
        """
        client = FakeClient(responses)
        transport = MemoryTransport()
        transport.mkdir(transport_path)
        transport = transport.clone(transport_path)
        # we do not want bzrdir to make any remote calls
        remote_dir = RemoteBzrDir(transport, _client=False)
        repo = RemoteRepository(remote_dir, None, _client=client)
        return repo, client

    def test_revid_none(self):
        # ('ok',), body with revisions and size
        responses = [(('ok', ), 'revisions: 2\nsize: 18\n')]
        transport_path = 'quack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.gather_stats(None)
        self.assertEqual(
            [('call2', 'Repository.gather_stats', ('///quack/', '', 'no'))],
            client._calls)
        self.assertEqual({'revisions': 2, 'size': 18}, result)

    def test_revid_no_committers(self):
        # ('ok',), body without committers
        # NOTE(review): the trailing 'revisions'/'size' body lines and the
        # revid assignment were lost from this copy of the file; they are
        # restored to match the expected result and the utf8-encoded call
        # argument below -- confirm against upstream.
        responses = [(('ok', ),
                      'firstrev: 123456.300 3600\n'
                      'latestrev: 654231.400 0\n'
                      'revisions: 2\n'
                      'size: 18\n')]
        transport_path = 'quick'
        revid = u'\xc8'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.gather_stats(revid)
        self.assertEqual(
            [('call2', 'Repository.gather_stats',
              ('///quick/', revid.encode('utf8'), 'no'))],
            client._calls)
        self.assertEqual({'revisions': 2, 'size': 18,
                          'firstrev': (123456.300, 3600),
                          'latestrev': (654231.400, 0),
                          },
                         result)

    def test_revid_with_committers(self):
        # ('ok',), body with committers
        # NOTE(review): the 'committers' line (and its expected value), the
        # 'revisions'/'size' lines and the revid assignment were lost from
        # this copy of the file; the committers count is a reconstruction --
        # confirm against upstream.
        responses = [(('ok', ),
                      'committers: 128\n'
                      'firstrev: 123456.300 3600\n'
                      'latestrev: 654231.400 0\n'
                      'revisions: 2\n'
                      'size: 18\n')]
        transport_path = 'buick'
        revid = u'\xc8'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.gather_stats(revid, True)
        self.assertEqual(
            [('call2', 'Repository.gather_stats',
              ('///buick/', revid.encode('utf8'), 'yes'))],
            client._calls)
        self.assertEqual({'revisions': 2, 'size': 18,
                          'committers': 128,
                          'firstrev': (123456.300, 3600),
                          'latestrev': (654231.400, 0),
                          },
                         result)


class TestRepositoryGetRevisionGraph(TestRemoteRepository):
    """RemoteRepository.get_revision_graph encoding and decoding.

    NOTE(review): the literal revision ids assigned at the top of the tests
    below were lost from this copy of the file. The values used here are
    reconstructions; any distinct ids exercise the same code path -- confirm
    against upstream.
    """

    def test_null_revision(self):
        # a null revision has the predictable result {}, we should have no wire
        # traffic when calling it with this argument
        responses = [(('notused', ), '')]
        transport_path = 'empty'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.get_revision_graph(NULL_REVISION)
        self.assertEqual([], client._calls)
        self.assertEqual({}, result)

    def test_none_revision(self):
        # with none we want the entire graph
        r1 = u'\u0e33'
        r2 = u'\u0dab'
        # One line per revision: the revision id followed by its parents.
        lines = [' '.join([r2, r1]), r1]
        encoded_body = '\n'.join(lines).encode('utf8')

        responses = [(('ok', ), encoded_body)]
        transport_path = 'sinhala'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.get_revision_graph()
        self.assertEqual(
            [('call2', 'Repository.get_revision_graph', ('///sinhala/', ''))],
            client._calls)
        self.assertEqual({r1: [], r2: [r1]}, result)

    def test_specific_revision(self):
        # with a specific revision we want the graph for that
        r11 = u'\u0e33'
        r12 = u'\xc9'
        r2 = u'\u0dab'
        # One line per revision: the revision id followed by its parents.
        lines = [' '.join([r2, r11, r12]), r11, r12]
        encoded_body = '\n'.join(lines).encode('utf8')

        responses = [(('ok', ), encoded_body)]
        transport_path = 'sinhala'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.get_revision_graph(r2)
        self.assertEqual(
            [('call2', 'Repository.get_revision_graph',
              ('///sinhala/', r2.encode('utf8')))],
            client._calls)
        self.assertEqual({r11: [], r12: [], r2: [r11, r12], }, result)

    def test_no_such_revision(self):
        # The expected call below passes revid through unencoded, so a plain
        # ascii id is used here.
        revid = '123'
        responses = [(('nosuchrevision', revid), '')]
        transport_path = 'sinhala'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        # also check that the right revision is reported in the error
        self.assertRaises(NoSuchRevision,
                          repo.get_revision_graph, revid)
        self.assertEqual(
            [('call2', 'Repository.get_revision_graph',
              ('///sinhala/', revid))],
            client._calls)


class TestRepositoryIsShared(TestRemoteRepository):
    """RemoteRepository.is_shared maps 'yes'/'no' responses to booleans."""

    def test_is_shared(self):
        # ('yes', ) for Repository.is_shared -> 'True'.
        responses = [(('yes', ), )]
        transport_path = 'quack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.is_shared()
        self.assertEqual(
            [('call', 'Repository.is_shared', ('///quack/',))],
            client._calls)
        self.assertEqual(True, result)

    def test_is_not_shared(self):
        # ('no', ) for Repository.is_shared -> 'False'.
        responses = [(('no', ), )]
        transport_path = 'qwack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.is_shared()
        self.assertEqual(
            [('call', 'Repository.is_shared', ('///qwack/',))],
            client._calls)
        self.assertEqual(False, result)