/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-04-01 06:48:38 UTC
  • mfrom: (2389.1.1 0.15-to-trunk)
  • Revision ID: pqm@pqm.ubuntu.com-20070401064838-34903c7b0d0c8007
merge 0.15 back to dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
from StringIO import StringIO
18
18
 
19
19
import bzrlib
 
20
from bzrlib.branch import Branch
20
21
import bzrlib.errors as errors
21
22
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
22
23
from bzrlib.lockable_files import LockableFiles, TransportLock
124
125
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
125
126
            self.assertFalse(self.lockable.is_locked())
126
127
 
127
 
    def test_lock_write_returns_None_refuses_token(self):
128
 
        token = self.lockable.lock_write()
129
 
        try:
130
 
            if token is not None:
131
 
                # This test does not apply, because this lockable supports
132
 
                # tokens.
133
 
                return
134
 
            self.assertRaises(errors.TokenLockingNotSupported,
135
 
                              self.lockable.lock_write, token='token')
136
 
        finally:
137
 
            self.lockable.unlock()
138
 
 
139
 
    def test_lock_write_returns_token_when_given_token(self):
140
 
        token = self.lockable.lock_write()
141
 
        try:
142
 
            if token is None:
143
 
                # This test does not apply, because this lockable refuses
144
 
                # tokens.
145
 
                return
146
 
            new_lockable = self.get_lockable()
147
 
            token_from_new_lockable = new_lockable.lock_write(token=token)
148
 
            try:
149
 
                self.assertEqual(token, token_from_new_lockable)
150
 
            finally:
151
 
                new_lockable.unlock()
152
 
        finally:
153
 
            self.lockable.unlock()
154
 
 
155
 
    def test_lock_write_raises_on_token_mismatch(self):
156
 
        token = self.lockable.lock_write()
157
 
        try:
158
 
            if token is None:
159
 
                # This test does not apply, because this lockable refuses
160
 
                # tokens.
161
 
                return
162
 
            different_token = token + 'xxx'
163
 
            # Re-using the same lockable instance with a different token will
164
 
            # raise TokenMismatch.
165
 
            self.assertRaises(errors.TokenMismatch,
166
 
                              self.lockable.lock_write, token=different_token)
167
 
            # A separate instance for the same lockable will also raise
168
 
            # TokenMismatch.
169
 
            # This detects the case where a caller claims to have a lock (via
170
 
            # the token) for an external resource, but doesn't (the token is
171
 
            # different).  Clients need a separate lock object to make sure the
172
 
            # external resource is probed, whereas the existing lock object
173
 
            # might cache.
174
 
            new_lockable = self.get_lockable()
175
 
            self.assertRaises(errors.TokenMismatch,
176
 
                              new_lockable.lock_write, token=different_token)
177
 
        finally:
178
 
            self.lockable.unlock()
179
 
 
180
 
    def test_lock_write_with_matching_token(self):
181
 
        # If the token matches, no exception is raised by lock_write.
182
 
        token = self.lockable.lock_write()
183
 
        try:
184
 
            if token is None:
185
 
                # This test does not apply, because this lockable refuses
186
 
                # tokens.
187
 
                return
188
 
            # The same instance will accept a second lock_write if the specified
189
 
            # token matches.
190
 
            self.lockable.lock_write(token=token)
191
 
            self.lockable.unlock()
192
 
            # Calling lock_write on a new instance for the same lockable will
193
 
            # also succeed.
194
 
            new_lockable = self.get_lockable()
195
 
            new_lockable.lock_write(token=token)
196
 
            new_lockable.unlock()
197
 
        finally:
198
 
            self.lockable.unlock()
199
 
 
200
 
    def test_unlock_after_lock_write_with_token(self):
201
 
        # If lock_write did not physically acquire the lock (because it was
202
 
        # passed a token), then unlock should not physically release it.
203
 
        token = self.lockable.lock_write()
204
 
        try:
205
 
            if token is None:
206
 
                # This test does not apply, because this lockable refuses
207
 
                # tokens.
208
 
                return
209
 
            new_lockable = self.get_lockable()
210
 
            new_lockable.lock_write(token=token)
211
 
            new_lockable.unlock()
212
 
            self.assertTrue(self.lockable.get_physical_lock_status())
213
 
        finally:
214
 
            self.lockable.unlock()
215
 
 
216
 
    def test_lock_write_with_token_fails_when_unlocked(self):
217
 
        # Lock and unlock to get a superficially valid token.  This mimics a
218
 
        # likely programming error, where a caller accidentally tries to lock
219
 
        # with a token that is no longer valid (because the original lock was
220
 
        # released).
221
 
        token = self.lockable.lock_write()
222
 
        self.lockable.unlock()
223
 
        if token is None:
224
 
            # This test does not apply, because this lockable refuses
225
 
            # tokens.
226
 
            return
227
 
 
228
 
        self.assertRaises(errors.TokenMismatch,
229
 
                          self.lockable.lock_write, token=token)
230
 
 
231
 
    def test_lock_write_reenter_with_token(self):
232
 
        token = self.lockable.lock_write()
233
 
        try:
234
 
            if token is None:
235
 
                # This test does not apply, because this lockable refuses
236
 
                # tokens.
237
 
                return
238
 
            # Relock with a token.
239
 
            token_from_reentry = self.lockable.lock_write(token=token)
240
 
            try:
241
 
                self.assertEqual(token, token_from_reentry)
242
 
            finally:
243
 
                self.lockable.unlock()
244
 
        finally:
245
 
            self.lockable.unlock()
246
 
        # The lock should be unlocked on disk.  Verify that with a new lock
247
 
        # instance.
248
 
        new_lockable = self.get_lockable()
249
 
        # Calling lock_write now should work, rather than raise LockContention.
250
 
        new_lockable.lock_write()
251
 
        new_lockable.unlock()
252
 
 
253
 
    def test_second_lock_write_returns_same_token(self):
254
 
        first_token = self.lockable.lock_write()
255
 
        try:
256
 
            if first_token is None:
257
 
                # This test does not apply, because this lockable refuses
258
 
                # tokens.
259
 
                return
260
 
            # Relock the already locked lockable.  It should return the same
261
 
            # token.
262
 
            second_token = self.lockable.lock_write()
263
 
            try:
264
 
                self.assertEqual(first_token, second_token)
265
 
            finally:
266
 
                self.lockable.unlock()
267
 
        finally:
268
 
            self.lockable.unlock()
269
 
 
270
 
    def test_leave_in_place(self):
271
 
        token = self.lockable.lock_write()
272
 
        try:
273
 
            if token is None:
274
 
                # This test does not apply, because this lockable refuses
275
 
                # tokens.
276
 
                return
277
 
            self.lockable.leave_in_place()
278
 
        finally:
279
 
            self.lockable.unlock()
280
 
        # At this point, the lock is still in place on disk
281
 
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
282
 
        # But should be relockable with a token.
283
 
        self.lockable.lock_write(token=token)
284
 
        self.lockable.unlock()
285
 
 
286
 
    def test_dont_leave_in_place(self):
287
 
        token = self.lockable.lock_write()
288
 
        try:
289
 
            if token is None:
290
 
                # This test does not apply, because this lockable refuses
291
 
                # tokens.
292
 
                return
293
 
            self.lockable.leave_in_place()
294
 
        finally:
295
 
            self.lockable.unlock()
296
 
        # At this point, the lock is still in place on disk.
297
 
        # Acquire the existing lock with the token, and ask that it is removed
298
 
        # when this object unlocks, and unlock to trigger that removal.
299
 
        new_lockable = self.get_lockable()
300
 
        new_lockable.lock_write(token=token)
301
 
        new_lockable.dont_leave_in_place()
302
 
        new_lockable.unlock()
303
 
        # At this point, the lock is no longer on disk, so we can lock it.
304
 
        third_lockable = self.get_lockable()
305
 
        third_lockable.lock_write()
306
 
        third_lockable.unlock()
307
 
 
308
128
 
309
129
# This method of adapting tests to parameters is different to 
310
130
# the TestProviderAdapters used elsewhere, but seems simpler for this 
313
133
                                      _TestLockableFiles_mixin):
314
134
 
315
135
    def setUp(self):
316
 
        TestCaseInTempDir.setUp(self)
 
136
        super(TestLockableFiles_TransportLock, self).setUp()
317
137
        transport = get_transport('.')
318
138
        transport.mkdir('.bzr')
319
139
        self.sub_transport = transport.clone('.bzr')
335
155
    """LockableFile tests run with LockDir underneath"""
336
156
 
337
157
    def setUp(self):
338
 
        TestCaseInTempDir.setUp(self)
 
158
        super(TestLockableFiles_LockDir, self).setUp()
339
159
        self.transport = get_transport('.')
340
160
        self.lockable = self.get_lockable()
341
161
        # the lock creation here sets mode - test_permissions on branch