/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: Robert Collins
  • Date: 2005-10-16 22:31:25 UTC
  • mto: This revision was merged to the branch mainline in revision 1458.
  • Revision ID: robertc@lifelesslap.robertcollins.net-20051016223125-26d4401cb94b7b82
Branch.relpath has been moved to WorkingTree.relpath.

WorkingTree no longer takes an inventory; instead it takes an optional branch
parameter, and if None is given it will open the branch at basedir implicitly.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
from StringIO import StringIO
18
 
 
19
 
import bzrlib
20
 
from bzrlib import (
21
 
    errors,
22
 
    lockdir,
23
 
    osutils,
24
 
    )
25
 
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
26
 
from bzrlib.lockable_files import LockableFiles, TransportLock
27
 
from bzrlib.symbol_versioning import (
28
 
    deprecated_in,
29
 
    )
30
 
from bzrlib.tests import (
31
 
    TestCaseInTempDir,
32
 
    TestNotApplicable,
33
 
    )
34
 
from bzrlib.tests.test_smart import TestCaseWithSmartMedium
35
 
from bzrlib.tests.test_transactions import DummyWeave
36
 
from bzrlib.transactions import (PassThroughTransaction,
37
 
                                 ReadOnlyTransaction,
38
 
                                 WriteTransaction,
39
 
                                 )
40
 
from bzrlib.transport import get_transport
41
 
 
42
 
 
43
 
# these tests are applied in each parameterized suite for LockableFiles
44
 
#
45
 
# they use an old style of parameterization, but we want to remove this class
46
 
# so won't modernize them now. - mbp 20080430
47
 
class _TestLockableFiles_mixin(object):
    """Lock-behaviour tests shared by every LockableFiles test class.

    Concrete subclasses must also inherit from a TestCase class and provide:

    * ``self.lockable``: the LockableFiles object under test, set in setUp.
    * ``get_lockable()``: return a new LockableFiles object for the same
      underlying lock as ``self.lockable``.
    """

    def _require_token(self, token):
        # Shared guard for the token tests below.  A lockable whose
        # lock_write() returned None refuses tokens, so the calling test
        # cannot check anything; report it as not applicable rather than
        # returning silently and being counted as a pass.
        if token is None:
            raise TestNotApplicable("%r refuses tokens" % (self.lockable,))

    def test_transactions(self):
        # The transaction class must follow the lock state: pass-through when
        # unlocked, read-only under a read lock, write under a write lock.
        self.assertIs(self.lockable.get_transaction().__class__,
                      PassThroughTransaction)
        self.lockable.lock_read()
        try:
            self.assertIs(self.lockable.get_transaction().__class__,
                          ReadOnlyTransaction)
        finally:
            self.lockable.unlock()
        self.assertIs(self.lockable.get_transaction().__class__,
                      PassThroughTransaction)
        self.lockable.lock_write()
        try:
            self.assertIs(self.lockable.get_transaction().__class__,
                          WriteTransaction)
            # check that finish is called:
            vf = DummyWeave('a')
            self.lockable.get_transaction().register_dirty(vf)
        finally:
            # Release the write lock even if an assertion above failed, as is
            # already done for the read lock.
            self.lockable.unlock()
        self.assertTrue(vf.finished)

    def test__escape(self):
        # '%' itself must be escaped to its percent-encoded form.
        self.assertEqual('%25', self.lockable._escape('%'))

    def test__escape_empty(self):
        # Escaping the empty string gives the empty string.
        self.assertEqual('', self.lockable._escape(''))

    def test_break_lock(self):
        # some locks are not breakable
        self.lockable.lock_write()
        try:
            # Calling break_lock on the instance that currently holds the
            # lock is expected to raise AssertionError.
            self.assertRaises(AssertionError, self.lockable.break_lock)
        except NotImplementedError:
            # this lock cannot be broken
            self.lockable.unlock()
            raise TestNotApplicable("%r is not breakable" % (self.lockable,))
        l2 = self.get_lockable()
        orig_factory = bzrlib.ui.ui_factory
        # silent ui - no need for stdout
        # NOTE(review): the canned True presumably answers break_lock's
        # confirmation prompt - confirm against CannedInputUIFactory.
        bzrlib.ui.ui_factory = bzrlib.ui.CannedInputUIFactory([True])
        try:
            l2.break_lock()
        finally:
            bzrlib.ui.ui_factory = orig_factory
        try:
            l2.lock_write()
            l2.unlock()
        finally:
            # The original holder must notice that its lock was broken.
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
            self.assertFalse(self.lockable.is_locked())

    def test_lock_write_returns_None_refuses_token(self):
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        if token is not None:
            # This test does not apply, because this lockable supports
            # tokens.
            raise TestNotApplicable("%r uses tokens" % (self.lockable,))
        self.assertRaises(errors.TokenLockingNotSupported,
                          self.lockable.lock_write, token='token')

    def test_lock_write_returns_token_when_given_token(self):
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        self._require_token(token)
        new_lockable = self.get_lockable()
        token_from_new_lockable = new_lockable.lock_write(token=token)
        self.addCleanup(new_lockable.unlock)
        self.assertEqual(token, token_from_new_lockable)

    def test_lock_write_raises_on_token_mismatch(self):
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        self._require_token(token)
        different_token = token + 'xxx'
        # Re-using the same lockable instance with a different token will
        # raise TokenMismatch.
        self.assertRaises(errors.TokenMismatch,
                          self.lockable.lock_write, token=different_token)
        # A separate instance for the same lockable will also raise
        # TokenMismatch.
        # This detects the case where a caller claims to have a lock (via
        # the token) for an external resource, but doesn't (the token is
        # different).  Clients need a separate lock object to make sure the
        # external resource is probed, whereas the existing lock object
        # might cache.
        new_lockable = self.get_lockable()
        self.assertRaises(errors.TokenMismatch,
                          new_lockable.lock_write, token=different_token)

    def test_lock_write_with_matching_token(self):
        # If the token matches, no exception is raised by lock_write.
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        self._require_token(token)
        # The same instance will accept a second lock_write if the specified
        # token matches.
        self.lockable.lock_write(token=token)
        self.lockable.unlock()
        # Calling lock_write on a new instance for the same lockable will
        # also succeed.
        new_lockable = self.get_lockable()
        new_lockable.lock_write(token=token)
        new_lockable.unlock()

    def test_unlock_after_lock_write_with_token(self):
        # If lock_write did not physically acquire the lock (because it was
        # passed a token), then unlock should not physically release it.
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        self._require_token(token)
        new_lockable = self.get_lockable()
        new_lockable.lock_write(token=token)
        new_lockable.unlock()
        self.assertTrue(self.lockable.get_physical_lock_status())

    def test_lock_write_with_token_fails_when_unlocked(self):
        # Lock and unlock to get a superficially valid token.  This mimics a
        # likely programming error, where a caller accidentally tries to lock
        # with a token that is no longer valid (because the original lock was
        # released).
        token = self.lockable.lock_write()
        self.lockable.unlock()
        self._require_token(token)

        self.assertRaises(errors.TokenMismatch,
                          self.lockable.lock_write, token=token)

    def test_lock_write_reenter_with_token(self):
        token = self.lockable.lock_write()
        try:
            self._require_token(token)
            # Relock with a token.
            token_from_reentry = self.lockable.lock_write(token=token)
            try:
                self.assertEqual(token, token_from_reentry)
            finally:
                self.lockable.unlock()
        finally:
            self.lockable.unlock()
        # The lock should be unlocked on disk.  Verify that with a new lock
        # instance.
        new_lockable = self.get_lockable()
        # Calling lock_write now should work, rather than raise LockContention.
        new_lockable.lock_write()
        new_lockable.unlock()

    def test_second_lock_write_returns_same_token(self):
        first_token = self.lockable.lock_write()
        try:
            self._require_token(first_token)
            # Relock the already locked lockable.  It should return the same
            # token.
            second_token = self.lockable.lock_write()
            try:
                self.assertEqual(first_token, second_token)
            finally:
                self.lockable.unlock()
        finally:
            self.lockable.unlock()

    def test_leave_in_place(self):
        token = self.lockable.lock_write()
        try:
            self._require_token(token)
            self.lockable.leave_in_place()
        finally:
            self.lockable.unlock()
        # At this point, the lock is still in place on disk
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
        # But should be relockable with a token.
        self.lockable.lock_write(token=token)
        self.lockable.unlock()
        # Cleanup: we should still be able to get the lock, but we restore the
        # behavior to clearing the lock when unlocking.
        self.lockable.lock_write(token=token)
        self.lockable.dont_leave_in_place()
        self.lockable.unlock()

    def test_dont_leave_in_place(self):
        token = self.lockable.lock_write()
        try:
            self._require_token(token)
            self.lockable.leave_in_place()
        finally:
            self.lockable.unlock()
        # At this point, the lock is still in place on disk.
        # Acquire the existing lock with the token, and ask that it is removed
        # when this object unlocks, and unlock to trigger that removal.
        new_lockable = self.get_lockable()
        new_lockable.lock_write(token=token)
        new_lockable.dont_leave_in_place()
        new_lockable.unlock()
        # At this point, the lock is no longer on disk, so we can lock it.
        third_lockable = self.get_lockable()
        third_lockable.lock_write()
        third_lockable.unlock()
272
 
 
273
 
 
274
 
# This method of adapting tests to parameters is different to
275
 
# the TestProviderAdapters used elsewhere, but seems simpler for this
276
 
# case.
277
 
class TestLockableFiles_TransportLock(TestCaseInTempDir,
                                      _TestLockableFiles_mixin):
    """Run the shared LockableFiles tests with TransportLock underneath."""

    def setUp(self):
        TestCaseInTempDir.setUp(self)
        # The lock lives in a '.bzr' directory created below the test's
        # working directory.
        root = get_transport('.')
        root.mkdir('.bzr')
        self.sub_transport = root.clone('.bzr')
        lockable = self.get_lockable()
        self.lockable = lockable
        lockable.create_lock()

    def stop_server(self):
        super(TestLockableFiles_TransportLock, self).stop_server()
        # Drop our reference to the subtransport so that we do not get a
        # 5 second timeout due to the SFTP connection cache.  It may already
        # be gone, which is fine.
        vars(self).pop('sub_transport', None)

    def get_lockable(self):
        # Each call builds a fresh object over the same 'my-lock' name.
        lock_class = TransportLock
        return LockableFiles(self.sub_transport, 'my-lock', lock_class)
299
 
 
300
 
 
301
 
class TestLockableFiles_LockDir(TestCaseInTempDir,
302
 
                              _TestLockableFiles_mixin):
303
 
    """LockableFile tests run with LockDir underneath"""
304
 
 
305
 
    def setUp(self):
306
 
        TestCaseInTempDir.setUp(self)
307
 
        self.transport = get_transport('.')
308
 
        self.lockable = self.get_lockable()
309
 
        # the lock creation here sets mode - test_permissions on branch
310
 
        # tests that implicitly, but it might be a good idea to factor
311
 
        # out the mode checking logic and have it applied to loackable files
312
 
        # directly. RBC 20060418
313
 
        self.lockable.create_lock()
314
 
 
315
 
    def get_lockable(self):
316
 
        return LockableFiles(self.transport, 'my-lock', lockdir.LockDir)
317
 
 
318
 
    def test_lock_created(self):
319
 
        self.assertTrue(self.transport.has('my-lock'))
320
 
        self.lockable.lock_write()
321
 
        self.assertTrue(self.transport.has('my-lock/held/info'))
322
 
        self.lockable.unlock()
323
 
        self.assertFalse(self.transport.has('my-lock/held/info'))
324
 
        self.assertTrue(self.transport.has('my-lock'))
325
 
 
326
 
    def test__file_modes(self):
327
 
        self.transport.mkdir('readonly')
328
 
        osutils.make_readonly('readonly')
329
 
        lockable = LockableFiles(self.transport.clone('readonly'), 'test-lock',
330
 
                                 lockdir.LockDir)
331
 
        # The directory mode should be read-write-execute for the current user
332
 
        self.assertEqual(00700, lockable._dir_mode & 00700)
333
 
        # Files should be read-write for the current user
334
 
        self.assertEqual(00600, lockable._file_mode & 00700)
335
 
 
336
 
 
337
 
class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
                                      _TestLockableFiles_mixin):
    """Run the shared LockableFiles tests with RemoteLockDir on a branch."""

    def setUp(self):
        TestCaseWithSmartMedium.setUp(self)
        # can only get a RemoteLockDir with some RemoteObject...
        # use a branch as that's what we want. These mixin tests test the end
        # to end behaviour, so stubbing out the backend and simulating would
        # defeat the purpose. We test the protocol implementation separately
        # in test_remote and test_smart as usual.
        made_branch = self.make_branch('foo')
        self.addCleanup(made_branch.bzrdir.transport.disconnect)
        self.transport = get_transport('.')
        self.lockable = self.get_lockable()

    def get_lockable(self):
        # getting a new lockable involves opening a new instance of the branch
        reopened = bzrlib.branch.Branch.open(self.get_url('foo'))
        control_transport = reopened.bzrdir.transport
        # Disconnect this instance's transport when the test finishes.
        self.addCleanup(control_transport.disconnect)
        return reopened.control_files