1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for remote bzrdir/branch/repo/etc

These are proxy objects which act on remote objects by sending messages
through a smart client.  The proxies are to be created when attempting to
open the object given a transport that supports smartserver rpc operations.
"""
24
from cStringIO import StringIO

from bzrlib import (
    errors,
    remote,
    tests,
    )
from bzrlib.branch import Branch
from bzrlib.bzrdir import BzrDir, BzrDirFormat
from bzrlib.remote import (
    RemoteBranch,
    RemoteBzrDir,
    RemoteBzrDirFormat,
    RemoteRepository,
    )
from bzrlib.revision import NULL_REVISION
from bzrlib.smart import server
from bzrlib.smart.client import SmartClient
from bzrlib.transport import remote as remote_transport
from bzrlib.transport.memory import MemoryTransport
47
class BasicRemoteObjectTests(tests.TestCaseWithTransport):
    """Open bzrdir, branch and repository objects over a smart transport."""

    def setUp(self):
        """Select the smart TCP test server and create a local working tree.

        The tree is created locally in '.'; the tests then reach it through
        self.transport.
        """
        super(BasicRemoteObjectTests, self).setUp()
        self.transport_server = server.SmartTCPServer_for_testing
        self.transport = self.get_transport()
        self.client = self.transport.get_smart_client()
        # make a branch that can be opened over the smart transport
        self.local_wt = BzrDir.create_standalone_workingtree('.')

    def test_create_remote_bzrdir(self):
        """A RemoteBzrDir built directly on the transport is a BzrDir."""
        b = remote.RemoteBzrDir(self.transport)
        self.assertIsInstance(b, BzrDir)

    def test_open_remote_branch(self):
        """open_branch() on a RemoteBzrDir completes without raising."""
        # open a standalone branch in the working directory
        b = remote.RemoteBzrDir(self.transport)
        # Smoke test only: the returned branch is not inspected.
        b.open_branch()

    def test_remote_repository(self):
        """has_revision() on the remote repository tracks local commits."""
        b = BzrDir.open_from_transport(self.transport)
        repo = b.open_repository()
        revid = u'\xc823123123'
        self.assertFalse(repo.has_revision(revid))
        self.local_wt.commit(message='test commit', rev_id=revid)
        self.assertTrue(repo.has_revision(revid))

    def test_remote_branch_revision_history(self):
        """revision_history() on the remote branch tracks local commits."""
        b = BzrDir.open_from_transport(self.transport).open_branch()
        self.assertEqual([], b.revision_history())
        r1 = self.local_wt.commit('1st commit')
        r2 = self.local_wt.commit('1st commit', rev_id=u'\xc8')
        self.assertEqual([r1, r2], b.revision_history())

    def test_find_correct_format(self):
        """Should open a RemoteBzrDir over a RemoteTransport"""
        fmt = BzrDirFormat.find_format(self.transport)
        self.assertTrue(RemoteBzrDirFormat in BzrDirFormat._control_formats)
        self.assertIsInstance(fmt, remote.RemoteBzrDirFormat)

    def test_open_detected_smart_format(self):
        """The format found for the transport opens to a BzrDir."""
        fmt = BzrDirFormat.find_format(self.transport)
        d = fmt.open(self.transport)
        self.assertIsInstance(d, BzrDir)
93
class FakeProtocol(object):
    """Lookalike SmartClientRequestProtocolOne allowing body reading tests."""

    def __init__(self, body):
        """Wrap ``body`` in an in-memory buffer to be read back later.

        :param body: The complete response body, as a string.
        """
        self._body_buffer = StringIO(body)

    def read_body_bytes(self, count=-1):
        """Return the next part of the body.

        :param count: Passed straight to the buffer's read(); the default
            of -1 returns everything that has not been consumed yet.
        :return: The bytes read, or '' once the body is exhausted.
        """
        return self._body_buffer.read(count)
103
class FakeClient(SmartClient):
    """Lookalike for SmartClient allowing testing."""

    def __init__(self, responses):
        """Create a FakeClient.

        :param responses: A list of response-tuple, body-data pairs to be
            sent back to callers, consumed in order (one per call).
        """
        # We don't call the super init because there is no medium.
        self.responses = responses
        # Log of every request made, as (kind, method, args) tuples; tests
        # compare this against the wire calls they expect.
        self._calls = []

    def call(self, method, *args):
        """Record the call and return the next canned response tuple.

        The body part of the canned pair, if any, is ignored.
        """
        self._calls.append(('call', method, args))
        return self.responses.pop(0)[0]

    def call2(self, method, *args):
        """Record the call and return the next response with its body.

        :return: A (response-tuple, FakeProtocol) pair; the protocol yields
            the canned body data through read_body_bytes().
        """
        self._calls.append(('call2', method, args))
        result = self.responses.pop(0)
        return result[0], FakeProtocol(result[1])
126
class TestBranchLastRevisionInfo(tests.TestCase):
    """RemoteBranch.last_revision_info() against canned responses."""

    def test_empty_branch(self):
        """('ok', '0', '') decodes to (0, NULL_REVISION)."""
        # in an empty branch we decode the response properly
        client = FakeClient([(('ok', '0', ''), )])
        transport = MemoryTransport()
        transport.mkdir('quack')
        transport = transport.clone('quack')
        # we do not want bzrdir to make any remote calls
        bzrdir = RemoteBzrDir(transport, _client=False)
        branch = RemoteBranch(bzrdir, None, _client=client)
        result = branch.last_revision_info()

        self.assertEqual(
            [('call', 'Branch.last_revision_info', ('///quack/',))],
            client._calls)
        self.assertEqual((0, NULL_REVISION), result)

    def test_non_empty_branch(self):
        """A utf8 revision id in the response is decoded to unicode."""
        # in a non-empty branch we also decode the response properly
        client = FakeClient([(('ok', '2', u'\xc8'.encode('utf8')), )])
        transport = MemoryTransport()
        transport.mkdir('kwaak')
        transport = transport.clone('kwaak')
        # we do not want bzrdir to make any remote calls
        bzrdir = RemoteBzrDir(transport, _client=False)
        branch = RemoteBranch(bzrdir, None, _client=client)
        result = branch.last_revision_info()

        self.assertEqual(
            [('call', 'Branch.last_revision_info', ('///kwaak/',))],
            client._calls)
        self.assertEqual((2, u'\xc8'), result)
162
class TestBranchSetLastRevision(tests.TestCase):
    """RemoteBranch.set_revision_history() against canned responses."""

    def test_set_empty(self):
        """An empty history sends '' as the last revision."""
        # set_revision_history([]) is translated to calling
        # Branch.set_last_revision(path, '') on the wire.
        client = FakeClient([(('ok',), )])
        transport = MemoryTransport()
        transport.mkdir('branch')
        transport = transport.clone('branch')

        bzrdir = RemoteBzrDir(transport, _client=False)
        branch = RemoteBranch(bzrdir, None, _client=client)

        result = branch.set_revision_history([])
        self.assertEqual(
            [('call', 'Branch.set_last_revision', ('///branch/', ''))],
            client._calls)
        self.assertEqual(None, result)

    def test_set_nonempty(self):
        """Only the final revision id of the history goes on the wire."""
        # set_revision_history([rev-id1, ..., rev-idN]) is translated to
        # calling Branch.set_last_revision(path, rev-idN) on the wire.
        client = FakeClient([(('ok',), )])
        transport = MemoryTransport()
        transport.mkdir('branch')
        transport = transport.clone('branch')

        bzrdir = RemoteBzrDir(transport, _client=False)
        branch = RemoteBranch(bzrdir, None, _client=client)

        result = branch.set_revision_history(['rev-id1', 'rev-id2'])
        self.assertEqual(
            [('call', 'Branch.set_last_revision', ('///branch/', 'rev-id2'))],
            client._calls)
        self.assertEqual(None, result)

    def test_no_such_revision(self):
        """A 'NoSuchRevision' response surfaces as errors.NoSuchRevision."""
        # A response of 'NoSuchRevision' is translated into an exception.
        client = FakeClient([(('NoSuchRevision', 'rev-id'), )])
        transport = MemoryTransport()
        transport.mkdir('branch')
        transport = transport.clone('branch')

        bzrdir = RemoteBzrDir(transport, _client=False)
        branch = RemoteBranch(bzrdir, None, _client=client)

        self.assertRaises(
            errors.NoSuchRevision, branch.set_revision_history, ['rev-id'])
212
class TestBranchControlGetBranchConf(tests.TestCase):
    """Test branch.control_files api munging...

    We special-case RemoteBranch.control_files.get('branch.conf') to
    call a specific API so that RemoteBranches can intercept configuration
    file reading, allowing them to signal to the client about things like
    'email is configured for commits'.
    """

    def test_get_branch_conf(self):
        """get('branch.conf') issues one Branch.get_config_file call2."""
        # the canned body should come back through a file-like result
        client = FakeClient([(('ok', ), 'config file body')])
        transport = MemoryTransport()
        transport.mkdir('quack')
        transport = transport.clone('quack')
        # we do not want bzrdir to make any remote calls
        bzrdir = RemoteBzrDir(transport, _client=False)
        branch = RemoteBranch(bzrdir, None, _client=client)
        result = branch.control_files.get('branch.conf')
        self.assertEqual(
            [('call2', 'Branch.get_config_file', ('///quack/',))],
            client._calls)
        self.assertEqual('config file body', result.read())
237
class TestRemoteRepository(tests.TestCase):
    """Base class providing a RemoteRepository wired to a FakeClient."""

    def setup_fake_client_and_repository(self, responses, transport_path):
        """Create the fake client and repository for testing with.

        :param responses: Canned (response-tuple, body) pairs handed to the
            FakeClient.
        :param transport_path: Directory created inside a fresh
            MemoryTransport; the repository is rooted there.
        :return: A (repository, client) tuple.
        """
        client = FakeClient(responses)
        transport = MemoryTransport()
        transport.mkdir(transport_path)
        transport = transport.clone(transport_path)
        # we do not want bzrdir to make any remote calls
        bzrdir = RemoteBzrDir(transport, _client=False)
        repo = RemoteRepository(bzrdir, None, _client=client)
        # Callers unpack the result as ``repo, client``.
        return repo, client
251
class TestRepositoryGatherStats(TestRemoteRepository):
    """RemoteRepository.gather_stats() against canned response bodies."""

    def test_revid_none(self):
        """With no revid the wire call carries '' and 'no'."""
        # ('ok',), body with revisions and size
        responses = [(('ok', ), 'revisions: 2\nsize: 18\n')]
        transport_path = 'quack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.gather_stats(None)
        self.assertEqual(
            [('call2', 'Repository.gather_stats', ('///quack/', '', 'no'))],
            client._calls)
        self.assertEqual({'revisions': 2, 'size': 18}, result)

    def test_revid_no_committers(self):
        """firstrev/latestrev lines decode to (timestamp, timezone) pairs."""
        # ('ok',), body without committers
        responses = [(('ok', ),
                      'firstrev: 123456.300 3600\n'
                      'latestrev: 654231.400 0\n'
                      'revisions: 2\n'
                      'size: 18\n')]
        transport_path = 'quick'
        # NOTE(review): the revid assignment was not visible in the reviewed
        # copy; a non-ascii id is assumed so that the utf8 encoding asserted
        # below is actually exercised -- confirm against history.
        revid = u'\xc8'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.gather_stats(revid)
        self.assertEqual(
            [('call2', 'Repository.gather_stats',
              ('///quick/', revid.encode('utf8'), 'no'))],
            client._calls)
        self.assertEqual({'revisions': 2, 'size': 18,
                          'firstrev': (123456.300, 3600),
                          'latestrev': (654231.400, 0)},
                         result)

    def test_revid_with_committers(self):
        """Asking for committers sends 'yes' and decodes the count."""
        # ('ok',), body with committers
        # NOTE(review): the committers line/value and the revid assignment
        # were not visible in the reviewed copy; 128 and u'\xc8' are
        # reconstructions -- confirm against history.
        responses = [(('ok', ),
                      'committers: 128\n'
                      'firstrev: 123456.300 3600\n'
                      'latestrev: 654231.400 0\n'
                      'revisions: 2\n'
                      'size: 18\n')]
        transport_path = 'buick'
        revid = u'\xc8'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.gather_stats(revid, True)
        self.assertEqual(
            [('call2', 'Repository.gather_stats',
              ('///buick/', revid.encode('utf8'), 'yes'))],
            client._calls)
        self.assertEqual({'revisions': 2, 'size': 18,
                          'committers': 128,
                          'firstrev': (123456.300, 3600),
                          'latestrev': (654231.400, 0)},
                         result)
310
class TestRepositoryGetRevisionGraph(TestRemoteRepository):
    """RemoteRepository.get_revision_graph() against canned responses."""

    def test_null_revision(self):
        """NULL_REVISION yields {} without touching the client."""
        # a null revision has the predictable result {}, we should have no
        # wire traffic when calling it with this argument
        responses = [(('notused', ), '')]
        transport_path = 'empty'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.get_revision_graph(NULL_REVISION)
        self.assertEqual([], client._calls)
        self.assertEqual({}, result)

    def test_none_revision(self):
        """No revision argument requests the whole graph with ''."""
        # with none we want the entire graph
        # NOTE(review): the revision id assignments were not visible in the
        # reviewed copy; these non-ascii values are reconstructions --
        # confirm against history.
        r1 = u'\u0e33'
        r2 = u'\u0dab'
        # Body format: one line per revision, the revision id first and
        # then its parents, space separated.
        lines = [' '.join([r2, r1]), r1]
        encoded_body = '\n'.join(lines).encode('utf8')

        responses = [(('ok', ), encoded_body)]
        transport_path = 'sinhala'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.get_revision_graph()
        self.assertEqual(
            [('call2', 'Repository.get_revision_graph', ('///sinhala/', ''))],
            client._calls)
        self.assertEqual({r1: [], r2: [r1]}, result)

    def test_specific_revision(self):
        """A revision argument is sent utf8-encoded on the wire."""
        # with a specific revision we want the graph for that
        # NOTE(review): the revision id assignments were not visible in the
        # reviewed copy; these non-ascii values are reconstructions --
        # confirm against history.
        r11 = u'\u0e33'
        r12 = u'\xc9'
        r2 = u'\u0dab'
        lines = [' '.join([r2, r11, r12]), r11, r12]
        encoded_body = '\n'.join(lines).encode('utf8')

        responses = [(('ok', ), encoded_body)]
        transport_path = 'sinhala'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.get_revision_graph(r2)
        self.assertEqual(
            [('call2', 'Repository.get_revision_graph',
              ('///sinhala/', r2.encode('utf8')))],
            client._calls)
        self.assertEqual({r11: [], r12: [], r2: [r11, r12]}, result)

    def test_no_such_revision(self):
        """A 'nosuchrevision' response raises errors.NoSuchRevision."""
        # NOTE(review): the revid assignment was not visible in the reviewed
        # copy; a plain ascii id is assumed because the expected call below
        # passes revid through unencoded -- confirm against history.
        revid = '123'
        responses = [(('nosuchrevision', revid), '')]
        transport_path = 'sinhala'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        # also check that the right revision is reported in the error
        self.assertRaises(errors.NoSuchRevision,
            repo.get_revision_graph, revid)
        self.assertEqual(
            [('call2', 'Repository.get_revision_graph',
              ('///sinhala/', revid))],
            client._calls)
373
class TestRepositoryIsShared(TestRemoteRepository):
    """RemoteRepository.is_shared() maps yes/no responses to booleans."""

    def test_is_shared(self):
        """A ('yes',) response makes is_shared() return True."""
        # ('yes', ) for Repository.is_shared -> 'True'.
        responses = [(('yes', ), )]
        transport_path = 'quack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.is_shared()
        self.assertEqual(
            [('call', 'Repository.is_shared', ('///quack/',))],
            client._calls)
        self.assertEqual(True, result)

    def test_is_not_shared(self):
        """A ('no',) response makes is_shared() return False."""
        # ('no', ) for Repository.is_shared -> 'False'.
        responses = [(('no', ), )]
        transport_path = 'qwack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.is_shared()
        self.assertEqual(
            [('call', 'Repository.is_shared', ('///qwack/',))],
            client._calls)
        self.assertEqual(False, result)
400
class TestRepositoryLockWrite(TestRemoteRepository):
    """RemoteRepository.lock_write() against canned responses."""

    def test_lock_write(self):
        """lock_write() returns the token from an ('ok', token) response."""
        responses = [(('ok', 'a token'), '')]
        transport_path = 'quack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        result = repo.lock_write()
        self.assertEqual(
            [('call', 'Repository.lock_write', ('///quack/',))],
            client._calls)
        self.assertEqual('a token', result)

    def test_lock_write_already_locked(self):
        """A 'LockContention' response raises errors.LockContention."""
        responses = [(('LockContention', ), '')]
        transport_path = 'quack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        self.assertRaises(errors.LockContention, repo.lock_write)
        self.assertEqual(
            [('call', 'Repository.lock_write', ('///quack/',))],
            client._calls)
424
class TestRepositoryUnlock(TestRemoteRepository):
    """RemoteRepository.unlock() against canned responses."""

    def test_unlock(self):
        """unlock() sends the token obtained by the earlier lock_write()."""
        # First response answers lock_write, second answers unlock.
        # NOTE(review): the second response and the lock_write()/unlock()
        # calls were not visible in the reviewed copy; they are
        # reconstructed from the expected call log below -- confirm.
        responses = [(('ok', 'a token'), ''),
                     (('ok',), '')]
        transport_path = 'quack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        repo.lock_write()
        repo.unlock()
        self.assertEqual(
            [('call', 'Repository.lock_write', ('///quack/',)),
             ('call', 'Repository.unlock', ('///quack/', 'a token'))],
            client._calls)

    def test_unlock_wrong_token(self):
        """A 'TokenMismatch' response raises errors.TokenMismatch."""
        # If somehow the token is wrong, unlock will raise TokenMismatch.
        responses = [(('ok', 'a token'), ''),
                     (('TokenMismatch',), '')]
        transport_path = 'quack'
        repo, client = self.setup_fake_client_and_repository(
            responses, transport_path)
        # NOTE(review): reconstructed; the first canned response is a
        # lock_write reply, so the lock is presumably taken before unlock.
        repo.lock_write()
        self.assertRaises(errors.TokenMismatch, repo.unlock)