389
392
backing = self.get_transport()
390
393
request = smart.branch.SmartServerBranchGetConfigFile(backing)
391
394
branch = self.make_branch('.')
392
branch.control_files.put_utf8('branch.conf', 'foo bar baz')
395
branch._transport.put_bytes('branch.conf', 'foo bar baz')
393
396
self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
394
397
request.execute(''))
397
class TestSmartServerBranchRequestSetLastRevision(tests.TestCaseWithMemoryTransport):
399
def test_empty(self):
400
backing = self.get_transport()
401
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
402
b = self.make_branch('.')
403
branch_token = b.lock_write()
404
repo_token = b.repository.lock_write()
405
b.repository.unlock()
407
self.assertEqual(SmartServerResponse(('ok',)),
409
'', branch_token, repo_token,
414
def test_not_present_revision_id(self):
415
backing = self.get_transport()
416
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
417
b = self.make_branch('.')
418
branch_token = b.lock_write()
419
repo_token = b.repository.lock_write()
420
b.repository.unlock()
422
revision_id = 'non-existent revision'
424
SmartServerResponse(('NoSuchRevision', revision_id)),
426
'', branch_token, repo_token,
431
def test_revision_id_present(self):
432
backing = self.get_transport()
433
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
434
tree = self.make_branch_and_memory_tree('.')
437
rev_id_utf8 = u'\xc8'.encode('utf-8')
438
r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
439
r2 = tree.commit('2nd commit')
441
branch_token = tree.branch.lock_write()
442
repo_token = tree.branch.repository.lock_write()
443
tree.branch.repository.unlock()
446
SmartServerResponse(('ok',)),
448
'', branch_token, repo_token,
450
self.assertEqual([rev_id_utf8], tree.branch.revision_history())
454
def test_revision_id_present2(self):
455
backing = self.get_transport()
456
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
457
tree = self.make_branch_and_memory_tree('.')
460
rev_id_utf8 = u'\xc8'.encode('utf-8')
461
r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
462
r2 = tree.commit('2nd commit')
464
tree.branch.set_revision_history([])
465
branch_token = tree.branch.lock_write()
466
repo_token = tree.branch.repository.lock_write()
467
tree.branch.repository.unlock()
470
SmartServerResponse(('ok',)),
472
'', branch_token, repo_token,
474
self.assertEqual([rev_id_utf8], tree.branch.revision_history())
400
class SetLastRevisionTestBase(tests.TestCaseWithMemoryTransport):
401
"""Base test case for verbs that implement set_last_revision."""
404
tests.TestCaseWithMemoryTransport.setUp(self)
405
backing_transport = self.get_transport()
406
self.request = self.request_class(backing_transport)
407
self.tree = self.make_branch_and_memory_tree('.')
409
def lock_branch(self):
411
branch_token = b.lock_write()
412
repo_token = b.repository.lock_write()
413
b.repository.unlock()
414
return branch_token, repo_token
416
def unlock_branch(self):
417
self.tree.branch.unlock()
419
def set_last_revision(self, revision_id, revno):
420
branch_token, repo_token = self.lock_branch()
421
response = self._set_last_revision(
422
revision_id, revno, branch_token, repo_token)
426
def assertRequestSucceeds(self, revision_id, revno):
427
response = self.set_last_revision(revision_id, revno)
428
self.assertEqual(SuccessfulSmartServerResponse(('ok',)), response)
431
class TestSetLastRevisionVerbMixin(object):
432
"""Mixin test case for verbs that implement set_last_revision."""
434
def test_set_null_to_null(self):
435
"""An empty branch can have its last revision set to 'null:'."""
436
self.assertRequestSucceeds('null:', 0)
438
def test_NoSuchRevision(self):
439
"""If the revision_id is not present, the verb returns NoSuchRevision.
441
revision_id = 'non-existent revision'
443
FailedSmartServerResponse(('NoSuchRevision', revision_id)),
444
self.set_last_revision(revision_id, 1))
446
def make_tree_with_two_commits(self):
447
self.tree.lock_write()
449
rev_id_utf8 = u'\xc8'.encode('utf-8')
450
r1 = self.tree.commit('1st commit', rev_id=rev_id_utf8)
451
r2 = self.tree.commit('2nd commit', rev_id='rev-2')
454
def test_branch_last_revision_info_is_updated(self):
455
"""A branch's tip can be set to a revision that is present in its
458
# Make a branch with an empty revision history, but two revisions in
460
self.make_tree_with_two_commits()
461
rev_id_utf8 = u'\xc8'.encode('utf-8')
462
self.tree.branch.set_revision_history([])
464
(0, 'null:'), self.tree.branch.last_revision_info())
465
# We can update the branch to a revision that is present in the
467
self.assertRequestSucceeds(rev_id_utf8, 1)
469
(1, rev_id_utf8), self.tree.branch.last_revision_info())
471
def test_branch_last_revision_info_rewind(self):
472
"""A branch's tip can be set to a revision that is an ancestor of the
475
self.make_tree_with_two_commits()
476
rev_id_utf8 = u'\xc8'.encode('utf-8')
478
(2, 'rev-2'), self.tree.branch.last_revision_info())
479
self.assertRequestSucceeds(rev_id_utf8, 1)
481
(1, rev_id_utf8), self.tree.branch.last_revision_info())
483
def test_TipChangeRejected(self):
484
"""If a pre_change_branch_tip hook raises TipChangeRejected, the verb
485
returns TipChangeRejected.
487
rejection_message = u'rejection message\N{INTERROBANG}'
488
def hook_that_rejects(params):
489
raise errors.TipChangeRejected(rejection_message)
490
Branch.hooks.install_named_hook(
491
'pre_change_branch_tip', hook_that_rejects, None)
493
FailedSmartServerResponse(
494
('TipChangeRejected', rejection_message.encode('utf-8'))),
495
self.set_last_revision('null:', 0))
498
class TestSmartServerBranchRequestSetLastRevision(
499
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
500
"""Tests for Branch.set_last_revision verb."""
502
request_class = smart.branch.SmartServerBranchRequestSetLastRevision
504
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
505
return self.request.execute(
506
'', branch_token, repo_token, revision_id)
509
class TestSmartServerBranchRequestSetLastRevisionInfo(
510
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
511
"""Tests for Branch.set_last_revision_info verb."""
513
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionInfo
515
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
516
return self.request.execute(
517
'', branch_token, repo_token, revno, revision_id)
519
def test_NoSuchRevision(self):
520
"""Branch.set_last_revision_info does not have to return
521
NoSuchRevision if the revision_id is absent.
523
raise tests.TestNotApplicable()
526
class TestSmartServerBranchRequestSetLastRevisionEx(
527
SetLastRevisionTestBase, TestSetLastRevisionVerbMixin):
528
"""Tests for Branch.set_last_revision_ex verb."""
530
request_class = smart.branch.SmartServerBranchRequestSetLastRevisionEx
532
def _set_last_revision(self, revision_id, revno, branch_token, repo_token):
533
return self.request.execute(
534
'', branch_token, repo_token, revision_id, 0, 0)
536
def assertRequestSucceeds(self, revision_id, revno):
537
response = self.set_last_revision(revision_id, revno)
539
SuccessfulSmartServerResponse(('ok', revno, revision_id)),
542
def test_branch_last_revision_info_rewind(self):
543
"""A branch's tip can be set to a revision that is an ancestor of the
544
current tip, but only if allow_overwrite_descendant is passed.
546
self.make_tree_with_two_commits()
547
rev_id_utf8 = u'\xc8'.encode('utf-8')
549
(2, 'rev-2'), self.tree.branch.last_revision_info())
550
# If allow_overwrite_descendant flag is 0, then trying to set the tip
551
# to an older revision ID has no effect.
552
branch_token, repo_token = self.lock_branch()
553
response = self.request.execute(
554
'', branch_token, repo_token, rev_id_utf8, 0, 0)
556
SuccessfulSmartServerResponse(('ok', 2, 'rev-2')),
559
(2, 'rev-2'), self.tree.branch.last_revision_info())
561
# If allow_overwrite_descendant flag is 1, then setting the tip to an
563
response = self.request.execute(
564
'', branch_token, repo_token, rev_id_utf8, 0, 1)
566
SuccessfulSmartServerResponse(('ok', 1, rev_id_utf8)),
570
(1, rev_id_utf8), self.tree.branch.last_revision_info())
572
def make_branch_with_divergent_history(self):
573
"""Make a branch with divergent history in its repo.
575
The branch's tip will be 'child-2', and the repo will also contain
576
'child-1', which diverges from a common base revision.
578
self.tree.lock_write()
580
r1 = self.tree.commit('1st commit')
581
revno_1, revid_1 = self.tree.branch.last_revision_info()
582
r2 = self.tree.commit('2nd commit', rev_id='child-1')
583
# Undo the second commit
584
self.tree.branch.set_last_revision_info(revno_1, revid_1)
585
self.tree.set_parent_ids([revid_1])
586
# Make a new second commit, child-2. child-2 has diverged from
588
new_r2 = self.tree.commit('2nd commit', rev_id='child-2')
591
def test_not_allow_diverged(self):
592
"""If allow_diverged is not passed, then setting a divergent history
593
returns a Diverged error.
595
self.make_branch_with_divergent_history()
597
FailedSmartServerResponse(('Diverged',)),
598
self.set_last_revision('child-1', 2))
599
# The branch tip was not changed.
600
self.assertEqual('child-2', self.tree.branch.last_revision())
602
def test_allow_diverged(self):
603
"""If allow_diverged is passed, then setting a divergent history
606
self.make_branch_with_divergent_history()
607
branch_token, repo_token = self.lock_branch()
608
response = self.request.execute(
609
'', branch_token, repo_token, 'child-1', 1, 0)
611
SuccessfulSmartServerResponse(('ok', 2, 'child-1')),
614
# The branch tip was changed.
615
self.assertEqual('child-1', self.tree.branch.last_revision())
479
618
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
871
1005
SmartServerResponse(('TokenMismatch',)), response)
874
class TestSmartServerRepositoryTarball(tests.TestCaseWithTransport):
876
def test_repository_tarball(self):
877
backing = self.get_transport()
878
request = smart.repository.SmartServerRepositoryTarball(backing)
879
repository = self.make_repository('.')
880
# make some extraneous junk in the repository directory which should
882
self.build_tree(['.bzr/repository/extra-junk'])
883
response = request.execute('', 'bz2')
884
self.assertEqual(('ok',), response.args)
885
# body should be a tbz2
886
body_file = StringIO(response.body)
887
body_tar = tarfile.open('body_tar.tbz2', fileobj=body_file,
889
# let's make sure there are some key repository components inside it.
890
# the tarfile returns directories with trailing slashes...
891
names = set([n.rstrip('/') for n in body_tar.getnames()])
892
self.assertTrue('.bzr/repository/lock' in names)
893
self.assertTrue('.bzr/repository/format' in names)
894
self.assertTrue('.bzr/repository/extra-junk' not in names,
895
"extraneous file present in tar file")
898
class TestSmartServerRepositoryStreamKnitData(tests.TestCaseWithMemoryTransport):
900
def test_fetch_revisions(self):
901
backing = self.get_transport()
902
request = smart.repository.SmartServerRepositoryStreamKnitDataForRevisions(backing)
903
tree = self.make_branch_and_memory_tree('.')
906
rev_id1_utf8 = u'\xc8'.encode('utf-8')
907
rev_id2_utf8 = u'\xc9'.encode('utf-8')
908
r1 = tree.commit('1st commit', rev_id=rev_id1_utf8)
909
r1 = tree.commit('2nd commit', rev_id=rev_id2_utf8)
912
response = request.execute('', rev_id2_utf8)
913
self.assertEqual(('ok',), response.args)
914
unpacker = pack.ContainerReader(StringIO(response.body))
916
for [name], read_bytes in unpacker.iter_records():
918
bytes = read_bytes(None)
919
# The bytes should be a valid bencoded string.
920
bencode.bdecode(bytes)
921
# XXX: assert that the bencoded knit records have the right
924
def test_no_such_revision_error(self):
925
backing = self.get_transport()
926
request = smart.repository.SmartServerRepositoryStreamKnitDataForRevisions(backing)
927
repo = self.make_repository('.')
928
rev_id1_utf8 = u'\xc8'.encode('utf-8')
929
response = request.execute('', rev_id1_utf8)
931
SmartServerResponse(('NoSuchRevision', rev_id1_utf8)),
935
class TestSmartServerRepositoryStreamRevisionsChunked(tests.TestCaseWithMemoryTransport):
937
def test_fetch_revisions(self):
938
backing = self.get_transport()
939
request = smart.repository.SmartServerRepositoryStreamRevisionsChunked(
941
tree = self.make_branch_and_memory_tree('.')
944
rev_id1_utf8 = u'\xc8'.encode('utf-8')
945
rev_id2_utf8 = u'\xc9'.encode('utf-8')
946
tree.commit('1st commit', rev_id=rev_id1_utf8)
947
tree.commit('2nd commit', rev_id=rev_id2_utf8)
950
response = request.execute('')
951
self.assertEqual(None, response)
952
response = request.do_body("%s\n%s\n1" % (rev_id2_utf8, rev_id1_utf8))
953
self.assertEqual(('ok',), response.args)
954
parser = pack.ContainerPushParser()
956
for stream_bytes in response.body_stream:
957
parser.accept_bytes(stream_bytes)
958
for [name], record_bytes in parser.read_pending_records():
960
# The bytes should be a valid bencoded string.
961
bencode.bdecode(record_bytes)
962
# XXX: assert that the bencoded knit records have the right
965
def test_no_such_revision_error(self):
966
backing = self.get_transport()
967
request = smart.repository.SmartServerRepositoryStreamRevisionsChunked(
969
repo = self.make_repository('.')
970
rev_id1_utf8 = u'\xc8'.encode('utf-8')
971
response = request.execute('')
972
self.assertEqual(None, response)
973
response = request.do_body("%s\n\n1" % (rev_id1_utf8,))
975
FailedSmartServerResponse(('NoSuchRevision', )),
979
1008
class TestSmartServerIsReadonly(tests.TestCaseWithMemoryTransport):
981
1010
def test_is_readonly_no(self):