# Copyright (C) 2006, 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the smart wire/domain protocol."""
19
from StringIO import StringIO
23
from bzrlib import bzrdir, errors, smart, tests
24
from bzrlib.smart.request import SmartServerResponse
25
import bzrlib.smart.bzrdir
26
import bzrlib.smart.branch
27
import bzrlib.smart.repository
30
class TestCaseWithSmartMedium(tests.TestCaseWithTransport):
    """Test case base that selects the smart TCP test server as transport."""

    def setUp(self):
        """Run the parent setUp, then pick the smart server for this test."""
        super(TestCaseWithSmartMedium, self).setUp()
        # We're allowed to set the transport class here, so that we don't use
        # the default or a parameterized class, but rather use the
        # TestCaseWithTransport infrastructure to set up a smart server and
        # transport.
        self.transport_server = smart.server.SmartTCPServer_for_testing

    def get_smart_medium(self):
        """Get a smart medium to use in tests."""
        return self.get_transport().get_smart_medium()
45
class TestSmartServerResponse(tests.TestCase):
    """Tests for comparing SmartServerResponse objects."""

    def test__eq__(self):
        """Responses are equal only when both their args and body match."""
        # Same args (and same body, when given) compare equal.
        self.assertEqual(SmartServerResponse(('ok', )),
                         SmartServerResponse(('ok', )))
        self.assertEqual(SmartServerResponse(('ok', ), 'body'),
                         SmartServerResponse(('ok', ), 'body'))
        # Differing args, or a body on only one side, compare unequal.
        self.assertNotEqual(SmartServerResponse(('ok', )),
                            SmartServerResponse(('notok', )))
        self.assertNotEqual(SmartServerResponse(('ok', ), 'body'),
                            SmartServerResponse(('ok', )))
        # A response never equals None.
        self.assertNotEqual(None,
                            SmartServerResponse(('ok', )))
60
class TestSmartServerRequestFindRepository(tests.TestCaseWithTransport):
    """Tests for BzrDir.find_repository."""

    def test_no_repository(self):
        """When there is no repository to be found, ('norepository', ) is returned."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestFindRepository(backing)
        self.make_bzrdir('.')
        self.assertEqual(SmartServerResponse(('norepository', )),
                         request.execute(backing.local_abspath('')))

    def test_nonshared_repository(self):
        """A non-shared repository is only found at its own path."""
        # nonshared repositories only allow 'find' to return a handle when
        # the path the repository is being searched on is the same as that
        # that the repository is at.
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestFindRepository(backing)
        result = self._make_repository_and_result()
        self.assertEqual(result, request.execute(backing.local_abspath('')))
        self.make_bzrdir('subdir')
        self.assertEqual(SmartServerResponse(('norepository', )),
                         request.execute(backing.local_abspath('subdir')))

    def _make_repository_and_result(self, shared=False, format=None):
        """Convenience function to setup a repository.

        :param shared: Passed through to make_repository.
        :param format: Passed through to make_repository.
        :result: The SmartServerResponse to expect when opening it.
        """
        repo = self.make_repository('.', shared=shared, format=format)
        # The response encodes each capability as the string 'yes' or 'no'.
        if repo.supports_rich_root():
            rich_root = 'yes'
        else:
            rich_root = 'no'
        if repo._format.supports_tree_reference:
            subtrees = 'yes'
        else:
            subtrees = 'no'
        return SmartServerResponse(('ok', '', rich_root, subtrees))

    def test_shared_repository(self):
        """When there is a shared repository, we get 'ok', 'relpath-to-repo'."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestFindRepository(backing)
        result = self._make_repository_and_result(shared=True)
        self.assertEqual(result, request.execute(backing.local_abspath('')))
        # From nested bzrdirs the expected response only differs in the
        # relative path (second argument) back up to the repository.
        self.make_bzrdir('subdir')
        result2 = SmartServerResponse(
            result.args[0:1] + ('..', ) + result.args[2:])
        self.assertEqual(result2,
                         request.execute(backing.local_abspath('subdir')))
        self.make_bzrdir('subdir/deeper')
        result3 = SmartServerResponse(
            result.args[0:1] + ('../..', ) + result.args[2:])
        self.assertEqual(
            result3,
            request.execute(backing.local_abspath('subdir/deeper')))

    def test_rich_root_and_subtree_encoding(self):
        """Test for the format attributes for rich root and subtree support."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestFindRepository(backing)
        result = self._make_repository_and_result(
            format='dirstate-with-subtree')
        # check the test will be valid
        self.assertEqual('yes', result.args[2])
        self.assertEqual('yes', result.args[3])
        self.assertEqual(result, request.execute(backing.local_abspath('')))
125
class TestSmartServerRequestInitializeBzrDir(tests.TestCaseWithTransport):
    """Tests for the SmartServerRequestInitializeBzrDir request."""

    def test_empty_dir(self):
        """Initializing an empty dir should succeed and do it."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
        self.assertEqual(SmartServerResponse(('ok', )),
                         request.execute(backing.local_abspath('.')))
        made_dir = bzrdir.BzrDir.open_from_transport(backing)
        # no branch, tree or repository is expected with the current
        # default format.
        self.assertRaises(errors.NoWorkingTree, made_dir.open_workingtree)
        self.assertRaises(errors.NotBranchError, made_dir.open_branch)
        self.assertRaises(errors.NoRepositoryPresent, made_dir.open_repository)

    def test_missing_dir(self):
        """Initializing a missing directory should fail like the bzrdir api."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
        self.assertRaises(errors.NoSuchFile,
                          request.execute, backing.local_abspath('subdir'))

    def test_initialized_dir(self):
        """Initializing an extant bzrdir should fail like the bzrdir api."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestInitializeBzrDir(backing)
        self.make_bzrdir('subdir')
        self.assertRaises(errors.FileExists,
                          request.execute, backing.local_abspath('subdir'))
156
class TestSmartServerRequestOpenBranch(tests.TestCaseWithTransport):
    """Tests for the SmartServerRequestOpenBranch request."""

    def test_no_branch(self):
        """When there is no branch, ('nobranch', ) is returned."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
        self.make_bzrdir('.')
        self.assertEqual(SmartServerResponse(('nobranch', )),
                         request.execute(backing.local_abspath('')))

    def test_branch(self):
        """When there is a branch, 'ok' is returned."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
        self.make_branch('.')
        self.assertEqual(SmartServerResponse(('ok', '')),
                         request.execute(backing.local_abspath('')))

    def test_branch_reference(self):
        """When there is a branch reference, the reference URL is returned."""
        backing = self.get_transport()
        request = smart.bzrdir.SmartServerRequestOpenBranch(backing)
        branch = self.make_branch('branch')
        checkout = branch.create_checkout('reference', lightweight=True)
        # TODO: once we have an API to probe for references of any sort, we
        # should use that here.
        reference_url = backing.abspath('branch') + '/'
        # Until then, read the reference straight out of the control file.
        self.assertFileEqual(reference_url, 'reference/.bzr/branch/location')
        self.assertEqual(SmartServerResponse(('ok', reference_url)),
                         request.execute(backing.local_abspath('reference')))
188
class TestSmartServerRequestRevisionHistory(tests.TestCaseWithTransport):
    """Tests for the SmartServerRequestRevisionHistory request."""

    def test_empty(self):
        """For an empty branch, the body is empty."""
        backing = self.get_transport()
        request = smart.branch.SmartServerRequestRevisionHistory(backing)
        self.make_branch('.')
        self.assertEqual(SmartServerResponse(('ok', ), ''),
                         request.execute(backing.local_abspath('')))

    def test_not_empty(self):
        """For a non-empty branch, the body is the NUL-joined revision ids."""
        backing = self.get_transport()
        request = smart.branch.SmartServerRequestRevisionHistory(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        # The second revision id is non-ASCII, passed as utf-8 bytes.
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()
        self.assertEqual(
            SmartServerResponse(('ok', ), ('\x00'.join([r1, r2]))),
            request.execute(backing.local_abspath('')))
213
class TestSmartServerBranchRequest(tests.TestCaseWithTransport):
    """Tests for the SmartServerBranchRequest base request."""

    def test_no_branch(self):
        """When there is a bzrdir and no branch, NotBranchError is raised."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequest(backing)
        self.make_bzrdir('.')
        self.assertRaises(errors.NotBranchError,
                          request.execute, backing.local_abspath(''))

    def test_branch_reference(self):
        """When there is a branch reference, NotBranchError is raised."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequest(backing)
        branch = self.make_branch('branch')
        checkout = branch.create_checkout('reference', lightweight=True)
        # NOTE(review): the lightweight checkout is created at 'reference',
        # but the request below is executed against 'checkout', a path this
        # test never creates.  The assertion may therefore pass without
        # exercising the branch-reference case - confirm the intended path.
        self.assertRaises(errors.NotBranchError,
                          request.execute, backing.local_abspath('checkout'))
233
class TestSmartServerBranchRequestLastRevisionInfo(tests.TestCaseWithTransport):
    """Tests for the SmartServerBranchRequestLastRevisionInfo request."""

    def test_empty(self):
        """For an empty branch, the result is ('ok', '0', 'null:')."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
        self.make_branch('.')
        self.assertEqual(SmartServerResponse(('ok', '0', 'null:')),
                         request.execute(backing.local_abspath('')))

    def test_not_empty(self):
        """For a non-empty branch, the result is ('ok', 'revno', 'revid')."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLastRevisionInfo(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        rev_id_utf8 = u'\xc8'.encode('utf-8')
        r1 = tree.commit('1st commit')
        r2 = tree.commit('2nd commit', rev_id=rev_id_utf8)
        tree.unlock()
        # Two commits were made, so the revno is sent as the string '2'.
        self.assertEqual(
            SmartServerResponse(('ok', '2', rev_id_utf8)),
            request.execute(backing.local_abspath('')))
259
class TestSmartServerBranchRequestGetConfigFile(tests.TestCaseWithTransport):
    """Tests for the SmartServerBranchGetConfigFile request."""

    def test_default(self):
        """With no file, we get empty content."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchGetConfigFile(backing)
        branch = self.make_branch('.')
        # there should be no file by default
        # NOTE(review): the assignment of `content` was missing from this
        # span; the empty string is assumed from the docstring - confirm.
        content = ''
        self.assertEqual(SmartServerResponse(('ok', ), content),
                         request.execute(backing.local_abspath('')))

    def test_with_content(self):
        """The body is whatever was written to the branch's branch.conf."""
        # SmartServerBranchGetConfigFile should return the content from
        # branch.control_files.get('branch.conf') for now - in the future it
        # may perform more complex processing.
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchGetConfigFile(backing)
        branch = self.make_branch('.')
        branch.control_files.put_utf8('branch.conf', 'foo bar baz')
        self.assertEqual(SmartServerResponse(('ok', ), 'foo bar baz'),
                         request.execute(backing.local_abspath('')))
283
class TestSmartServerBranchRequestSetLastRevision(tests.TestCaseWithTransport):
    """Tests for the SmartServerBranchRequestSetLastRevision request.

    Each test takes the branch and repository write locks itself and passes
    the resulting tokens to the request.
    """

    def test_empty(self):
        """Setting the last revision on an empty branch returns ('ok',)."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
        b = self.make_branch('.')
        branch_token = b.lock_write()
        repo_token = b.repository.lock_write()
        b.repository.unlock()
        try:
            # NOTE(review): the revision-id argument was missing from this
            # span; 'null:' is assumed to be the null revision - confirm.
            self.assertEqual(
                SmartServerResponse(('ok',)),
                request.execute(
                    backing.local_abspath(''), branch_token, repo_token,
                    'null:'))
        finally:
            b.unlock()

    def test_not_present_revision_id(self):
        """A revision id the repository lacks yields NoSuchRevision."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
        b = self.make_branch('.')
        branch_token = b.lock_write()
        repo_token = b.repository.lock_write()
        b.repository.unlock()
        try:
            revision_id = 'non-existent revision'
            self.assertEqual(
                SmartServerResponse(('NoSuchRevision', revision_id)),
                request.execute(
                    backing.local_abspath(''), branch_token, repo_token,
                    revision_id))
        finally:
            b.unlock()

    def test_revision_id_present(self):
        """Setting an existing revision id rewinds the history to it."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        rev_id_utf8 = u'\xc8'.encode('utf-8')
        r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
        r2 = tree.commit('2nd commit')
        tree.unlock()
        branch_token = tree.branch.lock_write()
        repo_token = tree.branch.repository.lock_write()
        tree.branch.repository.unlock()
        try:
            self.assertEqual(
                SmartServerResponse(('ok',)),
                request.execute(
                    backing.local_abspath(''), branch_token, repo_token,
                    rev_id_utf8))
            self.assertEqual([rev_id_utf8], tree.branch.revision_history())
        finally:
            tree.branch.unlock()

    def test_revision_id_present2(self):
        """Setting an existing revision id works from an emptied history."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestSetLastRevision(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        rev_id_utf8 = u'\xc8'.encode('utf-8')
        r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
        r2 = tree.commit('2nd commit')
        tree.unlock()
        # Start from an empty history; the revisions stay in the repository.
        tree.branch.set_revision_history([])
        branch_token = tree.branch.lock_write()
        repo_token = tree.branch.repository.lock_write()
        tree.branch.repository.unlock()
        try:
            self.assertEqual(
                SmartServerResponse(('ok',)),
                request.execute(
                    backing.local_abspath(''), branch_token, repo_token,
                    rev_id_utf8))
            self.assertEqual([rev_id_utf8], tree.branch.revision_history())
        finally:
            tree.branch.unlock()
365
class TestSmartServerBranchRequestLockWrite(tests.TestCaseWithTransport):
    """Tests for the SmartServerBranchRequestLockWrite request."""

    def setUp(self):
        """Run the parent setUp and reduce the lockdir timeout."""
        tests.TestCaseWithTransport.setUp(self)
        self.reduceLockdirTimeout()

    def test_lock_write_on_unlocked_branch(self):
        """Locking an unlocked branch returns the branch and repo nonces."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.')
        repository = branch.repository
        response = request.execute(backing.local_abspath(''))
        # Read the nonces straight from the on-disk locks the request took.
        branch_nonce = branch.control_files._lock.peek().get('nonce')
        repository_nonce = repository.control_files._lock.peek().get('nonce')
        self.assertEqual(
            SmartServerResponse(('ok', branch_nonce, repository_nonce)),
            response)
        # The branch (and associated repository) is now locked. Verify that
        # with a new branch object.
        new_branch = repository.bzrdir.open_branch()
        self.assertRaises(errors.LockContention, new_branch.lock_write)

    def test_lock_write_on_locked_branch(self):
        """Locking an already locked branch returns LockContention."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.')
        branch.lock_write()
        branch.leave_lock_in_place()
        branch.unlock()
        response = request.execute(backing.local_abspath(''))
        self.assertEqual(
            SmartServerResponse(('LockContention',)), response)

    def test_lock_write_with_tokens_on_locked_branch(self):
        """Presenting the matching tokens for held locks returns 'ok'."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.')
        branch_token = branch.lock_write()
        repo_token = branch.repository.lock_write()
        branch.repository.unlock()
        branch.leave_lock_in_place()
        branch.repository.leave_lock_in_place()
        branch.unlock()
        response = request.execute(backing.local_abspath(''),
                                   branch_token, repo_token)
        self.assertEqual(
            SmartServerResponse(('ok', branch_token, repo_token)), response)

    def test_lock_write_with_mismatched_tokens_on_locked_branch(self):
        """Presenting a wrong branch token returns TokenMismatch."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.')
        branch_token = branch.lock_write()
        repo_token = branch.repository.lock_write()
        branch.repository.unlock()
        branch.leave_lock_in_place()
        branch.repository.leave_lock_in_place()
        branch.unlock()
        response = request.execute(backing.local_abspath(''),
                                   branch_token+'xxx', repo_token)
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)

    def test_lock_write_on_locked_repo(self):
        """Locking a branch whose repository is locked gives LockContention."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.')
        branch.repository.lock_write()
        branch.repository.leave_lock_in_place()
        branch.repository.unlock()
        response = request.execute(backing.local_abspath(''))
        self.assertEqual(
            SmartServerResponse(('LockContention',)), response)

    def test_lock_write_on_readonly_transport(self):
        """Locking over a readonly transport fails with LockFailed."""
        backing = self.get_readonly_transport()
        request = smart.branch.SmartServerBranchRequestLockWrite(backing)
        branch = self.make_branch('.')
        response = request.execute('')
        # The unpacking also checks the error response has three arguments.
        error_name, lock_str, why_str = response.args
        self.assertFalse(response.is_successful())
        self.assertEqual('LockFailed', error_name)
449
class TestSmartServerBranchRequestUnlock(tests.TestCaseWithTransport):
    """Tests for the SmartServerBranchRequestUnlock request."""

    def setUp(self):
        """Run the parent setUp and reduce the lockdir timeout."""
        tests.TestCaseWithTransport.setUp(self)
        self.reduceLockdirTimeout()

    def test_unlock_on_locked_branch_and_repo(self):
        """Unlocking with the right tokens returns 'ok' and frees the lock."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
        branch = self.make_branch('.')
        # Lock the branch
        branch_token = branch.lock_write()
        repo_token = branch.repository.lock_write()
        branch.repository.unlock()
        # Unlock the branch (and repo) object, leaving the physical locks
        # in place.
        branch.leave_lock_in_place()
        branch.repository.leave_lock_in_place()
        branch.unlock()
        response = request.execute(backing.local_abspath(''),
                                   branch_token, repo_token)
        self.assertEqual(
            SmartServerResponse(('ok',)), response)
        # The branch is now unlocked. Verify that with a new branch
        # object.
        new_branch = branch.bzrdir.open_branch()
        new_branch.lock_write()
        new_branch.unlock()

    def test_unlock_on_unlocked_branch_unlocked_repo(self):
        """Unlocking when nothing is locked returns TokenMismatch."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
        branch = self.make_branch('.')
        response = request.execute(
            backing.local_abspath(''), 'branch token', 'repo token')
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)

    def test_unlock_on_unlocked_branch_locked_repo(self):
        """A valid repo token with a bogus branch token is TokenMismatch."""
        backing = self.get_transport()
        request = smart.branch.SmartServerBranchRequestUnlock(backing)
        branch = self.make_branch('.')
        # Lock the repository.
        repo_token = branch.repository.lock_write()
        branch.repository.leave_lock_in_place()
        branch.repository.unlock()
        # Issue branch lock_write request on the unlocked branch (with locked
        # repo).
        response = request.execute(
            backing.local_abspath(''), 'branch token', repo_token)
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)
503
class TestSmartServerRepositoryRequest(tests.TestCaseWithTransport):
    """Tests for the SmartServerRepositoryRequest base request."""

    def test_no_repository(self):
        """Raise NoRepositoryPresent when there is a bzrdir and no repo."""
        # we test this using a shared repository above the named path,
        # thus checking the right search logic is used - that is, that
        # its the exact path being looked at and the server is not
        # searching.
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryRequest(backing)
        self.make_repository('.', shared=True)
        self.make_bzrdir('subdir')
        self.assertRaises(errors.NoRepositoryPresent,
                          request.execute, backing.local_abspath('subdir'))
519
class TestSmartServerRepositoryGetRevisionGraph(tests.TestCaseWithTransport):
    """Tests for the SmartServerRepositoryGetRevisionGraph request."""

    def test_none_argument(self):
        """An empty revision argument returns a line for every revision."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()

        # the lines of revision_id->revision_parent_list has no guaranteed
        # order coming out of a dict, so sort both our test and response
        lines = sorted([' '.join([r2, r1]), r1])
        response = request.execute(backing.local_abspath(''), '')
        response.body = '\n'.join(sorted(response.body.split('\n')))

        self.assertEqual(
            SmartServerResponse(('ok', ), '\n'.join(lines)), response)

    def test_specific_revision_argument(self):
        """Asking for the first revision returns only that revision."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        rev_id_utf8 = u'\xc9'.encode('utf-8')
        r1 = tree.commit('1st commit', rev_id=rev_id_utf8)
        r2 = tree.commit('2nd commit', rev_id=u'\xc8'.encode('utf-8'))
        tree.unlock()

        self.assertEqual(
            SmartServerResponse(('ok', ), rev_id_utf8),
            request.execute(backing.local_abspath(''), rev_id_utf8))

    def test_no_such_revision(self):
        """An unknown revision id yields a 'nosuchrevision' response."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGetRevisionGraph(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        r1 = tree.commit('1st commit')
        tree.unlock()

        # Note that it still returns body (of zero bytes).
        self.assertEqual(
            SmartServerResponse(('nosuchrevision', 'missingrevision', ), ''),
            request.execute(backing.local_abspath(''), 'missingrevision'))
569
class TestSmartServerRequestHasRevision(tests.TestCaseWithTransport):
    """Tests for the SmartServerRequestHasRevision request."""

    def test_missing_revision(self):
        """For a missing revision, ('no', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRequestHasRevision(backing)
        self.make_repository('.')
        self.assertEqual(SmartServerResponse(('no', )),
                         request.execute(backing.local_abspath(''), 'revid'))

    def test_present_revision(self):
        """For a present revision, ('yes', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRequestHasRevision(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        r1 = tree.commit('a commit', rev_id=rev_id_utf8)
        tree.unlock()
        # Check the precondition directly before asking the request.
        self.assertTrue(tree.branch.repository.has_revision(rev_id_utf8))
        self.assertEqual(
            SmartServerResponse(('yes', )),
            request.execute(backing.local_abspath(''), rev_id_utf8))
594
class TestSmartServerRepositoryGatherStats(tests.TestCaseWithTransport):
    """Tests for the SmartServerRepositoryGatherStats request."""

    def test_empty_revid(self):
        """With an empty revid, we get only the size and revision count."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        repository = self.make_repository('.')
        stats = repository.gather_stats()
        # The size is taken from the repository rather than hard-coded.
        size = stats['size']
        expected_body = 'revisions: 0\nsize: %d\n' % size
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute(backing.local_abspath(''), '', 'no'))

    def test_revid_with_committers(self):
        """For a revid we get more infos."""
        backing = self.get_transport()
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        # Let's build a predictable result
        tree.commit('a commit', timestamp=123456.2, timezone=3600)
        tree.commit('a commit', timestamp=654321.4, timezone=0,
                    rev_id=rev_id_utf8)
        tree.unlock()

        stats = tree.branch.repository.gather_stats()
        size = stats['size']
        expected_body = ('firstrev: 123456.200 3600\n'
                         'latestrev: 654321.400 0\n'
                         'revisions: 2\n'
                         'size: %d\n' % size)
        # NOTE(review): the trailing request arguments were missing from
        # this span; (rev_id_utf8, 'no') is assumed because the expected
        # body has no 'committers' line - confirm.
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute(backing.local_abspath(''),
                                         rev_id_utf8, 'no'))

    def test_not_empty_repository_with_committers(self):
        """For a revid and requesting committers we get the whole thing."""
        backing = self.get_transport()
        rev_id_utf8 = u'\xc8abc'.encode('utf-8')
        request = smart.repository.SmartServerRepositoryGatherStats(backing)
        tree = self.make_branch_and_memory_tree('.')
        tree.lock_write()
        tree.add('')
        # Let's build a predictable result
        # NOTE(review): the first commit's committer was missing from this
        # span; any name distinct from 'bar' gives 'committers: 2' - confirm.
        tree.commit('a commit', timestamp=123456.2, timezone=3600,
                    committer='foo')
        tree.commit('a commit', timestamp=654321.4, timezone=0,
                    committer='bar', rev_id=rev_id_utf8)
        tree.unlock()
        stats = tree.branch.repository.gather_stats()

        size = stats['size']
        expected_body = ('committers: 2\n'
                         'firstrev: 123456.200 3600\n'
                         'latestrev: 654321.400 0\n'
                         'revisions: 2\n'
                         'size: %d\n' % size)
        self.assertEqual(SmartServerResponse(('ok', ), expected_body),
                         request.execute(backing.local_abspath(''),
                                         rev_id_utf8, 'yes'))
658
class TestSmartServerRepositoryIsShared(tests.TestCaseWithTransport):
    """Tests for the SmartServerRepositoryIsShared request."""

    def test_is_shared(self):
        """For a shared repository, ('yes', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryIsShared(backing)
        self.make_repository('.', shared=True)
        self.assertEqual(SmartServerResponse(('yes', )),
                         request.execute(backing.local_abspath(''), ))

    def test_is_not_shared(self):
        """For a non-shared repository, ('no', ) is returned."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryIsShared(backing)
        self.make_repository('.', shared=False)
        self.assertEqual(SmartServerResponse(('no', )),
                         request.execute(backing.local_abspath(''), ))
677
class TestSmartServerRepositoryLockWrite(tests.TestCaseWithTransport):
    """Tests for the SmartServerRepositoryLockWrite request."""

    def setUp(self):
        """Run the parent setUp and reduce the lockdir timeout."""
        tests.TestCaseWithTransport.setUp(self)
        self.reduceLockdirTimeout()

    def test_lock_write_on_unlocked_repo(self):
        """Locking an unlocked repository returns ('ok', nonce)."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        repository = self.make_repository('.')
        response = request.execute(backing.local_abspath(''))
        # Read the nonce straight from the on-disk lock the request took.
        nonce = repository.control_files._lock.peek().get('nonce')
        self.assertEqual(SmartServerResponse(('ok', nonce)), response)
        # The repository is now locked. Verify that with a new repository
        # object.
        new_repo = repository.bzrdir.open_repository()
        self.assertRaises(errors.LockContention, new_repo.lock_write)

    def test_lock_write_on_locked_repo(self):
        """Locking an already locked repository returns LockContention."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        repository = self.make_repository('.')
        repository.lock_write()
        repository.leave_lock_in_place()
        repository.unlock()
        response = request.execute(backing.local_abspath(''))
        self.assertEqual(
            SmartServerResponse(('LockContention',)), response)

    def test_lock_write_on_readonly_transport(self):
        """Locking over a readonly transport fails with LockFailed."""
        backing = self.get_readonly_transport()
        request = smart.repository.SmartServerRepositoryLockWrite(backing)
        repository = self.make_repository('.')
        response = request.execute('')
        self.assertFalse(response.is_successful())
        self.assertEqual('LockFailed', response.args[0])
715
class TestSmartServerRepositoryUnlock(tests.TestCaseWithTransport):
    """Tests for the SmartServerRepositoryUnlock request."""

    def setUp(self):
        """Run the parent setUp and reduce the lockdir timeout."""
        tests.TestCaseWithTransport.setUp(self)
        self.reduceLockdirTimeout()

    def test_unlock_on_locked_repo(self):
        """Unlocking with the right token returns 'ok' and frees the lock."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryUnlock(backing)
        repository = self.make_repository('.')
        token = repository.lock_write()
        repository.leave_lock_in_place()
        repository.unlock()
        response = request.execute(backing.local_abspath(''), token)
        self.assertEqual(
            SmartServerResponse(('ok',)), response)
        # The repository is now unlocked. Verify that with a new repository
        # object.
        new_repo = repository.bzrdir.open_repository()
        new_repo.lock_write()
        new_repo.unlock()

    def test_unlock_on_unlocked_repo(self):
        """Unlocking a repository that is not locked returns TokenMismatch."""
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryUnlock(backing)
        repository = self.make_repository('.')
        response = request.execute(backing.local_abspath(''), 'some token')
        self.assertEqual(
            SmartServerResponse(('TokenMismatch',)), response)
746
class TestSmartServerRepositoryTarball(tests.TestCaseWithTransport):
    """Tests for the SmartServerRepositoryTarball request."""

    def test_repository_tarball(self):
        """The response body is a bz2 tarball of the repository files."""
        # Imported locally because no file-level import of tarfile is
        # visible alongside this test.
        import tarfile
        backing = self.get_transport()
        request = smart.repository.SmartServerRepositoryTarball(backing)
        repository = self.make_repository('.')
        # make some extraneous junk in the repository directory which should
        # not be copied
        self.build_tree(['.bzr/repository/extra-junk'])
        response = request.execute(backing.local_abspath(''), 'bz2')
        self.assertEqual(('ok',), response.args)
        # body should be a tbz2
        body_file = StringIO(response.body)
        # NOTE(review): the mode argument was missing from this span; a bz2
        # stream read ('r|bz2') is assumed from the 'bz2' request - confirm.
        body_tar = tarfile.open('body_tar.tbz2', fileobj=body_file,
                                mode='r|bz2')
        # let's make sure there are some key repository components inside it.
        # the tarfile returns directories with trailing slashes...
        names = set([n.rstrip('/') for n in body_tar.getnames()])
        self.assertTrue('.bzr/repository/lock' in names)
        self.assertTrue('.bzr/repository/format' in names)
        self.assertTrue('.bzr/repository/extra-junk' not in names,
                        "extraneous file present in tar file")
770
class TestSmartServerIsReadonly(tests.TestCaseWithTransport):
    """Tests for the SmartServerIsReadonly request."""

    def test_is_readonly_no(self):
        """A writable backing transport answers ('no',)."""
        backing = self.get_transport()
        request = smart.request.SmartServerIsReadonly(backing)
        response = request.execute()
        self.assertEqual(
            SmartServerResponse(('no',)), response)

    def test_is_readonly_yes(self):
        """A readonly backing transport answers ('yes',)."""
        backing = self.get_readonly_transport()
        request = smart.request.SmartServerIsReadonly(backing)
        response = request.execute()
        self.assertEqual(
            SmartServerResponse(('yes',)), response)
787
class TestHandlers(tests.TestCase):
    """Tests for the request.request_handlers object."""

    def test_registered_methods(self):
        """Test that known methods are registered to the correct object."""
        # (wire method name, expected handler class) pairs, checked in order.
        expected_handlers = [
            ('Branch.get_config_file',
             smart.branch.SmartServerBranchGetConfigFile),
            ('Branch.lock_write',
             smart.branch.SmartServerBranchRequestLockWrite),
            ('Branch.last_revision_info',
             smart.branch.SmartServerBranchRequestLastRevisionInfo),
            ('Branch.revision_history',
             smart.branch.SmartServerRequestRevisionHistory),
            ('Branch.set_last_revision',
             smart.branch.SmartServerBranchRequestSetLastRevision),
            ('Branch.unlock',
             smart.branch.SmartServerBranchRequestUnlock),
            ('BzrDir.find_repository',
             smart.bzrdir.SmartServerRequestFindRepository),
            ('BzrDirFormat.initialize',
             smart.bzrdir.SmartServerRequestInitializeBzrDir),
            ('BzrDir.open_branch',
             smart.bzrdir.SmartServerRequestOpenBranch),
            ('Repository.gather_stats',
             smart.repository.SmartServerRepositoryGatherStats),
            ('Repository.get_revision_graph',
             smart.repository.SmartServerRepositoryGetRevisionGraph),
            ('Repository.has_revision',
             smart.repository.SmartServerRequestHasRevision),
            ('Repository.is_shared',
             smart.repository.SmartServerRepositoryIsShared),
            ('Repository.lock_write',
             smart.repository.SmartServerRepositoryLockWrite),
            ('Repository.unlock',
             smart.repository.SmartServerRepositoryUnlock),
            ('Repository.tarball',
             smart.repository.SmartServerRepositoryTarball),
            ('Transport.is_readonly',
             smart.request.SmartServerIsReadonly),
        ]
        for method_name, handler_class in expected_handlers:
            self.assertEqual(
                smart.request.request_handlers.get(method_name),
                handler_class)