/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

Add Repository.{dont_,}leave_lock_in_place.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
from StringIO import StringIO
 
18
 
 
19
import bzrlib
 
20
from bzrlib.branch import Branch
 
21
import bzrlib.errors as errors
 
22
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
 
23
from bzrlib.lockable_files import LockableFiles, TransportLock
 
24
from bzrlib import lockdir
 
25
from bzrlib.lockdir import LockDir
 
26
from bzrlib.tests import TestCaseInTempDir
 
27
from bzrlib.tests.test_smart import TestCaseWithSmartMedium
 
28
from bzrlib.tests.test_transactions import DummyWeave
 
29
from bzrlib.transactions import (PassThroughTransaction,
 
30
                                 ReadOnlyTransaction,
 
31
                                 WriteTransaction,
 
32
                                 )
 
33
from bzrlib.transport import get_transport
 
34
 
 
35
 
 
36
# these tests are applied in each parameterized suite for LockableFiles
 
37
class _TestLockableFiles_mixin(object):
 
38
 
 
39
    def setUp(self):
 
40
        # Reduce the default timeout, so that if tests fail, they will do so
 
41
        # reasonably quickly.
 
42
        orig_timeout = lockdir._DEFAULT_TIMEOUT_SECONDS
 
43
        def resetTimeout():
 
44
            lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
 
45
        self.addCleanup(resetTimeout)
 
46
        lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
47
 
 
48
    def test_read_write(self):
 
49
        self.assertRaises(NoSuchFile, self.lockable.get, 'foo')
 
50
        self.assertRaises(NoSuchFile, self.lockable.get_utf8, 'foo')
 
51
        self.lockable.lock_write()
 
52
        try:
 
53
            unicode_string = u'bar\u1234'
 
54
            self.assertEqual(4, len(unicode_string))
 
55
            byte_string = unicode_string.encode('utf-8')
 
56
            self.assertEqual(6, len(byte_string))
 
57
            self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo', 
 
58
                              StringIO(unicode_string))
 
59
            self.lockable.put('foo', StringIO(byte_string))
 
60
            self.assertEqual(byte_string,
 
61
                             self.lockable.get('foo').read())
 
62
            self.assertEqual(unicode_string,
 
63
                             self.lockable.get_utf8('foo').read())
 
64
            self.assertRaises(BzrBadParameterNotString,
 
65
                              self.lockable.put_utf8,
 
66
                              'bar',
 
67
                              StringIO(unicode_string)
 
68
                              )
 
69
            self.lockable.put_utf8('bar', unicode_string)
 
70
            self.assertEqual(unicode_string, 
 
71
                             self.lockable.get_utf8('bar').read())
 
72
            self.assertEqual(byte_string, 
 
73
                             self.lockable.get('bar').read())
 
74
        finally:
 
75
            self.lockable.unlock()
 
76
 
 
77
    def test_locks(self):
 
78
        self.lockable.lock_read()
 
79
        try:
 
80
            self.assertRaises(ReadOnlyError, self.lockable.put, 'foo', 
 
81
                              StringIO('bar\u1234'))
 
82
        finally:
 
83
            self.lockable.unlock()
 
84
 
 
85
    def test_transactions(self):
 
86
        self.assertIs(self.lockable.get_transaction().__class__,
 
87
                      PassThroughTransaction)
 
88
        self.lockable.lock_read()
 
89
        try:
 
90
            self.assertIs(self.lockable.get_transaction().__class__,
 
91
                          ReadOnlyTransaction)
 
92
        finally:
 
93
            self.lockable.unlock()
 
94
        self.assertIs(self.lockable.get_transaction().__class__,
 
95
                      PassThroughTransaction)
 
96
        self.lockable.lock_write()
 
97
        self.assertIs(self.lockable.get_transaction().__class__,
 
98
                      WriteTransaction)
 
99
        # check that finish is called:
 
100
        vf = DummyWeave('a')
 
101
        self.lockable.get_transaction().register_dirty(vf)
 
102
        self.lockable.unlock()
 
103
        self.assertTrue(vf.finished)
 
104
 
 
105
    def test__escape(self):
 
106
        self.assertEqual('%25', self.lockable._escape('%'))
 
107
        
 
108
    def test__escape_empty(self):
 
109
        self.assertEqual('', self.lockable._escape(''))
 
110
 
 
111
    def test_break_lock(self):
 
112
        # some locks are not breakable
 
113
        self.lockable.lock_write()
 
114
        try:
 
115
            self.assertRaises(AssertionError, self.lockable.break_lock)
 
116
        except NotImplementedError:
 
117
            # this lock cannot be broken
 
118
            self.lockable.unlock()
 
119
            return
 
120
        l2 = self.get_lockable()
 
121
        orig_factory = bzrlib.ui.ui_factory
 
122
        # silent ui - no need for stdout
 
123
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
124
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
 
125
        try:
 
126
            l2.break_lock()
 
127
        finally:
 
128
            bzrlib.ui.ui_factory = orig_factory
 
129
        try:
 
130
            l2.lock_write()
 
131
            l2.unlock()
 
132
        finally:
 
133
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
 
134
            self.assertFalse(self.lockable.is_locked())
 
135
 
 
136
    def test_lock_write_returns_None_refuses_token(self):
 
137
        token = self.lockable.lock_write()
 
138
        try:
 
139
            if token is not None:
 
140
                # This test does not apply, because this lockable supports
 
141
                # tokens.
 
142
                return
 
143
            self.assertRaises(errors.TokenLockingNotSupported,
 
144
                              self.lockable.lock_write, token='token')
 
145
        finally:
 
146
            self.lockable.unlock()
 
147
 
 
148
    def test_lock_write_raises_on_token_mismatch(self):
 
149
        token = self.lockable.lock_write()
 
150
        try:
 
151
            if token is None:
 
152
                # This test does not apply, because this lockable refuses
 
153
                # tokens.
 
154
                return
 
155
            different_token = token + 'xxx'
 
156
            # Re-using the same lockable instance with a different token will
 
157
            # raise TokenMismatch.
 
158
            self.assertRaises(errors.TokenMismatch,
 
159
                              self.lockable.lock_write, token=different_token)
 
160
            # A seperate instance for the same lockable will also raise
 
161
            # TokenMismatch.
 
162
            # This detects the case where a caller claims to have a lock (via
 
163
            # the token) for an external resource, but doesn't (the token is
 
164
            # different).  Clients need a seperate lock object to make sure the
 
165
            # external resource is probed, whereas the existing lock object
 
166
            # might cache.
 
167
            new_lockable = self.get_lockable()
 
168
            self.assertRaises(errors.TokenMismatch,
 
169
                              new_lockable.lock_write, token=different_token)
 
170
        finally:
 
171
            self.lockable.unlock()
 
172
 
 
173
    def test_lock_write_with_matching_token(self):
 
174
        # If the token matches, so no exception is raised by lock_write.
 
175
        token = self.lockable.lock_write()
 
176
        try:
 
177
            if token is None:
 
178
                # This test does not apply, because this lockable refuses
 
179
                # tokens.
 
180
                return
 
181
            # The same instance will accept a second lock_write if the specified
 
182
            # token matches.
 
183
            self.lockable.lock_write(token=token)
 
184
            self.lockable.unlock()
 
185
            # Calling lock_write on a new instance for the same lockable will
 
186
            # also succeed.
 
187
            new_lockable = self.get_lockable()
 
188
            new_lockable.lock_write(token=token)
 
189
            new_lockable.unlock()
 
190
        finally:
 
191
            self.lockable.unlock()
 
192
 
 
193
    def test_unlock_after_lock_write_with_token(self):
 
194
        # If lock_write did not physically acquire the lock (because it was
 
195
        # passed a token), then unlock should not physically release it.
 
196
        token = self.lockable.lock_write()
 
197
        try:
 
198
            if token is None:
 
199
                # This test does not apply, because this lockable refuses
 
200
                # tokens.
 
201
                return
 
202
            new_lockable = self.get_lockable()
 
203
            new_lockable.lock_write(token=token)
 
204
            new_lockable.unlock()
 
205
            self.assertTrue(self.lockable.get_physical_lock_status())
 
206
        finally:
 
207
            self.lockable.unlock()
 
208
 
 
209
    def test_lock_write_with_token_fails_when_unlocked(self):
 
210
        # Lock and unlock to get a superficially valid token.  This mimics a
 
211
        # likely programming error, where a caller accidentally tries to lock
 
212
        # with a token that is no longer valid (because the original lock was
 
213
        # released).
 
214
        token = self.lockable.lock_write()
 
215
        self.lockable.unlock()
 
216
        if token is None:
 
217
            # This test does not apply, because this lockable refuses
 
218
            # tokens.
 
219
            return
 
220
 
 
221
        self.assertRaises(errors.TokenMismatch,
 
222
                          self.lockable.lock_write, token=token)
 
223
 
 
224
    def test_lock_write_reenter_with_token(self):
 
225
        token = self.lockable.lock_write()
 
226
        try:
 
227
            if token is None:
 
228
                # This test does not apply, because this lockable refuses
 
229
                # tokens.
 
230
                return
 
231
            # Relock with a token.
 
232
            self.lockable.lock_write(token=token)
 
233
            self.lockable.unlock()
 
234
        finally:
 
235
            self.lockable.unlock()
 
236
        # The lock should be unlocked on disk.  Verify that with a new lock
 
237
        # instance.
 
238
        new_lockable = self.get_lockable()
 
239
        # Calling lock_write now should work, rather than raise LockContention.
 
240
        new_lockable.lock_write()
 
241
        new_lockable.unlock()
 
242
 
 
243
    def test_leave_in_place(self):
 
244
        token = self.lockable.lock_write()
 
245
        try:
 
246
            if token is None:
 
247
                # This test does not apply, because this lockable refuses
 
248
                # tokens.
 
249
                return
 
250
            self.lockable.leave_in_place()
 
251
        finally:
 
252
            self.lockable.unlock()
 
253
        # At this point, the lock is still in place on disk
 
254
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
 
255
        # But should be relockable with a token.
 
256
        self.lockable.lock_write(token=token)
 
257
        self.lockable.unlock()
 
258
 
 
259
    def test_dont_leave_in_place(self):
 
260
        token = self.lockable.lock_write()
 
261
        try:
 
262
            if token is None:
 
263
                # This test does not apply, because this lockable refuses
 
264
                # tokens.
 
265
                return
 
266
            self.lockable.leave_in_place()
 
267
        finally:
 
268
            self.lockable.unlock()
 
269
        # At this point, the lock is still in place on disk.
 
270
        # Acquire the existing lock with the token, and ask that it is removed
 
271
        # when this object unlocks, and unlock to trigger that removal.
 
272
        new_lockable = self.get_lockable()
 
273
        new_lockable.lock_write(token=token)
 
274
        new_lockable.dont_leave_in_place()
 
275
        new_lockable.unlock()
 
276
        # At this point, the lock is no longer on disk, so we can lock it.
 
277
        third_lockable = self.get_lockable()
 
278
        third_lockable.lock_write()
 
279
        third_lockable.unlock()
 
280
 
 
281
 
 
282
# This method of adapting tests to parameters is different to 
 
283
# the TestProviderAdapters used elsewhere, but seems simpler for this 
 
284
# case.  
 
285
class TestLockableFiles_TransportLock(TestCaseInTempDir,
 
286
                                      _TestLockableFiles_mixin):
 
287
 
 
288
    def setUp(self):
 
289
        TestCaseInTempDir.setUp(self)
 
290
        _TestLockableFiles_mixin.setUp(self)
 
291
        transport = get_transport('.')
 
292
        transport.mkdir('.bzr')
 
293
        self.sub_transport = transport.clone('.bzr')
 
294
        self.lockable = self.get_lockable()
 
295
        self.lockable.create_lock()
 
296
 
 
297
    def tearDown(self):
 
298
        super(TestLockableFiles_TransportLock, self).tearDown()
 
299
        # free the subtransport so that we do not get a 5 second
 
300
        # timeout due to the SFTP connection cache.
 
301
        del self.sub_transport
 
302
 
 
303
    def get_lockable(self):
 
304
        return LockableFiles(self.sub_transport, 'my-lock', TransportLock)
 
305
        
 
306
 
 
307
class TestLockableFiles_LockDir(TestCaseInTempDir,
 
308
                              _TestLockableFiles_mixin):
 
309
    """LockableFile tests run with LockDir underneath"""
 
310
 
 
311
    def setUp(self):
 
312
        TestCaseInTempDir.setUp(self)
 
313
        _TestLockableFiles_mixin.setUp(self)
 
314
        self.transport = get_transport('.')
 
315
        self.lockable = self.get_lockable()
 
316
        # the lock creation here sets mode - test_permissions on branch 
 
317
        # tests that implicitly, but it might be a good idea to factor 
 
318
        # out the mode checking logic and have it applied to loackable files
 
319
        # directly. RBC 20060418
 
320
        self.lockable.create_lock()
 
321
 
 
322
    def get_lockable(self):
 
323
        return LockableFiles(self.transport, 'my-lock', LockDir)
 
324
 
 
325
    def test_lock_created(self):
 
326
        self.assertTrue(self.transport.has('my-lock'))
 
327
        self.lockable.lock_write()
 
328
        self.assertTrue(self.transport.has('my-lock/held/info'))
 
329
        self.lockable.unlock()
 
330
        self.assertFalse(self.transport.has('my-lock/held/info'))
 
331
        self.assertTrue(self.transport.has('my-lock'))
 
332
 
 
333
 
 
334
    # TODO: Test the lockdir inherits the right file and directory permissions
 
335
    # from the LockableFiles.
 
336
        
 
337
 
 
338
class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
 
339
                              _TestLockableFiles_mixin):
 
340
    """LockableFile tests run with RemoteLockDir on a branch."""
 
341
 
 
342
    def setUp(self):
 
343
        TestCaseWithSmartMedium.setUp(self)
 
344
        _TestLockableFiles_mixin.setUp(self)
 
345
        # can only get a RemoteLockDir with some RemoteObject...
 
346
        # use a branch as thats what we want. These mixin tests test the end
 
347
        # to end behaviour, so stubbing out the backend and simulating would
 
348
        # defeat the purpose. We test the protocol implementation separately
 
349
        # in test_remote and test_smart as usual.
 
350
        self.make_branch('foo')
 
351
        self.transport = get_transport('.')
 
352
        self.lockable = self.get_lockable()
 
353
 
 
354
    def get_lockable(self):
 
355
        # getting a new lockable involves opening a new instance of the branch
 
356
        branch = bzrlib.branch.Branch.open(self.get_url('foo'))
 
357
        return branch.control_files
 
358
 
 
359
    def test_lock_write_returns_None_refuses_token(self):
 
360
        # this test is not relevant for RemoteBranchLockableFiles as remote
 
361
        # locks are done directly from the remote branch object.
 
362
        return None
 
363
 
 
364
    def test_lock_write_raises_on_token_mismatch(self):
 
365
        # See test_lock_write_returns_None_refuses_token.
 
366
        return None
 
367
 
 
368
    def test_lock_write_with_matching_token(self):
 
369
        # See test_lock_write_returns_None_refuses_token.
 
370
        return None
 
371
 
 
372
    def test_unlock_after_lock_write_with_token(self):
 
373
        # See test_lock_write_returns_None_refuses_token.
 
374
        return None
 
375
 
 
376
    def test_lock_write_with_token_fails_when_unlocked(self):
 
377
        # See test_lock_write_returns_None_refuses_token.
 
378
        return None
 
379
 
 
380
    def test_lock_write_reenter_with_token(self):
 
381
        # See test_lock_write_returns_None_refuses_token.
 
382
        return None
 
383
 
 
384
    def test_leave_in_place(self):
 
385
        # See test_lock_write_returns_None_refuses_token.
 
386
        return None
 
387
 
 
388
    def test_dont_leave_in_place(self):
 
389
        # See test_lock_write_returns_None_refuses_token.
 
390
        return None