/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smart.py

  • Committer: Jelmer Vernooij
  • Date: 2009-03-12 14:02:53 UTC
  • mfrom: (4135 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4137.
  • Revision ID: jelmer@samba.org-20090312140253-bmldbzlmsitfdrzf
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for the smart wire/domain protocol.
 
18
 
 
19
This module contains tests for the domain-level smart requests and responses,
 
20
such as the 'Branch.lock_write' request. Many of these use specific disk
 
21
formats to exercise calls that only make sense for formats with specific
 
22
properties.
 
23
 
 
24
Tests for low-level protocol encoding are found in test_smart_transport.
 
25
"""
 
26
 
 
27
import bz2
 
28
from cStringIO import StringIO
 
29
import tarfile
 
30
 
 
31
from bzrlib import (
 
32
    bzrdir,
 
33
    errors,
 
34
    pack,
 
35
    smart,
 
36
    tests,
 
37
    urlutils,
 
38
    )
 
39
from bzrlib.branch import Branch, BranchReferenceFormat
 
40
import bzrlib.smart.branch
 
41
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
 
42
import bzrlib.smart.packrepository
 
43
import bzrlib.smart.repository
 
44
from bzrlib.smart.request import (
 
45
    FailedSmartServerResponse,
 
46
    SmartServerRequest,
 
47
    SmartServerResponse,
 
48
    SuccessfulSmartServerResponse,
 
49
    )
 
50
from bzrlib.tests import (
 
51
    split_suite_by_re,
 
52
    )
 
53
from bzrlib.transport import chroot, get_transport
 
54
from bzrlib.util import bencode
 
55
 
 
56
 
 
57
def load_tests(standard_tests, module, loader):
 
58
    """Multiply tests version and protocol consistency."""
 
59
    # FindRepository tests.
 
60
    bzrdir_mod = bzrlib.smart.bzrdir
 
61
    scenarios = [
 
62
        ("find_repository", {
 
63
            "_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
 
64
        ("find_repositoryV2", {
 
65
            "_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
 
66
        ("find_repositoryV3", {
 
67
            "_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV3}),
 
68
        ]
 
69
    to_adapt, result = split_suite_by_re(standard_tests,
 
70
        "TestSmartServerRequestFindRepository")
 
71
    v2_only, v1_and_2 = split_suite_by_re(to_adapt,
 
72
        "_v2")
 
73
    tests.multiply_tests(v1_and_2, scenarios, result)
 
74
    # The first scenario is only applicable to v1 protocols, it is deleted
 
75
    # since.
 
76
    tests.multiply_tests(v2_only, scenarios[1:], result)
 
77
    return result
 
78
 
 
79
 
 
80
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
 
81
 
 
82
    def setUp(self):
 
83
        tests.TestCaseWithTransport.setUp(self)
 
84
        self._chroot_server = None
 
85
 
 
86
    def get_transport(self, relpath=None):
 
87
        if self._chroot_server is None:
 
88
            backing_transport = tests.TestCaseWithTransport.get_transport(self)
 
89
            self._chroot_server = chroot.ChrootServer(backing_transport)
 
90
            self._chroot_server.setUp()
 
91
            self.addCleanup(self._chroot_server.tearDown)
 
92
        t = get_transport(self._chroot_server.get_url())
 
93
        if relpath is not None:
 
94
            t = t.clone(relpath)
 
95
        return t
 
96
 
 
97
 
 
98
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
 
99
 
 
100
    def setUp(self):
 
101
        super(TestCaseWithSmartMedium, self).setUp()
 
102
        # We're allowed to set  the transport class here, so that we don't use
 
103
        # the default or a parameterized class, but rather use the
 
104
        # TestCaseWithTransport infrastructure to set up a smart server and
 
105
        # transport.
 
106
        self.transport_server = self.make_transport_server
 
107
 
 
108
    def make_transport_server(self):
 
109
        return smart.server.SmartTCPServer_for_testing('-' + self.id())
 
110
 
 
111
    def get_smart_medium(self):
 
112
        """Get a smart medium to use in tests."""
 
113
        return self.get_transport().get_smart_medium()
 
114
 
 
115
 
 
116
class TestSmartServerResponse(tests.TestCase):
 
117
 
 
118
    def test__eq__(self):
 
119
        self.assertEqual(SmartServerResponse(('ok', )),
 
120
            SmartServerResponse(('ok', )))
 
121
        self.assertEqual(SmartServerResponse(('ok', ), 'body'),
 
122
            SmartServerResponse(('ok', ), 'body'))
 
123
        self.assertNotEqual(SmartServerResponse(('ok', )),
 
124
            SmartServerResponse(('notok', )))
 
125
        self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
 
126
            SmartServerResponse(('ok', )))
 
127
        self.assertNotEqual(None,
 
128
            SmartServerResponse(('ok', )))
 
129
 
 
130
    def test__str__(self):
 
131
        """SmartServerResponses can be stringified."""
 
132
        self.assertEqual(
 
133
            "<SuccessfulSmartServerResponse args=('args',) body='body'>",
 
134
            str(SuccessfulSmartServerResponse(('args',), 'body')))
 
135
        self.assertEqual(
 
136
            "<FailedSmartServerResponse args=('args',) body='body'>",
 
137
            str(FailedSmartServerResponse(('args',), 'body')))
 
138
 
 
139
 
 
140
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
 
141
 
 
142
    def test_translate_client_path(self):
 
143
        transport = self.get_transport()
 
144
        request = SmartServerRequest(transport, 'foo/')
 
145
        self.assertEqual('./', request.translate_client_path('foo/'))
 
146
        self.assertRaises(
 
147
            errors.InvalidURLJoin, request.translate_client_path, 'foo/..')
 
148
        self.assertRaises(
 
149
            errors.PathNotChild, request.translate_client_path, '/')
 
150
        self.assertRaises(
 
151
            errors.PathNotChild, request.translate_client_path, 'bar/')
 
152
        self.assertEqual('./baz', request.translate_client_path('foo/baz'))
 
153
 
 
154
    def test_transport_from_client_path(self):
 
155
        transport = self.get_transport()
 
156
        request = SmartServerRequest(transport, 'foo/')
 
157
        self.assertEqual(
 
158
            transport.base,
 
159
            request.transport_from_client_path('foo/').base)
 
160
 
 
161
 
 
162
class TestSmartServerBzrDirRequestCloningMetaDir(
 
163
    tests.TestCaseWithMemoryTransport):
 
164
    """Tests for BzrDir.cloning_metadir."""
 
165
 
 
166
    def test_cloning_metadir(self):
 
167
        """When there is a bzrdir present, the call succeeds."""
 
168
        backing = self.get_transport()
 
169
        dir = self.make_bzrdir('.')
 
170
        local_result = dir.cloning_metadir()
 
171
        request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
 
172
        request = request_class(backing)
 
173
        expected = SuccessfulSmartServerResponse(
 
174
            (local_result.network_name(),
 
175
            local_result.repository_format.network_name(),
 
176
            ('branch', local_result.get_branch_format().network_name())))
 
177
        self.assertEqual(expected, request.execute('', 'False'))
 
178
 
 
179
    def test_cloning_metadir_reference(self):
 
180
        """The request works when bzrdir contains a branch reference."""
 
181
        backing = self.get_transport()
 
182
        referenced_branch = self.make_branch('referenced')
 
183
        dir = self.make_bzrdir('.')
 
184
        local_result = dir.cloning_metadir()
 
185
        reference = BranchReferenceFormat().initialize(dir, referenced_branch)
 
186
        reference_url = BranchReferenceFormat().get_reference(dir)
 
187
        # The server shouldn't try to follow the branch reference, so it's fine
 
188
        # if the referenced branch isn't reachable.
 
189
        backing.rename('referenced', 'moved')
 
190
        request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
 
191
        request = request_class(backing)
 
192
        expected = SuccessfulSmartServerResponse(
 
193
            (local_result.network_name(),
 
194
            local_result.repository_format.network_name(),
 
195
            ('ref', reference_url)))
 
196
        self.assertEqual(expected, request.execute('', 'False'))
 
197
 
 
198
 
 
199
class TestSmartServerRequestCreateRepository(tests.TestCaseWithMemoryTransport):
 
200
    """Tests for BzrDir.create_repository."""
 
201
 
 
202
    def test_makes_repository(self):
 
203
        """When there is a bzrdir present, the call succeeds."""
 
204
        backing = self.get_transport()
 
205
        self.make_bzrdir('.')
 
206
        request_class = bzrlib.smart.bzrdir.SmartServerRequestCreateRepository
 
207
        request = request_class(backing)
 
208
        reference_bzrdir_format = bzrdir.format_registry.get('default')()
 
209
        reference_format = reference_bzrdir_format.repository_format
 
210
        network_name = reference_format.network_name()
 
211
        expected = SuccessfulSmartServerResponse(
 
212
            ('ok', 'no', 'no', 'no', network_name))
 
213
        self.assertEqual(expected, request.execute('', network_name, 'True'))
 
214
 
 
215
 
 
216
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
 
217
    """Tests for BzrDir.find_repository."""
 
218
 
 
219
    def test_no_repository(self):
 
220
        """When there is no repository to be found, ('norepository', ) is returned."""
 
221
        backing = self.get_transport()
 
222
        request = self._request_class(backing)
 
223
        self.make_bzrdir('.')
 
224
        self.assertEqual(SmartServerResponse(('norepository', )),
 
225
            request.execute(''))
 
226
 
 
227
    def test_nonshared_repository(self):
 
228
        # nonshared repositorys only allow 'find' to return a handle when the
 
229
        # path the repository is being searched on is the same as that that
 
230
        # the repository is at.
 
231
        backing = self.get_transport()
 
232
        request = self._request_class(backing)
 
233
        result = self._make_repository_and_result()
 
234
        self.assertEqual(result, request.execute(''))
 
235
        self.make_bzrdir('subdir')
 
236
        self.assertEqual(SmartServerResponse(('norepository', )),
 
237
            request.execute('subdir'))
 
238
 
 
239
    def _make_repository_and_result(self, shared=False, format=None):
 
240
        """Convenience function to setup a repository.
 
241
 
 
242
        :result: The SmartServerResponse to expect when opening it.
 
243
        """
 
244
        repo = self.make_repository('.', shared=shared, format=format)
 
245
        if repo.supports_rich_root():
 
246
            rich_root = 'yes'
 
247
        else:
 
248
            rich_root = 'no'
 
249
        if repo._format.supports_tree_reference:
 
250
            subtrees = 'yes'
 
251
        else:
 
252
            subtrees = 'no'
 
253
        if (smart.bzrdir.SmartServerRequestFindRepositoryV3 ==
 
254
            self._request_class):
 
255
            return SuccessfulSmartServerResponse(
 
256
                ('ok', '', rich_root, subtrees, 'no',
 
257
                 repo._format.network_name()))
 
258
        elif (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
 
259
            self._request_class):
 
260
            # All tests so far are on formats, and for non-external
 
261
            # repositories.
 
262
            return SuccessfulSmartServerResponse(
 
263
                ('ok', '', rich_root, subtrees, 'no'))
 
264
        else:
 
265
            return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
 
266
 
 
267
    def test_shared_repository(self):
 
268
        """When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
 
269
        backing = self.get_transport()
 
270
        request = self._request_class(backing)
 
271
        result = self._make_repository_and_result(shared=True)
 
272
        self.assertEqual(result, request.execute(''))
 
273
        self.make_bzrdir('subdir')
 
274
        result2 = SmartServerResponse(result.args[0:1] + ('..', ) + result.args[2:])
 
275
        self.assertEqual(result2,
 
276
            request.execute('subdir'))
 
277
        self.make_bzrdir('subdir/deeper')
 
278
        result3 = SmartServerResponse(result.args[0:1] + ('../..', ) + result.args[2:])
 
279
        self.assertEqual(result3,
 
280
            request.execute('subdir/deeper'))
 
281
 
 
282
    def test_rich_root_and_subtree_encoding(self):
 
283
        """Test for the format attributes for rich root and subtree support."""
 
284
        backing = self.get_transport()
 
285
        request = self._request_class(backing)
 
286
        result = self._make_repository_and_result(format='dirstate-with-subtree')
 
287
        # check the test will be valid
 
288
        self.assertEqual('yes', result.args[2])
 
289
        self.assertEqual('yes', result.args[3])
 
290
        self.assertEqual(result, request.execute(''))
 
291
 
 
292
    def test_supports_external_lookups_no_v2(self):
 
293
        """Test for the supports_external_lookups attribute."""
 
294
        backing = self.get_transport()
 
295
        request = self._request_class(backing)
 
296
        result = self._make_repository_and_result(format='dirstate-with-subtree')
 
297
        # check the test will be valid
 
298
        self.assertEqual('no', result.args[4])
 
299
        self.assertEqual(result, request.execute(''))
 
300
 
 
301
 
 
302
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
 
303
 
 
304
    def test_empty_dir(self):
 
305
        """Initializing an empty dir should succeed and do it."""
 
306
        backing = self.get_transport()
 
307
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
 
308
        self.assertEqual(SmartServerResponse(('ok', )),
 
309
            request.execute(''))
 
310
        made_dir = bzrdir.BzrDir.open_from_transport(backing)
 
311
        # no branch, tree or repository is expected with the current
 
312
        # default formart.
 
313
        self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
 
314
        self.assertRaises(errors.NotBranchError, made_dir.open_branch)
 
315
        self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
 
316
 
 
317
    def test_missing_dir(self):
 
318
        """Initializing a missing directory should fail like the bzrdir api."""
 
319
        backing = self.get_transport()
 
320
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
 
321
        self.assertRaises(errors.NoSuchFile,
 
322
            request.execute, 'subdir')
 
323
 
 
324
    def test_initialized_dir(self):
 
325
        """Initializing an extant bzrdir should fail like the bzrdir api."""
 
326
        backing = self.get_transport()
 
327
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
 
328
        self.make_bzrdir('subdir')
 
329
        self.assertRaises(errors.FileExists,
 
330
            request.execute, 'subdir')
 
331
 
 
332
 
 
333
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
 
334
 
 
335
    def test_no_branch(self):
 
336
        """When there is no branch, ('nobranch', ) is returned."""
 
337
        backing = self.get_transport()
 
338
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
 
339
        self.make_bzrdir('.')
 
340
        self.assertEqual(SmartServerResponse(('nobranch', )),
 
341
            request.execute(''))
 
342
 
 
343
    def test_branch(self):
 
344
        """When there is a branch, 'ok' is returned."""
 
345
        backing = self.get_transport()
 
346
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
 
347
        self.make_branch('.')
 
348
        self.assertEqual(SmartServerResponse(('ok', '')),
 
349
            request.execute(''))
 
350
 
 
351
    def test_branch_reference(self):
 
352
        """When there is a branch reference, the reference URL is returned."""
 
353
        backing = self.get_transport()
 
354
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
 
355
        branch = self.make_branch('branch')
 
356
        checkout = branch.create_checkout('reference',lightweight=True)
 
357
        reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
 
358
        self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
 
359
        self.assertEqual(SmartServerResponse(('ok', reference_url)),
 
360
            request.execute('reference'))
 
361
 
 
362
 
 
363
class TestSmartServerRequestOpenBranchV2(TestCaseWithChrootedTransport):
 
364
 
 
365
    def test_no_branch(self):
 
366
        """When there is no branch, ('nobranch', ) is returned."""
 
367
        backing = self.get_transport()
 
368
        self.make_bzrdir('.')
 
369
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
 
370
        self.assertEqual(SmartServerResponse(('nobranch', )),
 
371
            request.execute(''))
 
372
 
 
373
    def test_branch(self):
 
374
        """When there is a branch, 'ok' is returned."""
 
375
        backing = self.get_transport()
 
376
        expected = self.make_branch('.')._format.network_name()
 
377
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
 
378
        self.assertEqual(SuccessfulSmartServerResponse(('branch', expected)),
 
379
            request.execute(''))
 
380
 
 
381
    def test_branch_reference(self):
 
382
        """When there is a branch reference, the reference URL is returned."""
 
383
        backing = self.get_transport()
 
384
        request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
 
385
        branch = self.make_branch('branch')
 
386
        checkout = branch.create_checkout('reference',lightweight=True)
 
387
        reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
 
388
        self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
 
389
        self.assertEqual(SuccessfulSmartServerResponse(('ref', reference_url)),
 
390
            request.execute('reference'))
 
391
 
 
392
 
 
393
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
 
394
 
 
395
    def test_empty(self):
 
396
        """For an empty branch, the body is empty."""
 
397
        backing = self.get_transport()
 
398
        request = smart.branch.SmartServerRequestRevisionHistory(backing)
 
399
        self.make_branch('.')
 
400
        self.assertEqual(SmartServerResponse(('ok', ), ''),
 
401
            request.execute(''))
 
402
 
 
403
    def test_not_empty(self):
 
404
        """For a non-empty branch, the body is empty."""
 
405
        backing = self.get_transport()
 
406
        request = smart.branch.SmartServerRequestRevisionHistory(backing)
 
407
        tree = self.make_branch_and_memory_tree('.')
 
408
        tree.lock_write()
 
409
        tree.add('')
 
410
        r1 = tree.commit('1st commit')
 
411
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
 
412
        tree.unlock()
 
413
        self.assertEqual(
 
414
            SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
 
415
            request.execute(''))
 
416
 
 
417
 
 
418
class TestSmartServerBranchRequest(tests.TestCaseWithMemoryTransport):
 
419
 
 
420
    def test_no_branch(self):
 
421
        """When there is a bzrdir and no branch, NotBranchError is raised."""
 
422
        backing = self.get_transport()
 
423
        request = smart.branch.SmartServerBranchRequest(backing)
 
424
        self.make_bzrdir('.')
 
425
        self.assertRaises(errors.NotBranchError,
 
426
            request.execute, '')
 
427
 
 
428
    def test_branch_reference(self):
 
429
        """When there is a branch reference, NotBranchError is raised."""
 
430
        backing = self.get_transport()
 
431
        request = smart.branch.SmartServerBranchRequest(backing)
 
432
        branch = self.make_branch('branch')
 
433
        checkout = branch.create_checkout('reference',lightweight=True)
 
434
        self.assertRaises(errors.NotBranchError,
 
435
            request.execute, 'checkout')
 
436
 
 
437
 
 
438
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithMemoryTransport):
 
439
 
 
440
    def test_empty(self):
 
441
        """For an empty branch, the result is ('ok', '0', 'null:')."""
 
442
        backing = self.get_transport()
 
443
        request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
 
444
        self.make_branch('.')
 
445
        self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
 
446
            request.execute(''))
 
447
 
 
448
    def test_not_empty(self):
 
449
        """For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
 
450
        backing = self.get_transport()
 
451
        request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
 
452
        tree = self.make_branch_and_memory_tree('.')
 
453
        tree.lock_write()
 
454
        tree.add('')
 
455
        rev_id_utf8 = u'\xc8'.encode('utf-8')
 
456
        r1 = tree.commit('1st commit')
 
457
        r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
 
458
        tree.unlock()
 
459
        self.assertEqual(
 
460
            SmartServerResponse(('ok', '2', rev_id_utf8)),
 
461
            request.execute(''))
 
462
 
 
463
 
 
464
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithMemoryTransport):
 
465
 
 
466
    def test_default(self):
 
467
        """With no file, we get empty content."""
 
468
        backing = self.get_transport()
 
469
        request = smart.branch.SmartServerBranchGetConfigFile(backing)
 
470
        branch = self.make_branch('.')
 
471
        # there should be no file by default
 
472
        content = ''
 
473
        self.assertEqual(SmartServerResponse(('ok', ), content),
 
474
            request.execute(''))
 
475
 
 
476
    def test_with_content(self):
 
477
        # SmartServerBranchGetConfigFile should return the content from
 
478
        # branch.control_files.get('branch.conf') for now - in the future it may
 
479
        # perform more complex processing.
 
480
        backing = self.get_transport()
 
481
        request = smart.branch.SmartServerBranchGetConfigFile(backing)
 
482
        branch = self.make_branch('.')
 
483
        branch._transport.put_bytes('branch.conf', 'foo bar baz')
 
484
        self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
 
485
            request.execute(''))
 
486
 
 
487
 
 
488
class SetLastRevisionTestBase(tests.TestCaseWithMemoryTransport):
 
489
    """Base test case for verbs that implement set_last_revision."""
 
490
 
 
491
    def setUp(self):
 
492
        tests.TestCaseWithMemoryTransport.setUp(self)
 
493
        backing_transport = self.get_transport()
 
494
        self.request = self.request_class(backing_transport)
 
495
        self.tree = self.make_branch_and_memory_tree('.')
 
496
 
 
497
    def lock_branch(self):
 
498
        b = self.tree.branch
 
499
        branch_token = b.lock_write()
 
500
        repo_token = b.repository.lock_write()
 
501
        b.repository.unlock()
 
502
        return branch_token, repo_token
 
503
 
 
504
    def unlock_branch(self):
 
505
        self.tree.branch.unlock()
 
506
 
 
507
    def set_last_revision(self, revision_id, revno):
 
508
        branch_token, repo_token = self.lock_branch()
 
509
        response = self._set_last_revision(
 
510
            revision_id, revno, branch_token, repo_token)
 
511
        self.unlock_branch()
 
512
        return response
 
513
 
 
514
    def assertRequestSucceeds(self, revision_id, revno):
 
515
        response = self.set_last_revision(revision_id, revno)
 
516
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)), response)
 
517
 
 
518
 
 
519
class TestSetLastRevisionVerbMixin(object):
 
520
    """Mixin test case for verbs that implement set_last_revision."""
 
521
 
 
522
    def test_set_null_to_null(self):
 
523
        """An empty branch can have its last revision set to 'null:'."""
 
524
        self.assertRequestSucceeds('null:', 0)
 
525
 
 
526
    def test_NoSuchRevision(self):
 
527
        """If the revision_id is not present, the verb returns NoSuchRevision.
 
528
        """
 
529
        revision_id = 'non-existent revision'
 
530
        self.assertEqual(
 
531
            FailedSmartServerResponse(('NoSuchRevision', revision_id)),
 
532
            self.set_last_revision(revision_id, 1))
 
533
 
 
534
    def make_tree_with_two_commits(self):
 
535
        self.tree.lock_write()
 
536
        self.tree.add('')
 
537
        rev_id_utf8 = u'\xc8'.encode('utf-8')
 
538
        r1 = self.tree.commit('1st commit', rev_id=rev_id_utf8)
 
539
        r2 = self.tree.commit('2nd commit', rev_id='rev-2')
 
540
        self.tree.unlock()
 
541
 
 
542
    def test_branch_last_revision_info_is_updated(self):
 
543
        """A branch's tip can be set to a revision that is present in its
 
544
        repository.
 
545
        """
 
546
        # Make a branch with an empty revision history, but two revisions in
 
547
        # its repository.
 
548
        self.make_tree_with_two_commits()
 
549
        rev_id_utf8 = u'\xc8'.encode('utf-8')
 
550
        self.tree.branch.set_revision_history([])
 
551
        self.assertEqual(
 
552
            (0, 'null:'), self.tree.branch.last_revision_info())
 
553
        # We can update the branch to a revision that is present in the
 
554
        # repository.
 
555
        self.assertRequestSucceeds(rev_id_utf8, 1)
 
556
        self.assertEqual(
 
557
            (1, rev_id_utf8), self.tree.branch.last_revision_info())
 
558
 
 
559
    def test_branch_last_revision_info_rewind(self):
 
560
        """A branch's tip can be set to a revision that is an ancestor of the
 
561
        current tip.
 
562
        """
 
563
        self.make_tree_with_two_commits()
 
564
        rev_id_utf8 = u'\xc8'.encode('utf-8')
 
565
        self.assertEqual(
 
566
            (2, 'rev-2'), self.tree.branch.last_revision_info())
 
567
        self.assertRequestSucceeds(rev_id_utf8, 1)
 
568
        self.assertEqual(
 
569
            (1, rev_id_utf8), self.tree.branch.last_revision_info())
 
570
 
 
571
    def test_TipChangeRejected(self):
 
572
        """If a pre_change_branch_tip hook raises TipChangeRejected, the verb
 
573
        returns TipChangeRejected.
 
574
        """
 
575
        rejection_message = u'rejection message\N{INTERROBANG}'
 
576
        def hook_that_rejects(params):
 
577
            raise errors.TipChangeRejected(rejection_message)
 
578
        Branch.hooks.install_named_hook(
 
579
            'pre_change_branch_tip', hook_that_rejects, None)
 
580
        self.assertEqual(
 
581
            FailedSmartServerResponse(
 
582
                ('TipChangeRejected', rejection_message.encode('utf-8'))),
 
583
            self.set_last_revision('null:', 0))
 
584
 
 
585
 
 
586
class TestSmartServerBranchRequestSetLastRevision(
 
587
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
 
588
    """Tests for Branch.set_last_revision verb."""
 
589
 
 
590
    request_class = smart.branch.SmartServerBranchRequestSetLastRevision
 
591
 
 
592
    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
 
593
        return self.request.execute(
 
594
            '', branch_token, repo_token, revision_id)
 
595
 
 
596
 
 
597
class TestSmartServerBranchRequestSetLastRevisionInfo(
 
598
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
 
599
    """Tests for Branch.set_last_revision_info verb."""
 
600
 
 
601
    request_class = smart.branch.SmartServerBranchRequestSetLastRevisionInfo
 
602
 
 
603
    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
 
604
        return self.request.execute(
 
605
            '', branch_token, repo_token, revno, revision_id)
 
606
 
 
607
    def test_NoSuchRevision(self):
 
608
        """Branch.set_last_revision_info does not have to return
 
609
        NoSuchRevision if the revision_id is absent.
 
610
        """
 
611
        raise tests.TestNotApplicable()
 
612
 
 
613
 
 
614
class TestSmartServerBranchRequestSetLastRevisionEx(
 
615
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
 
616
    """Tests for Branch.set_last_revision_ex verb."""
 
617
 
 
618
    request_class = smart.branch.SmartServerBranchRequestSetLastRevisionEx
 
619
 
 
620
    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
 
621
        return self.request.execute(
 
622
            '', branch_token, repo_token, revision_id, 0, 0)
 
623
 
 
624
    def assertRequestSucceeds(self, revision_id, revno):
 
625
        response = self.set_last_revision(revision_id, revno)
 
626
        self.assertEqual(
 
627
            SuccessfulSmartServerResponse(('ok', revno, revision_id)),
 
628
            response)
 
629
 
 
630
    def test_branch_last_revision_info_rewind(self):
 
631
        """A branch's tip can be set to a revision that is an ancestor of the
 
632
        current tip, but only if allow_overwrite_descendant is passed.
 
633
        """
 
634
        self.make_tree_with_two_commits()
 
635
        rev_id_utf8 = u'\xc8'.encode('utf-8')
 
636
        self.assertEqual(
 
637
            (2, 'rev-2'), self.tree.branch.last_revision_info())
 
638
        # If allow_overwrite_descendant flag is 0, then trying to set the tip
 
639
        # to an older revision ID has no effect.
 
640
        branch_token, repo_token = self.lock_branch()
 
641
        response = self.request.execute(
 
642
            '', branch_token, repo_token, rev_id_utf8, 0, 0)
 
643
        self.assertEqual(
 
644
            SuccessfulSmartServerResponse(('ok', 2, 'rev-2')),
 
645
            response)
 
646
        self.assertEqual(
 
647
            (2, 'rev-2'), self.tree.branch.last_revision_info())
 
648
 
 
649
        # If allow_overwrite_descendant flag is 1, then setting the tip to an
 
650
        # ancestor works.
 
651
        response = self.request.execute(
 
652
            '', branch_token, repo_token, rev_id_utf8, 0, 1)
 
653
        self.assertEqual(
 
654
            SuccessfulSmartServerResponse(('ok', 1, rev_id_utf8)),
 
655
            response)
 
656
        self.unlock_branch()
 
657
        self.assertEqual(
 
658
            (1, rev_id_utf8), self.tree.branch.last_revision_info())
 
659
 
 
660
    def make_branch_with_divergent_history(self):
 
661
        """Make a branch with divergent history in its repo.
 
662
 
 
663
        The branch's tip will be 'child-2', and the repo will also contain
 
664
        'child-1', which diverges from a common base revision.
 
665
        """
 
666
        self.tree.lock_write()
 
667
        self.tree.add('')
 
668
        r1 = self.tree.commit('1st commit')
 
669
        revno_1, revid_1 = self.tree.branch.last_revision_info()
 
670
        r2 = self.tree.commit('2nd commit', rev_id='child-1')
 
671
        # Undo the second commit
 
672
        self.tree.branch.set_last_revision_info(revno_1, revid_1)
 
673
        self.tree.set_parent_ids([revid_1])
 
674
        # Make a new second commit, child-2.  child-2 has diverged from
 
675
        # child-1.
 
676
        new_r2 = self.tree.commit('2nd commit', rev_id='child-2')
 
677
        self.tree.unlock()
 
678
 
 
679
    def test_not_allow_diverged(self):
 
680
        """If allow_diverged is not passed, then setting a divergent history
 
681
        returns a Diverged error.
 
682
        """
 
683
        self.make_branch_with_divergent_history()
 
684
        self.assertEqual(
 
685
            FailedSmartServerResponse(('Diverged',)),
 
686
            self.set_last_revision('child-1', 2))
 
687
        # The branch tip was not changed.
 
688
        self.assertEqual('child-2', self.tree.branch.last_revision())
 
689
 
 
690
    def test_allow_diverged(self):
 
691
        """If allow_diverged is passed, then setting a divergent history
 
692
        succeeds.
 
693
        """
 
694
        self.make_branch_with_divergent_history()
 
695
        branch_token, repo_token = self.lock_branch()
 
696
        response = self.request.execute(
 
697
            '', branch_token, repo_token, 'child-1', 1, 0)
 
698
        self.assertEqual(
 
699
            SuccessfulSmartServerResponse(('ok', 2, 'child-1')),
 
700
            response)
 
701
        self.unlock_branch()
 
702
        # The branch tip was changed.
 
703
        self.assertEqual('child-1', self.tree.branch.last_revision())
 
704
 
 
705
 
 
706
class TestSmartServerBranchRequestGetParent(tests.TestCaseWithMemoryTransport):
 
707
 
 
708
    def test_get_parent_none(self):
 
709
        base_branch = self.make_branch('base')
 
710
        request = smart.branch.SmartServerBranchGetParent(self.get_transport())
 
711
        response = request.execute('base')
 
712
        self.assertEquals(
 
713
            SuccessfulSmartServerResponse(('',)), response)
 
714
 
 
715
    def test_get_parent_something(self):
 
716
        base_branch = self.make_branch('base')
 
717
        base_branch.set_parent(self.get_url('foo'))
 
718
        request = smart.branch.SmartServerBranchGetParent(self.get_transport())
 
719
        response = request.execute('base')
 
720
        self.assertEquals(
 
721
            SuccessfulSmartServerResponse(("../foo",)),
 
722
            response)
 
723
 
 
724
 
 
725
class TestSmartServerBranchRequestGetTagsBytes(tests.TestCaseWithMemoryTransport):
 
726
# Only called when the branch format and tags match [yay factory
 
727
# methods] so only need to test straight forward cases.
 
728
 
 
729
    def test_get_bytes(self):
 
730
        base_branch = self.make_branch('base')
 
731
        request = smart.branch.SmartServerBranchGetTagsBytes(
 
732
            self.get_transport())
 
733
        response = request.execute('base')
 
734
        self.assertEquals(
 
735
            SuccessfulSmartServerResponse(('',)), response)
 
736
 
 
737
 
 
738
class TestSmartServerBranchRequestGetStackedOnURL(tests.TestCaseWithMemoryTransport):
 
739
 
 
740
    def test_get_stacked_on_url(self):
 
741
        base_branch = self.make_branch('base', format='1.6')
 
742
        stacked_branch = self.make_branch('stacked', format='1.6')
 
743
        # typically should be relative
 
744
        stacked_branch.set_stacked_on_url('../base')
 
745
        request = smart.branch.SmartServerBranchRequestGetStackedOnURL(
 
746
            self.get_transport())
 
747
        response = request.execute('stacked')
 
748
        self.assertEquals(
 
749
            SmartServerResponse(('ok', '../base')),
 
750
            response)
 
751
 
 
752
 
 
753
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
 
754
 
 
755
    def setUp(self):
 
756
        tests.TestCaseWithMemoryTransport.setUp(self)
 
757
 
 
758
    def test_lock_write_on_unlocked_branch(self):
 
759
        backing = self.get_transport()
 
760
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
 
761
        branch = self.make_branch('.', format='knit')
 
762
        repository = branch.repository
 
763
        response = request.execute('')
 
764
        branch_nonce = branch.control_files._lock.peek().get('nonce')
 
765
        repository_nonce = repository.control_files._lock.peek().get('nonce')
 
766
        self.assertEqual(
 
767
            SmartServerResponse(('ok', branch_nonce, repository_nonce)),
 
768
            response)
 
769
        # The branch (and associated repository) is now locked.  Verify that
 
770
        # with a new branch object.
 
771
        new_branch = repository.bzrdir.open_branch()
 
772
        self.assertRaises(errors.LockContention, new_branch.lock_write)
 
773
 
 
774
    def test_lock_write_on_locked_branch(self):
 
775
        backing = self.get_transport()
 
776
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
 
777
        branch = self.make_branch('.')
 
778
        branch.lock_write()
 
779
        branch.leave_lock_in_place()
 
780
        branch.unlock()
 
781
        response = request.execute('')
 
782
        self.assertEqual(
 
783
            SmartServerResponse(('LockContention',)), response)
 
784
 
 
785
    def test_lock_write_with_tokens_on_locked_branch(self):
 
786
        backing = self.get_transport()
 
787
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
 
788
        branch = self.make_branch('.', format='knit')
 
789
        branch_token = branch.lock_write()
 
790
        repo_token = branch.repository.lock_write()
 
791
        branch.repository.unlock()
 
792
        branch.leave_lock_in_place()
 
793
        branch.repository.leave_lock_in_place()
 
794
        branch.unlock()
 
795
        response = request.execute('',
 
796
                                   branch_token, repo_token)
 
797
        self.assertEqual(
 
798
            SmartServerResponse(('ok', branch_token, repo_token)), response)
 
799
 
 
800
    def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
 
801
        backing = self.get_transport()
 
802
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
 
803
        branch = self.make_branch('.', format='knit')
 
804
        branch_token = branch.lock_write()
 
805
        repo_token = branch.repository.lock_write()
 
806
        branch.repository.unlock()
 
807
        branch.leave_lock_in_place()
 
808
        branch.repository.leave_lock_in_place()
 
809
        branch.unlock()
 
810
        response = request.execute('',
 
811
                                   branch_token+'xxx', repo_token)
 
812
        self.assertEqual(
 
813
            SmartServerResponse(('TokenMismatch',)), response)
 
814
 
 
815
    def test_lock_write_on_locked_repo(self):
 
816
        backing = self.get_transport()
 
817
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
 
818
        branch = self.make_branch('.', format='knit')
 
819
        branch.repository.lock_write()
 
820
        branch.repository.leave_lock_in_place()
 
821
        branch.repository.unlock()
 
822
        response = request.execute('')
 
823
        self.assertEqual(
 
824
            SmartServerResponse(('LockContention',)), response)
 
825
 
 
826
    def test_lock_write_on_readonly_transport(self):
 
827
        backing = self.get_readonly_transport()
 
828
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
 
829
        branch = self.make_branch('.')
 
830
        root = self.get_transport().clone('/')
 
831
        path = urlutils.relative_url(root.base, self.get_transport().base)
 
832
        response = request.execute(path)
 
833
        error_name, lock_str, why_str = response.args
 
834
        self.assertFalse(response.is_successful())
 
835
        self.assertEqual('LockFailed', error_name)
 
836
 
 
837
 
 
838
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
 
839
 
 
840
    def setUp(self):
 
841
        tests.TestCaseWithMemoryTransport.setUp(self)
 
842
 
 
843
    def test_unlock_on_locked_branch_and_repo(self):
 
844
        backing = self.get_transport()
 
845
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
 
846
        branch = self.make_branch('.', format='knit')
 
847
        # Lock the branch
 
848
        branch_token = branch.lock_write()
 
849
        repo_token = branch.repository.lock_write()
 
850
        branch.repository.unlock()
 
851
        # Unlock the branch (and repo) object, leaving the physical locks
 
852
        # in place.
 
853
        branch.leave_lock_in_place()
 
854
        branch.repository.leave_lock_in_place()
 
855
        branch.unlock()
 
856
        response = request.execute('',
 
857
                                   branch_token, repo_token)
 
858
        self.assertEqual(
 
859
            SmartServerResponse(('ok',)), response)
 
860
        # The branch is now unlocked.  Verify that with a new branch
 
861
        # object.
 
862
        new_branch = branch.bzrdir.open_branch()
 
863
        new_branch.lock_write()
 
864
        new_branch.unlock()
 
865
 
 
866
    def test_unlock_on_unlocked_branch_unlocked_repo(self):
 
867
        backing = self.get_transport()
 
868
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
 
869
        branch = self.make_branch('.', format='knit')
 
870
        response = request.execute(
 
871
            '', 'branch token', 'repo token')
 
872
        self.assertEqual(
 
873
            SmartServerResponse(('TokenMismatch',)), response)
 
874
 
 
875
    def test_unlock_on_unlocked_branch_locked_repo(self):
 
876
        backing = self.get_transport()
 
877
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
 
878
        branch = self.make_branch('.', format='knit')
 
879
        # Lock the repository.
 
880
        repo_token = branch.repository.lock_write()
 
881
        branch.repository.leave_lock_in_place()
 
882
        branch.repository.unlock()
 
883
        # Issue branch lock_write request on the unlocked branch (with locked
 
884
        # repo).
 
885
        response = request.execute(
 
886
            '', 'branch token', repo_token)
 
887
        self.assertEqual(
 
888
            SmartServerResponse(('TokenMismatch',)), response)
 
889
 
 
890
 
 
891
class TestSmartServerRepositoryRequest(tests.TestCaseWithMemoryTransport):
 
892
 
 
893
    def test_no_repository(self):
 
894
        """Raise NoRepositoryPresent when there is a bzrdir and no repo."""
 
895
        # we test this using a shared repository above the named path,
 
896
        # thus checking the right search logic is used - that is, that
 
897
        # its the exact path being looked at and the server is not
 
898
        # searching.
 
899
        backing = self.get_transport()
 
900
        request = smart.repository.SmartServerRepositoryRequest(backing)
 
901
        self.make_repository('.', shared=True)
 
902
        self.make_bzrdir('subdir')
 
903
        self.assertRaises(errors.NoRepositoryPresent,
 
904
            request.execute, 'subdir')
 
905
 
 
906
 
 
907
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithMemoryTransport):
 
908
 
 
909
    def test_trivial_bzipped(self):
 
910
        # This tests that the wire encoding is actually bzipped
 
911
        backing = self.get_transport()
 
912
        request = smart.repository.SmartServerRepositoryGetParentMap(backing)
 
913
        tree = self.make_branch_and_memory_tree('.')
 
914
 
 
915
        self.assertEqual(None,
 
916
            request.execute('', 'missing-id'))
 
917
        # Note that it returns a body (of '' bzipped).
 
918
        self.assertEqual(
 
919
            SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
 
920
            request.do_body('\n\n0\n'))
 
921
 
 
922
 
 
923
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithMemoryTransport):
 
924
 
 
925
    def test_none_argument(self):
 
926
        backing = self.get_transport()
 
927
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
 
928
        tree = self.make_branch_and_memory_tree('.')
 
929
        tree.lock_write()
 
930
        tree.add('')
 
931
        r1 = tree.commit('1st commit')
 
932
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
 
933
        tree.unlock()
 
934
 
 
935
        # the lines of revision_id->revision_parent_list has no guaranteed
 
936
        # order coming out of a dict, so sort both our test and response
 
937
        lines = sorted([' '.join([r2, r1]), r1])
 
938
        response = request.execute('', '')
 
939
        response.body = '\n'.join(sorted(response.body.split('\n')))
 
940
 
 
941
        self.assertEqual(
 
942
            SmartServerResponse(('ok', ), '\n'.join(lines)), response)
 
943
 
 
944
    def test_specific_revision_argument(self):
 
945
        backing = self.get_transport()
 
946
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
 
947
        tree = self.make_branch_and_memory_tree('.')
 
948
        tree.lock_write()
 
949
        tree.add('')
 
950
        rev_id_utf8 = u'\xc9'.encode('utf-8')
 
951
        r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
 
952
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
 
953
        tree.unlock()
 
954
 
 
955
        self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
 
956
            request.execute('', rev_id_utf8))
 
957
 
 
958
    def test_no_such_revision(self):
 
959
        backing = self.get_transport()
 
960
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
 
961
        tree = self.make_branch_and_memory_tree('.')
 
962
        tree.lock_write()
 
963
        tree.add('')
 
964
        r1 = tree.commit('1st commit')
 
965
        tree.unlock()
 
966
 
 
967
        # Note that it still returns body (of zero bytes).
 
968
        self.assertEqual(
 
969
            SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
 
970
            request.execute('', 'missingrevision'))
 
971
 
 
972
 
 
973
class TestSmartServerRepositoryGetStream(tests.TestCaseWithMemoryTransport):
 
974
 
 
975
    def make_two_commit_repo(self):
 
976
        tree = self.make_branch_and_memory_tree('.')
 
977
        tree.lock_write()
 
978
        tree.add('')
 
979
        r1 = tree.commit('1st commit')
 
980
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
 
981
        tree.unlock()
 
982
        repo = tree.branch.repository
 
983
        return repo, r1, r2
 
984
 
 
985
    def test_ancestry_of(self):
 
986
        """The search argument may be a 'ancestry-of' some heads'."""
 
987
        backing = self.get_transport()
 
988
        request = smart.repository.SmartServerRepositoryGetStream(backing)
 
989
        repo, r1, r2 = self.make_two_commit_repo()
 
990
        fetch_spec = ['ancestry-of', r2]
 
991
        lines = '\n'.join(fetch_spec)
 
992
        request.execute('', repo._format.network_name())
 
993
        response = request.do_body(lines)
 
994
        self.assertEqual(('ok',), response.args)
 
995
        stream_bytes = ''.join(response.body_stream)
 
996
        self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')
 
997
 
 
998
    def test_search(self):
 
999
        """The search argument may be a 'search' of some explicit keys."""
 
1000
        backing = self.get_transport()
 
1001
        request = smart.repository.SmartServerRepositoryGetStream(backing)
 
1002
        repo, r1, r2 = self.make_two_commit_repo()
 
1003
        fetch_spec = ['search', '%s %s' % (r1, r2), 'null:', '2']
 
1004
        lines = '\n'.join(fetch_spec)
 
1005
        request.execute('', repo._format.network_name())
 
1006
        response = request.do_body(lines)
 
1007
        self.assertEqual(('ok',), response.args)
 
1008
        stream_bytes = ''.join(response.body_stream)
 
1009
        self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')
 
1010
 
 
1011
 
 
1012
class TestSmartServerRequestHasRevision(tests.TestCaseWithMemoryTransport):
 
1013
 
 
1014
    def test_missing_revision(self):
 
1015
        """For a missing revision, ('no', ) is returned."""
 
1016
        backing = self.get_transport()
 
1017
        request = smart.repository.SmartServerRequestHasRevision(backing)
 
1018
        self.make_repository('.')
 
1019
        self.assertEqual(SmartServerResponse(('no', )),
 
1020
            request.execute('', 'revid'))
 
1021
 
 
1022
    def test_present_revision(self):
 
1023
        """For a present revision, ('yes', ) is returned."""
 
1024
        backing = self.get_transport()
 
1025
        request = smart.repository.SmartServerRequestHasRevision(backing)
 
1026
        tree = self.make_branch_and_memory_tree('.')
 
1027
        tree.lock_write()
 
1028
        tree.add('')
 
1029
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
 
1030
        r1 = tree.commit('a commit', rev_id=rev_id_utf8)
 
1031
        tree.unlock()
 
1032
        self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
 
1033
        self.assertEqual(SmartServerResponse(('yes', )),
 
1034
            request.execute('', rev_id_utf8))
 
1035
 
 
1036
 
 
1037
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithMemoryTransport):
 
1038
 
 
1039
    def test_empty_revid(self):
 
1040
        """With an empty revid, we get only size an number and revisions"""
 
1041
        backing = self.get_transport()
 
1042
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
 
1043
        repository = self.make_repository('.')
 
1044
        stats = repository.gather_stats()
 
1045
        expected_body = 'revisions: 0\n'
 
1046
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
 
1047
                         request.execute('', '', 'no'))
 
1048
 
 
1049
    def test_revid_with_committers(self):
 
1050
        """For a revid we get more infos."""
 
1051
        backing = self.get_transport()
 
1052
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
 
1053
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
 
1054
        tree = self.make_branch_and_memory_tree('.')
 
1055
        tree.lock_write()
 
1056
        tree.add('')
 
1057
        # Let's build a predictable result
 
1058
        tree.commit('a commit', timestamp=123456.2, timezone=3600)
 
1059
        tree.commit('a commit', timestamp=654321.4, timezone=0,
 
1060
                    rev_id=rev_id_utf8)
 
1061
        tree.unlock()
 
1062
 
 
1063
        stats = tree.branch.repository.gather_stats()
 
1064
        expected_body = ('firstrev: 123456.200 3600\n'
 
1065
                         'latestrev: 654321.400 0\n'
 
1066
                         'revisions: 2\n')
 
1067
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
 
1068
                         request.execute('',
 
1069
                                         rev_id_utf8, 'no'))
 
1070
 
 
1071
    def test_not_empty_repository_with_committers(self):
 
1072
        """For a revid and requesting committers we get the whole thing."""
 
1073
        backing = self.get_transport()
 
1074
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
 
1075
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
 
1076
        tree = self.make_branch_and_memory_tree('.')
 
1077
        tree.lock_write()
 
1078
        tree.add('')
 
1079
        # Let's build a predictable result
 
1080
        tree.commit('a commit', timestamp=123456.2, timezone=3600,
 
1081
                    committer='foo')
 
1082
        tree.commit('a commit', timestamp=654321.4, timezone=0,
 
1083
                    committer='bar', rev_id=rev_id_utf8)
 
1084
        tree.unlock()
 
1085
        stats = tree.branch.repository.gather_stats()
 
1086
 
 
1087
        expected_body = ('committers: 2\n'
 
1088
                         'firstrev: 123456.200 3600\n'
 
1089
                         'latestrev: 654321.400 0\n'
 
1090
                         'revisions: 2\n')
 
1091
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
 
1092
                         request.execute('',
 
1093
                                         rev_id_utf8, 'yes'))
 
1094
 
 
1095
 
 
1096
class TestSmartServerRepositoryIsShared(tests.TestCaseWithMemoryTransport):
 
1097
 
 
1098
    def test_is_shared(self):
 
1099
        """For a shared repository, ('yes', ) is returned."""
 
1100
        backing = self.get_transport()
 
1101
        request = smart.repository.SmartServerRepositoryIsShared(backing)
 
1102
        self.make_repository('.', shared=True)
 
1103
        self.assertEqual(SmartServerResponse(('yes', )),
 
1104
            request.execute('', ))
 
1105
 
 
1106
    def test_is_not_shared(self):
 
1107
        """For a shared repository, ('no', ) is returned."""
 
1108
        backing = self.get_transport()
 
1109
        request = smart.repository.SmartServerRepositoryIsShared(backing)
 
1110
        self.make_repository('.', shared=False)
 
1111
        self.assertEqual(SmartServerResponse(('no', )),
 
1112
            request.execute('', ))
 
1113
 
 
1114
 
 
1115
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
 
1116
 
 
1117
    def setUp(self):
 
1118
        tests.TestCaseWithMemoryTransport.setUp(self)
 
1119
 
 
1120
    def test_lock_write_on_unlocked_repo(self):
 
1121
        backing = self.get_transport()
 
1122
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
 
1123
        repository = self.make_repository('.', format='knit')
 
1124
        response = request.execute('')
 
1125
        nonce = repository.control_files._lock.peek().get('nonce')
 
1126
        self.assertEqual(SmartServerResponse(('ok', nonce)), response)
 
1127
        # The repository is now locked.  Verify that with a new repository
 
1128
        # object.
 
1129
        new_repo = repository.bzrdir.open_repository()
 
1130
        self.assertRaises(errors.LockContention, new_repo.lock_write)
 
1131
 
 
1132
    def test_lock_write_on_locked_repo(self):
 
1133
        backing = self.get_transport()
 
1134
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
 
1135
        repository = self.make_repository('.', format='knit')
 
1136
        repository.lock_write()
 
1137
        repository.leave_lock_in_place()
 
1138
        repository.unlock()
 
1139
        response = request.execute('')
 
1140
        self.assertEqual(
 
1141
            SmartServerResponse(('LockContention',)), response)
 
1142
 
 
1143
    def test_lock_write_on_readonly_transport(self):
 
1144
        backing = self.get_readonly_transport()
 
1145
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
 
1146
        repository = self.make_repository('.', format='knit')
 
1147
        response = request.execute('')
 
1148
        self.assertFalse(response.is_successful())
 
1149
        self.assertEqual('LockFailed', response.args[0])
 
1150
 
 
1151
 
 
1152
class TestSmartServerRepositoryUnlock(tests.TestCaseWithMemoryTransport):
 
1153
 
 
1154
    def setUp(self):
 
1155
        tests.TestCaseWithMemoryTransport.setUp(self)
 
1156
 
 
1157
    def test_unlock_on_locked_repo(self):
 
1158
        backing = self.get_transport()
 
1159
        request = smart.repository.SmartServerRepositoryUnlock(backing)
 
1160
        repository = self.make_repository('.', format='knit')
 
1161
        token = repository.lock_write()
 
1162
        repository.leave_lock_in_place()
 
1163
        repository.unlock()
 
1164
        response = request.execute('', token)
 
1165
        self.assertEqual(
 
1166
            SmartServerResponse(('ok',)), response)
 
1167
        # The repository is now unlocked.  Verify that with a new repository
 
1168
        # object.
 
1169
        new_repo = repository.bzrdir.open_repository()
 
1170
        new_repo.lock_write()
 
1171
        new_repo.unlock()
 
1172
 
 
1173
    def test_unlock_on_unlocked_repo(self):
 
1174
        backing = self.get_transport()
 
1175
        request = smart.repository.SmartServerRepositoryUnlock(backing)
 
1176
        repository = self.make_repository('.', format='knit')
 
1177
        response = request.execute('', 'some token')
 
1178
        self.assertEqual(
 
1179
            SmartServerResponse(('TokenMismatch',)), response)
 
1180
 
 
1181
 
 
1182
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
 
1183
 
 
1184
    def test_is_readonly_no(self):
 
1185
        backing = self.get_transport()
 
1186
        request = smart.request.SmartServerIsReadonly(backing)
 
1187
        response = request.execute()
 
1188
        self.assertEqual(
 
1189
            SmartServerResponse(('no',)), response)
 
1190
 
 
1191
    def test_is_readonly_yes(self):
 
1192
        backing = self.get_readonly_transport()
 
1193
        request = smart.request.SmartServerIsReadonly(backing)
 
1194
        response = request.execute()
 
1195
        self.assertEqual(
 
1196
            SmartServerResponse(('yes',)), response)
 
1197
 
 
1198
 
 
1199
class TestSmartServerRepositorySetMakeWorkingTrees(tests.TestCaseWithMemoryTransport):
 
1200
 
 
1201
    def test_set_false(self):
 
1202
        backing = self.get_transport()
 
1203
        repo = self.make_repository('.', shared=True)
 
1204
        repo.set_make_working_trees(True)
 
1205
        request_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
 
1206
        request = request_class(backing)
 
1207
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
 
1208
            request.execute('', 'False'))
 
1209
        repo = repo.bzrdir.open_repository()
 
1210
        self.assertFalse(repo.make_working_trees())
 
1211
 
 
1212
    def test_set_true(self):
 
1213
        backing = self.get_transport()
 
1214
        repo = self.make_repository('.', shared=True)
 
1215
        repo.set_make_working_trees(False)
 
1216
        request_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
 
1217
        request = request_class(backing)
 
1218
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
 
1219
            request.execute('', 'True'))
 
1220
        repo = repo.bzrdir.open_repository()
 
1221
        self.assertTrue(repo.make_working_trees())
 
1222
 
 
1223
 
 
1224
class TestSmartServerPackRepositoryAutopack(tests.TestCaseWithTransport):
 
1225
 
 
1226
    def make_repo_needing_autopacking(self, path='.'):
 
1227
        # Make a repo in need of autopacking.
 
1228
        tree = self.make_branch_and_tree('.', format='pack-0.92')
 
1229
        repo = tree.branch.repository
 
1230
        # monkey-patch the pack collection to disable autopacking
 
1231
        repo._pack_collection._max_pack_count = lambda count: count
 
1232
        for x in range(10):
 
1233
            tree.commit('commit %s' % x)
 
1234
        self.assertEqual(10, len(repo._pack_collection.names()))
 
1235
        del repo._pack_collection._max_pack_count
 
1236
        return repo
 
1237
 
 
1238
    def test_autopack_needed(self):
 
1239
        repo = self.make_repo_needing_autopacking()
 
1240
        backing = self.get_transport()
 
1241
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
 
1242
            backing)
 
1243
        response = request.execute('')
 
1244
        self.assertEqual(SmartServerResponse(('ok',)), response)
 
1245
        repo._pack_collection.reload_pack_names()
 
1246
        self.assertEqual(1, len(repo._pack_collection.names()))
 
1247
 
 
1248
    def test_autopack_not_needed(self):
 
1249
        tree = self.make_branch_and_tree('.', format='pack-0.92')
 
1250
        repo = tree.branch.repository
 
1251
        for x in range(9):
 
1252
            tree.commit('commit %s' % x)
 
1253
        backing = self.get_transport()
 
1254
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
 
1255
            backing)
 
1256
        response = request.execute('')
 
1257
        self.assertEqual(SmartServerResponse(('ok',)), response)
 
1258
        repo._pack_collection.reload_pack_names()
 
1259
        self.assertEqual(9, len(repo._pack_collection.names()))
 
1260
 
 
1261
    def test_autopack_on_nonpack_format(self):
 
1262
        """A request to autopack a non-pack repo is a no-op."""
 
1263
        repo = self.make_repository('.', format='knit')
 
1264
        backing = self.get_transport()
 
1265
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
 
1266
            backing)
 
1267
        response = request.execute('')
 
1268
        self.assertEqual(SmartServerResponse(('ok',)), response)
 
1269
 
 
1270
 
 
1271
class TestHandlers(tests.TestCase):
 
1272
    """Tests for the request.request_handlers object."""
 
1273
 
 
1274
    def test_all_registrations_exist(self):
 
1275
        """All registered request_handlers can be found."""
 
1276
        # If there's a typo in a register_lazy call, this loop will fail with
 
1277
        # an AttributeError.
 
1278
        for key, item in smart.request.request_handlers.iteritems():
 
1279
            pass
 
1280
 
 
1281
    def test_registered_methods(self):
 
1282
        """Test that known methods are registered to the correct object."""
 
1283
        self.assertEqual(
 
1284
            smart.request.request_handlers.get('Branch.get_config_file'),
 
1285
            smart.branch.SmartServerBranchGetConfigFile)
 
1286
        self.assertEqual(
 
1287
            smart.request.request_handlers.get('Branch.get_parent'),
 
1288
            smart.branch.SmartServerBranchGetParent)
 
1289
        self.assertEqual(
 
1290
            smart.request.request_handlers.get('Branch.get_tags_bytes'),
 
1291
            smart.branch.SmartServerBranchGetTagsBytes)
 
1292
        self.assertEqual(
 
1293
            smart.request.request_handlers.get('Branch.lock_write'),
 
1294
            smart.branch.SmartServerBranchRequestLockWrite)
 
1295
        self.assertEqual(
 
1296
            smart.request.request_handlers.get('Branch.last_revision_info'),
 
1297
            smart.branch.SmartServerBranchRequestLastRevisionInfo)
 
1298
        self.assertEqual(
 
1299
            smart.request.request_handlers.get('Branch.revision_history'),
 
1300
            smart.branch.SmartServerRequestRevisionHistory)
 
1301
        self.assertEqual(
 
1302
            smart.request.request_handlers.get('Branch.set_last_revision'),
 
1303
            smart.branch.SmartServerBranchRequestSetLastRevision)
 
1304
        self.assertEqual(
 
1305
            smart.request.request_handlers.get('Branch.set_last_revision_info'),
 
1306
            smart.branch.SmartServerBranchRequestSetLastRevisionInfo)
 
1307
        self.assertEqual(
 
1308
            smart.request.request_handlers.get('Branch.unlock'),
 
1309
            smart.branch.SmartServerBranchRequestUnlock)
 
1310
        self.assertEqual(
 
1311
            smart.request.request_handlers.get('BzrDir.find_repository'),
 
1312
            smart.bzrdir.SmartServerRequestFindRepositoryV1)
 
1313
        self.assertEqual(
 
1314
            smart.request.request_handlers.get('BzrDir.find_repositoryV2'),
 
1315
            smart.bzrdir.SmartServerRequestFindRepositoryV2)
 
1316
        self.assertEqual(
 
1317
            smart.request.request_handlers.get('BzrDirFormat.initialize'),
 
1318
            smart.bzrdir.SmartServerRequestInitializeBzrDir)
 
1319
        self.assertEqual(
 
1320
            smart.request.request_handlers.get('BzrDir.cloning_metadir'),
 
1321
            smart.bzrdir.SmartServerBzrDirRequestCloningMetaDir)
 
1322
        self.assertEqual(
 
1323
            smart.request.request_handlers.get('BzrDir.open_branch'),
 
1324
            smart.bzrdir.SmartServerRequestOpenBranch)
 
1325
        self.assertEqual(
 
1326
            smart.request.request_handlers.get('BzrDir.open_branchV2'),
 
1327
            smart.bzrdir.SmartServerRequestOpenBranchV2)
 
1328
        self.assertEqual(
 
1329
            smart.request.request_handlers.get('PackRepository.autopack'),
 
1330
            smart.packrepository.SmartServerPackRepositoryAutopack)
 
1331
        self.assertEqual(
 
1332
            smart.request.request_handlers.get('Repository.gather_stats'),
 
1333
            smart.repository.SmartServerRepositoryGatherStats)
 
1334
        self.assertEqual(
 
1335
            smart.request.request_handlers.get('Repository.get_parent_map'),
 
1336
            smart.repository.SmartServerRepositoryGetParentMap)
 
1337
        self.assertEqual(
 
1338
            smart.request.request_handlers.get(
 
1339
                'Repository.get_revision_graph'),
 
1340
            smart.repository.SmartServerRepositoryGetRevisionGraph)
 
1341
        self.assertEqual(
 
1342
            smart.request.request_handlers.get('Repository.has_revision'),
 
1343
            smart.repository.SmartServerRequestHasRevision)
 
1344
        self.assertEqual(
 
1345
            smart.request.request_handlers.get('Repository.is_shared'),
 
1346
            smart.repository.SmartServerRepositoryIsShared)
 
1347
        self.assertEqual(
 
1348
            smart.request.request_handlers.get('Repository.lock_write'),
 
1349
            smart.repository.SmartServerRepositoryLockWrite)
 
1350
        self.assertEqual(
 
1351
            smart.request.request_handlers.get('Repository.get_stream'),
 
1352
            smart.repository.SmartServerRepositoryGetStream)
 
1353
        self.assertEqual(
 
1354
            smart.request.request_handlers.get('Repository.tarball'),
 
1355
            smart.repository.SmartServerRepositoryTarball)
 
1356
        self.assertEqual(
 
1357
            smart.request.request_handlers.get('Repository.unlock'),
 
1358
            smart.repository.SmartServerRepositoryUnlock)
 
1359
        self.assertEqual(
 
1360
            smart.request.request_handlers.get('Transport.is_readonly'),
 
1361
            smart.request.SmartServerIsReadonly)