/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_branch/test_locking.py

  • Committer: Martin von Gagern
  • Date: 2010-04-20 08:47:38 UTC
  • mfrom: (5167 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5195.
  • Revision ID: martin.vgagern@gmx.net-20100420084738-ygymnqmdllzrhpfn
merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Test locks across all branch implemenations"""
18
18
 
19
 
from bzrlib import errors
20
 
from bzrlib.branch import BzrBranchFormat4
21
 
from bzrlib.bzrdir import RemoteBzrDirFormat
22
 
from bzrlib.tests import TestSkipped
23
 
from bzrlib.tests.branch_implementations.test_branch import TestCaseWithBranch
24
 
from bzrlib.tests.lock_helpers import TestPreventLocking, LockWrapper
25
 
 
26
 
 
27
 
class TestBranchLocking(TestCaseWithBranch):
 
19
from bzrlib import (
 
20
    branch as _mod_branch,
 
21
    errors,
 
22
    tests,
 
23
    )
 
24
from bzrlib.tests import (
 
25
    lock_helpers,
 
26
    per_branch,
 
27
    )
 
28
 
 
29
 
 
30
class TestBranchLocking(per_branch.TestCaseWithBranch):
28
31
 
29
32
    def setUp(self):
30
 
        TestCaseWithBranch.setUp(self)
 
33
        super(TestBranchLocking, self).setUp()
31
34
        self.reduceLockdirTimeout()
32
35
 
33
36
    def get_instrumented_branch(self):
34
37
        """Get a Branch object which has been instrumented"""
35
 
        # TODO: jam 20060630 It may be that not all formats have a 
 
38
        # TODO: jam 20060630 It may be that not all formats have a
36
39
        # 'control_files' member. So we should fail gracefully if
37
 
        # not there. But assuming it has them lets us test the exact 
 
40
        # not there. But assuming it has them lets us test the exact
38
41
        # lock/unlock order.
39
42
        self.locks = []
40
 
        b = LockWrapper(self.locks, self.get_branch(), 'b')
41
 
        b.repository = LockWrapper(self.locks, b.repository, 'r')
 
43
        b = lock_helpers.LockWrapper(self.locks, self.get_branch(), 'b')
 
44
        b.repository = lock_helpers.LockWrapper(self.locks, b.repository, 'r')
42
45
        bcf = b.control_files
43
46
        rcf = getattr(b.repository, 'control_files', None)
44
47
        if rcf is None:
47
50
            # Look out for branch types that reuse their control files
48
51
            self.combined_control = bcf is rcf
49
52
        try:
50
 
            b.control_files = LockWrapper(self.locks, b.control_files, 'bc')
 
53
            b.control_files = lock_helpers.LockWrapper(
 
54
                self.locks, b.control_files, 'bc')
51
55
        except AttributeError:
52
56
            # RemoteBranch seems to trigger this.
53
 
            raise TestSkipped("Could not instrument branch control files.")
 
57
            raise tests.TestSkipped(
 
58
                'Could not instrument branch control files.')
54
59
        if self.combined_control:
55
60
            # instrument the repository control files too to ensure its worked
56
61
            # with correctly. When they are not shared, we trust the repository
57
 
            # API and only instrument the repository itself. 
58
 
            b.repository.control_files = \
59
 
                LockWrapper(self.locks, b.repository.control_files, 'rc')
 
62
            # API and only instrument the repository itself.
 
63
            b.repository.control_files = lock_helpers.LockWrapper(
 
64
                self.locks, b.repository.control_files, 'rc')
60
65
        return b
61
66
 
62
67
    def test_01_lock_read(self):
139
144
        try:
140
145
            self.assertTrue(b.is_locked())
141
146
            self.assertTrue(b.repository.is_locked())
142
 
            self.assertRaises(TestPreventLocking, b.unlock)
 
147
            self.assertLogsError(lock_helpers.TestPreventLocking, b.unlock)
143
148
            if self.combined_control:
144
149
                self.assertTrue(b.is_locked())
145
150
            else:
146
151
                self.assertFalse(b.is_locked())
147
152
            self.assertTrue(b.repository.is_locked())
148
153
 
149
 
            # We unlock the branch control files, even if 
 
154
            # We unlock the branch control files, even if
150
155
            # we fail to unlock the repository
151
156
            if self.combined_control:
152
157
                self.assertEqual([('b', 'lw', True),
171
176
            b.repository._other.unlock()
172
177
 
173
178
    def test_04_lock_fail_unlock_control(self):
174
 
        # Make sure repository.unlock() is called, if we fail to unlock self
 
179
        # Make sure repository.unlock() is not called, if we fail to unlock
 
180
        # self leaving ourselves still locked, so that attempts to recover
 
181
        # don't encounter an unlocked repository.
175
182
        b = self.get_instrumented_branch()
176
183
        b.control_files.disable_unlock()
177
184
 
181
188
        try:
182
189
            self.assertTrue(b.is_locked())
183
190
            self.assertTrue(b.repository.is_locked())
184
 
            self.assertRaises(TestPreventLocking, b.unlock)
 
191
            self.assertLogsError(lock_helpers.TestPreventLocking, b.unlock)
185
192
            self.assertTrue(b.is_locked())
186
 
            if self.combined_control:
187
 
                self.assertTrue(b.repository.is_locked())
188
 
            else:
189
 
                self.assertFalse(b.repository.is_locked())
 
193
            self.assertTrue(b.repository.is_locked())
190
194
 
191
 
            # We unlock the repository even if 
 
195
            # We unlock the repository even if
192
196
            # we fail to unlock the control files
193
197
            if self.combined_control:
194
198
                self.assertEqual([('b', 'lw', True),
206
210
                                  ('bc', 'lw', True),
207
211
                                  ('b', 'ul', True),
208
212
                                  ('bc', 'ul', False),
209
 
                                  ('r', 'ul', True),
210
213
                                 ], self.locks)
211
214
 
212
215
        finally:
218
221
        b = self.get_instrumented_branch()
219
222
        b.repository.disable_lock_read()
220
223
 
221
 
        self.assertRaises(TestPreventLocking, b.lock_read)
 
224
        self.assertRaises(lock_helpers.TestPreventLocking, b.lock_read)
222
225
        self.assertFalse(b.is_locked())
223
226
        self.assertFalse(b.repository.is_locked())
224
227
 
225
228
        self.assertEqual([('b', 'lr', True),
226
 
                          ('r', 'lr', False), 
 
229
                          ('r', 'lr', False),
227
230
                         ], self.locks)
228
231
 
229
232
    def test_06_lock_write_fail_repo(self):
231
234
        b = self.get_instrumented_branch()
232
235
        b.repository.disable_lock_write()
233
236
 
234
 
        self.assertRaises(TestPreventLocking, b.lock_write)
 
237
        self.assertRaises(lock_helpers.TestPreventLocking, b.lock_write)
235
238
        self.assertFalse(b.is_locked())
236
239
        self.assertFalse(b.repository.is_locked())
237
240
 
238
241
        self.assertEqual([('b', 'lw', True),
239
 
                          ('r', 'lw', False), 
 
242
                          ('r', 'lw', False),
240
243
                         ], self.locks)
241
244
 
242
245
    def test_07_lock_read_fail_control(self):
244
247
        b = self.get_instrumented_branch()
245
248
        b.control_files.disable_lock_read()
246
249
 
247
 
        self.assertRaises(TestPreventLocking, b.lock_read)
 
250
        self.assertRaises(lock_helpers.TestPreventLocking, b.lock_read)
248
251
        self.assertFalse(b.is_locked())
249
252
        self.assertFalse(b.repository.is_locked())
250
253
 
268
271
        b = self.get_instrumented_branch()
269
272
        b.control_files.disable_lock_write()
270
273
 
271
 
        self.assertRaises(TestPreventLocking, b.lock_write)
 
274
        self.assertRaises(lock_helpers.TestPreventLocking, b.lock_write)
272
275
        self.assertFalse(b.is_locked())
273
276
        self.assertFalse(b.repository.is_locked())
274
277
        if self.combined_control:
434
437
            branch.unlock()
435
438
        # We should be unable to relock the repo.
436
439
        self.assertRaises(errors.LockContention, branch.lock_write)
 
440
        # Cleanup
 
441
        branch.lock_write(token)
 
442
        branch.dont_leave_lock_in_place()
 
443
        branch.unlock()
437
444
 
438
445
    def test_dont_leave_lock_in_place(self):
439
446
        branch = self.make_branch('b')
491
498
        branch.unlock()
492
499
 
493
500
    def test_lock_write_locks_repo_too(self):
494
 
        if isinstance(self.branch_format, BzrBranchFormat4):
 
501
        if isinstance(self.branch_format, _mod_branch.BzrBranchFormat4):
495
502
            # Branch format 4 is combined with the repository, so this test
496
503
            # doesn't apply.
497
504
            return