/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: Michael Ellerman
  • Date: 2006-05-31 08:44:29 UTC
  • mto: (1711.2.63 jam-integration)
  • mto: This revision was merged to the branch mainline in revision 1792.
  • Revision ID: michael@ellerman.id.au-20060531084429-35e5429abda9f560
Add optional location to ancestry and fix behaviour for checkouts.

This adds an optional location parameter to the ancestry command. It also
changes the behaviour of ancestry on checkouts such that if they have
been created with a subset of the branch history, only the subset is
shown by 'bzr ancestry'. Tests for all of that as well.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2005, 2006 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
from StringIO import StringIO
18
18
 
19
19
import bzrlib
20
 
from bzrlib import (
21
 
    errors,
22
 
    lockdir,
23
 
    osutils,
24
 
    )
 
20
from bzrlib.branch import Branch
 
21
import bzrlib.errors as errors
25
22
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
26
23
from bzrlib.lockable_files import LockableFiles, TransportLock
27
 
from bzrlib.symbol_versioning import (
28
 
    deprecated_in,
29
 
    )
30
 
from bzrlib.tests import (
31
 
    TestCaseInTempDir,
32
 
    TestNotApplicable,
33
 
    )
34
 
from bzrlib.tests.test_smart import TestCaseWithSmartMedium
 
24
from bzrlib.lockdir import LockDir
 
25
from bzrlib.tests import TestCaseInTempDir
35
26
from bzrlib.tests.test_transactions import DummyWeave
36
27
from bzrlib.transactions import (PassThroughTransaction,
37
28
                                 ReadOnlyTransaction,
41
32
 
42
33
 
43
34
# these tests are applied in each parameterized suite for LockableFiles
44
 
#
45
 
# they use an old style of parameterization, but we want to remove this class
46
 
# so won't modernize them now. - mbp 20080430
47
35
class _TestLockableFiles_mixin(object):
48
36
 
 
37
    def test_read_write(self):
 
38
        self.assertRaises(NoSuchFile, self.lockable.get, 'foo')
 
39
        self.assertRaises(NoSuchFile, self.lockable.get_utf8, 'foo')
 
40
        self.lockable.lock_write()
 
41
        try:
 
42
            unicode_string = u'bar\u1234'
 
43
            self.assertEqual(4, len(unicode_string))
 
44
            byte_string = unicode_string.encode('utf-8')
 
45
            self.assertEqual(6, len(byte_string))
 
46
            self.assertRaises(UnicodeEncodeError, self.lockable.put, 'foo', 
 
47
                              StringIO(unicode_string))
 
48
            self.lockable.put('foo', StringIO(byte_string))
 
49
            self.assertEqual(byte_string,
 
50
                             self.lockable.get('foo').read())
 
51
            self.assertEqual(unicode_string,
 
52
                             self.lockable.get_utf8('foo').read())
 
53
            self.assertRaises(BzrBadParameterNotString,
 
54
                              self.lockable.put_utf8,
 
55
                              'bar',
 
56
                              StringIO(unicode_string)
 
57
                              )
 
58
            self.lockable.put_utf8('bar', unicode_string)
 
59
            self.assertEqual(unicode_string, 
 
60
                             self.lockable.get_utf8('bar').read())
 
61
            self.assertEqual(byte_string, 
 
62
                             self.lockable.get('bar').read())
 
63
        finally:
 
64
            self.lockable.unlock()
 
65
 
 
66
    def test_locks(self):
 
67
        self.lockable.lock_read()
 
68
        try:
 
69
            self.assertRaises(ReadOnlyError, self.lockable.put, 'foo', 
 
70
                              StringIO('bar\u1234'))
 
71
        finally:
 
72
            self.lockable.unlock()
 
73
 
49
74
    def test_transactions(self):
50
75
        self.assertIs(self.lockable.get_transaction().__class__,
51
76
                      PassThroughTransaction)
68
93
 
69
94
    def test__escape(self):
70
95
        self.assertEqual('%25', self.lockable._escape('%'))
71
 
 
 
96
        
72
97
    def test__escape_empty(self):
73
98
        self.assertEqual('', self.lockable._escape(''))
74
99
 
80
105
        except NotImplementedError:
81
106
            # this lock cannot be broken
82
107
            self.lockable.unlock()
83
 
            raise TestNotApplicable("%r is not breakable" % (self.lockable,))
 
108
            return
84
109
        l2 = self.get_lockable()
85
110
        orig_factory = bzrlib.ui.ui_factory
86
111
        # silent ui - no need for stdout
87
 
        bzrlib.ui.ui_factory = bzrlib.ui.CannedInputUIFactory([True])
 
112
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
113
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
88
114
        try:
89
115
            l2.break_lock()
90
116
        finally:
96
122
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
97
123
            self.assertFalse(self.lockable.is_locked())
98
124
 
99
 
    def test_lock_write_returns_None_refuses_token(self):
100
 
        token = self.lockable.lock_write()
101
 
        self.addCleanup(self.lockable.unlock)
102
 
        if token is not None:
103
 
            # This test does not apply, because this lockable supports
104
 
            # tokens.
105
 
            raise TestNotApplicable("%r uses tokens" % (self.lockable,))
106
 
        self.assertRaises(errors.TokenLockingNotSupported,
107
 
                          self.lockable.lock_write, token='token')
108
 
 
109
 
    def test_lock_write_returns_token_when_given_token(self):
110
 
        token = self.lockable.lock_write()
111
 
        self.addCleanup(self.lockable.unlock)
112
 
        if token is None:
113
 
            # This test does not apply, because this lockable refuses
114
 
            # tokens.
115
 
            return
116
 
        new_lockable = self.get_lockable()
117
 
        token_from_new_lockable = new_lockable.lock_write(token=token)
118
 
        self.addCleanup(new_lockable.unlock)
119
 
        self.assertEqual(token, token_from_new_lockable)
120
 
 
121
 
    def test_lock_write_raises_on_token_mismatch(self):
122
 
        token = self.lockable.lock_write()
123
 
        self.addCleanup(self.lockable.unlock)
124
 
        if token is None:
125
 
            # This test does not apply, because this lockable refuses
126
 
            # tokens.
127
 
            return
128
 
        different_token = token + 'xxx'
129
 
        # Re-using the same lockable instance with a different token will
130
 
        # raise TokenMismatch.
131
 
        self.assertRaises(errors.TokenMismatch,
132
 
                          self.lockable.lock_write, token=different_token)
133
 
        # A separate instance for the same lockable will also raise
134
 
        # TokenMismatch.
135
 
        # This detects the case where a caller claims to have a lock (via
136
 
        # the token) for an external resource, but doesn't (the token is
137
 
        # different).  Clients need a separate lock object to make sure the
138
 
        # external resource is probed, whereas the existing lock object
139
 
        # might cache.
140
 
        new_lockable = self.get_lockable()
141
 
        self.assertRaises(errors.TokenMismatch,
142
 
                          new_lockable.lock_write, token=different_token)
143
 
 
144
 
    def test_lock_write_with_matching_token(self):
145
 
        # If the token matches, so no exception is raised by lock_write.
146
 
        token = self.lockable.lock_write()
147
 
        self.addCleanup(self.lockable.unlock)
148
 
        if token is None:
149
 
            # This test does not apply, because this lockable refuses
150
 
            # tokens.
151
 
            return
152
 
        # The same instance will accept a second lock_write if the specified
153
 
        # token matches.
154
 
        self.lockable.lock_write(token=token)
155
 
        self.lockable.unlock()
156
 
        # Calling lock_write on a new instance for the same lockable will
157
 
        # also succeed.
158
 
        new_lockable = self.get_lockable()
159
 
        new_lockable.lock_write(token=token)
160
 
        new_lockable.unlock()
161
 
 
162
 
    def test_unlock_after_lock_write_with_token(self):
163
 
        # If lock_write did not physically acquire the lock (because it was
164
 
        # passed a token), then unlock should not physically release it.
165
 
        token = self.lockable.lock_write()
166
 
        self.addCleanup(self.lockable.unlock)
167
 
        if token is None:
168
 
            # This test does not apply, because this lockable refuses
169
 
            # tokens.
170
 
            return
171
 
        new_lockable = self.get_lockable()
172
 
        new_lockable.lock_write(token=token)
173
 
        new_lockable.unlock()
174
 
        self.assertTrue(self.lockable.get_physical_lock_status())
175
 
 
176
 
    def test_lock_write_with_token_fails_when_unlocked(self):
177
 
        # Lock and unlock to get a superficially valid token.  This mimics a
178
 
        # likely programming error, where a caller accidentally tries to lock
179
 
        # with a token that is no longer valid (because the original lock was
180
 
        # released).
181
 
        token = self.lockable.lock_write()
182
 
        self.lockable.unlock()
183
 
        if token is None:
184
 
            # This test does not apply, because this lockable refuses
185
 
            # tokens.
186
 
            return
187
 
 
188
 
        self.assertRaises(errors.TokenMismatch,
189
 
                          self.lockable.lock_write, token=token)
190
 
 
191
 
    def test_lock_write_reenter_with_token(self):
192
 
        token = self.lockable.lock_write()
193
 
        try:
194
 
            if token is None:
195
 
                # This test does not apply, because this lockable refuses
196
 
                # tokens.
197
 
                return
198
 
            # Relock with a token.
199
 
            token_from_reentry = self.lockable.lock_write(token=token)
200
 
            try:
201
 
                self.assertEqual(token, token_from_reentry)
202
 
            finally:
203
 
                self.lockable.unlock()
204
 
        finally:
205
 
            self.lockable.unlock()
206
 
        # The lock should be unlocked on disk.  Verify that with a new lock
207
 
        # instance.
208
 
        new_lockable = self.get_lockable()
209
 
        # Calling lock_write now should work, rather than raise LockContention.
210
 
        new_lockable.lock_write()
211
 
        new_lockable.unlock()
212
 
 
213
 
    def test_second_lock_write_returns_same_token(self):
214
 
        first_token = self.lockable.lock_write()
215
 
        try:
216
 
            if first_token is None:
217
 
                # This test does not apply, because this lockable refuses
218
 
                # tokens.
219
 
                return
220
 
            # Relock the already locked lockable.  It should return the same
221
 
            # token.
222
 
            second_token = self.lockable.lock_write()
223
 
            try:
224
 
                self.assertEqual(first_token, second_token)
225
 
            finally:
226
 
                self.lockable.unlock()
227
 
        finally:
228
 
            self.lockable.unlock()
229
 
 
230
 
    def test_leave_in_place(self):
231
 
        token = self.lockable.lock_write()
232
 
        try:
233
 
            if token is None:
234
 
                # This test does not apply, because this lockable refuses
235
 
                # tokens.
236
 
                return
237
 
            self.lockable.leave_in_place()
238
 
        finally:
239
 
            self.lockable.unlock()
240
 
        # At this point, the lock is still in place on disk
241
 
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
242
 
        # But should be relockable with a token.
243
 
        self.lockable.lock_write(token=token)
244
 
        self.lockable.unlock()
245
 
        # Cleanup: we should still be able to get the lock, but we restore the
246
 
        # behavior to clearing the lock when unlocking.
247
 
        self.lockable.lock_write(token=token)
248
 
        self.lockable.dont_leave_in_place()
249
 
        self.lockable.unlock()
250
 
 
251
 
    def test_dont_leave_in_place(self):
252
 
        token = self.lockable.lock_write()
253
 
        try:
254
 
            if token is None:
255
 
                # This test does not apply, because this lockable refuses
256
 
                # tokens.
257
 
                return
258
 
            self.lockable.leave_in_place()
259
 
        finally:
260
 
            self.lockable.unlock()
261
 
        # At this point, the lock is still in place on disk.
262
 
        # Acquire the existing lock with the token, and ask that it is removed
263
 
        # when this object unlocks, and unlock to trigger that removal.
264
 
        new_lockable = self.get_lockable()
265
 
        new_lockable.lock_write(token=token)
266
 
        new_lockable.dont_leave_in_place()
267
 
        new_lockable.unlock()
268
 
        # At this point, the lock is no longer on disk, so we can lock it.
269
 
        third_lockable = self.get_lockable()
270
 
        third_lockable.lock_write()
271
 
        third_lockable.unlock()
272
 
 
273
 
 
274
 
# This method of adapting tests to parameters is different to
275
 
# the TestProviderAdapters used elsewhere, but seems simpler for this
276
 
# case.
 
125
 
 
126
# This method of adapting tests to parameters is different to 
 
127
# the TestProviderAdapters used elsewhere, but seems simpler for this 
 
128
# case.  
277
129
class TestLockableFiles_TransportLock(TestCaseInTempDir,
278
130
                                      _TestLockableFiles_mixin):
279
131
 
280
132
    def setUp(self):
281
 
        TestCaseInTempDir.setUp(self)
 
133
        super(TestLockableFiles_TransportLock, self).setUp()
282
134
        transport = get_transport('.')
283
135
        transport.mkdir('.bzr')
284
136
        self.sub_transport = transport.clone('.bzr')
285
137
        self.lockable = self.get_lockable()
286
138
        self.lockable.create_lock()
287
139
 
288
 
    def stop_server(self):
289
 
        super(TestLockableFiles_TransportLock, self).stop_server()
 
140
    def tearDown(self):
 
141
        super(TestLockableFiles_TransportLock, self).tearDown()
290
142
        # free the subtransport so that we do not get a 5 second
291
143
        # timeout due to the SFTP connection cache.
292
 
        try:
293
 
            del self.sub_transport
294
 
        except AttributeError:
295
 
            pass
 
144
        del self.sub_transport
296
145
 
297
146
    def get_lockable(self):
298
147
        return LockableFiles(self.sub_transport, 'my-lock', TransportLock)
299
 
 
 
148
        
300
149
 
301
150
class TestLockableFiles_LockDir(TestCaseInTempDir,
302
151
                              _TestLockableFiles_mixin):
303
152
    """LockableFile tests run with LockDir underneath"""
304
153
 
305
154
    def setUp(self):
306
 
        TestCaseInTempDir.setUp(self)
 
155
        super(TestLockableFiles_LockDir, self).setUp()
307
156
        self.transport = get_transport('.')
308
157
        self.lockable = self.get_lockable()
309
 
        # the lock creation here sets mode - test_permissions on branch
310
 
        # tests that implicitly, but it might be a good idea to factor
 
158
        # the lock creation here sets mode - test_permissions on branch 
 
159
        # tests that implicitly, but it might be a good idea to factor 
311
160
        # out the mode checking logic and have it applied to loackable files
312
161
        # directly. RBC 20060418
313
162
        self.lockable.create_lock()
314
163
 
315
164
    def get_lockable(self):
316
 
        return LockableFiles(self.transport, 'my-lock', lockdir.LockDir)
 
165
        return LockableFiles(self.transport, 'my-lock', LockDir)
317
166
 
318
167
    def test_lock_created(self):
319
168
        self.assertTrue(self.transport.has('my-lock'))
323
172
        self.assertFalse(self.transport.has('my-lock/held/info'))
324
173
        self.assertTrue(self.transport.has('my-lock'))
325
174
 
326
 
    def test__file_modes(self):
327
 
        self.transport.mkdir('readonly')
328
 
        osutils.make_readonly('readonly')
329
 
        lockable = LockableFiles(self.transport.clone('readonly'), 'test-lock',
330
 
                                 lockdir.LockDir)
331
 
        # The directory mode should be read-write-execute for the current user
332
 
        self.assertEqual(00700, lockable._dir_mode & 00700)
333
 
        # Files should be read-write for the current user
334
 
        self.assertEqual(00600, lockable._file_mode & 00700)
335
 
 
336
 
 
337
 
class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
338
 
                              _TestLockableFiles_mixin):
339
 
    """LockableFile tests run with RemoteLockDir on a branch."""
340
 
 
341
 
    def setUp(self):
342
 
        TestCaseWithSmartMedium.setUp(self)
343
 
        # can only get a RemoteLockDir with some RemoteObject...
344
 
        # use a branch as thats what we want. These mixin tests test the end
345
 
        # to end behaviour, so stubbing out the backend and simulating would
346
 
        # defeat the purpose. We test the protocol implementation separately
347
 
        # in test_remote and test_smart as usual.
348
 
        b = self.make_branch('foo')
349
 
        self.addCleanup(b.bzrdir.transport.disconnect)
350
 
        self.transport = get_transport('.')
351
 
        self.lockable = self.get_lockable()
352
 
 
353
 
    def get_lockable(self):
354
 
        # getting a new lockable involves opening a new instance of the branch
355
 
        branch = bzrlib.branch.Branch.open(self.get_url('foo'))
356
 
        self.addCleanup(branch.bzrdir.transport.disconnect)
357
 
        return branch.control_files
 
175
 
 
176
    # TODO: Test the lockdir inherits the right file and directory permissions
 
177
    # from the LockableFiles.