1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the smart wire/domain protocol.
19
This module contains tests for the domain-level smart requests and responses,
20
such as the 'Branch.lock_write' request. Many of these use specific disk
21
formats to exercise calls that only make sense for formats with specific
24
Tests for low-level protocol encoding are found in test_smart_transport.
28
from cStringIO import StringIO
39
from bzrlib.branch import Branch, BranchReferenceFormat
40
import bzrlib.smart.branch
41
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
42
import bzrlib.smart.packrepository
43
import bzrlib.smart.repository
44
from bzrlib.smart.request import (
45
FailedSmartServerResponse,
48
SuccessfulSmartServerResponse,
50
from bzrlib.tests import (
53
from bzrlib.transport import chroot, get_transport
54
from bzrlib.util import bencode
57
def load_tests(standard_tests, module, loader):
58
"""Multiply tests version and protocol consistency."""
59
# FindRepository tests.
60
bzrdir_mod = bzrlib.smart.bzrdir
63
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
64
("find_repositoryV2", {
65
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
66
("find_repositoryV3", {
67
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV3}),
69
to_adapt, result = split_suite_by_re(standard_tests,
70
"TestSmartServerRequestFindRepository")
71
v2_only, v1_and_2 = split_suite_by_re(to_adapt,
73
tests.multiply_tests(v1_and_2, scenarios, result)
74
# The first scenario is only applicable to v1 protocols, it is deleted
76
tests.multiply_tests(v2_only, scenarios[1:], result)
80
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
83
tests.TestCaseWithTransport.setUp(self)
84
self._chroot_server = None
86
def get_transport(self, relpath=None):
87
if self._chroot_server is None:
88
backing_transport = tests.TestCaseWithTransport.get_transport(self)
89
self._chroot_server = chroot.ChrootServer(backing_transport)
90
self._chroot_server.setUp()
91
self.addCleanup(self._chroot_server.tearDown)
92
t = get_transport(self._chroot_server.get_url())
93
if relpath is not None:
98
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
101
super(TestCaseWithSmartMedium, self).setUp()
102
# We're allowed to set the transport class here, so that we don't use
103
# the default or a parameterized class, but rather use the
104
# TestCaseWithTransport infrastructure to set up a smart server and
106
self.transport_server = self.make_transport_server
108
def make_transport_server(self):
    """Create the smart TCP test server, named '-' plus this test's id."""
    server_name = '-' + self.id()
    return smart.server.SmartTCPServer_for_testing(server_name)
111
def get_smart_medium(self):
    """Return the smart medium of this test's transport."""
    transport = self.get_transport()
    return transport.get_smart_medium()
116
class TestSmartServerResponse(tests.TestCase):
    """Equality and string conversion of smart server response objects."""

    def test__eq__(self):
        """Responses are equal only when both args and body are equal."""
        bare_ok = SmartServerResponse(('ok', ))
        ok_with_body = SmartServerResponse(('ok', ), 'body')
        # Separately constructed responses with equal content are equal.
        self.assertEqual(bare_ok, SmartServerResponse(('ok', )))
        self.assertEqual(ok_with_body, SmartServerResponse(('ok', ), 'body'))
        # Different args, a missing body, or None all compare unequal.
        self.assertNotEqual(bare_ok, SmartServerResponse(('notok', )))
        self.assertNotEqual(ok_with_body, SmartServerResponse(('ok', )))
        self.assertNotEqual(None, SmartServerResponse(('ok', )))

    def test__str__(self):
        """SmartServerResponses can be stringified."""
        success = SuccessfulSmartServerResponse(('args',), 'body')
        self.assertEqual(
            "<SuccessfulSmartServerResponse args=('args',) body='body'>",
            str(success))
        failure = FailedSmartServerResponse(('args',), 'body')
        self.assertEqual(
            "<FailedSmartServerResponse args=('args',) body='body'>",
            str(failure))
140
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
142
def test_translate_client_path(self):
    """Client paths are translated relative to the request root 'foo/'."""
    transport = self.get_transport()
    request = SmartServerRequest(transport, 'foo/')
    translate = request.translate_client_path
    self.assertEqual('./', translate('foo/'))
    # Each of these client paths must be refused with the paired error.
    rejected = [
        (errors.InvalidURLJoin, 'foo/..'),
        (errors.PathNotChild, '/'),
        (errors.PathNotChild, 'bar/'),
        ]
    for error_class, client_path in rejected:
        self.assertRaises(error_class, translate, client_path)
    self.assertEqual('./baz', translate('foo/baz'))
154
def test_transport_from_client_path(self):
155
transport = self.get_transport()
156
request = SmartServerRequest(transport, 'foo/')
159
request.transport_from_client_path('foo/').base)
162
class TestSmartServerBzrDirRequestCloningMetaDir(
        tests.TestCaseWithMemoryTransport):
    """Tests for BzrDir.cloning_metadir."""

    def test_cloning_metadir(self):
        """When there is a bzrdir present, the call succeeds."""
        backing = self.get_transport()
        # Named bzr_dir (not dir) so the builtin dir() is not shadowed.
        bzr_dir = self.make_bzrdir('.')
        local_result = bzr_dir.cloning_metadir()
        request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
        request = request_class(backing)
        # The response must carry the network names of the control,
        # repository and branch formats that the local API reports.
        expected = SuccessfulSmartServerResponse(
            (local_result.network_name(),
             local_result.repository_format.network_name(),
             ('branch', local_result.get_branch_format().network_name())))
        self.assertEqual(expected, request.execute('', 'False'))

    def test_cloning_metadir_reference(self):
        """The request fails when bzrdir contains a branch reference."""
        backing = self.get_transport()
        referenced_branch = self.make_branch('referenced')
        bzr_dir = self.make_bzrdir('.')
        # The results of the next three calls are not used by the assertion;
        # the calls themselves are kept so the test runs the same steps as
        # before (initialize() is what creates the branch reference).
        bzr_dir.cloning_metadir()
        BranchReferenceFormat().initialize(bzr_dir, referenced_branch)
        BranchReferenceFormat().get_reference(bzr_dir)
        # The server shouldn't try to follow the branch reference, so it's fine
        # if the referenced branch isn't reachable.
        backing.rename('referenced', 'moved')
        request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
        request = request_class(backing)
        expected = FailedSmartServerResponse(('BranchReference',))
        self.assertEqual(expected, request.execute('', 'False'))
196
class TestSmartServerRequestCreateRepository(tests.TestCaseWithMemoryTransport):
    """Tests for BzrDir.create_repository."""

    def test_makes_repository(self):
        """When there is a bzrdir present, the call succeeds."""
        transport = self.get_transport()
        self.make_bzrdir('.')
        request = bzrlib.smart.bzrdir.SmartServerRequestCreateRepository(
            transport)
        # Request the repository format of the 'default' bzrdir format.
        default_format = bzrdir.format_registry.get('default')()
        repo_network_name = default_format.repository_format.network_name()
        expected = SuccessfulSmartServerResponse(
            ('ok', 'no', 'no', 'no', repo_network_name))
        actual = request.execute('', repo_network_name, 'True')
        self.assertEqual(expected, actual)
213
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
214
"""Tests for BzrDir.find_repository."""
216
def test_no_repository(self):
217
"""When there is no repository to be found, ('norepository', ) is returned."""
218
backing = self.get_transport()
219
request = self._request_class(backing)
220
self.make_bzrdir('.')
221
self.assertEqual(SmartServerResponse(('norepository', )),
224
def test_nonshared_repository(self):
    """A non-shared repository is only found at its own path."""
    # Non-shared repositories only allow 'find' to return a handle when
    # the path being searched is the path the repository itself is at.
    transport = self.get_transport()
    request = self._request_class(transport)
    expected = self._make_repository_and_result()
    self.assertEqual(expected, request.execute(''))
    self.make_bzrdir('subdir')
    not_found = SmartServerResponse(('norepository', ))
    self.assertEqual(not_found, request.execute('subdir'))
236
def _make_repository_and_result(self, shared=False, format=None):
237
"""Convenience function to setup a repository.
239
:result: The SmartServerResponse to expect when opening it.
241
repo = self.make_repository('.', shared=shared, format=format)
242
if repo.supports_rich_root():
246
if repo._format.supports_tree_reference:
250
if (smart.bzrdir.SmartServerRequestFindRepositoryV3 ==
251
self._request_class):
252
return SuccessfulSmartServerResponse(
253
('ok', '', rich_root, subtrees, 'no',
254
repo._format.network_name()))
255
elif (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
256
self._request_class):
257
# All tests so far are on formats, and for non-external
259
return SuccessfulSmartServerResponse(
260
('ok', '', rich_root, subtrees, 'no'))
262
return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
264
def test_shared_repository(self):
    """When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
    transport = self.get_transport()
    request = self._request_class(transport)
    at_root = self._make_repository_and_result(shared=True)
    self.assertEqual(at_root, request.execute(''))
    # Below the repository only the relative path (args[1]) differs from
    # the response obtained at the repository's own location.
    self.make_bzrdir('subdir')
    one_up = SmartServerResponse(
        at_root.args[0:1] + ('..', ) + at_root.args[2:])
    self.assertEqual(one_up, request.execute('subdir'))
    self.make_bzrdir('subdir/deeper')
    two_up = SmartServerResponse(
        at_root.args[0:1] + ('../..', ) + at_root.args[2:])
    self.assertEqual(two_up, request.execute('subdir/deeper'))
279
def test_rich_root_and_subtree_encoding(self):
    """Test for the format attributes for rich root and subtree support."""
    transport = self.get_transport()
    request = self._request_class(transport)
    expected = self._make_repository_and_result(
        format='dirstate-with-subtree')
    # Sanity-check the expectation first: args[2] and args[3] must both
    # be 'yes' for this format, otherwise the comparison proves nothing.
    for flag_index in (2, 3):
        self.assertEqual('yes', expected.args[flag_index])
    self.assertEqual(expected, request.execute(''))
289
def test_supports_external_lookups_no_v2(self):
    """Test for the supports_external_lookups attribute."""
    transport = self.get_transport()
    request = self._request_class(transport)
    expected = self._make_repository_and_result(
        format='dirstate-with-subtree')
    # Sanity-check the expectation first: args[4] must be 'no'.
    external_lookups_flag = expected.args[4]
    self.assertEqual('no', external_lookups_flag)
    self.assertEqual(expected, request.execute(''))
299
class TestSmartServerBzrDirRequestGetConfigFile(
        tests.TestCaseWithMemoryTransport):
    """Tests for BzrDir.get_config_file."""

    def test_present(self):
        """The response body is the content of the bzrdir's config file."""
        backing = self.get_transport()
        # Named bzr_dir (not dir) so the builtin dir() is not shadowed.
        bzr_dir = self.make_bzrdir('.')
        # Set an option, then read the config file back as the expected body.
        bzr_dir.get_config().set_default_stack_on("/")
        local_result = bzr_dir._get_config()._get_config_file().read()
        request_class = smart_dir.SmartServerBzrDirRequestConfigFile
        request = request_class(backing)
        expected = SuccessfulSmartServerResponse((), local_result)
        self.assertEqual(expected, request.execute(''))

    def test_missing(self):
        """With no configuration option set, the response body is empty."""
        backing = self.get_transport()
        # Only the bzrdir's existence matters here; the object is not needed.
        self.make_bzrdir('.')
        request_class = smart_dir.SmartServerBzrDirRequestConfigFile
        request = request_class(backing)
        expected = SuccessfulSmartServerResponse((), '')
        self.assertEqual(expected, request.execute(''))
322
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
324
def test_empty_dir(self):
325
"""Initializing an empty dir should succeed and do it."""
326
backing = self.get_transport()
327
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
328
self.assertEqual(SmartServerResponse(('ok', )),
330
made_dir = bzrdir.BzrDir.open_from_transport(backing)
331
# no branch, tree or repository is expected with the current
333
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
334
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
335
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
337
def test_missing_dir(self):
    """Initializing a missing directory should fail like the bzrdir api."""
    transport = self.get_transport()
    request = smart.bzrdir.SmartServerRequestInitializeBzrDir(transport)
    # 'subdir' was never created on the backing transport.
    self.assertRaises(errors.NoSuchFile, request.execute, 'subdir')
344
def test_initialized_dir(self):
    """Initializing an extant bzrdir should fail like the bzrdir api."""
    transport = self.get_transport()
    request = smart.bzrdir.SmartServerRequestInitializeBzrDir(transport)
    # Create the bzrdir first so the request finds it already present.
    self.make_bzrdir('subdir')
    self.assertRaises(errors.FileExists, request.execute, 'subdir')
353
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
355
def test_no_branch(self):
356
"""When there is no branch, ('nobranch', ) is returned."""
357
backing = self.get_transport()
358
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
359
self.make_bzrdir('.')
360
self.assertEqual(SmartServerResponse(('nobranch', )),
363
def test_branch(self):
364
"""When there is a branch, 'ok' is returned."""
365
backing = self.get_transport()
366
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
367
self.make_branch('.')
368
self.assertEqual(SmartServerResponse(('ok', '')),
371
def test_branch_reference(self):
    """When there is a branch reference, the reference URL is returned."""
    transport = self.get_transport()
    request = smart.bzrdir.SmartServerRequestOpenBranch(transport)
    source_branch = self.make_branch('branch')
    # A lightweight checkout at 'reference' provides the branch reference.
    checkout = source_branch.create_checkout('reference', lightweight=True)
    reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
    # The location file of the checkout must hold exactly that URL.
    self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
    expected = SmartServerResponse(('ok', reference_url))
    self.assertEqual(expected, request.execute('reference'))
383
class TestSmartServerRequestOpenBranchV2(TestCaseWithChrootedTransport):
385
def test_no_branch(self):
386
"""When there is no branch, ('nobranch', ) is returned."""
387
backing = self.get_transport()
388
self.make_bzrdir('.')
389
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
390
self.assertEqual(SmartServerResponse(('nobranch', )),
393
def test_branch(self):
394
"""When there is a branch, 'ok' is returned."""
395
backing = self.get_transport()
396
expected = self.make_branch('.')._format.network_name()
397
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
398
self.assertEqual(SuccessfulSmartServerResponse(('branch', expected)),
401
def test_branch_reference(self):
    """When there is a branch reference, the reference URL is returned."""
    transport = self.get_transport()
    request = smart.bzrdir.SmartServerRequestOpenBranchV2(transport)
    source_branch = self.make_branch('branch')
    # A lightweight checkout at 'reference' provides the branch reference.
    checkout = source_branch.create_checkout('reference', lightweight=True)
    reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
    # The location file of the checkout must hold exactly that URL.
    self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
    # The V2 verb tags the reference URL with 'ref'.
    expected = SuccessfulSmartServerResponse(('ref', reference_url))
    self.assertEqual(expected, request.execute('reference'))
412
def test_stacked_branch(self):
413
"""Opening a stacked branch does not open the stacked-on branch."""
414
trunk = self.make_branch('trunk')
415
feature = self.make_branch('feature', format='1.9')
416
feature.set_stacked_on_url(trunk.base)
418
Branch.hooks.install_named_hook('open', opened_branches.append, None)
419
backing = self.get_transport()
420
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
423
response = request.execute('feature')
425
request.teardown_jail()
426
expected_format = feature._format.network_name()
428
SuccessfulSmartServerResponse(('branch', expected_format)),
430
self.assertLength(1, opened_branches)
433
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
435
def test_empty(self):
436
"""For an empty branch, the body is empty."""
437
backing = self.get_transport()
438
request = smart.branch.SmartServerRequestRevisionHistory(backing)
439
self.make_branch('.')
440
self.assertEqual(SmartServerResponse(('ok', ), ''),
443
def test_not_empty(self):
444
"""For a non-empty branch, the body is the NUL-joined revision ids."""
445
backing = self.get_transport()
446
request = smart.branch.SmartServerRequestRevisionHistory(backing)
447
tree = self.make_branch_and_memory_tree('.')
450
r1 = tree.commit('1st commit')
451
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
454
SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
458
class TestSmartServerBranchRequest(tests.TestCaseWithMemoryTransport):
460
def test_no_branch(self):
461
"""When there is a bzrdir and no branch, NotBranchError is raised."""
462
backing = self.get_transport()
463
request = smart.branch.SmartServerBranchRequest(backing)
464
self.make_bzrdir('.')
465
self.assertRaises(errors.NotBranchError,
468
def test_branch_reference(self):
    """When there is a branch reference, NotBranchError is raised."""
    transport = self.get_transport()
    request = smart.branch.SmartServerBranchRequest(transport)
    source_branch = self.make_branch('branch')
    source_branch.create_checkout('reference', lightweight=True)
    # NOTE(review): the checkout is created at 'reference' but the request
    # is executed against 'checkout' -- confirm which path was intended.
    self.assertRaises(errors.NotBranchError, request.execute, 'checkout')
478
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithMemoryTransport):
480
def test_empty(self):
481
"""For an empty branch, the result is ('ok', '0', 'null:')."""
482
backing = self.get_transport()
483
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
484
self.make_branch('.')
485
self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
488
def test_not_empty(self):
489
"""For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
490
backing = self.get_transport()
491
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
492
tree = self.make_branch_and_memory_tree('.')
495
rev_id_utf8 = u'\xc8'.encode('utf-8')
496
r1 = tree.commit('1st commit')
497
r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
500
SmartServerResponse(('ok', '2', rev_id_utf8)),
504
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithMemoryTransport):
506
def test_default(self):
507
"""With no file, we get empty content."""
508
backing = self.get_transport()
509
request = smart.branch.SmartServerBranchGetConfigFile(backing)
510
branch = self.make_branch('.')
511
# there should be no file by default
513
self.assertEqual(SmartServerResponse(('ok', ), content),
516
def test_with_content(self):
517
# SmartServerBranchGetConfigFile should return the content from
518
# branch.control_files.get('branch.conf') for now - in the future it may
519
# perform more complex processing.
520
backing = self.get_transport()
521
request = smart.branch.SmartServerBranchGetConfigFile(backing)
522
branch = self.make_branch('.')
523
branch._transport.put_bytes('branch.conf', 'foo bar baz')
524
self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
528
class TestLockedBranch(tests.TestCaseWithMemoryTransport):
    """Base class offering a helper that obtains branch and repo tokens."""

    def get_lock_tokens(self, branch):
        """Write-lock branch and its repository and return both tokens.

        The repository lock taken here is released again before returning;
        the branch lock is not released by this helper.

        :return: A (branch_token, repo_token) tuple.
        """
        branch_token = branch.lock_write()
        repository = branch.repository
        repo_token = repository.lock_write()
        repository.unlock()
        return (branch_token, repo_token)
537
class TestSmartServerBranchRequestSetConfigOption(TestLockedBranch):
539
def test_value_name(self):
540
branch = self.make_branch('.')
541
request = smart.branch.SmartServerBranchRequestSetConfigOption(
542
branch.bzrdir.root_transport)
543
branch_token, repo_token = self.get_lock_tokens(branch)
544
config = branch._get_config()
545
result = request.execute('', branch_token, repo_token, 'bar', 'foo',
547
self.assertEqual(SuccessfulSmartServerResponse(()), result)
548
self.assertEqual('bar', config.get_option('foo'))
550
def test_value_name_section(self):
551
branch = self.make_branch('.')
552
request = smart.branch.SmartServerBranchRequestSetConfigOption(
553
branch.bzrdir.root_transport)
554
branch_token, repo_token = self.get_lock_tokens(branch)
555
config = branch._get_config()
556
result = request.execute('', branch_token, repo_token, 'bar', 'foo',
558
self.assertEqual(SuccessfulSmartServerResponse(()), result)
559
self.assertEqual('bar', config.get_option('foo', 'gam'))
562
class SetLastRevisionTestBase(TestLockedBranch):
563
"""Base test case for verbs that implement set_last_revision."""
566
tests.TestCaseWithMemoryTransport.setUp(self)
567
backing_transport = self.get_transport()
568
self.request = self.request_class(backing_transport)
569
self.tree = self.make_branch_and_memory_tree('.')
571
def lock_branch(self):
    """Return the result of get_lock_tokens() for the test tree's branch."""
    branch = self.tree.branch
    return self.get_lock_tokens(branch)
574
def unlock_branch(self):
    """Unlock the test tree's branch."""
    branch = self.tree.branch
    branch.unlock()
577
def set_last_revision(self, revision_id, revno):
578
branch_token, repo_token = self.lock_branch()
579
response = self._set_last_revision(
580
revision_id, revno, branch_token, repo_token)
584
def assertRequestSucceeds(self, revision_id, revno):
    """Assert that setting the last revision yields a bare ('ok',) success."""
    actual = self.set_last_revision(revision_id, revno)
    expected = SuccessfulSmartServerResponse(('ok',))
    self.assertEqual(expected, actual)
589
class TestSetLastRevisionVerbMixin(object):
590
"""Mixin test case for verbs that implement set_last_revision."""
592
def test_set_null_to_null(self):
    """An empty branch can have its last revision set to 'null:'."""
    null_revision_id, null_revno = 'null:', 0
    self.assertRequestSucceeds(null_revision_id, null_revno)
596
def test_NoSuchRevision(self):
597
"""If the revision_id is not present, the verb returns NoSuchRevision.
599
revision_id = 'non-existent revision'
601
FailedSmartServerResponse(('NoSuchRevision', revision_id)),
602
self.set_last_revision(revision_id, 1))
604
def make_tree_with_two_commits(self):
605
self.tree.lock_write()
607
rev_id_utf8 = u'\xc8'.encode('utf-8')
608
r1 = self.tree.commit('1st commit', rev_id=rev_id_utf8)
609
r2 = self.tree.commit('2nd commit', rev_id='rev-2')
612
def test_branch_last_revision_info_is_updated(self):
613
"""A branch's tip can be set to a revision that is present in its
616
# Make a branch with an empty revision history, but two revisions in
618
self.make_tree_with_two_commits()
619
rev_id_utf8 = u'\xc8'.encode('utf-8')
620
self.tree.branch.set_revision_history([])
622
(0, 'null:'), self.tree.branch.last_revision_info())
623
# We can update the branch to a revision that is present in the
625
self.assertRequestSucceeds(rev_id_utf8, 1)
627
(1, rev_id_utf8), self.tree.branch.last_revision_info())
629
def test_branch_last_revision_info_rewind(self):
630
"""A branch's tip can be set to a revision that is an ancestor of the
633
self.make_tree_with_two_commits()
634
rev_id_utf8 = u'\xc8'.encode('utf-8')
636
(2, 'rev-2'), self.tree.branch.last_revision_info())
637
self.assertRequestSucceeds(rev_id_utf8, 1)
639
(1, rev_id_utf8), self.tree.branch.last_revision_info())
641
def test_TipChangeRejected(self):
642
"""If a pre_change_branch_tip hook raises TipChangeRejected, the verb
643
returns TipChangeRejected.
645
rejection_message = u'rejection message\N{INTERROBANG}'
646
def hook_that_rejects(params):
647
raise errors.TipChangeRejected(rejection_message)
648
Branch.hooks.install_named_hook(
649
'pre_change_branch_tip', hook_that_rejects, None)
651
FailedSmartServerResponse(
652
('TipChangeRejected', rejection_message.encode('utf-8'))),
653
self.set_last_revision('null:', 0))
656
class TestSmartServerBranchRequestSetLastRevision(
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
    """Tests for Branch.set_last_revision verb."""

    request_class = smart.branch.SmartServerBranchRequestSetLastRevision

    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
        """Execute the verb; revno is accepted but not sent with it."""
        request_args = ('', branch_token, repo_token, revision_id)
        return self.request.execute(*request_args)
667
class TestSmartServerBranchRequestSetLastRevisionInfo(
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
    """Tests for Branch.set_last_revision_info verb."""

    request_class = smart.branch.SmartServerBranchRequestSetLastRevisionInfo

    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
        """Execute the verb, sending revno ahead of revision_id."""
        request_args = ('', branch_token, repo_token, revno, revision_id)
        return self.request.execute(*request_args)

    def test_NoSuchRevision(self):
        """Branch.set_last_revision_info does not have to return
        NoSuchRevision if the revision_id is absent.
        """
        raise tests.TestNotApplicable()
684
class TestSmartServerBranchRequestSetLastRevisionEx(
685
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
686
"""Tests for Branch.set_last_revision_ex verb."""
688
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionEx
690
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
    """Execute the verb with both trailing flag arguments set to 0.

    revno is accepted but not sent with this verb.
    """
    request_args = ('', branch_token, repo_token, revision_id, 0, 0)
    return self.request.execute(*request_args)
694
def assertRequestSucceeds(self, revision_id, revno):
695
response = self.set_last_revision(revision_id, revno)
697
SuccessfulSmartServerResponse(('ok', revno, revision_id)),
700
def test_branch_last_revision_info_rewind(self):
701
"""A branch's tip can be set to a revision that is an ancestor of the
702
current tip, but only if allow_overwrite_descendant is passed.
704
self.make_tree_with_two_commits()
705
rev_id_utf8 = u'\xc8'.encode('utf-8')
707
(2, 'rev-2'), self.tree.branch.last_revision_info())
708
# If allow_overwrite_descendant flag is 0, then trying to set the tip
709
# to an older revision ID has no effect.
710
branch_token, repo_token = self.lock_branch()
711
response = self.request.execute(
712
'', branch_token, repo_token, rev_id_utf8, 0, 0)
714
SuccessfulSmartServerResponse(('ok', 2, 'rev-2')),
717
(2, 'rev-2'), self.tree.branch.last_revision_info())
719
# If allow_overwrite_descendant flag is 1, then setting the tip to an
721
response = self.request.execute(
722
'', branch_token, repo_token, rev_id_utf8, 0, 1)
724
SuccessfulSmartServerResponse(('ok', 1, rev_id_utf8)),
728
(1, rev_id_utf8), self.tree.branch.last_revision_info())
730
def make_branch_with_divergent_history(self):
731
"""Make a branch with divergent history in its repo.
733
The branch's tip will be 'child-2', and the repo will also contain
734
'child-1', which diverges from a common base revision.
736
self.tree.lock_write()
738
r1 = self.tree.commit('1st commit')
739
revno_1, revid_1 = self.tree.branch.last_revision_info()
740
r2 = self.tree.commit('2nd commit', rev_id='child-1')
741
# Undo the second commit
742
self.tree.branch.set_last_revision_info(revno_1, revid_1)
743
self.tree.set_parent_ids([revid_1])
744
# Make a new second commit, child-2. child-2 has diverged from
746
new_r2 = self.tree.commit('2nd commit', rev_id='child-2')
749
def test_not_allow_diverged(self):
750
"""If allow_diverged is not passed, then setting a divergent history
751
returns a Diverged error.
753
self.make_branch_with_divergent_history()
755
FailedSmartServerResponse(('Diverged',)),
756
self.set_last_revision('child-1', 2))
757
# The branch tip was not changed.
758
self.assertEqual('child-2', self.tree.branch.last_revision())
760
def test_allow_diverged(self):
761
"""If allow_diverged is passed, then setting a divergent history
764
self.make_branch_with_divergent_history()
765
branch_token, repo_token = self.lock_branch()
766
response = self.request.execute(
767
'', branch_token, repo_token, 'child-1', 1, 0)
769
SuccessfulSmartServerResponse(('ok', 2, 'child-1')),
772
# The branch tip was changed.
773
self.assertEqual('child-1', self.tree.branch.last_revision())
776
class TestSmartServerBranchRequestGetParent(tests.TestCaseWithMemoryTransport):
778
def test_get_parent_none(self):
779
base_branch = self.make_branch('base')
780
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
781
response = request.execute('base')
783
SuccessfulSmartServerResponse(('',)), response)
785
def test_get_parent_something(self):
786
base_branch = self.make_branch('base')
787
base_branch.set_parent(self.get_url('foo'))
788
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
789
response = request.execute('base')
791
SuccessfulSmartServerResponse(("../foo",)),
795
class TestSmartServerBranchRequestSetParent(tests.TestCaseWithMemoryTransport):
797
def test_set_parent_none(self):
798
branch = self.make_branch('base', format="1.9")
800
branch._set_parent_location('foo')
802
request = smart.branch.SmartServerBranchRequestSetParentLocation(
803
self.get_transport())
804
branch_token = branch.lock_write()
805
repo_token = branch.repository.lock_write()
807
response = request.execute('base', branch_token, repo_token, '')
809
branch.repository.unlock()
811
self.assertEqual(SuccessfulSmartServerResponse(()), response)
812
self.assertEqual(None, branch.get_parent())
814
def test_set_parent_something(self):
815
branch = self.make_branch('base', format="1.9")
816
request = smart.branch.SmartServerBranchRequestSetParentLocation(
817
self.get_transport())
818
branch_token = branch.lock_write()
819
repo_token = branch.repository.lock_write()
821
response = request.execute('base', branch_token, repo_token,
824
branch.repository.unlock()
826
self.assertEqual(SuccessfulSmartServerResponse(()), response)
827
self.assertEqual('http://bar/', branch.get_parent())
830
class TestSmartServerBranchRequestGetTagsBytes(tests.TestCaseWithMemoryTransport):
831
# Only called when the branch format and tags match [yay factory
832
# methods] so only need to test straight forward cases.
834
def test_get_bytes(self):
835
base_branch = self.make_branch('base')
836
request = smart.branch.SmartServerBranchGetTagsBytes(
837
self.get_transport())
838
response = request.execute('base')
840
SuccessfulSmartServerResponse(('',)), response)
843
class TestSmartServerBranchRequestGetStackedOnURL(tests.TestCaseWithMemoryTransport):
845
def test_get_stacked_on_url(self):
846
base_branch = self.make_branch('base', format='1.6')
847
stacked_branch = self.make_branch('stacked', format='1.6')
848
# typically should be relative
849
stacked_branch.set_stacked_on_url('../base')
850
request = smart.branch.SmartServerBranchRequestGetStackedOnURL(
851
self.get_transport())
852
response = request.execute('stacked')
854
SmartServerResponse(('ok', '../base')),
858
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
861
tests.TestCaseWithMemoryTransport.setUp(self)
863
def test_lock_write_on_unlocked_branch(self):
864
backing = self.get_transport()
865
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
866
branch = self.make_branch('.', format='knit')
867
repository = branch.repository
868
response = request.execute('')
869
branch_nonce = branch.control_files._lock.peek().get('nonce')
870
repository_nonce = repository.control_files._lock.peek().get('nonce')
872
SmartServerResponse(('ok', branch_nonce, repository_nonce)),
874
# The branch (and associated repository) is now locked. Verify that
875
# with a new branch object.
876
new_branch = repository.bzrdir.open_branch()
877
self.assertRaises(errors.LockContention, new_branch.lock_write)
879
def test_lock_write_on_locked_branch(self):
880
backing = self.get_transport()
881
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
882
branch = self.make_branch('.')
884
branch.leave_lock_in_place()
886
response = request.execute('')
888
SmartServerResponse(('LockContention',)), response)
890
def test_lock_write_with_tokens_on_locked_branch(self):
891
backing = self.get_transport()
892
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
893
branch = self.make_branch('.', format='knit')
894
branch_token = branch.lock_write()
895
repo_token = branch.repository.lock_write()
896
branch.repository.unlock()
897
branch.leave_lock_in_place()
898
branch.repository.leave_lock_in_place()
900
response = request.execute('',
901
branch_token, repo_token)
903
SmartServerResponse(('ok', branch_token, repo_token)), response)
905
def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
906
backing = self.get_transport()
907
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
908
branch = self.make_branch('.', format='knit')
909
branch_token = branch.lock_write()
910
repo_token = branch.repository.lock_write()
911
branch.repository.unlock()
912
branch.leave_lock_in_place()
913
branch.repository.leave_lock_in_place()
915
response = request.execute('',
916
branch_token+'xxx', repo_token)
918
SmartServerResponse(('TokenMismatch',)), response)
920
def test_lock_write_on_locked_repo(self):
921
backing = self.get_transport()
922
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
923
branch = self.make_branch('.', format='knit')
924
branch.repository.lock_write()
925
branch.repository.leave_lock_in_place()
926
branch.repository.unlock()
927
response = request.execute('')
929
SmartServerResponse(('LockContention',)), response)
931
def test_lock_write_on_readonly_transport(self):
    """A branch lock_write request over a read-only transport fails."""
    readonly_backing = self.get_readonly_transport()
    lock_request = smart.branch.SmartServerBranchRequestLockWrite(
        readonly_backing)
    # Only the on-disk branch is needed; the returned object is not used.
    self.make_branch('.')
    # Address the branch by the test transport's location relative to the
    # root of that transport.
    root_base = self.get_transport().clone('/').base
    here_base = self.get_transport().base
    relpath = urlutils.relative_url(root_base, here_base)
    response = lock_request.execute(relpath)
    # Unpacking doubles as a check that exactly three args were returned.
    failure_name, _lock_repr, _reason = response.args
    self.assertFalse(response.is_successful())
    self.assertEqual('LockFailed', failure_name)
943
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
946
tests.TestCaseWithMemoryTransport.setUp(self)
948
def test_unlock_on_locked_branch_and_repo(self):
949
backing = self.get_transport()
950
request = smart.branch.SmartServerBranchRequestUnlock(backing)
951
branch = self.make_branch('.', format='knit')
953
branch_token = branch.lock_write()
954
repo_token = branch.repository.lock_write()
955
branch.repository.unlock()
956
# Unlock the branch (and repo) object, leaving the physical locks
958
branch.leave_lock_in_place()
959
branch.repository.leave_lock_in_place()
961
response = request.execute('',
962
branch_token, repo_token)
964
SmartServerResponse(('ok',)), response)
965
# The branch is now unlocked. Verify that with a new branch
967
new_branch = branch.bzrdir.open_branch()
968
new_branch.lock_write()
971
def test_unlock_on_unlocked_branch_unlocked_repo(self):
972
backing = self.get_transport()
973
request = smart.branch.SmartServerBranchRequestUnlock(backing)
974
branch = self.make_branch('.', format='knit')
975
response = request.execute(
976
'', 'branch token', 'repo token')
978
SmartServerResponse(('TokenMismatch',)), response)
980
def test_unlock_on_unlocked_branch_locked_repo(self):
981
backing = self.get_transport()
982
request = smart.branch.SmartServerBranchRequestUnlock(backing)
983
branch = self.make_branch('.', format='knit')
984
# Lock the repository.
985
repo_token = branch.repository.lock_write()
986
branch.repository.leave_lock_in_place()
987
branch.repository.unlock()
988
# Issue branch lock_write request on the unlocked branch (with locked
990
response = request.execute(
991
'', 'branch token', repo_token)
993
SmartServerResponse(('TokenMismatch',)), response)
996
class TestSmartServerRepositoryRequest(tests.TestCaseWithMemoryTransport):
998
def test_no_repository(self):
    """A bzrdir with no repository of its own raises NoRepositoryPresent."""
    # A shared repository is created above the requested path.  The
    # request has to examine exactly the path it was given rather than
    # search for a containing repository, so it must still fail.
    transport = self.get_transport()
    repo_request = smart.repository.SmartServerRepositoryRequest(transport)
    self.make_repository('.', shared=True)
    self.make_bzrdir('subdir')
    self.assertRaises(
        errors.NoRepositoryPresent, repo_request.execute, 'subdir')
1012
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithMemoryTransport):
1014
def test_trivial_bzipped(self):
1015
# This tests that the wire encoding is actually bzipped
1016
backing = self.get_transport()
1017
request = smart.repository.SmartServerRepositoryGetParentMap(backing)
1018
tree = self.make_branch_and_memory_tree('.')
1020
self.assertEqual(None,
1021
request.execute('', 'missing-id'))
1022
# Note that it returns a body that is bzipped.
1024
SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
1025
request.do_body('\n\n0\n'))
1027
def test_trivial_include_missing(self):
1028
backing = self.get_transport()
1029
request = smart.repository.SmartServerRepositoryGetParentMap(backing)
1030
tree = self.make_branch_and_memory_tree('.')
1032
self.assertEqual(None,
1033
request.execute('', 'missing-id', 'include-missing:'))
1035
SuccessfulSmartServerResponse(('ok', ),
1036
bz2.compress('missing:missing-id')),
1037
request.do_body('\n\n0\n'))
1040
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithMemoryTransport):
1042
def test_none_argument(self):
1043
backing = self.get_transport()
1044
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
1045
tree = self.make_branch_and_memory_tree('.')
1048
r1 = tree.commit('1st commit')
1049
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1052
# the lines of revision_id->revision_parent_list has no guaranteed
1053
# order coming out of a dict, so sort both our test and response
1054
lines = sorted([' '.join([r2, r1]), r1])
1055
response = request.execute('', '')
1056
response.body = '\n'.join(sorted(response.body.split('\n')))
1059
SmartServerResponse(('ok', ), '\n'.join(lines)), response)
1061
def test_specific_revision_argument(self):
1062
backing = self.get_transport()
1063
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
1064
tree = self.make_branch_and_memory_tree('.')
1067
rev_id_utf8 = u'\xc9'.encode('utf-8')
1068
r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
1069
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1072
self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
1073
request.execute('', rev_id_utf8))
1075
def test_no_such_revision(self):
1076
backing = self.get_transport()
1077
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
1078
tree = self.make_branch_and_memory_tree('.')
1081
r1 = tree.commit('1st commit')
1084
# Note that it still returns body (of zero bytes).
1086
SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
1087
request.execute('', 'missingrevision'))
1090
class TestSmartServerRepositoryGetStream(tests.TestCaseWithMemoryTransport):
1092
def make_two_commit_repo(self):
1093
tree = self.make_branch_and_memory_tree('.')
1096
r1 = tree.commit('1st commit')
1097
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1099
repo = tree.branch.repository
1102
def test_ancestry_of(self):
    """The search argument may be the 'ancestry-of' some heads."""
    transport = self.get_transport()
    stream_request = smart.repository.SmartServerRepositoryGetStream(
        transport)
    repo, _first_rev, second_rev = self.make_two_commit_repo()
    # The request body is the newline-joined fetch specification.
    body = '\n'.join(['ancestry-of', second_rev])
    stream_request.execute('', repo._format.network_name())
    response = stream_request.do_body(body)
    self.assertEqual(('ok',), response.args)
    streamed = ''.join(response.body_stream)
    self.assertStartsWith(streamed, 'Bazaar pack format 1')
1115
def test_search(self):
    """The search argument may be a 'search' over explicit keys."""
    transport = self.get_transport()
    stream_request = smart.repository.SmartServerRepositoryGetStream(
        transport)
    repo, first_rev, second_rev = self.make_two_commit_repo()
    # The request body is the newline-joined fetch specification.
    spec_lines = [
        'search',
        '%s %s' % (first_rev, second_rev),
        'null:',
        '2',
        ]
    body = '\n'.join(spec_lines)
    stream_request.execute('', repo._format.network_name())
    response = stream_request.do_body(body)
    self.assertEqual(('ok',), response.args)
    streamed = ''.join(response.body_stream)
    self.assertStartsWith(streamed, 'Bazaar pack format 1')
1129
class TestSmartServerRequestHasRevision(tests.TestCaseWithMemoryTransport):
1131
def test_missing_revision(self):
    """Asking about a revision that is absent answers ('no', )."""
    transport = self.get_transport()
    has_revision = smart.repository.SmartServerRequestHasRevision(transport)
    self.make_repository('.')
    expected = SmartServerResponse(('no', ))
    self.assertEqual(expected, has_revision.execute('', 'revid'))
1139
def test_present_revision(self):
1140
"""For a present revision, ('yes', ) is returned."""
1141
backing = self.get_transport()
1142
request = smart.repository.SmartServerRequestHasRevision(backing)
1143
tree = self.make_branch_and_memory_tree('.')
1146
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1147
r1 = tree.commit('a commit', rev_id=rev_id_utf8)
1149
self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
1150
self.assertEqual(SmartServerResponse(('yes', )),
1151
request.execute('', rev_id_utf8))
1154
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithMemoryTransport):
1156
def test_empty_revid(self):
    """With an empty revid, an empty repository reports only 'revisions'."""
    transport = self.get_transport()
    stats_request = smart.repository.SmartServerRepositoryGatherStats(
        transport)
    repo = self.make_repository('.')
    # The result is not consulted by the assertion below; the call is kept
    # so the test keeps exercising exactly what it always has.
    repo.gather_stats()
    expected = SmartServerResponse(('ok', ), 'revisions: 0\n')
    self.assertEqual(expected, stats_request.execute('', '', 'no'))
1166
def test_revid_with_committers(self):
1167
"""For a revid we get more infos."""
1168
backing = self.get_transport()
1169
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1170
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1171
tree = self.make_branch_and_memory_tree('.')
1174
# Let's build a predictable result
1175
tree.commit('a commit', timestamp=123456.2, timezone=3600)
1176
tree.commit('a commit', timestamp=654321.4, timezone=0,
1180
stats = tree.branch.repository.gather_stats()
1181
expected_body = ('firstrev: 123456.200 3600\n'
1182
'latestrev: 654321.400 0\n'
1184
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1188
def test_not_empty_repository_with_committers(self):
1189
"""For a revid and requesting committers we get the whole thing."""
1190
backing = self.get_transport()
1191
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1192
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1193
tree = self.make_branch_and_memory_tree('.')
1196
# Let's build a predictable result
1197
tree.commit('a commit', timestamp=123456.2, timezone=3600,
1199
tree.commit('a commit', timestamp=654321.4, timezone=0,
1200
committer='bar', rev_id=rev_id_utf8)
1202
stats = tree.branch.repository.gather_stats()
1204
expected_body = ('committers: 2\n'
1205
'firstrev: 123456.200 3600\n'
1206
'latestrev: 654321.400 0\n'
1208
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1210
rev_id_utf8, 'yes'))
1213
class TestSmartServerRepositoryIsShared(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryIsShared request."""

    def test_is_shared(self):
        """A shared repository answers ('yes', )."""
        transport = self.get_transport()
        is_shared = smart.repository.SmartServerRepositoryIsShared(transport)
        self.make_repository('.', shared=True)
        expected = SmartServerResponse(('yes', ))
        self.assertEqual(expected, is_shared.execute(''))

    def test_is_not_shared(self):
        """A repository that is not shared answers ('no', )."""
        transport = self.get_transport()
        is_shared = smart.repository.SmartServerRepositoryIsShared(transport)
        self.make_repository('.', shared=False)
        expected = SmartServerResponse(('no', ))
        self.assertEqual(expected, is_shared.execute(''))
1232
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
1234
def test_lock_write_on_unlocked_repo(self):
    """Locking an unlocked repository answers 'ok' with the lock nonce."""
    transport = self.get_transport()
    lock_request = smart.repository.SmartServerRepositoryLockWrite(transport)
    repo = self.make_repository('.', format='knit')
    response = lock_request.execute('')
    # Read the nonce straight from the physical lock the request took.
    held_lock = repo.control_files._lock.peek()
    expected = SmartServerResponse(('ok', held_lock.get('nonce')))
    self.assertEqual(expected, response)
    # The repository is now locked.  Verify that with a new repository
    # object.
    reopened = repo.bzrdir.open_repository()
    self.assertRaises(errors.LockContention, reopened.lock_write)
1246
def test_lock_write_on_locked_repo(self):
1247
backing = self.get_transport()
1248
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1249
repository = self.make_repository('.', format='knit')
1250
repository.lock_write()
1251
repository.leave_lock_in_place()
1253
response = request.execute('')
1255
SmartServerResponse(('LockContention',)), response)
1257
def test_lock_write_on_readonly_transport(self):
    """A repository lock_write over a read-only transport fails."""
    readonly_backing = self.get_readonly_transport()
    lock_request = smart.repository.SmartServerRepositoryLockWrite(
        readonly_backing)
    # Only the on-disk repository is needed; the object itself is unused.
    self.make_repository('.', format='knit')
    response = lock_request.execute('')
    self.assertFalse(response.is_successful())
    failure_name = response.args[0]
    self.assertEqual('LockFailed', failure_name)
1266
class TestInsertStreamBase(tests.TestCaseWithMemoryTransport):
    """Base class providing a helper for the insert_stream tests."""

    def make_empty_byte_stream(self, repo):
        """Return the bytes of an empty stream serialised for repo's format."""
        chunks = smart.repository._stream_to_byte_stream([], repo._format)
        return ''.join(chunks)
1273
class TestSmartServerRepositoryInsertStream(TestInsertStreamBase):
    """Tests for the SmartServerRepositoryInsertStream request."""

    def test_insert_stream_empty(self):
        """An empty stream gives None at each step and 'ok' from do_end."""
        transport = self.get_transport()
        insert = smart.repository.SmartServerRepositoryInsertStream(transport)
        repo = self.make_repository('.')
        # Neither the initial call nor a body chunk produces a response.
        self.assertEqual(None, insert.execute('', ''))
        empty_stream = self.make_empty_byte_stream(repo)
        self.assertEqual(None, insert.do_chunk(empty_stream))
        # Only the end of the body produces the final response.
        self.assertEqual(SmartServerResponse(('ok', )), insert.do_end())
1287
class TestSmartServerRepositoryInsertStreamLocked(TestInsertStreamBase):
1289
def test_insert_stream_empty(self):
1290
backing = self.get_transport()
1291
request = smart.repository.SmartServerRepositoryInsertStreamLocked(
1293
repository = self.make_repository('.', format='knit')
1294
lock_token = repository.lock_write()
1295
response = request.execute('', '', lock_token)
1296
self.assertEqual(None, response)
1297
response = request.do_chunk(self.make_empty_byte_stream(repository))
1298
self.assertEqual(None, response)
1299
response = request.do_end()
1300
self.assertEqual(SmartServerResponse(('ok', )), response)
1303
def test_insert_stream_with_wrong_lock_token(self):
1304
backing = self.get_transport()
1305
request = smart.repository.SmartServerRepositoryInsertStreamLocked(
1307
repository = self.make_repository('.', format='knit')
1308
lock_token = repository.lock_write()
1310
errors.TokenMismatch, request.execute, '', '', 'wrong-token')
1314
class TestSmartServerRepositoryUnlock(tests.TestCaseWithMemoryTransport):
1317
tests.TestCaseWithMemoryTransport.setUp(self)
1319
def test_unlock_on_locked_repo(self):
1320
backing = self.get_transport()
1321
request = smart.repository.SmartServerRepositoryUnlock(backing)
1322
repository = self.make_repository('.', format='knit')
1323
token = repository.lock_write()
1324
repository.leave_lock_in_place()
1326
response = request.execute('', token)
1328
SmartServerResponse(('ok',)), response)
1329
# The repository is now unlocked. Verify that with a new repository
1331
new_repo = repository.bzrdir.open_repository()
1332
new_repo.lock_write()
1335
def test_unlock_on_unlocked_repo(self):
1336
backing = self.get_transport()
1337
request = smart.repository.SmartServerRepositoryUnlock(backing)
1338
repository = self.make_repository('.', format='knit')
1339
response = request.execute('', 'some token')
1341
SmartServerResponse(('TokenMismatch',)), response)
1344
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
1346
def test_is_readonly_no(self):
1347
backing = self.get_transport()
1348
request = smart.request.SmartServerIsReadonly(backing)
1349
response = request.execute()
1351
SmartServerResponse(('no',)), response)
1353
def test_is_readonly_yes(self):
1354
backing = self.get_readonly_transport()
1355
request = smart.request.SmartServerIsReadonly(backing)
1356
response = request.execute()
1358
SmartServerResponse(('yes',)), response)
1361
class TestSmartServerRepositorySetMakeWorkingTrees(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositorySetMakeWorkingTrees request."""

    def _request_and_reopen(self, initial, wire_value):
        """Send wire_value to a shared repo preset to initial; reopen it.

        Asserts that the request answers ('ok',) and returns the freshly
        reopened repository's make_working_trees() value.
        """
        transport = self.get_transport()
        repo = self.make_repository('.', shared=True)
        repo.set_make_working_trees(initial)
        request_class = (
            smart.repository.SmartServerRepositorySetMakeWorkingTrees)
        request = request_class(transport)
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
            request.execute('', wire_value))
        # Reopen so the value is read through a new repository object.
        return repo.bzrdir.open_repository().make_working_trees()

    def test_set_false(self):
        """'False' turns working-tree creation off."""
        self.assertFalse(self._request_and_reopen(True, 'False'))

    def test_set_true(self):
        """'True' turns working-tree creation on."""
        self.assertTrue(self._request_and_reopen(False, 'True'))
1386
class TestSmartServerPackRepositoryAutopack(tests.TestCaseWithTransport):
1388
def make_repo_needing_autopacking(self, path='.'):
1389
# Make a repo in need of autopacking.
1390
tree = self.make_branch_and_tree('.', format='pack-0.92')
1391
repo = tree.branch.repository
1392
# monkey-patch the pack collection to disable autopacking
1393
repo._pack_collection._max_pack_count = lambda count: count
1395
tree.commit('commit %s' % x)
1396
self.assertEqual(10, len(repo._pack_collection.names()))
1397
del repo._pack_collection._max_pack_count
1400
def test_autopack_needed(self):
1401
repo = self.make_repo_needing_autopacking()
1403
self.addCleanup(repo.unlock)
1404
backing = self.get_transport()
1405
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1407
response = request.execute('')
1408
self.assertEqual(SmartServerResponse(('ok',)), response)
1409
repo._pack_collection.reload_pack_names()
1410
self.assertEqual(1, len(repo._pack_collection.names()))
1412
def test_autopack_not_needed(self):
1413
tree = self.make_branch_and_tree('.', format='pack-0.92')
1414
repo = tree.branch.repository
1416
self.addCleanup(repo.unlock)
1418
tree.commit('commit %s' % x)
1419
backing = self.get_transport()
1420
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1422
response = request.execute('')
1423
self.assertEqual(SmartServerResponse(('ok',)), response)
1424
repo._pack_collection.reload_pack_names()
1425
self.assertEqual(9, len(repo._pack_collection.names()))
1427
def test_autopack_on_nonpack_format(self):
1428
"""A request to autopack a non-pack repo is a no-op."""
1429
repo = self.make_repository('.', format='knit')
1430
backing = self.get_transport()
1431
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1433
response = request.execute('')
1434
self.assertEqual(SmartServerResponse(('ok',)), response)
1437
class TestHandlers(tests.TestCase):
1438
"""Tests for the request.request_handlers object."""
1440
def test_all_registrations_exist(self):
1441
"""All registered request_handlers can be found."""
1442
# If there's a typo in a register_lazy call, this loop will fail with
1443
# an AttributeError.
1444
for key, item in smart.request.request_handlers.iteritems():
1447
def assertHandlerEqual(self, verb, handler):
    """Assert that the handler registered for verb equals handler."""
    registered = smart.request.request_handlers.get(verb)
    self.assertEqual(registered, handler)
1450
def test_registered_methods(self):
    """Each known verb is registered to the expected request class."""
    check = self.assertHandlerEqual

    # Branch verbs.
    branch_mod = smart.branch
    check('Branch.get_config_file',
          branch_mod.SmartServerBranchGetConfigFile)
    check('Branch.get_parent', branch_mod.SmartServerBranchGetParent)
    check('Branch.get_tags_bytes', branch_mod.SmartServerBranchGetTagsBytes)
    check('Branch.lock_write', branch_mod.SmartServerBranchRequestLockWrite)
    check('Branch.last_revision_info',
          branch_mod.SmartServerBranchRequestLastRevisionInfo)
    check('Branch.revision_history',
          branch_mod.SmartServerRequestRevisionHistory)
    check('Branch.set_config_option',
          branch_mod.SmartServerBranchRequestSetConfigOption)
    check('Branch.set_last_revision',
          branch_mod.SmartServerBranchRequestSetLastRevision)
    check('Branch.set_last_revision_info',
          branch_mod.SmartServerBranchRequestSetLastRevisionInfo)
    check('Branch.set_last_revision_ex',
          branch_mod.SmartServerBranchRequestSetLastRevisionEx)
    check('Branch.set_parent_location',
          branch_mod.SmartServerBranchRequestSetParentLocation)
    check('Branch.unlock', branch_mod.SmartServerBranchRequestUnlock)

    # BzrDir verbs.
    bzrdir_mod = smart.bzrdir
    check('BzrDir.find_repository',
          bzrdir_mod.SmartServerRequestFindRepositoryV1)
    check('BzrDir.find_repositoryV2',
          bzrdir_mod.SmartServerRequestFindRepositoryV2)
    check('BzrDirFormat.initialize',
          bzrdir_mod.SmartServerRequestInitializeBzrDir)
    check('BzrDir.cloning_metadir',
          bzrdir_mod.SmartServerBzrDirRequestCloningMetaDir)
    check('BzrDir.get_config_file',
          bzrdir_mod.SmartServerBzrDirRequestConfigFile)
    check('BzrDir.open_branch', bzrdir_mod.SmartServerRequestOpenBranch)
    check('BzrDir.open_branchV2', bzrdir_mod.SmartServerRequestOpenBranchV2)

    # Pack repository verbs.
    pack_mod = smart.packrepository
    check('PackRepository.autopack',
          pack_mod.SmartServerPackRepositoryAutopack)

    # Repository verbs.
    repo_mod = smart.repository
    check('Repository.gather_stats',
          repo_mod.SmartServerRepositoryGatherStats)
    check('Repository.get_parent_map',
          repo_mod.SmartServerRepositoryGetParentMap)
    check('Repository.get_revision_graph',
          repo_mod.SmartServerRepositoryGetRevisionGraph)
    check('Repository.get_stream', repo_mod.SmartServerRepositoryGetStream)
    check('Repository.has_revision', repo_mod.SmartServerRequestHasRevision)
    check('Repository.insert_stream',
          repo_mod.SmartServerRepositoryInsertStream)
    check('Repository.insert_stream_locked',
          repo_mod.SmartServerRepositoryInsertStreamLocked)
    check('Repository.is_shared', repo_mod.SmartServerRepositoryIsShared)
    check('Repository.lock_write', repo_mod.SmartServerRepositoryLockWrite)
    check('Repository.tarball', repo_mod.SmartServerRepositoryTarball)
    check('Repository.unlock', repo_mod.SmartServerRepositoryUnlock)

    # Transport verbs.
    request_mod = smart.request
    check('Transport.is_readonly', request_mod.SmartServerIsReadonly)