1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the smart wire/domain protocol.
19
This module contains tests for the domain-level smart requests and responses,
20
such as the 'Branch.lock_write' request. Many of these use specific disk
21
formats to exercise calls that only make sense for formats with specific
24
Tests for low-level protocol encoding are found in test_smart_transport.
28
from cStringIO import StringIO
39
from bzrlib.branch import Branch, BranchReferenceFormat
40
import bzrlib.smart.branch
41
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
42
import bzrlib.smart.packrepository
43
import bzrlib.smart.repository
44
from bzrlib.smart.request import (
45
FailedSmartServerResponse,
48
SuccessfulSmartServerResponse,
50
from bzrlib.tests import (
53
from bzrlib.transport import chroot, get_transport
54
from bzrlib.util import bencode
57
def load_tests(standard_tests, module, loader):
58
"""Multiply tests version and protocol consistency."""
59
# FindRepository tests.
60
bzrdir_mod = bzrlib.smart.bzrdir
63
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
64
("find_repositoryV2", {
65
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
66
("find_repositoryV3", {
67
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV3}),
69
to_adapt, result = split_suite_by_re(standard_tests,
70
"TestSmartServerRequestFindRepository")
71
v2_only, v1_and_2 = split_suite_by_re(to_adapt,
73
tests.multiply_tests(v1_and_2, scenarios, result)
74
# The first scenario is only applicable to v1 protocols, it is deleted
76
tests.multiply_tests(v2_only, scenarios[1:], result)
80
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
83
tests.TestCaseWithTransport.setUp(self)
84
self._chroot_server = None
86
def get_transport(self, relpath=None):
87
if self._chroot_server is None:
88
backing_transport = tests.TestCaseWithTransport.get_transport(self)
89
self._chroot_server = chroot.ChrootServer(backing_transport)
90
self._chroot_server.setUp()
91
self.addCleanup(self._chroot_server.tearDown)
92
t = get_transport(self._chroot_server.get_url())
93
if relpath is not None:
98
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
101
super(TestCaseWithSmartMedium, self).setUp()
102
# We're allowed to set the transport class here, so that we don't use
103
# the default or a parameterized class, but rather use the
104
# TestCaseWithTransport infrastructure to set up a smart server and
106
self.transport_server = self.make_transport_server
108
def make_transport_server(self):
    """Create the smart TCP test server for this test.

    The server is constructed with '-' followed by this test's id.
    """
    server_label = '-' + self.id()
    return smart.server.SmartTCPServer_for_testing(server_label)
111
def get_smart_medium(self):
    """Return the smart medium offered by this test's transport."""
    transport = self.get_transport()
    return transport.get_smart_medium()
116
class TestSmartServerResponse(tests.TestCase):
118
def test__eq__(self):
    """Responses are equal only when both their args and body match."""
    equal_pairs = [
        (SmartServerResponse(('ok', )),
         SmartServerResponse(('ok', ))),
        (SmartServerResponse(('ok', ), 'body'),
         SmartServerResponse(('ok', ), 'body')),
        ]
    for left, right in equal_pairs:
        self.assertEqual(left, right)
    unequal_pairs = [
        # Different args.
        (SmartServerResponse(('ok', )),
         SmartServerResponse(('notok', ))),
        # Same args, but only one side has a body.
        (SmartServerResponse(('ok', ), 'body'),
         SmartServerResponse(('ok', ))),
        # A response never equals None.
        (None,
         SmartServerResponse(('ok', ))),
        ]
    for left, right in unequal_pairs:
        self.assertNotEqual(left, right)
130
def test__str__(self):
131
"""SmartServerResponses can be stringified."""
133
"<SuccessfulSmartServerResponse args=('args',) body='body'>",
134
str(SuccessfulSmartServerResponse(('args',), 'body')))
136
"<FailedSmartServerResponse args=('args',) body='body'>",
137
str(FailedSmartServerResponse(('args',), 'body')))
140
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
142
def test_translate_client_path(self):
143
transport = self.get_transport()
144
request = SmartServerRequest(transport, 'foo/')
145
self.assertEqual('./', request.translate_client_path('foo/'))
147
errors.InvalidURLJoin, request.translate_client_path, 'foo/..')
149
errors.PathNotChild, request.translate_client_path, '/')
151
errors.PathNotChild, request.translate_client_path, 'bar/')
152
self.assertEqual('./baz', request.translate_client_path('foo/baz'))
154
def test_transport_from_client_path(self):
155
transport = self.get_transport()
156
request = SmartServerRequest(transport, 'foo/')
159
request.transport_from_client_path('foo/').base)
162
class TestSmartServerBzrDirRequestCloningMetaDir(
        tests.TestCaseWithMemoryTransport):
    """Exercise the BzrDir.cloning_metadir smart request."""

    def test_cloning_metadir(self):
        """With a bzrdir present, the request succeeds."""
        backing = self.get_transport()
        bzr_dir = self.make_bzrdir('.')
        local_format = bzr_dir.cloning_metadir()
        request = smart_dir.SmartServerBzrDirRequestCloningMetaDir(backing)
        # The expected args are the network names taken from the locally
        # computed cloning metadir: its own name, its repository format's
        # name, and a ('branch', name) pair for its branch format.
        control_name = local_format.network_name()
        repository_name = local_format.repository_format.network_name()
        branch_name = local_format.get_branch_format().network_name()
        expected = SuccessfulSmartServerResponse(
            (control_name, repository_name, ('branch', branch_name)))
        self.assertEqual(expected, request.execute('', 'False'))

    def test_cloning_metadir_reference(self):
        """A bzrdir holding a branch reference makes the request fail."""
        backing = self.get_transport()
        target_branch = self.make_branch('referenced')
        bzr_dir = self.make_bzrdir('.')
        bzr_dir.cloning_metadir()
        BranchReferenceFormat().initialize(bzr_dir, target_branch)
        BranchReferenceFormat().get_reference(bzr_dir)
        # Move the referenced branch away: the server is not supposed to
        # follow the reference, so an unreachable target must not matter.
        backing.rename('referenced', 'moved')
        request = smart_dir.SmartServerBzrDirRequestCloningMetaDir(backing)
        self.assertEqual(
            FailedSmartServerResponse(('BranchReference',)),
            request.execute('', 'False'))
196
class TestSmartServerRequestCreateRepository(tests.TestCaseWithMemoryTransport):
    """Exercise the BzrDir.create_repository smart request."""

    def test_makes_repository(self):
        """With a bzrdir present, creating a repository succeeds."""
        backing = self.get_transport()
        self.make_bzrdir('.')
        request = bzrlib.smart.bzrdir.SmartServerRequestCreateRepository(
            backing)
        # Ask for the repository format of the registry's 'default' bzrdir
        # format, identified by its network name.
        default_bzrdir_format = bzrdir.format_registry.get('default')()
        repository_format = default_bzrdir_format.repository_format
        network_name = repository_format.network_name()
        response = request.execute('', network_name, 'True')
        self.assertEqual(
            SuccessfulSmartServerResponse(
                ('ok', 'no', 'no', 'no', network_name)),
            response)
213
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
214
"""Tests for BzrDir.find_repository."""
216
def test_no_repository(self):
217
"""When there is no repository to be found, ('norepository', ) is returned."""
218
backing = self.get_transport()
219
request = self._request_class(backing)
220
self.make_bzrdir('.')
221
self.assertEqual(SmartServerResponse(('norepository', )),
224
def test_nonshared_repository(self):
    """A non-shared repository is found only at its own path."""
    # Non-shared repositories only let 'find' return a handle when the
    # path being searched is the path the repository itself lives at.
    backing = self.get_transport()
    request = self._request_class(backing)
    at_root = self._make_repository_and_result()
    self.assertEqual(at_root, request.execute(''))
    # A bzrdir below the repository must not find it.
    self.make_bzrdir('subdir')
    not_found = SmartServerResponse(('norepository', ))
    self.assertEqual(not_found, request.execute('subdir'))
236
def _make_repository_and_result(self, shared=False, format=None):
237
"""Convenience function to setup a repository.
239
:result: The SmartServerResponse to expect when opening it.
241
repo = self.make_repository('.', shared=shared, format=format)
242
if repo.supports_rich_root():
246
if repo._format.supports_tree_reference:
250
if (smart.bzrdir.SmartServerRequestFindRepositoryV3 ==
251
self._request_class):
252
return SuccessfulSmartServerResponse(
253
('ok', '', rich_root, subtrees, 'no',
254
repo._format.network_name()))
255
elif (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
256
self._request_class):
257
# All tests so far are on formats, and for non-external
259
return SuccessfulSmartServerResponse(
260
('ok', '', rich_root, subtrees, 'no'))
262
return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
264
def test_shared_repository(self):
    """A shared repository is found from nested bzrdirs by relative path."""
    backing = self.get_transport()
    request = self._request_class(backing)
    at_root = self._make_repository_and_result(shared=True)
    self.assertEqual(at_root, request.execute(''))
    # From each nested bzrdir the expected response differs from the one
    # at the root only in its second argument, the relative path.
    for location, relpath in (('subdir', '..'), ('subdir/deeper', '../..')):
        self.make_bzrdir(location)
        expected = SmartServerResponse(
            at_root.args[0:1] + (relpath, ) + at_root.args[2:])
        self.assertEqual(expected, request.execute(location))
279
def test_rich_root_and_subtree_encoding(self):
    """Rich-root and subtree support are both reported as 'yes'."""
    backing = self.get_transport()
    request = self._request_class(backing)
    expected = self._make_repository_and_result(
        format='dirstate-with-subtree')
    # Guard: make sure the expected response really carries 'yes' in the
    # two argument positions this test is about.
    for position in (2, 3):
        self.assertEqual('yes', expected.args[position])
    self.assertEqual(expected, request.execute(''))
289
def test_supports_external_lookups_no_v2(self):
    """The fifth response argument is 'no' for 'dirstate-with-subtree'."""
    backing = self.get_transport()
    request = self._request_class(backing)
    expected = self._make_repository_and_result(
        format='dirstate-with-subtree')
    # Guard: the expected response must carry the value under test.
    self.assertEqual('no', expected.args[4])
    self.assertEqual(expected, request.execute(''))
299
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
301
def test_empty_dir(self):
302
"""Initializing an empty dir should succeed and do it."""
303
backing = self.get_transport()
304
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
305
self.assertEqual(SmartServerResponse(('ok', )),
307
made_dir = bzrdir.BzrDir.open_from_transport(backing)
308
# no branch, tree or repository is expected with the current
310
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
311
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
312
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
314
def test_missing_dir(self):
    """Initializing in a directory that does not exist raises NoSuchFile."""
    transport = self.get_transport()
    request = smart.bzrdir.SmartServerRequestInitializeBzrDir(transport)
    self.assertRaises(errors.NoSuchFile, request.execute, 'subdir')
321
def test_initialized_dir(self):
    """Initializing where a bzrdir already exists raises FileExists."""
    transport = self.get_transport()
    request = smart.bzrdir.SmartServerRequestInitializeBzrDir(transport)
    # Create the bzrdir first so the request collides with it.
    self.make_bzrdir('subdir')
    self.assertRaises(errors.FileExists, request.execute, 'subdir')
330
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
332
def test_no_branch(self):
333
"""When there is no branch, ('nobranch', ) is returned."""
334
backing = self.get_transport()
335
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
336
self.make_bzrdir('.')
337
self.assertEqual(SmartServerResponse(('nobranch', )),
340
def test_branch(self):
341
"""When there is a branch, 'ok' is returned."""
342
backing = self.get_transport()
343
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
344
self.make_branch('.')
345
self.assertEqual(SmartServerResponse(('ok', '')),
348
def test_branch_reference(self):
    """Opening a branch reference answers 'ok' plus the reference URL."""
    backing = self.get_transport()
    request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
    target = self.make_branch('branch')
    light_checkout = target.create_checkout('reference', lightweight=True)
    reference_url = BranchReferenceFormat().get_reference(
        light_checkout.bzrdir)
    # Sanity check: the URL matches the content of the location file.
    self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
    expected = SmartServerResponse(('ok', reference_url))
    self.assertEqual(expected, request.execute('reference'))
360
class TestSmartServerRequestOpenBranchV2(TestCaseWithChrootedTransport):
362
def test_no_branch(self):
363
"""When there is no branch, ('nobranch', ) is returned."""
364
backing = self.get_transport()
365
self.make_bzrdir('.')
366
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
367
self.assertEqual(SmartServerResponse(('nobranch', )),
370
def test_branch(self):
371
"""When there is a branch, 'ok' is returned."""
372
backing = self.get_transport()
373
expected = self.make_branch('.')._format.network_name()
374
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
375
self.assertEqual(SuccessfulSmartServerResponse(('branch', expected)),
378
def test_branch_reference(self):
    """Opening a branch reference answers 'ref' plus the reference URL."""
    backing = self.get_transport()
    request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
    target = self.make_branch('branch')
    light_checkout = target.create_checkout('reference', lightweight=True)
    reference_url = BranchReferenceFormat().get_reference(
        light_checkout.bzrdir)
    # Sanity check: the URL matches the content of the location file.
    self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
    expected = SuccessfulSmartServerResponse(('ref', reference_url))
    self.assertEqual(expected, request.execute('reference'))
389
def test_stacked_branch(self):
390
"""Opening a stacked branch does not open the stacked-on branch."""
391
trunk = self.make_branch('trunk')
392
feature = self.make_branch('feature', format='1.9')
393
feature.set_stacked_on_url(trunk.base)
395
Branch.hooks.install_named_hook('open', opened_branches.append, None)
396
backing = self.get_transport()
397
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
400
response = request.execute('feature')
402
request.teardown_jail()
403
expected_format = feature._format.network_name()
405
SuccessfulSmartServerResponse(('branch', expected_format)),
407
self.assertLength(1, opened_branches)
410
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
412
def test_empty(self):
413
"""For an empty branch, the body is empty."""
414
backing = self.get_transport()
415
request = smart.branch.SmartServerRequestRevisionHistory(backing)
416
self.make_branch('.')
417
self.assertEqual(SmartServerResponse(('ok', ), ''),
420
def test_not_empty(self):
421
"""For a non-empty branch, the body is the revision ids joined by NUL."""
422
backing = self.get_transport()
423
request = smart.branch.SmartServerRequestRevisionHistory(backing)
424
tree = self.make_branch_and_memory_tree('.')
427
r1 = tree.commit('1st commit')
428
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
431
SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
435
class TestSmartServerBranchRequest(tests.TestCaseWithMemoryTransport):
437
def test_no_branch(self):
438
"""When there is a bzrdir and no branch, NotBranchError is raised."""
439
backing = self.get_transport()
440
request = smart.branch.SmartServerBranchRequest(backing)
441
self.make_bzrdir('.')
442
self.assertRaises(errors.NotBranchError,
445
def test_branch_reference(self):
    """When there is a branch reference, NotBranchError is raised."""
    backing = self.get_transport()
    request = smart.branch.SmartServerBranchRequest(backing)
    referenced_branch = self.make_branch('branch')
    # Create the branch reference in a bzrdir made through the test's own
    # make_bzrdir, as the cloning_metadir reference test in this file does.
    # NOTE(review): assumes make_bzrdir('reference') lands on the backing
    # transport, as the sibling tests using make_bzrdir('subdir') rely on.
    reference_dir = self.make_bzrdir('reference')
    BranchReferenceFormat().initialize(reference_dir, referenced_branch)
    # Run the request against the path that holds the reference. A path
    # with no bzrdir at all would also raise NotBranchError, which would
    # let this test pass without ever meeting a branch reference.
    self.assertRaises(errors.NotBranchError,
        request.execute, 'reference')
455
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithMemoryTransport):
457
def test_empty(self):
458
"""For an empty branch, the result is ('ok', '0', 'null:')."""
459
backing = self.get_transport()
460
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
461
self.make_branch('.')
462
self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
465
def test_not_empty(self):
466
"""For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
467
backing = self.get_transport()
468
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
469
tree = self.make_branch_and_memory_tree('.')
472
rev_id_utf8 = u'\xc8'.encode('utf-8')
473
r1 = tree.commit('1st commit')
474
r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
477
SmartServerResponse(('ok', '2', rev_id_utf8)),
481
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithMemoryTransport):
483
def test_default(self):
484
"""With no file, we get empty content."""
485
backing = self.get_transport()
486
request = smart.branch.SmartServerBranchGetConfigFile(backing)
487
branch = self.make_branch('.')
488
# there should be no file by default
490
self.assertEqual(SmartServerResponse(('ok', ), content),
493
def test_with_content(self):
494
# SmartServerBranchGetConfigFile should return the content from
495
# branch.control_files.get('branch.conf') for now - in the future it may
496
# perform more complex processing.
497
backing = self.get_transport()
498
request = smart.branch.SmartServerBranchGetConfigFile(backing)
499
branch = self.make_branch('.')
500
branch._transport.put_bytes('branch.conf', 'foo bar baz')
501
self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
505
class SetLastRevisionTestBase(tests.TestCaseWithMemoryTransport):
506
"""Base test case for verbs that implement set_last_revision."""
509
tests.TestCaseWithMemoryTransport.setUp(self)
510
backing_transport = self.get_transport()
511
self.request = self.request_class(backing_transport)
512
self.tree = self.make_branch_and_memory_tree('.')
514
def lock_branch(self):
516
branch_token = b.lock_write()
517
repo_token = b.repository.lock_write()
518
b.repository.unlock()
519
return branch_token, repo_token
521
def unlock_branch(self):
    """Unlock the branch of the tree under test."""
    branch = self.tree.branch
    branch.unlock()
524
def set_last_revision(self, revision_id, revno):
525
branch_token, repo_token = self.lock_branch()
526
response = self._set_last_revision(
527
revision_id, revno, branch_token, repo_token)
531
def assertRequestSucceeds(self, revision_id, revno):
    """Assert that setting the last revision answers a plain ('ok',)."""
    expected = SuccessfulSmartServerResponse(('ok',))
    self.assertEqual(expected, self.set_last_revision(revision_id, revno))
536
class TestSetLastRevisionVerbMixin(object):
537
"""Mixin test case for verbs that implement set_last_revision."""
539
def test_set_null_to_null(self):
    """Setting the tip to 'null:' at revno 0 succeeds."""
    null_revision_id, null_revno = 'null:', 0
    self.assertRequestSucceeds(null_revision_id, null_revno)
543
def test_NoSuchRevision(self):
544
"""If the revision_id is not present, the verb returns NoSuchRevision.
546
revision_id = 'non-existent revision'
548
FailedSmartServerResponse(('NoSuchRevision', revision_id)),
549
self.set_last_revision(revision_id, 1))
551
def make_tree_with_two_commits(self):
552
self.tree.lock_write()
554
rev_id_utf8 = u'\xc8'.encode('utf-8')
555
r1 = self.tree.commit('1st commit', rev_id=rev_id_utf8)
556
r2 = self.tree.commit('2nd commit', rev_id='rev-2')
559
def test_branch_last_revision_info_is_updated(self):
560
"""A branch's tip can be set to a revision that is present in its
563
# Make a branch with an empty revision history, but two revisions in
565
self.make_tree_with_two_commits()
566
rev_id_utf8 = u'\xc8'.encode('utf-8')
567
self.tree.branch.set_revision_history([])
569
(0, 'null:'), self.tree.branch.last_revision_info())
570
# We can update the branch to a revision that is present in the
572
self.assertRequestSucceeds(rev_id_utf8, 1)
574
(1, rev_id_utf8), self.tree.branch.last_revision_info())
576
def test_branch_last_revision_info_rewind(self):
577
"""A branch's tip can be set to a revision that is an ancestor of the
580
self.make_tree_with_two_commits()
581
rev_id_utf8 = u'\xc8'.encode('utf-8')
583
(2, 'rev-2'), self.tree.branch.last_revision_info())
584
self.assertRequestSucceeds(rev_id_utf8, 1)
586
(1, rev_id_utf8), self.tree.branch.last_revision_info())
588
def test_TipChangeRejected(self):
589
"""If a pre_change_branch_tip hook raises TipChangeRejected, the verb
590
returns TipChangeRejected.
592
rejection_message = u'rejection message\N{INTERROBANG}'
593
def hook_that_rejects(params):
594
raise errors.TipChangeRejected(rejection_message)
595
Branch.hooks.install_named_hook(
596
'pre_change_branch_tip', hook_that_rejects, None)
598
FailedSmartServerResponse(
599
('TipChangeRejected', rejection_message.encode('utf-8'))),
600
self.set_last_revision('null:', 0))
603
class TestSmartServerBranchRequestSetLastRevision(
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
    """Run the shared set_last_revision tests on Branch.set_last_revision."""

    request_class = smart.branch.SmartServerBranchRequestSetLastRevision

    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
        """Execute the verb; revno is accepted but not passed to it."""
        verb_args = ('', branch_token, repo_token, revision_id)
        return self.request.execute(*verb_args)
614
class TestSmartServerBranchRequestSetLastRevisionInfo(
615
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
616
"""Tests for Branch.set_last_revision_info verb."""
618
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionInfo
620
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
    """Execute the verb, passing revno ahead of revision_id."""
    verb_args = ('', branch_token, repo_token, revno, revision_id)
    return self.request.execute(*verb_args)
624
def test_NoSuchRevision(self):
625
"""Branch.set_last_revision_info does not have to return
626
NoSuchRevision if the revision_id is absent.
628
raise tests.TestNotApplicable()
631
class TestSmartServerBranchRequestSetLastRevisionEx(
632
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
633
"""Tests for Branch.set_last_revision_ex verb."""
635
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionEx
637
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
    """Execute the verb with both trailing flag arguments set to 0.

    revno is accepted but not passed to the verb.
    """
    verb_args = ('', branch_token, repo_token, revision_id, 0, 0)
    return self.request.execute(*verb_args)
641
def assertRequestSucceeds(self, revision_id, revno):
642
response = self.set_last_revision(revision_id, revno)
644
SuccessfulSmartServerResponse(('ok', revno, revision_id)),
647
def test_branch_last_revision_info_rewind(self):
648
"""A branch's tip can be set to a revision that is an ancestor of the
649
current tip, but only if allow_overwrite_descendant is passed.
651
self.make_tree_with_two_commits()
652
rev_id_utf8 = u'\xc8'.encode('utf-8')
654
(2, 'rev-2'), self.tree.branch.last_revision_info())
655
# If allow_overwrite_descendant flag is 0, then trying to set the tip
656
# to an older revision ID has no effect.
657
branch_token, repo_token = self.lock_branch()
658
response = self.request.execute(
659
'', branch_token, repo_token, rev_id_utf8, 0, 0)
661
SuccessfulSmartServerResponse(('ok', 2, 'rev-2')),
664
(2, 'rev-2'), self.tree.branch.last_revision_info())
666
# If allow_overwrite_descendant flag is 1, then setting the tip to an
668
response = self.request.execute(
669
'', branch_token, repo_token, rev_id_utf8, 0, 1)
671
SuccessfulSmartServerResponse(('ok', 1, rev_id_utf8)),
675
(1, rev_id_utf8), self.tree.branch.last_revision_info())
677
def make_branch_with_divergent_history(self):
678
"""Make a branch with divergent history in its repo.
680
The branch's tip will be 'child-2', and the repo will also contain
681
'child-1', which diverges from a common base revision.
683
self.tree.lock_write()
685
r1 = self.tree.commit('1st commit')
686
revno_1, revid_1 = self.tree.branch.last_revision_info()
687
r2 = self.tree.commit('2nd commit', rev_id='child-1')
688
# Undo the second commit
689
self.tree.branch.set_last_revision_info(revno_1, revid_1)
690
self.tree.set_parent_ids([revid_1])
691
# Make a new second commit, child-2. child-2 has diverged from
693
new_r2 = self.tree.commit('2nd commit', rev_id='child-2')
696
def test_not_allow_diverged(self):
697
"""If allow_diverged is not passed, then setting a divergent history
698
returns a Diverged error.
700
self.make_branch_with_divergent_history()
702
FailedSmartServerResponse(('Diverged',)),
703
self.set_last_revision('child-1', 2))
704
# The branch tip was not changed.
705
self.assertEqual('child-2', self.tree.branch.last_revision())
707
def test_allow_diverged(self):
708
"""If allow_diverged is passed, then setting a divergent history
711
self.make_branch_with_divergent_history()
712
branch_token, repo_token = self.lock_branch()
713
response = self.request.execute(
714
'', branch_token, repo_token, 'child-1', 1, 0)
716
SuccessfulSmartServerResponse(('ok', 2, 'child-1')),
719
# The branch tip was changed.
720
self.assertEqual('child-1', self.tree.branch.last_revision())
723
class TestSmartServerBranchRequestGetParent(tests.TestCaseWithMemoryTransport):
725
def test_get_parent_none(self):
726
base_branch = self.make_branch('base')
727
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
728
response = request.execute('base')
730
SuccessfulSmartServerResponse(('',)), response)
732
def test_get_parent_something(self):
733
base_branch = self.make_branch('base')
734
base_branch.set_parent(self.get_url('foo'))
735
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
736
response = request.execute('base')
738
SuccessfulSmartServerResponse(("../foo",)),
742
class TestSmartServerBranchRequestGetTagsBytes(tests.TestCaseWithMemoryTransport):
743
# Only called when the branch format and tags match [yay factory
744
# methods] so only need to test straight forward cases.
746
def test_get_bytes(self):
747
base_branch = self.make_branch('base')
748
request = smart.branch.SmartServerBranchGetTagsBytes(
749
self.get_transport())
750
response = request.execute('base')
752
SuccessfulSmartServerResponse(('',)), response)
755
class TestSmartServerBranchRequestGetStackedOnURL(tests.TestCaseWithMemoryTransport):
757
def test_get_stacked_on_url(self):
758
base_branch = self.make_branch('base', format='1.6')
759
stacked_branch = self.make_branch('stacked', format='1.6')
760
# typically should be relative
761
stacked_branch.set_stacked_on_url('../base')
762
request = smart.branch.SmartServerBranchRequestGetStackedOnURL(
763
self.get_transport())
764
response = request.execute('stacked')
766
SmartServerResponse(('ok', '../base')),
770
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
773
tests.TestCaseWithMemoryTransport.setUp(self)
775
def test_lock_write_on_unlocked_branch(self):
776
backing = self.get_transport()
777
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
778
branch = self.make_branch('.', format='knit')
779
repository = branch.repository
780
response = request.execute('')
781
branch_nonce = branch.control_files._lock.peek().get('nonce')
782
repository_nonce = repository.control_files._lock.peek().get('nonce')
784
SmartServerResponse(('ok', branch_nonce, repository_nonce)),
786
# The branch (and associated repository) is now locked. Verify that
787
# with a new branch object.
788
new_branch = repository.bzrdir.open_branch()
789
self.assertRaises(errors.LockContention, new_branch.lock_write)
791
def test_lock_write_on_locked_branch(self):
792
backing = self.get_transport()
793
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
794
branch = self.make_branch('.')
796
branch.leave_lock_in_place()
798
response = request.execute('')
800
SmartServerResponse(('LockContention',)), response)
802
def test_lock_write_with_tokens_on_locked_branch(self):
803
backing = self.get_transport()
804
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
805
branch = self.make_branch('.', format='knit')
806
branch_token = branch.lock_write()
807
repo_token = branch.repository.lock_write()
808
branch.repository.unlock()
809
branch.leave_lock_in_place()
810
branch.repository.leave_lock_in_place()
812
response = request.execute('',
813
branch_token, repo_token)
815
SmartServerResponse(('ok', branch_token, repo_token)), response)
817
def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
818
backing = self.get_transport()
819
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
820
branch = self.make_branch('.', format='knit')
821
branch_token = branch.lock_write()
822
repo_token = branch.repository.lock_write()
823
branch.repository.unlock()
824
branch.leave_lock_in_place()
825
branch.repository.leave_lock_in_place()
827
response = request.execute('',
828
branch_token+'xxx', repo_token)
830
SmartServerResponse(('TokenMismatch',)), response)
832
def test_lock_write_on_locked_repo(self):
833
backing = self.get_transport()
834
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
835
branch = self.make_branch('.', format='knit')
836
branch.repository.lock_write()
837
branch.repository.leave_lock_in_place()
838
branch.repository.unlock()
839
response = request.execute('')
841
SmartServerResponse(('LockContention',)), response)
843
def test_lock_write_on_readonly_transport(self):
    """A lock_write request over a read-only transport gives 'LockFailed'."""
    backing = self.get_readonly_transport()
    request = smart.branch.SmartServerBranchRequestLockWrite(backing)
    self.make_branch('.')
    # Address the branch by its location relative to the transport root.
    root_base = self.get_transport().clone('/').base
    branch_base = self.get_transport().base
    response = request.execute(urlutils.relative_url(root_base, branch_base))
    # Unpacking also checks that exactly three args came back.
    error_name, _lock_str, _why_str = response.args
    self.assertFalse(response.is_successful())
    self.assertEqual('LockFailed', error_name)
855
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
858
tests.TestCaseWithMemoryTransport.setUp(self)
860
def test_unlock_on_locked_branch_and_repo(self):
861
backing = self.get_transport()
862
request = smart.branch.SmartServerBranchRequestUnlock(backing)
863
branch = self.make_branch('.', format='knit')
865
branch_token = branch.lock_write()
866
repo_token = branch.repository.lock_write()
867
branch.repository.unlock()
868
# Unlock the branch (and repo) object, leaving the physical locks
870
branch.leave_lock_in_place()
871
branch.repository.leave_lock_in_place()
873
response = request.execute('',
874
branch_token, repo_token)
876
SmartServerResponse(('ok',)), response)
877
# The branch is now unlocked. Verify that with a new branch
879
new_branch = branch.bzrdir.open_branch()
880
new_branch.lock_write()
883
def test_unlock_on_unlocked_branch_unlocked_repo(self):
884
backing = self.get_transport()
885
request = smart.branch.SmartServerBranchRequestUnlock(backing)
886
branch = self.make_branch('.', format='knit')
887
response = request.execute(
888
'', 'branch token', 'repo token')
890
SmartServerResponse(('TokenMismatch',)), response)
892
def test_unlock_on_unlocked_branch_locked_repo(self):
893
backing = self.get_transport()
894
request = smart.branch.SmartServerBranchRequestUnlock(backing)
895
branch = self.make_branch('.', format='knit')
896
# Lock the repository.
897
repo_token = branch.repository.lock_write()
898
branch.repository.leave_lock_in_place()
899
branch.repository.unlock()
900
# Issue branch lock_write request on the unlocked branch (with locked
902
response = request.execute(
903
'', 'branch token', repo_token)
905
SmartServerResponse(('TokenMismatch',)), response)
908
class TestSmartServerRepositoryRequest(tests.TestCaseWithMemoryTransport):
    """Exercise the base repository smart request."""

    def test_no_repository(self):
        """Raise NoRepositoryPresent when there is a bzrdir and no repo."""
        # A shared repository sits above the named path. The request must
        # still fail, which shows it looks only at the exact path given.
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryRequest(backing)
        self.make_repository('.', shared=True)
        self.make_bzrdir('subdir')
        self.assertRaises(
            errors.NoRepositoryPresent, request.execute, 'subdir')
924
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryGetParentMap request."""

    def test_trivial_bzipped(self):
        """The body of the response is bz2-compressed, even when empty."""
        # This tests that the wire encoding is actually bzipped
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetParentMap(backing)
        self.make_branch_and_memory_tree('.')

        # execute() answers None; the response comes from do_body().
        self.assertEqual(None, request.execute('', 'missing-id'))
        # Note that it returns a body (of '' bzipped).
        self.assertEqual(
            SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
            request.do_body('\n\n0\n'))
940
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryGetRevisionGraph request."""

    def test_none_argument(self):
        """An empty revision id argument returns the graph of both commits."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        # NOTE(review): the tree is write-locked and given a root before
        # committing, then unlocked - confirm against upstream.
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()

        # the lines of revision_id->revision_parent_list has no guaranteed
        # order coming out of a dict, so sort both our test and response
        lines = sorted([' '.join([r2, r1]), r1])
        response = request.execute('', '')
        response.body = '\n'.join(sorted(response.body.split('\n')))

        self.assertEqual(
            SmartServerResponse(('ok', ), '\n'.join(lines)), response)

    def test_specific_revision_argument(self):
        """Asking for the first revision returns just that revision id."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        rev_id_utf8 = u'\xc9'.encode('utf-8')
        tree.commit('1st commit', rev_id=rev_id_utf8)
        tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()

        self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
            request.execute('', rev_id_utf8))

    def test_no_such_revision(self):
        """An unknown revision id gives a 'nosuchrevision' response."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        tree.commit('1st commit')
        tree.unlock()

        # Note that it still returns body (of zero bytes).
        self.assertEqual(
            SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
            request.execute('', 'missingrevision'))
990
class TestSmartServerRepositoryGetStream(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryGetStream request."""

    def make_two_commit_repo(self):
        """Create a repository holding two commits.

        :return: A (repository, first_revision_id, second_revision_id) tuple.
        """
        tree = self.make_branch_and_memory_tree('.')
        # NOTE(review): the tree is write-locked and given a root before
        # committing, then unlocked - confirm against upstream.
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()
        repo = tree.branch.repository
        # Both tests unpack three values from this helper.
        return repo, r1, r2

    def test_ancestry_of(self):
        """The search argument may be a 'ancestry-of' some heads'."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetStream(backing)
        repo, r1, r2 = self.make_two_commit_repo()
        fetch_spec = ['ancestry-of', r2]
        lines = '\n'.join(fetch_spec)
        request.execute('', repo._format.network_name())
        response = request.do_body(lines)
        self.assertEqual(('ok',), response.args)
        stream_bytes = ''.join(response.body_stream)
        self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')

    def test_search(self):
        """The search argument may be a 'search' of some explicit keys."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetStream(backing)
        repo, r1, r2 = self.make_two_commit_repo()
        fetch_spec = ['search', '%s %s' % (r1, r2), 'null:', '2']
        lines = '\n'.join(fetch_spec)
        request.execute('', repo._format.network_name())
        response = request.do_body(lines)
        self.assertEqual(('ok',), response.args)
        stream_bytes = ''.join(response.body_stream)
        self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')
1029
class TestSmartServerRequestHasRevision(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRequestHasRevision request."""

    def test_missing_revision(self):
        """For a missing revision, ('no', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRequestHasRevision(backing)
        self.make_repository('.')
        self.assertEqual(SmartServerResponse(('no', )),
            request.execute('', 'revid'))

    def test_present_revision(self):
        """For a present revision, ('yes', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRequestHasRevision(backing)
        tree = self.make_branch_and_memory_tree('.')
        # NOTE(review): the tree is write-locked and given a root before
        # committing, then unlocked - confirm against upstream.
        tree.lock_write()
        tree.add('')
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        tree.commit('a commit', rev_id=rev_id_utf8)
        tree.unlock()
        # Check the fixture first so a failure below points at the request.
        self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
        self.assertEqual(SmartServerResponse(('yes', )),
            request.execute('', rev_id_utf8))
1054
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryGatherStats request."""

    def test_empty_revid(self):
        """With an empty revid, only the number of revisions is reported."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        repository = self.make_repository('.')
        repository.gather_stats()
        expected_body = 'revisions: 0\n'
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute('', '', 'no'))

    def test_revid_with_committers(self):
        """For a revid we get more infos."""
        backing = self.get_transport()
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        # Let's build a predictable result
        tree.commit('a commit', timestamp=123456.2, timezone=3600)
        tree.commit('a commit', timestamp=654321.4, timezone=0,
                    rev_id=rev_id_utf8)
        tree.unlock()

        tree.branch.repository.gather_stats()
        # NOTE(review): the 'revisions' line and the 'no' committers flag are
        # inferred from the two commits above and from the sibling test that
        # passes 'yes' - confirm against upstream.
        expected_body = ('firstrev: 123456.200 3600\n'
                         'latestrev: 654321.400 0\n'
                         'revisions: 2\n')
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute('',
                                         rev_id_utf8, 'no'))

    def test_not_empty_repository_with_committers(self):
        """For a revid and requesting committers we get the whole thing."""
        backing = self.get_transport()
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        # Let's build a predictable result
        # NOTE(review): the first committer name is inferred; it only has to
        # differ from 'bar' to give 'committers: 2' - confirm.
        tree.commit('a commit', timestamp=123456.2, timezone=3600,
                    committer='foo')
        tree.commit('a commit', timestamp=654321.4, timezone=0,
                    committer='bar', rev_id=rev_id_utf8)
        tree.unlock()
        tree.branch.repository.gather_stats()

        expected_body = ('committers: 2\n'
                         'firstrev: 123456.200 3600\n'
                         'latestrev: 654321.400 0\n'
                         'revisions: 2\n')
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute('',
                                         rev_id_utf8, 'yes'))
1113
class TestSmartServerRepositoryIsShared(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryIsShared request."""

    def test_is_shared(self):
        """A repository created with shared=True answers ('yes', )."""
        transport = self.get_transport()
        handler = smart.repository.SmartServerRepositoryIsShared(transport)
        self.make_repository('.', shared=True)
        expected = SmartServerResponse(('yes', ))
        self.assertEqual(expected, handler.execute('', ))

    def test_is_not_shared(self):
        """A repository created with shared=False answers ('no', )."""
        transport = self.get_transport()
        handler = smart.repository.SmartServerRepositoryIsShared(transport)
        self.make_repository('.', shared=False)
        expected = SmartServerResponse(('no', ))
        self.assertEqual(expected, handler.execute('', ))
1132
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryLockWrite request."""

    def test_lock_write_on_unlocked_repo(self):
        """Locking an unlocked repo answers ('ok', nonce) and holds the lock."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        repository = self.make_repository('.', format='knit')
        response = request.execute('')
        # The expected token is the nonce recorded in the physical lock.
        nonce = repository.control_files._lock.peek().get('nonce')
        self.assertEqual(SmartServerResponse(('ok', nonce)), response)
        # The repository is now locked.  Verify that with a new repository
        # object.
        new_repo = repository.bzrdir.open_repository()
        self.assertRaises(errors.LockContention, new_repo.lock_write)

    def test_lock_write_on_locked_repo(self):
        """Locking an already locked repo answers ('LockContention',)."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        repository = self.make_repository('.', format='knit')
        repository.lock_write()
        repository.leave_lock_in_place()
        # NOTE(review): drop the in-process lock object so that only the
        # physical lock left in place blocks the request - confirm.
        repository.unlock()
        response = request.execute('')
        self.assertEqual(
            SmartServerResponse(('LockContention',)), response)

    def test_lock_write_on_readonly_transport(self):
        """On a read-only transport the request fails with 'LockFailed'."""
        backing = self.get_readonly_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        self.make_repository('.', format='knit')
        response = request.execute('')
        self.assertFalse(response.is_successful())
        self.assertEqual('LockFailed', response.args[0])
1166
class TestInsertStreamBase(tests.TestCaseWithMemoryTransport):
    """Base class providing a helper for the insert_stream request tests."""

    def make_empty_byte_stream(self, repo):
        """Return, as one string, an empty stream encoded for repo's format."""
        chunks = smart.repository._stream_to_byte_stream([], repo._format)
        return ''.join(chunks)
1173
class TestSmartServerRepositoryInsertStream(TestInsertStreamBase):
    """Tests for the SmartServerRepositoryInsertStream request."""

    def test_insert_stream_empty(self):
        """An empty stream is accepted; only do_end() yields a response."""
        transport = self.get_transport()
        handler = smart.repository.SmartServerRepositoryInsertStream(transport)
        repo = self.make_repository('.')
        # execute() and do_chunk() both answer None.
        self.assertEqual(None, handler.execute('', ''))
        empty_stream = self.make_empty_byte_stream(repo)
        self.assertEqual(None, handler.do_chunk(empty_stream))
        self.assertEqual(SmartServerResponse(('ok', )), handler.do_end())
1187
class TestSmartServerRepositoryInsertStreamLocked(TestInsertStreamBase):
    """Tests for the SmartServerRepositoryInsertStreamLocked request."""

    def test_insert_stream_empty(self):
        """An empty stream sent with the real lock token is accepted."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryInsertStreamLocked(
            backing)
        repository = self.make_repository('.', format='knit')
        lock_token = repository.lock_write()
        # Release the lock even if one of the assertions below fails.
        self.addCleanup(repository.unlock)
        response = request.execute('', '', lock_token)
        self.assertEqual(None, response)
        response = request.do_chunk(self.make_empty_byte_stream(repository))
        self.assertEqual(None, response)
        response = request.do_end()
        self.assertEqual(SmartServerResponse(('ok', )), response)

    def test_insert_stream_with_wrong_lock_token(self):
        """A token that does not match the held lock raises TokenMismatch."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryInsertStreamLocked(
            backing)
        repository = self.make_repository('.', format='knit')
        repository.lock_write()
        # Release the lock even if the assertion below fails.
        self.addCleanup(repository.unlock)
        self.assertRaises(
            errors.TokenMismatch, request.execute, '', '', 'wrong-token')
1214
class TestSmartServerRepositoryUnlock(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryUnlock request."""

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)

    def test_unlock_on_locked_repo(self):
        """Unlocking with the real token answers ('ok',) and frees the repo."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryUnlock(backing)
        repository = self.make_repository('.', format='knit')
        token = repository.lock_write()
        repository.leave_lock_in_place()
        # NOTE(review): drop the in-process lock object so that only the
        # physical lock remains for the request to remove - confirm.
        repository.unlock()
        response = request.execute('', token)
        self.assertEqual(
            SmartServerResponse(('ok',)), response)
        # The repository is now unlocked.  Verify that with a new repository
        # object.
        new_repo = repository.bzrdir.open_repository()
        new_repo.lock_write()
        new_repo.unlock()

    def test_unlock_on_unlocked_repo(self):
        """A made-up token on an unlocked repo gives ('TokenMismatch',)."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryUnlock(backing)
        self.make_repository('.', format='knit')
        response = request.execute('', 'some token')
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)
1244
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerIsReadonly request."""

    def test_is_readonly_no(self):
        """The default test transport is reported as ('no',)."""
        backing = self.get_transport()
        request = smart.request.SmartServerIsReadonly(backing)
        response = request.execute()
        self.assertEqual(
            SmartServerResponse(('no',)), response)

    def test_is_readonly_yes(self):
        """A read-only transport is reported as ('yes',)."""
        backing = self.get_readonly_transport()
        request = smart.request.SmartServerIsReadonly(backing)
        response = request.execute()
        self.assertEqual(
            SmartServerResponse(('yes',)), response)
1261
class TestSmartServerRepositorySetMakeWorkingTrees(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositorySetMakeWorkingTrees request."""

    def test_set_false(self):
        """Sending 'False' turns make_working_trees off on disk."""
        transport = self.get_transport()
        shared_repo = self.make_repository('.', shared=True)
        # Start from the opposite value so the change is observable.
        shared_repo.set_make_working_trees(True)
        handler = smart.repository.SmartServerRepositorySetMakeWorkingTrees(
            transport)
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
            handler.execute('', 'False'))
        # Re-open the repository to read the stored setting.
        reopened = shared_repo.bzrdir.open_repository()
        self.assertFalse(reopened.make_working_trees())

    def test_set_true(self):
        """Sending 'True' turns make_working_trees on on disk."""
        transport = self.get_transport()
        shared_repo = self.make_repository('.', shared=True)
        # Start from the opposite value so the change is observable.
        shared_repo.set_make_working_trees(False)
        handler = smart.repository.SmartServerRepositorySetMakeWorkingTrees(
            transport)
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
            handler.execute('', 'True'))
        # Re-open the repository to read the stored setting.
        reopened = shared_repo.bzrdir.open_repository()
        self.assertTrue(reopened.make_working_trees())
1286
class TestSmartServerPackRepositoryAutopack(tests.TestCaseWithTransport):
    """Tests for the SmartServerPackRepositoryAutopack request."""

    def make_repo_needing_autopacking(self, path='.'):
        """Create a pack-0.92 repository holding ten separate packs.

        :param path: Where to create the branch and tree.
        :return: The repository, with autopacking re-enabled.
        """
        # Make a repo in need of autopacking.  Honour the caller's path
        # instead of always creating the tree at '.'.
        tree = self.make_branch_and_tree(path, format='pack-0.92')
        repo = tree.branch.repository
        # monkey-patch the pack collection to disable autopacking
        repo._pack_collection._max_pack_count = lambda count: count
        for x in range(10):
            tree.commit('commit %s' % x)
        self.assertEqual(10, len(repo._pack_collection.names()))
        # Remove the instance attribute so the class's own limit applies again.
        del repo._pack_collection._max_pack_count
        return repo

    def test_autopack_needed(self):
        """Ten packs are combined into one by the autopack request."""
        repo = self.make_repo_needing_autopacking()
        repo.lock_write()
        self.addCleanup(repo.unlock)
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
        repo._pack_collection.reload_pack_names()
        self.assertEqual(1, len(repo._pack_collection.names()))

    def test_autopack_not_needed(self):
        """Nine packs are left untouched by the autopack request."""
        tree = self.make_branch_and_tree('.', format='pack-0.92')
        repo = tree.branch.repository
        repo.lock_write()
        self.addCleanup(repo.unlock)
        for x in range(9):
            tree.commit('commit %s' % x)
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
        repo._pack_collection.reload_pack_names()
        self.assertEqual(9, len(repo._pack_collection.names()))

    def test_autopack_on_nonpack_format(self):
        """A request to autopack a non-pack repo is a no-op."""
        self.make_repository('.', format='knit')
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
1337
class TestHandlers(tests.TestCase):
    """Tests for the request.request_handlers object."""

    def test_all_registrations_exist(self):
        """All registered request_handlers can be found."""
        # If there's a typo in a register_lazy call, this loop will fail with
        # an AttributeError.
        for key, item in smart.request.request_handlers.iteritems():
            pass

    def test_registered_methods(self):
        """Test that known methods are registered to the correct object."""
        handlers = smart.request.request_handlers
        # (request verb, expected handler class) pairs, checked in order.
        expected_registrations = [
            ('Branch.get_config_file',
             smart.branch.SmartServerBranchGetConfigFile),
            ('Branch.get_parent',
             smart.branch.SmartServerBranchGetParent),
            ('Branch.get_tags_bytes',
             smart.branch.SmartServerBranchGetTagsBytes),
            ('Branch.lock_write',
             smart.branch.SmartServerBranchRequestLockWrite),
            ('Branch.last_revision_info',
             smart.branch.SmartServerBranchRequestLastRevisionInfo),
            ('Branch.revision_history',
             smart.branch.SmartServerRequestRevisionHistory),
            ('Branch.set_last_revision',
             smart.branch.SmartServerBranchRequestSetLastRevision),
            ('Branch.set_last_revision_info',
             smart.branch.SmartServerBranchRequestSetLastRevisionInfo),
            ('Branch.unlock',
             smart.branch.SmartServerBranchRequestUnlock),
            ('BzrDir.find_repository',
             smart.bzrdir.SmartServerRequestFindRepositoryV1),
            ('BzrDir.find_repositoryV2',
             smart.bzrdir.SmartServerRequestFindRepositoryV2),
            ('BzrDirFormat.initialize',
             smart.bzrdir.SmartServerRequestInitializeBzrDir),
            ('BzrDir.cloning_metadir',
             smart.bzrdir.SmartServerBzrDirRequestCloningMetaDir),
            ('BzrDir.open_branch',
             smart.bzrdir.SmartServerRequestOpenBranch),
            ('BzrDir.open_branchV2',
             smart.bzrdir.SmartServerRequestOpenBranchV2),
            ('PackRepository.autopack',
             smart.packrepository.SmartServerPackRepositoryAutopack),
            ('Repository.gather_stats',
             smart.repository.SmartServerRepositoryGatherStats),
            ('Repository.get_parent_map',
             smart.repository.SmartServerRepositoryGetParentMap),
            ('Repository.get_revision_graph',
             smart.repository.SmartServerRepositoryGetRevisionGraph),
            ('Repository.get_stream',
             smart.repository.SmartServerRepositoryGetStream),
            ('Repository.has_revision',
             smart.repository.SmartServerRequestHasRevision),
            ('Repository.insert_stream',
             smart.repository.SmartServerRepositoryInsertStream),
            ('Repository.insert_stream_locked',
             smart.repository.SmartServerRepositoryInsertStreamLocked),
            ('Repository.is_shared',
             smart.repository.SmartServerRepositoryIsShared),
            ('Repository.lock_write',
             smart.repository.SmartServerRepositoryLockWrite),
            ('Repository.tarball',
             smart.repository.SmartServerRepositoryTarball),
            ('Repository.unlock',
             smart.repository.SmartServerRepositoryUnlock),
            ('Transport.is_readonly',
             smart.request.SmartServerIsReadonly),
        ]
        for verb, handler_class in expected_registrations:
            self.assertEqual(handlers.get(verb), handler_class)