/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smart.py

Fix some bit of fetching.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006, 2007 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Tests for the smart wire/domain protocol.
18
 
 
19
 
This module contains tests for the domain-level smart requests and responses,
20
 
such as the 'Branch.lock_write' request. Many of these use specific disk
21
 
formats to exercise calls that only make sense for formats with specific
22
 
properties.
23
 
 
24
 
Tests for low-level protocol encoding are found in test_smart_transport.
25
 
"""
26
 
 
27
 
import bz2
28
 
from StringIO import StringIO
29
 
import tempfile
30
 
import tarfile
31
 
 
32
 
from bzrlib import bzrdir, errors, pack, smart, tests
33
 
from bzrlib.smart.request import (
34
 
    FailedSmartServerResponse,
35
 
    SmartServerResponse,
36
 
    SuccessfulSmartServerResponse,
37
 
    )
38
 
import bzrlib.smart.bzrdir
39
 
import bzrlib.smart.branch
40
 
import bzrlib.smart.repository
41
 
from bzrlib.tests import (
42
 
    iter_suite_tests,
43
 
    split_suite_by_re,
44
 
    TestScenarioApplier,
45
 
    )
46
 
from bzrlib.util import bencode
47
 
 
48
 
 
49
 
def load_tests(standard_tests, module, loader):
50
 
    """Multiply tests version and protocol consistency."""
51
 
    # FindRepository tests.
52
 
    bzrdir_mod = bzrlib.smart.bzrdir
53
 
    applier = TestScenarioApplier()
54
 
    applier.scenarios = [
55
 
        ("find_repository", {
56
 
            "_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
57
 
        ("find_repositoryV2", {
58
 
            "_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
59
 
        ]
60
 
    to_adapt, result = split_suite_by_re(standard_tests,
61
 
        "TestSmartServerRequestFindRepository")
62
 
    v2_only, v1_and_2 = split_suite_by_re(to_adapt,
63
 
        "_v2")
64
 
    for test in iter_suite_tests(v1_and_2):
65
 
        result.addTests(applier.adapt(test))
66
 
    del applier.scenarios[0]
67
 
    for test in iter_suite_tests(v2_only):
68
 
        result.addTests(applier.adapt(test))
69
 
    return result
70
 
 
71
 
 
72
 
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
73
 
 
74
 
    def setUp(self):
75
 
        super(TestCaseWithSmartMedium, self).setUp()
76
 
        # We're allowed to set the transport class here, so that we don't use
77
 
        # the default or a parameterized class, but rather use the
78
 
        # TestCaseWithTransport infrastructure to set up a smart server and
79
 
        # transport.
80
 
        self.transport_server = smart.server.SmartTCPServer_for_testing
81
 
 
82
 
    def get_smart_medium(self):
83
 
        """Get a smart medium to use in tests."""
84
 
        return self.get_transport().get_smart_medium()
85
 
 
86
 
 
87
 
class TestSmartServerResponse(tests.TestCase):
88
 
 
89
 
    def test__eq__(self):
90
 
        self.assertEqual(SmartServerResponse(('ok', )),
91
 
            SmartServerResponse(('ok', )))
92
 
        self.assertEqual(SmartServerResponse(('ok', ), 'body'),
93
 
            SmartServerResponse(('ok', ), 'body'))
94
 
        self.assertNotEqual(SmartServerResponse(('ok', )),
95
 
            SmartServerResponse(('notok', )))
96
 
        self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
97
 
            SmartServerResponse(('ok', )))
98
 
        self.assertNotEqual(None,
99
 
            SmartServerResponse(('ok', )))
100
 
 
101
 
 
102
 
class TestSmartServerRequestFindRepository(tests.TestCaseWithTransport):
103
 
    """Tests for BzrDir.find_repository."""
104
 
 
105
 
    def test_no_repository(self):
106
 
        """When there is no repository to be found, ('norepository', ) is returned."""
107
 
        backing = self.get_transport()
108
 
        request = self._request_class(backing)
109
 
        self.make_bzrdir('.')
110
 
        self.assertEqual(SmartServerResponse(('norepository', )),
111
 
            request.execute(backing.local_abspath('')))
112
 
 
113
 
    def test_nonshared_repository(self):
114
 
        # nonshared repositories only allow 'find' to return a handle when the
115
 
        # path the repository is being searched on is the same as the one that
116
 
        # the repository is at.
117
 
        backing = self.get_transport()
118
 
        request = self._request_class(backing)
119
 
        result = self._make_repository_and_result()
120
 
        self.assertEqual(result, request.execute(backing.local_abspath('')))
121
 
        self.make_bzrdir('subdir')
122
 
        self.assertEqual(SmartServerResponse(('norepository', )),
123
 
            request.execute(backing.local_abspath('subdir')))
124
 
 
125
 
    def _make_repository_and_result(self, shared=False, format=None):
126
 
        """Convenience function to setup a repository.
127
 
 
128
 
        :result: The SmartServerResponse to expect when opening it.
129
 
        """
130
 
        repo = self.make_repository('.', shared=shared, format=format)
131
 
        if repo.supports_rich_root():
132
 
            rich_root = 'yes'
133
 
        else:
134
 
            rich_root = 'no'
135
 
        if repo._format.supports_tree_reference:
136
 
            subtrees = 'yes'
137
 
        else:
138
 
            subtrees = 'no'
139
 
        if (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
140
 
            self._request_class):
141
 
            # All tests so far are on formats, and for non-external
142
 
            # repositories.
143
 
            return SuccessfulSmartServerResponse(
144
 
                ('ok', '', rich_root, subtrees, 'no'))
145
 
        else:
146
 
            return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
147
 
 
148
 
    def test_shared_repository(self):
149
 
        """When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
150
 
        backing = self.get_transport()
151
 
        request = self._request_class(backing)
152
 
        result = self._make_repository_and_result(shared=True)
153
 
        self.assertEqual(result, request.execute(backing.local_abspath('')))
154
 
        self.make_bzrdir('subdir')
155
 
        result2 = SmartServerResponse(result.args[0:1] + ('..', ) + result.args[2:])
156
 
        self.assertEqual(result2,
157
 
            request.execute(backing.local_abspath('subdir')))
158
 
        self.make_bzrdir('subdir/deeper')
159
 
        result3 = SmartServerResponse(result.args[0:1] + ('../..', ) + result.args[2:])
160
 
        self.assertEqual(result3,
161
 
            request.execute(backing.local_abspath('subdir/deeper')))
162
 
 
163
 
    def test_rich_root_and_subtree_encoding(self):
164
 
        """Test for the format attributes for rich root and subtree support."""
165
 
        backing = self.get_transport()
166
 
        request = self._request_class(backing)
167
 
        result = self._make_repository_and_result(format='dirstate-with-subtree')
168
 
        # check the test will be valid
169
 
        self.assertEqual('yes', result.args[2])
170
 
        self.assertEqual('yes', result.args[3])
171
 
        self.assertEqual(result, request.execute(backing.local_abspath('')))
172
 
 
173
 
    def test_supports_external_lookups_no_v2(self):
174
 
        """Test for the supports_external_lookups attribute."""
175
 
        backing = self.get_transport()
176
 
        request = self._request_class(backing)
177
 
        result = self._make_repository_and_result(format='dirstate-with-subtree')
178
 
        # check the test will be valid
179
 
        self.assertEqual('no', result.args[4])
180
 
        self.assertEqual(result, request.execute(backing.local_abspath('')))
181
 
 
182
 
 
183
 
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithTransport):
184
 
 
185
 
    def test_empty_dir(self):
186
 
        """Initializing an empty dir should succeed and do it."""
187
 
        backing = self.get_transport()
188
 
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
189
 
        self.assertEqual(SmartServerResponse(('ok', )),
190
 
            request.execute(backing.local_abspath('.')))
191
 
        made_dir = bzrdir.BzrDir.open_from_transport(backing)
192
 
        # no branch, tree or repository is expected with the current 
193
 
        # default format.
194
 
        self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
195
 
        self.assertRaises(errors.NotBranchError, made_dir.open_branch)
196
 
        self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
197
 
 
198
 
    def test_missing_dir(self):
199
 
        """Initializing a missing directory should fail like the bzrdir api."""
200
 
        backing = self.get_transport()
201
 
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
202
 
        self.assertRaises(errors.NoSuchFile,
203
 
            request.execute, backing.local_abspath('subdir'))
204
 
 
205
 
    def test_initialized_dir(self):
206
 
        """Initializing an extant bzrdir should fail like the bzrdir api."""
207
 
        backing = self.get_transport()
208
 
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
209
 
        self.make_bzrdir('subdir')
210
 
        self.assertRaises(errors.FileExists,
211
 
            request.execute, backing.local_abspath('subdir'))
212
 
 
213
 
 
214
 
class TestSmartServerRequestOpenBranch(tests.TestCaseWithTransport):
215
 
 
216
 
    def test_no_branch(self):
217
 
        """When there is no branch, ('nobranch', ) is returned."""
218
 
        backing = self.get_transport()
219
 
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
220
 
        self.make_bzrdir('.')
221
 
        self.assertEqual(SmartServerResponse(('nobranch', )),
222
 
            request.execute(backing.local_abspath('')))
223
 
 
224
 
    def test_branch(self):
225
 
        """When there is a branch, 'ok' is returned."""
226
 
        backing = self.get_transport()
227
 
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
228
 
        self.make_branch('.')
229
 
        self.assertEqual(SmartServerResponse(('ok', '')),
230
 
            request.execute(backing.local_abspath('')))
231
 
 
232
 
    def test_branch_reference(self):
233
 
        """When there is a branch reference, the reference URL is returned."""
234
 
        backing = self.get_transport()
235
 
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
236
 
        branch = self.make_branch('branch')
237
 
        checkout = branch.create_checkout('reference',lightweight=True)
238
 
        # TODO: once we have an API to probe for references of any sort, we
239
 
        # can use it here.
240
 
        reference_url = backing.abspath('branch') + '/'
241
 
        self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
242
 
        self.assertEqual(SmartServerResponse(('ok', reference_url)),
243
 
            request.execute(backing.local_abspath('reference')))
244
 
 
245
 
 
246
 
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithTransport):
247
 
 
248
 
    def test_empty(self):
249
 
        """For an empty branch, the body is empty."""
250
 
        backing = self.get_transport()
251
 
        request = smart.branch.SmartServerRequestRevisionHistory(backing)
252
 
        self.make_branch('.')
253
 
        self.assertEqual(SmartServerResponse(('ok', ), ''),
254
 
            request.execute(backing.local_abspath('')))
255
 
 
256
 
    def test_not_empty(self):
257
 
        """For a non-empty branch, the body is the NUL-joined revision ids."""
258
 
        backing = self.get_transport()
259
 
        request = smart.branch.SmartServerRequestRevisionHistory(backing)
260
 
        tree = self.make_branch_and_memory_tree('.')
261
 
        tree.lock_write()
262
 
        tree.add('')
263
 
        r1 = tree.commit('1st commit')
264
 
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
265
 
        tree.unlock()
266
 
        self.assertEqual(
267
 
            SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
268
 
            request.execute(backing.local_abspath('')))
269
 
 
270
 
 
271
 
class TestSmartServerBranchRequest(tests.TestCaseWithTransport):
    """Tests for smart.branch.SmartServerBranchRequest path handling."""

    def test_no_branch(self):
        """When there is a bzrdir and no branch, NotBranchError is raised."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequest(backing)
        # Only a control directory is created here; no branch lives in it.
        self.make_bzrdir('.')
        self.assertRaises(errors.NotBranchError,
            request.execute, backing.local_abspath(''))

    def test_branch_reference(self):
        """When there is a branch reference, NotBranchError is raised."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequest(backing)
        branch = self.make_branch('branch')
        # The lightweight checkout is created at 'reference'; the return
        # value is not needed by this test.
        branch.create_checkout('reference', lightweight=True)
        # Run the request against 'reference' (the path created above) so
        # the assertion covers the branch-reference case rather than a path
        # that was never created.
        self.assertRaises(errors.NotBranchError,
            request.execute, backing.local_abspath('reference'))
289
 
 
290
 
 
291
 
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithTransport):
292
 
 
293
 
    def test_empty(self):
294
 
        """For an empty branch, the result is ('ok', '0', 'null:')."""
295
 
        backing = self.get_transport()
296
 
        request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
297
 
        self.make_branch('.')
298
 
        self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
299
 
            request.execute(backing.local_abspath('')))
300
 
 
301
 
    def test_not_empty(self):
302
 
        """For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
303
 
        backing = self.get_transport()
304
 
        request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
305
 
        tree = self.make_branch_and_memory_tree('.')
306
 
        tree.lock_write()
307
 
        tree.add('')
308
 
        rev_id_utf8 = u'\xc8'.encode('utf-8')
309
 
        r1 = tree.commit('1st commit')
310
 
        r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
311
 
        tree.unlock()
312
 
        self.assertEqual(
313
 
            SmartServerResponse(('ok', '2', rev_id_utf8)),
314
 
            request.execute(backing.local_abspath('')))
315
 
 
316
 
 
317
 
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithTransport):
318
 
 
319
 
    def test_default(self):
320
 
        """With no file, we get empty content."""
321
 
        backing = self.get_transport()
322
 
        request = smart.branch.SmartServerBranchGetConfigFile(backing)
323
 
        branch = self.make_branch('.')
324
 
        # there should be no file by default
325
 
        content = ''
326
 
        self.assertEqual(SmartServerResponse(('ok', ), content),
327
 
            request.execute(backing.local_abspath('')))
328
 
 
329
 
    def test_with_content(self):
330
 
        # SmartServerBranchGetConfigFile should return the content from
331
 
        # branch.control_files.get('branch.conf') for now - in the future it may
332
 
        # perform more complex processing. 
333
 
        backing = self.get_transport()
334
 
        request = smart.branch.SmartServerBranchGetConfigFile(backing)
335
 
        branch = self.make_branch('.')
336
 
        branch.control_files.put_utf8('branch.conf', 'foo bar baz')
337
 
        self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
338
 
            request.execute(backing.local_abspath('')))
339
 
 
340
 
 
341
 
class TestSmartServerBranchRequestSetLastRevision(tests.TestCaseWithTransport):
342
 
 
343
 
    def test_empty(self):
344
 
        backing = self.get_transport()
345
 
        request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
346
 
        b = self.make_branch('.')
347
 
        branch_token = b.lock_write()
348
 
        repo_token = b.repository.lock_write()
349
 
        b.repository.unlock()
350
 
        try:
351
 
            self.assertEqual(SmartServerResponse(('ok',)),
352
 
                request.execute(
353
 
                    backing.local_abspath(''), branch_token, repo_token,
354
 
                    'null:'))
355
 
        finally:
356
 
            b.unlock()
357
 
 
358
 
    def test_not_present_revision_id(self):
359
 
        backing = self.get_transport()
360
 
        request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
361
 
        b = self.make_branch('.')
362
 
        branch_token = b.lock_write()
363
 
        repo_token = b.repository.lock_write()
364
 
        b.repository.unlock()
365
 
        try:
366
 
            revision_id = 'non-existent revision'
367
 
            self.assertEqual(
368
 
                SmartServerResponse(('NoSuchRevision', revision_id)),
369
 
                request.execute(
370
 
                    backing.local_abspath(''), branch_token, repo_token,
371
 
                    revision_id))
372
 
        finally:
373
 
            b.unlock()
374
 
 
375
 
    def test_revision_id_present(self):
376
 
        backing = self.get_transport()
377
 
        request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
378
 
        tree = self.make_branch_and_memory_tree('.')
379
 
        tree.lock_write()
380
 
        tree.add('')
381
 
        rev_id_utf8 = u'\xc8'.encode('utf-8')
382
 
        r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
383
 
        r2 = tree.commit('2nd commit')
384
 
        tree.unlock()
385
 
        branch_token = tree.branch.lock_write()
386
 
        repo_token = tree.branch.repository.lock_write()
387
 
        tree.branch.repository.unlock()
388
 
        try:
389
 
            self.assertEqual(
390
 
                SmartServerResponse(('ok',)),
391
 
                request.execute(
392
 
                    backing.local_abspath(''), branch_token, repo_token,
393
 
                    rev_id_utf8))
394
 
            self.assertEqual([rev_id_utf8], tree.branch.revision_history())
395
 
        finally:
396
 
            tree.branch.unlock()
397
 
 
398
 
    def test_revision_id_present2(self):
399
 
        backing = self.get_transport()
400
 
        request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
401
 
        tree = self.make_branch_and_memory_tree('.')
402
 
        tree.lock_write()
403
 
        tree.add('')
404
 
        rev_id_utf8 = u'\xc8'.encode('utf-8')
405
 
        r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
406
 
        r2 = tree.commit('2nd commit')
407
 
        tree.unlock()
408
 
        tree.branch.set_revision_history([])
409
 
        branch_token = tree.branch.lock_write()
410
 
        repo_token = tree.branch.repository.lock_write()
411
 
        tree.branch.repository.unlock()
412
 
        try:
413
 
            self.assertEqual(
414
 
                SmartServerResponse(('ok',)),
415
 
                request.execute(
416
 
                    backing.local_abspath(''), branch_token, repo_token,
417
 
                    rev_id_utf8))
418
 
            self.assertEqual([rev_id_utf8], tree.branch.revision_history())
419
 
        finally:
420
 
            tree.branch.unlock()
421
 
 
422
 
 
423
 
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithTransport):
424
 
 
425
 
    def setUp(self):
426
 
        tests.TestCaseWithTransport.setUp(self)
427
 
        self.reduceLockdirTimeout()
428
 
 
429
 
    def test_lock_write_on_unlocked_branch(self):
430
 
        backing = self.get_transport()
431
 
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
432
 
        branch = self.make_branch('.', format='knit')
433
 
        repository = branch.repository
434
 
        response = request.execute(backing.local_abspath(''))
435
 
        branch_nonce = branch.control_files._lock.peek().get('nonce')
436
 
        repository_nonce = repository.control_files._lock.peek().get('nonce')
437
 
        self.assertEqual(
438
 
            SmartServerResponse(('ok', branch_nonce, repository_nonce)),
439
 
            response)
440
 
        # The branch (and associated repository) is now locked.  Verify that
441
 
        # with a new branch object.
442
 
        new_branch = repository.bzrdir.open_branch()
443
 
        self.assertRaises(errors.LockContention, new_branch.lock_write)
444
 
 
445
 
    def test_lock_write_on_locked_branch(self):
446
 
        backing = self.get_transport()
447
 
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
448
 
        branch = self.make_branch('.')
449
 
        branch.lock_write()
450
 
        branch.leave_lock_in_place()
451
 
        branch.unlock()
452
 
        response = request.execute(backing.local_abspath(''))
453
 
        self.assertEqual(
454
 
            SmartServerResponse(('LockContention',)), response)
455
 
 
456
 
    def test_lock_write_with_tokens_on_locked_branch(self):
457
 
        backing = self.get_transport()
458
 
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
459
 
        branch = self.make_branch('.', format='knit')
460
 
        branch_token = branch.lock_write()
461
 
        repo_token = branch.repository.lock_write()
462
 
        branch.repository.unlock()
463
 
        branch.leave_lock_in_place()
464
 
        branch.repository.leave_lock_in_place()
465
 
        branch.unlock()
466
 
        response = request.execute(backing.local_abspath(''),
467
 
                                   branch_token, repo_token)
468
 
        self.assertEqual(
469
 
            SmartServerResponse(('ok', branch_token, repo_token)), response)
470
 
 
471
 
    def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
472
 
        backing = self.get_transport()
473
 
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
474
 
        branch = self.make_branch('.', format='knit')
475
 
        branch_token = branch.lock_write()
476
 
        repo_token = branch.repository.lock_write()
477
 
        branch.repository.unlock()
478
 
        branch.leave_lock_in_place()
479
 
        branch.repository.leave_lock_in_place()
480
 
        branch.unlock()
481
 
        response = request.execute(backing.local_abspath(''),
482
 
                                   branch_token+'xxx', repo_token)
483
 
        self.assertEqual(
484
 
            SmartServerResponse(('TokenMismatch',)), response)
485
 
 
486
 
    def test_lock_write_on_locked_repo(self):
487
 
        backing = self.get_transport()
488
 
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
489
 
        branch = self.make_branch('.', format='knit')
490
 
        branch.repository.lock_write()
491
 
        branch.repository.leave_lock_in_place()
492
 
        branch.repository.unlock()
493
 
        response = request.execute(backing.local_abspath(''))
494
 
        self.assertEqual(
495
 
            SmartServerResponse(('LockContention',)), response)
496
 
 
497
 
    def test_lock_write_on_readonly_transport(self):
498
 
        backing = self.get_readonly_transport()
499
 
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
500
 
        branch = self.make_branch('.')
501
 
        response = request.execute('')
502
 
        error_name, lock_str, why_str = response.args
503
 
        self.assertFalse(response.is_successful())
504
 
        self.assertEqual('LockFailed', error_name)
505
 
 
506
 
 
507
 
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithTransport):
508
 
 
509
 
    def setUp(self):
510
 
        tests.TestCaseWithTransport.setUp(self)
511
 
        self.reduceLockdirTimeout()
512
 
 
513
 
    def test_unlock_on_locked_branch_and_repo(self):
514
 
        backing = self.get_transport()
515
 
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
516
 
        branch = self.make_branch('.', format='knit')
517
 
        # Lock the branch
518
 
        branch_token = branch.lock_write()
519
 
        repo_token = branch.repository.lock_write()
520
 
        branch.repository.unlock()
521
 
        # Unlock the branch (and repo) object, leaving the physical locks
522
 
        # in place.
523
 
        branch.leave_lock_in_place()
524
 
        branch.repository.leave_lock_in_place()
525
 
        branch.unlock()
526
 
        response = request.execute(backing.local_abspath(''),
527
 
                                   branch_token, repo_token)
528
 
        self.assertEqual(
529
 
            SmartServerResponse(('ok',)), response)
530
 
        # The branch is now unlocked.  Verify that with a new branch
531
 
        # object.
532
 
        new_branch = branch.bzrdir.open_branch()
533
 
        new_branch.lock_write()
534
 
        new_branch.unlock()
535
 
 
536
 
    def test_unlock_on_unlocked_branch_unlocked_repo(self):
537
 
        backing = self.get_transport()
538
 
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
539
 
        branch = self.make_branch('.', format='knit')
540
 
        response = request.execute(
541
 
            backing.local_abspath(''), 'branch token', 'repo token')
542
 
        self.assertEqual(
543
 
            SmartServerResponse(('TokenMismatch',)), response)
544
 
 
545
 
    def test_unlock_on_unlocked_branch_locked_repo(self):
546
 
        backing = self.get_transport()
547
 
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
548
 
        branch = self.make_branch('.', format='knit')
549
 
        # Lock the repository.
550
 
        repo_token = branch.repository.lock_write()
551
 
        branch.repository.leave_lock_in_place()
552
 
        branch.repository.unlock()
553
 
        # Issue branch unlock request on the unlocked branch (with locked
554
 
        # repo).
555
 
        response = request.execute(
556
 
            backing.local_abspath(''), 'branch token', repo_token)
557
 
        self.assertEqual(
558
 
            SmartServerResponse(('TokenMismatch',)), response)
559
 
 
560
 
 
561
 
class TestSmartServerRepositoryRequest(tests.TestCaseWithTransport):
562
 
 
563
 
    def test_no_repository(self):
564
 
        """Raise NoRepositoryPresent when there is a bzrdir and no repo."""
565
 
        # we test this using a shared repository above the named path,
566
 
        # thus checking the right search logic is used - that is, that
567
 
        # it's the exact path being looked at and the server is not
568
 
        # searching.
569
 
        backing = self.get_transport()
570
 
        request = smart.repository.SmartServerRepositoryRequest(backing)
571
 
        self.make_repository('.', shared=True)
572
 
        self.make_bzrdir('subdir')
573
 
        self.assertRaises(errors.NoRepositoryPresent,
574
 
            request.execute, backing.local_abspath('subdir'))
575
 
 
576
 
 
577
 
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithTransport):
578
 
 
579
 
    def test_trivial_bzipped(self):
580
 
        # This tests that the wire encoding is actually bzipped
581
 
        backing = self.get_transport()
582
 
        request = smart.repository.SmartServerRepositoryGetParentMap(backing)
583
 
        tree = self.make_branch_and_memory_tree('.')
584
 
 
585
 
        self.assertEqual(None,
586
 
            request.execute(backing.local_abspath(''), 'missing-id'))
587
 
        # Note that it returns a body (of '' bzipped).
588
 
        self.assertEqual(
589
 
            SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
590
 
            request.do_body('\n\n0\n'))
591
 
 
592
 
 
593
 
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithTransport):
594
 
 
595
 
    def test_none_argument(self):
596
 
        backing = self.get_transport()
597
 
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
598
 
        tree = self.make_branch_and_memory_tree('.')
599
 
        tree.lock_write()
600
 
        tree.add('')
601
 
        r1 = tree.commit('1st commit')
602
 
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
603
 
        tree.unlock()
604
 
 
605
 
        # the lines of revision_id->revision_parent_list has no guaranteed
606
 
        # order coming out of a dict, so sort both our test and response
607
 
        lines = sorted([' '.join([r2, r1]), r1])
608
 
        response = request.execute(backing.local_abspath(''), '')
609
 
        response.body = '\n'.join(sorted(response.body.split('\n')))
610
 
 
611
 
        self.assertEqual(
612
 
            SmartServerResponse(('ok', ), '\n'.join(lines)), response)
613
 
 
614
 
    def test_specific_revision_argument(self):
615
 
        backing = self.get_transport()
616
 
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
617
 
        tree = self.make_branch_and_memory_tree('.')
618
 
        tree.lock_write()
619
 
        tree.add('')
620
 
        rev_id_utf8 = u'\xc9'.encode('utf-8')
621
 
        r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
622
 
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
623
 
        tree.unlock()
624
 
 
625
 
        self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
626
 
            request.execute(backing.local_abspath(''), rev_id_utf8))
627
 
    
628
 
    def test_no_such_revision(self):
629
 
        backing = self.get_transport()
630
 
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
631
 
        tree = self.make_branch_and_memory_tree('.')
632
 
        tree.lock_write()
633
 
        tree.add('')
634
 
        r1 = tree.commit('1st commit')
635
 
        tree.unlock()
636
 
 
637
 
        # Note that it still returns a body (of zero bytes).
638
 
        self.assertEqual(
639
 
            SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
640
 
            request.execute(backing.local_abspath(''), 'missingrevision'))
641
 
 
642
 
 
643
 
class TestSmartServerRequestHasRevision(tests.TestCaseWithTransport):
    """Tests for SmartServerRequestHasRevision."""

    def test_missing_revision(self):
        """A revision id absent from the repository is answered ('no', )."""
        transport = self.get_transport()
        has_revision = smart.repository.SmartServerRequestHasRevision(
            transport)
        self.make_repository('.')
        expected = SmartServerResponse(('no', ))
        self.assertEqual(
            expected,
            has_revision.execute(transport.local_abspath(''), 'revid'))

    def test_present_revision(self):
        """A revision id stored in the repository is answered ('yes', )."""
        transport = self.get_transport()
        has_revision = smart.repository.SmartServerRequestHasRevision(
            transport)
        memory_tree = self.make_branch_and_memory_tree('.')
        memory_tree.lock_write()
        memory_tree.add('')
        revision_id = u'\xc8abc'.encode('utf-8')
        memory_tree.commit('a commit', rev_id=revision_id)
        memory_tree.unlock()
        # Sanity check: the commit is visible through the repository API
        # before the smart request is exercised.
        committed_repo = memory_tree.branch.repository
        self.assertTrue(committed_repo.has_revision(revision_id))
        expected = SmartServerResponse(('yes', ))
        self.assertEqual(
            expected,
            has_revision.execute(transport.local_abspath(''), revision_id))
666
 
 
667
 
 
668
 
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithTransport):
    """Tests for SmartServerRepositoryGatherStats.

    Each test compares the request's response body with a body built from
    the 'size' value reported by the repository's own gather_stats().
    """

    def test_empty_revid(self):
        """With an empty revid, we get only the revision count and the size."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        repository = self.make_repository('.')
        # The size is taken from the repository itself, so the test does not
        # hard-code a value for it.
        size = repository.gather_stats()['size']
        expected_body = 'revisions: 0\nsize: %d\n' % size
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute(backing.local_abspath(''), '', 'no'))

    def test_revid_with_committers(self):
        """For a revid, first/latest revision details are reported as well.

        Committers are not requested here ('no' is passed), so the expected
        body has no 'committers' line.
        """
        backing = self.get_transport()
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        # Fixed timestamps and timezones make the expected body predictable.
        tree.commit('a commit', timestamp=123456.2, timezone=3600)
        tree.commit('a commit', timestamp=654321.4, timezone=0,
                    rev_id=rev_id_utf8)
        tree.unlock()

        size = tree.branch.repository.gather_stats()['size']
        expected_body = ('firstrev: 123456.200 3600\n'
                         'latestrev: 654321.400 0\n'
                         'revisions: 2\n'
                         'size: %d\n' % size)
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute(backing.local_abspath(''),
                                         rev_id_utf8, 'no'))

    def test_not_empty_repository_with_committers(self):
        """For a revid and requesting committers we get the whole thing."""
        backing = self.get_transport()
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        # Two commits by two different committers; the expected body below
        # has 'committers: 2'.
        tree.commit('a commit', timestamp=123456.2, timezone=3600,
                    committer='foo')
        tree.commit('a commit', timestamp=654321.4, timezone=0,
                    committer='bar', rev_id=rev_id_utf8)
        tree.unlock()

        size = tree.branch.repository.gather_stats()['size']
        expected_body = ('committers: 2\n'
                         'firstrev: 123456.200 3600\n'
                         'latestrev: 654321.400 0\n'
                         'revisions: 2\n'
                         'size: %d\n' % size)
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute(backing.local_abspath(''),
                                         rev_id_utf8, 'yes'))
730
 
 
731
 
 
732
 
class TestSmartServerRepositoryIsShared(tests.TestCaseWithTransport):
    """Tests for SmartServerRepositoryIsShared."""

    def test_is_shared(self):
        """For a shared repository, ('yes', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryIsShared(backing)
        self.make_repository('.', shared=True)
        self.assertEqual(SmartServerResponse(('yes', )),
            request.execute(backing.local_abspath(''), ))

    def test_is_not_shared(self):
        """For a repository that is not shared, ('no', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryIsShared(backing)
        self.make_repository('.', shared=False)
        self.assertEqual(SmartServerResponse(('no', )),
            request.execute(backing.local_abspath(''), ))
749
 
 
750
 
 
751
 
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithTransport):
    """Tests for SmartServerRepositoryLockWrite."""

    def setUp(self):
        """Run the base class setUp, then call reduceLockdirTimeout()."""
        tests.TestCaseWithTransport.setUp(self)
        self.reduceLockdirTimeout()

    def test_lock_write_on_unlocked_repo(self):
        """Locking an unlocked repository answers ('ok', <lock nonce>)."""
        transport = self.get_transport()
        lock_request = smart.repository.SmartServerRepositoryLockWrite(
            transport)
        repo = self.make_repository('.', format='knit')
        response = lock_request.execute(transport.local_abspath(''))
        # The nonce is read back from the lock only after the request ran.
        lock_info = repo.control_files._lock.peek()
        expected = SmartServerResponse(('ok', lock_info.get('nonce')))
        self.assertEqual(expected, response)
        # The repository is now locked: a freshly opened repository object
        # must fail to take the write lock.
        reopened = repo.bzrdir.open_repository()
        self.assertRaises(errors.LockContention, reopened.lock_write)

    def test_lock_write_on_locked_repo(self):
        """A repository whose lock was left in place gives LockContention."""
        transport = self.get_transport()
        lock_request = smart.repository.SmartServerRepositoryLockWrite(
            transport)
        repo = self.make_repository('.', format='knit')
        # Take the lock and leave it in place when unlocking.
        repo.lock_write()
        repo.leave_lock_in_place()
        repo.unlock()
        response = lock_request.execute(transport.local_abspath(''))
        expected = SmartServerResponse(('LockContention',))
        self.assertEqual(expected, response)

    def test_lock_write_on_readonly_transport(self):
        """Over a readonly transport the request fails with 'LockFailed'."""
        transport = self.get_readonly_transport()
        lock_request = smart.repository.SmartServerRepositoryLockWrite(
            transport)
        self.make_repository('.', format='knit')
        response = lock_request.execute('')
        self.assertFalse(response.is_successful())
        failure_code = response.args[0]
        self.assertEqual('LockFailed', failure_code)
787
 
 
788
 
 
789
 
class TestSmartServerRepositoryUnlock(tests.TestCaseWithTransport):
    """Tests for SmartServerRepositoryUnlock."""

    def setUp(self):
        """Run the base class setUp, then call reduceLockdirTimeout()."""
        tests.TestCaseWithTransport.setUp(self)
        self.reduceLockdirTimeout()

    def test_unlock_on_locked_repo(self):
        """Unlocking with the token of a lock left in place answers ('ok',)."""
        transport = self.get_transport()
        unlock_request = smart.repository.SmartServerRepositoryUnlock(
            transport)
        repo = self.make_repository('.', format='knit')
        # Take the lock, keep its token, and leave it in place on unlock.
        lock_token = repo.lock_write()
        repo.leave_lock_in_place()
        repo.unlock()
        response = unlock_request.execute(
            transport.local_abspath(''), lock_token)
        expected = SmartServerResponse(('ok',))
        self.assertEqual(expected, response)
        # The repository is now unlocked: a freshly opened repository object
        # must be able to take and release the write lock.
        reopened = repo.bzrdir.open_repository()
        reopened.lock_write()
        reopened.unlock()

    def test_unlock_on_unlocked_repo(self):
        """Unlocking a repository that is not locked gives TokenMismatch."""
        transport = self.get_transport()
        unlock_request = smart.repository.SmartServerRepositoryUnlock(
            transport)
        self.make_repository('.', format='knit')
        response = unlock_request.execute(
            transport.local_abspath(''), 'some token')
        expected = SmartServerResponse(('TokenMismatch',))
        self.assertEqual(expected, response)
818
 
 
819
 
 
820
 
class TestSmartServerRepositoryTarball(tests.TestCaseWithTransport):
    """Tests for SmartServerRepositoryTarball."""

    def test_repository_tarball(self):
        """The 'bz2' tarball holds repository files but not stray junk."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryTarball(backing)
        self.make_repository('.')
        # make some extraneous junk in the repository directory which should
        # not be copied
        self.build_tree(['.bzr/repository/extra-junk'])
        response = request.execute(backing.local_abspath(''), 'bz2')
        self.assertEqual(('ok',), response.args)
        # body should be a tbz2
        body_file = StringIO(response.body)
        body_tar = tarfile.open('body_tar.tbz2', fileobj=body_file,
            mode='r|bz2')
        try:
            # the tarfile returns directories with trailing slashes...
            names = set([n.rstrip('/') for n in body_tar.getnames()])
        finally:
            # Release the tar reader even if reading the member list fails.
            body_tar.close()
        # let's make sure there are some key repository components inside it.
        self.assertTrue('.bzr/repository/lock' in names)
        self.assertTrue('.bzr/repository/format' in names)
        self.assertTrue('.bzr/repository/extra-junk' not in names,
            "extraneous file present in tar file")
842
 
 
843
 
 
844
 
class TestSmartServerRepositoryStreamKnitData(tests.TestCaseWithTransport):
    """Tests for SmartServerRepositoryStreamKnitDataForRevisions."""

    def test_fetch_revisions(self):
        """The response body is a pack container of bencoded records."""
        backing = self.get_transport()
        request = (
            smart.repository.SmartServerRepositoryStreamKnitDataForRevisions(
                backing))
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        rev_id1_utf8 = u'\xc8'.encode('utf-8')
        rev_id2_utf8 = u'\xc9'.encode('utf-8')
        tree.commit('1st commit', rev_id=rev_id1_utf8)
        tree.commit('2nd commit', rev_id=rev_id2_utf8)
        tree.unlock()

        response = request.execute(backing.local_abspath(''), rev_id2_utf8)
        self.assertEqual(('ok',), response.args)
        # Uses the module-level StringIO, as the tarball test does.
        unpacker = pack.ContainerReader(StringIO(response.body))
        # Unpacking into [name] also checks that each record carries exactly
        # one name.
        for [name], read_bytes in unpacker.iter_records():
            record_bytes = read_bytes(None)
            # The bytes should be a valid bencoded string.
            bencode.bdecode(record_bytes)
            # XXX: assert that the bencoded knit records have the right
            # contents?

    def test_no_such_revision_error(self):
        """Requesting a revision absent from the repo gives NoSuchRevision."""
        backing = self.get_transport()
        request = (
            smart.repository.SmartServerRepositoryStreamKnitDataForRevisions(
                backing))
        self.make_repository('.')
        rev_id1_utf8 = u'\xc8'.encode('utf-8')
        response = request.execute(backing.local_abspath(''), rev_id1_utf8)
        self.assertEqual(
            SmartServerResponse(('NoSuchRevision', rev_id1_utf8)),
            response)
880
 
 
881
 
 
882
 
class TestSmartServerRepositoryStreamRevisionsChunked(tests.TestCaseWithTransport):
    """Tests for SmartServerRepositoryStreamRevisionsChunked.

    In both tests execute() is expected to return None; the response comes
    from the following do_body() call.
    """

    def test_fetch_revisions(self):
        """The body stream parses as a pack container of bencoded records."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryStreamRevisionsChunked(
            backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        rev_id1_utf8 = u'\xc8'.encode('utf-8')
        rev_id2_utf8 = u'\xc9'.encode('utf-8')
        tree.commit('1st commit', rev_id=rev_id1_utf8)
        tree.commit('2nd commit', rev_id=rev_id2_utf8)
        tree.unlock()

        response = request.execute(backing.local_abspath(''))
        self.assertEqual(None, response)
        response = request.do_body("%s\n%s\n1" % (rev_id2_utf8, rev_id1_utf8))
        self.assertEqual(('ok',), response.args)
        parser = pack.ContainerPushParser()
        for stream_bytes in response.body_stream:
            parser.accept_bytes(stream_bytes)
            # Unpacking into [name] also checks that each record carries
            # exactly one name.
            for [name], record_bytes in parser.read_pending_records():
                # The bytes should be a valid bencoded string.
                bencode.bdecode(record_bytes)
                # XXX: assert that the bencoded knit records have the right
                # contents?

    def test_no_such_revision_error(self):
        """A body naming a revision absent from the repo gives a failure."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryStreamRevisionsChunked(
            backing)
        self.make_repository('.')
        rev_id1_utf8 = u'\xc8'.encode('utf-8')
        response = request.execute(backing.local_abspath(''))
        self.assertEqual(None, response)
        response = request.do_body("%s\n\n1" % (rev_id1_utf8,))
        self.assertEqual(
            FailedSmartServerResponse(('NoSuchRevision', )),
            response)
925
 
 
926
 
 
927
 
class TestSmartServerIsReadonly(tests.TestCaseWithTransport):
    """Tests for smart.request.SmartServerIsReadonly."""

    def test_is_readonly_no(self):
        """With the transport from get_transport() the answer is ('no',)."""
        readonly_request = smart.request.SmartServerIsReadonly(
            self.get_transport())
        answer = readonly_request.execute()
        self.assertEqual(SmartServerResponse(('no',)), answer)

    def test_is_readonly_yes(self):
        """With the readonly transport the answer is ('yes',)."""
        readonly_request = smart.request.SmartServerIsReadonly(
            self.get_readonly_transport())
        answer = readonly_request.execute()
        self.assertEqual(SmartServerResponse(('yes',)), answer)
942
 
 
943
 
 
944
 
class TestHandlers(tests.TestCase):
    """Tests for the request.request_handlers object."""

    def test_registered_methods(self):
        """Test that known methods are registered to the correct object."""
        # Short aliases keep each check on two lines; every lookup is still
        # performed and compared in the same order as before.
        handlers = smart.request.request_handlers
        check = self.assertEqual
        # Branch verbs.
        check(handlers.get('Branch.get_config_file'),
              smart.branch.SmartServerBranchGetConfigFile)
        check(handlers.get('Branch.lock_write'),
              smart.branch.SmartServerBranchRequestLockWrite)
        check(handlers.get('Branch.last_revision_info'),
              smart.branch.SmartServerBranchRequestLastRevisionInfo)
        check(handlers.get('Branch.revision_history'),
              smart.branch.SmartServerRequestRevisionHistory)
        check(handlers.get('Branch.set_last_revision'),
              smart.branch.SmartServerBranchRequestSetLastRevision)
        check(handlers.get('Branch.unlock'),
              smart.branch.SmartServerBranchRequestUnlock)
        # BzrDir verbs.
        check(handlers.get('BzrDir.find_repository'),
              smart.bzrdir.SmartServerRequestFindRepositoryV1)
        check(handlers.get('BzrDir.find_repositoryV2'),
              smart.bzrdir.SmartServerRequestFindRepositoryV2)
        check(handlers.get('BzrDirFormat.initialize'),
              smart.bzrdir.SmartServerRequestInitializeBzrDir)
        check(handlers.get('BzrDir.open_branch'),
              smart.bzrdir.SmartServerRequestOpenBranch)
        # Repository verbs.
        check(handlers.get('Repository.gather_stats'),
              smart.repository.SmartServerRepositoryGatherStats)
        check(handlers.get('Repository.get_parent_map'),
              smart.repository.SmartServerRepositoryGetParentMap)
        check(handlers.get('Repository.get_revision_graph'),
              smart.repository.SmartServerRepositoryGetRevisionGraph)
        check(handlers.get('Repository.has_revision'),
              smart.repository.SmartServerRequestHasRevision)
        check(handlers.get('Repository.is_shared'),
              smart.repository.SmartServerRepositoryIsShared)
        check(handlers.get('Repository.lock_write'),
              smart.repository.SmartServerRepositoryLockWrite)
        check(handlers.get('Repository.tarball'),
              smart.repository.SmartServerRepositoryTarball)
        check(handlers.get('Repository.unlock'),
              smart.repository.SmartServerRepositoryUnlock)
        # Transport verbs.
        check(handlers.get('Transport.is_readonly'),
              smart.request.SmartServerIsReadonly)