/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_remote.py

Merge in the branch with the extracted lock_write token changes, resolving conflicts.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for remote bzrdir/branch/repo/etc
 
18
 
 
19
These are proxy objects which act on remote objects by sending messages
 
20
through a smart client.  The proxies are to be created when attempting to open
 
21
the object given a transport that supports smartserver rpc operations. 
 
22
"""
 
23
 
 
24
from cStringIO import StringIO
 
25
 
 
26
from bzrlib import (
 
27
    bzrdir,
 
28
    errors,
 
29
    remote,
 
30
    tests,
 
31
    )
 
32
from bzrlib.branch import Branch
 
33
from bzrlib.bzrdir import BzrDir, BzrDirFormat
 
34
from bzrlib.remote import (
 
35
    RemoteBranch,
 
36
    RemoteBzrDir,
 
37
    RemoteBzrDirFormat,
 
38
    RemoteRepository,
 
39
    )
 
40
from bzrlib.revision import NULL_REVISION
 
41
from bzrlib.smart import server
 
42
from bzrlib.smart.client import SmartClient
 
43
from bzrlib.transport import remote as remote_transport
 
44
from bzrlib.transport.memory import MemoryTransport
 
45
 
 
46
 
 
47
class BasicRemoteObjectTests(tests.TestCaseWithTransport):
 
48
 
 
49
    def setUp(self):
 
50
        self.transport_server = server.SmartTCPServer_for_testing
 
51
        super(BasicRemoteObjectTests, self).setUp()
 
52
        self.transport = self.get_transport()
 
53
        self.client = self.transport.get_smart_client()
 
54
        # make a branch that can be opened over the smart transport
 
55
        self.local_wt = BzrDir.create_standalone_workingtree('.')
 
56
 
 
57
    def test_is_readonly(self):
 
58
        # XXX: this is a poor way to test RemoteTransport, but currently there's
 
59
        # no easy way to substitute in a fake client on a transport like we can
 
60
        # with RemoteBzrDir/Branch/Repository.
 
61
        self.assertEqual(self.transport.is_readonly(), False)
 
62
 
 
63
    def test_create_remote_bzrdir(self):
 
64
        b = remote.RemoteBzrDir(self.transport)
 
65
        self.assertIsInstance(b, BzrDir)
 
66
 
 
67
    def test_open_remote_branch(self):
 
68
        # open a standalone branch in the working directory
 
69
        b = remote.RemoteBzrDir(self.transport)
 
70
        branch = b.open_branch()
 
71
 
 
72
    def test_remote_repository(self):
 
73
        b = BzrDir.open_from_transport(self.transport)
 
74
        repo = b.open_repository()
 
75
        revid = u'\xc823123123'.encode('utf8')
 
76
        self.assertFalse(repo.has_revision(revid))
 
77
        self.local_wt.commit(message='test commit', rev_id=revid)
 
78
        self.assertTrue(repo.has_revision(revid))
 
79
 
 
80
    def test_remote_branch_revision_history(self):
 
81
        b = BzrDir.open_from_transport(self.transport).open_branch()
 
82
        self.assertEqual([], b.revision_history())
 
83
        r1 = self.local_wt.commit('1st commit')
 
84
        r2 = self.local_wt.commit('1st commit', rev_id=u'\xc8'.encode('utf8'))
 
85
        self.assertEqual([r1, r2], b.revision_history())
 
86
 
 
87
    def test_find_correct_format(self):
 
88
        """Should open a RemoteBzrDir over a RemoteTransport"""
 
89
        fmt = BzrDirFormat.find_format(self.transport)
 
90
        self.assertTrue(RemoteBzrDirFormat in BzrDirFormat._control_formats)
 
91
        self.assertIsInstance(fmt, remote.RemoteBzrDirFormat)
 
92
 
 
93
    def test_open_detected_smart_format(self):
 
94
        fmt = BzrDirFormat.find_format(self.transport)
 
95
        d = fmt.open(self.transport)
 
96
        self.assertIsInstance(d, BzrDir)
 
97
 
 
98
 
 
99
class ReadonlyRemoteTransportTests(tests.TestCaseWithTransport):
 
100
 
 
101
    def setUp(self):
 
102
        self.transport_server = server.ReadonlySmartTCPServer_for_testing
 
103
        super(ReadonlyRemoteTransportTests, self).setUp()
 
104
 
 
105
    def test_is_readonly_yes(self):
 
106
        # XXX: this is a poor way to test RemoteTransport, but currently there's
 
107
        # no easy way to substitute in a fake client on a transport like we can
 
108
        # with RemoteBzrDir/Branch/Repository.
 
109
        transport = self.get_readonly_transport()
 
110
        self.assertEqual(transport.is_readonly(), True)
 
111
 
 
112
 
 
113
class FakeProtocol(object):
 
114
    """Lookalike SmartClientRequestProtocolOne allowing body reading tests."""
 
115
 
 
116
    def __init__(self, body):
 
117
        self._body_buffer = StringIO(body)
 
118
 
 
119
    def read_body_bytes(self, count=-1):
 
120
        return self._body_buffer.read(count)
 
121
 
 
122
 
 
123
class FakeClient(SmartClient):
 
124
    """Lookalike for SmartClient allowing testing."""
 
125
    
 
126
    def __init__(self, responses):
 
127
        # We don't call the super init because there is no medium.
 
128
        """create a FakeClient.
 
129
 
 
130
        :param respones: A list of response-tuple, body-data pairs to be sent
 
131
            back to callers.
 
132
        """
 
133
        self.responses = responses
 
134
        self._calls = []
 
135
 
 
136
    def call(self, method, *args):
 
137
        self._calls.append(('call', method, args))
 
138
        return self.responses.pop(0)[0]
 
139
 
 
140
    def call2(self, method, *args):
 
141
        self._calls.append(('call2', method, args))
 
142
        result = self.responses.pop(0)
 
143
        return result[0], FakeProtocol(result[1])
 
144
 
 
145
 
 
146
class TestBzrDirOpenBranch(tests.TestCase):
 
147
 
 
148
    def test_branch_present(self):
 
149
        client = FakeClient([(('ok', ''), ), (('ok', '', 'False', 'False'), )])
 
150
        transport = MemoryTransport()
 
151
        transport.mkdir('quack')
 
152
        transport = transport.clone('quack')
 
153
        bzrdir = RemoteBzrDir(transport, _client=client)
 
154
        result = bzrdir.open_branch()
 
155
        self.assertEqual(
 
156
            [('call', 'BzrDir.open_branch', ('///quack/',)),
 
157
             ('call', 'BzrDir.find_repository', ('///quack/',))],
 
158
            client._calls)
 
159
        self.assertIsInstance(result, RemoteBranch)
 
160
        self.assertEqual(bzrdir, result.bzrdir)
 
161
 
 
162
    def test_branch_missing(self):
 
163
        client = FakeClient([(('nobranch',), )])
 
164
        transport = MemoryTransport()
 
165
        transport.mkdir('quack')
 
166
        transport = transport.clone('quack')
 
167
        bzrdir = RemoteBzrDir(transport, _client=client)
 
168
        self.assertRaises(errors.NotBranchError, bzrdir.open_branch)
 
169
        self.assertEqual(
 
170
            [('call', 'BzrDir.open_branch', ('///quack/',))],
 
171
            client._calls)
 
172
 
 
173
    def check_open_repository(self, rich_root, subtrees):
 
174
        if rich_root:
 
175
            rich_response = 'True'
 
176
        else:
 
177
            rich_response = 'False'
 
178
        if subtrees:
 
179
            subtree_response = 'True'
 
180
        else:
 
181
            subtree_response = 'False'
 
182
        client = FakeClient([(('ok', '', rich_response, subtree_response), ),])
 
183
        transport = MemoryTransport()
 
184
        transport.mkdir('quack')
 
185
        transport = transport.clone('quack')
 
186
        bzrdir = RemoteBzrDir(transport, _client=client)
 
187
        result = bzrdir.open_repository()
 
188
        self.assertEqual(
 
189
            [('call', 'BzrDir.find_repository', ('///quack/',))],
 
190
            client._calls)
 
191
        self.assertIsInstance(result, RemoteRepository)
 
192
        self.assertEqual(bzrdir, result.bzrdir)
 
193
        self.assertEqual(rich_root, result._format.rich_root_data)
 
194
        self.assertEqual(subtrees, result._format.supports_tree_reference)
 
195
 
 
196
    def test_open_repository_sets_format_attributes(self):
 
197
        self.check_open_repository(True, True)
 
198
        self.check_open_repository(False, True)
 
199
        self.check_open_repository(True, False)
 
200
        self.check_open_repository(False, False)
 
201
 
 
202
 
 
203
class TestBranchLastRevisionInfo(tests.TestCase):
 
204
 
 
205
    def test_empty_branch(self):
 
206
        # in an empty branch we decode the response properly
 
207
        client = FakeClient([(('ok', '0', ''), )])
 
208
        transport = MemoryTransport()
 
209
        transport.mkdir('quack')
 
210
        transport = transport.clone('quack')
 
211
        # we do not want bzrdir to make any remote calls
 
212
        bzrdir = RemoteBzrDir(transport, _client=False)
 
213
        branch = RemoteBranch(bzrdir, None, _client=client)
 
214
        result = branch.last_revision_info()
 
215
 
 
216
        self.assertEqual(
 
217
            [('call', 'Branch.last_revision_info', ('///quack/',))],
 
218
            client._calls)
 
219
        self.assertEqual((0, NULL_REVISION), result)
 
220
 
 
221
    def test_non_empty_branch(self):
 
222
        # in a non-empty branch we also decode the response properly
 
223
        revid = u'\xc8'.encode('utf8')
 
224
        client = FakeClient([(('ok', '2', revid), )])
 
225
        transport = MemoryTransport()
 
226
        transport.mkdir('kwaak')
 
227
        transport = transport.clone('kwaak')
 
228
        # we do not want bzrdir to make any remote calls
 
229
        bzrdir = RemoteBzrDir(transport, _client=False)
 
230
        branch = RemoteBranch(bzrdir, None, _client=client)
 
231
        result = branch.last_revision_info()
 
232
 
 
233
        self.assertEqual(
 
234
            [('call', 'Branch.last_revision_info', ('///kwaak/',))],
 
235
            client._calls)
 
236
        self.assertEqual((2, revid), result)
 
237
 
 
238
 
 
239
class TestBranchSetLastRevision(tests.TestCase):
 
240
 
 
241
    def test_set_empty(self):
 
242
        # set_revision_history([]) is translated to calling
 
243
        # Branch.set_last_revision(path, '') on the wire.
 
244
        client = FakeClient([
 
245
            # lock_write
 
246
            (('ok', 'branch token', 'repo token'), ),
 
247
            # set_last_revision
 
248
            (('ok',), ),
 
249
            # unlock
 
250
            (('ok',), )])
 
251
        transport = MemoryTransport()
 
252
        transport.mkdir('branch')
 
253
        transport = transport.clone('branch')
 
254
 
 
255
        bzrdir = RemoteBzrDir(transport, _client=False)
 
256
        branch = RemoteBranch(bzrdir, None, _client=client)
 
257
        # This is a hack to work around the problem that RemoteBranch currently
 
258
        # unnecessarily invokes _ensure_real upon a call to lock_write.
 
259
        branch._ensure_real = lambda: None
 
260
        branch.lock_write()
 
261
        client._calls = []
 
262
        result = branch.set_revision_history([])
 
263
        self.assertEqual(
 
264
            [('call', 'Branch.set_last_revision',
 
265
                ('///branch/', 'branch token', 'repo token', ''))],
 
266
            client._calls)
 
267
        branch.unlock()
 
268
        self.assertEqual(None, result)
 
269
 
 
270
    def test_set_nonempty(self):
 
271
        # set_revision_history([rev-id1, ..., rev-idN]) is translated to calling
 
272
        # Branch.set_last_revision(path, rev-idN) on the wire.
 
273
        client = FakeClient([
 
274
            # lock_write
 
275
            (('ok', 'branch token', 'repo token'), ),
 
276
            # set_last_revision
 
277
            (('ok',), ),
 
278
            # unlock
 
279
            (('ok',), )])
 
280
        transport = MemoryTransport()
 
281
        transport.mkdir('branch')
 
282
        transport = transport.clone('branch')
 
283
 
 
284
        bzrdir = RemoteBzrDir(transport, _client=False)
 
285
        branch = RemoteBranch(bzrdir, None, _client=client)
 
286
        # This is a hack to work around the problem that RemoteBranch currently
 
287
        # unnecessarily invokes _ensure_real upon a call to lock_write.
 
288
        branch._ensure_real = lambda: None
 
289
        # Lock the branch, reset the record of remote calls.
 
290
        branch.lock_write()
 
291
        client._calls = []
 
292
 
 
293
        result = branch.set_revision_history(['rev-id1', 'rev-id2'])
 
294
        self.assertEqual(
 
295
            [('call', 'Branch.set_last_revision',
 
296
                ('///branch/', 'branch token', 'repo token', 'rev-id2'))],
 
297
            client._calls)
 
298
        branch.unlock()
 
299
        self.assertEqual(None, result)
 
300
 
 
301
    def test_no_such_revision(self):
 
302
        # A response of 'NoSuchRevision' is translated into an exception.
 
303
        client = FakeClient([
 
304
            # lock_write
 
305
            (('ok', 'branch token', 'repo token'), ),
 
306
            # set_last_revision
 
307
            (('NoSuchRevision', 'rev-id'), ),
 
308
            # unlock
 
309
            (('ok',), )])
 
310
        transport = MemoryTransport()
 
311
        transport.mkdir('branch')
 
312
        transport = transport.clone('branch')
 
313
 
 
314
        bzrdir = RemoteBzrDir(transport, _client=False)
 
315
        branch = RemoteBranch(bzrdir, None, _client=client)
 
316
        branch._ensure_real = lambda: None
 
317
        branch.lock_write()
 
318
        client._calls = []
 
319
 
 
320
        self.assertRaises(
 
321
            errors.NoSuchRevision, branch.set_revision_history, ['rev-id'])
 
322
        branch.unlock()
 
323
 
 
324
 
 
325
class TestBranchControlGetBranchConf(tests.TestCase):
 
326
    """Test branch.control_files api munging...
 
327
 
 
328
    we special case RemoteBranch.control_files.get('branch.conf') to
 
329
    call a specific API so that RemoteBranch's can intercept configuration
 
330
    file reading, allowing them to signal to the client about things like
 
331
    'email is configured for commits'.
 
332
    """
 
333
 
 
334
    def test_get_branch_conf(self):
 
335
        # in an empty branch we decode the response properly
 
336
        client = FakeClient([(('ok', ), 'config file body')])
 
337
        transport = MemoryTransport()
 
338
        transport.mkdir('quack')
 
339
        transport = transport.clone('quack')
 
340
        # we do not want bzrdir to make any remote calls
 
341
        bzrdir = RemoteBzrDir(transport, _client=False)
 
342
        branch = RemoteBranch(bzrdir, None, _client=client)
 
343
        result = branch.control_files.get('branch.conf')
 
344
        self.assertEqual(
 
345
            [('call2', 'Branch.get_config_file', ('///quack/',))],
 
346
            client._calls)
 
347
        self.assertEqual('config file body', result.read())
 
348
 
 
349
 
 
350
class TestBranchLockWrite(tests.TestCase):
 
351
 
 
352
    def test_lock_write_unlockable(self):
 
353
        client = FakeClient([(('UnlockableTransport', ), '')])
 
354
        transport = MemoryTransport()
 
355
        transport.mkdir('quack')
 
356
        transport = transport.clone('quack')
 
357
        # we do not want bzrdir to make any remote calls
 
358
        bzrdir = RemoteBzrDir(transport, _client=False)
 
359
        branch = RemoteBranch(bzrdir, None, _client=client)
 
360
        self.assertRaises(errors.UnlockableTransport, branch.lock_write)
 
361
        self.assertEqual(
 
362
            [('call', 'Branch.lock_write', ('///quack/', '', ''))],
 
363
            client._calls)
 
364
 
 
365
 
 
366
class TestRemoteRepository(tests.TestCase):
 
367
 
 
368
    def setup_fake_client_and_repository(self, responses, transport_path):
 
369
        """Create the fake client and repository for testing with."""
 
370
        client = FakeClient(responses)
 
371
        transport = MemoryTransport()
 
372
        transport.mkdir(transport_path)
 
373
        transport = transport.clone(transport_path)
 
374
        # we do not want bzrdir to make any remote calls
 
375
        bzrdir = RemoteBzrDir(transport, _client=False)
 
376
        repo = RemoteRepository(bzrdir, None, _client=client)
 
377
        return repo, client
 
378
 
 
379
 
 
380
class TestRepositoryGatherStats(TestRemoteRepository):
 
381
 
 
382
    def test_revid_none(self):
 
383
        # ('ok',), body with revisions and size
 
384
        responses = [(('ok', ), 'revisions: 2\nsize: 18\n')]
 
385
        transport_path = 'quack'
 
386
        repo, client = self.setup_fake_client_and_repository(
 
387
            responses, transport_path)
 
388
        result = repo.gather_stats(None)
 
389
        self.assertEqual(
 
390
            [('call2', 'Repository.gather_stats', ('///quack/','','no'))],
 
391
            client._calls)
 
392
        self.assertEqual({'revisions': 2, 'size': 18}, result)
 
393
 
 
394
    def test_revid_no_committers(self):
 
395
        # ('ok',), body without committers
 
396
        responses = [(('ok', ),
 
397
                      'firstrev: 123456.300 3600\n'
 
398
                      'latestrev: 654231.400 0\n'
 
399
                      'revisions: 2\n'
 
400
                      'size: 18\n')]
 
401
        transport_path = 'quick'
 
402
        revid = u'\xc8'.encode('utf8')
 
403
        repo, client = self.setup_fake_client_and_repository(
 
404
            responses, transport_path)
 
405
        result = repo.gather_stats(revid)
 
406
        self.assertEqual(
 
407
            [('call2', 'Repository.gather_stats',
 
408
              ('///quick/', revid, 'no'))],
 
409
            client._calls)
 
410
        self.assertEqual({'revisions': 2, 'size': 18,
 
411
                          'firstrev': (123456.300, 3600),
 
412
                          'latestrev': (654231.400, 0),},
 
413
                         result)
 
414
 
 
415
    def test_revid_with_committers(self):
 
416
        # ('ok',), body with committers
 
417
        responses = [(('ok', ),
 
418
                      'committers: 128\n'
 
419
                      'firstrev: 123456.300 3600\n'
 
420
                      'latestrev: 654231.400 0\n'
 
421
                      'revisions: 2\n'
 
422
                      'size: 18\n')]
 
423
        transport_path = 'buick'
 
424
        revid = u'\xc8'.encode('utf8')
 
425
        repo, client = self.setup_fake_client_and_repository(
 
426
            responses, transport_path)
 
427
        result = repo.gather_stats(revid, True)
 
428
        self.assertEqual(
 
429
            [('call2', 'Repository.gather_stats',
 
430
              ('///buick/', revid, 'yes'))],
 
431
            client._calls)
 
432
        self.assertEqual({'revisions': 2, 'size': 18,
 
433
                          'committers': 128,
 
434
                          'firstrev': (123456.300, 3600),
 
435
                          'latestrev': (654231.400, 0),},
 
436
                         result)
 
437
 
 
438
 
 
439
class TestRepositoryGetRevisionGraph(TestRemoteRepository):
 
440
    
 
441
    def test_null_revision(self):
 
442
        # a null revision has the predictable result {}, we should have no wire
 
443
        # traffic when calling it with this argument
 
444
        responses = [(('notused', ), '')]
 
445
        transport_path = 'empty'
 
446
        repo, client = self.setup_fake_client_and_repository(
 
447
            responses, transport_path)
 
448
        result = repo.get_revision_graph(NULL_REVISION)
 
449
        self.assertEqual([], client._calls)
 
450
        self.assertEqual({}, result)
 
451
 
 
452
    def test_none_revision(self):
 
453
        # with none we want the entire graph
 
454
        r1 = u'\u0e33'.encode('utf8')
 
455
        r2 = u'\u0dab'.encode('utf8')
 
456
        lines = [' '.join([r2, r1]), r1]
 
457
        encoded_body = '\n'.join(lines)
 
458
 
 
459
        responses = [(('ok', ), encoded_body)]
 
460
        transport_path = 'sinhala'
 
461
        repo, client = self.setup_fake_client_and_repository(
 
462
            responses, transport_path)
 
463
        result = repo.get_revision_graph()
 
464
        self.assertEqual(
 
465
            [('call2', 'Repository.get_revision_graph', ('///sinhala/', ''))],
 
466
            client._calls)
 
467
        self.assertEqual({r1: [], r2: [r1]}, result)
 
468
 
 
469
    def test_specific_revision(self):
 
470
        # with a specific revision we want the graph for that
 
471
        # with none we want the entire graph
 
472
        r11 = u'\u0e33'.encode('utf8')
 
473
        r12 = u'\xc9'.encode('utf8')
 
474
        r2 = u'\u0dab'.encode('utf8')
 
475
        lines = [' '.join([r2, r11, r12]), r11, r12]
 
476
        encoded_body = '\n'.join(lines)
 
477
 
 
478
        responses = [(('ok', ), encoded_body)]
 
479
        transport_path = 'sinhala'
 
480
        repo, client = self.setup_fake_client_and_repository(
 
481
            responses, transport_path)
 
482
        result = repo.get_revision_graph(r2)
 
483
        self.assertEqual(
 
484
            [('call2', 'Repository.get_revision_graph', ('///sinhala/', r2))],
 
485
            client._calls)
 
486
        self.assertEqual({r11: [], r12: [], r2: [r11, r12], }, result)
 
487
 
 
488
    def test_no_such_revision(self):
 
489
        revid = '123'
 
490
        responses = [(('nosuchrevision', revid), '')]
 
491
        transport_path = 'sinhala'
 
492
        repo, client = self.setup_fake_client_and_repository(
 
493
            responses, transport_path)
 
494
        # also check that the right revision is reported in the error
 
495
        self.assertRaises(errors.NoSuchRevision,
 
496
            repo.get_revision_graph, revid)
 
497
        self.assertEqual(
 
498
            [('call2', 'Repository.get_revision_graph', ('///sinhala/', revid))],
 
499
            client._calls)
 
500
 
 
501
        
 
502
class TestRepositoryIsShared(TestRemoteRepository):
 
503
 
 
504
    def test_is_shared(self):
 
505
        # ('yes', ) for Repository.is_shared -> 'True'.
 
506
        responses = [(('yes', ), )]
 
507
        transport_path = 'quack'
 
508
        repo, client = self.setup_fake_client_and_repository(
 
509
            responses, transport_path)
 
510
        result = repo.is_shared()
 
511
        self.assertEqual(
 
512
            [('call', 'Repository.is_shared', ('///quack/',))],
 
513
            client._calls)
 
514
        self.assertEqual(True, result)
 
515
 
 
516
    def test_is_not_shared(self):
 
517
        # ('no', ) for Repository.is_shared -> 'False'.
 
518
        responses = [(('no', ), )]
 
519
        transport_path = 'qwack'
 
520
        repo, client = self.setup_fake_client_and_repository(
 
521
            responses, transport_path)
 
522
        result = repo.is_shared()
 
523
        self.assertEqual(
 
524
            [('call', 'Repository.is_shared', ('///qwack/',))],
 
525
            client._calls)
 
526
        self.assertEqual(False, result)
 
527
 
 
528
 
 
529
class TestRepositoryLockWrite(TestRemoteRepository):
 
530
 
 
531
    def test_lock_write(self):
 
532
        responses = [(('ok', 'a token'), '')]
 
533
        transport_path = 'quack'
 
534
        repo, client = self.setup_fake_client_and_repository(
 
535
            responses, transport_path)
 
536
        result = repo.lock_write()
 
537
        self.assertEqual(
 
538
            [('call', 'Repository.lock_write', ('///quack/', ''))],
 
539
            client._calls)
 
540
        self.assertEqual('a token', result)
 
541
 
 
542
    def test_lock_write_already_locked(self):
 
543
        responses = [(('LockContention', ), '')]
 
544
        transport_path = 'quack'
 
545
        repo, client = self.setup_fake_client_and_repository(
 
546
            responses, transport_path)
 
547
        self.assertRaises(errors.LockContention, repo.lock_write)
 
548
        self.assertEqual(
 
549
            [('call', 'Repository.lock_write', ('///quack/', ''))],
 
550
            client._calls)
 
551
 
 
552
    def test_lock_write_unlockable(self):
 
553
        responses = [(('UnlockableTransport', ), '')]
 
554
        transport_path = 'quack'
 
555
        repo, client = self.setup_fake_client_and_repository(
 
556
            responses, transport_path)
 
557
        self.assertRaises(errors.UnlockableTransport, repo.lock_write)
 
558
        self.assertEqual(
 
559
            [('call', 'Repository.lock_write', ('///quack/', ''))],
 
560
            client._calls)
 
561
 
 
562
 
 
563
class TestRepositoryUnlock(TestRemoteRepository):
 
564
 
 
565
    def test_unlock(self):
 
566
        responses = [(('ok', 'a token'), ''),
 
567
                     (('ok',), '')]
 
568
        transport_path = 'quack'
 
569
        repo, client = self.setup_fake_client_and_repository(
 
570
            responses, transport_path)
 
571
        repo.lock_write()
 
572
        repo.unlock()
 
573
        self.assertEqual(
 
574
            [('call', 'Repository.lock_write', ('///quack/', '')),
 
575
             ('call', 'Repository.unlock', ('///quack/', 'a token'))],
 
576
            client._calls)
 
577
 
 
578
    def test_unlock_wrong_token(self):
 
579
        # If somehow the token is wrong, unlock will raise TokenMismatch.
 
580
        responses = [(('ok', 'a token'), ''),
 
581
                     (('TokenMismatch',), '')]
 
582
        transport_path = 'quack'
 
583
        repo, client = self.setup_fake_client_and_repository(
 
584
            responses, transport_path)
 
585
        repo.lock_write()
 
586
        self.assertRaises(errors.TokenMismatch, repo.unlock)
 
587
 
 
588
 
 
589
class TestRepositoryHasRevision(TestRemoteRepository):
 
590
 
 
591
    def test_none(self):
 
592
        # repo.has_revision(None) should not cause any traffic.
 
593
        transport_path = 'quack'
 
594
        responses = None
 
595
        repo, client = self.setup_fake_client_and_repository(
 
596
            responses, transport_path)
 
597
 
 
598
        # The null revision is always there, so has_revision(None) == True.
 
599
        self.assertEqual(True, repo.has_revision(None))
 
600
 
 
601
        # The remote repo shouldn't be accessed.
 
602
        self.assertEqual([], client._calls)