1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the smart wire/domain protocol.
19
This module contains tests for the domain-level smart requests and responses,
20
such as the 'Branch.lock_write' request. Many of these use specific disk
21
formats to exercise calls that only make sense for formats with specific
24
Tests for low-level protocol encoding are found in test_smart_transport.
28
from cStringIO import StringIO
39
from bzrlib.branch import Branch, BranchReferenceFormat
40
import bzrlib.smart.branch
41
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
42
import bzrlib.smart.packrepository
43
import bzrlib.smart.repository
44
from bzrlib.smart.request import (
45
FailedSmartServerResponse,
48
SuccessfulSmartServerResponse,
50
from bzrlib.tests import (
53
from bzrlib.transport import chroot, get_transport
54
from bzrlib.util import bencode
57
def load_tests(standard_tests, module, loader):
58
"""Multiply tests version and protocol consistency."""
59
# FindRepository tests.
60
bzrdir_mod = bzrlib.smart.bzrdir
63
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
64
("find_repositoryV2", {
65
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
66
("find_repositoryV3", {
67
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV3}),
69
to_adapt, result = split_suite_by_re(standard_tests,
70
"TestSmartServerRequestFindRepository")
71
v2_only, v1_and_2 = split_suite_by_re(to_adapt,
73
tests.multiply_tests(v1_and_2, scenarios, result)
74
# The first scenario is only applicable to v1 protocols, it is deleted
76
tests.multiply_tests(v2_only, scenarios[1:], result)
80
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
83
tests.TestCaseWithTransport.setUp(self)
84
self._chroot_server = None
86
def get_transport(self, relpath=None):
87
if self._chroot_server is None:
88
backing_transport = tests.TestCaseWithTransport.get_transport(self)
89
self._chroot_server = chroot.ChrootServer(backing_transport)
90
self._chroot_server.setUp()
91
self.addCleanup(self._chroot_server.tearDown)
92
t = get_transport(self._chroot_server.get_url())
93
if relpath is not None:
98
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
101
super(TestCaseWithSmartMedium, self).setUp()
102
# We're allowed to set the transport class here, so that we don't use
103
# the default or a parameterized class, but rather use the
104
# TestCaseWithTransport infrastructure to set up a smart server and
106
self.transport_server = self.make_transport_server
108
def make_transport_server(self):
109
return smart.server.SmartTCPServer_for_testing('-' + self.id())
111
def get_smart_medium(self):
112
"""Get a smart medium to use in tests."""
113
return self.get_transport().get_smart_medium()
116
class TestSmartServerResponse(tests.TestCase):
118
def test__eq__(self):
119
self.assertEqual(SmartServerResponse(('ok', )),
120
SmartServerResponse(('ok', )))
121
self.assertEqual(SmartServerResponse(('ok', ), 'body'),
122
SmartServerResponse(('ok', ), 'body'))
123
self.assertNotEqual(SmartServerResponse(('ok', )),
124
SmartServerResponse(('notok', )))
125
self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
126
SmartServerResponse(('ok', )))
127
self.assertNotEqual(None,
128
SmartServerResponse(('ok', )))
130
def test__str__(self):
131
"""SmartServerResponses can be stringified."""
133
"<SuccessfulSmartServerResponse args=('args',) body='body'>",
134
str(SuccessfulSmartServerResponse(('args',), 'body')))
136
"<FailedSmartServerResponse args=('args',) body='body'>",
137
str(FailedSmartServerResponse(('args',), 'body')))
140
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
142
def test_translate_client_path(self):
143
transport = self.get_transport()
144
request = SmartServerRequest(transport, 'foo/')
145
self.assertEqual('./', request.translate_client_path('foo/'))
147
errors.InvalidURLJoin, request.translate_client_path, 'foo/..')
149
errors.PathNotChild, request.translate_client_path, '/')
151
errors.PathNotChild, request.translate_client_path, 'bar/')
152
self.assertEqual('./baz', request.translate_client_path('foo/baz'))
154
def test_transport_from_client_path(self):
155
transport = self.get_transport()
156
request = SmartServerRequest(transport, 'foo/')
159
request.transport_from_client_path('foo/').base)
162
class TestSmartServerBzrDirRequestCloningMetaDir(
163
tests.TestCaseWithMemoryTransport):
164
"""Tests for BzrDir.cloning_metadir."""
166
def test_cloning_metadir(self):
167
"""When there is a bzrdir present, the call succeeds."""
168
backing = self.get_transport()
169
dir = self.make_bzrdir('.')
170
local_result = dir.cloning_metadir()
171
request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
172
request = request_class(backing)
173
expected = SuccessfulSmartServerResponse(
174
(local_result.network_name(),
175
local_result.repository_format.network_name(),
176
('branch', local_result.get_branch_format().network_name())))
177
self.assertEqual(expected, request.execute('', 'False'))
179
def test_cloning_metadir_reference(self):
180
"""The request fails when bzrdir contains a branch reference."""
181
backing = self.get_transport()
182
referenced_branch = self.make_branch('referenced')
183
dir = self.make_bzrdir('.')
184
local_result = dir.cloning_metadir()
185
reference = BranchReferenceFormat().initialize(dir, referenced_branch)
186
reference_url = BranchReferenceFormat().get_reference(dir)
187
# The server shouldn't try to follow the branch reference, so it's fine
188
# if the referenced branch isn't reachable.
189
backing.rename('referenced', 'moved')
190
request_class = smart_dir.SmartServerBzrDirRequestCloningMetaDir
191
request = request_class(backing)
192
expected = FailedSmartServerResponse(('BranchReference',))
193
self.assertEqual(expected, request.execute('', 'False'))
196
class TestSmartServerRequestCreateRepository(tests.TestCaseWithMemoryTransport):
197
"""Tests for BzrDir.create_repository."""
199
def test_makes_repository(self):
200
"""When there is a bzrdir present, the call succeeds."""
201
backing = self.get_transport()
202
self.make_bzrdir('.')
203
request_class = bzrlib.smart.bzrdir.SmartServerRequestCreateRepository
204
request = request_class(backing)
205
reference_bzrdir_format = bzrdir.format_registry.get('default')()
206
reference_format = reference_bzrdir_format.repository_format
207
network_name = reference_format.network_name()
208
expected = SuccessfulSmartServerResponse(
209
('ok', 'no', 'no', 'no', network_name))
210
self.assertEqual(expected, request.execute('', network_name, 'True'))
213
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
214
"""Tests for BzrDir.find_repository."""
216
def test_no_repository(self):
217
"""When there is no repository to be found, ('norepository', ) is returned."""
218
backing = self.get_transport()
219
request = self._request_class(backing)
220
self.make_bzrdir('.')
221
self.assertEqual(SmartServerResponse(('norepository', )),
224
def test_nonshared_repository(self):
225
# nonshared repositorys only allow 'find' to return a handle when the
226
# path the repository is being searched on is the same as that that
227
# the repository is at.
228
backing = self.get_transport()
229
request = self._request_class(backing)
230
result = self._make_repository_and_result()
231
self.assertEqual(result, request.execute(''))
232
self.make_bzrdir('subdir')
233
self.assertEqual(SmartServerResponse(('norepository', )),
234
request.execute('subdir'))
236
def _make_repository_and_result(self, shared=False, format=None):
237
"""Convenience function to setup a repository.
239
:result: The SmartServerResponse to expect when opening it.
241
repo = self.make_repository('.', shared=shared, format=format)
242
if repo.supports_rich_root():
246
if repo._format.supports_tree_reference:
250
if (smart.bzrdir.SmartServerRequestFindRepositoryV3 ==
251
self._request_class):
252
return SuccessfulSmartServerResponse(
253
('ok', '', rich_root, subtrees, 'no',
254
repo._format.network_name()))
255
elif (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
256
self._request_class):
257
# All tests so far are on formats, and for non-external
259
return SuccessfulSmartServerResponse(
260
('ok', '', rich_root, subtrees, 'no'))
262
return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
264
def test_shared_repository(self):
265
"""When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
266
backing = self.get_transport()
267
request = self._request_class(backing)
268
result = self._make_repository_and_result(shared=True)
269
self.assertEqual(result, request.execute(''))
270
self.make_bzrdir('subdir')
271
result2 = SmartServerResponse(result.args[0:1] + ('..', ) + result.args[2:])
272
self.assertEqual(result2,
273
request.execute('subdir'))
274
self.make_bzrdir('subdir/deeper')
275
result3 = SmartServerResponse(result.args[0:1] + ('../..', ) + result.args[2:])
276
self.assertEqual(result3,
277
request.execute('subdir/deeper'))
279
def test_rich_root_and_subtree_encoding(self):
280
"""Test for the format attributes for rich root and subtree support."""
281
backing = self.get_transport()
282
request = self._request_class(backing)
283
result = self._make_repository_and_result(format='dirstate-with-subtree')
284
# check the test will be valid
285
self.assertEqual('yes', result.args[2])
286
self.assertEqual('yes', result.args[3])
287
self.assertEqual(result, request.execute(''))
289
def test_supports_external_lookups_no_v2(self):
290
"""Test for the supports_external_lookups attribute."""
291
backing = self.get_transport()
292
request = self._request_class(backing)
293
result = self._make_repository_and_result(format='dirstate-with-subtree')
294
# check the test will be valid
295
self.assertEqual('no', result.args[4])
296
self.assertEqual(result, request.execute(''))
299
class TestSmartServerBzrDirRequestGetConfigFile(
300
tests.TestCaseWithMemoryTransport):
301
"""Tests for BzrDir.get_config_file."""
303
def test_present(self):
304
backing = self.get_transport()
305
dir = self.make_bzrdir('.')
306
dir.get_config().set_default_stack_on("/")
307
local_result = dir._get_config()._get_config_file().read()
308
request_class = smart_dir.SmartServerBzrDirRequestConfigFile
309
request = request_class(backing)
310
expected = SuccessfulSmartServerResponse((), local_result)
311
self.assertEqual(expected, request.execute(''))
313
def test_missing(self):
314
backing = self.get_transport()
315
dir = self.make_bzrdir('.')
316
request_class = smart_dir.SmartServerBzrDirRequestConfigFile
317
request = request_class(backing)
318
expected = SuccessfulSmartServerResponse((), '')
319
self.assertEqual(expected, request.execute(''))
322
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
324
def test_empty_dir(self):
325
"""Initializing an empty dir should succeed and do it."""
326
backing = self.get_transport()
327
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
328
self.assertEqual(SmartServerResponse(('ok', )),
330
made_dir = bzrdir.BzrDir.open_from_transport(backing)
331
# no branch, tree or repository is expected with the current
333
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
334
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
335
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
337
def test_missing_dir(self):
338
"""Initializing a missing directory should fail like the bzrdir api."""
339
backing = self.get_transport()
340
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
341
self.assertRaises(errors.NoSuchFile,
342
request.execute, 'subdir')
344
def test_initialized_dir(self):
345
"""Initializing an extant bzrdir should fail like the bzrdir api."""
346
backing = self.get_transport()
347
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
348
self.make_bzrdir('subdir')
349
self.assertRaises(errors.FileExists,
350
request.execute, 'subdir')
353
class TestSmartServerRequestBzrDirInitializeEx(tests.TestCaseWithMemoryTransport):
354
"""Basic tests for BzrDir.initialize_ex in the smart server.
356
The main unit tests in test_bzrdir exercise the API coprehensively.
359
def test_empty_dir(self):
360
"""Initializing an empty dir should succeed and do it."""
361
backing = self.get_transport()
362
request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
363
self.assertEqual(SmartServerResponse(()), request.execute('', 'True'))
364
made_dir = bzrdir.BzrDir.open_from_transport(backing)
365
# no branch, tree or repository is expected with the current
367
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
368
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
369
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
371
def test_missing_dir(self):
372
"""Initializing a missing directory should fail like the bzrdir api."""
373
backing = self.get_transport()
374
request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
375
self.assertRaises(errors.NoSuchFile, request.execute, 'subdir/dir', 'False')
377
def test_initialized_dir(self):
378
"""Initializing an extant dirctory should fail like the bzrdir api."""
379
backing = self.get_transport()
380
request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
381
self.make_bzrdir('subdir')
382
self.assertRaises(errors.FileExists, request.execute, 'subdir', 'False')
385
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
387
def test_no_branch(self):
388
"""When there is no branch, ('nobranch', ) is returned."""
389
backing = self.get_transport()
390
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
391
self.make_bzrdir('.')
392
self.assertEqual(SmartServerResponse(('nobranch', )),
395
def test_branch(self):
396
"""When there is a branch, 'ok' is returned."""
397
backing = self.get_transport()
398
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
399
self.make_branch('.')
400
self.assertEqual(SmartServerResponse(('ok', '')),
403
def test_branch_reference(self):
404
"""When there is a branch reference, the reference URL is returned."""
405
backing = self.get_transport()
406
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
407
branch = self.make_branch('branch')
408
checkout = branch.create_checkout('reference',lightweight=True)
409
reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
410
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
411
self.assertEqual(SmartServerResponse(('ok', reference_url)),
412
request.execute('reference'))
415
class TestSmartServerRequestOpenBranchV2(TestCaseWithChrootedTransport):
417
def test_no_branch(self):
418
"""When there is no branch, ('nobranch', ) is returned."""
419
backing = self.get_transport()
420
self.make_bzrdir('.')
421
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
422
self.assertEqual(SmartServerResponse(('nobranch', )),
425
def test_branch(self):
426
"""When there is a branch, 'ok' is returned."""
427
backing = self.get_transport()
428
expected = self.make_branch('.')._format.network_name()
429
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
430
self.assertEqual(SuccessfulSmartServerResponse(('branch', expected)),
433
def test_branch_reference(self):
434
"""When there is a branch reference, the reference URL is returned."""
435
backing = self.get_transport()
436
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
437
branch = self.make_branch('branch')
438
checkout = branch.create_checkout('reference',lightweight=True)
439
reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
440
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
441
self.assertEqual(SuccessfulSmartServerResponse(('ref', reference_url)),
442
request.execute('reference'))
444
def test_stacked_branch(self):
445
"""Opening a stacked branch does not open the stacked-on branch."""
446
trunk = self.make_branch('trunk')
447
feature = self.make_branch('feature', format='1.9')
448
feature.set_stacked_on_url(trunk.base)
450
Branch.hooks.install_named_hook('open', opened_branches.append, None)
451
backing = self.get_transport()
452
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
455
response = request.execute('feature')
457
request.teardown_jail()
458
expected_format = feature._format.network_name()
460
SuccessfulSmartServerResponse(('branch', expected_format)),
462
self.assertLength(1, opened_branches)
465
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
467
def test_empty(self):
468
"""For an empty branch, the body is empty."""
469
backing = self.get_transport()
470
request = smart.branch.SmartServerRequestRevisionHistory(backing)
471
self.make_branch('.')
472
self.assertEqual(SmartServerResponse(('ok', ), ''),
475
def test_not_empty(self):
476
"""For a non-empty branch, the body is empty."""
477
backing = self.get_transport()
478
request = smart.branch.SmartServerRequestRevisionHistory(backing)
479
tree = self.make_branch_and_memory_tree('.')
482
r1 = tree.commit('1st commit')
483
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
486
SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
490
class TestSmartServerBranchRequest(tests.TestCaseWithMemoryTransport):
492
def test_no_branch(self):
493
"""When there is a bzrdir and no branch, NotBranchError is raised."""
494
backing = self.get_transport()
495
request = smart.branch.SmartServerBranchRequest(backing)
496
self.make_bzrdir('.')
497
self.assertRaises(errors.NotBranchError,
500
def test_branch_reference(self):
501
"""When there is a branch reference, NotBranchError is raised."""
502
backing = self.get_transport()
503
request = smart.branch.SmartServerBranchRequest(backing)
504
branch = self.make_branch('branch')
505
checkout = branch.create_checkout('reference',lightweight=True)
506
self.assertRaises(errors.NotBranchError,
507
request.execute, 'checkout')
510
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithMemoryTransport):
512
def test_empty(self):
513
"""For an empty branch, the result is ('ok', '0', 'null:')."""
514
backing = self.get_transport()
515
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
516
self.make_branch('.')
517
self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
520
def test_not_empty(self):
521
"""For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
522
backing = self.get_transport()
523
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
524
tree = self.make_branch_and_memory_tree('.')
527
rev_id_utf8 = u'\xc8'.encode('utf-8')
528
r1 = tree.commit('1st commit')
529
r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
532
SmartServerResponse(('ok', '2', rev_id_utf8)),
536
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithMemoryTransport):
538
def test_default(self):
539
"""With no file, we get empty content."""
540
backing = self.get_transport()
541
request = smart.branch.SmartServerBranchGetConfigFile(backing)
542
branch = self.make_branch('.')
543
# there should be no file by default
545
self.assertEqual(SmartServerResponse(('ok', ), content),
548
def test_with_content(self):
549
# SmartServerBranchGetConfigFile should return the content from
550
# branch.control_files.get('branch.conf') for now - in the future it may
551
# perform more complex processing.
552
backing = self.get_transport()
553
request = smart.branch.SmartServerBranchGetConfigFile(backing)
554
branch = self.make_branch('.')
555
branch._transport.put_bytes('branch.conf', 'foo bar baz')
556
self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
560
class TestLockedBranch(tests.TestCaseWithMemoryTransport):
562
def get_lock_tokens(self, branch):
563
branch_token = branch.lock_write()
564
repo_token = branch.repository.lock_write()
565
branch.repository.unlock()
566
return branch_token, repo_token
569
class TestSmartServerBranchRequestSetConfigOption(TestLockedBranch):
571
def test_value_name(self):
572
branch = self.make_branch('.')
573
request = smart.branch.SmartServerBranchRequestSetConfigOption(
574
branch.bzrdir.root_transport)
575
branch_token, repo_token = self.get_lock_tokens(branch)
576
config = branch._get_config()
577
result = request.execute('', branch_token, repo_token, 'bar', 'foo',
579
self.assertEqual(SuccessfulSmartServerResponse(()), result)
580
self.assertEqual('bar', config.get_option('foo'))
582
def test_value_name_section(self):
583
branch = self.make_branch('.')
584
request = smart.branch.SmartServerBranchRequestSetConfigOption(
585
branch.bzrdir.root_transport)
586
branch_token, repo_token = self.get_lock_tokens(branch)
587
config = branch._get_config()
588
result = request.execute('', branch_token, repo_token, 'bar', 'foo',
590
self.assertEqual(SuccessfulSmartServerResponse(()), result)
591
self.assertEqual('bar', config.get_option('foo', 'gam'))
594
class SetLastRevisionTestBase(TestLockedBranch):
595
"""Base test case for verbs that implement set_last_revision."""
598
tests.TestCaseWithMemoryTransport.setUp(self)
599
backing_transport = self.get_transport()
600
self.request = self.request_class(backing_transport)
601
self.tree = self.make_branch_and_memory_tree('.')
603
def lock_branch(self):
604
return self.get_lock_tokens(self.tree.branch)
606
def unlock_branch(self):
607
self.tree.branch.unlock()
609
def set_last_revision(self, revision_id, revno):
610
branch_token, repo_token = self.lock_branch()
611
response = self._set_last_revision(
612
revision_id, revno, branch_token, repo_token)
616
def assertRequestSucceeds(self, revision_id, revno):
617
response = self.set_last_revision(revision_id, revno)
618
self.assertEqual(SuccessfulSmartServerResponse(('ok',)), response)
621
class TestSetLastRevisionVerbMixin(object):
622
"""Mixin test case for verbs that implement set_last_revision."""
624
def test_set_null_to_null(self):
625
"""An empty branch can have its last revision set to 'null:'."""
626
self.assertRequestSucceeds('null:', 0)
628
def test_NoSuchRevision(self):
629
"""If the revision_id is not present, the verb returns NoSuchRevision.
631
revision_id = 'non-existent revision'
633
FailedSmartServerResponse(('NoSuchRevision', revision_id)),
634
self.set_last_revision(revision_id, 1))
636
def make_tree_with_two_commits(self):
637
self.tree.lock_write()
639
rev_id_utf8 = u'\xc8'.encode('utf-8')
640
r1 = self.tree.commit('1st commit', rev_id=rev_id_utf8)
641
r2 = self.tree.commit('2nd commit', rev_id='rev-2')
644
def test_branch_last_revision_info_is_updated(self):
645
"""A branch's tip can be set to a revision that is present in its
648
# Make a branch with an empty revision history, but two revisions in
650
self.make_tree_with_two_commits()
651
rev_id_utf8 = u'\xc8'.encode('utf-8')
652
self.tree.branch.set_revision_history([])
654
(0, 'null:'), self.tree.branch.last_revision_info())
655
# We can update the branch to a revision that is present in the
657
self.assertRequestSucceeds(rev_id_utf8, 1)
659
(1, rev_id_utf8), self.tree.branch.last_revision_info())
661
def test_branch_last_revision_info_rewind(self):
662
"""A branch's tip can be set to a revision that is an ancestor of the
665
self.make_tree_with_two_commits()
666
rev_id_utf8 = u'\xc8'.encode('utf-8')
668
(2, 'rev-2'), self.tree.branch.last_revision_info())
669
self.assertRequestSucceeds(rev_id_utf8, 1)
671
(1, rev_id_utf8), self.tree.branch.last_revision_info())
673
def test_TipChangeRejected(self):
674
"""If a pre_change_branch_tip hook raises TipChangeRejected, the verb
675
returns TipChangeRejected.
677
rejection_message = u'rejection message\N{INTERROBANG}'
678
def hook_that_rejects(params):
679
raise errors.TipChangeRejected(rejection_message)
680
Branch.hooks.install_named_hook(
681
'pre_change_branch_tip', hook_that_rejects, None)
683
FailedSmartServerResponse(
684
('TipChangeRejected', rejection_message.encode('utf-8'))),
685
self.set_last_revision('null:', 0))
688
class TestSmartServerBranchRequestSetLastRevision(
689
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
690
"""Tests for Branch.set_last_revision verb."""
692
request_class = smart.branch.SmartServerBranchRequestSetLastRevision
694
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
695
return self.request.execute(
696
'', branch_token, repo_token, revision_id)
699
class TestSmartServerBranchRequestSetLastRevisionInfo(
700
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
701
"""Tests for Branch.set_last_revision_info verb."""
703
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionInfo
705
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
706
return self.request.execute(
707
'', branch_token, repo_token, revno, revision_id)
709
def test_NoSuchRevision(self):
710
"""Branch.set_last_revision_info does not have to return
711
NoSuchRevision if the revision_id is absent.
713
raise tests.TestNotApplicable()
716
class TestSmartServerBranchRequestSetLastRevisionEx(
717
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
718
"""Tests for Branch.set_last_revision_ex verb."""
720
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionEx
722
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
723
return self.request.execute(
724
'', branch_token, repo_token, revision_id, 0, 0)
726
def assertRequestSucceeds(self, revision_id, revno):
727
response = self.set_last_revision(revision_id, revno)
729
SuccessfulSmartServerResponse(('ok', revno, revision_id)),
732
def test_branch_last_revision_info_rewind(self):
733
"""A branch's tip can be set to a revision that is an ancestor of the
734
current tip, but only if allow_overwrite_descendant is passed.
736
self.make_tree_with_two_commits()
737
rev_id_utf8 = u'\xc8'.encode('utf-8')
739
(2, 'rev-2'), self.tree.branch.last_revision_info())
740
# If allow_overwrite_descendant flag is 0, then trying to set the tip
741
# to an older revision ID has no effect.
742
branch_token, repo_token = self.lock_branch()
743
response = self.request.execute(
744
'', branch_token, repo_token, rev_id_utf8, 0, 0)
746
SuccessfulSmartServerResponse(('ok', 2, 'rev-2')),
749
(2, 'rev-2'), self.tree.branch.last_revision_info())
751
# If allow_overwrite_descendant flag is 1, then setting the tip to an
753
response = self.request.execute(
754
'', branch_token, repo_token, rev_id_utf8, 0, 1)
756
SuccessfulSmartServerResponse(('ok', 1, rev_id_utf8)),
760
(1, rev_id_utf8), self.tree.branch.last_revision_info())
762
def make_branch_with_divergent_history(self):
763
"""Make a branch with divergent history in its repo.
765
The branch's tip will be 'child-2', and the repo will also contain
766
'child-1', which diverges from a common base revision.
768
self.tree.lock_write()
770
r1 = self.tree.commit('1st commit')
771
revno_1, revid_1 = self.tree.branch.last_revision_info()
772
r2 = self.tree.commit('2nd commit', rev_id='child-1')
773
# Undo the second commit
774
self.tree.branch.set_last_revision_info(revno_1, revid_1)
775
self.tree.set_parent_ids([revid_1])
776
# Make a new second commit, child-2. child-2 has diverged from
778
new_r2 = self.tree.commit('2nd commit', rev_id='child-2')
781
def test_not_allow_diverged(self):
782
"""If allow_diverged is not passed, then setting a divergent history
783
returns a Diverged error.
785
self.make_branch_with_divergent_history()
787
FailedSmartServerResponse(('Diverged',)),
788
self.set_last_revision('child-1', 2))
789
# The branch tip was not changed.
790
self.assertEqual('child-2', self.tree.branch.last_revision())
792
def test_allow_diverged(self):
793
"""If allow_diverged is passed, then setting a divergent history
796
self.make_branch_with_divergent_history()
797
branch_token, repo_token = self.lock_branch()
798
response = self.request.execute(
799
'', branch_token, repo_token, 'child-1', 1, 0)
801
SuccessfulSmartServerResponse(('ok', 2, 'child-1')),
804
# The branch tip was changed.
805
self.assertEqual('child-1', self.tree.branch.last_revision())
808
class TestSmartServerBranchRequestGetParent(tests.TestCaseWithMemoryTransport):
810
def test_get_parent_none(self):
811
base_branch = self.make_branch('base')
812
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
813
response = request.execute('base')
815
SuccessfulSmartServerResponse(('',)), response)
817
def test_get_parent_something(self):
818
base_branch = self.make_branch('base')
819
base_branch.set_parent(self.get_url('foo'))
820
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
821
response = request.execute('base')
823
SuccessfulSmartServerResponse(("../foo",)),
827
class TestSmartServerBranchRequestSetParent(tests.TestCaseWithMemoryTransport):
829
def test_set_parent_none(self):
830
branch = self.make_branch('base', format="1.9")
832
branch._set_parent_location('foo')
834
request = smart.branch.SmartServerBranchRequestSetParentLocation(
835
self.get_transport())
836
branch_token = branch.lock_write()
837
repo_token = branch.repository.lock_write()
839
response = request.execute('base', branch_token, repo_token, '')
841
branch.repository.unlock()
843
self.assertEqual(SuccessfulSmartServerResponse(()), response)
844
self.assertEqual(None, branch.get_parent())
846
def test_set_parent_something(self):
847
branch = self.make_branch('base', format="1.9")
848
request = smart.branch.SmartServerBranchRequestSetParentLocation(
849
self.get_transport())
850
branch_token = branch.lock_write()
851
repo_token = branch.repository.lock_write()
853
response = request.execute('base', branch_token, repo_token,
856
branch.repository.unlock()
858
self.assertEqual(SuccessfulSmartServerResponse(()), response)
859
self.assertEqual('http://bar/', branch.get_parent())
862
class TestSmartServerBranchRequestGetTagsBytes(tests.TestCaseWithMemoryTransport):
863
# Only called when the branch format and tags match [yay factory
864
# methods] so only need to test straight forward cases.
866
def test_get_bytes(self):
867
base_branch = self.make_branch('base')
868
request = smart.branch.SmartServerBranchGetTagsBytes(
869
self.get_transport())
870
response = request.execute('base')
872
SuccessfulSmartServerResponse(('',)), response)
875
class TestSmartServerBranchRequestGetStackedOnURL(tests.TestCaseWithMemoryTransport):
877
def test_get_stacked_on_url(self):
878
base_branch = self.make_branch('base', format='1.6')
879
stacked_branch = self.make_branch('stacked', format='1.6')
880
# typically should be relative
881
stacked_branch.set_stacked_on_url('../base')
882
request = smart.branch.SmartServerBranchRequestGetStackedOnURL(
883
self.get_transport())
884
response = request.execute('stacked')
886
SmartServerResponse(('ok', '../base')),
890
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
893
tests.TestCaseWithMemoryTransport.setUp(self)
895
def test_lock_write_on_unlocked_branch(self):
896
backing = self.get_transport()
897
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
898
branch = self.make_branch('.', format='knit')
899
repository = branch.repository
900
response = request.execute('')
901
branch_nonce = branch.control_files._lock.peek().get('nonce')
902
repository_nonce = repository.control_files._lock.peek().get('nonce')
904
SmartServerResponse(('ok', branch_nonce, repository_nonce)),
906
# The branch (and associated repository) is now locked. Verify that
907
# with a new branch object.
908
new_branch = repository.bzrdir.open_branch()
909
self.assertRaises(errors.LockContention, new_branch.lock_write)
911
def test_lock_write_on_locked_branch(self):
912
backing = self.get_transport()
913
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
914
branch = self.make_branch('.')
916
branch.leave_lock_in_place()
918
response = request.execute('')
920
SmartServerResponse(('LockContention',)), response)
922
def test_lock_write_with_tokens_on_locked_branch(self):
923
backing = self.get_transport()
924
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
925
branch = self.make_branch('.', format='knit')
926
branch_token = branch.lock_write()
927
repo_token = branch.repository.lock_write()
928
branch.repository.unlock()
929
branch.leave_lock_in_place()
930
branch.repository.leave_lock_in_place()
932
response = request.execute('',
933
branch_token, repo_token)
935
SmartServerResponse(('ok', branch_token, repo_token)), response)
937
def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
938
backing = self.get_transport()
939
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
940
branch = self.make_branch('.', format='knit')
941
branch_token = branch.lock_write()
942
repo_token = branch.repository.lock_write()
943
branch.repository.unlock()
944
branch.leave_lock_in_place()
945
branch.repository.leave_lock_in_place()
947
response = request.execute('',
948
branch_token+'xxx', repo_token)
950
SmartServerResponse(('TokenMismatch',)), response)
952
def test_lock_write_on_locked_repo(self):
953
backing = self.get_transport()
954
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
955
branch = self.make_branch('.', format='knit')
956
branch.repository.lock_write()
957
branch.repository.leave_lock_in_place()
958
branch.repository.unlock()
959
response = request.execute('')
961
SmartServerResponse(('LockContention',)), response)
963
def test_lock_write_on_readonly_transport(self):
964
backing = self.get_readonly_transport()
965
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
966
branch = self.make_branch('.')
967
root = self.get_transport().clone('/')
968
path = urlutils.relative_url(root.base, self.get_transport().base)
969
response = request.execute(path)
970
error_name, lock_str, why_str = response.args
971
self.assertFalse(response.is_successful())
972
self.assertEqual('LockFailed', error_name)
975
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
978
tests.TestCaseWithMemoryTransport.setUp(self)
980
def test_unlock_on_locked_branch_and_repo(self):
981
backing = self.get_transport()
982
request = smart.branch.SmartServerBranchRequestUnlock(backing)
983
branch = self.make_branch('.', format='knit')
985
branch_token = branch.lock_write()
986
repo_token = branch.repository.lock_write()
987
branch.repository.unlock()
988
# Unlock the branch (and repo) object, leaving the physical locks
990
branch.leave_lock_in_place()
991
branch.repository.leave_lock_in_place()
993
response = request.execute('',
994
branch_token, repo_token)
996
SmartServerResponse(('ok',)), response)
997
# The branch is now unlocked. Verify that with a new branch
999
new_branch = branch.bzrdir.open_branch()
1000
new_branch.lock_write()
1003
def test_unlock_on_unlocked_branch_unlocked_repo(self):
1004
backing = self.get_transport()
1005
request = smart.branch.SmartServerBranchRequestUnlock(backing)
1006
branch = self.make_branch('.', format='knit')
1007
response = request.execute(
1008
'', 'branch token', 'repo token')
1010
SmartServerResponse(('TokenMismatch',)), response)
1012
def test_unlock_on_unlocked_branch_locked_repo(self):
1013
backing = self.get_transport()
1014
request = smart.branch.SmartServerBranchRequestUnlock(backing)
1015
branch = self.make_branch('.', format='knit')
1016
# Lock the repository.
1017
repo_token = branch.repository.lock_write()
1018
branch.repository.leave_lock_in_place()
1019
branch.repository.unlock()
1020
# Issue branch lock_write request on the unlocked branch (with locked
1022
response = request.execute(
1023
'', 'branch token', repo_token)
1025
SmartServerResponse(('TokenMismatch',)), response)
1028
class TestSmartServerRepositoryRequest(tests.TestCaseWithMemoryTransport):
1030
def test_no_repository(self):
1031
"""Raise NoRepositoryPresent when there is a bzrdir and no repo."""
1032
# we test this using a shared repository above the named path,
1033
# thus checking the right search logic is used - that is, that
1034
# its the exact path being looked at and the server is not
1036
backing = self.get_transport()
1037
request = smart.repository.SmartServerRepositoryRequest(backing)
1038
self.make_repository('.', shared=True)
1039
self.make_bzrdir('subdir')
1040
self.assertRaises(errors.NoRepositoryPresent,
1041
request.execute, 'subdir')
1044
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithMemoryTransport):
1046
def test_trivial_bzipped(self):
1047
# This tests that the wire encoding is actually bzipped
1048
backing = self.get_transport()
1049
request = smart.repository.SmartServerRepositoryGetParentMap(backing)
1050
tree = self.make_branch_and_memory_tree('.')
1052
self.assertEqual(None,
1053
request.execute('', 'missing-id'))
1054
# Note that it returns a body that is bzipped.
1056
SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
1057
request.do_body('\n\n0\n'))
1059
def test_trivial_include_missing(self):
1060
backing = self.get_transport()
1061
request = smart.repository.SmartServerRepositoryGetParentMap(backing)
1062
tree = self.make_branch_and_memory_tree('.')
1064
self.assertEqual(None,
1065
request.execute('', 'missing-id', 'include-missing:'))
1067
SuccessfulSmartServerResponse(('ok', ),
1068
bz2.compress('missing:missing-id')),
1069
request.do_body('\n\n0\n'))
1072
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithMemoryTransport):
1074
def test_none_argument(self):
1075
backing = self.get_transport()
1076
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
1077
tree = self.make_branch_and_memory_tree('.')
1080
r1 = tree.commit('1st commit')
1081
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1084
# the lines of revision_id->revision_parent_list has no guaranteed
1085
# order coming out of a dict, so sort both our test and response
1086
lines = sorted([' '.join([r2, r1]), r1])
1087
response = request.execute('', '')
1088
response.body = '\n'.join(sorted(response.body.split('\n')))
1091
SmartServerResponse(('ok', ), '\n'.join(lines)), response)
1093
def test_specific_revision_argument(self):
1094
backing = self.get_transport()
1095
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
1096
tree = self.make_branch_and_memory_tree('.')
1099
rev_id_utf8 = u'\xc9'.encode('utf-8')
1100
r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
1101
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1104
self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
1105
request.execute('', rev_id_utf8))
1107
def test_no_such_revision(self):
1108
backing = self.get_transport()
1109
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
1110
tree = self.make_branch_and_memory_tree('.')
1113
r1 = tree.commit('1st commit')
1116
# Note that it still returns body (of zero bytes).
1118
SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
1119
request.execute('', 'missingrevision'))
1122
class TestSmartServerRepositoryGetStream(tests.TestCaseWithMemoryTransport):
1124
def make_two_commit_repo(self):
1125
tree = self.make_branch_and_memory_tree('.')
1128
r1 = tree.commit('1st commit')
1129
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
1131
repo = tree.branch.repository
1134
def test_ancestry_of(self):
1135
"""The search argument may be a 'ancestry-of' some heads'."""
1136
backing = self.get_transport()
1137
request = smart.repository.SmartServerRepositoryGetStream(backing)
1138
repo, r1, r2 = self.make_two_commit_repo()
1139
fetch_spec = ['ancestry-of', r2]
1140
lines = '\n'.join(fetch_spec)
1141
request.execute('', repo._format.network_name())
1142
response = request.do_body(lines)
1143
self.assertEqual(('ok',), response.args)
1144
stream_bytes = ''.join(response.body_stream)
1145
self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')
1147
def test_search(self):
1148
"""The search argument may be a 'search' of some explicit keys."""
1149
backing = self.get_transport()
1150
request = smart.repository.SmartServerRepositoryGetStream(backing)
1151
repo, r1, r2 = self.make_two_commit_repo()
1152
fetch_spec = ['search', '%s %s' % (r1, r2), 'null:', '2']
1153
lines = '\n'.join(fetch_spec)
1154
request.execute('', repo._format.network_name())
1155
response = request.do_body(lines)
1156
self.assertEqual(('ok',), response.args)
1157
stream_bytes = ''.join(response.body_stream)
1158
self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')
1161
class TestSmartServerRequestHasRevision(tests.TestCaseWithMemoryTransport):
1163
def test_missing_revision(self):
1164
"""For a missing revision, ('no', ) is returned."""
1165
backing = self.get_transport()
1166
request = smart.repository.SmartServerRequestHasRevision(backing)
1167
self.make_repository('.')
1168
self.assertEqual(SmartServerResponse(('no', )),
1169
request.execute('', 'revid'))
1171
def test_present_revision(self):
1172
"""For a present revision, ('yes', ) is returned."""
1173
backing = self.get_transport()
1174
request = smart.repository.SmartServerRequestHasRevision(backing)
1175
tree = self.make_branch_and_memory_tree('.')
1178
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1179
r1 = tree.commit('a commit', rev_id=rev_id_utf8)
1181
self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
1182
self.assertEqual(SmartServerResponse(('yes', )),
1183
request.execute('', rev_id_utf8))
1186
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithMemoryTransport):
1188
def test_empty_revid(self):
1189
"""With an empty revid, we get only size an number and revisions"""
1190
backing = self.get_transport()
1191
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1192
repository = self.make_repository('.')
1193
stats = repository.gather_stats()
1194
expected_body = 'revisions: 0\n'
1195
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1196
request.execute('', '', 'no'))
1198
def test_revid_with_committers(self):
1199
"""For a revid we get more infos."""
1200
backing = self.get_transport()
1201
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1202
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1203
tree = self.make_branch_and_memory_tree('.')
1206
# Let's build a predictable result
1207
tree.commit('a commit', timestamp=123456.2, timezone=3600)
1208
tree.commit('a commit', timestamp=654321.4, timezone=0,
1212
stats = tree.branch.repository.gather_stats()
1213
expected_body = ('firstrev: 123456.200 3600\n'
1214
'latestrev: 654321.400 0\n'
1216
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1220
def test_not_empty_repository_with_committers(self):
1221
"""For a revid and requesting committers we get the whole thing."""
1222
backing = self.get_transport()
1223
rev_id_utf8 = u'\xc8abc'.encode('utf-8')
1224
request = smart.repository.SmartServerRepositoryGatherStats(backing)
1225
tree = self.make_branch_and_memory_tree('.')
1228
# Let's build a predictable result
1229
tree.commit('a commit', timestamp=123456.2, timezone=3600,
1231
tree.commit('a commit', timestamp=654321.4, timezone=0,
1232
committer='bar', rev_id=rev_id_utf8)
1234
stats = tree.branch.repository.gather_stats()
1236
expected_body = ('committers: 2\n'
1237
'firstrev: 123456.200 3600\n'
1238
'latestrev: 654321.400 0\n'
1240
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
1242
rev_id_utf8, 'yes'))
1245
class TestSmartServerRepositoryIsShared(tests.TestCaseWithMemoryTransport):
1247
def test_is_shared(self):
1248
"""For a shared repository, ('yes', ) is returned."""
1249
backing = self.get_transport()
1250
request = smart.repository.SmartServerRepositoryIsShared(backing)
1251
self.make_repository('.', shared=True)
1252
self.assertEqual(SmartServerResponse(('yes', )),
1253
request.execute('', ))
1255
def test_is_not_shared(self):
1256
"""For a shared repository, ('no', ) is returned."""
1257
backing = self.get_transport()
1258
request = smart.repository.SmartServerRepositoryIsShared(backing)
1259
self.make_repository('.', shared=False)
1260
self.assertEqual(SmartServerResponse(('no', )),
1261
request.execute('', ))
1264
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
1266
def test_lock_write_on_unlocked_repo(self):
1267
backing = self.get_transport()
1268
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1269
repository = self.make_repository('.', format='knit')
1270
response = request.execute('')
1271
nonce = repository.control_files._lock.peek().get('nonce')
1272
self.assertEqual(SmartServerResponse(('ok', nonce)), response)
1273
# The repository is now locked. Verify that with a new repository
1275
new_repo = repository.bzrdir.open_repository()
1276
self.assertRaises(errors.LockContention, new_repo.lock_write)
1278
def test_lock_write_on_locked_repo(self):
1279
backing = self.get_transport()
1280
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1281
repository = self.make_repository('.', format='knit')
1282
repository.lock_write()
1283
repository.leave_lock_in_place()
1285
response = request.execute('')
1287
SmartServerResponse(('LockContention',)), response)
1289
def test_lock_write_on_readonly_transport(self):
1290
backing = self.get_readonly_transport()
1291
request = smart.repository.SmartServerRepositoryLockWrite(backing)
1292
repository = self.make_repository('.', format='knit')
1293
response = request.execute('')
1294
self.assertFalse(response.is_successful())
1295
self.assertEqual('LockFailed', response.args[0])
1298
class TestInsertStreamBase(tests.TestCaseWithMemoryTransport):
1300
def make_empty_byte_stream(self, repo):
1301
byte_stream = smart.repository._stream_to_byte_stream([], repo._format)
1302
return ''.join(byte_stream)
1305
class TestSmartServerRepositoryInsertStream(TestInsertStreamBase):
1307
def test_insert_stream_empty(self):
1308
backing = self.get_transport()
1309
request = smart.repository.SmartServerRepositoryInsertStream(backing)
1310
repository = self.make_repository('.')
1311
response = request.execute('', '')
1312
self.assertEqual(None, response)
1313
response = request.do_chunk(self.make_empty_byte_stream(repository))
1314
self.assertEqual(None, response)
1315
response = request.do_end()
1316
self.assertEqual(SmartServerResponse(('ok', )), response)
1319
class TestSmartServerRepositoryInsertStreamLocked(TestInsertStreamBase):
1321
def test_insert_stream_empty(self):
1322
backing = self.get_transport()
1323
request = smart.repository.SmartServerRepositoryInsertStreamLocked(
1325
repository = self.make_repository('.', format='knit')
1326
lock_token = repository.lock_write()
1327
response = request.execute('', '', lock_token)
1328
self.assertEqual(None, response)
1329
response = request.do_chunk(self.make_empty_byte_stream(repository))
1330
self.assertEqual(None, response)
1331
response = request.do_end()
1332
self.assertEqual(SmartServerResponse(('ok', )), response)
1335
def test_insert_stream_with_wrong_lock_token(self):
1336
backing = self.get_transport()
1337
request = smart.repository.SmartServerRepositoryInsertStreamLocked(
1339
repository = self.make_repository('.', format='knit')
1340
lock_token = repository.lock_write()
1342
errors.TokenMismatch, request.execute, '', '', 'wrong-token')
1346
class TestSmartServerRepositoryUnlock(tests.TestCaseWithMemoryTransport):
1349
tests.TestCaseWithMemoryTransport.setUp(self)
1351
def test_unlock_on_locked_repo(self):
1352
backing = self.get_transport()
1353
request = smart.repository.SmartServerRepositoryUnlock(backing)
1354
repository = self.make_repository('.', format='knit')
1355
token = repository.lock_write()
1356
repository.leave_lock_in_place()
1358
response = request.execute('', token)
1360
SmartServerResponse(('ok',)), response)
1361
# The repository is now unlocked. Verify that with a new repository
1363
new_repo = repository.bzrdir.open_repository()
1364
new_repo.lock_write()
1367
def test_unlock_on_unlocked_repo(self):
1368
backing = self.get_transport()
1369
request = smart.repository.SmartServerRepositoryUnlock(backing)
1370
repository = self.make_repository('.', format='knit')
1371
response = request.execute('', 'some token')
1373
SmartServerResponse(('TokenMismatch',)), response)
1376
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
1378
def test_is_readonly_no(self):
1379
backing = self.get_transport()
1380
request = smart.request.SmartServerIsReadonly(backing)
1381
response = request.execute()
1383
SmartServerResponse(('no',)), response)
1385
def test_is_readonly_yes(self):
1386
backing = self.get_readonly_transport()
1387
request = smart.request.SmartServerIsReadonly(backing)
1388
response = request.execute()
1390
SmartServerResponse(('yes',)), response)
1393
class TestSmartServerRepositorySetMakeWorkingTrees(tests.TestCaseWithMemoryTransport):
1395
def test_set_false(self):
1396
backing = self.get_transport()
1397
repo = self.make_repository('.', shared=True)
1398
repo.set_make_working_trees(True)
1399
request_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
1400
request = request_class(backing)
1401
self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
1402
request.execute('', 'False'))
1403
repo = repo.bzrdir.open_repository()
1404
self.assertFalse(repo.make_working_trees())
1406
def test_set_true(self):
1407
backing = self.get_transport()
1408
repo = self.make_repository('.', shared=True)
1409
repo.set_make_working_trees(False)
1410
request_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
1411
request = request_class(backing)
1412
self.assertEqual(SuccessfulSmartServerResponse(('ok',)),
1413
request.execute('', 'True'))
1414
repo = repo.bzrdir.open_repository()
1415
self.assertTrue(repo.make_working_trees())
1418
class TestSmartServerPackRepositoryAutopack(tests.TestCaseWithTransport):
1420
def make_repo_needing_autopacking(self, path='.'):
1421
# Make a repo in need of autopacking.
1422
tree = self.make_branch_and_tree('.', format='pack-0.92')
1423
repo = tree.branch.repository
1424
# monkey-patch the pack collection to disable autopacking
1425
repo._pack_collection._max_pack_count = lambda count: count
1427
tree.commit('commit %s' % x)
1428
self.assertEqual(10, len(repo._pack_collection.names()))
1429
del repo._pack_collection._max_pack_count
1432
def test_autopack_needed(self):
1433
repo = self.make_repo_needing_autopacking()
1435
self.addCleanup(repo.unlock)
1436
backing = self.get_transport()
1437
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1439
response = request.execute('')
1440
self.assertEqual(SmartServerResponse(('ok',)), response)
1441
repo._pack_collection.reload_pack_names()
1442
self.assertEqual(1, len(repo._pack_collection.names()))
1444
def test_autopack_not_needed(self):
1445
tree = self.make_branch_and_tree('.', format='pack-0.92')
1446
repo = tree.branch.repository
1448
self.addCleanup(repo.unlock)
1450
tree.commit('commit %s' % x)
1451
backing = self.get_transport()
1452
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1454
response = request.execute('')
1455
self.assertEqual(SmartServerResponse(('ok',)), response)
1456
repo._pack_collection.reload_pack_names()
1457
self.assertEqual(9, len(repo._pack_collection.names()))
1459
def test_autopack_on_nonpack_format(self):
1460
"""A request to autopack a non-pack repo is a no-op."""
1461
repo = self.make_repository('.', format='knit')
1462
backing = self.get_transport()
1463
request = smart.packrepository.SmartServerPackRepositoryAutopack(
1465
response = request.execute('')
1466
self.assertEqual(SmartServerResponse(('ok',)), response)
1469
class TestHandlers(tests.TestCase):
1470
"""Tests for the request.request_handlers object."""
1472
def test_all_registrations_exist(self):
1473
"""All registered request_handlers can be found."""
1474
# If there's a typo in a register_lazy call, this loop will fail with
1475
# an AttributeError.
1476
for key, item in smart.request.request_handlers.iteritems():
1479
def assertHandlerEqual(self, verb, handler):
1480
self.assertEqual(smart.request.request_handlers.get(verb), handler)
1482
def test_registered_methods(self):
1483
"""Test that known methods are registered to the correct object."""
1484
self.assertHandlerEqual('Branch.get_config_file',
1485
smart.branch.SmartServerBranchGetConfigFile)
1486
self.assertHandlerEqual('Branch.get_parent',
1487
smart.branch.SmartServerBranchGetParent)
1488
self.assertHandlerEqual('Branch.get_tags_bytes',
1489
smart.branch.SmartServerBranchGetTagsBytes)
1490
self.assertHandlerEqual('Branch.lock_write',
1491
smart.branch.SmartServerBranchRequestLockWrite)
1492
self.assertHandlerEqual('Branch.last_revision_info',
1493
smart.branch.SmartServerBranchRequestLastRevisionInfo)
1494
self.assertHandlerEqual('Branch.revision_history',
1495
smart.branch.SmartServerRequestRevisionHistory)
1496
self.assertHandlerEqual('Branch.set_config_option',
1497
smart.branch.SmartServerBranchRequestSetConfigOption)
1498
self.assertHandlerEqual('Branch.set_last_revision',
1499
smart.branch.SmartServerBranchRequestSetLastRevision)
1500
self.assertHandlerEqual('Branch.set_last_revision_info',
1501
smart.branch.SmartServerBranchRequestSetLastRevisionInfo)
1502
self.assertHandlerEqual('Branch.set_last_revision_ex',
1503
smart.branch.SmartServerBranchRequestSetLastRevisionEx)
1504
self.assertHandlerEqual('Branch.set_parent_location',
1505
smart.branch.SmartServerBranchRequestSetParentLocation)
1506
self.assertHandlerEqual('Branch.unlock',
1507
smart.branch.SmartServerBranchRequestUnlock)
1508
self.assertHandlerEqual('BzrDir.find_repository',
1509
smart.bzrdir.SmartServerRequestFindRepositoryV1)
1510
self.assertHandlerEqual('BzrDir.find_repositoryV2',
1511
smart.bzrdir.SmartServerRequestFindRepositoryV2)
1512
self.assertHandlerEqual('BzrDirFormat.initialize',
1513
smart.bzrdir.SmartServerRequestInitializeBzrDir)
1514
self.assertHandlerEqual('BzrDirFormat.initialize_ex',
1515
smart.bzrdir.SmartServerRequestBzrDirInitializeEx)
1516
self.assertHandlerEqual('BzrDir.cloning_metadir',
1517
smart.bzrdir.SmartServerBzrDirRequestCloningMetaDir)
1518
self.assertHandlerEqual('BzrDir.get_config_file',
1519
smart.bzrdir.SmartServerBzrDirRequestConfigFile)
1520
self.assertHandlerEqual('BzrDir.open_branch',
1521
smart.bzrdir.SmartServerRequestOpenBranch)
1522
self.assertHandlerEqual('BzrDir.open_branchV2',
1523
smart.bzrdir.SmartServerRequestOpenBranchV2)
1524
self.assertHandlerEqual('PackRepository.autopack',
1525
smart.packrepository.SmartServerPackRepositoryAutopack)
1526
self.assertHandlerEqual('Repository.gather_stats',
1527
smart.repository.SmartServerRepositoryGatherStats)
1528
self.assertHandlerEqual('Repository.get_parent_map',
1529
smart.repository.SmartServerRepositoryGetParentMap)
1530
self.assertHandlerEqual('Repository.get_revision_graph',
1531
smart.repository.SmartServerRepositoryGetRevisionGraph)
1532
self.assertHandlerEqual('Repository.get_stream',
1533
smart.repository.SmartServerRepositoryGetStream)
1534
self.assertHandlerEqual('Repository.has_revision',
1535
smart.repository.SmartServerRequestHasRevision)
1536
self.assertHandlerEqual('Repository.insert_stream',
1537
smart.repository.SmartServerRepositoryInsertStream)
1538
self.assertHandlerEqual('Repository.insert_stream_locked',
1539
smart.repository.SmartServerRepositoryInsertStreamLocked)
1540
self.assertHandlerEqual('Repository.is_shared',
1541
smart.repository.SmartServerRepositoryIsShared)
1542
self.assertHandlerEqual('Repository.lock_write',
1543
smart.repository.SmartServerRepositoryLockWrite)
1544
self.assertHandlerEqual('Repository.tarball',
1545
smart.repository.SmartServerRepositoryTarball)
1546
self.assertHandlerEqual('Repository.unlock',
1547
smart.repository.SmartServerRepositoryUnlock)
1548
self.assertHandlerEqual('Transport.is_readonly',
1549
smart.request.SmartServerIsReadonly)