/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: Andrew Bennetts
  • Date: 2007-04-12 16:22:21 UTC
  • mto: (2018.18.11 hpss-faster-copy)
  • mto: This revision was merged to the branch mainline in revision 2435.
  • Revision ID: andrew.bennetts@canonical.com-20070412162221-6imfutzivlmg4rg5
Fix four tests I broke with the Branch.lock_write changes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
from StringIO import StringIO
 
18
 
 
19
import bzrlib
 
20
from bzrlib.branch import Branch
 
21
import bzrlib.errors as errors
 
22
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
 
23
from bzrlib.lockable_files import LockableFiles, TransportLock
 
24
from bzrlib import lockdir
 
25
from bzrlib.lockdir import LockDir
 
26
from bzrlib.tests import TestCaseInTempDir
 
27
from bzrlib.tests.test_smart import TestCaseWithSmartMedium
 
28
from bzrlib.tests.test_transactions import DummyWeave
 
29
from bzrlib.transactions import (PassThroughTransaction,
 
30
                                 ReadOnlyTransaction,
 
31
                                 WriteTransaction,
 
32
                                 )
 
33
from bzrlib.transport import get_transport
 
34
 
 
35
 
 
36
# these tests are applied in each parameterized suite for LockableFiles
 
37
class _TestLockableFiles_mixin(object):
 
38
 
 
39
    def setUp(self):
 
40
        self.reduceLockdirTimeout()
 
41
 
 
42
    def test_read_write(self):
 
43
        self.assertRaises(NoSuchFile, self.lockable.get, 'foo')
 
44
        self.assertRaises(NoSuchFile, self.lockable.get_utf8, 'foo')
 
45
        self.lockable.lock_write()
 
46
        try:
 
47
            unicode_string = u'bar\u1234'
 
48
            self.assertEqual(4, len(unicode_string))
 
49
            byte_string = unicode_string.encode('utf-8')
 
50
            self.assertEqual(6, len(byte_string))
 
51
            self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo',
 
52
                              StringIO(unicode_string))
 
53
            self.lockable.put('foo', StringIO(byte_string))
 
54
            self.assertEqual(byte_string,
 
55
                             self.lockable.get('foo').read())
 
56
            self.assertEqual(unicode_string,
 
57
                             self.lockable.get_utf8('foo').read())
 
58
            self.assertRaises(BzrBadParameterNotString,
 
59
                              self.lockable.put_utf8,
 
60
                              'bar',
 
61
                              StringIO(unicode_string)
 
62
                              )
 
63
            self.lockable.put_utf8('bar', unicode_string)
 
64
            self.assertEqual(unicode_string,
 
65
                             self.lockable.get_utf8('bar').read())
 
66
            self.assertEqual(byte_string,
 
67
                             self.lockable.get('bar').read())
 
68
            self.lockable.put_bytes('raw', 'raw\xffbytes')
 
69
            self.assertEqual('raw\xffbytes',
 
70
                             self.lockable.get('raw').read())
 
71
        finally:
 
72
            self.lockable.unlock()
 
73
 
 
74
    def test_locks(self):
 
75
        self.lockable.lock_read()
 
76
        try:
 
77
            self.assertRaises(ReadOnlyError, self.lockable.put, 'foo', 
 
78
                              StringIO('bar\u1234'))
 
79
        finally:
 
80
            self.lockable.unlock()
 
81
 
 
82
    def test_transactions(self):
 
83
        self.assertIs(self.lockable.get_transaction().__class__,
 
84
                      PassThroughTransaction)
 
85
        self.lockable.lock_read()
 
86
        try:
 
87
            self.assertIs(self.lockable.get_transaction().__class__,
 
88
                          ReadOnlyTransaction)
 
89
        finally:
 
90
            self.lockable.unlock()
 
91
        self.assertIs(self.lockable.get_transaction().__class__,
 
92
                      PassThroughTransaction)
 
93
        self.lockable.lock_write()
 
94
        self.assertIs(self.lockable.get_transaction().__class__,
 
95
                      WriteTransaction)
 
96
        # check that finish is called:
 
97
        vf = DummyWeave('a')
 
98
        self.lockable.get_transaction().register_dirty(vf)
 
99
        self.lockable.unlock()
 
100
        self.assertTrue(vf.finished)
 
101
 
 
102
    def test__escape(self):
 
103
        self.assertEqual('%25', self.lockable._escape('%'))
 
104
        
 
105
    def test__escape_empty(self):
 
106
        self.assertEqual('', self.lockable._escape(''))
 
107
 
 
108
    def test_break_lock(self):
 
109
        # some locks are not breakable
 
110
        self.lockable.lock_write()
 
111
        try:
 
112
            self.assertRaises(AssertionError, self.lockable.break_lock)
 
113
        except NotImplementedError:
 
114
            # this lock cannot be broken
 
115
            self.lockable.unlock()
 
116
            return
 
117
        l2 = self.get_lockable()
 
118
        orig_factory = bzrlib.ui.ui_factory
 
119
        # silent ui - no need for stdout
 
120
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
121
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
 
122
        try:
 
123
            l2.break_lock()
 
124
        finally:
 
125
            bzrlib.ui.ui_factory = orig_factory
 
126
        try:
 
127
            l2.lock_write()
 
128
            l2.unlock()
 
129
        finally:
 
130
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
 
131
            self.assertFalse(self.lockable.is_locked())
 
132
 
 
133
    def test_lock_write_returns_None_refuses_token(self):
 
134
        token = self.lockable.lock_write()
 
135
        try:
 
136
            if token is not None:
 
137
                # This test does not apply, because this lockable supports
 
138
                # tokens.
 
139
                return
 
140
            self.assertRaises(errors.TokenLockingNotSupported,
 
141
                              self.lockable.lock_write, token='token')
 
142
        finally:
 
143
            self.lockable.unlock()
 
144
 
 
145
    def test_lock_write_returns_token_when_given_token(self):
 
146
        token = self.lockable.lock_write()
 
147
        try:
 
148
            if token is None:
 
149
                # This test does not apply, because this lockable refuses
 
150
                # tokens.
 
151
                return
 
152
            new_lockable = self.get_lockable()
 
153
            token_from_new_lockable = new_lockable.lock_write(token=token)
 
154
            try:
 
155
                self.assertEqual(token, token_from_new_lockable)
 
156
            finally:
 
157
                new_lockable.unlock()
 
158
        finally:
 
159
            self.lockable.unlock()
 
160
 
 
161
    def test_lock_write_raises_on_token_mismatch(self):
 
162
        token = self.lockable.lock_write()
 
163
        try:
 
164
            if token is None:
 
165
                # This test does not apply, because this lockable refuses
 
166
                # tokens.
 
167
                return
 
168
            different_token = token + 'xxx'
 
169
            # Re-using the same lockable instance with a different token will
 
170
            # raise TokenMismatch.
 
171
            self.assertRaises(errors.TokenMismatch,
 
172
                              self.lockable.lock_write, token=different_token)
 
173
            # A seperate instance for the same lockable will also raise
 
174
            # TokenMismatch.
 
175
            # This detects the case where a caller claims to have a lock (via
 
176
            # the token) for an external resource, but doesn't (the token is
 
177
            # different).  Clients need a seperate lock object to make sure the
 
178
            # external resource is probed, whereas the existing lock object
 
179
            # might cache.
 
180
            new_lockable = self.get_lockable()
 
181
            self.assertRaises(errors.TokenMismatch,
 
182
                              new_lockable.lock_write, token=different_token)
 
183
        finally:
 
184
            self.lockable.unlock()
 
185
 
 
186
    def test_lock_write_with_matching_token(self):
 
187
        # If the token matches, so no exception is raised by lock_write.
 
188
        token = self.lockable.lock_write()
 
189
        try:
 
190
            if token is None:
 
191
                # This test does not apply, because this lockable refuses
 
192
                # tokens.
 
193
                return
 
194
            # The same instance will accept a second lock_write if the specified
 
195
            # token matches.
 
196
            self.lockable.lock_write(token=token)
 
197
            self.lockable.unlock()
 
198
            # Calling lock_write on a new instance for the same lockable will
 
199
            # also succeed.
 
200
            new_lockable = self.get_lockable()
 
201
            new_lockable.lock_write(token=token)
 
202
            new_lockable.unlock()
 
203
        finally:
 
204
            self.lockable.unlock()
 
205
 
 
206
    def test_unlock_after_lock_write_with_token(self):
 
207
        # If lock_write did not physically acquire the lock (because it was
 
208
        # passed a token), then unlock should not physically release it.
 
209
        token = self.lockable.lock_write()
 
210
        try:
 
211
            if token is None:
 
212
                # This test does not apply, because this lockable refuses
 
213
                # tokens.
 
214
                return
 
215
            new_lockable = self.get_lockable()
 
216
            new_lockable.lock_write(token=token)
 
217
            new_lockable.unlock()
 
218
            self.assertTrue(self.lockable.get_physical_lock_status())
 
219
        finally:
 
220
            self.lockable.unlock()
 
221
 
 
222
    def test_lock_write_with_token_fails_when_unlocked(self):
 
223
        # Lock and unlock to get a superficially valid token.  This mimics a
 
224
        # likely programming error, where a caller accidentally tries to lock
 
225
        # with a token that is no longer valid (because the original lock was
 
226
        # released).
 
227
        token = self.lockable.lock_write()
 
228
        self.lockable.unlock()
 
229
        if token is None:
 
230
            # This test does not apply, because this lockable refuses
 
231
            # tokens.
 
232
            return
 
233
 
 
234
        self.assertRaises(errors.TokenMismatch,
 
235
                          self.lockable.lock_write, token=token)
 
236
 
 
237
    def test_lock_write_reenter_with_token(self):
 
238
        token = self.lockable.lock_write()
 
239
        try:
 
240
            if token is None:
 
241
                # This test does not apply, because this lockable refuses
 
242
                # tokens.
 
243
                return
 
244
            # Relock with a token.
 
245
            token_from_reentry = self.lockable.lock_write(token=token)
 
246
            try:
 
247
                self.assertEqual(token, token_from_reentry)
 
248
            finally:
 
249
                self.lockable.unlock()
 
250
        finally:
 
251
            self.lockable.unlock()
 
252
        # The lock should be unlocked on disk.  Verify that with a new lock
 
253
        # instance.
 
254
        new_lockable = self.get_lockable()
 
255
        # Calling lock_write now should work, rather than raise LockContention.
 
256
        new_lockable.lock_write()
 
257
        new_lockable.unlock()
 
258
 
 
259
    def test_second_lock_write_returns_same_token(self):
 
260
        first_token = self.lockable.lock_write()
 
261
        try:
 
262
            if first_token is None:
 
263
                # This test does not apply, because this lockable refuses
 
264
                # tokens.
 
265
                return
 
266
            # Relock the already locked lockable.  It should return the same
 
267
            # token.
 
268
            second_token = self.lockable.lock_write()
 
269
            try:
 
270
                self.assertEqual(first_token, second_token)
 
271
            finally:
 
272
                self.lockable.unlock()
 
273
        finally:
 
274
            self.lockable.unlock()
 
275
 
 
276
    def test_leave_in_place(self):
 
277
        token = self.lockable.lock_write()
 
278
        try:
 
279
            if token is None:
 
280
                # This test does not apply, because this lockable refuses
 
281
                # tokens.
 
282
                return
 
283
            self.lockable.leave_in_place()
 
284
        finally:
 
285
            self.lockable.unlock()
 
286
        # At this point, the lock is still in place on disk
 
287
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
 
288
        # But should be relockable with a token.
 
289
        self.lockable.lock_write(token=token)
 
290
        self.lockable.unlock()
 
291
 
 
292
    def test_dont_leave_in_place(self):
 
293
        token = self.lockable.lock_write()
 
294
        try:
 
295
            if token is None:
 
296
                # This test does not apply, because this lockable refuses
 
297
                # tokens.
 
298
                return
 
299
            self.lockable.leave_in_place()
 
300
        finally:
 
301
            self.lockable.unlock()
 
302
        # At this point, the lock is still in place on disk.
 
303
        # Acquire the existing lock with the token, and ask that it is removed
 
304
        # when this object unlocks, and unlock to trigger that removal.
 
305
        new_lockable = self.get_lockable()
 
306
        new_lockable.lock_write(token=token)
 
307
        new_lockable.dont_leave_in_place()
 
308
        new_lockable.unlock()
 
309
        # At this point, the lock is no longer on disk, so we can lock it.
 
310
        third_lockable = self.get_lockable()
 
311
        third_lockable.lock_write()
 
312
        third_lockable.unlock()
 
313
 
 
314
 
 
315
# This method of adapting tests to parameters is different to 
 
316
# the TestProviderAdapters used elsewhere, but seems simpler for this 
 
317
# case.  
 
318
class TestLockableFiles_TransportLock(TestCaseInTempDir,
 
319
                                      _TestLockableFiles_mixin):
 
320
 
 
321
    def setUp(self):
 
322
        TestCaseInTempDir.setUp(self)
 
323
        _TestLockableFiles_mixin.setUp(self)
 
324
        transport = get_transport('.')
 
325
        transport.mkdir('.bzr')
 
326
        self.sub_transport = transport.clone('.bzr')
 
327
        self.lockable = self.get_lockable()
 
328
        self.lockable.create_lock()
 
329
 
 
330
    def tearDown(self):
 
331
        super(TestLockableFiles_TransportLock, self).tearDown()
 
332
        # free the subtransport so that we do not get a 5 second
 
333
        # timeout due to the SFTP connection cache.
 
334
        del self.sub_transport
 
335
 
 
336
    def get_lockable(self):
 
337
        return LockableFiles(self.sub_transport, 'my-lock', TransportLock)
 
338
        
 
339
 
 
340
class TestLockableFiles_LockDir(TestCaseInTempDir,
 
341
                              _TestLockableFiles_mixin):
 
342
    """LockableFile tests run with LockDir underneath"""
 
343
 
 
344
    def setUp(self):
 
345
        TestCaseInTempDir.setUp(self)
 
346
        _TestLockableFiles_mixin.setUp(self)
 
347
        self.transport = get_transport('.')
 
348
        self.lockable = self.get_lockable()
 
349
        # the lock creation here sets mode - test_permissions on branch 
 
350
        # tests that implicitly, but it might be a good idea to factor 
 
351
        # out the mode checking logic and have it applied to loackable files
 
352
        # directly. RBC 20060418
 
353
        self.lockable.create_lock()
 
354
 
 
355
    def get_lockable(self):
 
356
        return LockableFiles(self.transport, 'my-lock', LockDir)
 
357
 
 
358
    def test_lock_created(self):
 
359
        self.assertTrue(self.transport.has('my-lock'))
 
360
        self.lockable.lock_write()
 
361
        self.assertTrue(self.transport.has('my-lock/held/info'))
 
362
        self.lockable.unlock()
 
363
        self.assertFalse(self.transport.has('my-lock/held/info'))
 
364
        self.assertTrue(self.transport.has('my-lock'))
 
365
 
 
366
 
 
367
    # TODO: Test the lockdir inherits the right file and directory permissions
 
368
    # from the LockableFiles.
 
369
        
 
370
 
 
371
class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
 
372
                              _TestLockableFiles_mixin):
 
373
    """LockableFile tests run with RemoteLockDir on a branch."""
 
374
 
 
375
    def setUp(self):
 
376
        TestCaseWithSmartMedium.setUp(self)
 
377
        _TestLockableFiles_mixin.setUp(self)
 
378
        # can only get a RemoteLockDir with some RemoteObject...
 
379
        # use a branch as thats what we want. These mixin tests test the end
 
380
        # to end behaviour, so stubbing out the backend and simulating would
 
381
        # defeat the purpose. We test the protocol implementation separately
 
382
        # in test_remote and test_smart as usual.
 
383
        self.make_branch('foo')
 
384
        self.transport = get_transport('.')
 
385
        self.lockable = self.get_lockable()
 
386
 
 
387
    def get_lockable(self):
 
388
        # getting a new lockable involves opening a new instance of the branch
 
389
        branch = bzrlib.branch.Branch.open(self.get_url('foo'))
 
390
        return branch.control_files