/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_counted_lock.py

  • Committer: John Arbash Meinel
  • Date: 2009-06-18 18:18:36 UTC
  • mto: This revision was merged to the branch mainline in revision 4461.
  • Revision ID: john@arbash-meinel.com-20090618181836-biodfkat9a8eyzjz
The new add_inventory_by_delta returns a CHKInventory when mapping from NULL,
which is completely valid, but 'broke' one of the tests.
To fix it, the test was changed to use CHKInventories on both sides, and an __eq__
member was added. The nice thing is that CHKInventory.__eq__ is fairly cheap, since it only
has to check the root keys.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2007, 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for bzrlib.counted_lock"""
 
18
 
 
19
from bzrlib.counted_lock import CountedLock
 
20
from bzrlib.errors import (
 
21
    LockError,
 
22
    LockNotHeld,
 
23
    ReadOnlyError,
 
24
    TokenMismatch,
 
25
    )
 
26
from bzrlib.tests import TestCase
 
27
 
 
28
 
 
29
class DummyLock(object):
    """Lock that just records what's been done to it.

    This is a non-reentrant stand-in for a real lock.  Every state-changing
    operation appends its name to ``_calls`` so tests can check exactly which
    operations reached the underlying lock.  The only write token it ever
    hands out or accepts is the string ``'token'``.
    """

    def __init__(self):
        # Names of the operations performed on this lock, in order.
        self._calls = []
        # None while unlocked; 'r' or 'w' while held.
        self._lock_mode = None

    def is_locked(self):
        """Return True if the lock is currently held in either mode."""
        return self._lock_mode is not None

    def lock_read(self):
        """Take the lock in read mode.

        :raises LockError: if the lock is already held.
        """
        self._assert_not_locked()
        self._lock_mode = 'r'
        self._calls.append('lock_read')

    def lock_write(self, token=None):
        """Take the lock in write mode and return the lock token.

        :param token: If given, it must equal ``'token'``.  In that case the
            caller is treated as already holding the lock: the token is
            returned without changing the lock state or recording a call.
        :return: ``'token'``.
        :raises TokenMismatch: if a token other than ``'token'`` is given.
        :raises LockError: if no token is given and the lock is already held.
        """
        if token is not None:
            if token == 'token':
                # already held by this caller
                return 'token'
            else:
                # Pass both tokens, as validate_token does, so the error can
                # report which token was given and which was expected.
                raise TokenMismatch(token, 'token')
        self._assert_not_locked()
        self._lock_mode = 'w'
        self._calls.append('lock_write')
        return 'token'

    def unlock(self):
        """Release the lock.

        :raises LockError: if the lock is not held.
        """
        self._assert_locked()
        self._lock_mode = None
        self._calls.append('unlock')

    def break_lock(self):
        """Forcibly clear the lock, whether or not it is currently held."""
        self._lock_mode = None
        self._calls.append('break')

    def _assert_locked(self):
        """Raise LockError unless the lock is held."""
        if not self.is_locked():
            raise LockError("%s is not locked" % (self,))

    def _assert_not_locked(self):
        """Raise LockError if the lock is already held."""
        if self.is_locked():
            raise LockError("%s is already locked in mode %r" %
                (self, self._lock_mode))

    def validate_token(self, token):
        """Check a caller-supplied write token.

        :return: ``'token'`` if the token matches, or None if no token was
            supplied.
        :raises TokenMismatch: if any other token is supplied.
        """
        if token == 'token':
            # already held by this caller
            return 'token'
        elif token is None:
            return
        else:
            raise TokenMismatch(token, 'token')
 
82
 
 
83
 
 
84
class TestDummyLock(TestCase):
    """Sanity checks for the DummyLock test double itself."""

    def test_lock_initially_not_held(self):
        # A freshly constructed lock reports itself as unlocked.
        self.assertFalse(DummyLock().is_locked())

    def test_lock_not_reentrant(self):
        # The underlying lock cannot be taken a second time while held.
        dummy = DummyLock()
        dummy.lock_read()
        self.assertRaises(LockError, dummy.lock_read)

    def test_detect_underlock(self):
        # Unlocking a lock that was never taken is reported as an error.
        dummy = DummyLock()
        self.assertRaises(LockError, dummy.unlock)

    def test_basic_locking(self):
        # The dummy lock behaves like a basic non-reentrant lock.
        dummy = DummyLock()
        self.assertFalse(dummy.is_locked())

        # Read-lock, then release.
        dummy.lock_read()
        self.assertTrue(dummy.is_locked())
        dummy.unlock()
        self.assertFalse(dummy.is_locked())

        # Write-lock, then release; taking the write lock yields the token.
        token = dummy.lock_write()
        self.assertEqual('token', token)
        self.assertTrue(dummy.is_locked())
        dummy.unlock()
        self.assertFalse(dummy.is_locked())

        # Every operation above must have been recorded, in order.
        expected_calls = ['lock_read', 'unlock', 'lock_write', 'unlock']
        self.assertEqual(expected_calls, dummy._calls)

    def test_break_lock(self):
        # Breaking a held lock releases it and is recorded as 'break'.
        dummy = DummyLock()
        dummy.lock_write()
        dummy.break_lock()
        self.assertFalse(dummy.is_locked())
        expected_calls = ['lock_write', 'break']
        self.assertEqual(expected_calls, dummy._calls)
 
128
 
 
129
 
 
130
class TestCountedLock(TestCase):
    """Tests for CountedLock wrapped around a recording DummyLock.

    Each test drives the CountedLock and then inspects the DummyLock (its
    ``is_locked()`` state and/or its ``_calls`` log) to see which operations
    actually reached the underlying lock.
    """

    def test_read_lock(self):
        # Lock and unlock a counted lock
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        self.assertFalse(l.is_locked())
        # can lock twice, although this isn't allowed on the underlying lock
        l.lock_read()
        l.lock_read()
        self.assertTrue(l.is_locked())
        # and release
        l.unlock()
        self.assertTrue(l.is_locked())
        l.unlock()
        self.assertFalse(l.is_locked())
        # the underlying lock was only taken and released once
        self.assertEqual(
            ['lock_read', 'unlock'],
            real_lock._calls)

    def test_unlock_not_locked(self):
        # unlocking a counted lock that was never taken is an error
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        self.assertRaises(LockNotHeld, l.unlock)

    def test_read_lock_while_write_locked(self):
        # once write locked, further read or write locks just nest
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        l.lock_write()
        l.lock_read()
        self.assertEqual('token', l.lock_write())
        # three acquisitions need three releases
        l.unlock()
        l.unlock()
        l.unlock()
        self.assertFalse(l.is_locked())
        self.assertEqual(
            ['lock_write', 'unlock'],
            real_lock._calls)

    def test_write_lock_while_read_locked(self):
        # a read lock cannot be upgraded to a write lock
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        l.lock_read()
        # attempted twice: the first failure must not change the lock state
        self.assertRaises(ReadOnlyError, l.lock_write)
        self.assertRaises(ReadOnlyError, l.lock_write)
        l.unlock()
        self.assertFalse(l.is_locked())
        self.assertEqual(
            ['lock_read', 'unlock'],
            real_lock._calls)

    def test_write_lock_reentrant(self):
        # taking the write lock again returns the same token
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        self.assertEqual('token', l.lock_write())
        self.assertEqual('token', l.lock_write())
        l.unlock()
        l.unlock()

    def test_reenter_with_token(self):
        real_lock = DummyLock()
        l1 = CountedLock(real_lock)
        l2 = CountedLock(real_lock)
        token = l1.lock_write()
        self.assertEqual('token', token)
        # now imagine that we lost that connection, but we still have the
        # token...
        del l1
        # because we can supply the token, we can acquire the lock through
        # another instance
        self.assertTrue(real_lock.is_locked())
        self.assertFalse(l2.is_locked())
        self.assertEqual(token, l2.lock_write(token=token))
        self.assertTrue(l2.is_locked())
        self.assertTrue(real_lock.is_locked())
        l2.unlock()
        self.assertFalse(l2.is_locked())
        self.assertFalse(real_lock.is_locked())

    def test_break_lock(self):
        # breaking the counted lock clears it however deeply it is nested,
        # and breaks the underlying lock as well
        real_lock = DummyLock()
        l = CountedLock(real_lock)
        l.lock_write()
        l.lock_write()
        self.assertTrue(real_lock.is_locked())
        l.break_lock()
        self.assertFalse(l.is_locked())
        self.assertFalse(real_lock.is_locked())