/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-07-20 08:56:45 UTC
  • mfrom: (4526.9.23 apply-inventory-delta)
  • Revision ID: pqm@pqm.ubuntu.com-20090720085645-54mtgybxua0yx6hw
(robertc) Add checks for inventory deltas which try to ensure that
        deltas that are not an exact fit are not applied. (Robert
        Collins, bug 397705, bug 367633)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
from StringIO import StringIO
 
18
 
 
19
import bzrlib
 
20
from bzrlib import (
 
21
    errors,
 
22
    lockdir,
 
23
    osutils,
 
24
    )
 
25
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
 
26
from bzrlib.lockable_files import LockableFiles, TransportLock
 
27
from bzrlib.symbol_versioning import (
 
28
    deprecated_in,
 
29
    )
 
30
from bzrlib.tests import (
 
31
    TestCaseInTempDir,
 
32
    TestNotApplicable,
 
33
    )
 
34
from bzrlib.tests.test_smart import TestCaseWithSmartMedium
 
35
from bzrlib.tests.test_transactions import DummyWeave
 
36
from bzrlib.transactions import (PassThroughTransaction,
 
37
                                 ReadOnlyTransaction,
 
38
                                 WriteTransaction,
 
39
                                 )
 
40
from bzrlib.transport import get_transport
 
41
 
 
42
 
 
43
# These tests are applied in each parameterized suite for LockableFiles.
#
# They use an old style of parameterization, but we want to remove this class
# so won't modernize them now. - mbp 20080430
 
47
class _TestLockableFiles_mixin(object):
    """Tests shared by every LockableFiles parameterization in this file.

    A concrete test class mixes this in and must provide:

    * ``self.lockable`` -- the lockable object under test (set in setUp);
    * ``self.get_lockable()`` -- returns another lockable object for the
      same underlying lock.

    The assertion and cleanup helpers used here (``assertRaises``,
    ``applyDeprecated``, ``addCleanup``, ...) are expected to come from the
    test case class this mixin is combined with.
    """

    def _require_token(self, token):
        """Raise TestNotApplicable unless lock_write() returned a token.

        The token tests below treat a ``None`` result from ``lock_write()``
        as meaning the lockable refuses tokens.  Reporting that case as
        "not applicable" (rather than silently returning, which would be
        counted as a pass) matches test_lock_write_returns_None_refuses_token
        and test_break_lock.

        :param token: the value returned by ``self.lockable.lock_write()``.
        :raises TestNotApplicable: if ``token`` is None.
        """
        if token is None:
            raise TestNotApplicable("%r refuses tokens" % (self.lockable,))

    def test_read_write(self):
        # Round-trip bytes and unicode through the deprecated get/put API.
        # Nothing has been written yet, so both getters must fail.
        self.assertRaises(NoSuchFile,
            self.applyDeprecated,
            deprecated_in((1, 5, 0)),
            self.lockable.get, 'foo')
        self.assertRaises(NoSuchFile,
            self.applyDeprecated,
            deprecated_in((1, 5, 0)),
            self.lockable.get_utf8, 'foo')
        self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        # 4 characters, but 6 bytes once encoded as UTF-8.
        unicode_string = u'bar\u1234'
        self.assertEqual(4, len(unicode_string))
        byte_string = unicode_string.encode('utf-8')
        self.assertEqual(6, len(byte_string))
        # put() with a stream of non-ASCII unicode is rejected ...
        self.assertRaises(UnicodeEncodeError,
            self.applyDeprecated,
            deprecated_in((1, 6, 0)),
            self.lockable.put, 'foo',
            StringIO(unicode_string))
        # ... but the encoded bytes are accepted.
        self.applyDeprecated(
            deprecated_in((1, 6, 0)),
            self.lockable.put,
            'foo', StringIO(byte_string))
        byte_stream = self.applyDeprecated(
            deprecated_in((1, 5, 0)),
            self.lockable.get,
            'foo')
        self.assertEqual(byte_string, byte_stream.read())
        unicode_stream = self.applyDeprecated(
            deprecated_in((1, 5, 0)),
            self.lockable.get_utf8,
            'foo')
        self.assertEqual(unicode_string,
            unicode_stream.read())
        # put_utf8() rejects a file-like object; it wants the string itself.
        self.assertRaises(BzrBadParameterNotString,
            self.applyDeprecated,
            deprecated_in((1, 6, 0)),
            self.lockable.put_utf8,
            'bar',
            StringIO(unicode_string))
        self.applyDeprecated(
            deprecated_in((1, 6, 0)),
            self.lockable.put_utf8,
            'bar',
            unicode_string)
        unicode_stream = self.applyDeprecated(
            deprecated_in((1, 5, 0)),
            self.lockable.get_utf8,
            'bar')
        self.assertEqual(unicode_string,
            unicode_stream.read())
        byte_stream = self.applyDeprecated(
            deprecated_in((1, 5, 0)),
            self.lockable.get,
            'bar')
        self.assertEqual(byte_string, byte_stream.read())
        # put_bytes() stores bytes that are not valid UTF-8 unchanged.
        self.applyDeprecated(
            deprecated_in((1, 6, 0)),
            self.lockable.put_bytes,
            'raw', 'raw\xffbytes')
        byte_stream = self.applyDeprecated(
            deprecated_in((1, 5, 0)),
            self.lockable.get,
            'raw')
        self.assertEqual('raw\xffbytes', byte_stream.read())

    def test_locks(self):
        # Writing while only a read lock is held must raise ReadOnlyError.
        self.lockable.lock_read()
        self.addCleanup(self.lockable.unlock)
        # NOTE(review): the literal below is a plain (non-unicode) string,
        # so in Python 2 '\u1234' is not an escape here.  The content should
        # not matter for the ReadOnlyError being checked -- confirm before
        # changing it.
        self.assertRaises(ReadOnlyError, self.lockable.put, 'foo',
                          StringIO('bar\u1234'))

    def test_transactions(self):
        # The transaction class follows the lock state: pass-through when
        # unlocked, read-only under a read lock, write under a write lock.
        self.assertIs(self.lockable.get_transaction().__class__,
                      PassThroughTransaction)
        self.lockable.lock_read()
        try:
            self.assertIs(self.lockable.get_transaction().__class__,
                          ReadOnlyTransaction)
        finally:
            self.lockable.unlock()
        self.assertIs(self.lockable.get_transaction().__class__,
                      PassThroughTransaction)
        self.lockable.lock_write()
        # try/finally so a failing assertion does not leave the write lock
        # held.
        try:
            self.assertIs(self.lockable.get_transaction().__class__,
                          WriteTransaction)
            # check that finish is called:
            vf = DummyWeave('a')
            self.lockable.get_transaction().register_dirty(vf)
        finally:
            self.lockable.unlock()
        self.assertTrue(vf.finished)

    def test__escape(self):
        # '%' is escaped as '%25'.
        self.assertEqual('%25', self.lockable._escape('%'))

    def test__escape_empty(self):
        # The empty string escapes to itself.
        self.assertEqual('', self.lockable._escape(''))

    def test_break_lock(self):
        # some locks are not breakable
        self.lockable.lock_write()
        try:
            # Breaking a lock through the instance that holds it is expected
            # to trip an assertion.
            self.assertRaises(AssertionError, self.lockable.break_lock)
        except NotImplementedError:
            # this lock cannot be broken
            self.lockable.unlock()
            raise TestNotApplicable("%r is not breakable" % (self.lockable,))
        l2 = self.get_lockable()
        orig_factory = bzrlib.ui.ui_factory
        # silent ui - no need for stdout
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
        # Canned "y" answer on stdin for break_lock().
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
        try:
            l2.break_lock()
        finally:
            bzrlib.ui.ui_factory = orig_factory
        try:
            # The broken lock can be taken again by the second instance.
            l2.lock_write()
            l2.unlock()
        finally:
            # The original holder finds out its lock was broken when it
            # unlocks, and ends up unlocked.
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
            self.assertFalse(self.lockable.is_locked())

    def test_lock_write_returns_None_refuses_token(self):
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        if token is not None:
            # This test does not apply, because this lockable supports
            # tokens.
            raise TestNotApplicable("%r uses tokens" % (self.lockable,))
        self.assertRaises(errors.TokenLockingNotSupported,
                          self.lockable.lock_write, token='token')

    def test_lock_write_returns_token_when_given_token(self):
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        self._require_token(token)
        new_lockable = self.get_lockable()
        token_from_new_lockable = new_lockable.lock_write(token=token)
        self.addCleanup(new_lockable.unlock)
        self.assertEqual(token, token_from_new_lockable)

    def test_lock_write_raises_on_token_mismatch(self):
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        self._require_token(token)
        different_token = token + 'xxx'
        # Re-using the same lockable instance with a different token will
        # raise TokenMismatch.
        self.assertRaises(errors.TokenMismatch,
                          self.lockable.lock_write, token=different_token)
        # A separate instance for the same lockable will also raise
        # TokenMismatch.
        # This detects the case where a caller claims to have a lock (via
        # the token) for an external resource, but doesn't (the token is
        # different).  Clients need a separate lock object to make sure the
        # external resource is probed, whereas the existing lock object
        # might cache.
        new_lockable = self.get_lockable()
        self.assertRaises(errors.TokenMismatch,
                          new_lockable.lock_write, token=different_token)

    def test_lock_write_with_matching_token(self):
        # If the token matches, no exception is raised by lock_write.
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        self._require_token(token)
        # The same instance will accept a second lock_write if the specified
        # token matches.
        self.lockable.lock_write(token=token)
        self.lockable.unlock()
        # Calling lock_write on a new instance for the same lockable will
        # also succeed.
        new_lockable = self.get_lockable()
        new_lockable.lock_write(token=token)
        new_lockable.unlock()

    def test_unlock_after_lock_write_with_token(self):
        # If lock_write did not physically acquire the lock (because it was
        # passed a token), then unlock should not physically release it.
        token = self.lockable.lock_write()
        self.addCleanup(self.lockable.unlock)
        self._require_token(token)
        new_lockable = self.get_lockable()
        new_lockable.lock_write(token=token)
        new_lockable.unlock()
        self.assertTrue(self.lockable.get_physical_lock_status())

    def test_lock_write_with_token_fails_when_unlocked(self):
        # Lock and unlock to get a superficially valid token.  This mimics a
        # likely programming error, where a caller accidentally tries to lock
        # with a token that is no longer valid (because the original lock was
        # released).
        token = self.lockable.lock_write()
        self.lockable.unlock()
        self._require_token(token)

        self.assertRaises(errors.TokenMismatch,
                          self.lockable.lock_write, token=token)

    def test_lock_write_reenter_with_token(self):
        token = self.lockable.lock_write()
        try:
            self._require_token(token)
            # Relock with a token.
            token_from_reentry = self.lockable.lock_write(token=token)
            try:
                self.assertEqual(token, token_from_reentry)
            finally:
                self.lockable.unlock()
        finally:
            self.lockable.unlock()
        # The lock should be unlocked on disk.  Verify that with a new lock
        # instance.
        new_lockable = self.get_lockable()
        # Calling lock_write now should work, rather than raise LockContention.
        new_lockable.lock_write()
        new_lockable.unlock()

    def test_second_lock_write_returns_same_token(self):
        first_token = self.lockable.lock_write()
        try:
            self._require_token(first_token)
            # Relock the already locked lockable.  It should return the same
            # token.
            second_token = self.lockable.lock_write()
            try:
                self.assertEqual(first_token, second_token)
            finally:
                self.lockable.unlock()
        finally:
            self.lockable.unlock()

    def test_leave_in_place(self):
        token = self.lockable.lock_write()
        try:
            self._require_token(token)
            self.lockable.leave_in_place()
        finally:
            self.lockable.unlock()
        # At this point, the lock is still in place on disk
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
        # But should be relockable with a token.
        self.lockable.lock_write(token=token)
        self.lockable.unlock()
        # Cleanup: we should still be able to get the lock, but we restore the
        # behavior to clearing the lock when unlocking.
        self.lockable.lock_write(token=token)
        self.lockable.dont_leave_in_place()
        self.lockable.unlock()

    def test_dont_leave_in_place(self):
        token = self.lockable.lock_write()
        try:
            self._require_token(token)
            self.lockable.leave_in_place()
        finally:
            self.lockable.unlock()
        # At this point, the lock is still in place on disk.
        # Acquire the existing lock with the token, and ask that it is removed
        # when this object unlocks, and unlock to trigger that removal.
        new_lockable = self.get_lockable()
        new_lockable.lock_write(token=token)
        new_lockable.dont_leave_in_place()
        new_lockable.unlock()
        # At this point, the lock is no longer on disk, so we can lock it.
        third_lockable = self.get_lockable()
        third_lockable.lock_write()
        third_lockable.unlock()
 
346
 
 
347
 
 
348
# This method of adapting tests to parameters is different to
# the TestProviderAdapters used elsewhere, but seems simpler for this
# case.
 
351
class TestLockableFiles_TransportLock(TestCaseInTempDir,
                                      _TestLockableFiles_mixin):
    """LockableFiles tests run with TransportLock underneath."""

    def setUp(self):
        # Use super() here as tearDown() does, so both follow the same MRO.
        super(TestLockableFiles_TransportLock, self).setUp()
        transport = get_transport('.')
        transport.mkdir('.bzr')
        # The lock lives inside the freshly created '.bzr' directory.
        self.sub_transport = transport.clone('.bzr')
        self.lockable = self.get_lockable()
        self.lockable.create_lock()

    def tearDown(self):
        super(TestLockableFiles_TransportLock, self).tearDown()
        # free the subtransport so that we do not get a 5 second
        # timeout due to the SFTP connection cache.
        try:
            del self.sub_transport
        except AttributeError:
            # setUp did not get far enough to set it; nothing to free.
            pass

    def get_lockable(self):
        """Return a new LockableFiles for 'my-lock' on the sub-transport."""
        return LockableFiles(self.sub_transport, 'my-lock', TransportLock)
 
373
 
 
374
 
 
375
class TestLockableFiles_LockDir(TestCaseInTempDir,
 
376
                              _TestLockableFiles_mixin):
 
377
    """LockableFile tests run with LockDir underneath"""
 
378
 
 
379
    def setUp(self):
 
380
        TestCaseInTempDir.setUp(self)
 
381
        self.transport = get_transport('.')
 
382
        self.lockable = self.get_lockable()
 
383
        # the lock creation here sets mode - test_permissions on branch
 
384
        # tests that implicitly, but it might be a good idea to factor
 
385
        # out the mode checking logic and have it applied to loackable files
 
386
        # directly. RBC 20060418
 
387
        self.lockable.create_lock()
 
388
 
 
389
    def get_lockable(self):
 
390
        return LockableFiles(self.transport, 'my-lock', lockdir.LockDir)
 
391
 
 
392
    def test_lock_created(self):
 
393
        self.assertTrue(self.transport.has('my-lock'))
 
394
        self.lockable.lock_write()
 
395
        self.assertTrue(self.transport.has('my-lock/held/info'))
 
396
        self.lockable.unlock()
 
397
        self.assertFalse(self.transport.has('my-lock/held/info'))
 
398
        self.assertTrue(self.transport.has('my-lock'))
 
399
 
 
400
    def test__file_modes(self):
 
401
        self.transport.mkdir('readonly')
 
402
        osutils.make_readonly('readonly')
 
403
        lockable = LockableFiles(self.transport.clone('readonly'), 'test-lock',
 
404
                                 lockdir.LockDir)
 
405
        # The directory mode should be read-write-execute for the current user
 
406
        self.assertEqual(00700, lockable._dir_mode & 00700)
 
407
        # Files should be read-write for the current user
 
408
        self.assertEqual(00600, lockable._file_mode & 00700)
 
409
 
 
410
 
 
411
class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
                              _TestLockableFiles_mixin):
    """LockableFile tests run with RemoteLockDir on a branch."""

    def setUp(self):
        TestCaseWithSmartMedium.setUp(self)
        # can only get a RemoteLockDir with some RemoteObject...
        # use a branch as that's what we want. These mixin tests test the end
        # to end behaviour, so stubbing out the backend and simulating would
        # defeat the purpose. We test the protocol implementation separately
        # in test_remote and test_smart as usual.
        b = self.make_branch('foo')
        # Disconnect the branch's transport when the test finishes.
        self.addCleanup(b.bzrdir.transport.disconnect)
        self.transport = get_transport('.')
        self.lockable = self.get_lockable()

    def get_lockable(self):
        """Return the control_files of a newly opened 'foo' branch."""
        # getting a new lockable involves opening a new instance of the branch
        branch = bzrlib.branch.Branch.open(self.get_url('foo'))
        # Each opened instance gets its own disconnect cleanup.
        self.addCleanup(branch.bzrdir.transport.disconnect)
        return branch.control_files