/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: John Arbash Meinel
  • Date: 2008-08-18 22:34:21 UTC
  • mto: (3606.5.6 1.6)
  • mto: This revision was merged to the branch mainline in revision 3641.
  • Revision ID: john@arbash-meinel.com-20080818223421-todjny24vj4faj4t
Add tests for the fetching behavior.

The proper parameter passed is 'unordered' add an assert for it, and
fix callers that were passing 'unsorted' instead.
Add tests that we make the right get_record_stream call based
on the value of _fetch_uses_deltas.
Fix the fetch request for signatures.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
from StringIO import StringIO
 
18
 
 
19
import bzrlib
 
20
from bzrlib import (
 
21
    errors,
 
22
    lockdir,
 
23
    osutils,
 
24
    )
 
25
from bzrlib.errors import BzrBadParameterNotString, NoSuchFile, ReadOnlyError
 
26
from bzrlib.lockable_files import LockableFiles, TransportLock
 
27
from bzrlib.symbol_versioning import (
 
28
    deprecated_in,
 
29
    )
 
30
from bzrlib.tests import (
 
31
    TestCaseInTempDir,
 
32
    TestNotApplicable,
 
33
    )
 
34
from bzrlib.tests.test_smart import TestCaseWithSmartMedium
 
35
from bzrlib.tests.test_transactions import DummyWeave
 
36
from bzrlib.transactions import (PassThroughTransaction,
 
37
                                 ReadOnlyTransaction,
 
38
                                 WriteTransaction,
 
39
                                 )
 
40
from bzrlib.transport import get_transport
 
41
 
 
42
 
 
43
# these tests are applied in each parameterized suite for LockableFiles
 
44
#
 
45
# they use an old style of parameterization, but we want to remove this class
 
46
# so won't modernize them now. - mbp 20080430
 
47
class _TestLockableFiles_mixin(object):
 
48
 
 
49
    def test_read_write(self):
 
50
        self.assertRaises(NoSuchFile,
 
51
            self.applyDeprecated,
 
52
            deprecated_in((1, 5, 0)),
 
53
            self.lockable.get, 'foo')
 
54
        self.assertRaises(NoSuchFile,
 
55
            self.applyDeprecated,
 
56
            deprecated_in((1, 5, 0)),
 
57
            self.lockable.get_utf8, 'foo')
 
58
        self.lockable.lock_write()
 
59
        try:
 
60
            unicode_string = u'bar\u1234'
 
61
            self.assertEqual(4, len(unicode_string))
 
62
            byte_string = unicode_string.encode('utf-8')
 
63
            self.assertEqual(6, len(byte_string))
 
64
            self.assertRaises(UnicodeEncodeError,
 
65
                self.applyDeprecated,
 
66
                deprecated_in((1, 6, 0)),
 
67
                self.lockable.put, 'foo',
 
68
                StringIO(unicode_string))
 
69
            self.applyDeprecated(
 
70
                deprecated_in((1, 6, 0)),
 
71
                self.lockable.put,
 
72
                'foo', StringIO(byte_string))
 
73
            byte_stream = self.applyDeprecated(
 
74
                deprecated_in((1, 5, 0)),
 
75
                self.lockable.get,
 
76
                'foo')
 
77
            self.assertEqual(byte_string, byte_stream.read())
 
78
            unicode_stream = self.applyDeprecated(
 
79
                deprecated_in((1, 5, 0)),
 
80
                self.lockable.get_utf8,
 
81
                'foo')
 
82
            self.assertEqual(unicode_string,
 
83
                unicode_stream.read())
 
84
            self.assertRaises(BzrBadParameterNotString,
 
85
                self.applyDeprecated,
 
86
                deprecated_in((1, 6, 0)),
 
87
                self.lockable.put_utf8,
 
88
                'bar',
 
89
                StringIO(unicode_string))
 
90
            self.applyDeprecated(
 
91
                deprecated_in((1, 6, 0)),
 
92
                self.lockable.put_utf8,
 
93
                'bar',
 
94
                unicode_string)
 
95
            unicode_stream = self.applyDeprecated(
 
96
                deprecated_in((1, 5, 0)),
 
97
                self.lockable.get_utf8,
 
98
                'bar')
 
99
            self.assertEqual(unicode_string,
 
100
                unicode_stream.read())
 
101
            byte_stream = self.applyDeprecated(
 
102
                deprecated_in((1, 5, 0)),
 
103
                self.lockable.get,
 
104
                'bar')
 
105
            self.assertEqual(byte_string, byte_stream.read())
 
106
            self.applyDeprecated(
 
107
                deprecated_in((1, 6, 0)),
 
108
                self.lockable.put_bytes,
 
109
                'raw', 'raw\xffbytes')
 
110
            byte_stream = self.applyDeprecated(
 
111
                deprecated_in((1, 5, 0)),
 
112
                self.lockable.get,
 
113
                'raw')
 
114
            self.assertEqual('raw\xffbytes', byte_stream.read())
 
115
        finally:
 
116
            self.lockable.unlock()
 
117
 
 
118
    def test_locks(self):
 
119
        self.lockable.lock_read()
 
120
        try:
 
121
            self.assertRaises(ReadOnlyError, self.lockable.put, 'foo',
 
122
                              StringIO('bar\u1234'))
 
123
        finally:
 
124
            self.lockable.unlock()
 
125
 
 
126
    def test_transactions(self):
 
127
        self.assertIs(self.lockable.get_transaction().__class__,
 
128
                      PassThroughTransaction)
 
129
        self.lockable.lock_read()
 
130
        try:
 
131
            self.assertIs(self.lockable.get_transaction().__class__,
 
132
                          ReadOnlyTransaction)
 
133
        finally:
 
134
            self.lockable.unlock()
 
135
        self.assertIs(self.lockable.get_transaction().__class__,
 
136
                      PassThroughTransaction)
 
137
        self.lockable.lock_write()
 
138
        self.assertIs(self.lockable.get_transaction().__class__,
 
139
                      WriteTransaction)
 
140
        # check that finish is called:
 
141
        vf = DummyWeave('a')
 
142
        self.lockable.get_transaction().register_dirty(vf)
 
143
        self.lockable.unlock()
 
144
        self.assertTrue(vf.finished)
 
145
 
 
146
    def test__escape(self):
 
147
        self.assertEqual('%25', self.lockable._escape('%'))
 
148
        
 
149
    def test__escape_empty(self):
 
150
        self.assertEqual('', self.lockable._escape(''))
 
151
 
 
152
    def test_break_lock(self):
 
153
        # some locks are not breakable
 
154
        self.lockable.lock_write()
 
155
        try:
 
156
            self.assertRaises(AssertionError, self.lockable.break_lock)
 
157
        except NotImplementedError:
 
158
            # this lock cannot be broken
 
159
            self.lockable.unlock()
 
160
            raise TestNotApplicable("%r is not breakable" % (self.lockable,))
 
161
        l2 = self.get_lockable()
 
162
        orig_factory = bzrlib.ui.ui_factory
 
163
        # silent ui - no need for stdout
 
164
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
165
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
 
166
        try:
 
167
            l2.break_lock()
 
168
        finally:
 
169
            bzrlib.ui.ui_factory = orig_factory
 
170
        try:
 
171
            l2.lock_write()
 
172
            l2.unlock()
 
173
        finally:
 
174
            self.assertRaises(errors.LockBroken, self.lockable.unlock)
 
175
            self.assertFalse(self.lockable.is_locked())
 
176
 
 
177
    def test_lock_write_returns_None_refuses_token(self):
 
178
        token = self.lockable.lock_write()
 
179
        try:
 
180
            if token is not None:
 
181
                # This test does not apply, because this lockable supports
 
182
                # tokens.
 
183
                raise TestNotApplicable("%r uses tokens" % (self.lockable,))
 
184
            self.assertRaises(errors.TokenLockingNotSupported,
 
185
                              self.lockable.lock_write, token='token')
 
186
        finally:
 
187
            self.lockable.unlock()
 
188
 
 
189
    def test_lock_write_returns_token_when_given_token(self):
 
190
        token = self.lockable.lock_write()
 
191
        try:
 
192
            if token is None:
 
193
                # This test does not apply, because this lockable refuses
 
194
                # tokens.
 
195
                return
 
196
            new_lockable = self.get_lockable()
 
197
            token_from_new_lockable = new_lockable.lock_write(token=token)
 
198
            try:
 
199
                self.assertEqual(token, token_from_new_lockable)
 
200
            finally:
 
201
                new_lockable.unlock()
 
202
        finally:
 
203
            self.lockable.unlock()
 
204
 
 
205
    def test_lock_write_raises_on_token_mismatch(self):
 
206
        token = self.lockable.lock_write()
 
207
        try:
 
208
            if token is None:
 
209
                # This test does not apply, because this lockable refuses
 
210
                # tokens.
 
211
                return
 
212
            different_token = token + 'xxx'
 
213
            # Re-using the same lockable instance with a different token will
 
214
            # raise TokenMismatch.
 
215
            self.assertRaises(errors.TokenMismatch,
 
216
                              self.lockable.lock_write, token=different_token)
 
217
            # A seperate instance for the same lockable will also raise
 
218
            # TokenMismatch.
 
219
            # This detects the case where a caller claims to have a lock (via
 
220
            # the token) for an external resource, but doesn't (the token is
 
221
            # different).  Clients need a seperate lock object to make sure the
 
222
            # external resource is probed, whereas the existing lock object
 
223
            # might cache.
 
224
            new_lockable = self.get_lockable()
 
225
            self.assertRaises(errors.TokenMismatch,
 
226
                              new_lockable.lock_write, token=different_token)
 
227
        finally:
 
228
            self.lockable.unlock()
 
229
 
 
230
    def test_lock_write_with_matching_token(self):
 
231
        # If the token matches, so no exception is raised by lock_write.
 
232
        token = self.lockable.lock_write()
 
233
        try:
 
234
            if token is None:
 
235
                # This test does not apply, because this lockable refuses
 
236
                # tokens.
 
237
                return
 
238
            # The same instance will accept a second lock_write if the specified
 
239
            # token matches.
 
240
            self.lockable.lock_write(token=token)
 
241
            self.lockable.unlock()
 
242
            # Calling lock_write on a new instance for the same lockable will
 
243
            # also succeed.
 
244
            new_lockable = self.get_lockable()
 
245
            new_lockable.lock_write(token=token)
 
246
            new_lockable.unlock()
 
247
        finally:
 
248
            self.lockable.unlock()
 
249
 
 
250
    def test_unlock_after_lock_write_with_token(self):
 
251
        # If lock_write did not physically acquire the lock (because it was
 
252
        # passed a token), then unlock should not physically release it.
 
253
        token = self.lockable.lock_write()
 
254
        try:
 
255
            if token is None:
 
256
                # This test does not apply, because this lockable refuses
 
257
                # tokens.
 
258
                return
 
259
            new_lockable = self.get_lockable()
 
260
            new_lockable.lock_write(token=token)
 
261
            new_lockable.unlock()
 
262
            self.assertTrue(self.lockable.get_physical_lock_status())
 
263
        finally:
 
264
            self.lockable.unlock()
 
265
 
 
266
    def test_lock_write_with_token_fails_when_unlocked(self):
 
267
        # Lock and unlock to get a superficially valid token.  This mimics a
 
268
        # likely programming error, where a caller accidentally tries to lock
 
269
        # with a token that is no longer valid (because the original lock was
 
270
        # released).
 
271
        token = self.lockable.lock_write()
 
272
        self.lockable.unlock()
 
273
        if token is None:
 
274
            # This test does not apply, because this lockable refuses
 
275
            # tokens.
 
276
            return
 
277
 
 
278
        self.assertRaises(errors.TokenMismatch,
 
279
                          self.lockable.lock_write, token=token)
 
280
 
 
281
    def test_lock_write_reenter_with_token(self):
 
282
        token = self.lockable.lock_write()
 
283
        try:
 
284
            if token is None:
 
285
                # This test does not apply, because this lockable refuses
 
286
                # tokens.
 
287
                return
 
288
            # Relock with a token.
 
289
            token_from_reentry = self.lockable.lock_write(token=token)
 
290
            try:
 
291
                self.assertEqual(token, token_from_reentry)
 
292
            finally:
 
293
                self.lockable.unlock()
 
294
        finally:
 
295
            self.lockable.unlock()
 
296
        # The lock should be unlocked on disk.  Verify that with a new lock
 
297
        # instance.
 
298
        new_lockable = self.get_lockable()
 
299
        # Calling lock_write now should work, rather than raise LockContention.
 
300
        new_lockable.lock_write()
 
301
        new_lockable.unlock()
 
302
 
 
303
    def test_second_lock_write_returns_same_token(self):
 
304
        first_token = self.lockable.lock_write()
 
305
        try:
 
306
            if first_token is None:
 
307
                # This test does not apply, because this lockable refuses
 
308
                # tokens.
 
309
                return
 
310
            # Relock the already locked lockable.  It should return the same
 
311
            # token.
 
312
            second_token = self.lockable.lock_write()
 
313
            try:
 
314
                self.assertEqual(first_token, second_token)
 
315
            finally:
 
316
                self.lockable.unlock()
 
317
        finally:
 
318
            self.lockable.unlock()
 
319
 
 
320
    def test_leave_in_place(self):
 
321
        token = self.lockable.lock_write()
 
322
        try:
 
323
            if token is None:
 
324
                # This test does not apply, because this lockable refuses
 
325
                # tokens.
 
326
                return
 
327
            self.lockable.leave_in_place()
 
328
        finally:
 
329
            self.lockable.unlock()
 
330
        # At this point, the lock is still in place on disk
 
331
        self.assertRaises(errors.LockContention, self.lockable.lock_write)
 
332
        # But should be relockable with a token.
 
333
        self.lockable.lock_write(token=token)
 
334
        self.lockable.unlock()
 
335
 
 
336
    def test_dont_leave_in_place(self):
 
337
        token = self.lockable.lock_write()
 
338
        try:
 
339
            if token is None:
 
340
                # This test does not apply, because this lockable refuses
 
341
                # tokens.
 
342
                return
 
343
            self.lockable.leave_in_place()
 
344
        finally:
 
345
            self.lockable.unlock()
 
346
        # At this point, the lock is still in place on disk.
 
347
        # Acquire the existing lock with the token, and ask that it is removed
 
348
        # when this object unlocks, and unlock to trigger that removal.
 
349
        new_lockable = self.get_lockable()
 
350
        new_lockable.lock_write(token=token)
 
351
        new_lockable.dont_leave_in_place()
 
352
        new_lockable.unlock()
 
353
        # At this point, the lock is no longer on disk, so we can lock it.
 
354
        third_lockable = self.get_lockable()
 
355
        third_lockable.lock_write()
 
356
        third_lockable.unlock()
 
357
 
 
358
 
 
359
# This method of adapting tests to parameters is different to 
 
360
# the TestProviderAdapters used elsewhere, but seems simpler for this 
 
361
# case.  
 
362
class TestLockableFiles_TransportLock(TestCaseInTempDir,
 
363
                                      _TestLockableFiles_mixin):
 
364
 
 
365
    def setUp(self):
 
366
        TestCaseInTempDir.setUp(self)
 
367
        transport = get_transport('.')
 
368
        transport.mkdir('.bzr')
 
369
        self.sub_transport = transport.clone('.bzr')
 
370
        self.lockable = self.get_lockable()
 
371
        self.lockable.create_lock()
 
372
 
 
373
    def tearDown(self):
 
374
        super(TestLockableFiles_TransportLock, self).tearDown()
 
375
        # free the subtransport so that we do not get a 5 second
 
376
        # timeout due to the SFTP connection cache.
 
377
        try:
 
378
            del self.sub_transport
 
379
        except AttributeError:
 
380
            pass
 
381
 
 
382
    def get_lockable(self):
 
383
        return LockableFiles(self.sub_transport, 'my-lock', TransportLock)
 
384
        
 
385
 
 
386
class TestLockableFiles_LockDir(TestCaseInTempDir,
 
387
                              _TestLockableFiles_mixin):
 
388
    """LockableFile tests run with LockDir underneath"""
 
389
 
 
390
    def setUp(self):
 
391
        TestCaseInTempDir.setUp(self)
 
392
        self.transport = get_transport('.')
 
393
        self.lockable = self.get_lockable()
 
394
        # the lock creation here sets mode - test_permissions on branch 
 
395
        # tests that implicitly, but it might be a good idea to factor 
 
396
        # out the mode checking logic and have it applied to loackable files
 
397
        # directly. RBC 20060418
 
398
        self.lockable.create_lock()
 
399
 
 
400
    def get_lockable(self):
 
401
        return LockableFiles(self.transport, 'my-lock', lockdir.LockDir)
 
402
 
 
403
    def test_lock_created(self):
 
404
        self.assertTrue(self.transport.has('my-lock'))
 
405
        self.lockable.lock_write()
 
406
        self.assertTrue(self.transport.has('my-lock/held/info'))
 
407
        self.lockable.unlock()
 
408
        self.assertFalse(self.transport.has('my-lock/held/info'))
 
409
        self.assertTrue(self.transport.has('my-lock'))
 
410
 
 
411
    def test__file_modes(self):
 
412
        self.transport.mkdir('readonly')
 
413
        osutils.make_readonly('readonly')
 
414
        lockable = LockableFiles(self.transport.clone('readonly'), 'test-lock',
 
415
                                 lockdir.LockDir)
 
416
        # The directory mode should be read-write-execute for the current user
 
417
        self.assertEqual(00700, lockable._dir_mode & 00700)
 
418
        # Files should be read-write for the current user
 
419
        self.assertEqual(00600, lockable._file_mode & 00700)
 
420
 
 
421
 
 
422
class TestLockableFiles_RemoteLockDir(TestCaseWithSmartMedium,
 
423
                              _TestLockableFiles_mixin):
 
424
    """LockableFile tests run with RemoteLockDir on a branch."""
 
425
 
 
426
    def setUp(self):
 
427
        TestCaseWithSmartMedium.setUp(self)
 
428
        # can only get a RemoteLockDir with some RemoteObject...
 
429
        # use a branch as thats what we want. These mixin tests test the end
 
430
        # to end behaviour, so stubbing out the backend and simulating would
 
431
        # defeat the purpose. We test the protocol implementation separately
 
432
        # in test_remote and test_smart as usual.
 
433
        b = self.make_branch('foo')
 
434
        self.addCleanup(b.bzrdir.transport.disconnect)
 
435
        self.transport = get_transport('.')
 
436
        self.lockable = self.get_lockable()
 
437
 
 
438
    def get_lockable(self):
 
439
        # getting a new lockable involves opening a new instance of the branch
 
440
        branch = bzrlib.branch.Branch.open(self.get_url('foo'))
 
441
        self.addCleanup(branch.bzrdir.transport.disconnect)
 
442
        return branch.control_files