/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar
3474.1.2 by Martin Pool
CountedLock.unlock should raise LockNotHeld if appropriate
1
# Copyright (C) 2007, 2008 Canonical Ltd
2475.4.1 by Martin Pool
Start adding CountedLock class to partially replace LockableFiles
2
#
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
17
"""Tests for bzrlib.counted_lock"""
18
19
from bzrlib.counted_lock import CountedLock
20
from bzrlib.errors import (
21
    LockError,
3474.1.2 by Martin Pool
CountedLock.unlock should raise LockNotHeld if appropriate
22
    LockNotHeld,
2475.4.1 by Martin Pool
Start adding CountedLock class to partially replace LockableFiles
23
    ReadOnlyError,
3474.1.2 by Martin Pool
CountedLock.unlock should raise LockNotHeld if appropriate
24
    TokenMismatch,
2475.4.1 by Martin Pool
Start adding CountedLock class to partially replace LockableFiles
25
    )
26
from bzrlib.tests import TestCase
27
28
29
class DummyLock(object):
    """Lock that just records what's been done to it.

    Each successful operation appends its name to ``_calls`` so that tests
    can check exactly which calls were made.  ``_lock_mode`` is None while
    unlocked, 'r' while read-locked and 'w' while write-locked.  The lock is
    deliberately not reentrant.
    """

    def __init__(self):
        self._calls = []
        self._lock_mode = None

    def is_locked(self):
        """Return True if the lock is currently held in any mode."""
        return self._lock_mode is not None

    def lock_read(self):
        """Take the lock for reading.

        :raises LockError: if the lock is already held.
        """
        self._assert_not_locked()
        self._lock_mode = 'r'
        self._calls.append('lock_read')

    def lock_write(self, token=None):
        """Take the lock for writing and return its token.

        :param token: None, or the token this lock hands out ('token').
        :returns: the string 'token'.
        :raises TokenMismatch: if any other token is supplied.
        :raises LockError: if the lock is already held.
        """
        # The token is checked first, so a bad token is reported even when
        # the lock is already held.
        if token not in (None, 'token'):
            raise TokenMismatch(token, 'token')
        self._assert_not_locked()
        self._lock_mode = 'w'
        self._calls.append('lock_write')
        return 'token'

    def unlock(self):
        """Release the lock.

        :raises LockError: if the lock is not held.
        """
        self._assert_locked()
        self._lock_mode = None
        self._calls.append('unlock')

    def break_lock(self):
        """Forcibly clear the lock, whether or not it is held."""
        self._lock_mode = None
        self._calls.append('break')

    def _assert_locked(self):
        """Raise LockError unless the lock is held."""
        if not self.is_locked():
            raise LockError("%s is not locked" % (self,))

    def _assert_not_locked(self):
        """Raise LockError if the lock is already held."""
        if self.is_locked():
            raise LockError("%s is already locked in mode %r" %
                (self, self._lock_mode))

    def validate_token(self, token):
        """Check a caller-supplied token against this lock's token.

        :returns: 'token' if that token was supplied, None if no token was
            supplied.
        :raises TokenMismatch: if any other token is supplied.
        """
        if token == 'token':
            # already held by this caller
            return 'token'
        elif token is None:
            return None
        else:
            raise TokenMismatch(token, 'token')
78
2475.4.1 by Martin Pool
Start adding CountedLock class to partially replace LockableFiles
79
80
class TestDummyLock(TestCase):
    """Check that DummyLock itself behaves as a simple recording lock."""

    def test_lock_initially_not_held(self):
        """A freshly created DummyLock reports itself as unlocked."""
        lock = DummyLock()
        self.assertFalse(lock.is_locked())

    def test_lock_not_reentrant(self):
        """Taking the lock a second time raises LockError."""
        # can't take the underlying lock twice
        lock = DummyLock()
        lock.lock_read()
        self.assertRaises(LockError, lock.lock_read)

    def test_detect_underlock(self):
        """Unlocking a lock that is not held raises LockError."""
        lock = DummyLock()
        self.assertRaises(LockError, lock.unlock)

    def test_basic_locking(self):
        """Read and write lock cycles toggle is_locked and are recorded."""
        # dummy lock works like a basic non reentrant lock
        real_lock = DummyLock()
        self.assertFalse(real_lock.is_locked())
        # lock read and unlock
        real_lock.lock_read()
        self.assertTrue(real_lock.is_locked())
        real_lock.unlock()
        self.assertFalse(real_lock.is_locked())
        # lock write and unlock
        real_lock.lock_write()
        self.assertTrue(real_lock.is_locked())
        real_lock.unlock()
        self.assertFalse(real_lock.is_locked())
        # check calls
        self.assertEqual(
            ['lock_read', 'unlock', 'lock_write', 'unlock'],
            real_lock._calls)

    def test_break_lock(self):
        """break_lock releases a held lock and is recorded as 'break'."""
        lock = DummyLock()
        lock.lock_write()
        lock.break_lock()
        self.assertFalse(lock.is_locked())
        self.assertEqual(
            ['lock_write', 'break'],
            lock._calls)
123
124
125
class TestCountedLock(TestCase):
    """Exercise CountedLock wrapped around a recording DummyLock.

    Each test inspects ``real_lock._calls`` or ``real_lock.is_locked()`` to
    check which operations actually reached the underlying lock.
    """

    def test_lock_unlock(self):
        """Nested read locks reach the real lock only once each way."""
        # Lock and unlock a counted lock
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        self.assertFalse(counted.is_locked())
        # can lock twice, although this isn't allowed on the underlying lock
        counted.lock_read()
        counted.lock_read()
        self.assertTrue(counted.is_locked())
        # and release
        counted.unlock()
        self.assertTrue(counted.is_locked())
        counted.unlock()
        self.assertFalse(counted.is_locked())
        self.assertEqual(
            ['lock_read', 'unlock'],
            real_lock._calls)

    def test_unlock_not_locked(self):
        """Unlocking a CountedLock that is not held raises LockNotHeld."""
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        self.assertRaises(LockNotHeld, counted.unlock)

    def test_read_lock_while_write_locked(self):
        """Read and write locks may nest inside an outer write lock."""
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        counted.lock_write()
        counted.lock_read()
        counted.lock_write()
        counted.unlock()
        counted.unlock()
        counted.unlock()
        self.assertFalse(counted.is_locked())
        # Only the outermost lock/unlock pair touches the real lock.
        self.assertEqual(
            ['lock_write', 'unlock'],
            real_lock._calls)

    def test_write_lock_while_read_locked(self):
        """Asking for a write lock while read-locked raises ReadOnlyError."""
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        counted.lock_read()
        # Tried twice: the single unlock below must still fully release the
        # lock after the failed attempts.
        self.assertRaises(ReadOnlyError, counted.lock_write)
        self.assertRaises(ReadOnlyError, counted.lock_write)
        counted.unlock()
        self.assertFalse(counted.is_locked())
        self.assertEqual(
            ['lock_read', 'unlock'],
            real_lock._calls)

    def test_break_lock(self):
        """break_lock clears both the counted lock and the real lock."""
        real_lock = DummyLock()
        counted = CountedLock(real_lock)
        counted.lock_write()
        counted.lock_write()
        self.assertTrue(real_lock.is_locked())
        counted.break_lock()
        self.assertFalse(counted.is_locked())
        self.assertFalse(real_lock.is_locked())