1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for remote bzrdir/branch/repo/etc
19
These are proxy objects which act on remote objects by sending messages
20
through a smart client. The proxies are to be created when attempting to open
21
the object given a transport that supports smartserver rpc operations.
24
from cStringIO import StringIO
26
from bzrlib import bzrdir, remote, tests
27
from bzrlib.branch import Branch
28
from bzrlib.bzrdir import BzrDir, BzrDirFormat
29
from bzrlib.errors import NoSuchRevision
30
from bzrlib.remote import (
36
from bzrlib.revision import NULL_REVISION
37
from bzrlib.smart import server
38
from bzrlib.smart.client import SmartClient
39
from bzrlib.transport import remote as remote_transport
40
from bzrlib.transport.memory import MemoryTransport
43
class BasicRemoteObjectTests(tests.TestCaseWithTransport):
46
super(BasicRemoteObjectTests, self).setUp()
47
self.transport_server = server.SmartTCPServer_for_testing
48
self.transport = self.get_transport()
49
self.client = self.transport.get_smart_client()
50
# make a branch that can be opened over the smart transport
51
self.local_wt = BzrDir.create_standalone_workingtree('.')
53
def test_create_remote_bzrdir(self):
54
b = remote.RemoteBzrDir(self.transport)
55
self.assertIsInstance(b, BzrDir)
57
def test_open_remote_branch(self):
58
# open a standalone branch in the working directory
59
b = remote.RemoteBzrDir(self.transport)
60
branch = b.open_branch()
62
def test_remote_repository(self):
63
b = BzrDir.open_from_transport(self.transport)
64
repo = b.open_repository()
65
revid = u'\xc823123123'
66
self.assertFalse(repo.has_revision(revid))
67
self.local_wt.commit(message='test commit', rev_id=revid)
68
self.assertTrue(repo.has_revision(revid))
70
def test_remote_branch_revision_history(self):
71
b = BzrDir.open_from_transport(self.transport).open_branch()
72
self.assertEqual([], b.revision_history())
73
r1 = self.local_wt.commit('1st commit')
74
r2 = self.local_wt.commit('1st commit', rev_id=u'\xc8')
75
self.assertEqual([r1, r2], b.revision_history())
77
def test_find_correct_format(self):
78
"""Should open a RemoteBzrDir over a RemoteTransport"""
79
fmt = BzrDirFormat.find_format(self.transport)
80
self.assertTrue(RemoteBzrDirFormat in BzrDirFormat._control_formats)
81
self.assertIsInstance(fmt, remote.RemoteBzrDirFormat)
83
def test_open_detected_smart_format(self):
84
fmt = BzrDirFormat.find_format(self.transport)
85
d = fmt.open(self.transport)
86
self.assertIsInstance(d, BzrDir)
89
class FakeProtocol(object):
90
"""Lookalike SmartClientRequestProtocolOne allowing body reading tests."""
92
def __init__(self, body):
93
self._body_buffer = StringIO(body)
95
def read_body_bytes(self, count=-1):
96
return self._body_buffer.read(count)
99
class FakeClient(SmartClient):
100
"""Lookalike for SmartClient allowing testing."""
102
def __init__(self, responses):
103
# We don't call the super init because there is no medium.
104
"""create a FakeClient.
106
:param respones: A list of response-tuple, body-data pairs to be sent
109
self.responses = responses
112
def call(self, method, *args):
113
self._calls.append(('call', method, args))
114
return self.responses.pop(0)[0]
116
def call2(self, method, *args):
117
self._calls.append(('call2', method, args))
118
result = self.responses.pop(0)
119
return result[0], FakeProtocol(result[1])
122
class TestBranchLastRevisionInfo(tests.TestCase):
124
def test_empty_branch(self):
125
# in an empty branch we decode the response properly
126
client = FakeClient([(('ok', '0', ''), )])
127
transport = MemoryTransport()
128
transport.mkdir('quack')
129
transport = transport.clone('quack')
130
# we do not want bzrdir to make any remote calls
131
bzrdir = RemoteBzrDir(transport, _client=False)
132
branch = RemoteBranch(bzrdir, None, _client=client)
133
result = branch.last_revision_info()
136
[('call', 'Branch.last_revision_info', ('///quack/',))],
138
self.assertEqual((0, NULL_REVISION), result)
140
def test_non_empty_branch(self):
141
# in a non-empty branch we also decode the response properly
143
client = FakeClient([(('ok', '2', u'\xc8'.encode('utf8')), )])
144
transport = MemoryTransport()
145
transport.mkdir('kwaak')
146
transport = transport.clone('kwaak')
147
# we do not want bzrdir to make any remote calls
148
bzrdir = RemoteBzrDir(transport, _client=False)
149
branch = RemoteBranch(bzrdir, None, _client=client)
150
result = branch.last_revision_info()
153
[('call', 'Branch.last_revision_info', ('///kwaak/',))],
155
self.assertEqual((2, u'\xc8'), result)
158
class TestBranchSetLastRevision(tests.TestCase):
160
def test_set_empty(self):
161
# set_revision_history([]) is translated to calling
162
# Branch.set_last_revision(path, '') on the wire.
163
client = FakeClient([(('ok',), )])
164
transport = MemoryTransport()
165
transport.mkdir('branch')
166
transport = transport.clone('branch')
168
bzrdir = RemoteBzrDir(transport, _client=False)
169
branch = RemoteBranch(bzrdir, None, _client=client)
171
result = branch.set_revision_history([])
173
[('call', 'Branch.set_last_revision', ('///branch/', ''))],
175
self.assertEqual(None, result)
177
def test_set_nonempty(self):
178
# set_revision_history([rev-id1, ..., rev-idN]) is translated to calling
179
# Branch.set_last_revision(path, rev-idN) on the wire.
180
client = FakeClient([(('ok',), )])
181
transport = MemoryTransport()
182
transport.mkdir('branch')
183
transport = transport.clone('branch')
185
bzrdir = RemoteBzrDir(transport, _client=False)
186
branch = RemoteBranch(bzrdir, None, _client=client)
188
result = branch.set_revision_history(['rev-id1', 'rev-id2'])
190
[('call', 'Branch.set_last_revision', ('///branch/', 'rev-id2'))],
192
self.assertEqual(None, result)
194
def test_no_such_revision(self):
195
# A response of 'NoSuchRevision' is translated into an exception.
196
client = FakeClient([(('NoSuchRevision', 'rev-id'), )])
197
transport = MemoryTransport()
198
transport.mkdir('branch')
199
transport = transport.clone('branch')
201
bzrdir = RemoteBzrDir(transport, _client=False)
202
branch = RemoteBranch(bzrdir, None, _client=client)
205
NoSuchRevision, branch.set_revision_history, ['rev-id'])
208
class TestBranchControlGetBranchConf(tests.TestCase):
209
"""Test branch.control_files api munging...
211
we special case RemoteBranch.control_files.get('branch.conf') to
212
call a specific API so that RemoteBranch's can intercept configuration
213
file reading, allowing them to signal to the client about things like
214
'email is configured for commits'.
217
def test_get_branch_conf(self):
218
# in an empty branch we decode the response properly
219
client = FakeClient([(('ok', ), 'config file body')])
220
transport = MemoryTransport()
221
transport.mkdir('quack')
222
transport = transport.clone('quack')
223
# we do not want bzrdir to make any remote calls
224
bzrdir = RemoteBzrDir(transport, _client=False)
225
branch = RemoteBranch(bzrdir, None, _client=client)
226
result = branch.control_files.get('branch.conf')
228
[('call2', 'Branch.get_config_file', ('///quack/',))],
230
self.assertEqual('config file body', result.read())
233
class TestRemoteRepository(tests.TestCase):
235
def setup_fake_client_and_repository(self, responses, transport_path):
236
"""Create the fake client and repository for testing with."""
237
client = FakeClient(responses)
238
transport = MemoryTransport()
239
transport.mkdir(transport_path)
240
transport = transport.clone(transport_path)
241
# we do not want bzrdir to make any remote calls
242
bzrdir = RemoteBzrDir(transport, _client=False)
243
repo = RemoteRepository(bzrdir, None, _client=client)
247
class TestRepositoryGatherStats(TestRemoteRepository):
249
def test_revid_none(self):
250
# ('ok',), body with revisions and size
251
responses = [(('ok', ), 'revisions: 2\nsize: 18\n')]
252
transport_path = 'quack'
253
repo, client = self.setup_fake_client_and_repository(
254
responses, transport_path)
255
result = repo.gather_stats(None)
257
[('call2', 'Repository.gather_stats', ('///quack/','','no'))],
259
self.assertEqual({'revisions': 2, 'size': 18}, result)
261
def test_revid_no_committers(self):
262
# ('ok',), body without committers
263
responses = [(('ok', ),
264
'firstrev: 123456.300 3600\n'
265
'latestrev: 654231.400 0\n'
268
transport_path = 'quick'
270
repo, client = self.setup_fake_client_and_repository(
271
responses, transport_path)
272
result = repo.gather_stats(revid)
274
[('call2', 'Repository.gather_stats',
275
('///quick/', revid.encode('utf8'), 'no'))],
277
self.assertEqual({'revisions': 2, 'size': 18,
278
'firstrev': (123456.300, 3600),
279
'latestrev': (654231.400, 0),},
282
def test_revid_with_committers(self):
283
# ('ok',), body with committers
284
responses = [(('ok', ),
286
'firstrev: 123456.300 3600\n'
287
'latestrev: 654231.400 0\n'
290
transport_path = 'buick'
292
repo, client = self.setup_fake_client_and_repository(
293
responses, transport_path)
294
result = repo.gather_stats(revid, True)
296
[('call2', 'Repository.gather_stats',
297
('///buick/', revid.encode('utf8'), 'yes'))],
299
self.assertEqual({'revisions': 2, 'size': 18,
301
'firstrev': (123456.300, 3600),
302
'latestrev': (654231.400, 0),},
306
class TestRepositoryGetRevisionGraph(TestRemoteRepository):
308
def test_null_revision(self):
309
# a null revision has the predictable result {}, we should have no wire
310
# traffic when calling it with this argument
311
responses = [(('notused', ), '')]
312
transport_path = 'empty'
313
repo, client = self.setup_fake_client_and_repository(
314
responses, transport_path)
315
result = repo.get_revision_graph(NULL_REVISION)
316
self.assertEqual([], client._calls)
317
self.assertEqual({}, result)
319
def test_none_revision(self):
320
# with none we want the entire graph
323
lines = [' '.join([r2, r1]), r1]
324
encoded_body = '\n'.join(lines).encode('utf8')
326
responses = [(('ok', ), encoded_body)]
327
transport_path = 'sinhala'
328
repo, client = self.setup_fake_client_and_repository(
329
responses, transport_path)
330
result = repo.get_revision_graph()
332
[('call2', 'Repository.get_revision_graph', ('///sinhala/', ''))],
334
self.assertEqual({r1: [], r2: [r1]}, result)
336
def test_specific_revision(self):
337
# with a specific revision we want the graph for that
338
# with none we want the entire graph
342
lines = [' '.join([r2, r11, r12]), r11, r12]
343
encoded_body = '\n'.join(lines).encode('utf8')
345
responses = [(('ok', ), encoded_body)]
346
transport_path = 'sinhala'
347
repo, client = self.setup_fake_client_and_repository(
348
responses, transport_path)
349
result = repo.get_revision_graph(r2)
351
[('call2', 'Repository.get_revision_graph', ('///sinhala/', r2.encode('utf8')))],
353
self.assertEqual({r11: [], r12: [], r2: [r11, r12], }, result)
355
def test_no_such_revision(self):
357
responses = [(('nosuchrevision', revid), '')]
358
transport_path = 'sinhala'
359
repo, client = self.setup_fake_client_and_repository(
360
responses, transport_path)
361
# also check that the right revision is reported in the error
362
self.assertRaises(NoSuchRevision,
363
repo.get_revision_graph, revid)
365
[('call2', 'Repository.get_revision_graph', ('///sinhala/', revid))],
369
class TestRepositoryIsShared(TestRemoteRepository):
371
def test_is_shared(self):
372
# ('yes', ) for Repository.is_shared -> 'True'.
373
responses = [(('yes', ), )]
374
transport_path = 'quack'
375
repo, client = self.setup_fake_client_and_repository(
376
responses, transport_path)
377
result = repo.is_shared()
379
[('call', 'Repository.is_shared', ('///quack/',))],
381
self.assertEqual(True, result)
383
def test_is_not_shared(self):
384
# ('no', ) for Repository.is_shared -> 'False'.
385
responses = [(('no', ), )]
386
transport_path = 'qwack'
387
repo, client = self.setup_fake_client_and_repository(
388
responses, transport_path)
389
result = repo.is_shared()
391
[('call', 'Repository.is_shared', ('///qwack/',))],
393
self.assertEqual(False, result)