13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""Test locks across all branch implementations"""
19
from bzrlib import errors
20
from bzrlib.branch import BzrBranchFormat4
21
from bzrlib.bzrdir import RemoteBzrDirFormat
22
from bzrlib.tests import TestSkipped
23
from bzrlib.tests.branch_implementations.test_branch import TestCaseWithBranch
24
from bzrlib.tests.lock_helpers import TestPreventLocking, LockWrapper
27
class TestBranchLocking(TestCaseWithBranch):
20
branch as _mod_branch,
24
from bzrlib.tests import (
30
class TestBranchLocking(per_branch.TestCaseWithBranch):
30
TestCaseWithBranch.setUp(self)
33
super(TestBranchLocking, self).setUp()
31
34
self.reduceLockdirTimeout()
33
36
def get_instrumented_branch(self):
34
37
"""Get a Branch object which has been instrumented"""
35
# TODO: jam 20060630 It may be that not all formats have a
38
# TODO: jam 20060630 It may be that not all formats have a
36
39
# 'control_files' member. So we should fail gracefully if
37
# not there. But assuming it has them lets us test the exact
40
# not there. But assuming it has them lets us test the exact
38
41
# lock/unlock order.
40
b = LockWrapper(self.locks, self.get_branch(), 'b')
41
b.repository = LockWrapper(self.locks, b.repository, 'r')
43
b = lock_helpers.LockWrapper(self.locks, self.get_branch(), 'b')
44
b.repository = lock_helpers.LockWrapper(self.locks, b.repository, 'r')
42
45
bcf = b.control_files
43
46
rcf = getattr(b.repository, 'control_files', None)
47
50
# Look out for branch types that reuse their control files
48
51
self.combined_control = bcf is rcf
50
b.control_files = LockWrapper(self.locks, b.control_files, 'bc')
53
b.control_files = lock_helpers.LockWrapper(
54
self.locks, b.control_files, 'bc')
51
55
except AttributeError:
52
56
# RemoteBranch seems to trigger this.
53
raise TestSkipped("Could not instrument branch control files.")
57
raise tests.TestSkipped(
58
'Could not instrument branch control files.')
54
59
if self.combined_control:
55
60
# instrument the repository control files too to ensure it's worked
56
61
# with correctly. When they are not shared, we trust the repository
57
# API and only instrument the repository itself.
58
b.repository.control_files = \
59
LockWrapper(self.locks, b.repository.control_files, 'rc')
62
# API and only instrument the repository itself.
63
b.repository.control_files = lock_helpers.LockWrapper(
64
self.locks, b.repository.control_files, 'rc')
62
67
def test_01_lock_read(self):
140
145
self.assertTrue(b.is_locked())
141
146
self.assertTrue(b.repository.is_locked())
142
self.assertRaises(TestPreventLocking, b.unlock)
147
self.assertLogsError(lock_helpers.TestPreventLocking, b.unlock)
143
148
if self.combined_control:
144
149
self.assertTrue(b.is_locked())
146
151
self.assertFalse(b.is_locked())
147
152
self.assertTrue(b.repository.is_locked())
149
# We unlock the branch control files, even if
154
# We unlock the branch control files, even if
150
155
# we fail to unlock the repository
151
156
if self.combined_control:
152
157
self.assertEqual([('b', 'lw', True),
171
176
b.repository._other.unlock()
173
178
def test_04_lock_fail_unlock_control(self):
174
# Make sure repository.unlock() is called, if we fail to unlock self
179
# Make sure repository.unlock() is not called, if we fail to unlock
180
# self leaving ourselves still locked, so that attempts to recover
181
# don't encounter an unlocked repository.
175
182
b = self.get_instrumented_branch()
176
183
b.control_files.disable_unlock()
182
189
self.assertTrue(b.is_locked())
183
190
self.assertTrue(b.repository.is_locked())
184
self.assertRaises(TestPreventLocking, b.unlock)
191
self.assertLogsError(lock_helpers.TestPreventLocking, b.unlock)
185
192
self.assertTrue(b.is_locked())
186
if self.combined_control:
187
self.assertTrue(b.repository.is_locked())
189
self.assertFalse(b.repository.is_locked())
193
self.assertTrue(b.repository.is_locked())
191
# We unlock the repository even if
195
# We unlock the repository even if
192
196
# we fail to unlock the control files
193
197
if self.combined_control:
194
198
self.assertEqual([('b', 'lw', True),
218
221
b = self.get_instrumented_branch()
219
222
b.repository.disable_lock_read()
221
self.assertRaises(TestPreventLocking, b.lock_read)
224
self.assertRaises(lock_helpers.TestPreventLocking, b.lock_read)
222
225
self.assertFalse(b.is_locked())
223
226
self.assertFalse(b.repository.is_locked())
225
228
self.assertEqual([('b', 'lr', True),
229
232
def test_06_lock_write_fail_repo(self):
231
234
b = self.get_instrumented_branch()
232
235
b.repository.disable_lock_write()
234
self.assertRaises(TestPreventLocking, b.lock_write)
237
self.assertRaises(lock_helpers.TestPreventLocking, b.lock_write)
235
238
self.assertFalse(b.is_locked())
236
239
self.assertFalse(b.repository.is_locked())
238
241
self.assertEqual([('b', 'lw', True),
242
245
def test_07_lock_read_fail_control(self):
244
247
b = self.get_instrumented_branch()
245
248
b.control_files.disable_lock_read()
247
self.assertRaises(TestPreventLocking, b.lock_read)
250
self.assertRaises(lock_helpers.TestPreventLocking, b.lock_read)
248
251
self.assertFalse(b.is_locked())
249
252
self.assertFalse(b.repository.is_locked())
268
271
b = self.get_instrumented_branch()
269
272
b.control_files.disable_lock_write()
271
self.assertRaises(TestPreventLocking, b.lock_write)
274
self.assertRaises(lock_helpers.TestPreventLocking, b.lock_write)
272
275
self.assertFalse(b.is_locked())
273
276
self.assertFalse(b.repository.is_locked())
274
277
if self.combined_control:
435
438
# We should be unable to relock the repo.
436
439
self.assertRaises(errors.LockContention, branch.lock_write)
441
branch.lock_write(token)
442
branch.dont_leave_lock_in_place()
438
445
def test_dont_leave_lock_in_place(self):
439
446
branch = self.make_branch('b')
493
500
def test_lock_write_locks_repo_too(self):
494
if isinstance(self.branch_format, BzrBranchFormat4):
501
if isinstance(self.branch_format, _mod_branch.BzrBranchFormat4):
495
502
# Branch format 4 is combined with the repository, so this test