1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the smart wire/domain protocol."""
19
from bzrlib import bzrdir, errors, smart, tests
20
from bzrlib.smart.request import SmartServerResponse
21
import bzrlib.smart.bzrdir
22
import bzrlib.smart.branch
23
import bzrlib.smart.repository
26
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
29
super(TestCaseWithSmartMedium, self).setUp()
30
# We're allowed to set the transport class here, so that we don't use
31
# the default or a parameterized class, but rather use the
32
# TestCaseWithTransport infrastructure to set up a smart server and
34
self.transport_server = smart.server.SmartTCPServer_for_testing
36
def get_smart_medium(self):
    """Return the smart medium of this test's transport."""
    transport = self.get_transport()
    return transport.get_smart_medium()
41
class TestSmartServerResponse(tests.TestCase):
44
self.assertEqual(SmartServerResponse(('ok', )),
45
SmartServerResponse(('ok', )))
46
self.assertEqual(SmartServerResponse(('ok', ), 'body'),
47
SmartServerResponse(('ok', ), 'body'))
48
self.assertNotEqual(SmartServerResponse(('ok', )),
49
SmartServerResponse(('notok', )))
50
self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
51
SmartServerResponse(('ok', )))
52
self.assertNotEqual(None,
53
SmartServerResponse(('ok', )))
56
class TestSmartServerRequestFindRepository(tests.TestCaseWithTransport):
58
def test_no_repository(self):
59
"""When there is no repository to be found, ('norepository', ) is returned."""
60
backing = self.get_transport()
61
request = smart.bzrdir.SmartServerRequestFindRepository(backing)
63
self.assertEqual(SmartServerResponse(('norepository', )),
64
request.execute(backing.local_abspath('')))
66
def test_nonshared_repository(self):
    """A non-shared repository is only found at its own location.

    Searching from the path the repository lives at gives ('ok', '');
    searching from a bzrdir below it gives ('norepository', ).
    """
    transport = self.get_transport()
    finder = smart.bzrdir.SmartServerRequestFindRepository(transport)
    self.make_repository('.')
    found_at_root = finder.execute(transport.local_abspath(''))
    self.assertEqual(SmartServerResponse(('ok', '')), found_at_root)
    # A control directory underneath must not see the repository above.
    self.make_bzrdir('subdir')
    found_in_subdir = finder.execute(transport.local_abspath('subdir'))
    self.assertEqual(SmartServerResponse(('norepository', )),
                     found_in_subdir)
79
def test_shared_repository(self):
    """A shared repository is reported as ('ok', relpath-to-repository)."""
    transport = self.get_transport()
    finder = smart.bzrdir.SmartServerRequestFindRepository(transport)
    self.make_repository('.', shared=True)
    self.assertEqual(SmartServerResponse(('ok', '')),
                     finder.execute(transport.local_abspath('')))
    # Each nested control directory is created just before it is queried,
    # and must report the relative path back up to the repository.
    nested_cases = [('subdir', '..'), ('subdir/deeper', '../..')]
    for location, expected_relpath in nested_cases:
        self.make_bzrdir(location)
        self.assertEqual(SmartServerResponse(('ok', expected_relpath)),
                         finder.execute(transport.local_abspath(location)))
94
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithTransport):
96
def test_empty_dir(self):
    """Initializing an empty directory succeeds and creates a bzrdir."""
    transport = self.get_transport()
    initializer = smart.bzrdir.SmartServerRequestInitializeBzrDir(transport)
    result = initializer.execute(transport.local_abspath('.'))
    self.assertEqual(SmartServerResponse(('ok', )), result)
    created = bzrdir.BzrDir.open_from_transport(transport)
    # The new control directory must contain no working tree, branch or
    # repository: opening any of them has to fail.
    absent_components = [
        (errors.NoWorkingTree, created.open_workingtree),
        (errors.NotBranchError, created.open_branch),
        (errors.NoRepositoryPresent, created.open_repository),
    ]
    for expected_error, opener in absent_components:
        self.assertRaises(expected_error, opener)
109
def test_missing_dir(self):
    """Initializing a directory that does not exist raises NoSuchFile."""
    transport = self.get_transport()
    initializer = smart.bzrdir.SmartServerRequestInitializeBzrDir(transport)
    # 'subdir' is never created, so the request has nothing to initialize.
    missing_path = transport.local_abspath('subdir')
    self.assertRaises(errors.NoSuchFile, initializer.execute, missing_path)
116
def test_initialized_dir(self):
    """Initializing an already existing bzrdir raises FileExists."""
    transport = self.get_transport()
    initializer = smart.bzrdir.SmartServerRequestInitializeBzrDir(transport)
    self.make_bzrdir('subdir')
    existing_path = transport.local_abspath('subdir')
    self.assertRaises(errors.FileExists, initializer.execute, existing_path)
125
class TestSmartServerRequestOpenBranch(tests.TestCaseWithTransport):
127
def test_no_branch(self):
    """A bzrdir without a branch yields the ('nobranch', ) response."""
    transport = self.get_transport()
    opener = smart.bzrdir.SmartServerRequestOpenBranch(transport)
    self.make_bzrdir('.')
    result = opener.execute(transport.local_abspath(''))
    self.assertEqual(SmartServerResponse(('nobranch', )), result)
135
def test_branch(self):
    """An ordinary branch yields ('ok', '')."""
    transport = self.get_transport()
    opener = smart.bzrdir.SmartServerRequestOpenBranch(transport)
    self.make_branch('.')
    result = opener.execute(transport.local_abspath(''))
    self.assertEqual(SmartServerResponse(('ok', '')), result)
143
def test_branch_reference(self):
    """A branch reference yields ('ok', url-of-the-referenced-branch)."""
    transport = self.get_transport()
    opener = smart.bzrdir.SmartServerRequestOpenBranch(transport)
    target_branch = self.make_branch('branch')
    light_checkout = target_branch.create_checkout('reference',
                                                   lightweight=True)
    # TODO: once an API exists to probe for references of any sort, use it
    # here rather than checking the on-disk location file.
    expected_url = transport.abspath('branch') + '/'
    self.assertFileEqual(expected_url, 'reference/.bzr/branch/location')
    result = opener.execute(transport.local_abspath('reference'))
    self.assertEqual(SmartServerResponse(('ok', expected_url)), result)
157
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithTransport):
159
def test_empty(self):
    """A branch with no revisions yields ('ok', ) and an empty body."""
    transport = self.get_transport()
    history_request = smart.branch.SmartServerRequestRevisionHistory(
        transport)
    self.make_branch('.')
    result = history_request.execute(transport.local_abspath(''))
    self.assertEqual(SmartServerResponse(('ok', ), ''), result)
167
def test_not_empty(self):
168
"""For a non-empty branch, the body is the NUL-separated revision ids."""
169
backing = self.get_transport()
170
request = smart.branch.SmartServerRequestRevisionHistory(backing)
171
tree = self.make_branch_and_memory_tree('.')
174
r1 = tree.commit('1st commit')
175
r2 = tree.commit('2nd commit', rev_id=u'\xc8')
178
SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
179
request.execute(backing.local_abspath('')))
182
class TestSmartServerBranchRequest(tests.TestCaseWithTransport):
184
def test_no_branch(self):
    """A bzrdir that holds no branch makes the request raise NotBranchError."""
    transport = self.get_transport()
    branch_request = smart.branch.SmartServerBranchRequest(transport)
    self.make_bzrdir('.')
    target_path = transport.local_abspath('')
    self.assertRaises(errors.NotBranchError,
                      branch_request.execute, target_path)
192
def test_branch_reference(self):
    """When there is a branch reference, NotBranchError is raised.

    A lightweight checkout of 'branch' is created at 'reference', so the
    control directory there holds only a reference to the real branch.
    The request is executed against that 'reference' location.
    """
    backing = self.get_transport()
    request = smart.branch.SmartServerBranchRequest(backing)
    branch = self.make_branch('branch')
    branch.create_checkout('reference', lightweight=True)
    # Query the location that was actually created.  The previous path,
    # 'checkout', never existed, so the test passed without ever touching
    # a branch reference.
    self.assertRaises(errors.NotBranchError,
                      request.execute, backing.local_abspath('reference'))
202
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithTransport):
204
def test_empty(self):
    """A branch with no revisions reports ('ok', '0', '')."""
    transport = self.get_transport()
    info_request = smart.branch.SmartServerBranchRequestLastRevisionInfo(
        transport)
    self.make_branch('.')
    result = info_request.execute(transport.local_abspath(''))
    self.assertEqual(SmartServerResponse(('ok', '0', '')), result)
212
def test_not_empty(self):
213
"""For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
214
backing = self.get_transport()
215
request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
216
tree = self.make_branch_and_memory_tree('.')
220
rev_id_utf8 = rev_id.encode('utf-8')
221
r1 = tree.commit('1st commit')
222
r2 = tree.commit('2nd commit', rev_id=rev_id)
225
SmartServerResponse(('ok', '2', rev_id_utf8)),
226
request.execute(backing.local_abspath('')))
229
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithTransport):
231
def test_default(self):
232
"""With no file, we get empty content."""
233
backing = self.get_transport()
234
request = smart.branch.SmartServerBranchGetConfigFile(backing)
235
branch = self.make_branch('.')
236
# there should be no file by default
238
self.assertEqual(SmartServerResponse(('ok', ), content),
239
request.execute(backing.local_abspath('')))
241
def test_with_content(self):
    """The stored 'branch.conf' text comes back as the response body."""
    # For now SmartServerBranchGetConfigFile is expected to hand back the
    # content of the branch.conf control file unchanged; it may do more
    # complex processing in the future.
    transport = self.get_transport()
    config_request = smart.branch.SmartServerBranchGetConfigFile(transport)
    made_branch = self.make_branch('.')
    made_branch.control_files.put_utf8('branch.conf', 'foo bar baz')
    result = config_request.execute(transport.local_abspath(''))
    self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'), result)
253
class TestSmartServerBranchRequestSetLastRevision(tests.TestCaseWithTransport):
255
def test_empty(self):
256
backing = self.get_transport()
257
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
258
b = self.make_branch('.')
259
branch_token, repo_token = b.lock_write()
261
self.assertEqual(SmartServerResponse(('ok',)),
263
backing.local_abspath(''), branch_token, repo_token, ''))
267
def test_not_present_revision_id(self):
268
backing = self.get_transport()
269
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
270
b = self.make_branch('.')
271
branch_token, repo_token = b.lock_write()
273
revision_id = 'non-existent revision'
275
SmartServerResponse(('NoSuchRevision', revision_id)),
277
backing.local_abspath(''), branch_token, repo_token,
282
def test_revision_id_present(self):
283
backing = self.get_transport()
284
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
285
tree = self.make_branch_and_memory_tree('.')
289
rev_id_utf8 = rev_id.encode('utf-8')
290
r1 = tree.commit('1st commit', rev_id=rev_id)
291
r2 = tree.commit('2nd commit')
293
branch_token, repo_token = tree.branch.lock_write()
296
SmartServerResponse(('ok',)),
298
backing.local_abspath(''), branch_token, repo_token,
300
self.assertEqual([rev_id_utf8], tree.branch.revision_history())
304
def test_revision_id_present2(self):
305
backing = self.get_transport()
306
request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
307
tree = self.make_branch_and_memory_tree('.')
311
rev_id_utf8 = rev_id.encode('utf-8')
312
r1 = tree.commit('1st commit', rev_id=rev_id)
313
r2 = tree.commit('2nd commit')
315
tree.branch.set_revision_history([])
316
branch_token, repo_token = tree.branch.lock_write()
319
SmartServerResponse(('ok',)),
321
backing.local_abspath(''), branch_token, repo_token,
323
self.assertEqual([rev_id_utf8], tree.branch.revision_history())
328
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithTransport):
331
tests.TestCaseWithTransport.setUp(self)
332
self.reduceLockdirTimeout()
334
def test_lock_write_on_unlocked_branch(self):
335
backing = self.get_transport()
336
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
337
branch = self.make_branch('.')
338
repository = branch.repository
339
response = request.execute(backing.local_abspath(''))
340
branch_nonce = branch.control_files._lock.peek().get('nonce')
341
repository_nonce = repository.control_files._lock.peek().get('nonce')
343
SmartServerResponse(('ok', branch_nonce, repository_nonce)),
345
# The branch (and associated repository) is now locked. Verify that
346
# with a new branch object.
347
new_branch = repository.bzrdir.open_branch()
348
self.assertRaises(errors.LockContention, new_branch.lock_write)
350
def test_lock_write_on_locked_branch(self):
351
backing = self.get_transport()
352
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
353
branch = self.make_branch('.')
355
branch.leave_lock_in_place()
357
response = request.execute(backing.local_abspath(''))
359
SmartServerResponse(('LockContention',)), response)
361
def test_lock_write_with_tokens_on_locked_branch(self):
362
backing = self.get_transport()
363
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
364
branch = self.make_branch('.')
365
branch_token, repo_token = branch.lock_write()
366
branch.leave_lock_in_place()
367
branch.repository.leave_lock_in_place()
369
response = request.execute(backing.local_abspath(''),
370
branch_token, repo_token)
372
SmartServerResponse(('ok', branch_token, repo_token)), response)
374
def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
375
backing = self.get_transport()
376
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
377
branch = self.make_branch('.')
378
branch_token, repo_token = branch.lock_write()
379
branch.leave_lock_in_place()
380
branch.repository.leave_lock_in_place()
382
response = request.execute(backing.local_abspath(''),
383
branch_token+'xxx', repo_token)
385
SmartServerResponse(('TokenMismatch',)), response)
387
def test_lock_write_on_locked_repo(self):
388
backing = self.get_transport()
389
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
390
branch = self.make_branch('.')
391
branch.repository.lock_write()
392
branch.repository.leave_lock_in_place()
393
branch.repository.unlock()
394
response = request.execute(backing.local_abspath(''))
396
SmartServerResponse(('LockContention',)), response)
398
def test_lock_write_on_readonly_transport(self):
399
backing = self.get_readonly_transport()
400
request = smart.branch.SmartServerBranchRequestLockWrite(backing)
401
branch = self.make_branch('.')
402
response = request.execute('')
404
SmartServerResponse(('UnlockableTransport',)), response)
407
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithTransport):
410
tests.TestCaseWithTransport.setUp(self)
411
self.reduceLockdirTimeout()
413
def test_unlock_on_locked_branch_and_repo(self):
414
backing = self.get_transport()
415
request = smart.branch.SmartServerBranchRequestUnlock(backing)
416
branch = self.make_branch('.')
418
branch_token, repo_token = branch.lock_write()
419
# Unlock the branch (and repo) object, leaving the physical locks
421
branch.leave_lock_in_place()
422
branch.repository.leave_lock_in_place()
424
response = request.execute(backing.local_abspath(''),
425
branch_token, repo_token)
427
SmartServerResponse(('ok',)), response)
428
# The branch is now unlocked. Verify that with a new branch
430
new_branch = branch.bzrdir.open_branch()
431
new_branch.lock_write()
434
def test_unlock_on_unlocked_branch_unlocked_repo(self):
435
backing = self.get_transport()
436
request = smart.branch.SmartServerBranchRequestUnlock(backing)
437
branch = self.make_branch('.')
438
response = request.execute(
439
backing.local_abspath(''), 'branch token', 'repo token')
441
SmartServerResponse(('TokenMismatch',)), response)
443
def test_unlock_on_unlocked_branch_locked_repo(self):
444
backing = self.get_transport()
445
request = smart.branch.SmartServerBranchRequestUnlock(backing)
446
branch = self.make_branch('.')
447
# Lock the repository.
448
repo_token = branch.repository.lock_write()
449
branch.repository.leave_lock_in_place()
450
branch.repository.unlock()
451
# Issue branch lock_write request on the unlocked branch (with locked
453
response = request.execute(
454
backing.local_abspath(''), 'branch token', repo_token)
456
SmartServerResponse(('TokenMismatch',)), response)
459
class TestSmartServerRepositoryRequest(tests.TestCaseWithTransport):
461
def test_no_repository(self):
    """A bzrdir with no repository of its own raises NoRepositoryPresent."""
    # A shared repository is placed above the queried path on purpose:
    # the request must look at the exact path it is given and must not
    # pick up the repository from the parent directory.
    transport = self.get_transport()
    repo_request = smart.repository.SmartServerRepositoryRequest(transport)
    self.make_repository('.', shared=True)
    self.make_bzrdir('subdir')
    queried_path = transport.local_abspath('subdir')
    self.assertRaises(errors.NoRepositoryPresent,
                      repo_request.execute, queried_path)
475
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithTransport):
477
def test_none_argument(self):
478
backing = self.get_transport()
479
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
480
tree = self.make_branch_and_memory_tree('.')
483
r1 = tree.commit('1st commit')
484
r2 = tree.commit('2nd commit', rev_id=u'\xc8')
487
# the lines of revision_id->revision_parent_list has no guaranteed
488
# order coming out of a dict, so sort both our test and response
489
lines = sorted([' '.join([r2, r1]), r1])
490
response = request.execute(backing.local_abspath(''), '')
491
response.body = '\n'.join(sorted(response.body.split('\n')))
494
SmartServerResponse(('ok', ), '\n'.join(lines)), response)
496
def test_specific_revision_argument(self):
497
backing = self.get_transport()
498
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
499
tree = self.make_branch_and_memory_tree('.')
502
r1 = tree.commit('1st commit', rev_id=u'\xc9')
503
r2 = tree.commit('2nd commit', rev_id=u'\xc8')
506
self.assertEqual(SmartServerResponse(('ok', ),
507
u'\xc9'.encode('utf8')),
508
request.execute(backing.local_abspath(''), u'\xc9'.encode('utf8')))
510
def test_no_such_revision(self):
511
backing = self.get_transport()
512
request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
513
tree = self.make_branch_and_memory_tree('.')
516
r1 = tree.commit('1st commit')
519
# Note that it still returns body (of zero bytes).
521
SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
522
request.execute(backing.local_abspath(''), 'missingrevision'))
525
class TestSmartServerRequestHasRevision(tests.TestCaseWithTransport):
527
def test_missing_revision(self):
    """Asking for a revision the repository lacks yields ('no', )."""
    transport = self.get_transport()
    has_revision = smart.repository.SmartServerRequestHasRevision(transport)
    self.make_repository('.')
    result = has_revision.execute(transport.local_abspath(''), 'revid')
    self.assertEqual(SmartServerResponse(('no', )), result)
535
def test_present_revision(self):
536
"""For a present revision, ('ok', ) is returned."""
537
backing = self.get_transport()
538
request = smart.repository.SmartServerRequestHasRevision(backing)
539
tree = self.make_branch_and_memory_tree('.')
542
r1 = tree.commit('a commit', rev_id=u'\xc8abc')
544
self.assertTrue(tree.branch.repository.has_revision(u'\xc8abc'))
545
self.assertEqual(SmartServerResponse(('ok', )),
546
request.execute(backing.local_abspath(''),
547
u'\xc8abc'.encode('utf8')))
550
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithTransport):
552
def test_empty_revid(self):
553
"""With an empty revid, we get only the size and the number of revisions."""
554
backing = self.get_transport()
555
request = smart.repository.SmartServerRepositoryGatherStats(backing)
556
repository = self.make_repository('.')
557
stats = repository.gather_stats()
559
expected_body = 'revisions: 0\nsize: %d\n' % size
560
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
561
request.execute(backing.local_abspath(''), '', 'no'))
563
def test_revid_with_committers(self):
564
"""For a revid we get more infos."""
565
backing = self.get_transport()
567
request = smart.repository.SmartServerRepositoryGatherStats(backing)
568
tree = self.make_branch_and_memory_tree('.')
571
# Let's build a predictable result
572
tree.commit('a commit', timestamp=123456.2, timezone=3600)
573
tree.commit('a commit', timestamp=654321.4, timezone=0, rev_id=rev_id)
576
stats = tree.branch.repository.gather_stats()
578
expected_body = ('firstrev: 123456.200 3600\n'
579
'latestrev: 654321.400 0\n'
582
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
583
request.execute(backing.local_abspath(''),
584
rev_id.encode('utf8'), 'no'))
586
def test_not_empty_repository_with_committers(self):
587
"""For a revid and requesting committers we get the whole thing."""
588
backing = self.get_transport()
590
request = smart.repository.SmartServerRepositoryGatherStats(backing)
591
tree = self.make_branch_and_memory_tree('.')
594
# Let's build a predictable result
595
tree.commit('a commit', timestamp=123456.2, timezone=3600,
597
tree.commit('a commit', timestamp=654321.4, timezone=0,
598
committer='bar', rev_id=rev_id)
600
stats = tree.branch.repository.gather_stats()
603
expected_body = ('committers: 2\n'
604
'firstrev: 123456.200 3600\n'
605
'latestrev: 654321.400 0\n'
608
self.assertEqual(SmartServerResponse(('ok', ), expected_body),
609
request.execute(backing.local_abspath(''),
610
rev_id.encode('utf8'), 'yes'))
613
class TestSmartServerRepositoryIsShared(tests.TestCaseWithTransport):
615
def test_is_shared(self):
    """A repository created with shared=True yields ('yes', )."""
    transport = self.get_transport()
    shared_request = smart.repository.SmartServerRepositoryIsShared(
        transport)
    self.make_repository('.', shared=True)
    result = shared_request.execute(transport.local_abspath(''))
    self.assertEqual(SmartServerResponse(('yes', )), result)
623
def test_is_not_shared(self):
    """A repository created with shared=False yields ('no', )."""
    transport = self.get_transport()
    shared_request = smart.repository.SmartServerRepositoryIsShared(
        transport)
    self.make_repository('.', shared=False)
    result = shared_request.execute(transport.local_abspath(''))
    self.assertEqual(SmartServerResponse(('no', )), result)
632
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithTransport):
635
tests.TestCaseWithTransport.setUp(self)
636
self.reduceLockdirTimeout()
638
def test_lock_write_on_unlocked_repo(self):
    """Locking an unlocked repository yields ('ok', nonce-of-the-lock)."""
    transport = self.get_transport()
    lock_request = smart.repository.SmartServerRepositoryLockWrite(transport)
    repo = self.make_repository('.')
    result = lock_request.execute(transport.local_abspath(''))
    # The response must carry the nonce recorded in the lock itself.
    lock_info = repo.control_files._lock.peek()
    lock_nonce = lock_info.get('nonce')
    self.assertEqual(SmartServerResponse(('ok', lock_nonce)), result)
    # The repository is now locked: a separately opened repository object
    # must fail to take the write lock.
    reopened = repo.bzrdir.open_repository()
    self.assertRaises(errors.LockContention, reopened.lock_write)
650
def test_lock_write_on_locked_repo(self):
651
backing = self.get_transport()
652
request = smart.repository.SmartServerRepositoryLockWrite(backing)
653
repository = self.make_repository('.')
654
repository.lock_write()
655
repository.leave_lock_in_place()
657
response = request.execute(backing.local_abspath(''))
659
SmartServerResponse(('LockContention',)), response)
661
def test_lock_write_on_readonly_transport(self):
662
backing = self.get_readonly_transport()
663
request = smart.repository.SmartServerRepositoryLockWrite(backing)
664
repository = self.make_repository('.')
665
response = request.execute('')
667
SmartServerResponse(('UnlockableTransport',)), response)
670
class TestSmartServerRepositoryUnlock(tests.TestCaseWithTransport):
673
tests.TestCaseWithTransport.setUp(self)
674
self.reduceLockdirTimeout()
676
def test_unlock_on_locked_repo(self):
677
backing = self.get_transport()
678
request = smart.repository.SmartServerRepositoryUnlock(backing)
679
repository = self.make_repository('.')
680
token = repository.lock_write()
681
repository.leave_lock_in_place()
683
response = request.execute(backing.local_abspath(''), token)
685
SmartServerResponse(('ok',)), response)
686
# The repository is now unlocked. Verify that with a new repository
688
new_repo = repository.bzrdir.open_repository()
689
new_repo.lock_write()
692
def test_unlock_on_unlocked_repo(self):
693
backing = self.get_transport()
694
request = smart.repository.SmartServerRepositoryUnlock(backing)
695
repository = self.make_repository('.')
696
response = request.execute(backing.local_abspath(''), 'some token')
698
SmartServerResponse(('TokenMismatch',)), response)
701
class TestSmartServerIsReadonly(tests.TestCaseWithTransport):
703
def test_is_readonly_no(self):
704
backing = self.get_transport()
705
request = smart.request.SmartServerIsReadonly(backing)
706
response = request.execute()
708
SmartServerResponse(('no',)), response)
710
def test_is_readonly_yes(self):
711
backing = self.get_readonly_transport()
712
request = smart.request.SmartServerIsReadonly(backing)
713
response = request.execute()
715
SmartServerResponse(('yes',)), response)
718
class TestHandlers(tests.TestCase):
719
"""Tests for the request.request_handlers object."""
721
def test_registered_methods(self):
722
"""Test that known methods are registered to the correct object."""
724
smart.request.request_handlers.get('Branch.get_config_file'),
725
smart.branch.SmartServerBranchGetConfigFile)
727
smart.request.request_handlers.get('Branch.lock_write'),
728
smart.branch.SmartServerBranchRequestLockWrite)
730
smart.request.request_handlers.get('Branch.last_revision_info'),
731
smart.branch.SmartServerBranchRequestLastRevisionInfo)
733
smart.request.request_handlers.get('Branch.revision_history'),
734
smart.branch.SmartServerRequestRevisionHistory)
736
smart.request.request_handlers.get('Branch.set_last_revision'),
737
smart.branch.SmartServerBranchRequestSetLastRevision)
739
smart.request.request_handlers.get('Branch.unlock'),
740
smart.branch.SmartServerBranchRequestUnlock)
742
smart.request.request_handlers.get('BzrDir.find_repository'),
743
smart.bzrdir.SmartServerRequestFindRepository)
745
smart.request.request_handlers.get('BzrDirFormat.initialize'),
746
smart.bzrdir.SmartServerRequestInitializeBzrDir)
748
smart.request.request_handlers.get('BzrDir.open_branch'),
749
smart.bzrdir.SmartServerRequestOpenBranch)
751
smart.request.request_handlers.get('Repository.gather_stats'),
752
smart.repository.SmartServerRepositoryGatherStats)
754
smart.request.request_handlers.get('Repository.get_revision_graph'),
755
smart.repository.SmartServerRepositoryGetRevisionGraph)
757
smart.request.request_handlers.get('Repository.has_revision'),
758
smart.repository.SmartServerRequestHasRevision)
760
smart.request.request_handlers.get('Repository.is_shared'),
761
smart.repository.SmartServerRepositoryIsShared)
763
smart.request.request_handlers.get('Repository.lock_write'),
764
smart.repository.SmartServerRepositoryLockWrite)
766
smart.request.request_handlers.get('Repository.unlock'),
767
smart.repository.SmartServerRepositoryUnlock)
769
smart.request.request_handlers.get('Transport.is_readonly'),
770
smart.request.SmartServerIsReadonly)