1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the smart wire/domain protocol.
19
This module contains tests for the domain-level smart requests and responses,
20
such as the 'Branch.lock_write' request. Many of these use specific disk
21
formats to exercise calls that only make sense for formats with specific
24
Tests for low-level protocol encoding are found in test_smart_transport.
28
from cStringIO import StringIO
39
from bzrlib.branch import Branch, BranchReferenceFormat
40
import bzrlib.smart.branch
41
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
42
import bzrlib.smart.packrepository
43
import bzrlib.smart.repository
44
from bzrlib.smart.request import (
45
FailedSmartServerResponse,
48
SuccessfulSmartServerResponse,
50
from bzrlib.tests import (
53
from bzrlib.transport import chroot, get_transport
54
from bzrlib.util import bencode
57
def load_tests(standard_tests, module, loader):
58
"""Multiply tests version and protocol consistency."""
59
# FindRepository tests.
60
bzrdir_mod = bzrlib.smart.bzrdir
63
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
64
("find_repositoryV2", {
65
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
66
("find_repositoryV3", {
67
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV3}),
69
to_adapt, result = split_suite_by_re(standard_tests,
70
"TestSmartServerRequestFindRepository")
71
v2_only, v1_and_2 = split_suite_by_re(to_adapt,
73
tests.multiply_tests(v1_and_2, scenarios, result)
74
# The first scenario is only applicable to v1 protocols, it is deleted
76
tests.multiply_tests(v2_only, scenarios[1:], result)
80
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
83
tests.TestCaseWithTransport.setUp(self)
84
self._chroot_server = None
86
def get_transport(self, relpath=None):
87
if self._chroot_server is None:
88
backing_transport = tests.TestCaseWithTransport.get_transport(self)
89
self._chroot_server = chroot.ChrootServer(backing_transport)
90
self._chroot_server.setUp()
91
self.addCleanup(self._chroot_server.tearDown)
92
t = get_transport(self._chroot_server.get_url())
93
if relpath is not None:
98
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
101
super(TestCaseWithSmartMedium, self).setUp()
102
# We're allowed to set the transport class here, so that we don't use
103
# the default or a parameterized class, but rather use the
104
# TestCaseWithTransport infrastructure to set up a smart server and
106
self.transport_server = self.make_transport_server
108
def make_transport_server(self):
109
return smart.server.SmartTCPServer_for_testing('-' + self.id())
111
def get_smart_medium(self):
112
"""Get a smart medium to use in tests."""
113
return self.get_transport().get_smart_medium()
116
class TestSmartServerResponse(tests.TestCase):
118
def test__eq__(self):
119
self.assertEqual(SmartServerResponse(('ok', )),
120
SmartServerResponse(('ok', )))
121
self.assertEqual(SmartServerResponse(('ok', ), 'body'),
122
SmartServerResponse(('ok', ), 'body'))
123
self.assertNotEqual(SmartServerResponse(('ok', )),
124
SmartServerResponse(('notok', )))
125
self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
126
SmartServerResponse(('ok', )))
127
self.assertNotEqual(None,
128
SmartServerResponse(('ok', )))
130
def test__str__(self):
131
"""SmartServerResponses can be stringified."""
133
"<SuccessfulSmartServerResponse args=('args',) body='body'>",
134
str(SuccessfulSmartServerResponse(('args',), 'body')))
136
"<FailedSmartServerResponse args=('args',) body='body'>",
137
str(FailedSmartServerResponse(('args',), 'body')))
140
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
142
def test_translate_client_path(self):
143
transport = self.get_transport()
144
request = SmartServerRequest(transport, 'foo/')
145
self.assertEqual('./', request.translate_client_path('foo/'))
147
errors.InvalidURLJoin, request.translate_client_path, 'foo/..')
149
errors.PathNotChild, request.translate_client_path, '/')
151
errors.PathNotChild, request.translate_client_path, 'bar/')
152
self.assertEqual('./baz', request.translate_client_path('foo/baz'))
154
def test_transport_from_client_path(self):
155
transport = self.get_transport()
156
request = SmartServerRequest(transport, 'foo/')
159
request.transport_from_client_path('foo/').base)
162
class TestSmartServerBzrDirRequestCloningMetaDir(
163
tests.TestCaseWithMemoryTransport):
164
"""Tests for BzrDir.cloning_metadir."""
166
def test_cloning_metadir(self):
167
"""When there is a bzrdir present, the call succeeds."""
168
backing = self.get_transport()
169
dir = self.make_bzrdir('.')
170
local_result = dir.cloning_metadir()
171
request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
172
request = request_class(backing)
173
expected = SuccessfulSmartServerResponse(
174
(local_result.network_name(),
175
local_result.repository_format.network_name(),
176
('branch', local_result.get_branch_format().network_name())))
177
self.assertEqual(expected, request.execute('', 'False'))
179
def test_cloning_metadir_reference(self):
180
"""The request fails when bzrdir contains a branch reference."""
181
backing = self.get_transport()
182
referenced_branch = self.make_branch('referenced')
183
dir = self.make_bzrdir('.')
184
local_result = dir.cloning_metadir()
185
reference = BranchReferenceFormat().initialize(dir, referenced_branch)
186
reference_url = BranchReferenceFormat().get_reference(dir)
187
# The server shouldn't try to follow the branch reference, so it's fine
188
# if the referenced branch isn't reachable.
189
backing.rename('referenced', 'moved')
190
request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
191
request = request_class(backing)
192
expected = FailedSmartServerResponse(('BranchReference',))
193
self.assertEqual(expected, request.execute('', 'False'))
196
class TestSmartServerRequestCreateRepository(tests.TestCaseWithMemoryTransport):
197
"""Tests for BzrDir.create_repository."""
199
def test_makes_repository(self):
200
"""When there is a bzrdir present, the call succeeds."""
201
backing = self.get_transport()
202
self.make_bzrdir('.')
203
request_class = bzrlib.smart.bzrdir.SmartServerRequestCreateRepository
204
request = request_class(backing)
205
reference_bzrdir_format = bzrdir.format_registry.get('default')()
206
reference_format = reference_bzrdir_format.repository_format
207
network_name = reference_format.network_name()
208
expected = SuccessfulSmartServerResponse(
209
('ok', 'no', 'no', 'no', network_name))
210
self.assertEqual(expected, request.execute('', network_name, 'True'))
213
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
214
"""Tests for BzrDir.find_repository."""
216
def test_no_repository(self):
217
"""When there is no repository to be found, ('norepository', ) is returned."""
218
backing = self.get_transport()
219
request = self._request_class(backing)
220
self.make_bzrdir('.')
221
self.assertEqual(SmartServerResponse(('norepository', )),
224
def test_nonshared_repository(self):
225
# nonshared repositorys only allow 'find' to return a handle when the
226
# path the repository is being searched on is the same as that that
227
# the repository is at.
228
backing = self.get_transport()
229
request = self._request_class(backing)
230
result = self._make_repository_and_result()
231
self.assertEqual(result, request.execute(''))
232
self.make_bzrdir('subdir')
233
self.assertEqual(SmartServerResponse(('norepository', )),
234
request.execute('subdir'))
236
def _make_repository_and_result(self, shared=False, format=None):
237
"""Convenience function to setup a repository.
239
:result: The SmartServerResponse to expect when opening it.
241
repo = self.make_repository('.', shared=shared, format=format)
242
if repo.supports_rich_root():
246
if repo._format.supports_tree_reference:
250
if (smart.bzrdir.SmartServerRequestFindRepositoryV3 ==
251
self._request_class):
252
return SuccessfulSmartServerResponse(
253
('ok', '', rich_root, subtrees, 'no',
254
repo._format.network_name()))
255
elif (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
256
self._request_class):
257
# All tests so far are on formats, and for non-external
259
return SuccessfulSmartServerResponse(
260
('ok', '', rich_root, subtrees, 'no'))
262
return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
264
def test_shared_repository(self):
265
"""When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
266
backing = self.get_transport()
267
request = self._request_class(backing)
268
result = self._make_repository_and_result(shared=True)
269
self.assertEqual(result, request.execute(''))
270
self.make_bzrdir('subdir')
271
result2 = SmartServerResponse(result.args[0:1] + ('..', ) + result.args[2:])
272
self.assertEqual(result2,
273
request.execute('subdir'))
274
self.make_bzrdir('subdir/deeper')
275
result3 = SmartServerResponse(result.args[0:1] + ('../..', ) + result.args[2:])
276
self.assertEqual(result3,
277
request.execute('subdir/deeper'))
279
def test_rich_root_and_subtree_encoding(self):
280
"""Test for the format attributes for rich root and subtree support."""
281
backing = self.get_transport()
282
request = self._request_class(backing)
283
result = self._make_repository_and_result(format='dirstate-with-subtree')
284
# check the test will be valid
285
self.assertEqual('yes', result.args[2])
286
self.assertEqual('yes', result.args[3])
287
self.assertEqual(result, request.execute(''))
289
def test_supports_external_lookups_no_v2(self):
290
"""Test for the supports_external_lookups attribute."""
291
backing = self.get_transport()
292
request = self._request_class(backing)
293
result = self._make_repository_and_result(format='dirstate-with-subtree')
294
# check the test will be valid
295
self.assertEqual('no', result.args[4])
296
self.assertEqual(result, request.execute(''))
299
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
301
def test_empty_dir(self):
302
"""Initializing an empty dir should succeed and do it."""
303
backing = self.get_transport()
304
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
305
self.assertEqual(SmartServerResponse(('ok', )),
307
made_dir = bzrdir.BzrDir.open_from_transport(backing)
308
# no branch, tree or repository is expected with the current
310
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
311
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
312
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
314
def test_missing_dir(self):
315
"""Initializing a missing directory should fail like the bzrdir api."""
316
backing = self.get_transport()
317
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
318
self.assertRaises(errors.NoSuchFile,
319
request.execute, 'subdir')
321
def test_initialized_dir(self):
322
"""Initializing an extant bzrdir should fail like the bzrdir api."""
323
backing = self.get_transport()
324
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
325
self.make_bzrdir('subdir')
326
self.assertRaises(errors.FileExists,
327
request.execute, 'subdir')
330
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
332
def test_no_branch(self):
333
"""When there is no branch, ('nobranch', ) is returned."""
334
backing = self.get_transport()
335
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
336
self.make_bzrdir('.')
337
self.assertEqual(SmartServerResponse(('nobranch', )),
340
def test_branch(self):
341
"""When there is a branch, 'ok' is returned."""
342
backing = self.get_transport()
343
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
344
self.make_branch('.')
345
self.assertEqual(SmartServerResponse(('ok', '')),
348
def test_branch_reference(self):
349
"""When there is a branch reference, the reference URL is returned."""
350
backing = self.get_transport()
351
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
352
branch = self.make_branch('branch')
353
checkout = branch.create_checkout('reference',lightweight=True)
354
reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
355
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
356
self.assertEqual(SmartServerResponse(('ok', reference_url)),
357
request.execute('reference'))
360
class TestSmartServerRequestOpenBranchV2(TestCaseWithChrootedTransport):
362
def test_no_branch(self):
363
"""When there is no branch, ('nobranch', ) is returned."""
364
backing = self.get_transport()
365
self.make_bzrdir('.')
366
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
367
self.assertEqual(SmartServerResponse(('nobranch', )),
370
def test_branch(self):
371
"""When there is a branch, 'ok' is returned."""
372
backing = self.get_transport()
373
expected = self.make_branch('.')._format.network_name()
374
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
375
self.assertEqual(SuccessfulSmartServerResponse(('branch', expected)),
378
def test_branch_reference(self):
379
"""When there is a branch reference, the reference URL is returned."""
380
backing = self.get_transport()
381
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
382
branch = self.make_branch('branch')
383
checkout = branch.create_checkout('reference',lightweight=True)
384
reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
385
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
386
self.assertEqual(SuccessfulSmartServerResponse(('ref', reference_url)),
387
request.execute('reference'))
389
def test_stacked_branch(self):
390
"""Opening a stacked branch does not open the stacked-on branch."""
391
trunk = self.make_branch('trunk')
392
feature = self.make_branch('feature', format='1.9')
393
feature.set_stacked_on_url(trunk.base)
395
Branch.hooks.install_named_hook('open', opened_branches.append, None)
396
backing = self.get_transport()
397
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
400
response = request.execute('feature')
402
request.teardown_jail()
403
expected_format = feature._format.network_name()
405
SuccessfulSmartServerResponse(('branch', expected_format)),
407
self.assertLength(1, opened_branches)
410
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
412
def test_empty(self):
413
"""For an empty branch, the body is empty."""
414
backing = self.get_transport()
415
request = smart.branch.SmartServerRequestRevisionHistory(backing)
416
self.make_branch('.')
417
self.assertEqual(SmartServerResponse(('ok', ), ''),
420
def test_not_empty(self):
421
"""For a non-empty branch, the body is empty."""
422
backing = self.get_transport()
423
request = smart.branch.SmartServerRequestRevisionHistory(backing)
424
tree = self.make_branch_and_memory_tree('.')
427
r1 = tree.commit('1st commit')
428
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
431
SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
435
class TestSmartServerBranchRequest(tests.TestCaseWithMemoryTransport):
437
def test_no_branch(self):
438
"""When there is a bzrdir and no branch, NotBranchError is raised."""
439
backing = self.get_transport()
440
request = smart.branch.SmartServerBranchRequest(backing)
441
self.make_bzrdir('.')
442
self.assertRaises(errors.NotBranchError,
445
def test_branch_reference(self):
446
"""When there is a branch reference, NotBranchError is raised."""
447
backing = self.get_transport()
448
request = smart.branch.SmartServerBranchRequest(backing)
449
branch = self.make_branch('branch')
450
checkout = branch.create_checkout('reference',lightweight=True)
451
self.assertRaises(errors.NotBranchError,
452
request.execute, 'checkout')
455
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithMemoryTransport):
457
def test_empty(self):
458
"""For an empty branch, the result is ('ok', '0', 'null:')."""
459
backing = self.get_transport()
460
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
461
self.make_branch('.')
462
self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
465
def test_not_empty(self):
466
"""For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
467
backing = self.get_transport()
468
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
469
tree = self.make_branch_and_memory_tree('.')
472
rev_id_utf8 = u'\xc8'.encode('utf-8')
473
r1 = tree.commit('1st commit')
474
r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
477
SmartServerResponse(('ok', '2', rev_id_utf8)),
481
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithMemoryTransport):
483
def test_default(self):
484
"""With no file, we get empty content."""
485
backing = self.get_transport()
486
request = smart.branch.SmartServerBranchGetConfigFile(backing)
487
branch = self.make_branch('.')
488
# there should be no file by default
490
self.assertEqual(SmartServerResponse(('ok', ), content),
493
def test_with_content(self):
494
# SmartServerBranchGetConfigFile should return the content from
495
# branch.control_files.get('branch.conf') for now - in the future it may
496
# perform more complex processing.
497
backing = self.get_transport()
498
request = smart.branch.SmartServerBranchGetConfigFile(backing)
499
branch = self.make_branch('.')
500
branch._transport.put_bytes('branch.conf', 'foo bar baz')
501
self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
505
class SetLastRevisionTestBase(tests.TestCaseWithMemoryTransport):
506
"""Base test case for verbs that implement set_last_revision."""
509
tests.TestCaseWithMemoryTransport.setUp(self)
510
backing_transport = self.get_transport()
511
self.request = self.request_class(backing_transport)
512
self.tree = self.make_branch_and_memory_tree('.')
514
def lock_branch(self):
516
branch_token = b.lock_write()
517
repo_token = b.repository.lock_write()
518
b.repository.unlock()
519
return branch_token, repo_token
521
def unlock_branch(self):
522
self.tree.branch.unlock()
524
def set_last_revision(self, revision_id, revno):
525
branch_token, repo_token = self.lock_branch()
526
response = self._set_last_revision(
527
revision_id, revno, branch_token, repo_token)
531
def assertRequestSucceeds(self, revision_id, revno):
532
response = self.set_last_revision(revision_id, revno)
533
self.assertEqual(SuccessfulSmartServerResponse(('ok',)), response)
536
class TestSetLastRevisionVerbMixin(object):
537
"""Mixin test case for verbs that implement set_last_revision."""
539
def test_set_null_to_null(self):
540
"""An empty branch can have its last revision set to 'null:'."""
541
self.assertRequestSucceeds('null:', 0)
543
def test_NoSuchRevision(self):
544
"""If the revision_id is not present, the verb returns NoSuchRevision.
546
revision_id = 'non-existent revision'
548
FailedSmartServerResponse(('NoSuchRevision', revision_id)),
549
self.set_last_revision(revision_id, 1))
551
def make_tree_with_two_commits(self):
552
self.tree.lock_write()
554
rev_id_utf8 = u'\xc8'.encode('utf-8')
555
r1 = self.tree.commit('1st commit', rev_id=rev_id_utf8)
556
r2 = self.tree.commit('2nd commit', rev_id='rev-2')
559
def test_branch_last_revision_info_is_updated(self):
560
"""A branch's tip can be set to a revision that is present in its
563
# Make a branch with an empty revision history, but two revisions in
565
self.make_tree_with_two_commits()
566
rev_id_utf8 = u'\xc8'.encode('utf-8')
567
self.tree.branch.set_revision_history([])
569
(0, 'null:'), self.tree.branch.last_revision_info())
570
# We can update the branch to a revision that is present in the
572
self.assertRequestSucceeds(rev_id_utf8, 1)
574
(1, rev_id_utf8), self.tree.branch.last_revision_info())
576
def test_branch_last_revision_info_rewind(self):
577
"""A branch's tip can be set to a revision that is an ancestor of the
580
self.make_tree_with_two_commits()
581
rev_id_utf8 = u'\xc8'.encode('utf-8')
583
(2, 'rev-2'), self.tree.branch.last_revision_info())
584
self.assertRequestSucceeds(rev_id_utf8, 1)
586
(1, rev_id_utf8), self.tree.branch.last_revision_info())
588
def test_TipChangeRejected(self):
589
"""If a pre_change_branch_tip hook raises TipChangeRejected, the verb
590
returns TipChangeRejected.
592
rejection_message = u'rejection message\N{INTERROBANG}'
593
def hook_that_rejects(params):
594
raise errors.TipChangeRejected(rejection_message)
595
Branch.hooks.install_named_hook(
596
'pre_change_branch_tip', hook_that_rejects, None)
598
FailedSmartServerResponse(
599
('TipChangeRejected', rejection_message.encode('utf-8'))),
600
self.set_last_revision('null:', 0))
603
class TestSmartServerBranchRequestSetLastRevision(
604
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
605
"""Tests for Branch.set_last_revision verb."""
607
request_class = smart.branch.SmartServerBranchRequestSetLastRevision
609
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
610
return self.request.execute(
611
'', branch_token, repo_token, revision_id)
614
class TestSmartServerBranchRequestSetLastRevisionInfo(
615
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
616
"""Tests for Branch.set_last_revision_info verb."""
618
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionInfo
620
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
621
return self.request.execute(
622
'', branch_token, repo_token, revno, revision_id)
624
def test_NoSuchRevision(self):
625
"""Branch.set_last_revision_info does not have to return
626
NoSuchRevision if the revision_id is absent.
628
raise tests.TestNotApplicable()
631
class TestSmartServerBranchRequestSetLastRevisionEx(
632
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
633
"""Tests for Branch.set_last_revision_ex verb."""
635
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionEx
637
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
638
return self.request.execute(
639
'', branch_token, repo_token, revision_id, 0, 0)
641
def assertRequestSucceeds(self, revision_id, revno):
642
response = self.set_last_revision(revision_id, revno)
644
SuccessfulSmartServerResponse(('ok', revno, revision_id)),
647
def test_branch_last_revision_info_rewind(self):
648
"""A branch's tip can be set to a revision that is an ancestor of the
649
current tip, but only if allow_overwrite_descendant is passed.
651
self.make_tree_with_two_commits()
652
rev_id_utf8 = u'\xc8'.encode('utf-8')
654
(2, 'rev-2'), self.tree.branch.last_revision_info())
655
# If allow_overwrite_descendant flag is 0, then trying to set the tip
656
# to an older revision ID has no effect.
657
branch_token, repo_token = self.lock_branch()
658
response = self.request.execute(
659
'', branch_token, repo_token, rev_id_utf8, 0, 0)
661
SuccessfulSmartServerResponse(('ok', 2, 'rev-2')),
664
(2, 'rev-2'), self.tree.branch.last_revision_info())
666
# If allow_overwrite_descendant flag is 1, then setting the tip to an
668
response = self.request.execute(
669
'', branch_token, repo_token, rev_id_utf8, 0, 1)
671
SuccessfulSmartServerResponse(('ok', 1, rev_id_utf8)),
675
(1, rev_id_utf8), self.tree.branch.last_revision_info())
677
def make_branch_with_divergent_history(self):
678
"""Make a branch with divergent history in its repo.
680
The branch's tip will be 'child-2', and the repo will also contain
681
'child-1', which diverges from a common base revision.
683
self.tree.lock_write()
685
r1 = self.tree.commit('1st commit')
686
revno_1, revid_1 = self.tree.branch.last_revision_info()
687
r2 = self.tree.commit('2nd commit', rev_id='child-1')
688
# Undo the second commit
689
self.tree.branch.set_last_revision_info(revno_1, revid_1)
690
self.tree.set_parent_ids([revid_1])
691
# Make a new second commit, child-2. child-2 has diverged from
693
new_r2 = self.tree.commit('2nd commit', rev_id='child-2')
696
def test_not_allow_diverged(self):
697
"""If allow_diverged is not passed, then setting a divergent history
698
returns a Diverged error.
700
self.make_branch_with_divergent_history()
702
FailedSmartServerResponse(('Diverged',)),
703
self.set_last_revision('child-1', 2))
704
# The branch tip was not changed.
705
self.assertEqual('child-2', self.tree.branch.last_revision())
707
def test_allow_diverged(self):
708
"""If allow_diverged is passed, then setting a divergent history
711
self.make_branch_with_divergent_history()
712
branch_token, repo_token = self.lock_branch()
713
response = self.request.execute(
714
'', branch_token, repo_token, 'child-1', 1, 0)
716
SuccessfulSmartServerResponse(('ok', 2, 'child-1')),
719
# The branch tip was changed.
720
self.assertEqual('child-1', self.tree.branch.last_revision())
723
class TestSmartServerBranchRequestGetParent(tests.TestCaseWithMemoryTransport):
725
def test_get_parent_none(self):
726
base_branch = self.make_branch('base')
727
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
728
response = request.execute('base')
730
SuccessfulSmartServerResponse(('',)), response)
732
def test_get_parent_something(self):
733
base_branch = self.make_branch('base')
734
base_branch.set_parent(self.get_url('foo'))
735
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
736
response = request.execute('base')
738
SuccessfulSmartServerResponse(("../foo",)),
742
class TestSmartServerBranchRequestGetTagsBytes(tests.TestCaseWithMemoryTransport):
743
# Only called when the branch format and tags match [yay factory
744
# methods] so only need to test straight forward cases.
746
def test_get_bytes(self):
747
base_branch = self.make_branch('base')
748
request = smart.branch.SmartServerBranchGetTagsBytes(
749
self.get_transport())
750
response = request.execute('base')
752
SuccessfulSmartServerResponse(('',)), response)
755
class TestSmartServerBranchRequestGetStackedOnURL(tests.TestCaseWithMemoryTransport):
757
def test_get_stacked_on_url(self):
758
base_branch = self.make_branch('base', format='1.6')
759
stacked_branch = self.make_branch('stacked', format='1.6')
760
# typically should be relative
761
stacked_branch.set_stacked_on_url('../base')
762
request = smart.branch.SmartServerBranchRequestGetStackedOnURL(
763
self.get_transport())
764
response = request.execute('stacked')
766
SmartServerResponse(('ok', '../base')),
770
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
773
tests.TestCaseWithMemoryTransport.setUp(self)
775
def test_lock_write_on_unlocked_branch(self):
776
backing = self.get_transport()
777
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
778
branch = self.make_branch('.', format='knit')
779
repository = branch.repository
780
response = request.execute('')
781
branch_nonce = branch.control_files._lock.peek().get('nonce')
782
repository_nonce = repository.control_files._lock.peek().get('nonce')
784
SmartServerResponse(('ok', branch_nonce, repository_nonce)),
786
# The branch (and associated repository) is now locked. Verify that
787
# with a new branch object.
788
new_branch = repository.bzrdir.open_branch()
789
self.assertRaises(errors.LockContention, new_branch.lock_write)
791
def test_lock_write_on_locked_branch(self):
792
backing = self.get_transport()
793
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
794
branch = self.make_branch('.')
796
branch.leave_lock_in_place()
798
response = request.execute('')
800
SmartServerResponse(('LockContention',)), response)
802
def test_lock_write_with_tokens_on_locked_branch(self):
803
backing = self.get_transport()
804
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
805
branch = self.make_branch('.', format='knit')
806
branch_token = branch.lock_write()
807
repo_token = branch.repository.lock_write()
808
branch.repository.unlock()
809
branch.leave_lock_in_place()
810
branch.repository.leave_lock_in_place()
812
response = request.execute('',
813
branch_token, repo_token)
815
SmartServerResponse(('ok', branch_token, repo_token)), response)
817
def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
818
backing = self.get_transport()
819
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
820
branch = self.make_branch('.', format='knit')
821
branch_token = branch.lock_write()
822
repo_token = branch.repository.lock_write()
823
branch.repository.unlock()
824
branch.leave_lock_in_place()
825
branch.repository.leave_lock_in_place()
827
response = request.execute('',
828
branch_token+'xxx', repo_token)
830
SmartServerResponse(('TokenMismatch',)), response)
832
def test_lock_write_on_locked_repo(self):
833
backing = self.get_transport()
834
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
835
branch = self.make_branch('.', format='knit')
836
branch.repository.lock_write()
837
branch.repository.leave_lock_in_place()
838
branch.repository.unlock()
839
response = request.execute('')
841
SmartServerResponse(('LockContention',)), response)
843
def test_lock_write_on_readonly_transport(self):
844
backing = self.get_readonly_transport()
845
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
846
branch = self.make_branch('.')
847
root = self.get_transport().clone('/')
848
path = urlutils.relative_url(root.base, self.get_transport().base)
849
response = request.execute(path)
850
error_name, lock_str, why_str = response.args
851
self.assertFalse(response.is_successful())
852
self.assertEqual('LockFailed', error_name)
855
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
858
tests.TestCaseWithMemoryTransport.setUp(self)
860
def test_unlock_on_locked_branch_and_repo(self):
861
backing = self.get_transport()
862
request = smart.branch.SmartServerBranchRequestUnlock(backing)
863
branch = self.make_branch('.', format='knit')
865
branch_token = branch.lock_write()
866
repo_token = branch.repository.lock_write()
867
branch.repository.unlock()
868
# Unlock the branch (and repo) object, leaving the physical locks
870
branch.leave_lock_in_place()
871
branch.repository.leave_lock_in_place()
873
response = request.execute('',
874
branch_token, repo_token)
876
SmartServerResponse(('ok',)), response)
877
# The branch is now unlocked. Verify that with a new branch
879
new_branch = branch.bzrdir.open_branch()
880
new_branch.lock_write()
883
def test_unlock_on_unlocked_branch_unlocked_repo(self):
884
backing = self.get_transport()
885
request = smart.branch.SmartServerBranchRequestUnlock(backing)
886
branch = self.make_branch('.', format='knit')
887
response = request.execute(
888
'', 'branch token', 'repo token')
890
SmartServerResponse(('TokenMismatch',)), response)
892
def test_unlock_on_unlocked_branch_locked_repo(self):
893
backing = self.get_transport()
894
request = smart.branch.SmartServerBranchRequestUnlock(backing)
895
branch = self.make_branch('.', format='knit')
896
# Lock the repository.
897
repo_token = branch.repository.lock_write()
898
branch.repository.leave_lock_in_place()
899
branch.repository.unlock()
900
# Issue branch lock_write request on the unlocked branch (with locked
902
response = request.execute(
903
'', 'branch token', repo_token)
905
SmartServerResponse(('TokenMismatch',)), response)
908
class TestSmartServerRepositoryRequest(tests.TestCaseWithMemoryTransport):
910
def test_no_repository(self):
911
"""Raise NoRepositoryPresent when there is a bzrdir and no repo."""
912
# we test this using a shared repository above the named path,
913
# thus checking the right search logic is used - that is, that
914
# its the exact path being looked at and the server is not
916
backing = self.get_transport()
917
request = smart.repository.SmartServerRepositoryRequest(backing)
918
self.make_repository('.', shared=True)
919
self.make_bzrdir('subdir')
920
self.assertRaises(errors.NoRepositoryPresent,
921
request.execute, 'subdir')
924
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithMemoryTransport):
926
def test_trivial_bzipped(self):
927
# This tests that the wire encoding is actually bzipped
928
backing = self.get_transport()
929
request = smart.repository.SmartServerRepositoryGetParentMap(backing)
930
tree = self.make_branch_and_memory_tree('.')
932
self.assertEqual(None,
933
request.execute('', 'missing-id'))
934
# Note that it returns a body that is bzipped.
936
SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
937
request.do_body('\n\n0\n'))
939
def test_trivial_include_missing(self):
940
backing = self.get_transport()
941
request = smart.repository.SmartServerRepositoryGetParentMap(backing)
942
tree = self.make_branch_and_memory_tree('.')
944
self.assertEqual(None,
945
request.execute('', 'missing-id', 'include-missing:'))
947
SuccessfulSmartServerResponse(('ok', ),
948
bz2.compress('missing:missing-id')),
949
request.do_body('\n\n0\n'))
952
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithMemoryTransport):
954
def test_none_argument(self):
955
backing = self.get_transport()
956
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
957
tree = self.make_branch_and_memory_tree('.')
960
r1 = tree.commit('1st commit')
961
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
964
# the lines of revision_id->revision_parent_list has no guaranteed
965
# order coming out of a dict, so sort both our test and response
966
lines = sorted([' '.join([r2, r1]), r1])
967
response = request.execute('', '')
968
response.body = '\n'.join(sorted(response.body.split('\n')))
971
SmartServerResponse(('ok', ), '\n'.join(lines)), response)
973
def test_specific_revision_argument(self):
974
backing = self.get_transport()
975
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
976
tree = self.make_branch_and_memory_tree('.')
979
rev_id_utf8 = u'\xc9'.encode('utf-8')
980
r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
981
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
984
self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
985
request.execute('', rev_id_utf8))
987
def test_no_such_revision(self):
988
backing = self.get_transport()
989
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
990
tree = self.make_branch_and_memory_tree('.')
993
r1 = tree.commit('1st commit')
996
# Note that it still returns body (of zero bytes).
998
SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
999
request.execute('', 'missingrevision'))
1002
class TestSmartServerRepositoryGetStream(tests.TestCaseWithMemoryTransport):
1004
def make_two_commit_repo(self):
1005
tree = self.make_branch_and_memory_tree('.')
1008
r1 = tree.commit('1st commit')
1009
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1011
repo = tree.branch.repository
1014
def test_ancestry_of(self):
1015
"""The search argument may be a 'ancestry-of' some heads'."""
1016
backing = self.get_transport()
1017
request = smart.repository.SmartServerRepositoryGetStream(backing)
1018
repo, r1, r2 = self.make_two_commit_repo()
1019
fetch_spec = ['ancestry-of', r2]
1020
lines = '\n'.join(fetch_spec)
1021
request.execute('', repo._format.network_name())
1022
response = request.do_body(lines)
1023
self.assertEqual(('ok',), response.args)
1024
stream_bytes = ''.join(response.body_stream)
1025
self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')
1027
def test_search(self):
1028
"""The search argument may be a 'search' of some explicit keys."""
1029
backing = self.get_transport()
1030
request = smart.repository.SmartServerRepositoryGetStream(backing)
1031
repo, r1, r2 = self.make_two_commit_repo()
1032
fetch_spec = ['search', '%s %s' % (r1, r2), 'null:', '2']
1033
lines = '\n'.join(fetch_spec)
1034
request.execute('', repo._format.network_name())
1035
response = request.do_body(lines)
1036
self.assertEqual(('ok',), response.args)
1037
stream_bytes = ''.join(response.body_stream)
1038
self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')
1041
class TestSmartServerRequestHasRevision(tests.TestCaseWithMemoryTransport):
1043
def test_missing_revision(self):
1044
"""For a missing revision, ('no', ) is returned."""
1045
backing = self.get_transport()
1046
request = smart.repository.SmartServerRequestHasRevision(backing)
1047
self.make_repository('.')
1048
self.assertEqual(SmartServerResponse(('no', )),
1049
request.execute('', 'revid'))
1051
def test_present_revision(self):
1052
"""For a present revision, ('yes', ) is returned."""
1053
backing = self.get_transport()
1054
request = smart.repository.SmartServerRequestHasRevision(backing)
1055
tree = self.make_branch_and_memory_tree('.')
1058
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1059
r1 = tree.commit('a commit', rev_id=rev_id_utf8)
1061
self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
1062
self.assertEqual(SmartServerResponse(('yes', )),
1063
request.execute('', rev_id_utf8))
1066
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithMemoryTransport):
1068
def test_empty_revid(self):
1069
"""With an empty revid, we get only size an number and revisions"""
1070
backing = self.get_transport()
1071
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1072
repository = self.make_repository('.')
1073
stats = repository.gather_stats()
1074
expected_body = 'revisions: 0\n'
1075
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1076
request.execute('', '', 'no'))
1078
def test_revid_with_committers(self):
1079
"""For a revid we get more infos."""
1080
backing = self.get_transport()
1081
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1082
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1083
tree = self.make_branch_and_memory_tree('.')
1086
# Let's build a predictable result
1087
tree.commit('a commit', timestamp=123456.2, timezone=3600)
1088
tree.commit('a commit', timestamp=654321.4, timezone=0,
1092
stats = tree.branch.repository.gather_stats()
1093
expected_body = ('firstrev: 123456.200 3600\n'
1094
'latestrev: 654321.400 0\n'
1096
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1100
def test_not_empty_repository_with_committers(self):
1101
"""For a revid and requesting committers we get the whole thing."""
1102
backing = self.get_transport()
1103
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1104
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1105
tree = self.make_branch_and_memory_tree('.')
1108
# Let's build a predictable result
1109
tree.commit('a commit', timestamp=123456.2, timezone=3600,
1111
tree.commit('a commit', timestamp=654321.4, timezone=0,
1112
committer='bar', rev_id=rev_id_utf8)
1114
stats = tree.branch.repository.gather_stats()
1116
expected_body = ('committers: 2\n'
1117
'firstrev: 123456.200 3600\n'
1118
'latestrev: 654321.400 0\n'
1120
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1122
rev_id_utf8, 'yes'))
1125
class TestSmartServerRepositoryIsShared(tests.TestCaseWithMemoryTransport):
1127
def test_is_shared(self):
1128
"""For a shared repository, ('yes', ) is returned."""
1129
backing = self.get_transport()
1130
request = smart.repository.SmartServerRepositoryIsShared(backing)
1131
self.make_repository('.', shared=True)
1132
self.assertEqual(SmartServerResponse(('yes', )),
1133
request.execute('', ))
1135
def test_is_not_shared(self):
1136
"""For a shared repository, ('no', ) is returned."""
1137
backing = self.get_transport()
1138
request = smart.repository.SmartServerRepositoryIsShared(backing)
1139
self.make_repository('.', shared=False)
1140
self.assertEqual(SmartServerResponse(('no', )),
1141
request.execute('', ))
1144
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
1146
def test_lock_write_on_unlocked_repo(self):
1147
backing = self.get_transport()
1148
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1149
repository = self.make_repository('.', format='knit')
1150
response = request.execute('')
1151
nonce = repository.control_files._lock.peek().get('nonce')
1152
self.assertEqual(SmartServerResponse(('ok', nonce)), response)
1153
# The repository is now locked. Verify that with a new repository
1155
new_repo = repository.bzrdir.open_repository()
1156
self.assertRaises(errors.LockContention, new_repo.lock_write)
1158
def test_lock_write_on_locked_repo(self):
1159
backing = self.get_transport()
1160
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1161
repository = self.make_repository('.', format='knit')
1162
repository.lock_write()
1163
repository.leave_lock_in_place()
1165
response = request.execute('')
1167
SmartServerResponse(('LockContention',)), response)
1169
def test_lock_write_on_readonly_transport(self):
1170
backing = self.get_readonly_transport()
1171
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1172
repository = self.make_repository('.', format='knit')
1173
response = request.execute('')
1174
self.assertFalse(response.is_successful())
1175
self.assertEqual('LockFailed', response.args[0])
1178
class TestInsertStreamBase(tests.TestCaseWithMemoryTransport):
1180
def make_empty_byte_stream(self, repo):
1181
byte_stream = smart.repository._stream_to_byte_stream([], repo._format)
1182
return ''.join(byte_stream)
1185
class TestSmartServerRepositoryInsertStream(TestInsertStreamBase):
1187
def test_insert_stream_empty(self):
1188
backing = self.get_transport()
1189
request = smart.repository.SmartServerRepositoryInsertStream(backing)
1190
repository = self.make_repository('.')
1191
response = request.execute('', '')
1192
self.assertEqual(None, response)
1193
response = request.do_chunk(self.make_empty_byte_stream(repository))
1194
self.assertEqual(None, response)
1195
response = request.do_end()
1196
self.assertEqual(SmartServerResponse(('ok', )), response)
1199
class TestSmartServerRepositoryInsertStreamLocked(TestInsertStreamBase):
1201
def test_insert_stream_empty(self):
1202
backing = self.get_transport()
1203
request = smart.repository.SmartServerRepositoryInsertStreamLocked(
1205
repository = self.make_repository('.', format='knit')
1206
lock_token = repository.lock_write()
1207
response = request.execute('', '', lock_token)
1208
self.assertEqual(None, response)
1209
response = request.do_chunk(self.make_empty_byte_stream(repository))
1210
self.assertEqual(None, response)
1211
response = request.do_end()
1212
self.assertEqual(SmartServerResponse(('ok', )), response)
1215
def test_insert_stream_with_wrong_lock_token(self):
1216
backing = self.get_transport()
1217
request = smart.repository.SmartServerRepositoryInsertStreamLocked(
1219
repository = self.make_repository('.', format='knit')
1220
lock_token = repository.lock_write()
1222
errors.TokenMismatch, request.execute, '', '', 'wrong-token')
1226
class TestSmartServerRepositoryUnlock(tests.TestCaseWithMemoryTransport):
1229
tests.TestCaseWithMemoryTransport.setUp(self)
1231
def test_unlock_on_locked_repo(self):
1232
backing = self.get_transport()
1233
request = smart.repository.SmartServerRepositoryUnlock(backing)
1234
repository = self.make_repository('.', format='knit')
1235
token = repository.lock_write()
1236
repository.leave_lock_in_place()
1238
response = request.execute('', token)
1240
SmartServerResponse(('ok',)), response)
1241
# The repository is now unlocked. Verify that with a new repository
1243
new_repo = repository.bzrdir.open_repository()
1244
new_repo.lock_write()
1247
def test_unlock_on_unlocked_repo(self):
1248
backing = self.get_transport()
1249
request = smart.repository.SmartServerRepositoryUnlock(backing)
1250
repository = self.make_repository('.', format='knit')
1251
response = request.execute('', 'some token')
1253
SmartServerResponse(('TokenMismatch',)), response)
1256
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
1258
def test_is_readonly_no(self):
1259
backing = self.get_transport()
1260
request = smart.request.SmartServerIsReadonly(backing)
1261
response = request.execute()
1263
SmartServerResponse(('no',)), response)
1265
def test_is_readonly_yes(self):
1266
backing = self.get_readonly_transport()
1267
request = smart.request.SmartServerIsReadonly(backing)
1268
response = request.execute()
1270
SmartServerResponse(('yes',)), response)
1273
class TestSmartServerRepositorySetMakeWorkingTrees(tests.TestCaseWithMemoryTransport):
1275
def test_set_false(self):
1276
backing = self.get_transport()
1277
repo = self.make_repository('.', shared=True)
1278
repo.set_make_working_trees(True)
1279
request_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
1280
request = request_class(backing)
1281
self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
1282
request.execute('', 'False'))
1283
repo = repo.bzrdir.open_repository()
1284
self.assertFalse(repo.make_working_trees())
1286
def test_set_true(self):
1287
backing = self.get_transport()
1288
repo = self.make_repository('.', shared=True)
1289
repo.set_make_working_trees(False)
1290
request_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
1291
request = request_class(backing)
1292
self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
1293
request.execute('', 'True'))
1294
repo = repo.bzrdir.open_repository()
1295
self.assertTrue(repo.make_working_trees())
1298
class TestSmartServerPackRepositoryAutopack(tests.TestCaseWithTransport):
1300
def make_repo_needing_autopacking(self, path='.'):
1301
# Make a repo in need of autopacking.
1302
tree = self.make_branch_and_tree('.', format='pack-0.92')
1303
repo = tree.branch.repository
1304
# monkey-patch the pack collection to disable autopacking
1305
repo._pack_collection._max_pack_count = lambda count: count
1307
tree.commit('commit %s' % x)
1308
self.assertEqual(10, len(repo._pack_collection.names()))
1309
del repo._pack_collection._max_pack_count
1312
def test_autopack_needed(self):
1313
repo = self.make_repo_needing_autopacking()
1315
self.addCleanup(repo.unlock)
1316
backing = self.get_transport()
1317
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1319
response = request.execute('')
1320
self.assertEqual(SmartServerResponse(('ok',)), response)
1321
repo._pack_collection.reload_pack_names()
1322
self.assertEqual(1, len(repo._pack_collection.names()))
1324
def test_autopack_not_needed(self):
1325
tree = self.make_branch_and_tree('.', format='pack-0.92')
1326
repo = tree.branch.repository
1328
self.addCleanup(repo.unlock)
1330
tree.commit('commit %s' % x)
1331
backing = self.get_transport()
1332
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1334
response = request.execute('')
1335
self.assertEqual(SmartServerResponse(('ok',)), response)
1336
repo._pack_collection.reload_pack_names()
1337
self.assertEqual(9, len(repo._pack_collection.names()))
1339
def test_autopack_on_nonpack_format(self):
1340
"""A request to autopack a non-pack repo is a no-op."""
1341
repo = self.make_repository('.', format='knit')
1342
backing = self.get_transport()
1343
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1345
response = request.execute('')
1346
self.assertEqual(SmartServerResponse(('ok',)), response)
1349
class TestHandlers(tests.TestCase):
1350
"""Tests for the request.request_handlers object."""
1352
def test_all_registrations_exist(self):
1353
"""All registered request_handlers can be found."""
1354
# If there's a typo in a register_lazy call, this loop will fail with
1355
# an AttributeError.
1356
for key, item in smart.request.request_handlers.iteritems():
1359
def test_registered_methods(self):
1360
"""Test that known methods are registered to the correct object."""
1362
smart.request.request_handlers.get('Branch.get_config_file'),
1363
smart.branch.SmartServerBranchGetConfigFile)
1365
smart.request.request_handlers.get('Branch.get_parent'),
1366
smart.branch.SmartServerBranchGetParent)
1368
smart.request.request_handlers.get('Branch.get_tags_bytes'),
1369
smart.branch.SmartServerBranchGetTagsBytes)
1371
smart.request.request_handlers.get('Branch.lock_write'),
1372
smart.branch.SmartServerBranchRequestLockWrite)
1374
smart.request.request_handlers.get('Branch.last_revision_info'),
1375
smart.branch.SmartServerBranchRequestLastRevisionInfo)
1377
smart.request.request_handlers.get('Branch.revision_history'),
1378
smart.branch.SmartServerRequestRevisionHistory)
1380
smart.request.request_handlers.get('Branch.set_last_revision'),
1381
smart.branch.SmartServerBranchRequestSetLastRevision)
1383
smart.request.request_handlers.get('Branch.set_last_revision_info'),
1384
smart.branch.SmartServerBranchRequestSetLastRevisionInfo)
1386
smart.request.request_handlers.get('Branch.unlock'),
1387
smart.branch.SmartServerBranchRequestUnlock)
1389
smart.request.request_handlers.get('BzrDir.find_repository'),
1390
smart.bzrdir.SmartServerRequestFindRepositoryV1)
1392
smart.request.request_handlers.get('BzrDir.find_repositoryV2'),
1393
smart.bzrdir.SmartServerRequestFindRepositoryV2)
1395
smart.request.request_handlers.get('BzrDirFormat.initialize'),
1396
smart.bzrdir.SmartServerRequestInitializeBzrDir)
1398
smart.request.request_handlers.get('BzrDir.cloning_metadir'),
1399
smart.bzrdir.SmartServerBzrDirRequestCloningMetaDir)
1401
smart.request.request_handlers.get('BzrDir.open_branch'),
1402
smart.bzrdir.SmartServerRequestOpenBranch)
1404
smart.request.request_handlers.get('BzrDir.open_branchV2'),
1405
smart.bzrdir.SmartServerRequestOpenBranchV2)
1407
smart.request.request_handlers.get('PackRepository.autopack'),
1408
smart.packrepository.SmartServerPackRepositoryAutopack)
1410
smart.request.request_handlers.get('Repository.gather_stats'),
1411
smart.repository.SmartServerRepositoryGatherStats)
1413
smart.request.request_handlers.get('Repository.get_parent_map'),
1414
smart.repository.SmartServerRepositoryGetParentMap)
1416
smart.request.request_handlers.get(
1417
'Repository.get_revision_graph'),
1418
smart.repository.SmartServerRepositoryGetRevisionGraph)
1420
smart.request.request_handlers.get('Repository.get_stream'),
1421
smart.repository.SmartServerRepositoryGetStream)
1423
smart.request.request_handlers.get('Repository.has_revision'),
1424
smart.repository.SmartServerRequestHasRevision)
1426
smart.request.request_handlers.get('Repository.insert_stream'),
1427
smart.repository.SmartServerRepositoryInsertStream)
1429
smart.request.request_handlers.get('Repository.insert_stream_locked'),
1430
smart.repository.SmartServerRepositoryInsertStreamLocked)
1432
smart.request.request_handlers.get('Repository.is_shared'),
1433
smart.repository.SmartServerRepositoryIsShared)
1435
smart.request.request_handlers.get('Repository.lock_write'),
1436
smart.repository.SmartServerRepositoryLockWrite)
1438
smart.request.request_handlers.get('Repository.tarball'),
1439
smart.repository.SmartServerRepositoryTarball)
1441
smart.request.request_handlers.get('Repository.unlock'),
1442
smart.repository.SmartServerRepositoryUnlock)
1444
smart.request.request_handlers.get('Transport.is_readonly'),
1445
smart.request.SmartServerIsReadonly)