/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smart.py

  • Committer: John Arbash Meinel
  • Author(s): Alexander Belchenko
  • Date: 2009-03-06 20:45:56 UTC
  • mto: This revision was merged to the branch mainline in revision 4088.
  • Revision ID: john@arbash-meinel.com-20090306204556-rvxi33g6nhvfhqob
Remove the check for no $TERM setting, as win32 doesn't have $TERM

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for the smart wire/domain protocol.
 
18
 
 
19
This module contains tests for the domain-level smart requests and responses,
 
20
such as the 'Branch.lock_write' request. Many of these use specific disk
 
21
formats to exercise calls that only make sense for formats with specific
 
22
properties.
 
23
 
 
24
Tests for low-level protocol encoding are found in test_smart_transport.
 
25
"""
 
26
 
 
27
import bz2
 
28
from cStringIO import StringIO
 
29
import tarfile
 
30
 
 
31
from bzrlib import (
 
32
    bzrdir,
 
33
    errors,
 
34
    pack,
 
35
    smart,
 
36
    tests,
 
37
    urlutils,
 
38
    )
 
39
from bzrlib.branch import Branch, BranchReferenceFormat
 
40
import bzrlib.smart.branch
 
41
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
 
42
import bzrlib.smart.packrepository
 
43
import bzrlib.smart.repository
 
44
from bzrlib.smart.request import (
 
45
    FailedSmartServerResponse,
 
46
    SmartServerRequest,
 
47
    SmartServerResponse,
 
48
    SuccessfulSmartServerResponse,
 
49
    )
 
50
from bzrlib.tests import (
 
51
    iter_suite_tests,
 
52
    split_suite_by_re,
 
53
    TestScenarioApplier,
 
54
    )
 
55
from bzrlib.transport import chroot, get_transport
 
56
from bzrlib.util import bencode
 
57
 
 
58
 
 
59
def load_tests(standard_tests, module, loader):
 
60
    """Multiply tests version and protocol consistency."""
 
61
    # FindRepository tests.
 
62
    bzrdir_mod = bzrlib.smart.bzrdir
 
63
    applier = TestScenarioApplier()
 
64
    applier.scenarios = [
 
65
        ("find_repository", {
 
66
            "_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
 
67
        ("find_repositoryV2", {
 
68
            "_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
 
69
        ("find_repositoryV3", {
 
70
            "_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV3}),
 
71
        ]
 
72
    to_adapt, result = split_suite_by_re(standard_tests,
 
73
        "TestSmartServerRequestFindRepository")
 
74
    v2_only, v1_and_2 = split_suite_by_re(to_adapt,
 
75
        "_v2")
 
76
    for test in iter_suite_tests(v1_and_2):
 
77
        result.addTests(applier.adapt(test))
 
78
    del applier.scenarios[0]
 
79
    for test in iter_suite_tests(v2_only):
 
80
        result.addTests(applier.adapt(test))
 
81
    return result
 
82
 
 
83
 
 
84
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
 
85
 
 
86
    def setUp(self):
 
87
        tests.TestCaseWithTransport.setUp(self)
 
88
        self._chroot_server = None
 
89
 
 
90
    def get_transport(self, relpath=None):
 
91
        if self._chroot_server is None:
 
92
            backing_transport = tests.TestCaseWithTransport.get_transport(self)
 
93
            self._chroot_server = chroot.ChrootServer(backing_transport)
 
94
            self._chroot_server.setUp()
 
95
            self.addCleanup(self._chroot_server.tearDown)
 
96
        t = get_transport(self._chroot_server.get_url())
 
97
        if relpath is not None:
 
98
            t = t.clone(relpath)
 
99
        return t
 
100
 
 
101
 
 
102
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
 
103
 
 
104
    def setUp(self):
 
105
        super(TestCaseWithSmartMedium, self).setUp()
 
106
        # We're allowed to set  the transport class here, so that we don't use
 
107
        # the default or a parameterized class, but rather use the
 
108
        # TestCaseWithTransport infrastructure to set up a smart server and
 
109
        # transport.
 
110
        self.transport_server = self.make_transport_server
 
111
 
 
112
    def make_transport_server(self):
 
113
        return smart.server.SmartTCPServer_for_testing('-' + self.id())
 
114
 
 
115
    def get_smart_medium(self):
 
116
        """Get a smart medium to use in tests."""
 
117
        return self.get_transport().get_smart_medium()
 
118
 
 
119
 
 
120
class TestSmartServerResponse(tests.TestCase):
 
121
 
 
122
    def test__eq__(self):
 
123
        self.assertEqual(SmartServerResponse(('ok', )),
 
124
            SmartServerResponse(('ok', )))
 
125
        self.assertEqual(SmartServerResponse(('ok', ), 'body'),
 
126
            SmartServerResponse(('ok', ), 'body'))
 
127
        self.assertNotEqual(SmartServerResponse(('ok', )),
 
128
            SmartServerResponse(('notok', )))
 
129
        self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
 
130
            SmartServerResponse(('ok', )))
 
131
        self.assertNotEqual(None,
 
132
            SmartServerResponse(('ok', )))
 
133
 
 
134
    def test__str__(self):
 
135
        """SmartServerResponses can be stringified."""
 
136
        self.assertEqual(
 
137
            "<SuccessfulSmartServerResponse args=('args',) body='body'>",
 
138
            str(SuccessfulSmartServerResponse(('args',), 'body')))
 
139
        self.assertEqual(
 
140
            "<FailedSmartServerResponse args=('args',) body='body'>",
 
141
            str(FailedSmartServerResponse(('args',), 'body')))
 
142
 
 
143
 
 
144
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
 
145
 
 
146
    def test_translate_client_path(self):
 
147
        transport = self.get_transport()
 
148
        request = SmartServerRequest(transport, 'foo/')
 
149
        self.assertEqual('./', request.translate_client_path('foo/'))
 
150
        self.assertRaises(
 
151
            errors.InvalidURLJoin, request.translate_client_path, 'foo/..')
 
152
        self.assertRaises(
 
153
            errors.PathNotChild, request.translate_client_path, '/')
 
154
        self.assertRaises(
 
155
            errors.PathNotChild, request.translate_client_path, 'bar/')
 
156
        self.assertEqual('./baz', request.translate_client_path('foo/baz'))
 
157
 
 
158
    def test_transport_from_client_path(self):
 
159
        transport = self.get_transport()
 
160
        request = SmartServerRequest(transport, 'foo/')
 
161
        self.assertEqual(
 
162
            transport.base,
 
163
            request.transport_from_client_path('foo/').base)
 
164
 
 
165
 
 
166
class TestSmartServerBzrDirRequestCloningMetaDir(
 
167
    tests.TestCaseWithMemoryTransport):
 
168
    """Tests for BzrDir.cloning_metadir."""
 
169
 
 
170
    def test_cloning_metadir(self):
 
171
        """When there is a bzrdir present, the call succeeds."""
 
172
        backing = self.get_transport()
 
173
        dir = self.make_bzrdir('.')
 
174
        local_result = dir.cloning_metadir()
 
175
        request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
 
176
        request = request_class(backing)
 
177
        expected = SuccessfulSmartServerResponse(
 
178
            (local_result.network_name(),
 
179
            local_result.repository_format.network_name(),
 
180
            ('direct', local_result.get_branch_format().network_name())))
 
181
        self.assertEqual(expected, request.execute('', 'False'))
 
182
 
 
183
    def test_cloning_metadir_reference(self):
 
184
        """The request works when bzrdir contains a branch reference."""
 
185
        backing = self.get_transport()
 
186
        referenced_branch = self.make_branch('referenced')
 
187
        dir = self.make_bzrdir('.')
 
188
        local_result = dir.cloning_metadir()
 
189
        reference = BranchReferenceFormat().initialize(dir, referenced_branch)
 
190
        reference_url = BranchReferenceFormat().get_reference(dir)
 
191
        # The server shouldn't try to follow the branch reference, so it's fine
 
192
        # if the referenced branch isn't reachable.
 
193
        backing.rename('referenced', 'moved')
 
194
        request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
 
195
        request = request_class(backing)
 
196
        expected = SuccessfulSmartServerResponse(
 
197
            (local_result.network_name(),
 
198
            local_result.repository_format.network_name(),
 
199
            ('reference', reference_url)))
 
200
        self.assertEqual(expected, request.execute('', 'False'))
 
201
 
 
202
 
 
203
class TestSmartServerRequestCreateRepository(tests.TestCaseWithMemoryTransport):
 
204
    """Tests for BzrDir.create_repository."""
 
205
 
 
206
    def test_makes_repository(self):
 
207
        """When there is a bzrdir present, the call succeeds."""
 
208
        backing = self.get_transport()
 
209
        self.make_bzrdir('.')
 
210
        request_class = bzrlib.smart.bzrdir.SmartServerRequestCreateRepository
 
211
        request = request_class(backing)
 
212
        reference_bzrdir_format = bzrdir.format_registry.get('default')()
 
213
        reference_format = reference_bzrdir_format.repository_format
 
214
        network_name = reference_format.network_name()
 
215
        expected = SuccessfulSmartServerResponse(
 
216
            ('ok', 'no', 'no', 'no', network_name))
 
217
        self.assertEqual(expected, request.execute('', network_name, 'True'))
 
218
 
 
219
 
 
220
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
 
221
    """Tests for BzrDir.find_repository."""
 
222
 
 
223
    def test_no_repository(self):
 
224
        """When there is no repository to be found, ('norepository', ) is returned."""
 
225
        backing = self.get_transport()
 
226
        request = self._request_class(backing)
 
227
        self.make_bzrdir('.')
 
228
        self.assertEqual(SmartServerResponse(('norepository', )),
 
229
            request.execute(''))
 
230
 
 
231
    def test_nonshared_repository(self):
 
232
        # nonshared repositorys only allow 'find' to return a handle when the
 
233
        # path the repository is being searched on is the same as that that
 
234
        # the repository is at.
 
235
        backing = self.get_transport()
 
236
        request = self._request_class(backing)
 
237
        result = self._make_repository_and_result()
 
238
        self.assertEqual(result, request.execute(''))
 
239
        self.make_bzrdir('subdir')
 
240
        self.assertEqual(SmartServerResponse(('norepository', )),
 
241
            request.execute('subdir'))
 
242
 
 
243
    def _make_repository_and_result(self, shared=False, format=None):
 
244
        """Convenience function to setup a repository.
 
245
 
 
246
        :result: The SmartServerResponse to expect when opening it.
 
247
        """
 
248
        repo = self.make_repository('.', shared=shared, format=format)
 
249
        if repo.supports_rich_root():
 
250
            rich_root = 'yes'
 
251
        else:
 
252
            rich_root = 'no'
 
253
        if repo._format.supports_tree_reference:
 
254
            subtrees = 'yes'
 
255
        else:
 
256
            subtrees = 'no'
 
257
        if (smart.bzrdir.SmartServerRequestFindRepositoryV3 ==
 
258
            self._request_class):
 
259
            return SuccessfulSmartServerResponse(
 
260
                ('ok', '', rich_root, subtrees, 'no',
 
261
                 repo._format.network_name()))
 
262
        elif (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
 
263
            self._request_class):
 
264
            # All tests so far are on formats, and for non-external
 
265
            # repositories.
 
266
            return SuccessfulSmartServerResponse(
 
267
                ('ok', '', rich_root, subtrees, 'no'))
 
268
        else:
 
269
            return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
 
270
 
 
271
    def test_shared_repository(self):
 
272
        """When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
 
273
        backing = self.get_transport()
 
274
        request = self._request_class(backing)
 
275
        result = self._make_repository_and_result(shared=True)
 
276
        self.assertEqual(result, request.execute(''))
 
277
        self.make_bzrdir('subdir')
 
278
        result2 = SmartServerResponse(result.args[0:1] + ('..', ) + result.args[2:])
 
279
        self.assertEqual(result2,
 
280
            request.execute('subdir'))
 
281
        self.make_bzrdir('subdir/deeper')
 
282
        result3 = SmartServerResponse(result.args[0:1] + ('../..', ) + result.args[2:])
 
283
        self.assertEqual(result3,
 
284
            request.execute('subdir/deeper'))
 
285
 
 
286
    def test_rich_root_and_subtree_encoding(self):
 
287
        """Test for the format attributes for rich root and subtree support."""
 
288
        backing = self.get_transport()
 
289
        request = self._request_class(backing)
 
290
        result = self._make_repository_and_result(format='dirstate-with-subtree')
 
291
        # check the test will be valid
 
292
        self.assertEqual('yes', result.args[2])
 
293
        self.assertEqual('yes', result.args[3])
 
294
        self.assertEqual(result, request.execute(''))
 
295
 
 
296
    def test_supports_external_lookups_no_v2(self):
 
297
        """Test for the supports_external_lookups attribute."""
 
298
        backing = self.get_transport()
 
299
        request = self._request_class(backing)
 
300
        result = self._make_repository_and_result(format='dirstate-with-subtree')
 
301
        # check the test will be valid
 
302
        self.assertEqual('no', result.args[4])
 
303
        self.assertEqual(result, request.execute(''))
 
304
 
 
305
 
 
306
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
 
307
 
 
308
    def test_empty_dir(self):
 
309
        """Initializing an empty dir should succeed and do it."""
 
310
        backing = self.get_transport()
 
311
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
 
312
        self.assertEqual(SmartServerResponse(('ok', )),
 
313
            request.execute(''))
 
314
        made_dir = bzrdir.BzrDir.open_from_transport(backing)
 
315
        # no branch, tree or repository is expected with the current
 
316
        # default formart.
 
317
        self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
 
318
        self.assertRaises(errors.NotBranchError, made_dir.open_branch)
 
319
        self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
 
320
 
 
321
    def test_missing_dir(self):
 
322
        """Initializing a missing directory should fail like the bzrdir api."""
 
323
        backing = self.get_transport()
 
324
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
 
325
        self.assertRaises(errors.NoSuchFile,
 
326
            request.execute, 'subdir')
 
327
 
 
328
    def test_initialized_dir(self):
 
329
        """Initializing an extant bzrdir should fail like the bzrdir api."""
 
330
        backing = self.get_transport()
 
331
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
 
332
        self.make_bzrdir('subdir')
 
333
        self.assertRaises(errors.FileExists,
 
334
            request.execute, 'subdir')
 
335
 
 
336
 
 
337
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
 
338
 
 
339
    def test_no_branch(self):
 
340
        """When there is no branch, ('nobranch', ) is returned."""
 
341
        backing = self.get_transport()
 
342
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
 
343
        self.make_bzrdir('.')
 
344
        self.assertEqual(SmartServerResponse(('nobranch', )),
 
345
            request.execute(''))
 
346
 
 
347
    def test_branch(self):
 
348
        """When there is a branch, 'ok' is returned."""
 
349
        backing = self.get_transport()
 
350
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
 
351
        self.make_branch('.')
 
352
        self.assertEqual(SmartServerResponse(('ok', '')),
 
353
            request.execute(''))
 
354
 
 
355
    def test_branch_reference(self):
 
356
        """When there is a branch reference, the reference URL is returned."""
 
357
        backing = self.get_transport()
 
358
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
 
359
        branch = self.make_branch('branch')
 
360
        checkout = branch.create_checkout('reference',lightweight=True)
 
361
        reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
 
362
        self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
 
363
        self.assertEqual(SmartServerResponse(('ok', reference_url)),
 
364
            request.execute('reference'))
 
365
 
 
366
 
 
367
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
 
368
 
 
369
    def test_empty(self):
 
370
        """For an empty branch, the body is empty."""
 
371
        backing = self.get_transport()
 
372
        request = smart.branch.SmartServerRequestRevisionHistory(backing)
 
373
        self.make_branch('.')
 
374
        self.assertEqual(SmartServerResponse(('ok', ), ''),
 
375
            request.execute(''))
 
376
 
 
377
    def test_not_empty(self):
 
378
        """For a non-empty branch, the body is empty."""
 
379
        backing = self.get_transport()
 
380
        request = smart.branch.SmartServerRequestRevisionHistory(backing)
 
381
        tree = self.make_branch_and_memory_tree('.')
 
382
        tree.lock_write()
 
383
        tree.add('')
 
384
        r1 = tree.commit('1st commit')
 
385
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
 
386
        tree.unlock()
 
387
        self.assertEqual(
 
388
            SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
 
389
            request.execute(''))
 
390
 
 
391
 
 
392
class TestSmartServerBranchRequest(tests.TestCaseWithMemoryTransport):
 
393
 
 
394
    def test_no_branch(self):
 
395
        """When there is a bzrdir and no branch, NotBranchError is raised."""
 
396
        backing = self.get_transport()
 
397
        request = smart.branch.SmartServerBranchRequest(backing)
 
398
        self.make_bzrdir('.')
 
399
        self.assertRaises(errors.NotBranchError,
 
400
            request.execute, '')
 
401
 
 
402
    def test_branch_reference(self):
 
403
        """When there is a branch reference, NotBranchError is raised."""
 
404
        backing = self.get_transport()
 
405
        request = smart.branch.SmartServerBranchRequest(backing)
 
406
        branch = self.make_branch('branch')
 
407
        checkout = branch.create_checkout('reference',lightweight=True)
 
408
        self.assertRaises(errors.NotBranchError,
 
409
            request.execute, 'checkout')
 
410
 
 
411
 
 
412
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithMemoryTransport):
 
413
 
 
414
    def test_empty(self):
 
415
        """For an empty branch, the result is ('ok', '0', 'null:')."""
 
416
        backing = self.get_transport()
 
417
        request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
 
418
        self.make_branch('.')
 
419
        self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
 
420
            request.execute(''))
 
421
 
 
422
    def test_not_empty(self):
 
423
        """For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
 
424
        backing = self.get_transport()
 
425
        request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
 
426
        tree = self.make_branch_and_memory_tree('.')
 
427
        tree.lock_write()
 
428
        tree.add('')
 
429
        rev_id_utf8 = u'\xc8'.encode('utf-8')
 
430
        r1 = tree.commit('1st commit')
 
431
        r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
 
432
        tree.unlock()
 
433
        self.assertEqual(
 
434
            SmartServerResponse(('ok', '2', rev_id_utf8)),
 
435
            request.execute(''))
 
436
 
 
437
 
 
438
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithMemoryTransport):
 
439
 
 
440
    def test_default(self):
 
441
        """With no file, we get empty content."""
 
442
        backing = self.get_transport()
 
443
        request = smart.branch.SmartServerBranchGetConfigFile(backing)
 
444
        branch = self.make_branch('.')
 
445
        # there should be no file by default
 
446
        content = ''
 
447
        self.assertEqual(SmartServerResponse(('ok', ), content),
 
448
            request.execute(''))
 
449
 
 
450
    def test_with_content(self):
 
451
        # SmartServerBranchGetConfigFile should return the content from
 
452
        # branch.control_files.get('branch.conf') for now - in the future it may
 
453
        # perform more complex processing.
 
454
        backing = self.get_transport()
 
455
        request = smart.branch.SmartServerBranchGetConfigFile(backing)
 
456
        branch = self.make_branch('.')
 
457
        branch._transport.put_bytes('branch.conf', 'foo bar baz')
 
458
        self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
 
459
            request.execute(''))
 
460
 
 
461
 
 
462
class SetLastRevisionTestBase(tests.TestCaseWithMemoryTransport):
 
463
    """Base test case for verbs that implement set_last_revision."""
 
464
 
 
465
    def setUp(self):
 
466
        tests.TestCaseWithMemoryTransport.setUp(self)
 
467
        backing_transport = self.get_transport()
 
468
        self.request = self.request_class(backing_transport)
 
469
        self.tree = self.make_branch_and_memory_tree('.')
 
470
 
 
471
    def lock_branch(self):
 
472
        b = self.tree.branch
 
473
        branch_token = b.lock_write()
 
474
        repo_token = b.repository.lock_write()
 
475
        b.repository.unlock()
 
476
        return branch_token, repo_token
 
477
 
 
478
    def unlock_branch(self):
 
479
        self.tree.branch.unlock()
 
480
 
 
481
    def set_last_revision(self, revision_id, revno):
 
482
        branch_token, repo_token = self.lock_branch()
 
483
        response = self._set_last_revision(
 
484
            revision_id, revno, branch_token, repo_token)
 
485
        self.unlock_branch()
 
486
        return response
 
487
 
 
488
    def assertRequestSucceeds(self, revision_id, revno):
 
489
        response = self.set_last_revision(revision_id, revno)
 
490
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)), response)
 
491
 
 
492
 
 
493
class TestSetLastRevisionVerbMixin(object):
 
494
    """Mixin test case for verbs that implement set_last_revision."""
 
495
 
 
496
    def test_set_null_to_null(self):
 
497
        """An empty branch can have its last revision set to 'null:'."""
 
498
        self.assertRequestSucceeds('null:', 0)
 
499
 
 
500
    def test_NoSuchRevision(self):
 
501
        """If the revision_id is not present, the verb returns NoSuchRevision.
 
502
        """
 
503
        revision_id = 'non-existent revision'
 
504
        self.assertEqual(
 
505
            FailedSmartServerResponse(('NoSuchRevision', revision_id)),
 
506
            self.set_last_revision(revision_id, 1))
 
507
 
 
508
    def make_tree_with_two_commits(self):
 
509
        self.tree.lock_write()
 
510
        self.tree.add('')
 
511
        rev_id_utf8 = u'\xc8'.encode('utf-8')
 
512
        r1 = self.tree.commit('1st commit', rev_id=rev_id_utf8)
 
513
        r2 = self.tree.commit('2nd commit', rev_id='rev-2')
 
514
        self.tree.unlock()
 
515
 
 
516
    def test_branch_last_revision_info_is_updated(self):
 
517
        """A branch's tip can be set to a revision that is present in its
 
518
        repository.
 
519
        """
 
520
        # Make a branch with an empty revision history, but two revisions in
 
521
        # its repository.
 
522
        self.make_tree_with_two_commits()
 
523
        rev_id_utf8 = u'\xc8'.encode('utf-8')
 
524
        self.tree.branch.set_revision_history([])
 
525
        self.assertEqual(
 
526
            (0, 'null:'), self.tree.branch.last_revision_info())
 
527
        # We can update the branch to a revision that is present in the
 
528
        # repository.
 
529
        self.assertRequestSucceeds(rev_id_utf8, 1)
 
530
        self.assertEqual(
 
531
            (1, rev_id_utf8), self.tree.branch.last_revision_info())
 
532
 
 
533
    def test_branch_last_revision_info_rewind(self):
 
534
        """A branch's tip can be set to a revision that is an ancestor of the
 
535
        current tip.
 
536
        """
 
537
        self.make_tree_with_two_commits()
 
538
        rev_id_utf8 = u'\xc8'.encode('utf-8')
 
539
        self.assertEqual(
 
540
            (2, 'rev-2'), self.tree.branch.last_revision_info())
 
541
        self.assertRequestSucceeds(rev_id_utf8, 1)
 
542
        self.assertEqual(
 
543
            (1, rev_id_utf8), self.tree.branch.last_revision_info())
 
544
 
 
545
    def test_TipChangeRejected(self):
 
546
        """If a pre_change_branch_tip hook raises TipChangeRejected, the verb
 
547
        returns TipChangeRejected.
 
548
        """
 
549
        rejection_message = u'rejection message\N{INTERROBANG}'
 
550
        def hook_that_rejects(params):
 
551
            raise errors.TipChangeRejected(rejection_message)
 
552
        Branch.hooks.install_named_hook(
 
553
            'pre_change_branch_tip', hook_that_rejects, None)
 
554
        self.assertEqual(
 
555
            FailedSmartServerResponse(
 
556
                ('TipChangeRejected', rejection_message.encode('utf-8'))),
 
557
            self.set_last_revision('null:', 0))
 
558
 
 
559
 
 
560
class TestSmartServerBranchRequestSetLastRevision(
 
561
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
 
562
    """Tests for Branch.set_last_revision verb."""
 
563
 
 
564
    request_class = smart.branch.SmartServerBranchRequestSetLastRevision
 
565
 
 
566
    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
 
567
        return self.request.execute(
 
568
            '', branch_token, repo_token, revision_id)
 
569
 
 
570
 
 
571
class TestSmartServerBranchRequestSetLastRevisionInfo(
 
572
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
 
573
    """Tests for Branch.set_last_revision_info verb."""
 
574
 
 
575
    request_class = smart.branch.SmartServerBranchRequestSetLastRevisionInfo
 
576
 
 
577
    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
 
578
        return self.request.execute(
 
579
            '', branch_token, repo_token, revno, revision_id)
 
580
 
 
581
    def test_NoSuchRevision(self):
 
582
        """Branch.set_last_revision_info does not have to return
 
583
        NoSuchRevision if the revision_id is absent.
 
584
        """
 
585
        raise tests.TestNotApplicable()
 
586
 
 
587
 
 
588
class TestSmartServerBranchRequestSetLastRevisionEx(
 
589
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
 
590
    """Tests for Branch.set_last_revision_ex verb."""
 
591
 
 
592
    request_class = smart.branch.SmartServerBranchRequestSetLastRevisionEx
 
593
 
 
594
    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
 
595
        return self.request.execute(
 
596
            '', branch_token, repo_token, revision_id, 0, 0)
 
597
 
 
598
    def assertRequestSucceeds(self, revision_id, revno):
 
599
        response = self.set_last_revision(revision_id, revno)
 
600
        self.assertEqual(
 
601
            SuccessfulSmartServerResponse(('ok', revno, revision_id)),
 
602
            response)
 
603
 
 
604
    def test_branch_last_revision_info_rewind(self):
 
605
        """A branch's tip can be set to a revision that is an ancestor of the
 
606
        current tip, but only if allow_overwrite_descendant is passed.
 
607
        """
 
608
        self.make_tree_with_two_commits()
 
609
        rev_id_utf8 = u'\xc8'.encode('utf-8')
 
610
        self.assertEqual(
 
611
            (2, 'rev-2'), self.tree.branch.last_revision_info())
 
612
        # If allow_overwrite_descendant flag is 0, then trying to set the tip
 
613
        # to an older revision ID has no effect.
 
614
        branch_token, repo_token = self.lock_branch()
 
615
        response = self.request.execute(
 
616
            '', branch_token, repo_token, rev_id_utf8, 0, 0)
 
617
        self.assertEqual(
 
618
            SuccessfulSmartServerResponse(('ok', 2, 'rev-2')),
 
619
            response)
 
620
        self.assertEqual(
 
621
            (2, 'rev-2'), self.tree.branch.last_revision_info())
 
622
 
 
623
        # If allow_overwrite_descendant flag is 1, then setting the tip to an
 
624
        # ancestor works.
 
625
        response = self.request.execute(
 
626
            '', branch_token, repo_token, rev_id_utf8, 0, 1)
 
627
        self.assertEqual(
 
628
            SuccessfulSmartServerResponse(('ok', 1, rev_id_utf8)),
 
629
            response)
 
630
        self.unlock_branch()
 
631
        self.assertEqual(
 
632
            (1, rev_id_utf8), self.tree.branch.last_revision_info())
 
633
 
 
634
    def make_branch_with_divergent_history(self):
 
635
        """Make a branch with divergent history in its repo.
 
636
 
 
637
        The branch's tip will be 'child-2', and the repo will also contain
 
638
        'child-1', which diverges from a common base revision.
 
639
        """
 
640
        self.tree.lock_write()
 
641
        self.tree.add('')
 
642
        r1 = self.tree.commit('1st commit')
 
643
        revno_1, revid_1 = self.tree.branch.last_revision_info()
 
644
        r2 = self.tree.commit('2nd commit', rev_id='child-1')
 
645
        # Undo the second commit
 
646
        self.tree.branch.set_last_revision_info(revno_1, revid_1)
 
647
        self.tree.set_parent_ids([revid_1])
 
648
        # Make a new second commit, child-2.  child-2 has diverged from
 
649
        # child-1.
 
650
        new_r2 = self.tree.commit('2nd commit', rev_id='child-2')
 
651
        self.tree.unlock()
 
652
 
 
653
    def test_not_allow_diverged(self):
 
654
        """If allow_diverged is not passed, then setting a divergent history
 
655
        returns a Diverged error.
 
656
        """
 
657
        self.make_branch_with_divergent_history()
 
658
        self.assertEqual(
 
659
            FailedSmartServerResponse(('Diverged',)),
 
660
            self.set_last_revision('child-1', 2))
 
661
        # The branch tip was not changed.
 
662
        self.assertEqual('child-2', self.tree.branch.last_revision())
 
663
 
 
664
    def test_allow_diverged(self):
 
665
        """If allow_diverged is passed, then setting a divergent history
 
666
        succeeds.
 
667
        """
 
668
        self.make_branch_with_divergent_history()
 
669
        branch_token, repo_token = self.lock_branch()
 
670
        response = self.request.execute(
 
671
            '', branch_token, repo_token, 'child-1', 1, 0)
 
672
        self.assertEqual(
 
673
            SuccessfulSmartServerResponse(('ok', 2, 'child-1')),
 
674
            response)
 
675
        self.unlock_branch()
 
676
        # The branch tip was changed.
 
677
        self.assertEqual('child-1', self.tree.branch.last_revision())
 
678
 
 
679
 
 
680
class TestSmartServerBranchRequestGetParent(tests.TestCaseWithMemoryTransport):
 
681
 
 
682
    def test_get_parent_none(self):
 
683
        base_branch = self.make_branch('base')
 
684
        request = smart.branch.SmartServerBranchGetParent(self.get_transport())
 
685
        response = request.execute('base')
 
686
        self.assertEquals(
 
687
            SuccessfulSmartServerResponse(('',)), response)
 
688
 
 
689
    def test_get_parent_something(self):
 
690
        base_branch = self.make_branch('base')
 
691
        base_branch.set_parent(self.get_url('foo'))
 
692
        request = smart.branch.SmartServerBranchGetParent(self.get_transport())
 
693
        response = request.execute('base')
 
694
        self.assertEquals(
 
695
            SuccessfulSmartServerResponse(("../foo",)),
 
696
            response)
 
697
 
 
698
 
 
699
class TestSmartServerBranchRequestGetStackedOnURL(tests.TestCaseWithMemoryTransport):
 
700
 
 
701
    def test_get_stacked_on_url(self):
 
702
        base_branch = self.make_branch('base', format='1.6')
 
703
        stacked_branch = self.make_branch('stacked', format='1.6')
 
704
        # typically should be relative
 
705
        stacked_branch.set_stacked_on_url('../base')
 
706
        request = smart.branch.SmartServerBranchRequestGetStackedOnURL(
 
707
            self.get_transport())
 
708
        response = request.execute('stacked')
 
709
        self.assertEquals(
 
710
            SmartServerResponse(('ok', '../base')),
 
711
            response)
 
712
 
 
713
 
 
714
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
 
715
 
 
716
    def setUp(self):
 
717
        tests.TestCaseWithMemoryTransport.setUp(self)
 
718
 
 
719
    def test_lock_write_on_unlocked_branch(self):
 
720
        backing = self.get_transport()
 
721
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
 
722
        branch = self.make_branch('.', format='knit')
 
723
        repository = branch.repository
 
724
        response = request.execute('')
 
725
        branch_nonce = branch.control_files._lock.peek().get('nonce')
 
726
        repository_nonce = repository.control_files._lock.peek().get('nonce')
 
727
        self.assertEqual(
 
728
            SmartServerResponse(('ok', branch_nonce, repository_nonce)),
 
729
            response)
 
730
        # The branch (and associated repository) is now locked.  Verify that
 
731
        # with a new branch object.
 
732
        new_branch = repository.bzrdir.open_branch()
 
733
        self.assertRaises(errors.LockContention, new_branch.lock_write)
 
734
 
 
735
    def test_lock_write_on_locked_branch(self):
 
736
        backing = self.get_transport()
 
737
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
 
738
        branch = self.make_branch('.')
 
739
        branch.lock_write()
 
740
        branch.leave_lock_in_place()
 
741
        branch.unlock()
 
742
        response = request.execute('')
 
743
        self.assertEqual(
 
744
            SmartServerResponse(('LockContention',)), response)
 
745
 
 
746
    def test_lock_write_with_tokens_on_locked_branch(self):
 
747
        backing = self.get_transport()
 
748
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
 
749
        branch = self.make_branch('.', format='knit')
 
750
        branch_token = branch.lock_write()
 
751
        repo_token = branch.repository.lock_write()
 
752
        branch.repository.unlock()
 
753
        branch.leave_lock_in_place()
 
754
        branch.repository.leave_lock_in_place()
 
755
        branch.unlock()
 
756
        response = request.execute('',
 
757
                                   branch_token, repo_token)
 
758
        self.assertEqual(
 
759
            SmartServerResponse(('ok', branch_token, repo_token)), response)
 
760
 
 
761
    def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
 
762
        backing = self.get_transport()
 
763
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
 
764
        branch = self.make_branch('.', format='knit')
 
765
        branch_token = branch.lock_write()
 
766
        repo_token = branch.repository.lock_write()
 
767
        branch.repository.unlock()
 
768
        branch.leave_lock_in_place()
 
769
        branch.repository.leave_lock_in_place()
 
770
        branch.unlock()
 
771
        response = request.execute('',
 
772
                                   branch_token+'xxx', repo_token)
 
773
        self.assertEqual(
 
774
            SmartServerResponse(('TokenMismatch',)), response)
 
775
 
 
776
    def test_lock_write_on_locked_repo(self):
 
777
        backing = self.get_transport()
 
778
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
 
779
        branch = self.make_branch('.', format='knit')
 
780
        branch.repository.lock_write()
 
781
        branch.repository.leave_lock_in_place()
 
782
        branch.repository.unlock()
 
783
        response = request.execute('')
 
784
        self.assertEqual(
 
785
            SmartServerResponse(('LockContention',)), response)
 
786
 
 
787
    def test_lock_write_on_readonly_transport(self):
 
788
        backing = self.get_readonly_transport()
 
789
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
 
790
        branch = self.make_branch('.')
 
791
        root = self.get_transport().clone('/')
 
792
        path = urlutils.relative_url(root.base, self.get_transport().base)
 
793
        response = request.execute(path)
 
794
        error_name, lock_str, why_str = response.args
 
795
        self.assertFalse(response.is_successful())
 
796
        self.assertEqual('LockFailed', error_name)
 
797
 
 
798
 
 
799
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
 
800
 
 
801
    def setUp(self):
 
802
        tests.TestCaseWithMemoryTransport.setUp(self)
 
803
 
 
804
    def test_unlock_on_locked_branch_and_repo(self):
 
805
        backing = self.get_transport()
 
806
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
 
807
        branch = self.make_branch('.', format='knit')
 
808
        # Lock the branch
 
809
        branch_token = branch.lock_write()
 
810
        repo_token = branch.repository.lock_write()
 
811
        branch.repository.unlock()
 
812
        # Unlock the branch (and repo) object, leaving the physical locks
 
813
        # in place.
 
814
        branch.leave_lock_in_place()
 
815
        branch.repository.leave_lock_in_place()
 
816
        branch.unlock()
 
817
        response = request.execute('',
 
818
                                   branch_token, repo_token)
 
819
        self.assertEqual(
 
820
            SmartServerResponse(('ok',)), response)
 
821
        # The branch is now unlocked.  Verify that with a new branch
 
822
        # object.
 
823
        new_branch = branch.bzrdir.open_branch()
 
824
        new_branch.lock_write()
 
825
        new_branch.unlock()
 
826
 
 
827
    def test_unlock_on_unlocked_branch_unlocked_repo(self):
 
828
        backing = self.get_transport()
 
829
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
 
830
        branch = self.make_branch('.', format='knit')
 
831
        response = request.execute(
 
832
            '', 'branch token', 'repo token')
 
833
        self.assertEqual(
 
834
            SmartServerResponse(('TokenMismatch',)), response)
 
835
 
 
836
    def test_unlock_on_unlocked_branch_locked_repo(self):
 
837
        backing = self.get_transport()
 
838
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
 
839
        branch = self.make_branch('.', format='knit')
 
840
        # Lock the repository.
 
841
        repo_token = branch.repository.lock_write()
 
842
        branch.repository.leave_lock_in_place()
 
843
        branch.repository.unlock()
 
844
        # Issue branch lock_write request on the unlocked branch (with locked
 
845
        # repo).
 
846
        response = request.execute(
 
847
            '', 'branch token', repo_token)
 
848
        self.assertEqual(
 
849
            SmartServerResponse(('TokenMismatch',)), response)
 
850
 
 
851
 
 
852
class TestSmartServerRepositoryRequest(tests.TestCaseWithMemoryTransport):
 
853
 
 
854
    def test_no_repository(self):
 
855
        """Raise NoRepositoryPresent when there is a bzrdir and no repo."""
 
856
        # we test this using a shared repository above the named path,
 
857
        # thus checking the right search logic is used - that is, that
 
858
        # its the exact path being looked at and the server is not
 
859
        # searching.
 
860
        backing = self.get_transport()
 
861
        request = smart.repository.SmartServerRepositoryRequest(backing)
 
862
        self.make_repository('.', shared=True)
 
863
        self.make_bzrdir('subdir')
 
864
        self.assertRaises(errors.NoRepositoryPresent,
 
865
            request.execute, 'subdir')
 
866
 
 
867
 
 
868
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithMemoryTransport):
 
869
 
 
870
    def test_trivial_bzipped(self):
 
871
        # This tests that the wire encoding is actually bzipped
 
872
        backing = self.get_transport()
 
873
        request = smart.repository.SmartServerRepositoryGetParentMap(backing)
 
874
        tree = self.make_branch_and_memory_tree('.')
 
875
 
 
876
        self.assertEqual(None,
 
877
            request.execute('', 'missing-id'))
 
878
        # Note that it returns a body (of '' bzipped).
 
879
        self.assertEqual(
 
880
            SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
 
881
            request.do_body('\n\n0\n'))
 
882
 
 
883
 
 
884
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithMemoryTransport):
 
885
 
 
886
    def test_none_argument(self):
 
887
        backing = self.get_transport()
 
888
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
 
889
        tree = self.make_branch_and_memory_tree('.')
 
890
        tree.lock_write()
 
891
        tree.add('')
 
892
        r1 = tree.commit('1st commit')
 
893
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
 
894
        tree.unlock()
 
895
 
 
896
        # the lines of revision_id->revision_parent_list has no guaranteed
 
897
        # order coming out of a dict, so sort both our test and response
 
898
        lines = sorted([' '.join([r2, r1]), r1])
 
899
        response = request.execute('', '')
 
900
        response.body = '\n'.join(sorted(response.body.split('\n')))
 
901
 
 
902
        self.assertEqual(
 
903
            SmartServerResponse(('ok', ), '\n'.join(lines)), response)
 
904
 
 
905
    def test_specific_revision_argument(self):
 
906
        backing = self.get_transport()
 
907
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
 
908
        tree = self.make_branch_and_memory_tree('.')
 
909
        tree.lock_write()
 
910
        tree.add('')
 
911
        rev_id_utf8 = u'\xc9'.encode('utf-8')
 
912
        r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
 
913
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
 
914
        tree.unlock()
 
915
 
 
916
        self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
 
917
            request.execute('', rev_id_utf8))
 
918
 
 
919
    def test_no_such_revision(self):
 
920
        backing = self.get_transport()
 
921
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
 
922
        tree = self.make_branch_and_memory_tree('.')
 
923
        tree.lock_write()
 
924
        tree.add('')
 
925
        r1 = tree.commit('1st commit')
 
926
        tree.unlock()
 
927
 
 
928
        # Note that it still returns body (of zero bytes).
 
929
        self.assertEqual(
 
930
            SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
 
931
            request.execute('', 'missingrevision'))
 
932
 
 
933
 
 
934
class TestSmartServerRequestHasRevision(tests.TestCaseWithMemoryTransport):
 
935
 
 
936
    def test_missing_revision(self):
 
937
        """For a missing revision, ('no', ) is returned."""
 
938
        backing = self.get_transport()
 
939
        request = smart.repository.SmartServerRequestHasRevision(backing)
 
940
        self.make_repository('.')
 
941
        self.assertEqual(SmartServerResponse(('no', )),
 
942
            request.execute('', 'revid'))
 
943
 
 
944
    def test_present_revision(self):
 
945
        """For a present revision, ('yes', ) is returned."""
 
946
        backing = self.get_transport()
 
947
        request = smart.repository.SmartServerRequestHasRevision(backing)
 
948
        tree = self.make_branch_and_memory_tree('.')
 
949
        tree.lock_write()
 
950
        tree.add('')
 
951
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
 
952
        r1 = tree.commit('a commit', rev_id=rev_id_utf8)
 
953
        tree.unlock()
 
954
        self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
 
955
        self.assertEqual(SmartServerResponse(('yes', )),
 
956
            request.execute('', rev_id_utf8))
 
957
 
 
958
 
 
959
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithMemoryTransport):
 
960
 
 
961
    def test_empty_revid(self):
 
962
        """With an empty revid, we get only size an number and revisions"""
 
963
        backing = self.get_transport()
 
964
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
 
965
        repository = self.make_repository('.')
 
966
        stats = repository.gather_stats()
 
967
        expected_body = 'revisions: 0\n'
 
968
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
 
969
                         request.execute('', '', 'no'))
 
970
 
 
971
    def test_revid_with_committers(self):
 
972
        """For a revid we get more infos."""
 
973
        backing = self.get_transport()
 
974
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
 
975
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
 
976
        tree = self.make_branch_and_memory_tree('.')
 
977
        tree.lock_write()
 
978
        tree.add('')
 
979
        # Let's build a predictable result
 
980
        tree.commit('a commit', timestamp=123456.2, timezone=3600)
 
981
        tree.commit('a commit', timestamp=654321.4, timezone=0,
 
982
                    rev_id=rev_id_utf8)
 
983
        tree.unlock()
 
984
 
 
985
        stats = tree.branch.repository.gather_stats()
 
986
        expected_body = ('firstrev: 123456.200 3600\n'
 
987
                         'latestrev: 654321.400 0\n'
 
988
                         'revisions: 2\n')
 
989
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
 
990
                         request.execute('',
 
991
                                         rev_id_utf8, 'no'))
 
992
 
 
993
    def test_not_empty_repository_with_committers(self):
 
994
        """For a revid and requesting committers we get the whole thing."""
 
995
        backing = self.get_transport()
 
996
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
 
997
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
 
998
        tree = self.make_branch_and_memory_tree('.')
 
999
        tree.lock_write()
 
1000
        tree.add('')
 
1001
        # Let's build a predictable result
 
1002
        tree.commit('a commit', timestamp=123456.2, timezone=3600,
 
1003
                    committer='foo')
 
1004
        tree.commit('a commit', timestamp=654321.4, timezone=0,
 
1005
                    committer='bar', rev_id=rev_id_utf8)
 
1006
        tree.unlock()
 
1007
        stats = tree.branch.repository.gather_stats()
 
1008
 
 
1009
        expected_body = ('committers: 2\n'
 
1010
                         'firstrev: 123456.200 3600\n'
 
1011
                         'latestrev: 654321.400 0\n'
 
1012
                         'revisions: 2\n')
 
1013
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
 
1014
                         request.execute('',
 
1015
                                         rev_id_utf8, 'yes'))
 
1016
 
 
1017
 
 
1018
class TestSmartServerRepositoryIsShared(tests.TestCaseWithMemoryTransport):
 
1019
 
 
1020
    def test_is_shared(self):
 
1021
        """For a shared repository, ('yes', ) is returned."""
 
1022
        backing = self.get_transport()
 
1023
        request = smart.repository.SmartServerRepositoryIsShared(backing)
 
1024
        self.make_repository('.', shared=True)
 
1025
        self.assertEqual(SmartServerResponse(('yes', )),
 
1026
            request.execute('', ))
 
1027
 
 
1028
    def test_is_not_shared(self):
 
1029
        """For a shared repository, ('no', ) is returned."""
 
1030
        backing = self.get_transport()
 
1031
        request = smart.repository.SmartServerRepositoryIsShared(backing)
 
1032
        self.make_repository('.', shared=False)
 
1033
        self.assertEqual(SmartServerResponse(('no', )),
 
1034
            request.execute('', ))
 
1035
 
 
1036
 
 
1037
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
 
1038
 
 
1039
    def setUp(self):
 
1040
        tests.TestCaseWithMemoryTransport.setUp(self)
 
1041
 
 
1042
    def test_lock_write_on_unlocked_repo(self):
 
1043
        backing = self.get_transport()
 
1044
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
 
1045
        repository = self.make_repository('.', format='knit')
 
1046
        response = request.execute('')
 
1047
        nonce = repository.control_files._lock.peek().get('nonce')
 
1048
        self.assertEqual(SmartServerResponse(('ok', nonce)), response)
 
1049
        # The repository is now locked.  Verify that with a new repository
 
1050
        # object.
 
1051
        new_repo = repository.bzrdir.open_repository()
 
1052
        self.assertRaises(errors.LockContention, new_repo.lock_write)
 
1053
 
 
1054
    def test_lock_write_on_locked_repo(self):
 
1055
        backing = self.get_transport()
 
1056
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
 
1057
        repository = self.make_repository('.', format='knit')
 
1058
        repository.lock_write()
 
1059
        repository.leave_lock_in_place()
 
1060
        repository.unlock()
 
1061
        response = request.execute('')
 
1062
        self.assertEqual(
 
1063
            SmartServerResponse(('LockContention',)), response)
 
1064
 
 
1065
    def test_lock_write_on_readonly_transport(self):
 
1066
        backing = self.get_readonly_transport()
 
1067
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
 
1068
        repository = self.make_repository('.', format='knit')
 
1069
        response = request.execute('')
 
1070
        self.assertFalse(response.is_successful())
 
1071
        self.assertEqual('LockFailed', response.args[0])
 
1072
 
 
1073
 
 
1074
class TestSmartServerRepositoryUnlock(tests.TestCaseWithMemoryTransport):
 
1075
 
 
1076
    def setUp(self):
 
1077
        tests.TestCaseWithMemoryTransport.setUp(self)
 
1078
 
 
1079
    def test_unlock_on_locked_repo(self):
 
1080
        backing = self.get_transport()
 
1081
        request = smart.repository.SmartServerRepositoryUnlock(backing)
 
1082
        repository = self.make_repository('.', format='knit')
 
1083
        token = repository.lock_write()
 
1084
        repository.leave_lock_in_place()
 
1085
        repository.unlock()
 
1086
        response = request.execute('', token)
 
1087
        self.assertEqual(
 
1088
            SmartServerResponse(('ok',)), response)
 
1089
        # The repository is now unlocked.  Verify that with a new repository
 
1090
        # object.
 
1091
        new_repo = repository.bzrdir.open_repository()
 
1092
        new_repo.lock_write()
 
1093
        new_repo.unlock()
 
1094
 
 
1095
    def test_unlock_on_unlocked_repo(self):
 
1096
        backing = self.get_transport()
 
1097
        request = smart.repository.SmartServerRepositoryUnlock(backing)
 
1098
        repository = self.make_repository('.', format='knit')
 
1099
        response = request.execute('', 'some token')
 
1100
        self.assertEqual(
 
1101
            SmartServerResponse(('TokenMismatch',)), response)
 
1102
 
 
1103
 
 
1104
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
 
1105
 
 
1106
    def test_is_readonly_no(self):
 
1107
        backing = self.get_transport()
 
1108
        request = smart.request.SmartServerIsReadonly(backing)
 
1109
        response = request.execute()
 
1110
        self.assertEqual(
 
1111
            SmartServerResponse(('no',)), response)
 
1112
 
 
1113
    def test_is_readonly_yes(self):
 
1114
        backing = self.get_readonly_transport()
 
1115
        request = smart.request.SmartServerIsReadonly(backing)
 
1116
        response = request.execute()
 
1117
        self.assertEqual(
 
1118
            SmartServerResponse(('yes',)), response)
 
1119
 
 
1120
 
 
1121
class TestSmartServerRepositorySetMakeWorkingTrees(tests.TestCaseWithMemoryTransport):
 
1122
 
 
1123
    def test_set_false(self):
 
1124
        backing = self.get_transport()
 
1125
        repo = self.make_repository('.', shared=True)
 
1126
        repo.set_make_working_trees(True)
 
1127
        request_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
 
1128
        request = request_class(backing)
 
1129
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
 
1130
            request.execute('', 'False'))
 
1131
        repo = repo.bzrdir.open_repository()
 
1132
        self.assertFalse(repo.make_working_trees())
 
1133
 
 
1134
    def test_set_true(self):
 
1135
        backing = self.get_transport()
 
1136
        repo = self.make_repository('.', shared=True)
 
1137
        repo.set_make_working_trees(False)
 
1138
        request_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
 
1139
        request = request_class(backing)
 
1140
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
 
1141
            request.execute('', 'True'))
 
1142
        repo = repo.bzrdir.open_repository()
 
1143
        self.assertTrue(repo.make_working_trees())
 
1144
 
 
1145
 
 
1146
class TestSmartServerPackRepositoryAutopack(tests.TestCaseWithTransport):
 
1147
 
 
1148
    def make_repo_needing_autopacking(self, path='.'):
 
1149
        # Make a repo in need of autopacking.
 
1150
        tree = self.make_branch_and_tree('.', format='pack-0.92')
 
1151
        repo = tree.branch.repository
 
1152
        # monkey-patch the pack collection to disable autopacking
 
1153
        repo._pack_collection._max_pack_count = lambda count: count
 
1154
        for x in range(10):
 
1155
            tree.commit('commit %s' % x)
 
1156
        self.assertEqual(10, len(repo._pack_collection.names()))
 
1157
        del repo._pack_collection._max_pack_count
 
1158
        return repo
 
1159
 
 
1160
    def test_autopack_needed(self):
 
1161
        repo = self.make_repo_needing_autopacking()
 
1162
        backing = self.get_transport()
 
1163
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
 
1164
            backing)
 
1165
        response = request.execute('')
 
1166
        self.assertEqual(SmartServerResponse(('ok',)), response)
 
1167
        repo._pack_collection.reload_pack_names()
 
1168
        self.assertEqual(1, len(repo._pack_collection.names()))
 
1169
 
 
1170
    def test_autopack_not_needed(self):
 
1171
        tree = self.make_branch_and_tree('.', format='pack-0.92')
 
1172
        repo = tree.branch.repository
 
1173
        for x in range(9):
 
1174
            tree.commit('commit %s' % x)
 
1175
        backing = self.get_transport()
 
1176
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
 
1177
            backing)
 
1178
        response = request.execute('')
 
1179
        self.assertEqual(SmartServerResponse(('ok',)), response)
 
1180
        repo._pack_collection.reload_pack_names()
 
1181
        self.assertEqual(9, len(repo._pack_collection.names()))
 
1182
 
 
1183
    def test_autopack_on_nonpack_format(self):
 
1184
        """A request to autopack a non-pack repo is a no-op."""
 
1185
        repo = self.make_repository('.', format='knit')
 
1186
        backing = self.get_transport()
 
1187
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
 
1188
            backing)
 
1189
        response = request.execute('')
 
1190
        self.assertEqual(SmartServerResponse(('ok',)), response)
 
1191
 
 
1192
 
 
1193
class TestHandlers(tests.TestCase):
 
1194
    """Tests for the request.request_handlers object."""
 
1195
 
 
1196
    def test_all_registrations_exist(self):
 
1197
        """All registered request_handlers can be found."""
 
1198
        # If there's a typo in a register_lazy call, this loop will fail with
 
1199
        # an AttributeError.
 
1200
        for key, item in smart.request.request_handlers.iteritems():
 
1201
            pass
 
1202
 
 
1203
    def test_registered_methods(self):
 
1204
        """Test that known methods are registered to the correct object."""
 
1205
        self.assertEqual(
 
1206
            smart.request.request_handlers.get('Branch.get_config_file'),
 
1207
            smart.branch.SmartServerBranchGetConfigFile)
 
1208
        self.assertEqual(
 
1209
            smart.request.request_handlers.get('Branch.get_parent'),
 
1210
            smart.branch.SmartServerBranchGetParent)
 
1211
        self.assertEqual(
 
1212
            smart.request.request_handlers.get('Branch.lock_write'),
 
1213
            smart.branch.SmartServerBranchRequestLockWrite)
 
1214
        self.assertEqual(
 
1215
            smart.request.request_handlers.get('Branch.last_revision_info'),
 
1216
            smart.branch.SmartServerBranchRequestLastRevisionInfo)
 
1217
        self.assertEqual(
 
1218
            smart.request.request_handlers.get('Branch.revision_history'),
 
1219
            smart.branch.SmartServerRequestRevisionHistory)
 
1220
        self.assertEqual(
 
1221
            smart.request.request_handlers.get('Branch.set_last_revision'),
 
1222
            smart.branch.SmartServerBranchRequestSetLastRevision)
 
1223
        self.assertEqual(
 
1224
            smart.request.request_handlers.get('Branch.set_last_revision_info'),
 
1225
            smart.branch.SmartServerBranchRequestSetLastRevisionInfo)
 
1226
        self.assertEqual(
 
1227
            smart.request.request_handlers.get('Branch.unlock'),
 
1228
            smart.branch.SmartServerBranchRequestUnlock)
 
1229
        self.assertEqual(
 
1230
            smart.request.request_handlers.get('BzrDir.find_repository'),
 
1231
            smart.bzrdir.SmartServerRequestFindRepositoryV1)
 
1232
        self.assertEqual(
 
1233
            smart.request.request_handlers.get('BzrDir.find_repositoryV2'),
 
1234
            smart.bzrdir.SmartServerRequestFindRepositoryV2)
 
1235
        self.assertEqual(
 
1236
            smart.request.request_handlers.get('BzrDirFormat.initialize'),
 
1237
            smart.bzrdir.SmartServerRequestInitializeBzrDir)
 
1238
        self.assertEqual(
 
1239
            smart.request.request_handlers.get('BzrDir.cloning_metadir'),
 
1240
            smart.bzrdir.SmartServerBzrDirRequestCloningMetaDir)
 
1241
        self.assertEqual(
 
1242
            smart.request.request_handlers.get('BzrDir.open_branch'),
 
1243
            smart.bzrdir.SmartServerRequestOpenBranch)
 
1244
        self.assertEqual(
 
1245
            smart.request.request_handlers.get('PackRepository.autopack'),
 
1246
            smart.packrepository.SmartServerPackRepositoryAutopack)
 
1247
        self.assertEqual(
 
1248
            smart.request.request_handlers.get('Repository.gather_stats'),
 
1249
            smart.repository.SmartServerRepositoryGatherStats)
 
1250
        self.assertEqual(
 
1251
            smart.request.request_handlers.get('Repository.get_parent_map'),
 
1252
            smart.repository.SmartServerRepositoryGetParentMap)
 
1253
        self.assertEqual(
 
1254
            smart.request.request_handlers.get(
 
1255
                'Repository.get_revision_graph'),
 
1256
            smart.repository.SmartServerRepositoryGetRevisionGraph)
 
1257
        self.assertEqual(
 
1258
            smart.request.request_handlers.get('Repository.has_revision'),
 
1259
            smart.repository.SmartServerRequestHasRevision)
 
1260
        self.assertEqual(
 
1261
            smart.request.request_handlers.get('Repository.is_shared'),
 
1262
            smart.repository.SmartServerRepositoryIsShared)
 
1263
        self.assertEqual(
 
1264
            smart.request.request_handlers.get('Repository.lock_write'),
 
1265
            smart.repository.SmartServerRepositoryLockWrite)
 
1266
        self.assertEqual(
 
1267
            smart.request.request_handlers.get('Repository.get_stream'),
 
1268
            smart.repository.SmartServerRepositoryGetStream)
 
1269
        self.assertEqual(
 
1270
            smart.request.request_handlers.get('Repository.tarball'),
 
1271
            smart.repository.SmartServerRepositoryTarball)
 
1272
        self.assertEqual(
 
1273
            smart.request.request_handlers.get('Repository.unlock'),
 
1274
            smart.repository.SmartServerRepositoryUnlock)
 
1275
        self.assertEqual(
 
1276
            smart.request.request_handlers.get('Transport.is_readonly'),
 
1277
            smart.request.SmartServerIsReadonly)