/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smart.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-03-09 02:31:23 UTC
  • mfrom: (4090.2.1 ianc-integration)
  • Revision ID: pqm@pqm.ubuntu.com-20090309023123-7hvnfrilrt5ql771
compare types with is not '=' (Benjamin Peterson)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for the smart wire/domain protocol.
 
18
 
 
19
This module contains tests for the domain-level smart requests and responses,
 
20
such as the 'Branch.lock_write' request. Many of these use specific disk
 
21
formats to exercise calls that only make sense for formats with specific
 
22
properties.
 
23
 
 
24
Tests for low-level protocol encoding are found in test_smart_transport.
 
25
"""
 
26
 
 
27
import bz2
 
28
from cStringIO import StringIO
 
29
import tarfile
 
30
 
 
31
from bzrlib import (
 
32
    bzrdir,
 
33
    errors,
 
34
    pack,
 
35
    smart,
 
36
    tests,
 
37
    urlutils,
 
38
    )
 
39
from bzrlib.branch import Branch, BranchReferenceFormat
 
40
import bzrlib.smart.branch
 
41
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
 
42
import bzrlib.smart.packrepository
 
43
import bzrlib.smart.repository
 
44
from bzrlib.smart.request import (
 
45
    FailedSmartServerResponse,
 
46
    SmartServerRequest,
 
47
    SmartServerResponse,
 
48
    SuccessfulSmartServerResponse,
 
49
    )
 
50
from bzrlib.tests import (
 
51
    iter_suite_tests,
 
52
    split_suite_by_re,
 
53
    TestScenarioApplier,
 
54
    )
 
55
from bzrlib.transport import chroot, get_transport
 
56
from bzrlib.util import bencode
 
57
 
 
58
 
 
59
def load_tests(standard_tests, module, loader):
    """Multiply the FindRepository tests across the request versions.

    standard_tests is partitioned with split_suite_by_re using the pattern
    "TestSmartServerRequestFindRepository"; the first part is adapted to the
    scenarios below and added to the second part, which is returned.  The
    tests to adapt are partitioned again on "_v2": the second part of that
    split is adapted to all three scenarios, the first part only to the V2
    and V3 scenarios.
    """
    # FindRepository tests.
    find_repo_requests = bzrlib.smart.bzrdir
    scenario_applier = TestScenarioApplier()
    scenario_applier.scenarios = [
        ("find_repository",
         {"_request_class":
              find_repo_requests.SmartServerRequestFindRepositoryV1}),
        ("find_repositoryV2",
         {"_request_class":
              find_repo_requests.SmartServerRequestFindRepositoryV2}),
        ("find_repositoryV3",
         {"_request_class":
              find_repo_requests.SmartServerRequestFindRepositoryV3}),
        ]
    find_repo_tests, suite = split_suite_by_re(
        standard_tests, "TestSmartServerRequestFindRepository")
    v2_only_tests, all_version_tests = split_suite_by_re(
        find_repo_tests, "_v2")
    for case in iter_suite_tests(all_version_tests):
        suite.addTests(scenario_applier.adapt(case))
    # Drop the first (V1) scenario before adapting the "_v2" tests.
    del scenario_applier.scenarios[0]
    for case in iter_suite_tests(v2_only_tests):
        suite.addTests(scenario_applier.adapt(case))
    return suite
 
82
 
 
83
 
 
84
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
    """Test case whose get_transport() is served through a ChrootServer."""

    def setUp(self):
        tests.TestCaseWithTransport.setUp(self)
        # The chroot server is started on the first get_transport() call.
        self._chroot_server = None

    def get_transport(self, relpath=None):
        """Return a transport obtained from the chroot server's URL.

        The ChrootServer is created on first use, wrapping the transport the
        base class would have returned, and its tearDown is registered as a
        cleanup.

        :param relpath: optional path to clone the transport to.
        """
        server = self._chroot_server
        if server is None:
            backing = tests.TestCaseWithTransport.get_transport(self)
            server = chroot.ChrootServer(backing)
            self._chroot_server = server
            server.setUp()
            self.addCleanup(server.tearDown)
        chrooted = get_transport(server.get_url())
        if relpath is None:
            return chrooted
        return chrooted.clone(relpath)
 
100
 
 
101
 
 
102
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
    """Test case that installs a smart TCP server as its transport server."""

    def setUp(self):
        super(TestCaseWithSmartMedium, self).setUp()
        # Setting the transport class here is allowed: instead of the default
        # or a parameterized class, the TestCaseWithTransport infrastructure
        # is used to set up a smart server and transport.
        self.transport_server = self.make_transport_server

    def make_transport_server(self):
        """Create the smart TCP test server, passing '-' + this test's id."""
        server_argument = '-' + self.id()
        return smart.server.SmartTCPServer_for_testing(server_argument)

    def get_smart_medium(self):
        """Get a smart medium to use in tests."""
        transport = self.get_transport()
        return transport.get_smart_medium()
 
118
 
 
119
 
 
120
class TestSmartServerResponse(tests.TestCase):
    """Equality and string conversion of smart server responses."""

    def test__eq__(self):
        """Responses are equal when args and body match, unequal otherwise."""
        ok = SmartServerResponse(('ok', ))
        ok_with_body = SmartServerResponse(('ok', ), 'body')
        self.assertEqual(ok, SmartServerResponse(('ok', )))
        self.assertEqual(ok_with_body, SmartServerResponse(('ok', ), 'body'))
        # Different args, a missing body, and None are all unequal.
        self.assertNotEqual(ok, SmartServerResponse(('notok', )))
        self.assertNotEqual(ok_with_body, SmartServerResponse(('ok', )))
        self.assertNotEqual(None, SmartServerResponse(('ok', )))

    def test__str__(self):
        """SmartServerResponses can be stringified."""
        successful = SuccessfulSmartServerResponse(('args',), 'body')
        self.assertEqual(
            "<SuccessfulSmartServerResponse args=('args',) body='body'>",
            str(successful))
        failed = FailedSmartServerResponse(('args',), 'body')
        self.assertEqual(
            "<FailedSmartServerResponse args=('args',) body='body'>",
            str(failed))
 
142
 
 
143
 
 
144
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
    """Path handling of a SmartServerRequest constructed with 'foo/'."""

    def test_translate_client_path(self):
        """Client paths under 'foo/' are translated; others raise."""
        request = SmartServerRequest(self.get_transport(), 'foo/')
        translate = request.translate_client_path
        self.assertEqual('./', translate('foo/'))
        self.assertRaises(errors.InvalidURLJoin, translate, 'foo/..')
        self.assertRaises(errors.PathNotChild, translate, '/')
        self.assertRaises(errors.PathNotChild, translate, 'bar/')
        self.assertEqual('./baz', translate('foo/baz'))

    def test_transport_from_client_path(self):
        """The transport for 'foo/' has the backing transport's base."""
        backing = self.get_transport()
        request = SmartServerRequest(backing, 'foo/')
        from_client = request.transport_from_client_path('foo/')
        self.assertEqual(backing.base, from_client.base)
 
164
 
 
165
 
 
166
class TestSmartServerBzrDirRequestCloningMetaDir(
    tests.TestCaseWithMemoryTransport):
    """Tests for BzrDir.cloning_metadir."""

    def test_cloning_metadir(self):
        """When there is a bzrdir present, the call succeeds."""
        backing = self.get_transport()
        # Named bzr_dir rather than dir so the builtin is not shadowed.
        bzr_dir = self.make_bzrdir('.')
        local_result = bzr_dir.cloning_metadir()
        request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
        request = request_class(backing)
        # The expected response carries the network names of the formats
        # that the local cloning_metadir() call produced.
        expected = SuccessfulSmartServerResponse(
            (local_result.network_name(),
             local_result.repository_format.network_name(),
             ('branch', local_result.get_branch_format().network_name())))
        self.assertEqual(expected, request.execute('', 'False'))

    def test_cloning_metadir_reference(self):
        """The request works when bzrdir contains a branch reference."""
        backing = self.get_transport()
        referenced_branch = self.make_branch('referenced')
        bzr_dir = self.make_bzrdir('.')
        local_result = bzr_dir.cloning_metadir()
        # Only the side effect of creating the reference is needed; the
        # returned object is not used by this test.
        BranchReferenceFormat().initialize(bzr_dir, referenced_branch)
        reference_url = BranchReferenceFormat().get_reference(bzr_dir)
        # The server shouldn't try to follow the branch reference, so it's
        # fine if the referenced branch isn't reachable.
        backing.rename('referenced', 'moved')
        request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
        request = request_class(backing)
        expected = SuccessfulSmartServerResponse(
            (local_result.network_name(),
             local_result.repository_format.network_name(),
             ('ref', reference_url)))
        self.assertEqual(expected, request.execute('', 'False'))
 
201
 
 
202
 
 
203
class TestSmartServerRequestCreateRepository(tests.TestCaseWithMemoryTransport):
    """Tests for BzrDir.create_repository."""

    def test_makes_repository(self):
        """When there is a bzrdir present, the call succeeds."""
        backing = self.get_transport()
        self.make_bzrdir('.')
        request = bzrlib.smart.bzrdir.SmartServerRequestCreateRepository(
            backing)
        # Ask for the repository format of the 'default' bzrdir format.
        default_bzrdir_format = bzrdir.format_registry.get('default')()
        repo_format = default_bzrdir_format.repository_format
        network_name = repo_format.network_name()
        expected = SuccessfulSmartServerResponse(
            ('ok', 'no', 'no', 'no', network_name))
        actual = request.execute('', network_name, 'True')
        self.assertEqual(expected, actual)
 
218
 
 
219
 
 
220
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
    """Tests for BzrDir.find_repository.

    These tests read self._request_class, which is not defined on this
    class; load_tests supplies it through the test scenarios.
    """

    def test_no_repository(self):
        """When no repository can be found, ('norepository', ) is returned."""
        backing = self.get_transport()
        request = self._request_class(backing)
        self.make_bzrdir('.')
        self.assertEqual(SmartServerResponse(('norepository', )),
            request.execute(''))

    def test_nonshared_repository(self):
        """A non-shared repository is only found at its own path."""
        # Non-shared repositories only allow 'find' to return a handle when
        # the path the repository is being searched on is the same as the
        # path that the repository is at.
        backing = self.get_transport()
        request = self._request_class(backing)
        result = self._make_repository_and_result()
        self.assertEqual(result, request.execute(''))
        self.make_bzrdir('subdir')
        self.assertEqual(SmartServerResponse(('norepository', )),
            request.execute('subdir'))

    def _make_repository_and_result(self, shared=False, format=None):
        """Convenience function to set up a repository at '.'.

        :param shared: passed through to make_repository.
        :param format: passed through to make_repository.
        :return: The SmartServerResponse to expect when opening it.  Its
            shape depends on self._request_class: the V3 request adds the
            repository format's network name, the V2 request stops after
            the fifth element, and any other request class gets the
            four-element form.
        """
        repo = self.make_repository('.', shared=shared, format=format)
        if repo.supports_rich_root():
            rich_root = 'yes'
        else:
            rich_root = 'no'
        if repo._format.supports_tree_reference:
            subtrees = 'yes'
        else:
            subtrees = 'no'
        # Request classes are compared by identity: the question is which
        # class the scenario selected, not whether two objects are equal.
        request_class = self._request_class
        if request_class is smart.bzrdir.SmartServerRequestFindRepositoryV3:
            return SuccessfulSmartServerResponse(
                ('ok', '', rich_root, subtrees, 'no',
                 repo._format.network_name()))
        elif request_class is smart.bzrdir.SmartServerRequestFindRepositoryV2:
            # All tests so far are on formats, and for non-external
            # repositories.
            return SuccessfulSmartServerResponse(
                ('ok', '', rich_root, subtrees, 'no'))
        else:
            return SuccessfulSmartServerResponse(
                ('ok', '', rich_root, subtrees))

    def test_shared_repository(self):
        """When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
        backing = self.get_transport()
        request = self._request_class(backing)
        result = self._make_repository_and_result(shared=True)
        self.assertEqual(result, request.execute(''))
        # From below the repository, only the relative path element (index
        # 1) of the expected response changes.
        self.make_bzrdir('subdir')
        result2 = SmartServerResponse(
            result.args[0:1] + ('..', ) + result.args[2:])
        self.assertEqual(result2,
            request.execute('subdir'))
        self.make_bzrdir('subdir/deeper')
        result3 = SmartServerResponse(
            result.args[0:1] + ('../..', ) + result.args[2:])
        self.assertEqual(result3,
            request.execute('subdir/deeper'))

    def test_rich_root_and_subtree_encoding(self):
        """Test for the format attributes for rich root and subtree support."""
        backing = self.get_transport()
        request = self._request_class(backing)
        result = self._make_repository_and_result(
            format='dirstate-with-subtree')
        # check the test will be valid
        self.assertEqual('yes', result.args[2])
        self.assertEqual('yes', result.args[3])
        self.assertEqual(result, request.execute(''))

    def test_supports_external_lookups_no_v2(self):
        """Test for the supports_external_lookups attribute."""
        backing = self.get_transport()
        request = self._request_class(backing)
        result = self._make_repository_and_result(
            format='dirstate-with-subtree')
        # check the test will be valid
        self.assertEqual('no', result.args[4])
        self.assertEqual(result, request.execute(''))
 
304
 
 
305
 
 
306
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRequestInitializeBzrDir request."""

    def test_empty_dir(self):
        """Initializing an empty dir should succeed and do it."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok', )), response)
        made_dir = bzrdir.BzrDir.open_from_transport(backing)
        # With the current default format, no working tree, branch or
        # repository is expected in the new bzrdir.
        for expected_error, opener in [
            (errors.NoWorkingTree, made_dir.open_workingtree),
            (errors.NotBranchError, made_dir.open_branch),
            (errors.NoRepositoryPresent, made_dir.open_repository),
            ]:
            self.assertRaises(expected_error, opener)

    def test_missing_dir(self):
        """Initializing a missing directory should fail like the bzrdir api."""
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(
            self.get_transport())
        self.assertRaises(errors.NoSuchFile, request.execute, 'subdir')

    def test_initialized_dir(self):
        """Initializing an extant bzrdir should fail like the bzrdir api."""
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(
            self.get_transport())
        self.make_bzrdir('subdir')
        self.assertRaises(errors.FileExists, request.execute, 'subdir')
 
335
 
 
336
 
 
337
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
    """Tests for the SmartServerRequestOpenBranch request."""

    def test_no_branch(self):
        """When there is no branch, ('nobranch', ) is returned."""
        request = smart.bzrdir.SmartServerRequestOpenBranch(
            self.get_transport())
        self.make_bzrdir('.')
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('nobranch', )), response)

    def test_branch(self):
        """When there is a branch, 'ok' is returned."""
        request = smart.bzrdir.SmartServerRequestOpenBranch(
            self.get_transport())
        self.make_branch('.')
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok', '')), response)

    def test_branch_reference(self):
        """When there is a branch reference, the reference URL is returned."""
        request = smart.bzrdir.SmartServerRequestOpenBranch(
            self.get_transport())
        target = self.make_branch('branch')
        # A lightweight checkout at 'reference' supplies the branch
        # reference that the request is made against.
        checkout = target.create_checkout('reference', lightweight=True)
        reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
        self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
        response = request.execute('reference')
        self.assertEqual(SmartServerResponse(('ok', reference_url)), response)
 
365
 
 
366
 
 
367
class TestSmartServerRequestOpenBranchV2(TestCaseWithChrootedTransport):
    """Tests for the SmartServerRequestOpenBranchV2 request."""

    def test_no_branch(self):
        """When there is no branch, ('nobranch', ) is returned."""
        backing = self.get_transport()
        self.make_bzrdir('.')
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('nobranch', )), response)

    def test_branch(self):
        """When there is a branch, ('branch', network name) is returned."""
        backing = self.get_transport()
        made_branch = self.make_branch('.')
        expected_name = made_branch._format.network_name()
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
        response = request.execute('')
        self.assertEqual(
            SuccessfulSmartServerResponse(('branch', expected_name)),
            response)

    def test_branch_reference(self):
        """When there is a branch reference, the reference URL is returned."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
        target = self.make_branch('branch')
        # A lightweight checkout at 'reference' supplies the branch
        # reference that the request is made against.
        checkout = target.create_checkout('reference', lightweight=True)
        reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
        self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
        response = request.execute('reference')
        self.assertEqual(
            SuccessfulSmartServerResponse(('ref', reference_url)),
            response)
 
395
 
 
396
 
 
397
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRequestRevisionHistory request."""

    def test_empty(self):
        """For an empty branch, the body is empty."""
        backing = self.get_transport()
        request = smart.branch.SmartServerRequestRevisionHistory(backing)
        self.make_branch('.')
        self.assertEqual(SmartServerResponse(('ok', ), ''),
            request.execute(''))

    def test_not_empty(self):
        """For a non-empty branch, the body is the NUL-joined revision ids."""
        backing = self.get_transport()
        request = smart.branch.SmartServerRequestRevisionHistory(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        try:
            tree.add('')
            r1 = tree.commit('1st commit')
            # The second revision id is the UTF-8 encoding of a non-ASCII
            # character.
            r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        finally:
            # Release the lock even if a commit fails.
            tree.unlock()
        self.assertEqual(
            SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
            request.execute(''))
 
420
 
 
421
 
 
422
class TestSmartServerBranchRequest(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerBranchRequest base request."""

    def test_no_branch(self):
        """When there is a bzrdir and no branch, NotBranchError is raised."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequest(backing)
        self.make_bzrdir('.')
        self.assertRaises(errors.NotBranchError,
            request.execute, '')

    def test_branch_reference(self):
        """When there is a branch reference, NotBranchError is raised."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequest(backing)
        branch = self.make_branch('branch')
        branch.create_checkout('reference', lightweight=True)
        # Ask for the path the lightweight checkout was created at.  This
        # previously requested 'checkout', a path this test never creates.
        self.assertRaises(errors.NotBranchError,
            request.execute, 'reference')
 
440
 
 
441
 
 
442
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerBranchRequestLastRevisionInfo request."""

    def test_empty(self):
        """For an empty branch, the result is ('ok', '0', 'null:')."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
        self.make_branch('.')
        self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
            request.execute(''))

    def test_not_empty(self):
        """For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
        tree = self.make_branch_and_memory_tree('.')
        rev_id_utf8 = u'\xc8'.encode('utf-8')
        tree.lock_write()
        try:
            tree.add('')
            tree.commit('1st commit')
            tree.commit('2nd commit', rev_id=rev_id_utf8)
        finally:
            # Release the lock even if a commit fails.
            tree.unlock()
        # The revno is sent as a string.
        self.assertEqual(
            SmartServerResponse(('ok', '2', rev_id_utf8)),
            request.execute(''))
 
466
 
 
467
 
 
468
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerBranchGetConfigFile request."""

    def test_default(self):
        """With no file, we get empty content."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchGetConfigFile(backing)
        self.make_branch('.')
        # there should be no file by default
        content = ''
        self.assertEqual(SmartServerResponse(('ok', ), content),
            request.execute(''))

    def test_with_content(self):
        """The body is the content written to the branch's 'branch.conf'."""
        # SmartServerBranchGetConfigFile should return the bytes stored as
        # 'branch.conf' on the branch's transport for now - in the future it
        # may perform more complex processing.
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchGetConfigFile(backing)
        branch = self.make_branch('.')
        branch._transport.put_bytes('branch.conf', 'foo bar baz')
        self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
            request.execute(''))
 
490
 
 
491
 
 
492
class SetLastRevisionTestBase(tests.TestCaseWithMemoryTransport):
    """Base test case for verbs that implement set_last_revision.

    This class reads self.request_class and calls self._set_last_revision,
    neither of which it defines; concrete test classes must provide both.
    """

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)
        backing_transport = self.get_transport()
        self.request = self.request_class(backing_transport)
        self.tree = self.make_branch_and_memory_tree('.')

    def lock_branch(self):
        """Write-lock the branch and return (branch_token, repo_token).

        The repository lock taken here to obtain repo_token is released
        again before returning; the branch lock is held until
        unlock_branch() is called.
        """
        b = self.tree.branch
        branch_token = b.lock_write()
        repo_token = b.repository.lock_write()
        b.repository.unlock()
        return branch_token, repo_token

    def unlock_branch(self):
        """Release the branch lock taken by lock_branch()."""
        self.tree.branch.unlock()

    def set_last_revision(self, revision_id, revno):
        """Issue the verb under a branch lock and return its response.

        The branch is unlocked again even if issuing the request raises, so
        a failure does not leave the lock held.
        """
        branch_token, repo_token = self.lock_branch()
        try:
            return self._set_last_revision(
                revision_id, revno, branch_token, repo_token)
        finally:
            self.unlock_branch()

    def assertRequestSucceeds(self, revision_id, revno):
        """Assert that setting the last revision returns a plain ('ok',)."""
        response = self.set_last_revision(revision_id, revno)
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)), response)
 
521
 
 
522
 
 
523
class TestSetLastRevisionVerbMixin(object):
    """Mixin test case for verbs that implement set_last_revision.

    The tests use self.tree, self.set_last_revision and
    self.assertRequestSucceeds, which this mixin does not define; the class
    it is mixed into must provide them.
    """

    def test_set_null_to_null(self):
        """An empty branch can have its last revision set to 'null:'."""
        self.assertRequestSucceeds('null:', 0)

    def test_NoSuchRevision(self):
        """If the revision_id is not present, the verb returns NoSuchRevision.
        """
        revision_id = 'non-existent revision'
        self.assertEqual(
            FailedSmartServerResponse(('NoSuchRevision', revision_id)),
            self.set_last_revision(revision_id, 1))

    def make_tree_with_two_commits(self):
        """Commit two revisions to self.tree.

        The first revision id is the UTF-8 encoding of u'\\xc8' and the
        second is 'rev-2'.  The tree is unlocked again even if a commit
        fails.
        """
        rev_id_utf8 = u'\xc8'.encode('utf-8')
        self.tree.lock_write()
        try:
            self.tree.add('')
            self.tree.commit('1st commit', rev_id=rev_id_utf8)
            self.tree.commit('2nd commit', rev_id='rev-2')
        finally:
            self.tree.unlock()

    def test_branch_last_revision_info_is_updated(self):
        """A branch's tip can be set to a revision that is present in its
        repository.
        """
        # Make a branch with an empty revision history, but two revisions in
        # its repository.
        self.make_tree_with_two_commits()
        rev_id_utf8 = u'\xc8'.encode('utf-8')
        self.tree.branch.set_revision_history([])
        self.assertEqual(
            (0, 'null:'), self.tree.branch.last_revision_info())
        # We can update the branch to a revision that is present in the
        # repository.
        self.assertRequestSucceeds(rev_id_utf8, 1)
        self.assertEqual(
            (1, rev_id_utf8), self.tree.branch.last_revision_info())

    def test_branch_last_revision_info_rewind(self):
        """A branch's tip can be set to a revision that is an ancestor of the
        current tip.
        """
        self.make_tree_with_two_commits()
        rev_id_utf8 = u'\xc8'.encode('utf-8')
        self.assertEqual(
            (2, 'rev-2'), self.tree.branch.last_revision_info())
        self.assertRequestSucceeds(rev_id_utf8, 1)
        self.assertEqual(
            (1, rev_id_utf8), self.tree.branch.last_revision_info())

    def test_TipChangeRejected(self):
        """If a pre_change_branch_tip hook raises TipChangeRejected, the verb
        returns TipChangeRejected.
        """
        # A non-ASCII message: the expected response carries it UTF-8
        # encoded.
        rejection_message = u'rejection message\N{INTERROBANG}'
        def hook_that_rejects(params):
            raise errors.TipChangeRejected(rejection_message)
        Branch.hooks.install_named_hook(
            'pre_change_branch_tip', hook_that_rejects, None)
        self.assertEqual(
            FailedSmartServerResponse(
                ('TipChangeRejected', rejection_message.encode('utf-8'))),
            self.set_last_revision('null:', 0))
 
588
 
 
589
 
 
590
class TestSmartServerBranchRequestSetLastRevision(
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
    """Tests for Branch.set_last_revision verb."""

    request_class = smart.branch.SmartServerBranchRequestSetLastRevision

    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
        """Execute the request; revno is not passed to this verb."""
        execute = self.request.execute
        return execute('', branch_token, repo_token, revision_id)
 
599
 
 
600
 
 
601
class TestSmartServerBranchRequestSetLastRevisionInfo(
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
    """Tests for Branch.set_last_revision_info verb."""

    request_class = smart.branch.SmartServerBranchRequestSetLastRevisionInfo

    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
        """Execute the request, passing revno ahead of revision_id."""
        execute = self.request.execute
        return execute('', branch_token, repo_token, revno, revision_id)

    def test_NoSuchRevision(self):
        """Branch.set_last_revision_info does not have to return
        NoSuchRevision if the revision_id is absent.
        """
        # Overrides the mixin's test, which does not apply to this verb.
        raise tests.TestNotApplicable()
 
616
 
 
617
 
 
618
class TestSmartServerBranchRequestSetLastRevisionEx(
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
    """Tests for Branch.set_last_revision_ex verb.

    The last two arguments passed to request.execute in these tests are the
    allow_diverged and allow_overwrite_descendant flags, in that order.
    """

    request_class = smart.branch.SmartServerBranchRequestSetLastRevisionEx

    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
        """Execute the request with both flags set to 0; revno is not sent."""
        return self.request.execute(
            '', branch_token, repo_token, revision_id, 0, 0)

    def assertRequestSucceeds(self, revision_id, revno):
        """Assert that the response is ('ok', revno, revision_id)."""
        response = self.set_last_revision(revision_id, revno)
        self.assertEqual(
            SuccessfulSmartServerResponse(('ok', revno, revision_id)),
            response)

    def test_branch_last_revision_info_rewind(self):
        """A branch's tip can be set to a revision that is an ancestor of the
        current tip, but only if allow_overwrite_descendant is passed.
        """
        self.make_tree_with_two_commits()
        rev_id_utf8 = u'\xc8'.encode('utf-8')
        self.assertEqual(
            (2, 'rev-2'), self.tree.branch.last_revision_info())
        branch_token, repo_token = self.lock_branch()
        try:
            # If allow_overwrite_descendant flag is 0, then trying to set the
            # tip to an older revision ID has no effect.
            response = self.request.execute(
                '', branch_token, repo_token, rev_id_utf8, 0, 0)
            self.assertEqual(
                SuccessfulSmartServerResponse(('ok', 2, 'rev-2')),
                response)
            self.assertEqual(
                (2, 'rev-2'), self.tree.branch.last_revision_info())

            # If allow_overwrite_descendant flag is 1, then setting the tip
            # to an ancestor works.
            response = self.request.execute(
                '', branch_token, repo_token, rev_id_utf8, 0, 1)
            self.assertEqual(
                SuccessfulSmartServerResponse(('ok', 1, rev_id_utf8)),
                response)
        finally:
            # Release the branch lock even when a request or assertion fails.
            self.unlock_branch()
        self.assertEqual(
            (1, rev_id_utf8), self.tree.branch.last_revision_info())

    def make_branch_with_divergent_history(self):
        """Make a branch with divergent history in its repo.

        The branch's tip will be 'child-2', and the repo will also contain
        'child-1', which diverges from a common base revision.
        """
        self.tree.lock_write()
        try:
            self.tree.add('')
            self.tree.commit('1st commit')
            revno_1, revid_1 = self.tree.branch.last_revision_info()
            self.tree.commit('2nd commit', rev_id='child-1')
            # Undo the second commit
            self.tree.branch.set_last_revision_info(revno_1, revid_1)
            self.tree.set_parent_ids([revid_1])
            # Make a new second commit, child-2.  child-2 has diverged from
            # child-1.
            self.tree.commit('2nd commit', rev_id='child-2')
        finally:
            # Release the tree lock even if one of the steps above fails.
            self.tree.unlock()

    def test_not_allow_diverged(self):
        """If allow_diverged is not passed, then setting a divergent history
        returns a Diverged error.
        """
        self.make_branch_with_divergent_history()
        self.assertEqual(
            FailedSmartServerResponse(('Diverged',)),
            self.set_last_revision('child-1', 2))
        # The branch tip was not changed.
        self.assertEqual('child-2', self.tree.branch.last_revision())

    def test_allow_diverged(self):
        """If allow_diverged is passed, then setting a divergent history
        succeeds.
        """
        self.make_branch_with_divergent_history()
        branch_token, repo_token = self.lock_branch()
        try:
            response = self.request.execute(
                '', branch_token, repo_token, 'child-1', 1, 0)
            self.assertEqual(
                SuccessfulSmartServerResponse(('ok', 2, 'child-1')),
                response)
        finally:
            # Release the branch lock even when the request or assertion
            # fails.
            self.unlock_branch()
        # The branch tip was changed.
        self.assertEqual('child-1', self.tree.branch.last_revision())
 
708
 
 
709
 
 
710
class TestSmartServerBranchRequestGetParent(tests.TestCaseWithMemoryTransport):
    """Tests for smart.branch.SmartServerBranchGetParent."""

    def test_get_parent_none(self):
        """A branch with no parent set yields an empty-string response."""
        self.make_branch('base')
        request = smart.branch.SmartServerBranchGetParent(self.get_transport())
        response = request.execute('base')
        self.assertEqual(
            SuccessfulSmartServerResponse(('',)), response)

    def test_get_parent_something(self):
        """A parent set via get_url('foo') is reported as '../foo'."""
        base_branch = self.make_branch('base')
        base_branch.set_parent(self.get_url('foo'))
        request = smart.branch.SmartServerBranchGetParent(self.get_transport())
        response = request.execute('base')
        self.assertEqual(
            SuccessfulSmartServerResponse(("../foo",)),
            response)
 
727
 
 
728
 
 
729
class TestSmartServerBranchRequestGetTagsBytes(tests.TestCaseWithMemoryTransport):
    """Tests for smart.branch.SmartServerBranchGetTagsBytes."""

    # Only called when the branch format and tags match [yay factory
    # methods] so only need to test straight forward cases.

    def test_get_bytes(self):
        """A freshly made branch yields an empty-string response."""
        self.make_branch('base')
        request = smart.branch.SmartServerBranchGetTagsBytes(
            self.get_transport())
        response = request.execute('base')
        self.assertEqual(
            SuccessfulSmartServerResponse(('',)), response)
 
740
 
 
741
 
 
742
class TestSmartServerBranchRequestGetStackedOnURL(tests.TestCaseWithMemoryTransport):
    """Tests for smart.branch.SmartServerBranchRequestGetStackedOnURL."""

    def test_get_stacked_on_url(self):
        """The stacked-on URL is returned exactly as it was set."""
        self.make_branch('base', format='1.6')
        stacked_branch = self.make_branch('stacked', format='1.6')
        # typically should be relative
        stacked_branch.set_stacked_on_url('../base')
        request = smart.branch.SmartServerBranchRequestGetStackedOnURL(
            self.get_transport())
        response = request.execute('stacked')
        self.assertEqual(
            SmartServerResponse(('ok', '../base')),
            response)
 
755
 
 
756
 
 
757
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
    """Tests for smart.branch.SmartServerBranchRequestLockWrite."""

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)

    def test_lock_write_on_unlocked_branch(self):
        """Locking an unlocked branch answers 'ok' with both lock nonces."""
        transport = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(transport)
        knit_branch = self.make_branch('.', format='knit')
        repo = knit_branch.repository
        actual = request.execute('')
        branch_lock_info = knit_branch.control_files._lock.peek()
        branch_nonce = branch_lock_info.get('nonce')
        repo_lock_info = repo.control_files._lock.peek()
        repo_nonce = repo_lock_info.get('nonce')
        expected = SmartServerResponse(('ok', branch_nonce, repo_nonce))
        self.assertEqual(expected, actual)
        # A second branch object must hit LockContention, showing that the
        # branch (and associated repository) is now locked.
        reopened = repo.bzrdir.open_branch()
        self.assertRaises(errors.LockContention, reopened.lock_write)

    def test_lock_write_on_locked_branch(self):
        """A branch lock left in place makes the request report contention."""
        transport = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(transport)
        locked_branch = self.make_branch('.')
        locked_branch.lock_write()
        locked_branch.leave_lock_in_place()
        locked_branch.unlock()
        actual = request.execute('')
        expected = SmartServerResponse(('LockContention',))
        self.assertEqual(expected, actual)

    def test_lock_write_with_tokens_on_locked_branch(self):
        """Passing the tokens of the existing locks succeeds with 'ok'."""
        transport = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(transport)
        knit_branch = self.make_branch('.', format='knit')
        branch_token = knit_branch.lock_write()
        repo_token = knit_branch.repository.lock_write()
        knit_branch.repository.unlock()
        # Drop the in-memory locks but keep the physical ones in place.
        knit_branch.leave_lock_in_place()
        knit_branch.repository.leave_lock_in_place()
        knit_branch.unlock()
        actual = request.execute('', branch_token, repo_token)
        expected = SmartServerResponse(('ok', branch_token, repo_token))
        self.assertEqual(expected, actual)

    def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
        """A wrong branch token is answered with TokenMismatch."""
        transport = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(transport)
        knit_branch = self.make_branch('.', format='knit')
        branch_token = knit_branch.lock_write()
        repo_token = knit_branch.repository.lock_write()
        knit_branch.repository.unlock()
        # Drop the in-memory locks but keep the physical ones in place.
        knit_branch.leave_lock_in_place()
        knit_branch.repository.leave_lock_in_place()
        knit_branch.unlock()
        wrong_branch_token = branch_token+'xxx'
        actual = request.execute('', wrong_branch_token, repo_token)
        expected = SmartServerResponse(('TokenMismatch',))
        self.assertEqual(expected, actual)

    def test_lock_write_on_locked_repo(self):
        """A repository lock left in place makes the request report contention."""
        transport = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(transport)
        knit_branch = self.make_branch('.', format='knit')
        knit_branch.repository.lock_write()
        knit_branch.repository.leave_lock_in_place()
        knit_branch.repository.unlock()
        actual = request.execute('')
        expected = SmartServerResponse(('LockContention',))
        self.assertEqual(expected, actual)

    def test_lock_write_on_readonly_transport(self):
        """On a read-only transport the request fails with LockFailed."""
        readonly = self.get_readonly_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(readonly)
        branch = self.make_branch('.')
        root = self.get_transport().clone('/')
        # Express the test transport's location relative to the root clone.
        path = urlutils.relative_url(root.base, self.get_transport().base)
        actual = request.execute(path)
        # Unpacking also checks that the response carries three arguments.
        error_name, lock_str, why_str = actual.args
        self.assertFalse(actual.is_successful())
        self.assertEqual('LockFailed', error_name)
 
840
 
 
841
 
 
842
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
    """Tests for smart.branch.SmartServerBranchRequestUnlock."""

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)

    def test_unlock_on_locked_branch_and_repo(self):
        """Unlocking with the right tokens succeeds and frees the branch."""
        transport = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(transport)
        knit_branch = self.make_branch('.', format='knit')
        # Take both locks and remember their tokens.
        branch_token = knit_branch.lock_write()
        repo_token = knit_branch.repository.lock_write()
        knit_branch.repository.unlock()
        # Release the branch (and repo) objects while leaving the physical
        # locks in place.
        knit_branch.leave_lock_in_place()
        knit_branch.repository.leave_lock_in_place()
        knit_branch.unlock()
        actual = request.execute('', branch_token, repo_token)
        expected = SmartServerResponse(('ok',))
        self.assertEqual(expected, actual)
        # A new branch object can now take and release the write lock,
        # which shows the branch is unlocked.
        reopened = knit_branch.bzrdir.open_branch()
        reopened.lock_write()
        reopened.unlock()

    def test_unlock_on_unlocked_branch_unlocked_repo(self):
        """Unlocking when nothing is locked is answered with TokenMismatch."""
        transport = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(transport)
        branch = self.make_branch('.', format='knit')
        actual = request.execute('', 'branch token', 'repo token')
        expected = SmartServerResponse(('TokenMismatch',))
        self.assertEqual(expected, actual)

    def test_unlock_on_unlocked_branch_locked_repo(self):
        """A bogus branch token with a real repo token gives TokenMismatch."""
        transport = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(transport)
        knit_branch = self.make_branch('.', format='knit')
        # Lock only the repository, leaving its physical lock in place.
        repo_token = knit_branch.repository.lock_write()
        knit_branch.repository.leave_lock_in_place()
        knit_branch.repository.unlock()
        # Issue the branch unlock request against the unlocked branch (with
        # locked repo).
        actual = request.execute('', 'branch token', repo_token)
        expected = SmartServerResponse(('TokenMismatch',))
        self.assertEqual(expected, actual)
 
893
 
 
894
 
 
895
class TestSmartServerRepositoryRequest(tests.TestCaseWithMemoryTransport):
    """Tests for smart.repository.SmartServerRepositoryRequest."""

    def test_no_repository(self):
        """Raise NoRepositoryPresent when there is a bzrdir and no repo."""
        # A shared repository sits above the requested path.  The request
        # must still fail, which checks that only the exact path is looked
        # at and the server does not search upwards.
        transport = self.get_transport()
        request = smart.repository.SmartServerRepositoryRequest(transport)
        self.make_repository('.', shared=True)
        self.make_bzrdir('subdir')
        expected_error = errors.NoRepositoryPresent
        self.assertRaises(expected_error, request.execute, 'subdir')
 
909
 
 
910
 
 
911
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithMemoryTransport):
    """Tests for smart.repository.SmartServerRepositoryGetParentMap."""

    def test_trivial_bzipped(self):
        """The response body is bz2-compressed, even when it is empty."""
        transport = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetParentMap(
            transport)
        tree = self.make_branch_and_memory_tree('.')

        execute_result = request.execute('', 'missing-id')
        self.assertEqual(None, execute_result)
        # The body is still present: it is '' after bzip compression.
        expected = SuccessfulSmartServerResponse(('ok', ), bz2.compress(''))
        actual = request.do_body('\n\n0\n')
        self.assertEqual(expected, actual)
 
925
 
 
926
 
 
927
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithMemoryTransport):
    """Tests for smart.repository.SmartServerRepositoryGetRevisionGraph."""

    def test_none_argument(self):
        """An empty revision argument returns a line for every revision."""
        transport = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(
            transport)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        first = tree.commit('1st commit')
        second = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()

        # The "revision parent..." lines come back in no guaranteed order,
        # so both the expectation and the response body are sorted first.
        expected_lines = [' '.join([second, first]), first]
        expected_lines.sort()
        response = request.execute('', '')
        body_lines = response.body.split('\n')
        body_lines.sort()
        response.body = '\n'.join(body_lines)

        expected = SmartServerResponse(('ok', ), '\n'.join(expected_lines))
        self.assertEqual(expected, response)

    def test_specific_revision_argument(self):
        """Asking for the first revision returns just that revision id."""
        transport = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(
            transport)
        first_id = u'\xc9'.encode('utf-8')
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        first = tree.commit('1st commit', rev_id=first_id)
        second = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()

        expected = SmartServerResponse(('ok', ), first_id)
        actual = request.execute('', first_id)
        self.assertEqual(expected, actual)

    def test_no_such_revision(self):
        """An unknown revision id gives 'nosuchrevision' with an empty body."""
        transport = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(
            transport)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        first = tree.commit('1st commit')
        tree.unlock()

        # The response still carries a body, of zero bytes.
        expected = SmartServerResponse(
            ('nosuchrevision', 'missingrevision', ), '')
        actual = request.execute('', 'missingrevision')
        self.assertEqual(expected, actual)
 
975
 
 
976
 
 
977
class TestSmartServerRepositoryGetStream(tests.TestCaseWithMemoryTransport):
    """Tests for smart.repository.SmartServerRepositoryGetStream."""

    def make_two_commit_repo(self):
        """Commit twice to a memory tree; return (repo, first_id, second_id)."""
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        first = tree.commit('1st commit')
        second = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()
        return tree.branch.repository, first, second

    def test_ancestry_of(self):
        """The request body may be an 'ancestry-of' line followed by heads."""
        transport = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetStream(transport)
        repo, first, second = self.make_two_commit_repo()
        body = '\n'.join(['ancestry-of', second])
        request.execute('', repo._format.network_name())
        response = request.do_body(body)
        self.assertEqual(('ok',), response.args)
        streamed = ''.join(response.body_stream)
        self.assertStartsWith(streamed, 'Bazaar pack format 1')

    def test_search(self):
        """The request body may be a 'search' over explicit keys."""
        transport = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetStream(transport)
        repo, first, second = self.make_two_commit_repo()
        start_keys = '%s %s' % (first, second)
        body = '\n'.join(['search', start_keys, 'null:', '2'])
        request.execute('', repo._format.network_name())
        response = request.do_body(body)
        self.assertEqual(('ok',), response.args)
        streamed = ''.join(response.body_stream)
        self.assertStartsWith(streamed, 'Bazaar pack format 1')
 
1014
 
 
1015
 
 
1016
class TestSmartServerRequestHasRevision(tests.TestCaseWithMemoryTransport):
    """Tests for smart.repository.SmartServerRequestHasRevision."""

    def test_missing_revision(self):
        """For a missing revision, ('no', ) is returned."""
        transport = self.get_transport()
        request = smart.repository.SmartServerRequestHasRevision(transport)
        self.make_repository('.')
        expected = SmartServerResponse(('no', ))
        actual = request.execute('', 'revid')
        self.assertEqual(expected, actual)

    def test_present_revision(self):
        """For a present revision, ('yes', ) is returned."""
        transport = self.get_transport()
        request = smart.repository.SmartServerRequestHasRevision(transport)
        committed_id = u'\xc8abc'.encode('utf-8')
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('a commit', rev_id=committed_id)
        tree.unlock()
        # Check directly that the commit landed before asking the request.
        repo = tree.branch.repository
        self.assertTrue(repo.has_revision(committed_id))
        expected = SmartServerResponse(('yes', ))
        actual = request.execute('', committed_id)
        self.assertEqual(expected, actual)
 
1039
 
 
1040
 
 
1041
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithMemoryTransport):
    """Tests for smart.repository.SmartServerRepositoryGatherStats."""

    def test_empty_revid(self):
        """With an empty revid, only the number of revisions is reported."""
        transport = self.get_transport()
        request = smart.repository.SmartServerRepositoryGatherStats(transport)
        repository = self.make_repository('.')
        stats = repository.gather_stats()
        expected = SmartServerResponse(('ok', ), 'revisions: 0\n')
        actual = request.execute('', '', 'no')
        self.assertEqual(expected, actual)

    def test_revid_with_committers(self):
        """With a revid, first and latest revision details are added."""
        transport = self.get_transport()
        tip_id = u'\xc8abc'.encode('utf-8')
        request = smart.repository.SmartServerRepositoryGatherStats(transport)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        # Fixed timestamps and timezones make the reported stats predictable.
        tree.commit('a commit', timestamp=123456.2, timezone=3600)
        tree.commit('a commit', timestamp=654321.4, timezone=0,
                    rev_id=tip_id)
        tree.unlock()

        stats = tree.branch.repository.gather_stats()
        body = ('firstrev: 123456.200 3600\n'
                'latestrev: 654321.400 0\n'
                'revisions: 2\n')
        expected = SmartServerResponse(('ok', ), body)
        actual = request.execute('', tip_id, 'no')
        self.assertEqual(expected, actual)

    def test_not_empty_repository_with_committers(self):
        """With a revid and committers requested, all stats are reported."""
        transport = self.get_transport()
        tip_id = u'\xc8abc'.encode('utf-8')
        request = smart.repository.SmartServerRepositoryGatherStats(transport)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        # Fixed timestamps, timezones and committers make the reported
        # stats predictable.
        tree.commit('a commit', timestamp=123456.2, timezone=3600,
                    committer='foo')
        tree.commit('a commit', timestamp=654321.4, timezone=0,
                    committer='bar', rev_id=tip_id)
        tree.unlock()
        stats = tree.branch.repository.gather_stats()

        body = ('committers: 2\n'
                'firstrev: 123456.200 3600\n'
                'latestrev: 654321.400 0\n'
                'revisions: 2\n')
        expected = SmartServerResponse(('ok', ), body)
        actual = request.execute('', tip_id, 'yes')
        self.assertEqual(expected, actual)
 
1098
 
 
1099
 
 
1100
class TestSmartServerRepositoryIsShared(tests.TestCaseWithMemoryTransport):
    """Tests for smart.repository.SmartServerRepositoryIsShared."""

    def test_is_shared(self):
        """For a shared repository, ('yes', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryIsShared(backing)
        self.make_repository('.', shared=True)
        self.assertEqual(SmartServerResponse(('yes', )),
            request.execute('', ))

    def test_is_not_shared(self):
        """For a non-shared repository, ('no', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryIsShared(backing)
        self.make_repository('.', shared=False)
        self.assertEqual(SmartServerResponse(('no', )),
            request.execute('', ))
 
1117
 
 
1118
 
 
1119
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
    """Tests for smart.repository.SmartServerRepositoryLockWrite."""

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)

    def test_lock_write_on_unlocked_repo(self):
        """Locking an unlocked repository answers 'ok' with the lock nonce."""
        transport = self.get_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(transport)
        knit_repo = self.make_repository('.', format='knit')
        actual = request.execute('')
        lock_info = knit_repo.control_files._lock.peek()
        expected = SmartServerResponse(('ok', lock_info.get('nonce')))
        self.assertEqual(expected, actual)
        # A second repository object must hit LockContention, showing that
        # the repository is now locked.
        reopened = knit_repo.bzrdir.open_repository()
        self.assertRaises(errors.LockContention, reopened.lock_write)

    def test_lock_write_on_locked_repo(self):
        """A lock left in place makes the request report LockContention."""
        transport = self.get_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(transport)
        knit_repo = self.make_repository('.', format='knit')
        knit_repo.lock_write()
        knit_repo.leave_lock_in_place()
        knit_repo.unlock()
        actual = request.execute('')
        expected = SmartServerResponse(('LockContention',))
        self.assertEqual(expected, actual)

    def test_lock_write_on_readonly_transport(self):
        """On a read-only transport the request fails with LockFailed."""
        readonly = self.get_readonly_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(readonly)
        repository = self.make_repository('.', format='knit')
        actual = request.execute('')
        self.assertFalse(actual.is_successful())
        error_name = actual.args[0]
        self.assertEqual('LockFailed', error_name)
 
1154
 
 
1155
 
 
1156
class TestSmartServerRepositoryUnlock(tests.TestCaseWithMemoryTransport):
    """Tests for smart.repository.SmartServerRepositoryUnlock."""

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)

    def test_unlock_on_locked_repo(self):
        """Unlocking with the right token succeeds and frees the repository."""
        transport = self.get_transport()
        request = smart.repository.SmartServerRepositoryUnlock(transport)
        knit_repo = self.make_repository('.', format='knit')
        lock_token = knit_repo.lock_write()
        knit_repo.leave_lock_in_place()
        knit_repo.unlock()
        actual = request.execute('', lock_token)
        expected = SmartServerResponse(('ok',))
        self.assertEqual(expected, actual)
        # A new repository object can now take and release the write lock,
        # which shows the repository is unlocked.
        reopened = knit_repo.bzrdir.open_repository()
        reopened.lock_write()
        reopened.unlock()

    def test_unlock_on_unlocked_repo(self):
        """Unlocking an unlocked repository is answered with TokenMismatch."""
        transport = self.get_transport()
        request = smart.repository.SmartServerRepositoryUnlock(transport)
        repository = self.make_repository('.', format='knit')
        actual = request.execute('', 'some token')
        expected = SmartServerResponse(('TokenMismatch',))
        self.assertEqual(expected, actual)
 
1184
 
 
1185
 
 
1186
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
    """Tests for smart.request.SmartServerIsReadonly."""

    def test_is_readonly_no(self):
        """The ordinary test transport is reported as ('no',)."""
        transport = self.get_transport()
        request = smart.request.SmartServerIsReadonly(transport)
        expected = SmartServerResponse(('no',))
        actual = request.execute()
        self.assertEqual(expected, actual)

    def test_is_readonly_yes(self):
        """The read-only test transport is reported as ('yes',)."""
        transport = self.get_readonly_transport()
        request = smart.request.SmartServerIsReadonly(transport)
        expected = SmartServerResponse(('yes',))
        actual = request.execute()
        self.assertEqual(expected, actual)
 
1201
 
 
1202
 
 
1203
class TestSmartServerRepositorySetMakeWorkingTrees(tests.TestCaseWithMemoryTransport):
    """Tests for smart.repository.SmartServerRepositorySetMakeWorkingTrees."""

    def test_set_false(self):
        """Sending 'False' turns make_working_trees off."""
        transport = self.get_transport()
        shared_repo = self.make_repository('.', shared=True)
        # Start from the opposite setting so the change is observable.
        shared_repo.set_make_working_trees(True)
        request = smart.repository.SmartServerRepositorySetMakeWorkingTrees(
            transport)
        expected = SuccessfulSmartServerResponse(('ok',))
        actual = request.execute('', 'False')
        self.assertEqual(expected, actual)
        # Re-open the repository to read the setting back.
        reopened = shared_repo.bzrdir.open_repository()
        self.assertFalse(reopened.make_working_trees())

    def test_set_true(self):
        """Sending 'True' turns make_working_trees on."""
        transport = self.get_transport()
        shared_repo = self.make_repository('.', shared=True)
        # Start from the opposite setting so the change is observable.
        shared_repo.set_make_working_trees(False)
        request = smart.repository.SmartServerRepositorySetMakeWorkingTrees(
            transport)
        expected = SuccessfulSmartServerResponse(('ok',))
        actual = request.execute('', 'True')
        self.assertEqual(expected, actual)
        # Re-open the repository to read the setting back.
        reopened = shared_repo.bzrdir.open_repository()
        self.assertTrue(reopened.make_working_trees())
 
1226
 
 
1227
 
 
1228
class TestSmartServerPackRepositoryAutopack(tests.TestCaseWithTransport):
    """Tests for smart.packrepository.SmartServerPackRepositoryAutopack."""

    def make_repo_needing_autopacking(self, path='.'):
        """Make a pack-0.92 repository holding ten unconsolidated packs.

        :param path: Where to create the branch and tree.  Previously this
            argument was accepted but ignored and '.' was always used.
        :return: The repository of the newly made tree.
        """
        tree = self.make_branch_and_tree(path, format='pack-0.92')
        repo = tree.branch.repository
        # monkey-patch the pack collection to disable autopacking
        repo._pack_collection._max_pack_count = lambda count: count
        for x in range(10):
            tree.commit('commit %s' % x)
        self.assertEqual(10, len(repo._pack_collection.names()))
        # Drop the instance-level override set above so later autopack
        # decisions no longer go through the lambda.
        del repo._pack_collection._max_pack_count
        return repo

    def test_autopack_needed(self):
        """Autopacking a repo with ten packs leaves a single pack."""
        repo = self.make_repo_needing_autopacking()
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
        # Re-read the pack names so the request's effect is visible here.
        repo._pack_collection.reload_pack_names()
        self.assertEqual(1, len(repo._pack_collection.names()))

    def test_autopack_not_needed(self):
        """After nine commits the request is 'ok' and nine packs remain."""
        tree = self.make_branch_and_tree('.', format='pack-0.92')
        repo = tree.branch.repository
        for x in range(9):
            tree.commit('commit %s' % x)
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
        repo._pack_collection.reload_pack_names()
        self.assertEqual(9, len(repo._pack_collection.names()))

    def test_autopack_on_nonpack_format(self):
        """A request to autopack a non-pack repo is a no-op."""
        self.make_repository('.', format='knit')
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
 
1273
 
 
1274
 
 
1275
class TestHandlers(tests.TestCase):
 
1276
    """Tests for the request.request_handlers object."""
 
1277
 
 
1278
    def test_all_registrations_exist(self):
 
1279
        """All registered request_handlers can be found."""
 
1280
        # If there's a typo in a register_lazy call, this loop will fail with
 
1281
        # an AttributeError.
 
1282
        for key, item in smart.request.request_handlers.iteritems():
 
1283
            pass
 
1284
 
 
1285
    def test_registered_methods(self):
 
1286
        """Test that known methods are registered to the correct object."""
 
1287
        self.assertEqual(
 
1288
            smart.request.request_handlers.get('Branch.get_config_file'),
 
1289
            smart.branch.SmartServerBranchGetConfigFile)
 
1290
        self.assertEqual(
 
1291
            smart.request.request_handlers.get('Branch.get_parent'),
 
1292
            smart.branch.SmartServerBranchGetParent)
 
1293
        self.assertEqual(
 
1294
            smart.request.request_handlers.get('Branch.get_tags_bytes'),
 
1295
            smart.branch.SmartServerBranchGetTagsBytes)
 
1296
        self.assertEqual(
 
1297
            smart.request.request_handlers.get('Branch.lock_write'),
 
1298
            smart.branch.SmartServerBranchRequestLockWrite)
 
1299
        self.assertEqual(
 
1300
            smart.request.request_handlers.get('Branch.last_revision_info'),
 
1301
            smart.branch.SmartServerBranchRequestLastRevisionInfo)
 
1302
        self.assertEqual(
 
1303
            smart.request.request_handlers.get('Branch.revision_history'),
 
1304
            smart.branch.SmartServerRequestRevisionHistory)
 
1305
        self.assertEqual(
 
1306
            smart.request.request_handlers.get('Branch.set_last_revision'),
 
1307
            smart.branch.SmartServerBranchRequestSetLastRevision)
 
1308
        self.assertEqual(
 
1309
            smart.request.request_handlers.get('Branch.set_last_revision_info'),
 
1310
            smart.branch.SmartServerBranchRequestSetLastRevisionInfo)
 
1311
        self.assertEqual(
 
1312
            smart.request.request_handlers.get('Branch.unlock'),
 
1313
            smart.branch.SmartServerBranchRequestUnlock)
 
1314
        self.assertEqual(
 
1315
            smart.request.request_handlers.get('BzrDir.find_repository'),
 
1316
            smart.bzrdir.SmartServerRequestFindRepositoryV1)
 
1317
        self.assertEqual(
 
1318
            smart.request.request_handlers.get('BzrDir.find_repositoryV2'),
 
1319
            smart.bzrdir.SmartServerRequestFindRepositoryV2)
 
1320
        self.assertEqual(
 
1321
            smart.request.request_handlers.get('BzrDirFormat.initialize'),
 
1322
            smart.bzrdir.SmartServerRequestInitializeBzrDir)
 
1323
        self.assertEqual(
 
1324
            smart.request.request_handlers.get('BzrDir.cloning_metadir'),
 
1325
            smart.bzrdir.SmartServerBzrDirRequestCloningMetaDir)
 
1326
        self.assertEqual(
 
1327
            smart.request.request_handlers.get('BzrDir.open_branch'),
 
1328
            smart.bzrdir.SmartServerRequestOpenBranch)
 
1329
        self.assertEqual(
 
1330
            smart.request.request_handlers.get('BzrDir.open_branchV2'),
 
1331
            smart.bzrdir.SmartServerRequestOpenBranchV2)
 
1332
        self.assertEqual(
 
1333
            smart.request.request_handlers.get('PackRepository.autopack'),
 
1334
            smart.packrepository.SmartServerPackRepositoryAutopack)
 
1335
        self.assertEqual(
 
1336
            smart.request.request_handlers.get('Repository.gather_stats'),
 
1337
            smart.repository.SmartServerRepositoryGatherStats)
 
1338
        self.assertEqual(
 
1339
            smart.request.request_handlers.get('Repository.get_parent_map'),
 
1340
            smart.repository.SmartServerRepositoryGetParentMap)
 
1341
        self.assertEqual(
 
1342
            smart.request.request_handlers.get(
 
1343
                'Repository.get_revision_graph'),
 
1344
            smart.repository.SmartServerRepositoryGetRevisionGraph)
 
1345
        self.assertEqual(
 
1346
            smart.request.request_handlers.get('Repository.has_revision'),
 
1347
            smart.repository.SmartServerRequestHasRevision)
 
1348
        self.assertEqual(
 
1349
            smart.request.request_handlers.get('Repository.is_shared'),
 
1350
            smart.repository.SmartServerRepositoryIsShared)
 
1351
        self.assertEqual(
 
1352
            smart.request.request_handlers.get('Repository.lock_write'),
 
1353
            smart.repository.SmartServerRepositoryLockWrite)
 
1354
        self.assertEqual(
 
1355
            smart.request.request_handlers.get('Repository.get_stream'),
 
1356
            smart.repository.SmartServerRepositoryGetStream)
 
1357
        self.assertEqual(
 
1358
            smart.request.request_handlers.get('Repository.tarball'),
 
1359
            smart.repository.SmartServerRepositoryTarball)
 
1360
        self.assertEqual(
 
1361
            smart.request.request_handlers.get('Repository.unlock'),
 
1362
            smart.repository.SmartServerRepositoryUnlock)
 
1363
        self.assertEqual(
 
1364
            smart.request.request_handlers.get('Transport.is_readonly'),
 
1365
            smart.request.SmartServerIsReadonly)