/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_counted_lock.py

  • Committer: John Arbash Meinel
  • Date: 2008-08-18 22:34:21 UTC
  • mto: (3606.5.6 1.6)
  • mto: This revision was merged to the branch mainline in revision 3641.
  • Revision ID: john@arbash-meinel.com-20080818223421-todjny24vj4faj4t
Add tests for the fetching behavior.

The proper parameter passed is 'unordered' add an assert for it, and
fix callers that were passing 'unsorted' instead.
Add tests that we make the right get_record_stream call based
on the value of _fetch_uses_deltas.
Fix the fetch request for signatures.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2007, 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for bzrlib.counted_lock"""
 
18
 
 
19
from bzrlib.counted_lock import CountedLock
 
20
from bzrlib.errors import (
 
21
    LockError,
 
22
    LockNotHeld,
 
23
    ReadOnlyError,
 
24
    TokenMismatch,
 
25
    )
 
26
from bzrlib.tests import TestCase
 
27
 
 
28
 
 
29
class DummyLock(object):
 
30
    """Lock that just records what's been done to it."""
 
31
 
 
32
    def __init__(self):
 
33
        self._calls = []
 
34
        self._lock_mode = None
 
35
 
 
36
    def is_locked(self):
 
37
        return self._lock_mode is not None
 
38
 
 
39
    def lock_read(self):
 
40
        self._assert_not_locked()
 
41
        self._lock_mode = 'r'
 
42
        self._calls.append('lock_read')
 
43
 
 
44
    def lock_write(self, token=None):
 
45
        if token not in (None, 'token'):
 
46
            raise TokenMismatch(token, 'token')
 
47
        self._assert_not_locked()
 
48
        self._lock_mode = 'w'
 
49
        self._calls.append('lock_write')
 
50
        return 'token'
 
51
 
 
52
    def unlock(self):
 
53
        self._assert_locked()
 
54
        self._lock_mode = None
 
55
        self._calls.append('unlock')
 
56
 
 
57
    def break_lock(self):
 
58
        self._lock_mode = None
 
59
        self._calls.append('break')
 
60
 
 
61
    def _assert_locked(self):
 
62
        if not self._lock_mode:
 
63
            raise LockError("%s is not locked" % (self,))
 
64
 
 
65
    def _assert_not_locked(self):
 
66
        if self._lock_mode:
 
67
            raise LockError("%s is already locked in mode %r" %
 
68
                (self, self._lock_mode))
 
69
 
 
70
    def validate_token(self, token):
 
71
        if token == 'token':
 
72
            # already held by this caller
 
73
            return 'token'
 
74
        elif token is None:
 
75
            return
 
76
        else:
 
77
            raise TokenMismatch(token, 'token')
 
78
 
 
79
 
 
80
class TestDummyLock(TestCase):
 
81
 
 
82
    def test_lock_initially_not_held(self):
 
83
        l = DummyLock()
 
84
        self.assertFalse(l.is_locked())
 
85
 
 
86
    def test_lock_not_reentrant(self):
 
87
        # can't take the underlying lock twice
 
88
        l = DummyLock()
 
89
        l.lock_read()
 
90
        self.assertRaises(LockError, l.lock_read)
 
91
 
 
92
    def test_detect_underlock(self):
 
93
        l = DummyLock()
 
94
        self.assertRaises(LockError, l.unlock)
 
95
 
 
96
    def test_basic_locking(self):
 
97
        # dummy lock works like a basic non reentrant lock
 
98
        real_lock = DummyLock()
 
99
        self.assertFalse(real_lock.is_locked())
 
100
        # lock read and unlock
 
101
        real_lock.lock_read()
 
102
        self.assertTrue(real_lock.is_locked())
 
103
        real_lock.unlock()
 
104
        self.assertFalse(real_lock.is_locked())
 
105
        # lock write and unlock
 
106
        real_lock.lock_write()
 
107
        self.assertTrue(real_lock.is_locked())
 
108
        real_lock.unlock()
 
109
        self.assertFalse(real_lock.is_locked())
 
110
        # check calls
 
111
        self.assertEqual(
 
112
            ['lock_read', 'unlock', 'lock_write', 'unlock'],
 
113
            real_lock._calls)
 
114
 
 
115
    def test_break_lock(self):
 
116
        l = DummyLock()
 
117
        l.lock_write()
 
118
        l.break_lock()
 
119
        self.assertFalse(l.is_locked())
 
120
        self.assertEqual(
 
121
            ['lock_write', 'break'],
 
122
            l._calls)
 
123
 
 
124
 
 
125
class TestCountedLock(TestCase):
 
126
 
 
127
    def test_lock_unlock(self):
 
128
        # Lock and unlock a counted lock
 
129
        real_lock = DummyLock()
 
130
        l = CountedLock(real_lock)
 
131
        self.assertFalse(l.is_locked())
 
132
        # can lock twice, although this isn't allowed on the underlying lock
 
133
        l.lock_read()
 
134
        l.lock_read()
 
135
        self.assertTrue(l.is_locked())
 
136
        # and release
 
137
        l.unlock()
 
138
        self.assertTrue(l.is_locked())
 
139
        l.unlock()
 
140
        self.assertFalse(l.is_locked())
 
141
        self.assertEquals(
 
142
            ['lock_read', 'unlock'],
 
143
            real_lock._calls)
 
144
 
 
145
    def test_unlock_not_locked(self):
 
146
        real_lock = DummyLock()
 
147
        l = CountedLock(real_lock)
 
148
        self.assertRaises(LockNotHeld, l.unlock)
 
149
 
 
150
    def test_read_lock_while_write_locked(self):
 
151
        real_lock = DummyLock()
 
152
        l = CountedLock(real_lock)
 
153
        l.lock_write()
 
154
        l.lock_read()
 
155
        l.lock_write()
 
156
        l.unlock()
 
157
        l.unlock()
 
158
        l.unlock()
 
159
        self.assertFalse(l.is_locked())
 
160
        self.assertEquals(
 
161
            ['lock_write', 'unlock'],
 
162
            real_lock._calls)
 
163
 
 
164
    def test_write_lock_while_read_locked(self):
 
165
        real_lock = DummyLock()
 
166
        l = CountedLock(real_lock)
 
167
        l.lock_read()
 
168
        self.assertRaises(ReadOnlyError, l.lock_write)
 
169
        self.assertRaises(ReadOnlyError, l.lock_write)
 
170
        l.unlock()
 
171
        self.assertFalse(l.is_locked())
 
172
        self.assertEquals(
 
173
            ['lock_read', 'unlock'],
 
174
            real_lock._calls)
 
175
 
 
176
    def test_break_lock(self):
 
177
        real_lock = DummyLock()
 
178
        l = CountedLock(real_lock)
 
179
        l.lock_write()
 
180
        l.lock_write()
 
181
        self.assertTrue(real_lock.is_locked())
 
182
        l.break_lock()
 
183
        self.assertFalse(l.is_locked())
 
184
        self.assertFalse(real_lock.is_locked())