/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: Robert Collins
  • Date: 2007-04-18 08:39:02 UTC
  • mto: (2425.1.2 integration)
  • mto: This revision was merged to the branch mainline in revision 2427.
  • Revision ID: robertc@robertcollins.net-20070418083902-4o66h9fk7zeisvwa
Command objects can now declare related help topics by having _see_also
set to a list of related topics. Updated the HACKING guide entry on
documentation to be more clear about how the help for commands is
generated and to reference this new feature. (Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
from StringIO import StringIO
 
18
 
 
19
import bzrlib
 
20
import bzrlib.errors as errors
 
21
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
 
22
from bzrlib.lockable_files import LockableFiles, TransportLock
 
23
from bzrlib.lockdir import LockDir
 
24
from bzrlib.tests import TestCaseInTempDir
 
25
from bzrlib.tests.test_transactions import DummyWeave
 
26
from bzrlib.transactions import (PassThroughTransaction,
 
27
                                 ReadOnlyTransaction,
 
28
                                 WriteTransaction,
 
29
                                 )
 
30
from bzrlib.transport import get_transport
 
31
 
 
32
 
 
33
# these tests are applied in each parameterized suite for LockableFiles
 
34
class _TestLockableFiles_mixin(object):
 
35
 
 
36
    def test_read_write(self):
 
37
        self.assertRaises(NoSuchFile, self.lockable.get, 'foo')
 
38
        self.assertRaises(NoSuchFile, self.lockable.get_utf8, 'foo')
 
39
        self.lockable.lock_write()
 
40
        try:
 
41
            unicode_string = u'bar\u1234'
 
42
            self.assertEqual(4, len(unicode_string))
 
43
            byte_string = unicode_string.encode('utf-8')
 
44
            self.assertEqual(6, len(byte_string))
 
45
            self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo', 
 
46
                              StringIO(unicode_string))
 
47
            self.lockable.put('foo', StringIO(byte_string))
 
48
            self.assertEqual(byte_string,
 
49
                             self.lockable.get('foo').read())
 
50
            self.assertEqual(unicode_string,
 
51
                             self.lockable.get_utf8('foo').read())
 
52
            self.assertRaises(BzrBadParameterNotString,
 
53
                              self.lockable.put_utf8,
 
54
                              'bar',
 
55
                              StringIO(unicode_string)
 
56
                              )
 
57
            self.lockable.put_utf8('bar', unicode_string)
 
58
            self.assertEqual(unicode_string,
 
59
                             self.lockable.get_utf8('bar').read())
 
60
            self.assertEqual(byte_string,
 
61
                             self.lockable.get('bar').read())
 
62
            self.lockable.put_bytes('raw', 'raw\xffbytes')
 
63
            self.assertEqual('raw\xffbytes',
 
64
                             self.lockable.get('raw').read())
 
65
        finally:
 
66
            self.lockable.unlock()
 
67
 
 
68
    def test_locks(self):
 
69
        self.lockable.lock_read()
 
70
        try:
 
71
            self.assertRaises(ReadOnlyError, self.lockable.put, 'foo', 
 
72
                              StringIO('bar\u1234'))
 
73
        finally:
 
74
            self.lockable.unlock()
 
75
 
 
76
    def test_transactions(self):
 
77
        self.assertIs(self.lockable.get_transaction().__class__,
 
78
                      PassThroughTransaction)
 
79
        self.lockable.lock_read()
 
80
        try:
 
81
            self.assertIs(self.lockable.get_transaction().__class__,
 
82
                          ReadOnlyTransaction)
 
83
        finally:
 
84
            self.lockable.unlock()
 
85
        self.assertIs(self.lockable.get_transaction().__class__,
 
86
                      PassThroughTransaction)
 
87
        self.lockable.lock_write()
 
88
        self.assertIs(self.lockable.get_transaction().__class__,
 
89
                      WriteTransaction)
 
90
        # check that finish is called:
 
91
        vf = DummyWeave('a')
 
92
        self.lockable.get_transaction().register_dirty(vf)
 
93
        self.lockable.unlock()
 
94
        self.assertTrue(vf.finished)
 
95
 
 
96
    def test__escape(self):
 
97
        self.assertEqual('%25', self.lockable._escape('%'))
 
98
        
 
99
    def test__escape_empty(self):
 
100
        self.assertEqual('', self.lockable._escape(''))
 
101
 
 
102
    def test_break_lock(self):
 
103
        # some locks are not breakable
 
104
        self.lockable.lock_write()
 
105
        try:
 
106
            self.assertRaises(AssertionError, self.lockable.break_lock)
 
107
        except NotImplementedError:
 
108
            # this lock cannot be broken
 
109
            self.lockable.unlock()
 
110
            return
 
111
        l2 = self.get_lockable()
 
112
        orig_factory = bzrlib.ui.ui_factory
 
113
        # silent ui - no need for stdout
 
114
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
115
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
 
116
        try:
 
117
            l2.break_lock()
 
118
        finally:
 
119
            bzrlib.ui.ui_factory = orig_factory
 
120
        try:
 
121
            l2.lock_write()
 
122
            l2.unlock()
 
123
        finally:
 
124
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
 
125
            self.assertFalse(self.lockable.is_locked())
 
126
 
 
127
    def test_lock_write_returns_None_refuses_token(self):
 
128
        token = self.lockable.lock_write()
 
129
        try:
 
130
            if token is not None:
 
131
                # This test does not apply, because this lockable supports
 
132
                # tokens.
 
133
                return
 
134
            self.assertRaises(errors.TokenLockingNotSupported,
 
135
                              self.lockable.lock_write, token='token')
 
136
        finally:
 
137
            self.lockable.unlock()
 
138
 
 
139
    def test_lock_write_returns_token_when_given_token(self):
 
140
        token = self.lockable.lock_write()
 
141
        try:
 
142
            if token is None:
 
143
                # This test does not apply, because this lockable refuses
 
144
                # tokens.
 
145
                return
 
146
            new_lockable = self.get_lockable()
 
147
            token_from_new_lockable = new_lockable.lock_write(token=token)
 
148
            try:
 
149
                self.assertEqual(token, token_from_new_lockable)
 
150
            finally:
 
151
                new_lockable.unlock()
 
152
        finally:
 
153
            self.lockable.unlock()
 
154
 
 
155
    def test_lock_write_raises_on_token_mismatch(self):
 
156
        token = self.lockable.lock_write()
 
157
        try:
 
158
            if token is None:
 
159
                # This test does not apply, because this lockable refuses
 
160
                # tokens.
 
161
                return
 
162
            different_token = token + 'xxx'
 
163
            # Re-using the same lockable instance with a different token will
 
164
            # raise TokenMismatch.
 
165
            self.assertRaises(errors.TokenMismatch,
 
166
                              self.lockable.lock_write, token=different_token)
 
167
            # A seperate instance for the same lockable will also raise
 
168
            # TokenMismatch.
 
169
            # This detects the case where a caller claims to have a lock (via
 
170
            # the token) for an external resource, but doesn't (the token is
 
171
            # different).  Clients need a seperate lock object to make sure the
 
172
            # external resource is probed, whereas the existing lock object
 
173
            # might cache.
 
174
            new_lockable = self.get_lockable()
 
175
            self.assertRaises(errors.TokenMismatch,
 
176
                              new_lockable.lock_write, token=different_token)
 
177
        finally:
 
178
            self.lockable.unlock()
 
179
 
 
180
    def test_lock_write_with_matching_token(self):
 
181
        # If the token matches, so no exception is raised by lock_write.
 
182
        token = self.lockable.lock_write()
 
183
        try:
 
184
            if token is None:
 
185
                # This test does not apply, because this lockable refuses
 
186
                # tokens.
 
187
                return
 
188
            # The same instance will accept a second lock_write if the specified
 
189
            # token matches.
 
190
            self.lockable.lock_write(token=token)
 
191
            self.lockable.unlock()
 
192
            # Calling lock_write on a new instance for the same lockable will
 
193
            # also succeed.
 
194
            new_lockable = self.get_lockable()
 
195
            new_lockable.lock_write(token=token)
 
196
            new_lockable.unlock()
 
197
        finally:
 
198
            self.lockable.unlock()
 
199
 
 
200
    def test_unlock_after_lock_write_with_token(self):
 
201
        # If lock_write did not physically acquire the lock (because it was
 
202
        # passed a token), then unlock should not physically release it.
 
203
        token = self.lockable.lock_write()
 
204
        try:
 
205
            if token is None:
 
206
                # This test does not apply, because this lockable refuses
 
207
                # tokens.
 
208
                return
 
209
            new_lockable = self.get_lockable()
 
210
            new_lockable.lock_write(token=token)
 
211
            new_lockable.unlock()
 
212
            self.assertTrue(self.lockable.get_physical_lock_status())
 
213
        finally:
 
214
            self.lockable.unlock()
 
215
 
 
216
    def test_lock_write_with_token_fails_when_unlocked(self):
 
217
        # Lock and unlock to get a superficially valid token.  This mimics a
 
218
        # likely programming error, where a caller accidentally tries to lock
 
219
        # with a token that is no longer valid (because the original lock was
 
220
        # released).
 
221
        token = self.lockable.lock_write()
 
222
        self.lockable.unlock()
 
223
        if token is None:
 
224
            # This test does not apply, because this lockable refuses
 
225
            # tokens.
 
226
            return
 
227
 
 
228
        self.assertRaises(errors.TokenMismatch,
 
229
                          self.lockable.lock_write, token=token)
 
230
 
 
231
    def test_lock_write_reenter_with_token(self):
 
232
        token = self.lockable.lock_write()
 
233
        try:
 
234
            if token is None:
 
235
                # This test does not apply, because this lockable refuses
 
236
                # tokens.
 
237
                return
 
238
            # Relock with a token.
 
239
            token_from_reentry = self.lockable.lock_write(token=token)
 
240
            try:
 
241
                self.assertEqual(token, token_from_reentry)
 
242
            finally:
 
243
                self.lockable.unlock()
 
244
        finally:
 
245
            self.lockable.unlock()
 
246
        # The lock should be unlocked on disk.  Verify that with a new lock
 
247
        # instance.
 
248
        new_lockable = self.get_lockable()
 
249
        # Calling lock_write now should work, rather than raise LockContention.
 
250
        new_lockable.lock_write()
 
251
        new_lockable.unlock()
 
252
 
 
253
    def test_second_lock_write_returns_same_token(self):
 
254
        first_token = self.lockable.lock_write()
 
255
        try:
 
256
            if first_token is None:
 
257
                # This test does not apply, because this lockable refuses
 
258
                # tokens.
 
259
                return
 
260
            # Relock the already locked lockable.  It should return the same
 
261
            # token.
 
262
            second_token = self.lockable.lock_write()
 
263
            try:
 
264
                self.assertEqual(first_token, second_token)
 
265
            finally:
 
266
                self.lockable.unlock()
 
267
        finally:
 
268
            self.lockable.unlock()
 
269
 
 
270
    def test_leave_in_place(self):
 
271
        token = self.lockable.lock_write()
 
272
        try:
 
273
            if token is None:
 
274
                # This test does not apply, because this lockable refuses
 
275
                # tokens.
 
276
                return
 
277
            self.lockable.leave_in_place()
 
278
        finally:
 
279
            self.lockable.unlock()
 
280
        # At this point, the lock is still in place on disk
 
281
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
 
282
        # But should be relockable with a token.
 
283
        self.lockable.lock_write(token=token)
 
284
        self.lockable.unlock()
 
285
 
 
286
    def test_dont_leave_in_place(self):
 
287
        token = self.lockable.lock_write()
 
288
        try:
 
289
            if token is None:
 
290
                # This test does not apply, because this lockable refuses
 
291
                # tokens.
 
292
                return
 
293
            self.lockable.leave_in_place()
 
294
        finally:
 
295
            self.lockable.unlock()
 
296
        # At this point, the lock is still in place on disk.
 
297
        # Acquire the existing lock with the token, and ask that it is removed
 
298
        # when this object unlocks, and unlock to trigger that removal.
 
299
        new_lockable = self.get_lockable()
 
300
        new_lockable.lock_write(token=token)
 
301
        new_lockable.dont_leave_in_place()
 
302
        new_lockable.unlock()
 
303
        # At this point, the lock is no longer on disk, so we can lock it.
 
304
        third_lockable = self.get_lockable()
 
305
        third_lockable.lock_write()
 
306
        third_lockable.unlock()
 
307
 
 
308
 
 
309
# This method of adapting tests to parameters is different to 
 
310
# the TestProviderAdapters used elsewhere, but seems simpler for this 
 
311
# case.  
 
312
class TestLockableFiles_TransportLock(TestCaseInTempDir,
                                      _TestLockableFiles_mixin):
    """LockableFile tests run with TransportLock underneath"""

    def setUp(self):
        """Create a ``.bzr`` directory and a lockable with its lock in it."""
        super(TestLockableFiles_TransportLock, self).setUp()
        base_transport = get_transport('.')
        base_transport.mkdir('.bzr')
        self.sub_transport = base_transport.clone('.bzr')
        self.lockable = self.get_lockable()
        self.lockable.create_lock()

    def tearDown(self):
        """Run the base tearDown, then drop the sub-transport reference."""
        super(TestLockableFiles_TransportLock, self).tearDown()
        # free the subtransport so that we do not get a 5 second
        # timeout due to the SFTP connection cache.
        del self.sub_transport

    def get_lockable(self):
        """Return a new LockableFiles on the ``.bzr`` sub-transport."""
        lock_name = 'my-lock'
        return LockableFiles(self.sub_transport, lock_name, TransportLock)
 
331
        
 
332
 
 
333
class TestLockableFiles_LockDir(TestCaseInTempDir,
                              _TestLockableFiles_mixin):
    """LockableFile tests run with LockDir underneath"""

    def setUp(self):
        """Create a lockable in the test directory and create its lock."""
        TestCaseInTempDir.setUp(self)
        self.transport = get_transport('.')
        self.lockable = self.get_lockable()
        # the lock creation here sets mode - test_permissions on branch
        # tests that implicitly, but it might be a good idea to factor
        # out the mode checking logic and have it applied to lockable files
        # directly. RBC 20060418
        self.lockable.create_lock()

    def get_lockable(self):
        """Return a new LockableFiles using LockDir on the test transport."""
        return LockableFiles(self.transport, 'my-lock', LockDir)

    def test_lock_created(self):
        """Check what exists on the transport as the lock is taken and freed.

        ``my-lock`` is expected to exist throughout, while
        ``my-lock/held/info`` is expected to exist only while the write lock
        is held.
        """
        self.assertTrue(self.transport.has('my-lock'))
        self.lockable.lock_write()
        try:
            self.assertTrue(self.transport.has('my-lock/held/info'))
        finally:
            # Release the lock even when the check above fails, so a failing
            # assertion does not leave the write lock held.
            self.lockable.unlock()
        self.assertFalse(self.transport.has('my-lock/held/info'))
        self.assertTrue(self.transport.has('my-lock'))


    # TODO: Test the lockdir inherits the right file and directory permissions
    # from the LockableFiles.