/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: Andrew Bennetts
  • Date: 2007-02-28 07:08:25 UTC
  • mfrom: (2018.13.1 hpss)
  • mto: (2018.5.80 hpss)
  • mto: This revision was merged to the branch mainline in revision 2435.
  • Revision ID: andrew.bennetts@canonical.com-20070228070825-q2dvkjb0a11ouhtx
Update to current hpss branch?  Fix lots of test failures.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
from StringIO import StringIO
 
18
 
 
19
import bzrlib
 
20
from bzrlib.branch import Branch
 
21
import bzrlib.errors as errors
 
22
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
 
23
from bzrlib.lockable_files import LockableFiles, TransportLock
 
24
from bzrlib import lockdir
 
25
from bzrlib.lockdir import LockDir
 
26
from bzrlib.tests import TestCaseInTempDir
 
27
from bzrlib.tests.test_smart import TestCaseWithSmartMedium
 
28
from bzrlib.tests.test_transactions import DummyWeave
 
29
from bzrlib.transactions import (PassThroughTransaction,
 
30
                                 ReadOnlyTransaction,
 
31
                                 WriteTransaction,
 
32
                                 )
 
33
from bzrlib.transport import get_transport
 
34
 
 
35
 
 
36
# these tests are applied in each parameterized suite for LockableFiles
 
37
class _TestLockableFiles_mixin(object):
 
38
 
 
39
    def setUp(self):
 
40
        self.reduceLockdirTimeout()
 
41
 
 
42
    def test_read_write(self):
 
43
        self.assertRaises(NoSuchFile, self.lockable.get, 'foo')
 
44
        self.assertRaises(NoSuchFile, self.lockable.get_utf8, 'foo')
 
45
        self.lockable.lock_write()
 
46
        try:
 
47
            unicode_string = u'bar\u1234'
 
48
            self.assertEqual(4, len(unicode_string))
 
49
            byte_string = unicode_string.encode('utf-8')
 
50
            self.assertEqual(6, len(byte_string))
 
51
            self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo', 
 
52
                              StringIO(unicode_string))
 
53
            self.lockable.put('foo', StringIO(byte_string))
 
54
            self.assertEqual(byte_string,
 
55
                             self.lockable.get('foo').read())
 
56
            self.assertEqual(unicode_string,
 
57
                             self.lockable.get_utf8('foo').read())
 
58
            self.assertRaises(BzrBadParameterNotString,
 
59
                              self.lockable.put_utf8,
 
60
                              'bar',
 
61
                              StringIO(unicode_string)
 
62
                              )
 
63
            self.lockable.put_utf8('bar', unicode_string)
 
64
            self.assertEqual(unicode_string, 
 
65
                             self.lockable.get_utf8('bar').read())
 
66
            self.assertEqual(byte_string, 
 
67
                             self.lockable.get('bar').read())
 
68
        finally:
 
69
            self.lockable.unlock()
 
70
 
 
71
    def test_locks(self):
 
72
        self.lockable.lock_read()
 
73
        try:
 
74
            self.assertRaises(ReadOnlyError, self.lockable.put, 'foo', 
 
75
                              StringIO('bar\u1234'))
 
76
        finally:
 
77
            self.lockable.unlock()
 
78
 
 
79
    def test_transactions(self):
 
80
        self.assertIs(self.lockable.get_transaction().__class__,
 
81
                      PassThroughTransaction)
 
82
        self.lockable.lock_read()
 
83
        try:
 
84
            self.assertIs(self.lockable.get_transaction().__class__,
 
85
                          ReadOnlyTransaction)
 
86
        finally:
 
87
            self.lockable.unlock()
 
88
        self.assertIs(self.lockable.get_transaction().__class__,
 
89
                      PassThroughTransaction)
 
90
        self.lockable.lock_write()
 
91
        self.assertIs(self.lockable.get_transaction().__class__,
 
92
                      WriteTransaction)
 
93
        # check that finish is called:
 
94
        vf = DummyWeave('a')
 
95
        self.lockable.get_transaction().register_dirty(vf)
 
96
        self.lockable.unlock()
 
97
        self.assertTrue(vf.finished)
 
98
 
 
99
    def test__escape(self):
 
100
        self.assertEqual('%25', self.lockable._escape('%'))
 
101
        
 
102
    def test__escape_empty(self):
 
103
        self.assertEqual('', self.lockable._escape(''))
 
104
 
 
105
    def test_break_lock(self):
 
106
        # some locks are not breakable
 
107
        self.lockable.lock_write()
 
108
        try:
 
109
            self.assertRaises(AssertionError, self.lockable.break_lock)
 
110
        except NotImplementedError:
 
111
            # this lock cannot be broken
 
112
            self.lockable.unlock()
 
113
            return
 
114
        l2 = self.get_lockable()
 
115
        orig_factory = bzrlib.ui.ui_factory
 
116
        # silent ui - no need for stdout
 
117
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
118
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
 
119
        try:
 
120
            l2.break_lock()
 
121
        finally:
 
122
            bzrlib.ui.ui_factory = orig_factory
 
123
        try:
 
124
            l2.lock_write()
 
125
            l2.unlock()
 
126
        finally:
 
127
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
 
128
            self.assertFalse(self.lockable.is_locked())
 
129
 
 
130
    def test_lock_write_returns_None_refuses_token(self):
 
131
        token = self.lockable.lock_write()
 
132
        try:
 
133
            if token is not None:
 
134
                # This test does not apply, because this lockable supports
 
135
                # tokens.
 
136
                return
 
137
            self.assertRaises(errors.TokenLockingNotSupported,
 
138
                              self.lockable.lock_write, token='token')
 
139
        finally:
 
140
            self.lockable.unlock()
 
141
 
 
142
    def test_lock_write_returns_token_when_given_token(self):
 
143
        token = self.lockable.lock_write()
 
144
        try:
 
145
            if token is None:
 
146
                # This test does not apply, because this lockable refuses
 
147
                # tokens.
 
148
                return
 
149
            new_lockable = self.get_lockable()
 
150
            token_from_new_lockable = new_lockable.lock_write(token=token)
 
151
            try:
 
152
                self.assertEqual(token, token_from_new_lockable)
 
153
            finally:
 
154
                new_lockable.unlock()
 
155
        finally:
 
156
            self.lockable.unlock()
 
157
 
 
158
    def test_lock_write_raises_on_token_mismatch(self):
 
159
        token = self.lockable.lock_write()
 
160
        try:
 
161
            if token is None:
 
162
                # This test does not apply, because this lockable refuses
 
163
                # tokens.
 
164
                return
 
165
            different_token = token + 'xxx'
 
166
            # Re-using the same lockable instance with a different token will
 
167
            # raise TokenMismatch.
 
168
            self.assertRaises(errors.TokenMismatch,
 
169
                              self.lockable.lock_write, token=different_token)
 
170
            # A seperate instance for the same lockable will also raise
 
171
            # TokenMismatch.
 
172
            # This detects the case where a caller claims to have a lock (via
 
173
            # the token) for an external resource, but doesn't (the token is
 
174
            # different).  Clients need a seperate lock object to make sure the
 
175
            # external resource is probed, whereas the existing lock object
 
176
            # might cache.
 
177
            new_lockable = self.get_lockable()
 
178
            self.assertRaises(errors.TokenMismatch,
 
179
                              new_lockable.lock_write, token=different_token)
 
180
        finally:
 
181
            self.lockable.unlock()
 
182
 
 
183
    def test_lock_write_with_matching_token(self):
 
184
        # If the token matches, so no exception is raised by lock_write.
 
185
        token = self.lockable.lock_write()
 
186
        try:
 
187
            if token is None:
 
188
                # This test does not apply, because this lockable refuses
 
189
                # tokens.
 
190
                return
 
191
            # The same instance will accept a second lock_write if the specified
 
192
            # token matches.
 
193
            self.lockable.lock_write(token=token)
 
194
            self.lockable.unlock()
 
195
            # Calling lock_write on a new instance for the same lockable will
 
196
            # also succeed.
 
197
            new_lockable = self.get_lockable()
 
198
            new_lockable.lock_write(token=token)
 
199
            new_lockable.unlock()
 
200
        finally:
 
201
            self.lockable.unlock()
 
202
 
 
203
    def test_unlock_after_lock_write_with_token(self):
 
204
        # If lock_write did not physically acquire the lock (because it was
 
205
        # passed a token), then unlock should not physically release it.
 
206
        token = self.lockable.lock_write()
 
207
        try:
 
208
            if token is None:
 
209
                # This test does not apply, because this lockable refuses
 
210
                # tokens.
 
211
                return
 
212
            new_lockable = self.get_lockable()
 
213
            new_lockable.lock_write(token=token)
 
214
            new_lockable.unlock()
 
215
            self.assertTrue(self.lockable.get_physical_lock_status())
 
216
        finally:
 
217
            self.lockable.unlock()
 
218
 
 
219
    def test_lock_write_with_token_fails_when_unlocked(self):
 
220
        # Lock and unlock to get a superficially valid token.  This mimics a
 
221
        # likely programming error, where a caller accidentally tries to lock
 
222
        # with a token that is no longer valid (because the original lock was
 
223
        # released).
 
224
        token = self.lockable.lock_write()
 
225
        self.lockable.unlock()
 
226
        if token is None:
 
227
            # This test does not apply, because this lockable refuses
 
228
            # tokens.
 
229
            return
 
230
 
 
231
        self.assertRaises(errors.TokenMismatch,
 
232
                          self.lockable.lock_write, token=token)
 
233
 
 
234
    def test_lock_write_reenter_with_token(self):
 
235
        token = self.lockable.lock_write()
 
236
        try:
 
237
            if token is None:
 
238
                # This test does not apply, because this lockable refuses
 
239
                # tokens.
 
240
                return
 
241
            # Relock with a token.
 
242
            token_from_reentry = self.lockable.lock_write(token=token)
 
243
            try:
 
244
                self.assertEqual(token, token_from_reentry)
 
245
            finally:
 
246
                self.lockable.unlock()
 
247
        finally:
 
248
            self.lockable.unlock()
 
249
        # The lock should be unlocked on disk.  Verify that with a new lock
 
250
        # instance.
 
251
        new_lockable = self.get_lockable()
 
252
        # Calling lock_write now should work, rather than raise LockContention.
 
253
        new_lockable.lock_write()
 
254
        new_lockable.unlock()
 
255
 
 
256
    def test_leave_in_place(self):
 
257
        token = self.lockable.lock_write()
 
258
        try:
 
259
            if token is None:
 
260
                # This test does not apply, because this lockable refuses
 
261
                # tokens.
 
262
                return
 
263
            self.lockable.leave_in_place()
 
264
        finally:
 
265
            self.lockable.unlock()
 
266
        # At this point, the lock is still in place on disk
 
267
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
 
268
        # But should be relockable with a token.
 
269
        self.lockable.lock_write(token=token)
 
270
        self.lockable.unlock()
 
271
 
 
272
    def test_dont_leave_in_place(self):
 
273
        token = self.lockable.lock_write()
 
274
        try:
 
275
            if token is None:
 
276
                # This test does not apply, because this lockable refuses
 
277
                # tokens.
 
278
                return
 
279
            self.lockable.leave_in_place()
 
280
        finally:
 
281
            self.lockable.unlock()
 
282
        # At this point, the lock is still in place on disk.
 
283
        # Acquire the existing lock with the token, and ask that it is removed
 
284
        # when this object unlocks, and unlock to trigger that removal.
 
285
        new_lockable = self.get_lockable()
 
286
        new_lockable.lock_write(token=token)
 
287
        new_lockable.dont_leave_in_place()
 
288
        new_lockable.unlock()
 
289
        # At this point, the lock is no longer on disk, so we can lock it.
 
290
        third_lockable = self.get_lockable()
 
291
        third_lockable.lock_write()
 
292
        third_lockable.unlock()
 
293
 
 
294
 
 
295
# This method of adapting tests to parameters is different to 
 
296
# the TestProviderAdapters used elsewhere, but seems simpler for this 
 
297
# case.  
 
298
class TestLockableFiles_TransportLock(TestCaseInTempDir,
 
299
                                      _TestLockableFiles_mixin):
 
300
 
 
301
    def setUp(self):
 
302
        TestCaseInTempDir.setUp(self)
 
303
        _TestLockableFiles_mixin.setUp(self)
 
304
        transport = get_transport('.')
 
305
        transport.mkdir('.bzr')
 
306
        self.sub_transport = transport.clone('.bzr')
 
307
        self.lockable = self.get_lockable()
 
308
        self.lockable.create_lock()
 
309
 
 
310
    def tearDown(self):
 
311
        super(TestLockableFiles_TransportLock, self).tearDown()
 
312
        # free the subtransport so that we do not get a 5 second
 
313
        # timeout due to the SFTP connection cache.
 
314
        del self.sub_transport
 
315
 
 
316
    def get_lockable(self):
 
317
        return LockableFiles(self.sub_transport, 'my-lock', TransportLock)
 
318
        
 
319
 
 
320
class TestLockableFiles_LockDir(TestCaseInTempDir,
 
321
                              _TestLockableFiles_mixin):
 
322
    """LockableFile tests run with LockDir underneath"""
 
323
 
 
324
    def setUp(self):
 
325
        TestCaseInTempDir.setUp(self)
 
326
        _TestLockableFiles_mixin.setUp(self)
 
327
        self.transport = get_transport('.')
 
328
        self.lockable = self.get_lockable()
 
329
        # the lock creation here sets mode - test_permissions on branch 
 
330
        # tests that implicitly, but it might be a good idea to factor 
 
331
        # out the mode checking logic and have it applied to loackable files
 
332
        # directly. RBC 20060418
 
333
        self.lockable.create_lock()
 
334
 
 
335
    def get_lockable(self):
 
336
        return LockableFiles(self.transport, 'my-lock', LockDir)
 
337
 
 
338
    def test_lock_created(self):
 
339
        self.assertTrue(self.transport.has('my-lock'))
 
340
        self.lockable.lock_write()
 
341
        self.assertTrue(self.transport.has('my-lock/held/info'))
 
342
        self.lockable.unlock()
 
343
        self.assertFalse(self.transport.has('my-lock/held/info'))
 
344
        self.assertTrue(self.transport.has('my-lock'))
 
345
 
 
346
 
 
347
    # TODO: Test the lockdir inherits the right file and directory permissions
 
348
    # from the LockableFiles.
 
349
        
 
350
 
 
351
class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
 
352
                              _TestLockableFiles_mixin):
 
353
    """LockableFile tests run with RemoteLockDir on a branch."""
 
354
 
 
355
    def setUp(self):
 
356
        TestCaseWithSmartMedium.setUp(self)
 
357
        _TestLockableFiles_mixin.setUp(self)
 
358
        # can only get a RemoteLockDir with some RemoteObject...
 
359
        # use a branch as thats what we want. These mixin tests test the end
 
360
        # to end behaviour, so stubbing out the backend and simulating would
 
361
        # defeat the purpose. We test the protocol implementation separately
 
362
        # in test_remote and test_smart as usual.
 
363
        self.make_branch('foo')
 
364
        self.transport = get_transport('.')
 
365
        self.lockable = self.get_lockable()
 
366
 
 
367
    def get_lockable(self):
 
368
        # getting a new lockable involves opening a new instance of the branch
 
369
        branch = bzrlib.branch.Branch.open(self.get_url('foo'))
 
370
        return branch.control_files
 
371
 
 
372
    def test_lock_write_returns_None_refuses_token(self):
 
373
        # this test is not relevant for RemoteBranchLockableFiles as remote
 
374
        # locks are done directly from the remote branch object.
 
375
        return None
 
376
 
 
377
    def test_lock_write_raises_on_token_mismatch(self):
 
378
        # See test_lock_write_returns_None_refuses_token.
 
379
        return None
 
380
 
 
381
    def test_lock_write_with_matching_token(self):
 
382
        # See test_lock_write_returns_None_refuses_token.
 
383
        return None
 
384
 
 
385
    def test_unlock_after_lock_write_with_token(self):
 
386
        # See test_lock_write_returns_None_refuses_token.
 
387
        return None
 
388
 
 
389
    def test_lock_write_with_token_fails_when_unlocked(self):
 
390
        # See test_lock_write_returns_None_refuses_token.
 
391
        return None
 
392
 
 
393
    def test_lock_write_reenter_with_token(self):
 
394
        # See test_lock_write_returns_None_refuses_token.
 
395
        return None
 
396
 
 
397
    def test_leave_in_place(self):
 
398
        # See test_lock_write_returns_None_refuses_token.
 
399
        return None
 
400
 
 
401
    def test_dont_leave_in_place(self):
 
402
        # See test_lock_write_returns_None_refuses_token.
 
403
        return None
 
404
 
 
405
    def test_lock_write_returns_token_when_given_token(self):
 
406
        # See test_lock_write_returns_None_refuses_token.
 
407
        return None