/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_remote.py

Merge from bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for remote bzrdir/branch/repo/etc
 
18
 
 
19
These are proxy objects which act on remote objects by sending messages
 
20
through a smart client.  The proxies are to be created when attempting to open
 
21
the object given a transport that supports smartserver rpc operations. 
 
22
"""
 
23
 
 
24
from cStringIO import StringIO
 
25
 
 
26
from bzrlib import (
 
27
    bzrdir,
 
28
    errors,
 
29
    remote,
 
30
    tests,
 
31
    )
 
32
from bzrlib.branch import Branch
 
33
from bzrlib.bzrdir import BzrDir, BzrDirFormat
 
34
from bzrlib.remote import (
 
35
    RemoteBranch,
 
36
    RemoteBzrDir,
 
37
    RemoteBzrDirFormat,
 
38
    RemoteRepository,
 
39
    )
 
40
from bzrlib.revision import NULL_REVISION
 
41
from bzrlib.smart import server
 
42
from bzrlib.smart.client import SmartClient
 
43
from bzrlib.transport import remote as remote_transport
 
44
from bzrlib.transport.memory import MemoryTransport
 
45
 
 
46
 
 
47
class BasicRemoteObjectTests(tests.TestCaseWithTransport):
 
48
 
 
49
    def setUp(self):
 
50
        self.transport_server = server.SmartTCPServer_for_testing
 
51
        super(BasicRemoteObjectTests, self).setUp()
 
52
        self.transport = self.get_transport()
 
53
        self.client = self.transport.get_smart_client()
 
54
        # make a branch that can be opened over the smart transport
 
55
        self.local_wt = BzrDir.create_standalone_workingtree('.')
 
56
 
 
57
    def test_is_readonly(self):
 
58
        # XXX: this is a poor way to test RemoteTransport, but currently there's
 
59
        # no easy way to substitute in a fake client on a transport like we can
 
60
        # with RemoteBzrDir/Branch/Repository.
 
61
        self.assertEqual(self.transport.is_readonly(), False)
 
62
 
 
63
    def test_create_remote_bzrdir(self):
 
64
        b = remote.RemoteBzrDir(self.transport)
 
65
        self.assertIsInstance(b, BzrDir)
 
66
 
 
67
    def test_open_remote_branch(self):
 
68
        # open a standalone branch in the working directory
 
69
        b = remote.RemoteBzrDir(self.transport)
 
70
        branch = b.open_branch()
 
71
 
 
72
    def test_remote_repository(self):
 
73
        b = BzrDir.open_from_transport(self.transport)
 
74
        repo = b.open_repository()
 
75
        revid = u'\xc823123123'
 
76
        self.assertFalse(repo.has_revision(revid))
 
77
        self.local_wt.commit(message='test commit', rev_id=revid)
 
78
        self.assertTrue(repo.has_revision(revid))
 
79
 
 
80
    def test_remote_branch_revision_history(self):
 
81
        b = BzrDir.open_from_transport(self.transport).open_branch()
 
82
        self.assertEqual([], b.revision_history())
 
83
        r1 = self.local_wt.commit('1st commit')
 
84
        r2 = self.local_wt.commit('1st commit', rev_id=u'\xc8')
 
85
        self.assertEqual([r1, r2], b.revision_history())
 
86
 
 
87
    def test_find_correct_format(self):
 
88
        """Should open a RemoteBzrDir over a RemoteTransport"""
 
89
        fmt = BzrDirFormat.find_format(self.transport)
 
90
        self.assertTrue(RemoteBzrDirFormat in BzrDirFormat._control_formats)
 
91
        self.assertIsInstance(fmt, remote.RemoteBzrDirFormat)
 
92
 
 
93
    def test_open_detected_smart_format(self):
 
94
        fmt = BzrDirFormat.find_format(self.transport)
 
95
        d = fmt.open(self.transport)
 
96
        self.assertIsInstance(d, BzrDir)
 
97
 
 
98
 
 
99
class ReadonlyRemoteTransportTests(tests.TestCaseWithTransport):
 
100
 
 
101
    def setUp(self):
 
102
        self.transport_server = server.ReadonlySmartTCPServer_for_testing
 
103
        super(ReadonlyRemoteTransportTests, self).setUp()
 
104
 
 
105
    def test_is_readonly_yes(self):
 
106
        # XXX: this is a poor way to test RemoteTransport, but currently there's
 
107
        # no easy way to substitute in a fake client on a transport like we can
 
108
        # with RemoteBzrDir/Branch/Repository.
 
109
        transport = self.get_readonly_transport()
 
110
        self.assertEqual(transport.is_readonly(), True)
 
111
 
 
112
 
 
113
class FakeProtocol(object):
 
114
    """Lookalike SmartClientRequestProtocolOne allowing body reading tests."""
 
115
 
 
116
    def __init__(self, body):
 
117
        self._body_buffer = StringIO(body)
 
118
 
 
119
    def read_body_bytes(self, count=-1):
 
120
        return self._body_buffer.read(count)
 
121
 
 
122
 
 
123
class FakeClient(SmartClient):
 
124
    """Lookalike for SmartClient allowing testing."""
 
125
    
 
126
    def __init__(self, responses):
 
127
        # We don't call the super init because there is no medium.
 
128
        """create a FakeClient.
 
129
 
 
130
        :param respones: A list of response-tuple, body-data pairs to be sent
 
131
            back to callers.
 
132
        """
 
133
        self.responses = responses
 
134
        self._calls = []
 
135
 
 
136
    def call(self, method, *args):
 
137
        self._calls.append(('call', method, args))
 
138
        return self.responses.pop(0)[0]
 
139
 
 
140
    def call2(self, method, *args):
 
141
        self._calls.append(('call2', method, args))
 
142
        result = self.responses.pop(0)
 
143
        return result[0], FakeProtocol(result[1])
 
144
 
 
145
 
 
146
class TestBzrDirOpenBranch(tests.TestCase):
 
147
 
 
148
    def test_branch_present(self):
 
149
        client = FakeClient([(('ok', ''), ), (('ok', ''), )])
 
150
        transport = MemoryTransport()
 
151
        transport.mkdir('quack')
 
152
        transport = transport.clone('quack')
 
153
        bzrdir = RemoteBzrDir(transport, _client=client)
 
154
        result = bzrdir.open_branch()
 
155
        self.assertEqual(
 
156
            [('call', 'BzrDir.open_branch', ('///quack/',)),
 
157
             ('call', 'BzrDir.find_repository', ('///quack/',))],
 
158
            client._calls)
 
159
        self.assertIsInstance(result, RemoteBranch)
 
160
        self.assertEqual(bzrdir, result.bzrdir)
 
161
 
 
162
    def test_branch_missing(self):
 
163
        client = FakeClient([(('nobranch',), )])
 
164
        transport = MemoryTransport()
 
165
        transport.mkdir('quack')
 
166
        transport = transport.clone('quack')
 
167
        bzrdir = RemoteBzrDir(transport, _client=client)
 
168
        self.assertRaises(errors.NotBranchError, bzrdir.open_branch)
 
169
        self.assertEqual(
 
170
            [('call', 'BzrDir.open_branch', ('///quack/',))],
 
171
            client._calls)
 
172
 
 
173
 
 
174
class TestBranchLastRevisionInfo(tests.TestCase):
 
175
 
 
176
    def test_empty_branch(self):
 
177
        # in an empty branch we decode the response properly
 
178
        client = FakeClient([(('ok', '0', ''), )])
 
179
        transport = MemoryTransport()
 
180
        transport.mkdir('quack')
 
181
        transport = transport.clone('quack')
 
182
        # we do not want bzrdir to make any remote calls
 
183
        bzrdir = RemoteBzrDir(transport, _client=False)
 
184
        branch = RemoteBranch(bzrdir, None, _client=client)
 
185
        result = branch.last_revision_info()
 
186
 
 
187
        self.assertEqual(
 
188
            [('call', 'Branch.last_revision_info', ('///quack/',))],
 
189
            client._calls)
 
190
        self.assertEqual((0, NULL_REVISION), result)
 
191
 
 
192
    def test_non_empty_branch(self):
 
193
        # in a non-empty branch we also decode the response properly
 
194
 
 
195
        client = FakeClient([(('ok', '2', u'\xc8'.encode('utf8')), )])
 
196
        transport = MemoryTransport()
 
197
        transport.mkdir('kwaak')
 
198
        transport = transport.clone('kwaak')
 
199
        # we do not want bzrdir to make any remote calls
 
200
        bzrdir = RemoteBzrDir(transport, _client=False)
 
201
        branch = RemoteBranch(bzrdir, None, _client=client)
 
202
        result = branch.last_revision_info()
 
203
 
 
204
        self.assertEqual(
 
205
            [('call', 'Branch.last_revision_info', ('///kwaak/',))],
 
206
            client._calls)
 
207
        self.assertEqual((2, u'\xc8'), result)
 
208
 
 
209
 
 
210
class TestBranchSetLastRevision(tests.TestCase):
 
211
 
 
212
    def test_set_empty(self):
 
213
        # set_revision_history([]) is translated to calling
 
214
        # Branch.set_last_revision(path, '') on the wire.
 
215
        client = FakeClient([
 
216
            # lock_write
 
217
            (('ok', 'branch token', 'repo token'), ),
 
218
            # set_last_revision
 
219
            (('ok',), ),
 
220
            # unlock
 
221
            (('ok',), )])
 
222
        transport = MemoryTransport()
 
223
        transport.mkdir('branch')
 
224
        transport = transport.clone('branch')
 
225
 
 
226
        bzrdir = RemoteBzrDir(transport, _client=False)
 
227
        branch = RemoteBranch(bzrdir, None, _client=client)
 
228
        # This is a hack to work around the problem that RemoteBranch currently
 
229
        # unnecessarily invokes _ensure_real upon a call to lock_write.
 
230
        branch._ensure_real = lambda: None
 
231
        branch.lock_write()
 
232
        client._calls = []
 
233
        result = branch.set_revision_history([])
 
234
        self.assertEqual(
 
235
            [('call', 'Branch.set_last_revision',
 
236
                ('///branch/', 'branch token', 'repo token', ''))],
 
237
            client._calls)
 
238
        branch.unlock()
 
239
        self.assertEqual(None, result)
 
240
 
 
241
    def test_set_nonempty(self):
 
242
        # set_revision_history([rev-id1, ..., rev-idN]) is translated to calling
 
243
        # Branch.set_last_revision(path, rev-idN) on the wire.
 
244
        client = FakeClient([
 
245
            # lock_write
 
246
            (('ok', 'branch token', 'repo token'), ),
 
247
            # set_last_revision
 
248
            (('ok',), ),
 
249
            # unlock
 
250
            (('ok',), )])
 
251
        transport = MemoryTransport()
 
252
        transport.mkdir('branch')
 
253
        transport = transport.clone('branch')
 
254
 
 
255
        bzrdir = RemoteBzrDir(transport, _client=False)
 
256
        branch = RemoteBranch(bzrdir, None, _client=client)
 
257
        # This is a hack to work around the problem that RemoteBranch currently
 
258
        # unnecessarily invokes _ensure_real upon a call to lock_write.
 
259
        branch._ensure_real = lambda: None
 
260
        # Lock the branch, reset the record of remote calls.
 
261
        branch.lock_write()
 
262
        client._calls = []
 
263
 
 
264
        result = branch.set_revision_history(['rev-id1', 'rev-id2'])
 
265
        self.assertEqual(
 
266
            [('call', 'Branch.set_last_revision',
 
267
                ('///branch/', 'branch token', 'repo token', 'rev-id2'))],
 
268
            client._calls)
 
269
        branch.unlock()
 
270
        self.assertEqual(None, result)
 
271
 
 
272
    def test_no_such_revision(self):
 
273
        # A response of 'NoSuchRevision' is translated into an exception.
 
274
        client = FakeClient([
 
275
            # lock_write
 
276
            (('ok', 'branch token', 'repo token'), ),
 
277
            # set_last_revision
 
278
            (('NoSuchRevision', 'rev-id'), ),
 
279
            # unlock
 
280
            (('ok',), )])
 
281
        transport = MemoryTransport()
 
282
        transport.mkdir('branch')
 
283
        transport = transport.clone('branch')
 
284
 
 
285
        bzrdir = RemoteBzrDir(transport, _client=False)
 
286
        branch = RemoteBranch(bzrdir, None, _client=client)
 
287
        branch._ensure_real = lambda: None
 
288
        branch.lock_write()
 
289
        client._calls = []
 
290
 
 
291
        self.assertRaises(
 
292
            errors.NoSuchRevision, branch.set_revision_history, ['rev-id'])
 
293
        branch.unlock()
 
294
 
 
295
 
 
296
class TestBranchControlGetBranchConf(tests.TestCase):
 
297
    """Test branch.control_files api munging...
 
298
 
 
299
    we special case RemoteBranch.control_files.get('branch.conf') to
 
300
    call a specific API so that RemoteBranch's can intercept configuration
 
301
    file reading, allowing them to signal to the client about things like
 
302
    'email is configured for commits'.
 
303
    """
 
304
 
 
305
    def test_get_branch_conf(self):
 
306
        # in an empty branch we decode the response properly
 
307
        client = FakeClient([(('ok', ), 'config file body')])
 
308
        transport = MemoryTransport()
 
309
        transport.mkdir('quack')
 
310
        transport = transport.clone('quack')
 
311
        # we do not want bzrdir to make any remote calls
 
312
        bzrdir = RemoteBzrDir(transport, _client=False)
 
313
        branch = RemoteBranch(bzrdir, None, _client=client)
 
314
        result = branch.control_files.get('branch.conf')
 
315
        self.assertEqual(
 
316
            [('call2', 'Branch.get_config_file', ('///quack/',))],
 
317
            client._calls)
 
318
        self.assertEqual('config file body', result.read())
 
319
 
 
320
 
 
321
class TestBranchLockWrite(tests.TestCase):
 
322
 
 
323
    def test_lock_write_unlockable(self):
 
324
        client = FakeClient([(('UnlockableTransport', ), '')])
 
325
        transport = MemoryTransport()
 
326
        transport.mkdir('quack')
 
327
        transport = transport.clone('quack')
 
328
        # we do not want bzrdir to make any remote calls
 
329
        bzrdir = RemoteBzrDir(transport, _client=False)
 
330
        branch = RemoteBranch(bzrdir, None, _client=client)
 
331
        self.assertRaises(errors.UnlockableTransport, branch.lock_write)
 
332
        self.assertEqual(
 
333
            [('call', 'Branch.lock_write', ('///quack/', '', ''))],
 
334
            client._calls)
 
335
 
 
336
 
 
337
class TestRemoteRepository(tests.TestCase):
 
338
 
 
339
    def setup_fake_client_and_repository(self, responses, transport_path):
 
340
        """Create the fake client and repository for testing with."""
 
341
        client = FakeClient(responses)
 
342
        transport = MemoryTransport()
 
343
        transport.mkdir(transport_path)
 
344
        transport = transport.clone(transport_path)
 
345
        # we do not want bzrdir to make any remote calls
 
346
        bzrdir = RemoteBzrDir(transport, _client=False)
 
347
        repo = RemoteRepository(bzrdir, None, _client=client)
 
348
        return repo, client
 
349
 
 
350
 
 
351
class TestRepositoryGatherStats(TestRemoteRepository):
 
352
 
 
353
    def test_revid_none(self):
 
354
        # ('ok',), body with revisions and size
 
355
        responses = [(('ok', ), 'revisions: 2\nsize: 18\n')]
 
356
        transport_path = 'quack'
 
357
        repo, client = self.setup_fake_client_and_repository(
 
358
            responses, transport_path)
 
359
        result = repo.gather_stats(None)
 
360
        self.assertEqual(
 
361
            [('call2', 'Repository.gather_stats', ('///quack/','','no'))],
 
362
            client._calls)
 
363
        self.assertEqual({'revisions': 2, 'size': 18}, result)
 
364
 
 
365
    def test_revid_no_committers(self):
 
366
        # ('ok',), body without committers
 
367
        responses = [(('ok', ),
 
368
                      'firstrev: 123456.300 3600\n'
 
369
                      'latestrev: 654231.400 0\n'
 
370
                      'revisions: 2\n'
 
371
                      'size: 18\n')]
 
372
        transport_path = 'quick'
 
373
        revid = u'\xc8'
 
374
        repo, client = self.setup_fake_client_and_repository(
 
375
            responses, transport_path)
 
376
        result = repo.gather_stats(revid)
 
377
        self.assertEqual(
 
378
            [('call2', 'Repository.gather_stats',
 
379
              ('///quick/', revid.encode('utf8'), 'no'))],
 
380
            client._calls)
 
381
        self.assertEqual({'revisions': 2, 'size': 18,
 
382
                          'firstrev': (123456.300, 3600),
 
383
                          'latestrev': (654231.400, 0),},
 
384
                         result)
 
385
 
 
386
    def test_revid_with_committers(self):
 
387
        # ('ok',), body with committers
 
388
        responses = [(('ok', ),
 
389
                      'committers: 128\n'
 
390
                      'firstrev: 123456.300 3600\n'
 
391
                      'latestrev: 654231.400 0\n'
 
392
                      'revisions: 2\n'
 
393
                      'size: 18\n')]
 
394
        transport_path = 'buick'
 
395
        revid = u'\xc8'
 
396
        repo, client = self.setup_fake_client_and_repository(
 
397
            responses, transport_path)
 
398
        result = repo.gather_stats(revid, True)
 
399
        self.assertEqual(
 
400
            [('call2', 'Repository.gather_stats',
 
401
              ('///buick/', revid.encode('utf8'), 'yes'))],
 
402
            client._calls)
 
403
        self.assertEqual({'revisions': 2, 'size': 18,
 
404
                          'committers': 128,
 
405
                          'firstrev': (123456.300, 3600),
 
406
                          'latestrev': (654231.400, 0),},
 
407
                         result)
 
408
 
 
409
 
 
410
class TestRepositoryGetRevisionGraph(TestRemoteRepository):
 
411
    
 
412
    def test_null_revision(self):
 
413
        # a null revision has the predictable result {}, we should have no wire
 
414
        # traffic when calling it with this argument
 
415
        responses = [(('notused', ), '')]
 
416
        transport_path = 'empty'
 
417
        repo, client = self.setup_fake_client_and_repository(
 
418
            responses, transport_path)
 
419
        result = repo.get_revision_graph(NULL_REVISION)
 
420
        self.assertEqual([], client._calls)
 
421
        self.assertEqual({}, result)
 
422
 
 
423
    def test_none_revision(self):
 
424
        # with none we want the entire graph
 
425
        r1 = u'\u0e33'
 
426
        r2 = u'\u0dab'
 
427
        lines = [' '.join([r2, r1]), r1]
 
428
        encoded_body = '\n'.join(lines).encode('utf8')
 
429
 
 
430
        responses = [(('ok', ), encoded_body)]
 
431
        transport_path = 'sinhala'
 
432
        repo, client = self.setup_fake_client_and_repository(
 
433
            responses, transport_path)
 
434
        result = repo.get_revision_graph()
 
435
        self.assertEqual(
 
436
            [('call2', 'Repository.get_revision_graph', ('///sinhala/', ''))],
 
437
            client._calls)
 
438
        self.assertEqual({r1: [], r2: [r1]}, result)
 
439
 
 
440
    def test_specific_revision(self):
 
441
        # with a specific revision we want the graph for that
 
442
        # with none we want the entire graph
 
443
        r11 = u'\u0e33'
 
444
        r12 = u'\xc9'
 
445
        r2 = u'\u0dab'
 
446
        lines = [' '.join([r2, r11, r12]), r11, r12]
 
447
        encoded_body = '\n'.join(lines).encode('utf8')
 
448
 
 
449
        responses = [(('ok', ), encoded_body)]
 
450
        transport_path = 'sinhala'
 
451
        repo, client = self.setup_fake_client_and_repository(
 
452
            responses, transport_path)
 
453
        result = repo.get_revision_graph(r2)
 
454
        self.assertEqual(
 
455
            [('call2', 'Repository.get_revision_graph', ('///sinhala/', r2.encode('utf8')))],
 
456
            client._calls)
 
457
        self.assertEqual({r11: [], r12: [], r2: [r11, r12], }, result)
 
458
 
 
459
    def test_no_such_revision(self):
 
460
        revid = '123'
 
461
        responses = [(('nosuchrevision', revid), '')]
 
462
        transport_path = 'sinhala'
 
463
        repo, client = self.setup_fake_client_and_repository(
 
464
            responses, transport_path)
 
465
        # also check that the right revision is reported in the error
 
466
        self.assertRaises(errors.NoSuchRevision,
 
467
            repo.get_revision_graph, revid)
 
468
        self.assertEqual(
 
469
            [('call2', 'Repository.get_revision_graph', ('///sinhala/', revid))],
 
470
            client._calls)
 
471
 
 
472
        
 
473
class TestRepositoryIsShared(TestRemoteRepository):
 
474
 
 
475
    def test_is_shared(self):
 
476
        # ('yes', ) for Repository.is_shared -> 'True'.
 
477
        responses = [(('yes', ), )]
 
478
        transport_path = 'quack'
 
479
        repo, client = self.setup_fake_client_and_repository(
 
480
            responses, transport_path)
 
481
        result = repo.is_shared()
 
482
        self.assertEqual(
 
483
            [('call', 'Repository.is_shared', ('///quack/',))],
 
484
            client._calls)
 
485
        self.assertEqual(True, result)
 
486
 
 
487
    def test_is_not_shared(self):
 
488
        # ('no', ) for Repository.is_shared -> 'False'.
 
489
        responses = [(('no', ), )]
 
490
        transport_path = 'qwack'
 
491
        repo, client = self.setup_fake_client_and_repository(
 
492
            responses, transport_path)
 
493
        result = repo.is_shared()
 
494
        self.assertEqual(
 
495
            [('call', 'Repository.is_shared', ('///qwack/',))],
 
496
            client._calls)
 
497
        self.assertEqual(False, result)
 
498
 
 
499
 
 
500
class TestRepositoryLockWrite(TestRemoteRepository):
 
501
 
 
502
    def test_lock_write(self):
 
503
        responses = [(('ok', 'a token'), '')]
 
504
        transport_path = 'quack'
 
505
        repo, client = self.setup_fake_client_and_repository(
 
506
            responses, transport_path)
 
507
        result = repo.lock_write()
 
508
        self.assertEqual(
 
509
            [('call', 'Repository.lock_write', ('///quack/', ''))],
 
510
            client._calls)
 
511
        self.assertEqual('a token', result)
 
512
 
 
513
    def test_lock_write_already_locked(self):
 
514
        responses = [(('LockContention', ), '')]
 
515
        transport_path = 'quack'
 
516
        repo, client = self.setup_fake_client_and_repository(
 
517
            responses, transport_path)
 
518
        self.assertRaises(errors.LockContention, repo.lock_write)
 
519
        self.assertEqual(
 
520
            [('call', 'Repository.lock_write', ('///quack/', ''))],
 
521
            client._calls)
 
522
 
 
523
    def test_lock_write_unlockable(self):
 
524
        responses = [(('UnlockableTransport', ), '')]
 
525
        transport_path = 'quack'
 
526
        repo, client = self.setup_fake_client_and_repository(
 
527
            responses, transport_path)
 
528
        self.assertRaises(errors.UnlockableTransport, repo.lock_write)
 
529
        self.assertEqual(
 
530
            [('call', 'Repository.lock_write', ('///quack/', ''))],
 
531
            client._calls)
 
532
 
 
533
 
 
534
class TestRepositoryUnlock(TestRemoteRepository):
 
535
 
 
536
    def test_unlock(self):
 
537
        responses = [(('ok', 'a token'), ''),
 
538
                     (('ok',), '')]
 
539
        transport_path = 'quack'
 
540
        repo, client = self.setup_fake_client_and_repository(
 
541
            responses, transport_path)
 
542
        repo.lock_write()
 
543
        repo.unlock()
 
544
        self.assertEqual(
 
545
            [('call', 'Repository.lock_write', ('///quack/', '')),
 
546
             ('call', 'Repository.unlock', ('///quack/', 'a token'))],
 
547
            client._calls)
 
548
 
 
549
    def test_unlock_wrong_token(self):
 
550
        # If somehow the token is wrong, unlock will raise TokenMismatch.
 
551
        responses = [(('ok', 'a token'), ''),
 
552
                     (('TokenMismatch',), '')]
 
553
        transport_path = 'quack'
 
554
        repo, client = self.setup_fake_client_and_repository(
 
555
            responses, transport_path)
 
556
        repo.lock_write()
 
557
        self.assertRaises(errors.TokenMismatch, repo.unlock)
 
558
 
 
559
 
 
560
class TestRepositoryHasRevision(TestRemoteRepository):
 
561
 
 
562
    def test_none(self):
 
563
        # repo.has_revision(None) should not cause any traffic.
 
564
        transport_path = 'quack'
 
565
        responses = None
 
566
        repo, client = self.setup_fake_client_and_repository(
 
567
            responses, transport_path)
 
568
 
 
569
        # The null revision is always there, so has_revision(None) == True.
 
570
        self.assertEqual(True, repo.has_revision(None))
 
571
 
 
572
        # The remote repo shouldn't be accessed.
 
573
        self.assertEqual([], client._calls)
 
574