1
# Copyright (C) 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the smart wire/domain protocol.
19
This module contains tests for the domain-level smart requests and responses,
20
such as the 'Branch.lock_write' request. Many of these use specific disk
21
formats to exercise calls that only make sense for formats with specific capabilities.
24
Tests for low-level protocol encoding are found in test_smart_transport.
28
from cStringIO import StringIO
39
from bzrlib.branch import Branch, BranchReferenceFormat
40
import bzrlib.smart.branch
41
import bzrlib.smart.bzrdir, bzrlib.smart.bzrdir as smart_dir
42
import bzrlib.smart.packrepository
43
import bzrlib.smart.repository
44
from bzrlib.smart.request import (
45
FailedSmartServerResponse,
48
SuccessfulSmartServerResponse,
50
from bzrlib.tests import (
53
from bzrlib.transport import chroot, get_transport
54
from bzrlib.util import bencode
57
def load_tests(standard_tests, module, loader):
58
"""Multiply tests version and protocol consistency."""
59
# FindRepository tests.
60
bzrdir_mod = bzrlib.smart.bzrdir
63
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV1}),
64
("find_repositoryV2", {
65
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV2}),
66
("find_repositoryV3", {
67
"_request_class":bzrdir_mod.SmartServerRequestFindRepositoryV3}),
69
to_adapt, result = split_suite_by_re(standard_tests,
70
"TestSmartServerRequestFindRepository")
71
v2_only, v1_and_2 = split_suite_by_re(to_adapt,
73
tests.multiply_tests(v1_and_2, scenarios, result)
74
# The first scenario is only applicable to v1 protocols, it is deleted
76
tests.multiply_tests(v2_only, scenarios[1:], result)
80
class TestCaseWithChrootedTransport(tests.TestCaseWithTransport):
83
tests.TestCaseWithTransport.setUp(self)
84
self._chroot_server = None
86
def get_transport(self, relpath=None):
87
if self._chroot_server is None:
88
backing_transport = tests.TestCaseWithTransport.get_transport(self)
89
self._chroot_server = chroot.ChrootServer(backing_transport)
90
self._chroot_server.setUp()
91
self.addCleanup(self._chroot_server.tearDown)
92
t = get_transport(self._chroot_server.get_url())
93
if relpath is not None:
98
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
101
super(TestCaseWithSmartMedium, self).setUp()
102
# We're allowed to set the transport class here, so that we don't use
103
# the default or a parameterized class, but rather use the
104
# TestCaseWithTransport infrastructure to set up a smart server and
106
self.transport_server = self.make_transport_server
108
def make_transport_server(self):
    """Create the smart TCP server used as this test's transport server.

    The test id, prefixed with '-', is passed to the server constructor.
    """
    server_argument = '-' + self.id()
    return smart.server.SmartTCPServer_for_testing(server_argument)
111
def get_smart_medium(self):
    """Return the smart medium of this test's transport."""
    transport = self.get_transport()
    return transport.get_smart_medium()
116
class TestSmartServerResponse(tests.TestCase):
118
def test__eq__(self):
    """Responses are equal only when both their args and body match."""
    make = SmartServerResponse
    ok_args = ('ok', )
    # Matching args (and matching body, when one is given) compare equal.
    self.assertEqual(make(ok_args), make(ok_args))
    self.assertEqual(make(ok_args, 'body'), make(ok_args, 'body'))
    # A difference in either the args or the body makes them unequal.
    self.assertNotEqual(make(ok_args), make(('notok', )))
    self.assertNotEqual(make(ok_args, 'body'), make(ok_args))
    # A response never compares equal to None.
    self.assertNotEqual(None, make(ok_args))
130
def test__str__(self):
131
"""SmartServerResponses can be stringified."""
133
"<SuccessfulSmartServerResponse args=('args',) body='body'>",
134
str(SuccessfulSmartServerResponse(('args',), 'body')))
136
"<FailedSmartServerResponse args=('args',) body='body'>",
137
str(FailedSmartServerResponse(('args',), 'body')))
140
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
142
def test_translate_client_path(self):
143
transport = self.get_transport()
144
request = SmartServerRequest(transport, 'foo/')
145
self.assertEqual('./', request.translate_client_path('foo/'))
147
errors.InvalidURLJoin, request.translate_client_path, 'foo/..')
149
errors.PathNotChild, request.translate_client_path, '/')
151
errors.PathNotChild, request.translate_client_path, 'bar/')
152
self.assertEqual('./baz', request.translate_client_path('foo/baz'))
154
def test_transport_from_client_path(self):
155
transport = self.get_transport()
156
request = SmartServerRequest(transport, 'foo/')
159
request.transport_from_client_path('foo/').base)
162
class TestSmartServerBzrDirRequestCloningMetaDir(
        tests.TestCaseWithMemoryTransport):
    """Tests for BzrDir.cloning_metadir."""

    def test_cloning_metadir(self):
        """When there is a bzrdir present, the call succeeds."""
        backing = self.get_transport()
        control_dir = self.make_bzrdir('.')
        local_format = control_dir.cloning_metadir()
        request = smart_dir.SmartServerBzrDirRequestCloningMetaDir(backing)
        # The expected args are the network names of the control format,
        # its repository format and (tagged 'branch') its branch format.
        control_name = local_format.network_name()
        repository_name = local_format.repository_format.network_name()
        branch_name = local_format.get_branch_format().network_name()
        expected = SuccessfulSmartServerResponse(
            (control_name, repository_name, ('branch', branch_name)))
        actual = request.execute('', 'False')
        self.assertEqual(expected, actual)

    def test_cloning_metadir_reference(self):
        """The request fails when bzrdir contains a branch reference."""
        backing = self.get_transport()
        target_branch = self.make_branch('referenced')
        control_dir = self.make_bzrdir('.')
        local_format = control_dir.cloning_metadir()
        reference_branch = BranchReferenceFormat().initialize(
            control_dir, target_branch)
        reference_location = BranchReferenceFormat().get_reference(
            control_dir)
        # The server shouldn't try to follow the branch reference, so it's
        # fine if the referenced branch isn't reachable.
        backing.rename('referenced', 'moved')
        request = smart_dir.SmartServerBzrDirRequestCloningMetaDir(backing)
        actual = request.execute('', 'False')
        self.assertEqual(
            FailedSmartServerResponse(('BranchReference',)), actual)
196
class TestSmartServerRequestCreateRepository(tests.TestCaseWithMemoryTransport):
    """Tests for BzrDir.create_repository."""

    def test_makes_repository(self):
        """When there is a bzrdir present, the call succeeds."""
        backing = self.get_transport()
        self.make_bzrdir('.')
        request = bzrlib.smart.bzrdir.SmartServerRequestCreateRepository(
            backing)
        # Request the repository format belonging to the 'default' bzrdir
        # format; the same network name is expected back in the response.
        default_format = bzrdir.format_registry.get('default')()
        network_name = default_format.repository_format.network_name()
        expected = SuccessfulSmartServerResponse(
            ('ok', 'no', 'no', 'no', network_name))
        actual = request.execute('', network_name, 'True')
        self.assertEqual(expected, actual)
213
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
214
"""Tests for BzrDir.find_repository."""
216
def test_no_repository(self):
217
"""When there is no repository to be found, ('norepository', ) is returned."""
218
backing = self.get_transport()
219
request = self._request_class(backing)
220
self.make_bzrdir('.')
221
self.assertEqual(SmartServerResponse(('norepository', )),
224
def test_nonshared_repository(self):
    """A non-shared repository is only found at its own path."""
    # Non-shared repositories only allow 'find' to return a handle when
    # the path being searched is the same as the path the repository is at.
    backing = self.get_transport()
    request = self._request_class(backing)
    expected_at_root = self._make_repository_and_result()
    self.assertEqual(expected_at_root, request.execute(''))
    self.make_bzrdir('subdir')
    not_found = SmartServerResponse(('norepository', ))
    self.assertEqual(not_found, request.execute('subdir'))
236
def _make_repository_and_result(self, shared=False, format=None):
237
"""Convenience function to setup a repository.
239
:result: The SmartServerResponse to expect when opening it.
241
repo = self.make_repository('.', shared=shared, format=format)
242
if repo.supports_rich_root():
246
if repo._format.supports_tree_reference:
250
if (smart.bzrdir.SmartServerRequestFindRepositoryV3 ==
251
self._request_class):
252
return SuccessfulSmartServerResponse(
253
('ok', '', rich_root, subtrees, 'no',
254
repo._format.network_name()))
255
elif (smart.bzrdir.SmartServerRequestFindRepositoryV2 ==
256
self._request_class):
257
# All tests so far are on formats, and for non-external
259
return SuccessfulSmartServerResponse(
260
('ok', '', rich_root, subtrees, 'no'))
262
return SuccessfulSmartServerResponse(('ok', '', rich_root, subtrees))
264
def test_shared_repository(self):
    """When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
    backing = self.get_transport()
    request = self._request_class(backing)
    at_root = self._make_repository_and_result(shared=True)
    self.assertEqual(at_root, request.execute(''))
    # From nested bzrdirs the expected response differs from the root one
    # only in args[1], the relative path back up to the repository.
    leading, trailing = at_root.args[0:1], at_root.args[2:]
    nested_cases = [('subdir', '..'), ('subdir/deeper', '../..')]
    for bzrdir_path, path_to_repo in nested_cases:
        self.make_bzrdir(bzrdir_path)
        expected = SmartServerResponse(
            leading + (path_to_repo, ) + trailing)
        self.assertEqual(expected, request.execute(bzrdir_path))
279
def test_rich_root_and_subtree_encoding(self):
    """Test for the format attributes for rich root and subtree support."""
    backing = self.get_transport()
    request = self._request_class(backing)
    expected = self._make_repository_and_result(
        format='dirstate-with-subtree')
    # Check the test will be valid: both flags must be 'yes' in the
    # expected response before it is compared with the real one.
    rich_root_flag = expected.args[2]
    self.assertEqual('yes', rich_root_flag)
    subtrees_flag = expected.args[3]
    self.assertEqual('yes', subtrees_flag)
    self.assertEqual(expected, request.execute(''))
289
def test_supports_external_lookups_no_v2(self):
    """Test for the supports_external_lookups attribute."""
    backing = self.get_transport()
    request = self._request_class(backing)
    expected = self._make_repository_and_result(
        format='dirstate-with-subtree')
    # Check the test will be valid: the fifth arg must be 'no'.
    external_lookups_flag = expected.args[4]
    self.assertEqual('no', external_lookups_flag)
    self.assertEqual(expected, request.execute(''))
299
class TestSmartServerBzrDirRequestGetConfigFile(
        tests.TestCaseWithMemoryTransport):
    """Tests for BzrDir.get_config_file."""

    def test_present(self):
        """The response body matches what the local config file reads as."""
        backing = self.get_transport()
        control_dir = self.make_bzrdir('.')
        # Set an option first; the expected body is then whatever the
        # config file reads back as locally.
        control_dir.get_config().set_default_stack_on("/")
        config_bytes = control_dir._get_config()._get_config_file().read()
        request = smart_dir.SmartServerBzrDirRequestConfigFile(backing)
        expected = SuccessfulSmartServerResponse((), config_bytes)
        actual = request.execute('')
        self.assertEqual(expected, actual)

    def test_missing(self):
        """When no option has been set, an empty body is expected."""
        backing = self.get_transport()
        control_dir = self.make_bzrdir('.')
        request = smart_dir.SmartServerBzrDirRequestConfigFile(backing)
        expected = SuccessfulSmartServerResponse((), '')
        actual = request.execute('')
        self.assertEqual(expected, actual)
322
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
324
def test_empty_dir(self):
325
"""Initializing an empty dir should succeed and do it."""
326
backing = self.get_transport()
327
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
328
self.assertEqual(SmartServerResponse(('ok', )),
330
made_dir = bzrdir.BzrDir.open_from_transport(backing)
331
# no branch, tree or repository is expected with the current
333
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
334
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
335
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
337
def test_missing_dir(self):
    """Initializing a missing directory should fail like the bzrdir api."""
    backing = self.get_transport()
    request_class = smart.bzrdir.SmartServerRequestInitializeBzrDir
    request = request_class(backing)
    # Nothing in this test creates 'subdir' before the request runs.
    self.assertRaises(errors.NoSuchFile, request.execute, 'subdir')
344
def test_initialized_dir(self):
    """Initializing an extant bzrdir should fail like the bzrdir api."""
    backing = self.get_transport()
    request_class = smart.bzrdir.SmartServerRequestInitializeBzrDir
    request = request_class(backing)
    # A bzrdir already exists at 'subdir' when the request runs.
    self.make_bzrdir('subdir')
    self.assertRaises(errors.FileExists, request.execute, 'subdir')
353
class TestSmartServerRequestBzrDirInitializeEx(tests.TestCaseWithMemoryTransport):
354
"""Basic tests for BzrDir.initialize_ex in the smart server.
356
The main unit tests in test_bzrdir exercise the API comprehensively.
359
def test_empty_dir(self):
360
"""Initializing an empty dir should succeed and do it."""
361
backing = self.get_transport()
362
name = self.make_bzrdir('reference')._format.network_name()
363
request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
364
self.assertEqual(SmartServerResponse(('', '', '', '', '', '', name,
365
'False', '', '', '')),
366
request.execute(name, '', 'True', 'False', 'False', '', '', '', '',
368
made_dir = bzrdir.BzrDir.open_from_transport(backing)
369
# no branch, tree or repository is expected with the current
371
self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
372
self.assertRaises(errors.NotBranchError, made_dir.open_branch)
373
self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)
375
def test_missing_dir(self):
    """Initializing a missing directory should fail like the bzrdir api."""
    backing = self.get_transport()
    format_name = self.make_bzrdir('reference')._format.network_name()
    request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
    # Only 'reference' is created by this test; 'subdir/dir' is not.
    execute_args = (
        format_name, 'subdir/dir', 'False', 'False', 'False',
        '', '', '', '', 'False')
    self.assertRaises(errors.NoSuchFile, request.execute, *execute_args)
383
def test_initialized_dir(self):
    """Initializing an extant directory should fail like the bzrdir api."""
    backing = self.get_transport()
    format_name = self.make_bzrdir('reference')._format.network_name()
    request = smart.bzrdir.SmartServerRequestBzrDirInitializeEx(backing)
    # A bzrdir already exists at 'subdir' when the request runs.
    self.make_bzrdir('subdir')
    execute_args = (
        format_name, 'subdir', 'False', 'False', 'False',
        '', '', '', '', 'False')
    self.assertRaises(errors.FileExists, request.execute, *execute_args)
393
class TestSmartServerRequestOpenBranch(TestCaseWithChrootedTransport):
395
def test_no_branch(self):
396
"""When there is no branch, ('nobranch', ) is returned."""
397
backing = self.get_transport()
398
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
399
self.make_bzrdir('.')
400
self.assertEqual(SmartServerResponse(('nobranch', )),
403
def test_branch(self):
404
"""When there is a branch, 'ok' is returned."""
405
backing = self.get_transport()
406
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
407
self.make_branch('.')
408
self.assertEqual(SmartServerResponse(('ok', '')),
411
def test_branch_reference(self):
    """When there is a branch reference, the reference URL is returned."""
    backing = self.get_transport()
    request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
    referenced = self.make_branch('branch')
    checkout = referenced.create_checkout('reference', lightweight=True)
    reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
    # The URL must match the contents of the checkout's location file.
    self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
    expected = SmartServerResponse(('ok', reference_url))
    self.assertEqual(expected, request.execute('reference'))
423
class TestSmartServerRequestOpenBranchV2(TestCaseWithChrootedTransport):
425
def test_no_branch(self):
426
"""When there is no branch, ('nobranch', ) is returned."""
427
backing = self.get_transport()
428
self.make_bzrdir('.')
429
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
430
self.assertEqual(SmartServerResponse(('nobranch', )),
433
def test_branch(self):
434
"""When there is a branch, 'ok' is returned."""
435
backing = self.get_transport()
436
expected = self.make_branch('.')._format.network_name()
437
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
438
self.assertEqual(SuccessfulSmartServerResponse(('branch', expected)),
441
def test_branch_reference(self):
    """When there is a branch reference, the reference URL is returned."""
    backing = self.get_transport()
    request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
    referenced = self.make_branch('branch')
    checkout = referenced.create_checkout('reference', lightweight=True)
    reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
    # The URL must match the contents of the checkout's location file.
    self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
    # This verb tags the URL with 'ref' in its success response.
    expected = SuccessfulSmartServerResponse(('ref', reference_url))
    self.assertEqual(expected, request.execute('reference'))
452
def test_stacked_branch(self):
453
"""Opening a stacked branch does not open the stacked-on branch."""
454
trunk = self.make_branch('trunk')
455
feature = self.make_branch('feature', format='1.9')
456
feature.set_stacked_on_url(trunk.base)
458
Branch.hooks.install_named_hook('open', opened_branches.append, None)
459
backing = self.get_transport()
460
request = smart.bzrdir.SmartServerRequestOpenBranchV2(backing)
463
response = request.execute('feature')
465
request.teardown_jail()
466
expected_format = feature._format.network_name()
468
SuccessfulSmartServerResponse(('branch', expected_format)),
470
self.assertLength(1, opened_branches)
473
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
475
def test_empty(self):
476
"""For an empty branch, the body is empty."""
477
backing = self.get_transport()
478
request = smart.branch.SmartServerRequestRevisionHistory(backing)
479
self.make_branch('.')
480
self.assertEqual(SmartServerResponse(('ok', ), ''),
483
def test_not_empty(self):
484
"""For a non-empty branch, the body is the NUL-separated revision ids."""
485
backing = self.get_transport()
486
request = smart.branch.SmartServerRequestRevisionHistory(backing)
487
tree = self.make_branch_and_memory_tree('.')
490
r1 = tree.commit('1st commit')
491
r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
494
SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
498
class TestSmartServerBranchRequest(tests.TestCaseWithMemoryTransport):
500
def test_no_branch(self):
501
"""When there is a bzrdir and no branch, NotBranchError is raised."""
502
backing = self.get_transport()
503
request = smart.branch.SmartServerBranchRequest(backing)
504
self.make_bzrdir('.')
505
self.assertRaises(errors.NotBranchError,
508
def test_branch_reference(self):
    """When there is a branch reference, NotBranchError is raised."""
    backing = self.get_transport()
    request = smart.branch.SmartServerBranchRequest(backing)
    referenced = self.make_branch('branch')
    checkout = referenced.create_checkout('reference', lightweight=True)
    # NOTE(review): the checkout is created at 'reference' but the request
    # is executed against 'checkout' -- confirm this path is intended.
    self.assertRaises(errors.NotBranchError, request.execute, 'checkout')
518
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithMemoryTransport):
520
def test_empty(self):
521
"""For an empty branch, the result is ('ok', '0', 'null:')."""
522
backing = self.get_transport()
523
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
524
self.make_branch('.')
525
self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
528
def test_not_empty(self):
529
"""For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
530
backing = self.get_transport()
531
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
532
tree = self.make_branch_and_memory_tree('.')
535
rev_id_utf8 = u'\xc8'.encode('utf-8')
536
r1 = tree.commit('1st commit')
537
r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
540
SmartServerResponse(('ok', '2', rev_id_utf8)),
544
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithMemoryTransport):
546
def test_default(self):
547
"""With no file, we get empty content."""
548
backing = self.get_transport()
549
request = smart.branch.SmartServerBranchGetConfigFile(backing)
550
branch = self.make_branch('.')
551
# there should be no file by default
553
self.assertEqual(SmartServerResponse(('ok', ), content),
556
def test_with_content(self):
557
# SmartServerBranchGetConfigFile should return the content from
558
# branch.control_files.get('branch.conf') for now - in the future it may
559
# perform more complex processing.
560
backing = self.get_transport()
561
request = smart.branch.SmartServerBranchGetConfigFile(backing)
562
branch = self.make_branch('.')
563
branch._transport.put_bytes('branch.conf', 'foo bar baz')
564
self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
568
class TestLockedBranch(tests.TestCaseWithMemoryTransport):
    """Base class for tests that need lock tokens for a branch."""

    def get_lock_tokens(self, branch):
        """Write-lock branch and its repository and return both tokens.

        The repository lock taken here is released again before returning;
        this method does not release the branch lock.

        :param branch: The branch to lock.
        :return: A (branch_token, repo_token) tuple.
        """
        branch_token = branch.lock_write()
        repository = branch.repository
        repo_token = repository.lock_write()
        repository.unlock()
        return (branch_token, repo_token)
577
class TestSmartServerBranchRequestSetConfigOption(TestLockedBranch):
579
def test_value_name(self):
580
branch = self.make_branch('.')
581
request = smart.branch.SmartServerBranchRequestSetConfigOption(
582
branch.bzrdir.root_transport)
583
branch_token, repo_token = self.get_lock_tokens(branch)
584
config = branch._get_config()
585
result = request.execute('', branch_token, repo_token, 'bar', 'foo',
587
self.assertEqual(SuccessfulSmartServerResponse(()), result)
588
self.assertEqual('bar', config.get_option('foo'))
590
def test_value_name_section(self):
591
branch = self.make_branch('.')
592
request = smart.branch.SmartServerBranchRequestSetConfigOption(
593
branch.bzrdir.root_transport)
594
branch_token, repo_token = self.get_lock_tokens(branch)
595
config = branch._get_config()
596
result = request.execute('', branch_token, repo_token, 'bar', 'foo',
598
self.assertEqual(SuccessfulSmartServerResponse(()), result)
599
self.assertEqual('bar', config.get_option('foo', 'gam'))
602
class SetLastRevisionTestBase(TestLockedBranch):
603
"""Base test case for verbs that implement set_last_revision."""
606
tests.TestCaseWithMemoryTransport.setUp(self)
607
backing_transport = self.get_transport()
608
self.request = self.request_class(backing_transport)
609
self.tree = self.make_branch_and_memory_tree('.')
611
def lock_branch(self):
    """Lock the test tree's branch and return its lock tokens."""
    branch = self.tree.branch
    return self.get_lock_tokens(branch)
614
def unlock_branch(self):
    """Unlock the test tree's branch."""
    branch = self.tree.branch
    branch.unlock()
617
def set_last_revision(self, revision_id, revno):
618
branch_token, repo_token = self.lock_branch()
619
response = self._set_last_revision(
620
revision_id, revno, branch_token, repo_token)
624
def assertRequestSucceeds(self, revision_id, revno):
    """Assert that setting the last revision gives an ('ok',) success.

    :param revision_id: The revision id to set as the branch tip.
    :param revno: The revision number to pass along with it.
    """
    actual = self.set_last_revision(revision_id, revno)
    expected = SuccessfulSmartServerResponse(('ok',))
    self.assertEqual(expected, actual)
629
class TestSetLastRevisionVerbMixin(object):
630
"""Mixin test case for verbs that implement set_last_revision."""
632
def test_set_null_to_null(self):
    """An empty branch can have its last revision set to 'null:'."""
    null_revision_id, null_revno = 'null:', 0
    self.assertRequestSucceeds(null_revision_id, null_revno)
636
def test_NoSuchRevision(self):
637
"""If the revision_id is not present, the verb returns NoSuchRevision.
639
revision_id = 'non-existent revision'
641
FailedSmartServerResponse(('NoSuchRevision', revision_id)),
642
self.set_last_revision(revision_id, 1))
644
def make_tree_with_two_commits(self):
645
self.tree.lock_write()
647
rev_id_utf8 = u'\xc8'.encode('utf-8')
648
r1 = self.tree.commit('1st commit', rev_id=rev_id_utf8)
649
r2 = self.tree.commit('2nd commit', rev_id='rev-2')
652
def test_branch_last_revision_info_is_updated(self):
653
"""A branch's tip can be set to a revision that is present in its
656
# Make a branch with an empty revision history, but two revisions in
658
self.make_tree_with_two_commits()
659
rev_id_utf8 = u'\xc8'.encode('utf-8')
660
self.tree.branch.set_revision_history([])
662
(0, 'null:'), self.tree.branch.last_revision_info())
663
# We can update the branch to a revision that is present in the
665
self.assertRequestSucceeds(rev_id_utf8, 1)
667
(1, rev_id_utf8), self.tree.branch.last_revision_info())
669
def test_branch_last_revision_info_rewind(self):
670
"""A branch's tip can be set to a revision that is an ancestor of the
673
self.make_tree_with_two_commits()
674
rev_id_utf8 = u'\xc8'.encode('utf-8')
676
(2, 'rev-2'), self.tree.branch.last_revision_info())
677
self.assertRequestSucceeds(rev_id_utf8, 1)
679
(1, rev_id_utf8), self.tree.branch.last_revision_info())
681
def test_TipChangeRejected(self):
682
"""If a pre_change_branch_tip hook raises TipChangeRejected, the verb
683
returns TipChangeRejected.
685
rejection_message = u'rejection message\N{INTERROBANG}'
686
def hook_that_rejects(params):
687
raise errors.TipChangeRejected(rejection_message)
688
Branch.hooks.install_named_hook(
689
'pre_change_branch_tip', hook_that_rejects, None)
691
FailedSmartServerResponse(
692
('TipChangeRejected', rejection_message.encode('utf-8'))),
693
self.set_last_revision('null:', 0))
696
class TestSmartServerBranchRequestSetLastRevision(
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
    """Tests for Branch.set_last_revision verb."""

    request_class = smart.branch.SmartServerBranchRequestSetLastRevision

    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
        """Execute the verb against the root path.

        This verb is only sent the revision id; revno is accepted for a
        uniform signature but is not passed to the request.
        """
        execute_args = ('', branch_token, repo_token, revision_id)
        return self.request.execute(*execute_args)
707
class TestSmartServerBranchRequestSetLastRevisionInfo(
        SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
    """Tests for Branch.set_last_revision_info verb."""

    request_class = smart.branch.SmartServerBranchRequestSetLastRevisionInfo

    def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
        """Execute the verb against the root path.

        This verb is sent the revno followed by the revision id.
        """
        execute_args = ('', branch_token, repo_token, revno, revision_id)
        return self.request.execute(*execute_args)

    def test_NoSuchRevision(self):
        """Branch.set_last_revision_info does not have to return
        NoSuchRevision if the revision_id is absent.
        """
        # Replaces the mixin's test of the same name for this verb.
        raise tests.TestNotApplicable()
724
class TestSmartServerBranchRequestSetLastRevisionEx(
725
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
726
"""Tests for Branch.set_last_revision_ex verb."""
728
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionEx
730
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
    """Execute the verb against the root path with both flags set to 0.

    revno is accepted for a uniform signature but is not passed to the
    request.
    """
    # Going by the other tests in this class, the two trailing arguments
    # are the allow-diverged and allow-overwrite-descendant flags.
    allow_diverged = 0
    allow_overwrite_descendant = 0
    return self.request.execute(
        '', branch_token, repo_token, revision_id,
        allow_diverged, allow_overwrite_descendant)
734
def assertRequestSucceeds(self, revision_id, revno):
735
response = self.set_last_revision(revision_id, revno)
737
SuccessfulSmartServerResponse(('ok', revno, revision_id)),
740
def test_branch_last_revision_info_rewind(self):
741
"""A branch's tip can be set to a revision that is an ancestor of the
742
current tip, but only if allow_overwrite_descendant is passed.
744
self.make_tree_with_two_commits()
745
rev_id_utf8 = u'\xc8'.encode('utf-8')
747
(2, 'rev-2'), self.tree.branch.last_revision_info())
748
# If allow_overwrite_descendant flag is 0, then trying to set the tip
749
# to an older revision ID has no effect.
750
branch_token, repo_token = self.lock_branch()
751
response = self.request.execute(
752
'', branch_token, repo_token, rev_id_utf8, 0, 0)
754
SuccessfulSmartServerResponse(('ok', 2, 'rev-2')),
757
(2, 'rev-2'), self.tree.branch.last_revision_info())
759
# If allow_overwrite_descendant flag is 1, then setting the tip to an
761
response = self.request.execute(
762
'', branch_token, repo_token, rev_id_utf8, 0, 1)
764
SuccessfulSmartServerResponse(('ok', 1, rev_id_utf8)),
768
(1, rev_id_utf8), self.tree.branch.last_revision_info())
770
def make_branch_with_divergent_history(self):
771
"""Make a branch with divergent history in its repo.
773
The branch's tip will be 'child-2', and the repo will also contain
774
'child-1', which diverges from a common base revision.
776
self.tree.lock_write()
778
r1 = self.tree.commit('1st commit')
779
revno_1, revid_1 = self.tree.branch.last_revision_info()
780
r2 = self.tree.commit('2nd commit', rev_id='child-1')
781
# Undo the second commit
782
self.tree.branch.set_last_revision_info(revno_1, revid_1)
783
self.tree.set_parent_ids([revid_1])
784
# Make a new second commit, child-2. child-2 has diverged from
786
new_r2 = self.tree.commit('2nd commit', rev_id='child-2')
789
def test_not_allow_diverged(self):
790
"""If allow_diverged is not passed, then setting a divergent history
791
returns a Diverged error.
793
self.make_branch_with_divergent_history()
795
FailedSmartServerResponse(('Diverged',)),
796
self.set_last_revision('child-1', 2))
797
# The branch tip was not changed.
798
self.assertEqual('child-2', self.tree.branch.last_revision())
800
def test_allow_diverged(self):
801
"""If allow_diverged is passed, then setting a divergent history
804
self.make_branch_with_divergent_history()
805
branch_token, repo_token = self.lock_branch()
806
response = self.request.execute(
807
'', branch_token, repo_token, 'child-1', 1, 0)
809
SuccessfulSmartServerResponse(('ok', 2, 'child-1')),
812
# The branch tip was changed.
813
self.assertEqual('child-1', self.tree.branch.last_revision())
816
class TestSmartServerBranchRequestGetParent(tests.TestCaseWithMemoryTransport):
818
def test_get_parent_none(self):
819
base_branch = self.make_branch('base')
820
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
821
response = request.execute('base')
823
SuccessfulSmartServerResponse(('',)), response)
825
def test_get_parent_something(self):
826
base_branch = self.make_branch('base')
827
base_branch.set_parent(self.get_url('foo'))
828
request = smart.branch.SmartServerBranchGetParent(self.get_transport())
829
response = request.execute('base')
831
SuccessfulSmartServerResponse(("../foo",)),
835
class TestSmartServerBranchRequestSetParent(tests.TestCaseWithMemoryTransport):
837
def test_set_parent_none(self):
838
branch = self.make_branch('base', format="1.9")
840
branch._set_parent_location('foo')
842
request = smart.branch.SmartServerBranchRequestSetParentLocation(
843
self.get_transport())
844
branch_token = branch.lock_write()
845
repo_token = branch.repository.lock_write()
847
response = request.execute('base', branch_token, repo_token, '')
849
branch.repository.unlock()
851
self.assertEqual(SuccessfulSmartServerResponse(()), response)
852
self.assertEqual(None, branch.get_parent())
854
def test_set_parent_something(self):
855
branch = self.make_branch('base', format="1.9")
856
request = smart.branch.SmartServerBranchRequestSetParentLocation(
857
self.get_transport())
858
branch_token = branch.lock_write()
859
repo_token = branch.repository.lock_write()
861
response = request.execute('base', branch_token, repo_token,
864
branch.repository.unlock()
866
self.assertEqual(SuccessfulSmartServerResponse(()), response)
867
self.assertEqual('http://bar/', branch.get_parent())
870
class TestSmartServerBranchRequestGetTagsBytes(tests.TestCaseWithMemoryTransport):
871
# Only called when the branch format and tags match [yay factory
872
# methods] so only need to test straight forward cases.
874
def test_get_bytes(self):
875
base_branch = self.make_branch('base')
876
request = smart.branch.SmartServerBranchGetTagsBytes(
877
self.get_transport())
878
response = request.execute('base')
880
SuccessfulSmartServerResponse(('',)), response)
883
class TestSmartServerBranchRequestGetStackedOnURL(tests.TestCaseWithMemoryTransport):
885
def test_get_stacked_on_url(self):
886
base_branch = self.make_branch('base', format='1.6')
887
stacked_branch = self.make_branch('stacked', format='1.6')
888
# typically should be relative
889
stacked_branch.set_stacked_on_url('../base')
890
request = smart.branch.SmartServerBranchRequestGetStackedOnURL(
891
self.get_transport())
892
response = request.execute('stacked')
894
SmartServerResponse(('ok', '../base')),
898
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerBranchRequestLockWrite request."""

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)

    def test_lock_write_on_unlocked_branch(self):
        """Locking an unlocked branch returns the branch and repo nonces."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.', format='knit')
        repository = branch.repository
        response = request.execute('')
        branch_nonce = branch.control_files._lock.peek().get('nonce')
        repository_nonce = repository.control_files._lock.peek().get('nonce')
        self.assertEqual(
            SmartServerResponse(('ok', branch_nonce, repository_nonce)),
            response)
        # The branch (and associated repository) is now locked.  Verify that
        # with a new branch object.
        new_branch = repository.bzrdir.open_branch()
        self.assertRaises(errors.LockContention, new_branch.lock_write)

    def test_lock_write_on_locked_branch(self):
        """A request without tokens on a locked branch is refused."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.')
        # Take the lock and leave it physically in place after unlocking the
        # in-memory object.
        branch.lock_write()
        branch.leave_lock_in_place()
        branch.unlock()
        response = request.execute('')
        self.assertEqual(
            SmartServerResponse(('LockContention',)), response)

    def test_lock_write_with_tokens_on_locked_branch(self):
        """Supplying the tokens of the existing locks succeeds."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.', format='knit')
        branch_token = branch.lock_write()
        repo_token = branch.repository.lock_write()
        branch.repository.unlock()
        branch.leave_lock_in_place()
        branch.repository.leave_lock_in_place()
        branch.unlock()
        response = request.execute('',
                                   branch_token, repo_token)
        self.assertEqual(
            SmartServerResponse(('ok', branch_token, repo_token)), response)

    def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
        """A wrong branch token is reported as TokenMismatch."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.', format='knit')
        branch_token = branch.lock_write()
        repo_token = branch.repository.lock_write()
        branch.repository.unlock()
        branch.leave_lock_in_place()
        branch.repository.leave_lock_in_place()
        branch.unlock()
        response = request.execute('',
                                   branch_token+'xxx', repo_token)
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)

    def test_lock_write_on_locked_repo(self):
        """A locked repository makes the branch lock request fail."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.', format='knit')
        branch.repository.lock_write()
        branch.repository.leave_lock_in_place()
        branch.repository.unlock()
        response = request.execute('')
        self.assertEqual(
            SmartServerResponse(('LockContention',)), response)

    def test_lock_write_on_readonly_transport(self):
        """On a read-only transport the request fails with LockFailed."""
        backing = self.get_readonly_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.')
        root = self.get_transport().clone('/')
        path = urlutils.relative_url(root.base, self.get_transport().base)
        response = request.execute(path)
        # Unpacking also checks that the error carries exactly three args.
        error_name, lock_str, why_str = response.args
        self.assertFalse(response.is_successful())
        self.assertEqual('LockFailed', error_name)
983
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerBranchRequestUnlock request."""

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)

    def test_unlock_on_locked_branch_and_repo(self):
        """Unlocking with the right tokens releases the physical locks."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
        branch = self.make_branch('.', format='knit')
        # Lock the branch
        branch_token = branch.lock_write()
        repo_token = branch.repository.lock_write()
        branch.repository.unlock()
        # Unlock the branch (and repo) object, leaving the physical locks
        # in place.
        branch.leave_lock_in_place()
        branch.repository.leave_lock_in_place()
        branch.unlock()
        response = request.execute('',
                                   branch_token, repo_token)
        self.assertEqual(
            SmartServerResponse(('ok',)), response)
        # The branch is now unlocked.  Verify that with a new branch
        # object.
        new_branch = branch.bzrdir.open_branch()
        new_branch.lock_write()
        new_branch.unlock()

    def test_unlock_on_unlocked_branch_unlocked_repo(self):
        """Unlocking when nothing is locked reports TokenMismatch."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
        branch = self.make_branch('.', format='knit')
        response = request.execute(
            '', 'branch token', 'repo token')
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)

    def test_unlock_on_unlocked_branch_locked_repo(self):
        """A valid repo token with a bogus branch token is a TokenMismatch."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
        branch = self.make_branch('.', format='knit')
        # Lock the repository.
        repo_token = branch.repository.lock_write()
        branch.repository.leave_lock_in_place()
        branch.repository.unlock()
        # Issue branch unlock request on the unlocked branch (with locked
        # repository).
        response = request.execute(
            '', 'branch token', repo_token)
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)
1036
class TestSmartServerRepositoryRequest(tests.TestCaseWithMemoryTransport):

    def test_no_repository(self):
        """Raise NoRepositoryPresent when there is a bzrdir and no repo."""
        # A shared repository sits above the named path, so this checks that
        # only the exact path is examined and that the server does not go
        # searching for a repository elsewhere.
        transport = self.get_transport()
        repo_request = smart.repository.SmartServerRepositoryRequest(transport)
        self.make_repository('.', shared=True)
        self.make_bzrdir('subdir')
        self.assertRaises(
            errors.NoRepositoryPresent, repo_request.execute, 'subdir')
1052
class TestSmartServerRepositoryGetParentMap(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryGetParentMap request."""

    def test_trivial_bzipped(self):
        # This tests that the wire encoding is actually bzipped
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetParentMap(backing)
        self.make_branch_and_memory_tree('.')

        # execute() only records the arguments; the answer comes from do_body.
        self.assertEqual(None,
            request.execute('', 'missing-id'))
        # Note that it returns a body that is bzipped.
        self.assertEqual(
            SuccessfulSmartServerResponse(('ok', ), bz2.compress('')),
            request.do_body('\n\n0\n'))

    def test_trivial_include_missing(self):
        """With 'include-missing:', absent revisions are listed in the body."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetParentMap(backing)
        self.make_branch_and_memory_tree('.')

        self.assertEqual(None,
            request.execute('', 'missing-id', 'include-missing:'))
        self.assertEqual(
            SuccessfulSmartServerResponse(('ok', ),
                bz2.compress('missing:missing-id')),
            request.do_body('\n\n0\n'))
1080
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryGetRevisionGraph request."""

    def test_none_argument(self):
        """An empty revision id returns the graph of every revision."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()

        # the lines of revision_id->revision_parent_list has no guaranteed
        # order coming out of a dict, so sort both our test and response
        lines = sorted([' '.join([r2, r1]), r1])
        response = request.execute('', '')
        response.body = '\n'.join(sorted(response.body.split('\n')))

        self.assertEqual(
            SmartServerResponse(('ok', ), '\n'.join(lines)), response)

    def test_specific_revision_argument(self):
        """Asking for one revision returns only that revision's ancestry."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        rev_id_utf8 = u'\xc9'.encode('utf-8')
        r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()

        self.assertEqual(SmartServerResponse(('ok', ), rev_id_utf8),
            request.execute('', rev_id_utf8))

    def test_no_such_revision(self):
        """An unknown revision id yields a 'nosuchrevision' response."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        tree.unlock()

        # Note that it still returns body (of zero bytes).
        self.assertEqual(
            SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
            request.execute('', 'missingrevision'))
1130
class TestSmartServerRepositoryGetStream(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryGetStream request."""

    def make_two_commit_repo(self):
        """Create a repository holding two commits.

        :return: A (repository, first_revision_id, second_revision_id) tuple.
        """
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()
        repo = tree.branch.repository
        return repo, r1, r2

    def test_ancestry_of(self):
        """The search argument may be an 'ancestry-of' some heads."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetStream(backing)
        repo, r1, r2 = self.make_two_commit_repo()
        fetch_spec = ['ancestry-of', r2]
        lines = '\n'.join(fetch_spec)
        request.execute('', repo._format.network_name())
        response = request.do_body(lines)
        self.assertEqual(('ok',), response.args)
        stream_bytes = ''.join(response.body_stream)
        self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')

    def test_search(self):
        """The search argument may be a 'search' of some explicit keys."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetStream(backing)
        repo, r1, r2 = self.make_two_commit_repo()
        fetch_spec = ['search', '%s %s' % (r1, r2), 'null:', '2']
        lines = '\n'.join(fetch_spec)
        request.execute('', repo._format.network_name())
        response = request.do_body(lines)
        self.assertEqual(('ok',), response.args)
        stream_bytes = ''.join(response.body_stream)
        self.assertStartsWith(stream_bytes, 'Bazaar pack format 1')
1169
class TestSmartServerRequestHasRevision(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRequestHasRevision request."""

    def test_missing_revision(self):
        """For a missing revision, ('no', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRequestHasRevision(backing)
        self.make_repository('.')
        self.assertEqual(SmartServerResponse(('no', )),
            request.execute('', 'revid'))

    def test_present_revision(self):
        """For a present revision, ('yes', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRequestHasRevision(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        r1 = tree.commit('a commit', rev_id=rev_id_utf8)
        tree.unlock()
        # Check the fixture first, so a failure below points at the request.
        self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
        self.assertEqual(SmartServerResponse(('yes', )),
            request.execute('', rev_id_utf8))
1194
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryGatherStats request."""

    def test_empty_revid(self):
        """With an empty revid, only the number of revisions is reported."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        repository = self.make_repository('.')
        stats = repository.gather_stats()
        expected_body = 'revisions: 0\n'
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute('', '', 'no'))

    def test_revid_with_committers(self):
        """For a revid we get more infos."""
        backing = self.get_transport()
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        # Let's build a predictable result
        tree.commit('a commit', timestamp=123456.2, timezone=3600)
        tree.commit('a commit', timestamp=654321.4, timezone=0,
                    rev_id=rev_id_utf8)
        tree.unlock()

        stats = tree.branch.repository.gather_stats()
        expected_body = ('firstrev: 123456.200 3600\n'
                         'latestrev: 654321.400 0\n'
                         'revisions: 2\n')
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute('',
                                         rev_id_utf8, 'no'))

    def test_not_empty_repository_with_committers(self):
        """For a revid and requesting committers we get the whole thing."""
        backing = self.get_transport()
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        # Let's build a predictable result
        tree.commit('a commit', timestamp=123456.2, timezone=3600,
                    committer='foo')
        tree.commit('a commit', timestamp=654321.4, timezone=0,
                    committer='bar', rev_id=rev_id_utf8)
        tree.unlock()
        stats = tree.branch.repository.gather_stats()

        expected_body = ('committers: 2\n'
                         'firstrev: 123456.200 3600\n'
                         'latestrev: 654321.400 0\n'
                         'revisions: 2\n')
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute('',
                                         rev_id_utf8, 'yes'))
1253
class TestSmartServerRepositoryIsShared(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryIsShared request."""

    def test_is_shared(self):
        """For a shared repository, ('yes', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryIsShared(backing)
        self.make_repository('.', shared=True)
        self.assertEqual(SmartServerResponse(('yes', )),
            request.execute('', ))

    def test_is_not_shared(self):
        """For a non-shared repository, ('no', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryIsShared(backing)
        self.make_repository('.', shared=False)
        self.assertEqual(SmartServerResponse(('no', )),
            request.execute('', ))
1272
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryLockWrite request."""

    def test_lock_write_on_unlocked_repo(self):
        """Locking an unlocked repository returns the lock's nonce."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        repository = self.make_repository('.', format='knit')
        response = request.execute('')
        nonce = repository.control_files._lock.peek().get('nonce')
        self.assertEqual(SmartServerResponse(('ok', nonce)), response)
        # The repository is now locked.  Verify that with a new repository
        # object.
        new_repo = repository.bzrdir.open_repository()
        self.assertRaises(errors.LockContention, new_repo.lock_write)

    def test_lock_write_on_locked_repo(self):
        """A request on an already locked repository is refused."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        repository = self.make_repository('.', format='knit')
        # Take the lock and leave it physically in place after unlocking the
        # in-memory object.
        repository.lock_write()
        repository.leave_lock_in_place()
        repository.unlock()
        response = request.execute('')
        self.assertEqual(
            SmartServerResponse(('LockContention',)), response)

    def test_lock_write_on_readonly_transport(self):
        """On a read-only transport the request fails with LockFailed."""
        backing = self.get_readonly_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        repository = self.make_repository('.', format='knit')
        response = request.execute('')
        self.assertFalse(response.is_successful())
        self.assertEqual('LockFailed', response.args[0])
1306
class TestInsertStreamBase(tests.TestCaseWithMemoryTransport):
    """Base class providing a helper that builds an empty byte stream."""

    def make_empty_byte_stream(self, repo):
        """Return an empty record stream for repo's format as one string."""
        chunks = smart.repository._stream_to_byte_stream([], repo._format)
        return ''.join(chunks)
1313
class TestSmartServerRepositoryInsertStream(TestInsertStreamBase):

    def test_insert_stream_empty(self):
        """An empty stream is accepted; only do_end returns a response."""
        transport = self.get_transport()
        handler = smart.repository.SmartServerRepositoryInsertStream(transport)
        repo = self.make_repository('.')
        # Neither the initial call nor a body chunk produces a response.
        self.assertEqual(None, handler.execute('', ''))
        empty_stream = self.make_empty_byte_stream(repo)
        self.assertEqual(None, handler.do_chunk(empty_stream))
        final_response = handler.do_end()
        self.assertEqual(SmartServerResponse(('ok', )), final_response)
1327
class TestSmartServerRepositoryInsertStreamLocked(TestInsertStreamBase):
    """Tests for the SmartServerRepositoryInsertStreamLocked request."""

    def test_insert_stream_empty(self):
        """An empty stream is accepted when the right lock token is given."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryInsertStreamLocked(
            backing)
        repository = self.make_repository('.', format='knit')
        lock_token = repository.lock_write()
        # Release the lock at the end of the test even if an assertion fails.
        self.addCleanup(repository.unlock)
        response = request.execute('', '', lock_token)
        self.assertEqual(None, response)
        response = request.do_chunk(self.make_empty_byte_stream(repository))
        self.assertEqual(None, response)
        response = request.do_end()
        self.assertEqual(SmartServerResponse(('ok', )), response)

    def test_insert_stream_with_wrong_lock_token(self):
        """A token that does not match the held lock raises TokenMismatch."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryInsertStreamLocked(
            backing)
        repository = self.make_repository('.', format='knit')
        repository.lock_write()
        # Release the lock at the end of the test even if the assertion fails.
        self.addCleanup(repository.unlock)
        self.assertRaises(
            errors.TokenMismatch, request.execute, '', '', 'wrong-token')
1354
class TestSmartServerRepositoryUnlock(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerRepositoryUnlock request."""

    def setUp(self):
        tests.TestCaseWithMemoryTransport.setUp(self)

    def test_unlock_on_locked_repo(self):
        """Unlocking with the right token releases the physical lock."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryUnlock(backing)
        repository = self.make_repository('.', format='knit')
        # Take the lock and leave it physically in place after unlocking the
        # in-memory object.
        token = repository.lock_write()
        repository.leave_lock_in_place()
        repository.unlock()
        response = request.execute('', token)
        self.assertEqual(
            SmartServerResponse(('ok',)), response)
        # The repository is now unlocked.  Verify that with a new repository
        # object.
        new_repo = repository.bzrdir.open_repository()
        new_repo.lock_write()
        new_repo.unlock()

    def test_unlock_on_unlocked_repo(self):
        """Unlocking a repository that is not locked reports TokenMismatch."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryUnlock(backing)
        repository = self.make_repository('.', format='knit')
        response = request.execute('', 'some token')
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)
1384
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
    """Tests for the SmartServerIsReadonly request."""

    def test_is_readonly_no(self):
        """A writable backing transport answers ('no',)."""
        backing = self.get_transport()
        request = smart.request.SmartServerIsReadonly(backing)
        response = request.execute()
        self.assertEqual(
            SmartServerResponse(('no',)), response)

    def test_is_readonly_yes(self):
        """A read-only backing transport answers ('yes',)."""
        backing = self.get_readonly_transport()
        request = smart.request.SmartServerIsReadonly(backing)
        response = request.execute()
        self.assertEqual(
            SmartServerResponse(('yes',)), response)
1401
class TestSmartServerRepositorySetMakeWorkingTrees(tests.TestCaseWithMemoryTransport):

    def test_set_false(self):
        """Sending 'False' switches make_working_trees off."""
        transport = self.get_transport()
        shared_repo = self.make_repository('.', shared=True)
        # Start from the opposite value so the change is observable.
        shared_repo.set_make_working_trees(True)
        handler_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
        handler = handler_class(transport)
        self.assertEqual(
            SuccessfulSmartServerResponse(('ok',)),
            handler.execute('', 'False'))
        # Check the setting through a newly opened repository object.
        reopened = shared_repo.bzrdir.open_repository()
        self.assertFalse(reopened.make_working_trees())

    def test_set_true(self):
        """Sending 'True' switches make_working_trees on."""
        transport = self.get_transport()
        shared_repo = self.make_repository('.', shared=True)
        # Start from the opposite value so the change is observable.
        shared_repo.set_make_working_trees(False)
        handler_class = smart.repository.SmartServerRepositorySetMakeWorkingTrees
        handler = handler_class(transport)
        self.assertEqual(
            SuccessfulSmartServerResponse(('ok',)),
            handler.execute('', 'True'))
        # Check the setting through a newly opened repository object.
        reopened = shared_repo.bzrdir.open_repository()
        self.assertTrue(reopened.make_working_trees())
1426
class TestSmartServerPackRepositoryAutopack(tests.TestCaseWithTransport):
    """Tests for the SmartServerPackRepositoryAutopack request."""

    def make_repo_needing_autopacking(self, path='.'):
        """Create a pack repository at path holding ten separate packs.

        :param path: Where to create the branch and tree.
        :return: The repository, with autopacking enabled again.
        """
        # Make a repo in need of autopacking.
        tree = self.make_branch_and_tree(path, format='pack-0.92')
        repo = tree.branch.repository
        # monkey-patch the pack collection to disable autopacking
        repo._pack_collection._max_pack_count = lambda count: count
        for x in range(10):
            tree.commit('commit %s' % x)
        self.assertEqual(10, len(repo._pack_collection.names()))
        # Remove the instance-level override set above.
        del repo._pack_collection._max_pack_count
        return repo

    def test_autopack_needed(self):
        """Autopacking a repository with ten packs leaves a single pack."""
        repo = self.make_repo_needing_autopacking()
        repo.lock_write()
        self.addCleanup(repo.unlock)
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
        repo._pack_collection.reload_pack_names()
        self.assertEqual(1, len(repo._pack_collection.names()))

    def test_autopack_not_needed(self):
        """With only nine packs the request succeeds and repacks nothing."""
        tree = self.make_branch_and_tree('.', format='pack-0.92')
        repo = tree.branch.repository
        repo.lock_write()
        self.addCleanup(repo.unlock)
        for x in range(9):
            tree.commit('commit %s' % x)
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
        repo._pack_collection.reload_pack_names()
        self.assertEqual(9, len(repo._pack_collection.names()))

    def test_autopack_on_nonpack_format(self):
        """A request to autopack a non-pack repo is a no-op."""
        repo = self.make_repository('.', format='knit')
        backing = self.get_transport()
        request = smart.packrepository.SmartServerPackRepositoryAutopack(
            backing)
        response = request.execute('')
        self.assertEqual(SmartServerResponse(('ok',)), response)
1477
class TestHandlers(tests.TestCase):
    """Tests for the request.request_handlers object."""

    def test_all_registrations_exist(self):
        """All registered request_handlers can be found."""
        # If there's a typo in a register_lazy call, this loop will fail with
        # an AttributeError.
        for key, item in smart.request.request_handlers.iteritems():
            pass

    def assertHandlerEqual(self, verb, handler):
        """Assert that `verb` is registered to the `handler` class."""
        self.assertEqual(smart.request.request_handlers.get(verb), handler)

    def test_registered_methods(self):
        """Test that known methods are registered to the correct object."""
        self.assertHandlerEqual('Branch.get_config_file',
            smart.branch.SmartServerBranchGetConfigFile)
        self.assertHandlerEqual('Branch.get_parent',
            smart.branch.SmartServerBranchGetParent)
        self.assertHandlerEqual('Branch.get_tags_bytes',
            smart.branch.SmartServerBranchGetTagsBytes)
        self.assertHandlerEqual('Branch.lock_write',
            smart.branch.SmartServerBranchRequestLockWrite)
        self.assertHandlerEqual('Branch.last_revision_info',
            smart.branch.SmartServerBranchRequestLastRevisionInfo)
        self.assertHandlerEqual('Branch.revision_history',
            smart.branch.SmartServerRequestRevisionHistory)
        self.assertHandlerEqual('Branch.set_config_option',
            smart.branch.SmartServerBranchRequestSetConfigOption)
        self.assertHandlerEqual('Branch.set_last_revision',
            smart.branch.SmartServerBranchRequestSetLastRevision)
        self.assertHandlerEqual('Branch.set_last_revision_info',
            smart.branch.SmartServerBranchRequestSetLastRevisionInfo)
        self.assertHandlerEqual('Branch.set_last_revision_ex',
            smart.branch.SmartServerBranchRequestSetLastRevisionEx)
        self.assertHandlerEqual('Branch.set_parent_location',
            smart.branch.SmartServerBranchRequestSetParentLocation)
        self.assertHandlerEqual('Branch.unlock',
            smart.branch.SmartServerBranchRequestUnlock)
        self.assertHandlerEqual('BzrDir.find_repository',
            smart.bzrdir.SmartServerRequestFindRepositoryV1)
        self.assertHandlerEqual('BzrDir.find_repositoryV2',
            smart.bzrdir.SmartServerRequestFindRepositoryV2)
        self.assertHandlerEqual('BzrDirFormat.initialize',
            smart.bzrdir.SmartServerRequestInitializeBzrDir)
        self.assertHandlerEqual('BzrDirFormat.initialize_ex',
            smart.bzrdir.SmartServerRequestBzrDirInitializeEx)
        self.assertHandlerEqual('BzrDir.cloning_metadir',
            smart.bzrdir.SmartServerBzrDirRequestCloningMetaDir)
        self.assertHandlerEqual('BzrDir.get_config_file',
            smart.bzrdir.SmartServerBzrDirRequestConfigFile)
        self.assertHandlerEqual('BzrDir.open_branch',
            smart.bzrdir.SmartServerRequestOpenBranch)
        self.assertHandlerEqual('BzrDir.open_branchV2',
            smart.bzrdir.SmartServerRequestOpenBranchV2)
        self.assertHandlerEqual('PackRepository.autopack',
            smart.packrepository.SmartServerPackRepositoryAutopack)
        self.assertHandlerEqual('Repository.gather_stats',
            smart.repository.SmartServerRepositoryGatherStats)
        self.assertHandlerEqual('Repository.get_parent_map',
            smart.repository.SmartServerRepositoryGetParentMap)
        self.assertHandlerEqual('Repository.get_revision_graph',
            smart.repository.SmartServerRepositoryGetRevisionGraph)
        self.assertHandlerEqual('Repository.get_stream',
            smart.repository.SmartServerRepositoryGetStream)
        self.assertHandlerEqual('Repository.has_revision',
            smart.repository.SmartServerRequestHasRevision)
        self.assertHandlerEqual('Repository.insert_stream',
            smart.repository.SmartServerRepositoryInsertStream)
        self.assertHandlerEqual('Repository.insert_stream_locked',
            smart.repository.SmartServerRepositoryInsertStreamLocked)
        self.assertHandlerEqual('Repository.is_shared',
            smart.repository.SmartServerRepositoryIsShared)
        self.assertHandlerEqual('Repository.lock_write',
            smart.repository.SmartServerRepositoryLockWrite)
        self.assertHandlerEqual('Repository.tarball',
            smart.repository.SmartServerRepositoryTarball)
        self.assertHandlerEqual('Repository.unlock',
            smart.repository.SmartServerRepositoryUnlock)
        self.assertHandlerEqual('Transport.is_readonly',
            smart.request.SmartServerIsReadonly)