1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the smart wire/domain protocol.
19
This module contains tests for the domain-level smart requests and responses,
20
such as the 'Branch.lock_write' request. Many of these use specific disk
21
formats to exercise calls that only make sense for formats with specific
capabilities.
24
Tests for low-level protocol encoding are found in test_smart_transport.
28
from cStringIO import StringIO
39
from bzrlib.branch import Branch, BranchReferenceFormat
40
import bzrlib.smart.branch
41
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
42
import bzrlib.smart.packrepository
43
import bzrlib.smart.repository
44
from bzrlib.smart.request import (
45
FailedSmartServerResponse,
48
SuccessfulSmartServerResponse,
50
from bzrlib.tests import (
53
from bzrlib.transport import chroot, get_transport
54
from bzrlib.util import bencode
57
def load_tests(standard_tests, module, loader):
58
"""Multiply tests version and protocol consistency."""
59
# FindRepository tests.
60
bzrdir_mod = bzrlib.smart.bzrdir
63
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
64
("find_repositoryV2", {
65
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
66
("find_repositoryV3", {
67
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV3}),
69
to_adapt, result = split_suite_by_re(standard_tests,
70
"TestSmartServerRequestFindRepository")
71
v2_only, v1_and_2 = split_suite_by_re(to_adapt,
73
tests.multiply_tests(v1_and_2, scenarios, result)
74
# The first scenario is only applicable to v1 protocols, it is deleted
76
tests.multiply_tests(v2_only, scenarios[1:], result)
80
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
83
tests.TestCaseWithTransport.setUp(self)
84
self._chroot_server = None
86
def get_transport(self, relpath=None):
87
if self._chroot_server is None:
88
backing_transport = tests.TestCaseWithTransport.get_transport(self)
89
self._chroot_server = chroot.ChrootServer(backing_transport)
90
self._chroot_server.setUp()
91
self.addCleanup(self._chroot_server.tearDown)
92
t = get_transport(self._chroot_server.get_url())
93
if relpath is not None:
98
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
101
super(TestCaseWithSmartMedium, self).setUp()
102
# We're allowed to set the transport class here, so that we don't use
103
# the default or a parameterized class, but rather use the
104
# TestCaseWithTransport infrastructure to set up a smart server and
106
self.transport_server = self.make_transport_server
108
def make_transport_server(self):
109
return smart.server.SmartTCPServer_for_testing('-' + self.id())
111
def get_smart_medium(self):
112
"""Get a smart medium to use in tests."""
113
return self.get_transport().get_smart_medium()
116
class TestSmartServerResponse(tests.TestCase):
118
def test__eq__(self):
119
self.assertEqual(SmartServerResponse(('ok', )),
120
SmartServerResponse(('ok', )))
121
self.assertEqual(SmartServerResponse(('ok', ), 'body'),
122
SmartServerResponse(('ok', ), 'body'))
123
self.assertNotEqual(SmartServerResponse(('ok', )),
124
SmartServerResponse(('notok', )))
125
self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
126
SmartServerResponse(('ok', )))
127
self.assertNotEqual(None,
128
SmartServerResponse(('ok', )))
130
def test__str__(self):
131
"""SmartServerResponses can be stringified."""
133
"<SuccessfulSmartServerResponse args=('args',) body='body'>",
134
str(SuccessfulSmartServerResponse(('args',), 'body')))
136
"<FailedSmartServerResponse args=('args',) body='body'>",
137
str(FailedSmartServerResponse(('args',), 'body')))
140
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
142
def test_translate_client_path(self):
143
transport = self.get_transport()
144
request = SmartServerRequest(transport, 'foo/')
145
self.assertEqual('./', request.translate_client_path('foo/'))
147
errors.InvalidURLJoin, request.translate_client_path, 'foo/..')
149
errors.PathNotChild, request.translate_client_path, '/')
151
errors.PathNotChild, request.translate_client_path, 'bar/')
152
self.assertEqual('./baz', request.translate_client_path('foo/baz'))
154
def test_transport_from_client_path(self):
155
transport = self.get_transport()
156
request = SmartServerRequest(transport, 'foo/')
159
request.transport_from_client_path('foo/').base)
162
class TestSmartServerBzrDirRequestCloningMetaDir(
    tests.TestCaseWithMemoryTransport):
    """Exercise the BzrDir.cloning_metadir smart request."""

    def test_cloning_metadir(self):
        """A plain bzrdir reports its control, repository and branch formats."""
        backing = self.get_transport()
        control_dir = self.make_bzrdir('.')
        metadir = control_dir.cloning_metadir()
        request = smart_dir.SmartServerBzrDirRequestCloningMetaDir(backing)
        # The expected payload is built from the same metadir a local caller
        # would get: control name, repository name, then a tagged branch name.
        control_name = metadir.network_name()
        repository_name = metadir.repository_format.network_name()
        branch_name = metadir.get_branch_format().network_name()
        expected = SuccessfulSmartServerResponse(
            (control_name, repository_name, ('branch', branch_name)))
        self.assertEqual(expected, request.execute('', 'False'))

    def test_cloning_metadir_reference(self):
        """A bzrdir holding a branch reference gets a BranchReference failure."""
        backing = self.get_transport()
        target_branch = self.make_branch('referenced')
        control_dir = self.make_bzrdir('.')
        control_dir.cloning_metadir()
        BranchReferenceFormat().initialize(control_dir, target_branch)
        BranchReferenceFormat().get_reference(control_dir)
        # Move the referenced branch away: the server is not supposed to
        # follow the reference, so an unreachable target must not matter.
        backing.rename('referenced', 'moved')
        request = smart_dir.SmartServerBzrDirRequestCloningMetaDir(backing)
        self.assertEqual(
            FailedSmartServerResponse(('BranchReference',)),
            request.execute('', 'False'))
196
class TestSmartServerRequestCreateRepository(tests.TestCaseWithMemoryTransport):
    """Exercise the BzrDir.create_repository smart request."""

    def test_makes_repository(self):
        """Creating a repository inside an existing bzrdir reports success."""
        backing = self.get_transport()
        self.make_bzrdir('.')
        request = bzrlib.smart.bzrdir.SmartServerRequestCreateRepository(
            backing)
        # Ask for the repository format of the 'default' bzrdir format and
        # expect the server to echo that same network name back.
        default_format = bzrdir.format_registry.get('default')()
        network_name = default_format.repository_format.network_name()
        self.assertEqual(
            SuccessfulSmartServerResponse(
                ('ok', 'no', 'no', 'no', network_name)),
            request.execute('', network_name, 'True'))
213
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
214
"""Tests for BzrDir.find_repository."""
216
def test_no_repository(self):
217
"""When there is no repository to be found, ('norepository', ) is returned."""
218
backing = self.get_transport()
219
request = self._request_class(backing)
220
self.make_bzrdir('.')
221
self.assertEqual(SmartServerResponse(('norepository', )),
224
def test_nonshared_repository(self):
225
# nonshared repositories only allow 'find' to return a handle when the
226
# path the repository is being searched on is the same as that that
227
# the repository is at.
228
backing = self.get_transport()
229
request = self._request_class(backing)
230
result = self._make_repository_and_result()
231
self.assertEqual(result, request.execute(''))
232
self.make_bzrdir('subdir')
233
self.assertEqual(SmartServerResponse(('norepository', )),
234
request.execute('subdir'))
236
def _make_repository_and_result(self, shared=False, format=None):
237
"""Convenience function to setup a repository.
239
:result: The SmartServerResponse to expect when opening it.
241
repo = self.make_repository('.', shared=shared, format=format)
242
if repo.supports_rich_root():
246
if repo._format.supports_tree_reference:
250
if (smart.bzrdir.SmartServerRequestFindRepositoryV3 ==
251
self._request_class):
252
return SuccessfulSmartServerResponse(
253
('ok', '', rich_root, subtrees, 'no',
254
repo._format.network_name()))
255
elif (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
256
self._request_class):
257
# All tests so far are on formats, and for non-external
259
return SuccessfulSmartServerResponse(
260
('ok', '', rich_root, subtrees, 'no'))
262
return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
264
def test_shared_repository(self):
265
"""When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
266
backing = self.get_transport()
267
request = self._request_class(backing)
268
result = self._make_repository_and_result(shared=True)
269
self.assertEqual(result, request.execute(''))
270
self.make_bzrdir('subdir')
271
result2 = SmartServerResponse(result.args[0:1] + ('..', ) + result.args[2:])
272
self.assertEqual(result2,
273
request.execute('subdir'))
274
self.make_bzrdir('subdir/deeper')
275
result3 = SmartServerResponse(result.args[0:1] + ('../..', ) + result.args[2:])
276
self.assertEqual(result3,
277
request.execute('subdir/deeper'))
279
def test_rich_root_and_subtree_encoding(self):
280
"""Test for the format attributes for rich root and subtree support."""
281
backing = self.get_transport()
282
request = self._request_class(backing)
283
result = self._make_repository_and_result(format='dirstate-with-subtree')
284
# check the test will be valid
285
self.assertEqual('yes', result.args[2])
286
self.assertEqual('yes', result.args[3])
287
self.assertEqual(result, request.execute(''))
289
def test_supports_external_lookups_no_v2(self):
290
"""Test for the supports_external_lookups attribute."""
291
backing = self.get_transport()
292
request = self._request_class(backing)
293
result = self._make_repository_and_result(format='dirstate-with-subtree')
294
# check the test will be valid
295
self.assertEqual('no', result.args[4])
296
self.assertEqual(result, request.execute(''))
299
class TestSmartServerBzrDirRequestGetConfigFile(
    tests.TestCaseWithMemoryTransport):
    """Exercise the BzrDir.get_config_file smart request."""

    def test_present(self):
        """The response body matches the bzrdir's config file content."""
        backing = self.get_transport()
        control_dir = self.make_bzrdir('.')
        control_dir.get_config().set_default_stack_on("/")
        # Read the config file locally so it can be compared with the body
        # the request hands back.
        config_file = control_dir._get_config()._get_config_file()
        config_bytes = config_file.read()
        request = smart_dir.SmartServerBzrDirRequestConfigFile(backing)
        self.assertEqual(
            SuccessfulSmartServerResponse((), config_bytes),
            request.execute(''))

    def test_missing(self):
        """A bzrdir with no configuration set yields an empty body."""
        backing = self.get_transport()
        self.make_bzrdir('.')
        request = smart_dir.SmartServerBzrDirRequestConfigFile(backing)
        self.assertEqual(
            SuccessfulSmartServerResponse((), ''),
            request.execute(''))
322
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
324
def test_empty_dir(self):
325
"""Initializing an empty dir should succeed and do it."""
326
backing = self.get_transport()
327
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
328
self.assertEqual(SmartServerResponse(('ok', )),
330
made_dir = bzrdir.BzrDir.open_from_transport(backing)
331
# no branch, tree or repository is expected with the current
333
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
334
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
335
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
337
def test_missing_dir(self):
338
"""Initializing a missing directory should fail like the bzrdir api."""
339
backing = self.get_transport()
340
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
341
self.assertRaises(errors.NoSuchFile,
342
request.execute, 'subdir')
344
def test_initialized_dir(self):
345
"""Initializing an extant bzrdir should fail like the bzrdir api."""
346
backing = self.get_transport()
347
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
348
self.make_bzrdir('subdir')
349
self.assertRaises(errors.FileExists,
350
request.execute, 'subdir')
353
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
355
def test_no_branch(self):
356
"""When there is no branch, ('nobranch', ) is returned."""
357
backing = self.get_transport()
358
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
359
self.make_bzrdir('.')
360
self.assertEqual(SmartServerResponse(('nobranch', )),
363
def test_branch(self):
364
"""When there is a branch, 'ok' is returned."""
365
backing = self.get_transport()
366
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
367
self.make_branch('.')
368
self.assertEqual(SmartServerResponse(('ok', '')),
371
def test_branch_reference(self):
372
"""When there is a branch reference, the reference URL is returned."""
373
backing = self.get_transport()
374
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
375
branch = self.make_branch('branch')
376
checkout = branch.create_checkout('reference',lightweight=True)
377
reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
378
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
379
self.assertEqual(SmartServerResponse(('ok', reference_url)),
380
request.execute('reference'))
383
class TestSmartServerRequestOpenBranchV2(TestCaseWithChrootedTransport):
385
def test_no_branch(self):
386
"""When there is no branch, ('nobranch', ) is returned."""
387
backing = self.get_transport()
388
self.make_bzrdir('.')
389
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
390
self.assertEqual(SmartServerResponse(('nobranch', )),
393
def test_branch(self):
394
"""When there is a branch, 'ok' is returned."""
395
backing = self.get_transport()
396
expected = self.make_branch('.')._format.network_name()
397
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
398
self.assertEqual(SuccessfulSmartServerResponse(('branch', expected)),
401
def test_branch_reference(self):
402
"""When there is a branch reference, the reference URL is returned."""
403
backing = self.get_transport()
404
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
405
branch = self.make_branch('branch')
406
checkout = branch.create_checkout('reference',lightweight=True)
407
reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
408
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
409
self.assertEqual(SuccessfulSmartServerResponse(('ref', reference_url)),
410
request.execute('reference'))
412
def test_stacked_branch(self):
413
"""Opening a stacked branch does not open the stacked-on branch."""
414
trunk = self.make_branch('trunk')
415
feature = self.make_branch('feature', format='1.9')
416
feature.set_stacked_on_url(trunk.base)
418
Branch.hooks.install_named_hook('open', opened_branches.append, None)
419
backing = self.get_transport()
420
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
423
response = request.execute('feature')
425
request.teardown_jail()
426
expected_format = feature._format.network_name()
428
SuccessfulSmartServerResponse(('branch', expected_format)),
430
self.assertLength(1, opened_branches)
433
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
435
def test_empty(self):
436
"""For an empty branch, the body is empty."""
437
backing = self.get_transport()
438
request = smart.branch.SmartServerRequestRevisionHistory(backing)
439
self.make_branch('.')
440
self.assertEqual(SmartServerResponse(('ok', ), ''),
443
def test_not_empty(self):
444
"""For a non-empty branch, the body is the NUL-joined revision ids."""
445
backing = self.get_transport()
446
request = smart.branch.SmartServerRequestRevisionHistory(backing)
447
tree = self.make_branch_and_memory_tree('.')
450
r1 = tree.commit('1st commit')
451
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
454
SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
458
class TestSmartServerBranchRequest(tests.TestCaseWithMemoryTransport):
460
def test_no_branch(self):
461
"""When there is a bzrdir and no branch, NotBranchError is raised."""
462
backing = self.get_transport()
463
request = smart.branch.SmartServerBranchRequest(backing)
464
self.make_bzrdir('.')
465
self.assertRaises(errors.NotBranchError,
468
def test_branch_reference(self):
469
"""When there is a branch reference, NotBranchError is raised."""
470
backing = self.get_transport()
471
request = smart.branch.SmartServerBranchRequest(backing)
472
branch = self.make_branch('branch')
473
checkout = branch.create_checkout('reference',lightweight=True)
474
self.assertRaises(errors.NotBranchError,
475
request.execute, 'checkout')
478
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithMemoryTransport):
480
def test_empty(self):
481
"""For an empty branch, the result is ('ok', '0', 'null:')."""
482
backing = self.get_transport()
483
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
484
self.make_branch('.')
485
self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
488
def test_not_empty(self):
489
"""For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
490
backing = self.get_transport()
491
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
492
tree = self.make_branch_and_memory_tree('.')
495
rev_id_utf8 = u'\xc8'.encode('utf-8')
496
r1 = tree.commit('1st commit')
497
r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
500
SmartServerResponse(('ok', '2', rev_id_utf8)),
504
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithMemoryTransport):
506
def test_default(self):
507
"""With no file, we get empty content."""
508
backing = self.get_transport()
509
request = smart.branch.SmartServerBranchGetConfigFile(backing)
510
branch = self.make_branch('.')
511
# there should be no file by default
513
self.assertEqual(SmartServerResponse(('ok', ), content),
516
def test_with_content(self):
517
# SmartServerBranchGetConfigFile should return the content from
518
# branch.control_files.get('branch.conf') for now - in the future it may
519
# perform more complex processing.
520
backing = self.get_transport()
521
request = smart.branch.SmartServerBranchGetConfigFile(backing)
522
branch = self.make_branch('.')
523
branch._transport.put_bytes('branch.conf', 'foo bar baz')
524
self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
528
class TestLockedBranch(tests.TestCaseWithMemoryTransport):
    """Base class offering a helper that write-locks a branch."""

    def get_lock_tokens(self, branch):
        """Write-lock ``branch`` and return ``(branch_token, repo_token)``.

        The repository is write-locked too so its token can be captured, and
        that repository lock is released once before returning.  The branch
        lock taken here is not released by this helper.
        """
        branch_token = branch.lock_write()
        repo_token = branch.repository.lock_write()
        # Only the extra repository lock acquired above is given back.
        branch.repository.unlock()
        return (branch_token, repo_token)
537
class TestSmartServerBranchRequestSetConfigOption(TestLockedBranch):
539
def test_value_name(self):
540
branch = self.make_branch('.')
541
request = smart.branch.SmartServerBranchRequestSetConfigOption(
542
branch.bzrdir.root_transport)
543
branch_token, repo_token = self.get_lock_tokens(branch)
544
config = branch._get_config()
545
result = request.execute('', branch_token, repo_token, 'bar', 'foo',
547
self.assertEqual(SuccessfulSmartServerResponse(()), result)
548
self.assertEqual('bar', config.get_option('foo'))
550
def test_value_name_section(self):
551
branch = self.make_branch('.')
552
request = smart.branch.SmartServerBranchRequestSetConfigOption(
553
branch.bzrdir.root_transport)
554
branch_token, repo_token = self.get_lock_tokens(branch)
555
config = branch._get_config()
556
result = request.execute('', branch_token, repo_token, 'bar', 'foo',
558
self.assertEqual(SuccessfulSmartServerResponse(()), result)
559
self.assertEqual('bar', config.get_option('foo', 'gam'))
562
class SetLastRevisionTestBase(TestLockedBranch):
563
"""Base test case for verbs that implement set_last_revision."""
566
tests.TestCaseWithMemoryTransport.setUp(self)
567
backing_transport = self.get_transport()
568
self.request = self.request_class(backing_transport)
569
self.tree = self.make_branch_and_memory_tree('.')
571
def lock_branch(self):
572
return self.get_lock_tokens(self.tree.branch)
574
def unlock_branch(self):
575
self.tree.branch.unlock()
577
def set_last_revision(self, revision_id, revno):
578
branch_token, repo_token = self.lock_branch()
579
response = self._set_last_revision(
580
revision_id, revno, branch_token, repo_token)
584
def assertRequestSucceeds(self, revision_id, revno):
585
response = self.set_last_revision(revision_id, revno)
586
self.assertEqual(SuccessfulSmartServerResponse(('ok',)), response)
589
class TestSetLastRevisionVerbMixin(object):
590
"""Mixin test case for verbs that implement set_last_revision."""
592
def test_set_null_to_null(self):
593
"""An empty branch can have its last revision set to 'null:'."""
594
self.assertRequestSucceeds('null:', 0)
596
def test_NoSuchRevision(self):
597
"""If the revision_id is not present, the verb returns NoSuchRevision.
599
revision_id = 'non-existent revision'
601
FailedSmartServerResponse(('NoSuchRevision', revision_id)),
602
self.set_last_revision(revision_id, 1))
604
def make_tree_with_two_commits(self):
605
self.tree.lock_write()
607
rev_id_utf8 = u'\xc8'.encode('utf-8')
608
r1 = self.tree.commit('1st commit', rev_id=rev_id_utf8)
609
r2 = self.tree.commit('2nd commit', rev_id='rev-2')
612
def test_branch_last_revision_info_is_updated(self):
613
"""A branch's tip can be set to a revision that is present in its
616
# Make a branch with an empty revision history, but two revisions in
618
self.make_tree_with_two_commits()
619
rev_id_utf8 = u'\xc8'.encode('utf-8')
620
self.tree.branch.set_revision_history([])
622
(0, 'null:'), self.tree.branch.last_revision_info())
623
# We can update the branch to a revision that is present in the
625
self.assertRequestSucceeds(rev_id_utf8, 1)
627
(1, rev_id_utf8), self.tree.branch.last_revision_info())
629
def test_branch_last_revision_info_rewind(self):
630
"""A branch's tip can be set to a revision that is an ancestor of the
633
self.make_tree_with_two_commits()
634
rev_id_utf8 = u'\xc8'.encode('utf-8')
636
(2, 'rev-2'), self.tree.branch.last_revision_info())
637
self.assertRequestSucceeds(rev_id_utf8, 1)
639
(1, rev_id_utf8), self.tree.branch.last_revision_info())
641
def test_TipChangeRejected(self):
642
"""If a pre_change_branch_tip hook raises TipChangeRejected, the verb
643
returns TipChangeRejected.
645
rejection_message = u'rejection message\N{INTERROBANG}'
646
def hook_that_rejects(params):
647
raise errors.TipChangeRejected(rejection_message)
648
Branch.hooks.install_named_hook(
649
'pre_change_branch_tip', hook_that_rejects, None)
651
FailedSmartServerResponse(
652
('TipChangeRejected', rejection_message.encode('utf-8'))),
653
self.set_last_revision('null:', 0))
656
class TestSmartServerBranchRequestSetLastRevision(
    SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
    """Run the set_last_revision tests against Branch.set_last_revision."""

    request_class = smart.branch.SmartServerBranchRequestSetLastRevision

    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
        """Execute the request; ``revno`` is not passed to this verb."""
        request_args = ('', branch_token, repo_token, revision_id)
        return self.request.execute(*request_args)
667
class TestSmartServerBranchRequestSetLastRevisionInfo(
    SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
    """Run the set_last_revision tests against Branch.set_last_revision_info."""

    request_class = smart.branch.SmartServerBranchRequestSetLastRevisionInfo

    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
        """Execute the request, passing ``revno`` ahead of ``revision_id``."""
        request_args = ('', branch_token, repo_token, revno, revision_id)
        return self.request.execute(*request_args)

    def test_NoSuchRevision(self):
        """Branch.set_last_revision_info does not have to return
        NoSuchRevision if the revision_id is absent.
        """
        # Overrides the mixin's test so it is reported as not applicable.
        raise tests.TestNotApplicable()
684
class TestSmartServerBranchRequestSetLastRevisionEx(
685
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
686
"""Tests for Branch.set_last_revision_ex verb."""
688
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionEx
690
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
691
return self.request.execute(
692
'', branch_token, repo_token, revision_id, 0, 0)
694
def assertRequestSucceeds(self, revision_id, revno):
695
response = self.set_last_revision(revision_id, revno)
697
SuccessfulSmartServerResponse(('ok', revno, revision_id)),
700
def test_branch_last_revision_info_rewind(self):
701
"""A branch's tip can be set to a revision that is an ancestor of the
702
current tip, but only if allow_overwrite_descendant is passed.
704
self.make_tree_with_two_commits()
705
rev_id_utf8 = u'\xc8'.encode('utf-8')
707
(2, 'rev-2'), self.tree.branch.last_revision_info())
708
# If allow_overwrite_descendant flag is 0, then trying to set the tip
709
# to an older revision ID has no effect.
710
branch_token, repo_token = self.lock_branch()
711
response = self.request.execute(
712
'', branch_token, repo_token, rev_id_utf8, 0, 0)
714
SuccessfulSmartServerResponse(('ok', 2, 'rev-2')),
717
(2, 'rev-2'), self.tree.branch.last_revision_info())
719
# If allow_overwrite_descendant flag is 1, then setting the tip to an
721
response = self.request.execute(
722
'', branch_token, repo_token, rev_id_utf8, 0, 1)
724
SuccessfulSmartServerResponse(('ok', 1, rev_id_utf8)),
728
(1, rev_id_utf8), self.tree.branch.last_revision_info())
730
def make_branch_with_divergent_history(self):
731
"""Make a branch with divergent history in its repo.
733
The branch's tip will be 'child-2', and the repo will also contain
734
'child-1', which diverges from a common base revision.
736
self.tree.lock_write()
738
r1 = self.tree.commit('1st commit')
739
revno_1, revid_1 = self.tree.branch.last_revision_info()
740
r2 = self.tree.commit('2nd commit', rev_id='child-1')
741
# Undo the second commit
742
self.tree.branch.set_last_revision_info(revno_1, revid_1)
743
self.tree.set_parent_ids([revid_1])
744
# Make a new second commit, child-2. child-2 has diverged from
746
new_r2 = self.tree.commit('2nd commit', rev_id='child-2')
749
def test_not_allow_diverged(self):
750
"""If allow_diverged is not passed, then setting a divergent history
751
returns a Diverged error.
753
self.make_branch_with_divergent_history()
755
FailedSmartServerResponse(('Diverged',)),
756
self.set_last_revision('child-1', 2))
757
# The branch tip was not changed.
758
self.assertEqual('child-2', self.tree.branch.last_revision())
760
def test_allow_diverged(self):
761
"""If allow_diverged is passed, then setting a divergent history
764
self.make_branch_with_divergent_history()
765
branch_token, repo_token = self.lock_branch()
766
response = self.request.execute(
767
'', branch_token, repo_token, 'child-1', 1, 0)
769
SuccessfulSmartServerResponse(('ok', 2, 'child-1')),
772
# The branch tip was changed.
773
self.assertEqual('child-1', self.tree.branch.last_revision())
776
class TestSmartServerBranchRequestGetParent(tests.TestCaseWithMemoryTransport):
778
def test_get_parent_none(self):
779
base_branch = self.make_branch('base')
780
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
781
response = request.execute('base')
783
SuccessfulSmartServerResponse(('',)), response)
785
def test_get_parent_something(self):
786
base_branch = self.make_branch('base')
787
base_branch.set_parent(self.get_url('foo'))
788
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
789
response = request.execute('base')
791
SuccessfulSmartServerResponse(("../foo",)),
795
class TestSmartServerBranchRequestGetTagsBytes(tests.TestCaseWithMemoryTransport):
796
# Only called when the branch format and tags match [yay factory
797
# methods] so only need to test straight forward cases.
799
def test_get_bytes(self):
800
base_branch = self.make_branch('base')
801
request = smart.branch.SmartServerBranchGetTagsBytes(
802
self.get_transport())
803
response = request.execute('base')
805
SuccessfulSmartServerResponse(('',)), response)
808
class TestSmartServerBranchRequestGetStackedOnURL(tests.TestCaseWithMemoryTransport):
810
def test_get_stacked_on_url(self):
811
base_branch = self.make_branch('base', format='1.6')
812
stacked_branch = self.make_branch('stacked', format='1.6')
813
# typically should be relative
814
stacked_branch.set_stacked_on_url('../base')
815
request = smart.branch.SmartServerBranchRequestGetStackedOnURL(
816
self.get_transport())
817
response = request.execute('stacked')
819
SmartServerResponse(('ok', '../base')),
823
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
826
tests.TestCaseWithMemoryTransport.setUp(self)
828
def test_lock_write_on_unlocked_branch(self):
829
backing = self.get_transport()
830
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
831
branch = self.make_branch('.', format='knit')
832
repository = branch.repository
833
response = request.execute('')
834
branch_nonce = branch.control_files._lock.peek().get('nonce')
835
repository_nonce = repository.control_files._lock.peek().get('nonce')
837
SmartServerResponse(('ok', branch_nonce, repository_nonce)),
839
# The branch (and associated repository) is now locked. Verify that
840
# with a new branch object.
841
new_branch = repository.bzrdir.open_branch()
842
self.assertRaises(errors.LockContention, new_branch.lock_write)
844
def test_lock_write_on_locked_branch(self):
845
backing = self.get_transport()
846
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
847
branch = self.make_branch('.')
849
branch.leave_lock_in_place()
851
response = request.execute('')
853
SmartServerResponse(('LockContention',)), response)
855
def test_lock_write_with_tokens_on_locked_branch(self):
856
backing = self.get_transport()
857
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
858
branch = self.make_branch('.', format='knit')
859
branch_token = branch.lock_write()
860
repo_token = branch.repository.lock_write()
861
branch.repository.unlock()
862
branch.leave_lock_in_place()
863
branch.repository.leave_lock_in_place()
865
response = request.execute('',
866
branch_token, repo_token)
868
SmartServerResponse(('ok', branch_token, repo_token)), response)
870
def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
871
backing = self.get_transport()
872
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
873
branch = self.make_branch('.', format='knit')
874
branch_token = branch.lock_write()
875
repo_token = branch.repository.lock_write()
876
branch.repository.unlock()
877
branch.leave_lock_in_place()
878
branch.repository.leave_lock_in_place()
880
response = request.execute('',
881
branch_token+'xxx', repo_token)
883
SmartServerResponse(('TokenMismatch',)), response)
885
def test_lock_write_on_locked_repo(self):
886
backing = self.get_transport()
887
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
888
branch = self.make_branch('.', format='knit')
889
branch.repository.lock_write()
890
branch.repository.leave_lock_in_place()
891
branch.repository.unlock()
892
response = request.execute('')
894
SmartServerResponse(('LockContention',)), response)
896
def test_lock_write_on_readonly_transport(self):
897
backing = self.get_readonly_transport()
898
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
899
branch = self.make_branch('.')
900
root = self.get_transport().clone('/')
901
path = urlutils.relative_url(root.base, self.get_transport().base)
902
response = request.execute(path)
903
error_name, lock_str, why_str = response.args
904
self.assertFalse(response.is_successful())
905
self.assertEqual('LockFailed', error_name)
908
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
    """Tests for the Branch.unlock smart request.

    NOTE(review): the text this class was recovered from had lost its
    indentation and several lines.  ``def setUp(self):`` and the
    ``self.assertEqual(`` openers are forced by the surviving syntax; the
    ``branch.unlock()`` and ``new_branch.unlock()`` calls are restored from
    the surrounding comments and lock pattern -- confirm.
    """

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)

    def test_unlock_on_locked_branch_and_repo(self):
        """Unlocking with the right tokens releases the branch lock."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
        branch = self.make_branch('.', format='knit')
        # Lock the branch and the repository, keeping both tokens.
        branch_token = branch.lock_write()
        repo_token = branch.repository.lock_write()
        branch.repository.unlock()
        # Unlock the branch (and repo) object, leaving the physical locks
        # in place.
        branch.leave_lock_in_place()
        branch.repository.leave_lock_in_place()
        branch.unlock()
        response = request.execute('', branch_token, repo_token)
        self.assertEqual(
            SmartServerResponse(('ok',)), response)
        # The branch is now unlocked.  Verify that with a new branch
        # object: taking the write lock must succeed.
        new_branch = branch.bzrdir.open_branch()
        new_branch.lock_write()
        new_branch.unlock()

    def test_unlock_on_unlocked_branch_unlocked_repo(self):
        """Unlocking when nothing is locked gives TokenMismatch."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
        branch = self.make_branch('.', format='knit')
        response = request.execute(
            '', 'branch token', 'repo token')
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)

    def test_unlock_on_unlocked_branch_locked_repo(self):
        """A bogus branch token gives TokenMismatch even with a repo lock."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
        branch = self.make_branch('.', format='knit')
        # Lock the repository.
        repo_token = branch.repository.lock_write()
        branch.repository.leave_lock_in_place()
        branch.repository.unlock()
        # Issue the branch unlock request on the unlocked branch (with a
        # locked repo).
        response = request.execute(
            '', 'branch token', repo_token)
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)
961
class TestSmartServerRepositoryRequest(tests.TestCaseWithMemoryTransport):
    """Tests for smart.repository.SmartServerRepositoryRequest."""

    def test_no_repository(self):
        """Raise NoRepositoryPresent when there is a bzrdir and no repo."""
        # A shared repository is created above the path that is requested,
        # so the error is only raised if the request looks at exactly the
        # named path instead of searching for a repository.
        transport = self.get_transport()
        repo_request = smart.repository.SmartServerRepositoryRequest(
            transport)
        self.make_repository('.', shared=True)
        self.make_bzrdir('subdir')
        self.assertRaises(
            errors.NoRepositoryPresent, repo_request.execute, 'subdir')
977
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithMemoryTransport):
    """Tests for the Repository.get_parent_map smart request.

    NOTE(review): the ``self.assertEqual(`` openers in front of the
    expected responses were missing from the recovered text (unbalanced
    parentheses) and have been restored.
    """

    def test_trivial_bzipped(self):
        """The response body is bz2-compressed on the wire."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetParentMap(backing)
        tree = self.make_branch_and_memory_tree('.')

        # execute() itself answers nothing; the reply comes from do_body().
        self.assertEqual(None,
                         request.execute('', 'missing-id'))
        # Note that it returns a body that is bzipped.
        self.assertEqual(
            SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
            request.do_body('\n\n0\n'))

    def test_trivial_include_missing(self):
        """With 'include-missing:' the absent id is listed in the body."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetParentMap(backing)
        tree = self.make_branch_and_memory_tree('.')

        self.assertEqual(None,
                         request.execute('', 'missing-id', 'include-missing:'))
        self.assertEqual(
            SuccessfulSmartServerResponse(('ok', ),
                                          bz2.compress('missing:missing-id')),
            request.do_body('\n\n0\n'))
1005
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithMemoryTransport):
    """Tests for the Repository.get_revision_graph smart request.

    NOTE(review): the text this class was recovered from had lost its
    indentation and several lines.  The ``self.assertEqual(`` openers are
    forced by unbalanced parentheses.  The ``tree.lock_write()`` /
    ``tree.add('')`` / ``tree.unlock()`` bracket around the commits fills
    two-line and one-line gaps in the recovered text and is an assumption
    -- confirm against the upstream file.
    """

    def test_none_argument(self):
        """An empty revision id returns the graph of every revision."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()

        # the lines of revision_id->revision_parent_list has no guaranteed
        # order coming out of a dict, so sort both our test and response
        lines = sorted([' '.join([r2, r1]), r1])
        response = request.execute('', '')
        response.body = '\n'.join(sorted(response.body.split('\n')))

        self.assertEqual(
            SmartServerResponse(('ok', ), '\n'.join(lines)), response)

    def test_specific_revision_argument(self):
        """Asking for the first revision returns just that revision id."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        rev_id_utf8 = u'\xc9'.encode('utf-8')
        r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()

        self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
                         request.execute('', rev_id_utf8))

    def test_no_such_revision(self):
        """An unknown revision id gives a 'nosuchrevision' response."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        tree.unlock()

        # Note that it still returns body (of zero bytes).
        self.assertEqual(
            SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
            request.execute('', 'missingrevision'))
1055
class TestSmartServerRepositoryGetStream(tests.TestCaseWithMemoryTransport):
    """Tests for the Repository.get_stream smart request.

    NOTE(review): the text this class was recovered from had lost its
    indentation and several lines.  ``return repo, r1, r2`` is forced by
    the callers, which unpack three values.  The ``tree.lock_write()`` /
    ``tree.add('')`` / ``tree.unlock()`` bracket around the commits fills
    gaps in the recovered text and is an assumption -- confirm against the
    upstream file.
    """

    def make_two_commit_repo(self):
        """Make a repository holding two commits.

        :return: (repository, first revision id, second revision id)
        """
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()
        repo = tree.branch.repository
        return repo, r1, r2

    def test_ancestry_of(self):
        """The search argument may be an 'ancestry-of' some heads."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetStream(backing)
        repo, r1, r2 = self.make_two_commit_repo()
        fetch_spec = ['ancestry-of', r2]
        lines = '\n'.join(fetch_spec)
        request.execute('', repo._format.network_name())
        response = request.do_body(lines)
        self.assertEqual(('ok',), response.args)
        stream_bytes = ''.join(response.body_stream)
        self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')

    def test_search(self):
        """The search argument may be a 'search' of some explicit keys."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetStream(backing)
        repo, r1, r2 = self.make_two_commit_repo()
        fetch_spec = ['search', '%s %s' % (r1, r2), 'null:', '2']
        lines = '\n'.join(fetch_spec)
        request.execute('', repo._format.network_name())
        response = request.do_body(lines)
        self.assertEqual(('ok',), response.args)
        stream_bytes = ''.join(response.body_stream)
        self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')
1094
class TestSmartServerRequestHasRevision(tests.TestCaseWithMemoryTransport):
    """Tests for the Repository.has_revision smart request.

    NOTE(review): the text this class was recovered from had lost its
    indentation and three lines in test_present_revision.  The
    ``tree.lock_write()`` / ``tree.add('')`` / ``tree.unlock()`` bracket
    around the commit fills those gaps and is an assumption -- confirm
    against the upstream file.
    """

    def test_missing_revision(self):
        """For a missing revision, ('no', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRequestHasRevision(backing)
        self.make_repository('.')
        self.assertEqual(SmartServerResponse(('no', )),
                         request.execute('', 'revid'))

    def test_present_revision(self):
        """For a present revision, ('yes', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRequestHasRevision(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        r1 = tree.commit('a commit', rev_id=rev_id_utf8)
        tree.unlock()
        # Check the fixture directly before going through the request.
        self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
        self.assertEqual(SmartServerResponse(('yes', )),
                         request.execute('', rev_id_utf8))
1119
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithMemoryTransport):
    """Tests for the Repository.gather_stats smart request.

    NOTE(review): the text this class was recovered from had lost its
    indentation and a number of continuation lines, leaving calls and
    tuples unterminated.  Everything marked "restored" below was rebuilt
    from the surviving neighbouring lines and must be confirmed against the
    upstream file.
    """

    def test_empty_revid(self):
        """With an empty revid, we get only the number of revisions."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        repository = self.make_repository('.')
        stats = repository.gather_stats()
        expected_body = 'revisions: 0\n'
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute('', '', 'no'))

    def test_revid_with_committers(self):
        """For a revid we get more infos."""
        backing = self.get_transport()
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        tree = self.make_branch_and_memory_tree('.')
        # restored: lock/add bracket (two-line gap in the recovered text)
        tree.lock_write()
        tree.add('')
        # Let's build a predictable result
        tree.commit('a commit', timestamp=123456.2, timezone=3600)
        # restored: rev_id keyword, by analogy with the following test
        tree.commit('a commit', timestamp=654321.4, timezone=0,
                    rev_id=rev_id_utf8)
        tree.unlock()  # restored

        stats = tree.branch.repository.gather_stats()
        # restored: the closing 'revisions' line, in the format visible in
        # test_empty_revid with the count of the two commits made above
        expected_body = ('firstrev: 123456.200 3600\n'
                         'latestrev: 654321.400 0\n'
                         'revisions: 2\n')
        # restored: the execute() call; 'no' because the expected body has
        # no 'committers' line
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute('',
                                         rev_id_utf8, 'no'))

    def test_not_empty_repository_with_committers(self):
        """For a revid and requesting committers we get the whole thing."""
        backing = self.get_transport()
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        tree = self.make_branch_and_memory_tree('.')
        # restored: lock/add bracket (two-line gap in the recovered text)
        tree.lock_write()
        tree.add('')
        # Let's build a predictable result
        # restored: committer='foo' -- any name other than 'bar' satisfies
        # the expected 'committers: 2'
        tree.commit('a commit', timestamp=123456.2, timezone=3600,
                    committer='foo')
        tree.commit('a commit', timestamp=654321.4, timezone=0,
                    committer='bar', rev_id=rev_id_utf8)
        tree.unlock()  # restored

        stats = tree.branch.repository.gather_stats()
        # restored: the closing 'revisions' line
        expected_body = ('committers: 2\n'
                         'firstrev: 123456.200 3600\n'
                         'latestrev: 654321.400 0\n'
                         'revisions: 2\n')
        # restored: the opening of the execute() call
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute('',
                                         rev_id_utf8, 'yes'))
1178
class TestSmartServerRepositoryIsShared(tests.TestCaseWithMemoryTransport):
    """Tests for the Repository.is_shared smart request."""

    def test_is_shared(self):
        """A repository made with shared=True answers ('yes', )."""
        transport = self.get_transport()
        is_shared = smart.repository.SmartServerRepositoryIsShared(transport)
        self.make_repository('.', shared=True)
        expected = SmartServerResponse(('yes', ))
        self.assertEqual(expected, is_shared.execute('', ))

    def test_is_not_shared(self):
        """A repository made with shared=False answers ('no', )."""
        transport = self.get_transport()
        is_shared = smart.repository.SmartServerRepositoryIsShared(transport)
        self.make_repository('.', shared=False)
        expected = SmartServerResponse(('no', ))
        self.assertEqual(expected, is_shared.execute('', ))
1197
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
    """Tests for the Repository.lock_write smart request.

    NOTE(review): the text this class was recovered from had lost its
    indentation and a few lines.  The ``self.assertEqual(`` opener in
    test_lock_write_on_locked_repo is forced by unbalanced parentheses;
    the ``repository.unlock()`` in the same test fills a one-line gap and
    is an assumption -- confirm against the upstream file.
    """

    def test_lock_write_on_unlocked_repo(self):
        """Locking an unlocked repository returns 'ok' and the lock nonce."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        repository = self.make_repository('.', format='knit')
        response = request.execute('')
        nonce = repository.control_files._lock.peek().get('nonce')
        self.assertEqual(SmartServerResponse(('ok', nonce)), response)
        # The repository is now locked.  Verify that with a new repository
        # object.
        new_repo = repository.bzrdir.open_repository()
        self.assertRaises(errors.LockContention, new_repo.lock_write)

    def test_lock_write_on_locked_repo(self):
        """Locking an already-locked repository gives LockContention."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        repository = self.make_repository('.', format='knit')
        repository.lock_write()
        repository.leave_lock_in_place()
        repository.unlock()
        response = request.execute('')
        self.assertEqual(
            SmartServerResponse(('LockContention',)), response)

    def test_lock_write_on_readonly_transport(self):
        """Locking over a read-only transport fails with LockFailed."""
        backing = self.get_readonly_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        repository = self.make_repository('.', format='knit')
        response = request.execute('')
        self.assertFalse(response.is_successful())
        self.assertEqual('LockFailed', response.args[0])
1231
class TestInsertStreamBase(tests.TestCaseWithMemoryTransport):
    """Base class providing a stream helper to its subclasses."""

    def make_empty_byte_stream(self, repo):
        """Return an empty record stream for repo's format as one string."""
        chunks = smart.repository._stream_to_byte_stream([], repo._format)
        return ''.join(chunks)
1238
class TestSmartServerRepositoryInsertStream(TestInsertStreamBase):
    """Tests for the Repository.insert_stream smart request."""

    def test_insert_stream_empty(self):
        """Inserting an empty stream ends with an 'ok' response."""
        transport = self.get_transport()
        insert = smart.repository.SmartServerRepositoryInsertStream(transport)
        target_repo = self.make_repository('.')
        # Neither the initial call nor the body chunk produces a response;
        # only do_end() does.
        started = insert.execute('', '')
        self.assertEqual(None, started)
        empty_bytes = self.make_empty_byte_stream(target_repo)
        chunk_result = insert.do_chunk(empty_bytes)
        self.assertEqual(None, chunk_result)
        finished = insert.do_end()
        self.assertEqual(SmartServerResponse(('ok', )), finished)
1252
class TestSmartServerRepositoryInsertStreamLocked(TestInsertStreamBase):
    """Tests for the Repository.insert_stream_locked smart request.

    NOTE(review): the text this class was recovered from had lost its
    indentation and several lines.  The ``backing)`` constructor argument
    and the ``self.assertRaises(`` opener are forced by unbalanced
    parentheses.  The ``repository.unlock()`` calls fill one-line gaps
    after the last assertion of each test and are an assumption -- confirm
    against the upstream file.
    """

    def test_insert_stream_empty(self):
        """Inserting an empty stream with the right token ends with 'ok'."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryInsertStreamLocked(
            backing)
        repository = self.make_repository('.', format='knit')
        lock_token = repository.lock_write()
        response = request.execute('', '', lock_token)
        self.assertEqual(None, response)
        response = request.do_chunk(self.make_empty_byte_stream(repository))
        self.assertEqual(None, response)
        response = request.do_end()
        self.assertEqual(SmartServerResponse(('ok', )), response)
        repository.unlock()

    def test_insert_stream_with_wrong_lock_token(self):
        """A token that does not match the held lock raises TokenMismatch."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryInsertStreamLocked(
            backing)
        repository = self.make_repository('.', format='knit')
        lock_token = repository.lock_write()
        self.assertRaises(
            errors.TokenMismatch, request.execute, '', '', 'wrong-token')
        repository.unlock()
1279
class TestSmartServerRepositoryUnlock(tests.TestCaseWithMemoryTransport):
    """Tests for the Repository.unlock smart request.

    NOTE(review): the text this class was recovered from had lost its
    indentation and several lines.  ``def setUp(self):`` and the
    ``self.assertEqual(`` openers are forced by the surviving syntax; the
    ``repository.unlock()`` and ``new_repo.unlock()`` calls fill one-line
    gaps and are an assumption -- confirm against the upstream file.
    """

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)

    def test_unlock_on_locked_repo(self):
        """Unlocking with the right token releases the repository lock."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryUnlock(backing)
        repository = self.make_repository('.', format='knit')
        token = repository.lock_write()
        repository.leave_lock_in_place()
        repository.unlock()
        response = request.execute('', token)
        self.assertEqual(
            SmartServerResponse(('ok',)), response)
        # The repository is now unlocked.  Verify that with a new repository
        # object: taking the write lock must succeed.
        new_repo = repository.bzrdir.open_repository()
        new_repo.lock_write()
        new_repo.unlock()

    def test_unlock_on_unlocked_repo(self):
        """Unlocking a repository that is not locked gives TokenMismatch."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryUnlock(backing)
        repository = self.make_repository('.', format='knit')
        response = request.execute('', 'some token')
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)
1309
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
    """Tests for the Transport.is_readonly smart request.

    NOTE(review): the ``self.assertEqual(`` openers were missing from the
    recovered text (unbalanced parentheses) and have been restored.
    """

    def test_is_readonly_no(self):
        """A writable backing transport answers ('no',)."""
        backing = self.get_transport()
        request = smart.request.SmartServerIsReadonly(backing)
        response = request.execute()
        self.assertEqual(
            SmartServerResponse(('no',)), response)

    def test_is_readonly_yes(self):
        """A read-only backing transport answers ('yes',)."""
        backing = self.get_readonly_transport()
        request = smart.request.SmartServerIsReadonly(backing)
        response = request.execute()
        self.assertEqual(
            SmartServerResponse(('yes',)), response)
1326
class TestSmartServerRepositorySetMakeWorkingTrees(tests.TestCaseWithMemoryTransport):
    """Tests for smart.repository.SmartServerRepositorySetMakeWorkingTrees."""

    def test_set_false(self):
        """Sending 'False' turns make_working_trees off."""
        transport = self.get_transport()
        shared_repo = self.make_repository('.', shared=True)
        # Start from the opposite value so the request has to change it.
        shared_repo.set_make_working_trees(True)
        verb = smart.repository.SmartServerRepositorySetMakeWorkingTrees
        set_request = verb(transport)
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
                         set_request.execute('', 'False'))
        # Re-open the repository to read the stored setting back.
        reopened = shared_repo.bzrdir.open_repository()
        self.assertFalse(reopened.make_working_trees())

    def test_set_true(self):
        """Sending 'True' turns make_working_trees on."""
        transport = self.get_transport()
        shared_repo = self.make_repository('.', shared=True)
        # Start from the opposite value so the request has to change it.
        shared_repo.set_make_working_trees(False)
        verb = smart.repository.SmartServerRepositorySetMakeWorkingTrees
        set_request = verb(transport)
        self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
                         set_request.execute('', 'True'))
        # Re-open the repository to read the stored setting back.
        reopened = shared_repo.bzrdir.open_repository()
        self.assertTrue(reopened.make_working_trees())
1351
class TestSmartServerPackRepositoryAutopack(tests.TestCaseWithTransport):
    """Tests for the PackRepository.autopack smart request.

    NOTE(review): the text this class was recovered from had lost its
    indentation and several lines.  The ``for x in range(...)`` headers
    (counts taken from the pack-count assertions), ``return repo``,
    ``repo.lock_write()`` (paired with the surviving
    ``addCleanup(repo.unlock)``) and the ``backing)`` arguments were
    restored -- confirm against the upstream file.
    """

    def make_repo_needing_autopacking(self, path='.'):
        """Make a pack-0.92 repository holding ten packs.

        :param path: where to create the branch and tree.
        :return: the repository, with autopacking re-enabled.
        """
        # Make a repo in need of autopacking.  ``path`` used to be ignored
        # in favour of a hard-coded '.'; the default keeps that behaviour.
        tree = self.make_branch_and_tree(path, format='pack-0.92')
        repo = tree.branch.repository
        # monkey-patch the pack collection to disable autopacking
        repo._pack_collection._max_pack_count = lambda count: count
        for x in range(10):
            tree.commit('commit %s' % x)
        self.assertEqual(10, len(repo._pack_collection.names()))
        # Remove the instance attribute again, undoing the monkey-patch.
        del repo._pack_collection._max_pack_count
        return repo

    def test_autopack_needed(self):
        """Autopacking a repository with ten packs leaves a single pack."""
        repo = self.make_repo_needing_autopacking()
        repo.lock_write()
        self.addCleanup(repo.unlock)
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
        repo._pack_collection.reload_pack_names()
        self.assertEqual(1, len(repo._pack_collection.names()))

    def test_autopack_not_needed(self):
        """With nine packs the request succeeds and repacks nothing."""
        tree = self.make_branch_and_tree('.', format='pack-0.92')
        repo = tree.branch.repository
        repo.lock_write()
        self.addCleanup(repo.unlock)
        for x in range(9):
            tree.commit('commit %s' % x)
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
        repo._pack_collection.reload_pack_names()
        self.assertEqual(9, len(repo._pack_collection.names()))

    def test_autopack_on_nonpack_format(self):
        """A request to autopack a non-pack repo is a no-op."""
        repo = self.make_repository('.', format='knit')
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
1402
class TestHandlers(tests.TestCase):
    """Tests for the request.request_handlers object."""

    def test_all_registrations_exist(self):
        """All registered request_handlers can be found."""
        # If there's a typo in a register_lazy call, this loop will fail with
        # an AttributeError.
        for key, item in smart.request.request_handlers.iteritems():
            # NOTE(review): the loop body was missing from the recovered
            # text; a lookup through get(), as assertHandlerEqual does, is
            # restored here -- confirm against the upstream file.
            item = smart.request.request_handlers.get(key)

    def assertHandlerEqual(self, verb, handler):
        """Assert that ``verb`` is registered to the ``handler`` class."""
        self.assertEqual(smart.request.request_handlers.get(verb), handler)

    def test_registered_methods(self):
        """Test that known methods are registered to the correct object."""
        self.assertHandlerEqual('Branch.get_config_file',
            smart.branch.SmartServerBranchGetConfigFile)
        self.assertHandlerEqual('Branch.get_parent',
            smart.branch.SmartServerBranchGetParent)
        self.assertHandlerEqual('Branch.get_tags_bytes',
            smart.branch.SmartServerBranchGetTagsBytes)
        self.assertHandlerEqual('Branch.lock_write',
            smart.branch.SmartServerBranchRequestLockWrite)
        self.assertHandlerEqual('Branch.last_revision_info',
            smart.branch.SmartServerBranchRequestLastRevisionInfo)
        self.assertHandlerEqual('Branch.revision_history',
            smart.branch.SmartServerRequestRevisionHistory)
        self.assertHandlerEqual('Branch.set_config_option',
            smart.branch.SmartServerBranchRequestSetConfigOption)
        self.assertHandlerEqual('Branch.set_last_revision',
            smart.branch.SmartServerBranchRequestSetLastRevision)
        self.assertHandlerEqual('Branch.set_last_revision_info',
            smart.branch.SmartServerBranchRequestSetLastRevisionInfo)
        self.assertHandlerEqual('Branch.unlock',
            smart.branch.SmartServerBranchRequestUnlock)
        self.assertHandlerEqual('BzrDir.find_repository',
            smart.bzrdir.SmartServerRequestFindRepositoryV1)
        self.assertHandlerEqual('BzrDir.find_repositoryV2',
            smart.bzrdir.SmartServerRequestFindRepositoryV2)
        self.assertHandlerEqual('BzrDirFormat.initialize',
            smart.bzrdir.SmartServerRequestInitializeBzrDir)
        self.assertHandlerEqual('BzrDir.cloning_metadir',
            smart.bzrdir.SmartServerBzrDirRequestCloningMetaDir)
        self.assertHandlerEqual('BzrDir.get_config_file',
            smart.bzrdir.SmartServerBzrDirRequestConfigFile)
        self.assertHandlerEqual('BzrDir.open_branch',
            smart.bzrdir.SmartServerRequestOpenBranch)
        self.assertHandlerEqual('BzrDir.open_branchV2',
            smart.bzrdir.SmartServerRequestOpenBranchV2)
        self.assertHandlerEqual('PackRepository.autopack',
            smart.packrepository.SmartServerPackRepositoryAutopack)
        self.assertHandlerEqual('Repository.gather_stats',
            smart.repository.SmartServerRepositoryGatherStats)
        self.assertHandlerEqual('Repository.get_parent_map',
            smart.repository.SmartServerRepositoryGetParentMap)
        self.assertHandlerEqual('Repository.get_revision_graph',
            smart.repository.SmartServerRepositoryGetRevisionGraph)
        self.assertHandlerEqual('Repository.get_stream',
            smart.repository.SmartServerRepositoryGetStream)
        self.assertHandlerEqual('Repository.has_revision',
            smart.repository.SmartServerRequestHasRevision)
        self.assertHandlerEqual('Repository.insert_stream',
            smart.repository.SmartServerRepositoryInsertStream)
        self.assertHandlerEqual('Repository.insert_stream_locked',
            smart.repository.SmartServerRepositoryInsertStreamLocked)
        self.assertHandlerEqual('Repository.is_shared',
            smart.repository.SmartServerRepositoryIsShared)
        self.assertHandlerEqual('Repository.lock_write',
            smart.repository.SmartServerRepositoryLockWrite)
        self.assertHandlerEqual('Repository.tarball',
            smart.repository.SmartServerRepositoryTarball)
        self.assertHandlerEqual('Repository.unlock',
            smart.repository.SmartServerRepositoryUnlock)
        self.assertHandlerEqual('Transport.is_readonly',
            smart.request.SmartServerIsReadonly)