28
from StringIO import StringIO
28
from cStringIO import StringIO
32
from bzrlib import bzrdir, errors, pack, smart, tests
39
from bzrlib.branch import BranchReferenceFormat
40
import bzrlib.smart.branch
41
import bzrlib.smart.bzrdir
42
import bzrlib.smart.repository
33
43
from bzrlib.smart.request import (
34
44
FailedSmartServerResponse,
35
46
SmartServerResponse,
36
47
SuccessfulSmartServerResponse,
38
import bzrlib.smart.bzrdir
39
import bzrlib.smart.branch
40
import bzrlib.smart.repository
41
49
from bzrlib.tests import (
44
52
TestScenarioApplier,
54
from bzrlib.transport import chroot, get_transport
46
55
from bzrlib.util import bencode
98
128
self.assertNotEqual(None,
99
129
SmartServerResponse(('ok', )))
102
class TestSmartServerRequestFindRepository(tests.TestCaseWithTransport):
131
def test__str__(self):
132
"""SmartServerResponses can be stringified."""
134
"<SmartServerResponse status=OK args=('args',) body='body'>",
135
str(SuccessfulSmartServerResponse(('args',), 'body')))
137
"<SmartServerResponse status=ERR args=('args',) body='body'>",
138
str(FailedSmartServerResponse(('args',), 'body')))
141
class TestSmartServerRequest(tests.TestCaseWithMemoryTransport):
143
def test_translate_client_path(self):
144
transport = self.get_transport()
145
request = SmartServerRequest(transport, 'foo/')
146
self.assertEqual('./', request.translate_client_path('foo/'))
148
errors.InvalidURLJoin, request.translate_client_path, 'foo/..')
150
errors.PathNotChild, request.translate_client_path, '/')
152
errors.PathNotChild, request.translate_client_path, 'bar/')
153
self.assertEqual('./baz', request.translate_client_path('foo/baz'))
155
def test_transport_from_client_path(self):
156
transport = self.get_transport()
157
request = SmartServerRequest(transport, 'foo/')
160
request.transport_from_client_path('foo/').base)
163
class TestSmartServerRequestFindRepository(tests.TestCaseWithMemoryTransport):
103
164
"""Tests for BzrDir.find_repository."""
105
166
def test_no_repository(self):
150
211
backing = self.get_transport()
151
212
request = self._request_class(backing)
152
213
result = self._make_repository_and_result(shared=True)
153
self.assertEqual(result, request.execute(backing.local_abspath('')))
214
self.assertEqual(result, request.execute(''))
154
215
self.make_bzrdir('subdir')
155
216
result2 = SmartServerResponse(result.args[0:1] + ('..', ) + result.args[2:])
156
217
self.assertEqual(result2,
157
request.execute(backing.local_abspath('subdir')))
218
request.execute('subdir'))
158
219
self.make_bzrdir('subdir/deeper')
159
220
result3 = SmartServerResponse(result.args[0:1] + ('../..', ) + result.args[2:])
160
221
self.assertEqual(result3,
161
request.execute(backing.local_abspath('subdir/deeper')))
222
request.execute('subdir/deeper'))
163
224
def test_rich_root_and_subtree_encoding(self):
164
225
"""Test for the format attributes for rich root and subtree support."""
177
238
result = self._make_repository_and_result(format='dirstate-with-subtree')
178
239
# check the test will be valid
179
240
self.assertEqual('no', result.args[4])
180
self.assertEqual(result, request.execute(backing.local_abspath('')))
183
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithTransport):
241
self.assertEqual(result, request.execute(''))
244
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithMemoryTransport):
185
246
def test_empty_dir(self):
186
247
"""Initializing an empty dir should succeed and do it."""
187
248
backing = self.get_transport()
188
249
request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
189
250
self.assertEqual(SmartServerResponse(('ok', )),
190
request.execute(backing.local_abspath('.')))
191
252
made_dir = bzrdir.BzrDir.open_from_transport(backing)
192
253
# no branch, tree or repository is expected with the current
193
254
# default formart.
235
296
request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
236
297
branch = self.make_branch('branch')
237
298
checkout = branch.create_checkout('reference',lightweight=True)
238
# TODO: once we have an API to probe for references of any sort, we
240
reference_url = backing.abspath('branch') + '/'
299
reference_url = BranchReferenceFormat().get_reference(checkout.bzrdir)
241
300
self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
242
301
self.assertEqual(SmartServerResponse(('ok', reference_url)),
243
request.execute(backing.local_abspath('reference')))
246
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithTransport):
302
request.execute('reference'))
305
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithMemoryTransport):
248
307
def test_empty(self):
249
308
"""For an empty branch, the body is empty."""
413
472
self.assertEqual(
414
473
SmartServerResponse(('ok',)),
416
backing.local_abspath(''), branch_token, repo_token,
475
'', branch_token, repo_token,
418
477
self.assertEqual([rev_id_utf8], tree.branch.revision_history())
420
479
tree.branch.unlock()
423
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithTransport):
482
class TestSmartServerBranchRequestSetLastRevisionInfo(tests.TestCaseWithTransport):
484
def lock_branch(self, branch):
485
branch_token = branch.lock_write()
486
repo_token = branch.repository.lock_write()
487
branch.repository.unlock()
488
self.addCleanup(branch.unlock)
489
return branch_token, repo_token
491
def make_locked_branch(self, format=None):
492
branch = self.make_branch('.', format=format)
493
branch_token, repo_token = self.lock_branch(branch)
494
return branch, branch_token, repo_token
496
def test_empty(self):
497
"""An empty branch can have its last revision set to 'null:'."""
498
b, branch_token, repo_token = self.make_locked_branch()
499
backing = self.get_transport()
500
request = smart.branch.SmartServerBranchRequestSetLastRevisionInfo(
502
response = request.execute('', branch_token, repo_token, '0', 'null:')
503
self.assertEqual(SmartServerResponse(('ok',)), response)
505
def assertBranchLastRevisionInfo(self, expected_info, branch_relpath):
506
branch = bzrdir.BzrDir.open(branch_relpath).open_branch()
507
self.assertEqual(expected_info, branch.last_revision_info())
509
def test_branch_revision_info_is_updated(self):
510
"""This method really does update the branch last revision info."""
511
tree = self.make_branch_and_memory_tree('.')
514
tree.commit('First commit', rev_id='revision-1')
515
tree.commit('Second commit', rev_id='revision-2')
519
branch_token, repo_token = self.lock_branch(branch)
520
backing = self.get_transport()
521
request = smart.branch.SmartServerBranchRequestSetLastRevisionInfo(
523
self.assertBranchLastRevisionInfo((2, 'revision-2'), '.')
524
response = request.execute(
525
'', branch_token, repo_token, '1', 'revision-1')
526
self.assertEqual(SmartServerResponse(('ok',)), response)
527
self.assertBranchLastRevisionInfo((1, 'revision-1'), '.')
529
def test_not_present_revid(self):
530
"""Some branch formats will check that the revision is present in the
531
repository. When that check fails, a NoSuchRevision error is returned
534
# Make a knit format branch, because that format checks the values
535
# given to set_last_revision_info.
536
b, branch_token, repo_token = self.make_locked_branch(format='knit')
537
backing = self.get_transport()
538
request = smart.branch.SmartServerBranchRequestSetLastRevisionInfo(
540
response = request.execute(
541
'', branch_token, repo_token, '1', 'not-present')
543
SmartServerResponse(('NoSuchRevision', 'not-present')), response)
546
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithMemoryTransport):
426
tests.TestCaseWithTransport.setUp(self)
427
self.reduceLockdirTimeout()
549
tests.TestCaseWithMemoryTransport.setUp(self)
429
551
def test_lock_write_on_unlocked_branch(self):
430
552
backing = self.get_transport()
431
553
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
432
554
branch = self.make_branch('.', format='knit')
433
555
repository = branch.repository
434
response = request.execute(backing.local_abspath(''))
556
response = request.execute('')
435
557
branch_nonce = branch.control_files._lock.peek().get('nonce')
436
558
repository_nonce = repository.control_files._lock.peek().get('nonce')
437
559
self.assertEqual(
498
620
backing = self.get_readonly_transport()
499
621
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
500
622
branch = self.make_branch('.')
501
response = request.execute('')
623
root = self.get_transport().clone('/')
624
path = urlutils.relative_url(root.base, self.get_transport().base)
625
response = request.execute(path)
502
626
error_name, lock_str, why_str = response.args
503
627
self.assertFalse(response.is_successful())
504
628
self.assertEqual('LockFailed', error_name)
507
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithTransport):
631
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithMemoryTransport):
510
tests.TestCaseWithTransport.setUp(self)
511
self.reduceLockdirTimeout()
634
tests.TestCaseWithMemoryTransport.setUp(self)
513
636
def test_unlock_on_locked_branch_and_repo(self):
514
637
backing = self.get_transport()
745
868
request = smart.repository.SmartServerRepositoryIsShared(backing)
746
869
self.make_repository('.', shared=False)
747
870
self.assertEqual(SmartServerResponse(('no', )),
748
request.execute(backing.local_abspath(''), ))
751
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithTransport):
871
request.execute('', ))
874
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithMemoryTransport):
754
tests.TestCaseWithTransport.setUp(self)
755
self.reduceLockdirTimeout()
877
tests.TestCaseWithMemoryTransport.setUp(self)
757
879
def test_lock_write_on_unlocked_repo(self):
758
880
backing = self.get_transport()
759
881
request = smart.repository.SmartServerRepositoryLockWrite(backing)
760
882
repository = self.make_repository('.', format='knit')
761
response = request.execute(backing.local_abspath(''))
883
response = request.execute('')
762
884
nonce = repository.control_files._lock.peek().get('nonce')
763
885
self.assertEqual(SmartServerResponse(('ok', nonce)), response)
764
886
# The repository is now locked. Verify that with a new repository