/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: Robert Collins
  • Date: 2007-09-19 05:14:14 UTC
  • mto: (2835.1.1 ianc-integration)
  • mto: This revision was merged to the branch mainline in revision 2836.
  • Revision ID: robertc@robertcollins.net-20070919051414-2tgjqteg7k3ps4h0
* ``pull``, ``merge`` and ``push`` will no longer silently correct some
  repository index errors that occurred as a result of the Weave disk format.
  Instead the ``reconcile`` command needs to be run to correct those
  problems if they exist (and it has been able to fix most such problems
  since bzr 0.8). Some new problems have been identified during this release
  and you should run ``bzr check`` once on every repository to see if you
  need to reconcile. If you cannot ``pull`` or ``merge`` from a remote
  repository due to mismatched parent errors - a symptom of index errors -
  you should simply take a full copy of that remote repository to a clean
  directory outside any local repositories, then run reconcile on it, and
  finally pull from it locally. (And naturally email the repository's owner
  to ask them to upgrade and run reconcile).
  (Robert Collins)

* ``VersionedFile.fix_parents`` has been removed as a harmful API.
  ``VersionedFile.join`` will no longer accept different parents on either
  side of a join - it will either ignore them, or error, depending on the
  implementation. See notes when upgrading for more information.
  (Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
from StringIO import StringIO
 
18
 
 
19
import bzrlib
 
20
import bzrlib.errors as errors
 
21
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
 
22
from bzrlib.lockable_files import LockableFiles, TransportLock
 
23
from bzrlib import lockdir
 
24
from bzrlib.lockdir import LockDir
 
25
from bzrlib.tests import TestCaseInTempDir
 
26
from bzrlib.tests.test_smart import TestCaseWithSmartMedium
 
27
from bzrlib.tests.test_transactions import DummyWeave
 
28
from bzrlib.transactions import (PassThroughTransaction,
 
29
                                 ReadOnlyTransaction,
 
30
                                 WriteTransaction,
 
31
                                 )
 
32
from bzrlib.transport import get_transport
 
33
 
 
34
 
 
35
# these tests are applied in each parameterized suite for LockableFiles
 
36
class _TestLockableFiles_mixin(object):
    """Tests shared by every LockableFiles test case in this module.

    Concrete test cases must provide ``self.lockable`` (set up in their
    ``setUp``) and a ``get_lockable()`` method returning a new LockableFiles
    object for the same underlying lock.
    """

    def test_read_write(self):
        # Nothing has been written yet, so both accessors must fail.
        self.assertRaises(NoSuchFile, self.lockable.get, 'foo')
        self.assertRaises(NoSuchFile, self.lockable.get_utf8, 'foo')
        self.lockable.lock_write()
        try:
            unicode_string = u'bar\u1234'
            self.assertEqual(4, len(unicode_string))
            byte_string = unicode_string.encode('utf-8')
            self.assertEqual(6, len(byte_string))
            # put() takes a file of bytes; a file of unicode is rejected.
            self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo',
                              StringIO(unicode_string))
            self.lockable.put('foo', StringIO(byte_string))
            self.assertEqual(byte_string,
                             self.lockable.get('foo').read())
            self.assertEqual(unicode_string,
                             self.lockable.get_utf8('foo').read())
            # put_utf8() takes a string, not a file-like object.
            self.assertRaises(BzrBadParameterNotString,
                              self.lockable.put_utf8,
                              'bar',
                              StringIO(unicode_string)
                              )
            self.lockable.put_utf8('bar', unicode_string)
            self.assertEqual(unicode_string,
                             self.lockable.get_utf8('bar').read())
            self.assertEqual(byte_string,
                             self.lockable.get('bar').read())
            # put_bytes() stores arbitrary (non-UTF-8) bytes unchanged.
            self.lockable.put_bytes('raw', 'raw\xffbytes')
            self.assertEqual('raw\xffbytes',
                             self.lockable.get('raw').read())
        finally:
            self.lockable.unlock()

    def test_locks(self):
        # Writing while only a read lock is held must be refused.
        self.lockable.lock_read()
        try:
            self.assertRaises(ReadOnlyError, self.lockable.put, 'foo',
                              StringIO('bar\u1234'))
        finally:
            self.lockable.unlock()

    def test_transactions(self):
        # The transaction class follows the lock state: pass-through when
        # unlocked, read-only under a read lock, write under a write lock.
        self.assertIs(self.lockable.get_transaction().__class__,
                      PassThroughTransaction)
        self.lockable.lock_read()
        try:
            self.assertIs(self.lockable.get_transaction().__class__,
                          ReadOnlyTransaction)
        finally:
            self.lockable.unlock()
        self.assertIs(self.lockable.get_transaction().__class__,
                      PassThroughTransaction)
        self.lockable.lock_write()
        # Release the write lock even if an assertion below fails, so a
        # failure here does not leave the lock held.
        try:
            self.assertIs(self.lockable.get_transaction().__class__,
                          WriteTransaction)
            # check that finish is called:
            vf = DummyWeave('a')
            self.lockable.get_transaction().register_dirty(vf)
        finally:
            self.lockable.unlock()
        self.assertTrue(vf.finished)

    def test__escape(self):
        self.assertEqual('%25', self.lockable._escape('%'))

    def test__escape_empty(self):
        self.assertEqual('', self.lockable._escape(''))

    def test_break_lock(self):
        # some locks are not breakable
        self.lockable.lock_write()
        try:
            # Breaking the lock through the instance that holds it is
            # expected to raise AssertionError.
            self.assertRaises(AssertionError, self.lockable.break_lock)
        except NotImplementedError:
            # this lock cannot be broken
            self.lockable.unlock()
            return
        l2 = self.get_lockable()
        orig_factory = bzrlib.ui.ui_factory
        # silent ui - no need for stdout
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
        # Canned "y" input; presumably answers the break-lock confirmation.
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
        try:
            l2.break_lock()
        finally:
            bzrlib.ui.ui_factory = orig_factory
        try:
            # Once broken, the lock can be taken by the second instance.
            l2.lock_write()
            l2.unlock()
        finally:
            # The original holder finds out its lock was broken on unlock.
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
            self.assertFalse(self.lockable.is_locked())

    def test_lock_write_returns_None_refuses_token(self):
        token = self.lockable.lock_write()
        try:
            if token is not None:
                # This test does not apply, because this lockable supports
                # tokens.
                return
            self.assertRaises(errors.TokenLockingNotSupported,
                              self.lockable.lock_write, token='token')
        finally:
            self.lockable.unlock()

    def test_lock_write_returns_token_when_given_token(self):
        token = self.lockable.lock_write()
        try:
            if token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            new_lockable = self.get_lockable()
            token_from_new_lockable = new_lockable.lock_write(token=token)
            try:
                self.assertEqual(token, token_from_new_lockable)
            finally:
                new_lockable.unlock()
        finally:
            self.lockable.unlock()

    def test_lock_write_raises_on_token_mismatch(self):
        token = self.lockable.lock_write()
        try:
            if token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            different_token = token + 'xxx'
            # Re-using the same lockable instance with a different token will
            # raise TokenMismatch.
            self.assertRaises(errors.TokenMismatch,
                              self.lockable.lock_write, token=different_token)
            # A separate instance for the same lockable will also raise
            # TokenMismatch.
            # This detects the case where a caller claims to have a lock (via
            # the token) for an external resource, but doesn't (the token is
            # different).  Clients need a separate lock object to make sure
            # the external resource is probed, whereas the existing lock
            # object might cache.
            new_lockable = self.get_lockable()
            self.assertRaises(errors.TokenMismatch,
                              new_lockable.lock_write, token=different_token)
        finally:
            self.lockable.unlock()

    def test_lock_write_with_matching_token(self):
        # If the token matches, no exception is raised by lock_write.
        token = self.lockable.lock_write()
        try:
            if token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            # The same instance will accept a second lock_write if the
            # specified token matches.
            self.lockable.lock_write(token=token)
            self.lockable.unlock()
            # Calling lock_write on a new instance for the same lockable will
            # also succeed.
            new_lockable = self.get_lockable()
            new_lockable.lock_write(token=token)
            new_lockable.unlock()
        finally:
            self.lockable.unlock()

    def test_unlock_after_lock_write_with_token(self):
        # If lock_write did not physically acquire the lock (because it was
        # passed a token), then unlock should not physically release it.
        token = self.lockable.lock_write()
        try:
            if token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            new_lockable = self.get_lockable()
            new_lockable.lock_write(token=token)
            new_lockable.unlock()
            self.assertTrue(self.lockable.get_physical_lock_status())
        finally:
            self.lockable.unlock()

    def test_lock_write_with_token_fails_when_unlocked(self):
        # Lock and unlock to get a superficially valid token.  This mimics a
        # likely programming error, where a caller accidentally tries to lock
        # with a token that is no longer valid (because the original lock was
        # released).
        token = self.lockable.lock_write()
        self.lockable.unlock()
        if token is None:
            # This test does not apply, because this lockable refuses
            # tokens.
            return

        self.assertRaises(errors.TokenMismatch,
                          self.lockable.lock_write, token=token)

    def test_lock_write_reenter_with_token(self):
        token = self.lockable.lock_write()
        try:
            if token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            # Relock with a token.
            token_from_reentry = self.lockable.lock_write(token=token)
            try:
                self.assertEqual(token, token_from_reentry)
            finally:
                self.lockable.unlock()
        finally:
            self.lockable.unlock()
        # The lock should be unlocked on disk.  Verify that with a new lock
        # instance.
        new_lockable = self.get_lockable()
        # Calling lock_write now should work, rather than raise LockContention.
        new_lockable.lock_write()
        new_lockable.unlock()

    def test_second_lock_write_returns_same_token(self):
        first_token = self.lockable.lock_write()
        try:
            if first_token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            # Relock the already locked lockable.  It should return the same
            # token.
            second_token = self.lockable.lock_write()
            try:
                self.assertEqual(first_token, second_token)
            finally:
                self.lockable.unlock()
        finally:
            self.lockable.unlock()

    def test_leave_in_place(self):
        token = self.lockable.lock_write()
        try:
            if token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            self.lockable.leave_in_place()
        finally:
            self.lockable.unlock()
        # At this point, the lock is still in place on disk
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
        # But should be relockable with a token.
        self.lockable.lock_write(token=token)
        self.lockable.unlock()
        # Cleanup: the lock is still on disk at this point.  Re-acquire it
        # with the token and ask for it to be removed on unlock, so the test
        # does not finish with a physical lock left behind.
        self.lockable.lock_write(token=token)
        self.lockable.dont_leave_in_place()
        self.lockable.unlock()

    def test_dont_leave_in_place(self):
        token = self.lockable.lock_write()
        try:
            if token is None:
                # This test does not apply, because this lockable refuses
                # tokens.
                return
            self.lockable.leave_in_place()
        finally:
            self.lockable.unlock()
        # At this point, the lock is still in place on disk.
        # Acquire the existing lock with the token, and ask that it is removed
        # when this object unlocks, and unlock to trigger that removal.
        new_lockable = self.get_lockable()
        new_lockable.lock_write(token=token)
        new_lockable.dont_leave_in_place()
        new_lockable.unlock()
        # At this point, the lock is no longer on disk, so we can lock it.
        third_lockable = self.get_lockable()
        third_lockable.lock_write()
        third_lockable.unlock()
 
309
 
 
310
 
 
311
# This method of adapting tests to parameters is different to 
 
312
# the TestProviderAdapters used elsewhere, but seems simpler for this 
 
313
# case.  
 
314
class TestLockableFiles_TransportLock(TestCaseInTempDir,
                                      _TestLockableFiles_mixin):
    """LockableFile tests run with TransportLock underneath"""

    def setUp(self):
        TestCaseInTempDir.setUp(self)
        # The lock lives under a '.bzr' directory inside the test directory.
        base_transport = get_transport('.')
        base_transport.mkdir('.bzr')
        self.sub_transport = base_transport.clone('.bzr')
        self.lockable = lockable = self.get_lockable()
        lockable.create_lock()

    def tearDown(self):
        super(TestLockableFiles_TransportLock, self).tearDown()
        # Drop our reference to the subtransport, otherwise we get a
        # 5 second timeout due to the SFTP connection cache.
        del self.sub_transport

    def get_lockable(self):
        """Return a new LockableFiles using TransportLock on 'my-lock'."""
        lock_name = 'my-lock'
        return LockableFiles(self.sub_transport, lock_name, TransportLock)
 
333
        
 
334
 
 
335
class TestLockableFiles_LockDir(TestCaseInTempDir,
                              _TestLockableFiles_mixin):
    """LockableFile tests run with LockDir underneath"""

    def setUp(self):
        TestCaseInTempDir.setUp(self)
        self.transport = get_transport('.')
        self.lockable = self.get_lockable()
        # the lock creation here sets mode - test_permissions on branch
        # tests that implicitly, but it might be a good idea to factor
        # out the mode checking logic and have it applied to lockable files
        # directly. RBC 20060418
        self.lockable.create_lock()

    def get_lockable(self):
        """Return a new LockableFiles using LockDir on 'my-lock'."""
        return LockableFiles(self.transport, 'my-lock', LockDir)

    def test_lock_created(self):
        # The 'my-lock' directory exists before and after locking; the
        # 'held/info' entry inside it is only present while the lock is held.
        self.assertTrue(self.transport.has('my-lock'))
        self.lockable.lock_write()
        # Always release the lock, even if the assertion below fails.
        try:
            self.assertTrue(self.transport.has('my-lock/held/info'))
        finally:
            self.lockable.unlock()
        self.assertFalse(self.transport.has('my-lock/held/info'))
        self.assertTrue(self.transport.has('my-lock'))

    # TODO: Test the lockdir inherits the right file and directory permissions
    # from the LockableFiles.
 
363
        
 
364
 
 
365
class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
                              _TestLockableFiles_mixin):
    """LockableFile tests run with RemoteLockDir on a branch."""

    def setUp(self):
        TestCaseWithSmartMedium.setUp(self)
        # A RemoteLockDir can only be obtained through some RemoteObject,
        # and a branch is the one we care about.  The mixin tests exercise
        # end-to-end behaviour, so stubbing out the backend and simulating
        # it would defeat the purpose.  The protocol implementation is
        # tested separately in test_remote and test_smart as usual.
        made_branch = self.make_branch('foo')
        self.addCleanup(made_branch.bzrdir.transport.disconnect)
        self.transport = get_transport('.')
        self.lockable = self.get_lockable()

    def get_lockable(self):
        """Return the control files of a newly opened 'foo' branch."""
        # Each new lockable comes from opening a new instance of the branch.
        branch_url = self.get_url('foo')
        opened_branch = bzrlib.branch.Branch.open(branch_url)
        self.addCleanup(opened_branch.bzrdir.transport.disconnect)
        return opened_branch.control_files