/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
from StringIO import StringIO
 
18
 
 
19
import bzrlib
 
20
from bzrlib.branch import Branch
 
21
import bzrlib.errors as errors
 
22
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
 
23
from bzrlib.lockable_files import LockableFiles, TransportLock
 
24
from bzrlib import lockdir
 
25
from bzrlib.lockdir import LockDir
 
26
from bzrlib.tests import TestCaseInTempDir
 
27
from bzrlib.tests.test_smart import TestCaseWithSmartMedium
 
28
from bzrlib.tests.test_transactions import DummyWeave
 
29
from bzrlib.transactions import (PassThroughTransaction,
 
30
                                 ReadOnlyTransaction,
 
31
                                 WriteTransaction,
 
32
                                 )
 
33
from bzrlib.transport import get_transport
 
34
 
 
35
 
 
36
# these tests are applied in each parameterized suite for LockableFiles
 
37
class _TestLockableFiles_mixin(object):
    """Tests applied to every parameterized LockableFiles suite.

    This class is combined with a TestCase class by the concrete suites in
    this module.  The methods here expect the concrete suite to provide:

    * ``self.lockable`` - the LockableFiles-like object under test,
    * ``self.get_lockable()`` - a new object for the same underlying lock,
    * ``self.reduceLockdirTimeout()`` and the usual ``assert*`` helpers.
    """

    def setUp(self):
        # Called explicitly from each concrete suite's setUp.
        self.reduceLockdirTimeout()

    def test_read_write(self):
        # Nothing has been stored yet, so both read paths must fail.
        self.assertRaises(NoSuchFile, self.lockable.get, 'foo')
        self.assertRaises(NoSuchFile, self.lockable.get_utf8, 'foo')
        self.lockable.lock_write()
        try:
            # Four characters that encode to six utf-8 bytes, so the unicode
            # and byte forms can be told apart by length.
            unicode_string = u'bar\u1234'
            self.assertEqual(4, len(unicode_string))
            byte_string = unicode_string.encode('utf-8')
            self.assertEqual(6, len(byte_string))
            # put() takes a file of bytes: unicode content is rejected.
            self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo',
                              StringIO(unicode_string))
            self.lockable.put('foo', StringIO(byte_string))
            self.assertEqual(byte_string,
                             self.lockable.get('foo').read())
            self.assertEqual(unicode_string,
                             self.lockable.get_utf8('foo').read())
            # put_utf8() takes a string, not a file-like object.
            self.assertRaises(BzrBadParameterNotString,
                              self.lockable.put_utf8,
                              'bar',
                              StringIO(unicode_string)
                              )
            self.lockable.put_utf8('bar', unicode_string)
            self.assertEqual(unicode_string,
                             self.lockable.get_utf8('bar').read())
            self.assertEqual(byte_string,
                             self.lockable.get('bar').read())
            # put_bytes() stores arbitrary bytes, including ones that are not
            # valid utf-8.
            self.lockable.put_bytes('raw', 'raw\xffbytes')
            self.assertEqual('raw\xffbytes',
                             self.lockable.get('raw').read())
        finally:
            self.lockable.unlock()

    def test_locks(self):
        # Writing while only a read lock is held must raise ReadOnlyError.
        self.lockable.lock_read()
        try:
            self.assertRaises(ReadOnlyError, self.lockable.put, 'foo',
                              StringIO('bar\u1234'))
        finally:
            self.lockable.unlock()

    def test_transactions(self):
        # The transaction class follows the lock state:
        # unlocked -> PassThrough, read -> ReadOnly, write -> Write.
        self.assertIs(self.lockable.get_transaction().__class__,
                      PassThroughTransaction)
        self.lockable.lock_read()
        try:
            self.assertIs(self.lockable.get_transaction().__class__,
                          ReadOnlyTransaction)
        finally:
            self.lockable.unlock()
        self.assertIs(self.lockable.get_transaction().__class__,
                      PassThroughTransaction)
        self.lockable.lock_write()
        try:
            self.assertIs(self.lockable.get_transaction().__class__,
                          WriteTransaction)
            # check that finish is called:
            vf = DummyWeave('a')
            self.lockable.get_transaction().register_dirty(vf)
        finally:
            # Always release the write lock, even if an assertion above
            # failed, so a failure here does not leave the lock held.
            self.lockable.unlock()
        self.assertTrue(vf.finished)

    def test__escape(self):
        self.assertEqual('%25', self.lockable._escape('%'))

    def test__escape_empty(self):
        self.assertEqual('', self.lockable._escape(''))

    def test_break_lock(self):
        # some locks are not breakable
        self.lockable.lock_write()
        try:
            # Breaking the lock from the instance that holds it is expected
            # to fail with AssertionError.
            self.assertRaises(AssertionError, self.lockable.break_lock)
        except NotImplementedError:
            # this lock cannot be broken
            self.lockable.unlock()
            return
        l2 = self.get_lockable()
        orig_factory = bzrlib.ui.ui_factory
        # silent ui - no need for stdout
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
        # Pre-seed the answer to the confirmation prompt.
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
        try:
            l2.break_lock()
        finally:
            bzrlib.ui.ui_factory = orig_factory
        try:
            # Once broken, the lock can be taken by the second instance.
            l2.lock_write()
            l2.unlock()
        finally:
            # The original holder finds out its lock was broken on unlock.
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
            self.assertFalse(self.lockable.is_locked())

    def test_lock_write_returns_None_refuses_token(self):
        token = self.lockable.lock_write()
        try:
            if token is not None:
                # This test does not apply, because this lockable supports
                # tokens.
                return
            self.assertRaises(errors.TokenLockingNotSupported,
                              self.lockable.lock_write, token='token')
        finally:
            self.lockable.unlock()

    def test_lock_write_returns_token_when_given_token(self):
        token = self.lockable.lock_write()
        try:
            if token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            new_lockable = self.get_lockable()
            token_from_new_lockable = new_lockable.lock_write(token=token)
            try:
                self.assertEqual(token, token_from_new_lockable)
            finally:
                new_lockable.unlock()
        finally:
            self.lockable.unlock()

    def test_lock_write_raises_on_token_mismatch(self):
        token = self.lockable.lock_write()
        try:
            if token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            different_token = token + 'xxx'
            # Re-using the same lockable instance with a different token will
            # raise TokenMismatch.
            self.assertRaises(errors.TokenMismatch,
                              self.lockable.lock_write, token=different_token)
            # A separate instance for the same lockable will also raise
            # TokenMismatch.
            # This detects the case where a caller claims to have a lock (via
            # the token) for an external resource, but doesn't (the token is
            # different).  Clients need a separate lock object to make sure the
            # external resource is probed, whereas the existing lock object
            # might cache.
            new_lockable = self.get_lockable()
            self.assertRaises(errors.TokenMismatch,
                              new_lockable.lock_write, token=different_token)
        finally:
            self.lockable.unlock()

    def test_lock_write_with_matching_token(self):
        # If the token matches, so no exception is raised by lock_write.
        token = self.lockable.lock_write()
        try:
            if token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            # The same instance will accept a second lock_write if the specified
            # token matches.
            self.lockable.lock_write(token=token)
            self.lockable.unlock()
            # Calling lock_write on a new instance for the same lockable will
            # also succeed.
            new_lockable = self.get_lockable()
            new_lockable.lock_write(token=token)
            new_lockable.unlock()
        finally:
            self.lockable.unlock()

    def test_unlock_after_lock_write_with_token(self):
        # If lock_write did not physically acquire the lock (because it was
        # passed a token), then unlock should not physically release it.
        token = self.lockable.lock_write()
        try:
            if token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            new_lockable = self.get_lockable()
            new_lockable.lock_write(token=token)
            new_lockable.unlock()
            self.assertTrue(self.lockable.get_physical_lock_status())
        finally:
            self.lockable.unlock()

    def test_lock_write_with_token_fails_when_unlocked(self):
        # Lock and unlock to get a superficially valid token.  This mimics a
        # likely programming error, where a caller accidentally tries to lock
        # with a token that is no longer valid (because the original lock was
        # released).
        token = self.lockable.lock_write()
        self.lockable.unlock()
        if token is None:
            # This test does not apply, because this lockable refuses
            # tokens.
            return

        self.assertRaises(errors.TokenMismatch,
                          self.lockable.lock_write, token=token)

    def test_lock_write_reenter_with_token(self):
        token = self.lockable.lock_write()
        try:
            if token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            # Relock with a token.
            token_from_reentry = self.lockable.lock_write(token=token)
            try:
                self.assertEqual(token, token_from_reentry)
            finally:
                self.lockable.unlock()
        finally:
            self.lockable.unlock()
        # The lock should be unlocked on disk.  Verify that with a new lock
        # instance.
        new_lockable = self.get_lockable()
        # Calling lock_write now should work, rather than raise LockContention.
        new_lockable.lock_write()
        new_lockable.unlock()

    def test_leave_in_place(self):
        token = self.lockable.lock_write()
        try:
            if token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            self.lockable.leave_in_place()
        finally:
            self.lockable.unlock()
        # At this point, the lock is still in place on disk
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
        # But should be relockable with a token.
        self.lockable.lock_write(token=token)
        self.lockable.unlock()
        # Cleanup: the lock is still flagged to be left in place, so the
        # unlock above did not remove it.  Take it once more with the token
        # and restore the normal behaviour so the final unlock really
        # releases it instead of leaving it behind after the test.
        self.lockable.lock_write(token=token)
        self.lockable.dont_leave_in_place()
        self.lockable.unlock()

    def test_dont_leave_in_place(self):
        token = self.lockable.lock_write()
        try:
            if token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            self.lockable.leave_in_place()
        finally:
            self.lockable.unlock()
        # At this point, the lock is still in place on disk.
        # Acquire the existing lock with the token, and ask that it is removed
        # when this object unlocks, and unlock to trigger that removal.
        new_lockable = self.get_lockable()
        new_lockable.lock_write(token=token)
        new_lockable.dont_leave_in_place()
        new_lockable.unlock()
        # At this point, the lock is no longer on disk, so we can lock it.
        third_lockable = self.get_lockable()
        third_lockable.lock_write()
        third_lockable.unlock()
 
296
 
 
297
 
 
298
# This method of adapting tests to parameters is different to 
 
299
# the TestProviderAdapters used elsewhere, but seems simpler for this 
 
300
# case.  
 
301
class TestLockableFiles_TransportLock(TestCaseInTempDir,
                                      _TestLockableFiles_mixin):
    """LockableFiles tests run with TransportLock underneath."""

    def get_lockable(self):
        # Every call builds a new LockableFiles for the same 'my-lock' name
        # on the shared '.bzr' subtransport.
        lock_name = 'my-lock'
        return LockableFiles(self.sub_transport, lock_name, TransportLock)

    def setUp(self):
        TestCaseInTempDir.setUp(self)
        _TestLockableFiles_mixin.setUp(self)
        # The lock lives inside a '.bzr' directory of the test directory.
        base = get_transport('.')
        base.mkdir('.bzr')
        self.sub_transport = base.clone('.bzr')
        self.lockable = self.get_lockable()
        self.lockable.create_lock()

    def tearDown(self):
        super(TestLockableFiles_TransportLock, self).tearDown()
        # Release our reference to the subtransport; otherwise the SFTP
        # connection cache makes us wait for a 5 second timeout.
        del self.sub_transport
 
321
        
 
322
 
 
323
class TestLockableFiles_LockDir(TestCaseInTempDir,
                                _TestLockableFiles_mixin):
    """LockableFile tests run with LockDir underneath"""

    def setUp(self):
        TestCaseInTempDir.setUp(self)
        _TestLockableFiles_mixin.setUp(self)
        self.transport = get_transport('.')
        self.lockable = self.get_lockable()
        # the lock creation here sets mode - test_permissions on branch
        # tests that implicitly, but it might be a good idea to factor
        # out the mode checking logic and have it applied to lockable files
        # directly. RBC 20060418
        self.lockable.create_lock()

    def get_lockable(self):
        # A new LockableFiles for the same 'my-lock' name on the shared
        # transport.
        return LockableFiles(self.transport, 'my-lock', LockDir)

    def test_lock_created(self):
        # create_lock() in setUp made the lock directory itself.
        self.assertTrue(self.transport.has('my-lock'))
        self.lockable.lock_write()
        try:
            # While the write lock is held, the info file is present.
            self.assertTrue(self.transport.has('my-lock/held/info'))
        finally:
            # Release even if the assertion failed, so the lock is not
            # left held by a failing test.
            self.lockable.unlock()
        # Unlocking removes the info file but keeps the lock directory.
        self.assertFalse(self.transport.has('my-lock/held/info'))
        self.assertTrue(self.transport.has('my-lock'))

    # TODO: Test the lockdir inherits the right file and directory permissions
    # from the LockableFiles.
 
352
        
 
353
 
 
354
class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
                                      _TestLockableFiles_mixin):
    """LockableFile tests run with RemoteLockDir on a branch."""

    def get_lockable(self):
        # getting a new lockable involves opening a new instance of the branch
        branch_url = self.get_url('foo')
        reopened = bzrlib.branch.Branch.open(branch_url)
        return reopened.control_files

    def setUp(self):
        TestCaseWithSmartMedium.setUp(self)
        _TestLockableFiles_mixin.setUp(self)
        # A RemoteLockDir is only reachable through some RemoteObject, so a
        # branch is used because that is what we want.  These mixin tests
        # check end to end behaviour, so stubbing out the backend and
        # simulating it would defeat the purpose.  The protocol
        # implementation is tested separately in test_remote and test_smart
        # as usual.
        self.make_branch('foo')
        self.transport = get_transport('.')
        self.lockable = self.get_lockable()