/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: John Arbash Meinel
  • Date: 2009-12-22 16:28:47 UTC
  • mto: This revision was merged to the branch mainline in revision 4922.
  • Revision ID: john@arbash-meinel.com-20091222162847-tvnsc69to4l4uf5r
Implement a permute_for_extension helper.

Use it for all of the 'simple' extension permutations.
It basically permutes all tests in the current module by setting TestCase.module,
which works well for most of our extension tests. Some had more advanced
handling of permutations (extra permutations, custom vars, etc.).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
from StringIO import StringIO
 
18
 
 
19
import bzrlib
 
20
from bzrlib import (
 
21
    errors,
 
22
    lockdir,
 
23
    osutils,
 
24
    )
 
25
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
 
26
from bzrlib.lockable_files import LockableFiles, TransportLock
 
27
from bzrlib.symbol_versioning import (
 
28
    deprecated_in,
 
29
    )
 
30
from bzrlib.tests import (
 
31
    TestCaseInTempDir,
 
32
    TestNotApplicable,
 
33
    )
 
34
from bzrlib.tests.test_smart import TestCaseWithSmartMedium
 
35
from bzrlib.tests.test_transactions import DummyWeave
 
36
from bzrlib.transactions import (PassThroughTransaction,
 
37
                                 ReadOnlyTransaction,
 
38
                                 WriteTransaction,
 
39
                                 )
 
40
from bzrlib.transport import get_transport
 
41
 
 
42
 
 
43
# these tests are applied in each parameterized suite for LockableFiles
 
44
#
 
45
# they use an old style of parameterization, but we want to remove this class
 
46
# so won't modernize them now. - mbp 20080430
 
47
class _TestLockableFiles_mixin(object):
    """Lock behaviour tests shared by every LockableFiles test class.

    Concrete test classes combine this mixin with a TestCase and must
    provide:

    * ``self.lockable`` - the object under test, ready to be locked.
    * ``self.get_lockable()`` - return another object for the same lock, so
      a test can check what a second user of that lock sees.
    """

    def _require_token(self, token):
        """Mark the running test as not applicable unless a token was issued.

        :param token: the value returned by ``self.lockable.lock_write()``.
        :raises TestNotApplicable: if ``token`` is None, meaning the lock
            under test refuses tokens and a token test cannot exercise it.
        """
        if token is None:
            raise TestNotApplicable(
                "%r does not use tokens" % (self.lockable,))

    def test_transactions(self):
        # The class of the object returned by get_transaction() must follow
        # the lock state: unlocked, read locked, unlocked again, write locked.
        self.assertIs(self.lockable.get_transaction().__class__,
                      PassThroughTransaction)
        self.lockable.lock_read()
        try:
            self.assertIs(self.lockable.get_transaction().__class__,
                          ReadOnlyTransaction)
        finally:
            self.lockable.unlock()
        self.assertIs(self.lockable.get_transaction().__class__,
                      PassThroughTransaction)
        self.lockable.lock_write()
        self.assertIs(self.lockable.get_transaction().__class__,
                      WriteTransaction)
        # check that finish is called:
        vf = DummyWeave('a')
        self.lockable.get_transaction().register_dirty(vf)
        self.lockable.unlock()
        self.assertTrue(vf.finished)

    def test__escape(self):
        self.assertEqual('%25', self.lockable._escape('%'))

    def test__escape_empty(self):
        self.assertEqual('', self.lockable._escape(''))

    def test_break_lock(self):
        # some locks are not breakable
        self.lockable.lock_write()
        try:
            # The object holding the lock must refuse to break it.
            self.assertRaises(AssertionError, self.lockable.break_lock)
        except NotImplementedError:
            # this lock cannot be broken
            self.lockable.unlock()
            raise TestNotApplicable("%r is not breakable" % (self.lockable,))
        l2 = self.get_lockable()
        orig_factory = bzrlib.ui.ui_factory
        # silent ui - no need for stdout
        bzrlib.ui.ui_factory = bzrlib.ui.CannedInputUIFactory([True])
        try:
            l2.break_lock()
        finally:
            # Always put the global ui factory back, even if breaking failed.
            bzrlib.ui.ui_factory = orig_factory
        try:
            l2.lock_write()
            l2.unlock()
        finally:
            # The original holder must notice its lock was broken.
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
            self.assertFalse(self.lockable.is_locked())

    def test_lock_write_returns_None_refuses_token(self):
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        if token is not None:
            # This test does not apply, because this lockable supports
            # tokens.
            raise TestNotApplicable("%r uses tokens" % (self.lockable,))
        self.assertRaises(errors.TokenLockingNotSupported,
                          self.lockable.lock_write, token='token')

    def test_lock_write_returns_token_when_given_token(self):
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        self._require_token(token)
        new_lockable = self.get_lockable()
        token_from_new_lockable = new_lockable.lock_write(token=token)
        self.addCleanup(new_lockable.unlock)
        self.assertEqual(token, token_from_new_lockable)

    def test_lock_write_raises_on_token_mismatch(self):
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        self._require_token(token)
        different_token = token + 'xxx'
        # Re-using the same lockable instance with a different token will
        # raise TokenMismatch.
        self.assertRaises(errors.TokenMismatch,
                          self.lockable.lock_write, token=different_token)
        # A separate instance for the same lockable will also raise
        # TokenMismatch.
        # This detects the case where a caller claims to have a lock (via
        # the token) for an external resource, but doesn't (the token is
        # different).  Clients need a separate lock object to make sure the
        # external resource is probed, whereas the existing lock object
        # might cache.
        new_lockable = self.get_lockable()
        self.assertRaises(errors.TokenMismatch,
                          new_lockable.lock_write, token=different_token)

    def test_lock_write_with_matching_token(self):
        # If the token matches, no exception is raised by lock_write.
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        self._require_token(token)
        # The same instance will accept a second lock_write if the specified
        # token matches.
        self.lockable.lock_write(token=token)
        self.lockable.unlock()
        # Calling lock_write on a new instance for the same lockable will
        # also succeed.
        new_lockable = self.get_lockable()
        new_lockable.lock_write(token=token)
        new_lockable.unlock()

    def test_unlock_after_lock_write_with_token(self):
        # If lock_write did not physically acquire the lock (because it was
        # passed a token), then unlock should not physically release it.
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        self._require_token(token)
        new_lockable = self.get_lockable()
        new_lockable.lock_write(token=token)
        new_lockable.unlock()
        self.assertTrue(self.lockable.get_physical_lock_status())

    def test_lock_write_with_token_fails_when_unlocked(self):
        # Lock and unlock to get a superficially valid token.  This mimics a
        # likely programming error, where a caller accidentally tries to lock
        # with a token that is no longer valid (because the original lock was
        # released).
        token = self.lockable.lock_write()
        self.lockable.unlock()
        self._require_token(token)

        self.assertRaises(errors.TokenMismatch,
                          self.lockable.lock_write, token=token)

    def test_lock_write_reenter_with_token(self):
        token = self.lockable.lock_write()
        try:
            self._require_token(token)
            # Relock with a token.
            token_from_reentry = self.lockable.lock_write(token=token)
            try:
                self.assertEqual(token, token_from_reentry)
            finally:
                self.lockable.unlock()
        finally:
            self.lockable.unlock()
        # The lock should be unlocked on disk.  Verify that with a new lock
        # instance.
        new_lockable = self.get_lockable()
        # Calling lock_write now should work, rather than raise LockContention.
        new_lockable.lock_write()
        new_lockable.unlock()

    def test_second_lock_write_returns_same_token(self):
        first_token = self.lockable.lock_write()
        try:
            self._require_token(first_token)
            # Relock the already locked lockable.  It should return the same
            # token.
            second_token = self.lockable.lock_write()
            try:
                self.assertEqual(first_token, second_token)
            finally:
                self.lockable.unlock()
        finally:
            self.lockable.unlock()

    def test_leave_in_place(self):
        token = self.lockable.lock_write()
        try:
            self._require_token(token)
            self.lockable.leave_in_place()
        finally:
            self.lockable.unlock()
        # At this point, the lock is still in place on disk
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
        # But should be relockable with a token.
        self.lockable.lock_write(token=token)
        self.lockable.unlock()
        # Cleanup: we should still be able to get the lock, but we restore the
        # behavior to clearing the lock when unlocking.
        self.lockable.lock_write(token=token)
        self.lockable.dont_leave_in_place()
        self.lockable.unlock()

    def test_dont_leave_in_place(self):
        token = self.lockable.lock_write()
        try:
            self._require_token(token)
            self.lockable.leave_in_place()
        finally:
            self.lockable.unlock()
        # At this point, the lock is still in place on disk.
        # Acquire the existing lock with the token, and ask that it is removed
        # when this object unlocks, and unlock to trigger that removal.
        new_lockable = self.get_lockable()
        new_lockable.lock_write(token=token)
        new_lockable.dont_leave_in_place()
        new_lockable.unlock()
        # At this point, the lock is no longer on disk, so we can lock it.
        third_lockable = self.get_lockable()
        third_lockable.lock_write()
        third_lockable.unlock()
 
272
 
 
273
 
 
274
# This method of adapting tests to parameters is different to
 
275
# the TestProviderAdapters used elsewhere, but seems simpler for this
 
276
# case.
 
277
class TestLockableFiles_TransportLock(TestCaseInTempDir,
                                      _TestLockableFiles_mixin):
    """LockableFiles tests run with TransportLock underneath."""

    def get_lockable(self):
        """Return a new LockableFiles for 'my-lock' on the '.bzr' transport."""
        return LockableFiles(self.sub_transport, 'my-lock', TransportLock)

    def setUp(self):
        super(TestLockableFiles_TransportLock, self).setUp()
        # The lock lives in a '.bzr' directory below the test directory.
        root = get_transport('.')
        root.mkdir('.bzr')
        self.sub_transport = root.clone('.bzr')
        self.lockable = self.get_lockable()
        self.lockable.create_lock()

    def tearDown(self):
        super(TestLockableFiles_TransportLock, self).tearDown()
        # Drop our reference to the subtransport; holding on to it costs a
        # 5 second timeout because of the SFTP connection cache.
        try:
            del self.sub_transport
        except AttributeError:
            # Never assigned, so there is nothing to release.
            pass
 
299
 
 
300
 
 
301
class TestLockableFiles_LockDir(TestCaseInTempDir,
 
302
                              _TestLockableFiles_mixin):
 
303
    """LockableFile tests run with LockDir underneath"""
 
304
 
 
305
    def setUp(self):
 
306
        TestCaseInTempDir.setUp(self)
 
307
        self.transport = get_transport('.')
 
308
        self.lockable = self.get_lockable()
 
309
        # the lock creation here sets mode - test_permissions on branch
 
310
        # tests that implicitly, but it might be a good idea to factor
 
311
        # out the mode checking logic and have it applied to loackable files
 
312
        # directly. RBC 20060418
 
313
        self.lockable.create_lock()
 
314
 
 
315
    def get_lockable(self):
 
316
        return LockableFiles(self.transport, 'my-lock', lockdir.LockDir)
 
317
 
 
318
    def test_lock_created(self):
 
319
        self.assertTrue(self.transport.has('my-lock'))
 
320
        self.lockable.lock_write()
 
321
        self.assertTrue(self.transport.has('my-lock/held/info'))
 
322
        self.lockable.unlock()
 
323
        self.assertFalse(self.transport.has('my-lock/held/info'))
 
324
        self.assertTrue(self.transport.has('my-lock'))
 
325
 
 
326
    def test__file_modes(self):
 
327
        self.transport.mkdir('readonly')
 
328
        osutils.make_readonly('readonly')
 
329
        lockable = LockableFiles(self.transport.clone('readonly'), 'test-lock',
 
330
                                 lockdir.LockDir)
 
331
        # The directory mode should be read-write-execute for the current user
 
332
        self.assertEqual(00700, lockable._dir_mode & 00700)
 
333
        # Files should be read-write for the current user
 
334
        self.assertEqual(00600, lockable._file_mode & 00700)
 
335
 
 
336
 
 
337
class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
                                      _TestLockableFiles_mixin):
    """Run the LockableFiles tests with RemoteLockDir on a branch."""

    def get_lockable(self):
        """Return the control files of a newly opened 'foo' branch."""
        # getting a new lockable involves opening a new instance of the branch
        opened = bzrlib.branch.Branch.open(self.get_url('foo'))
        self.addCleanup(opened.bzrdir.transport.disconnect)
        return opened.control_files

    def setUp(self):
        super(TestLockableFiles_RemoteLockDir, self).setUp()
        # A RemoteLockDir can only be obtained through some RemoteObject, so
        # use a branch, as that is what we want.  The mixin tests check the
        # end to end behaviour, so stubbing out the backend and simulating
        # would defeat the purpose.  The protocol implementation is tested
        # separately in test_remote and test_smart as usual.
        created = self.make_branch('foo')
        self.addCleanup(created.bzrdir.transport.disconnect)
        self.transport = get_transport('.')
        self.lockable = self.get_lockable()