/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockable_files.py

  • Committer: John Arbash Meinel
  • Date: 2008-07-09 21:42:24 UTC
  • mto: This revision was merged to the branch mainline in revision 3543.
  • Revision ID: john@arbash-meinel.com-20080709214224-r75k87r6a01pfc3h
Restore a real weave merge to 'bzr merge --weave'.

To do so efficiently, we only add the simple LCAs to the final weave
object, unless we run into complexities with the merge graph.
This gives the same effective result as adding all the texts,
with the advantage of not having to extract all of them.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
from StringIO import StringIO
18
18
 
46
46
# so won't modernize them now. - mbp 20080430
47
47
class _TestLockableFiles_mixin(object):
48
48
 
 
49
    def test_read_write(self):
 
50
        self.assertRaises(NoSuchFile,
 
51
            self.applyDeprecated,
 
52
            deprecated_in((1, 5, 0)),
 
53
            self.lockable.get, 'foo')
 
54
        self.assertRaises(NoSuchFile,
 
55
            self.applyDeprecated,
 
56
            deprecated_in((1, 5, 0)),
 
57
            self.lockable.get_utf8, 'foo')
 
58
        self.lockable.lock_write()
 
59
        try:
 
60
            unicode_string = u'bar\u1234'
 
61
            self.assertEqual(4, len(unicode_string))
 
62
            byte_string = unicode_string.encode('utf-8')
 
63
            self.assertEqual(6, len(byte_string))
 
64
            self.assertRaises(UnicodeEncodeError,
 
65
                self.applyDeprecated,
 
66
                deprecated_in((1, 6, 0)),
 
67
                self.lockable.put, 'foo',
 
68
                StringIO(unicode_string))
 
69
            self.applyDeprecated(
 
70
                deprecated_in((1, 6, 0)),
 
71
                self.lockable.put,
 
72
                'foo', StringIO(byte_string))
 
73
            byte_stream = self.applyDeprecated(
 
74
                deprecated_in((1, 5, 0)),
 
75
                self.lockable.get,
 
76
                'foo')
 
77
            self.assertEqual(byte_string, byte_stream.read())
 
78
            unicode_stream = self.applyDeprecated(
 
79
                deprecated_in((1, 5, 0)),
 
80
                self.lockable.get_utf8,
 
81
                'foo')
 
82
            self.assertEqual(unicode_string,
 
83
                unicode_stream.read())
 
84
            self.assertRaises(BzrBadParameterNotString,
 
85
                self.applyDeprecated,
 
86
                deprecated_in((1, 6, 0)),
 
87
                self.lockable.put_utf8,
 
88
                'bar',
 
89
                StringIO(unicode_string))
 
90
            self.applyDeprecated(
 
91
                deprecated_in((1, 6, 0)),
 
92
                self.lockable.put_utf8,
 
93
                'bar',
 
94
                unicode_string)
 
95
            unicode_stream = self.applyDeprecated(
 
96
                deprecated_in((1, 5, 0)),
 
97
                self.lockable.get_utf8,
 
98
                'bar')
 
99
            self.assertEqual(unicode_string,
 
100
                unicode_stream.read())
 
101
            byte_stream = self.applyDeprecated(
 
102
                deprecated_in((1, 5, 0)),
 
103
                self.lockable.get,
 
104
                'bar')
 
105
            self.assertEqual(byte_string, byte_stream.read())
 
106
            self.applyDeprecated(
 
107
                deprecated_in((1, 6, 0)),
 
108
                self.lockable.put_bytes,
 
109
                'raw', 'raw\xffbytes')
 
110
            byte_stream = self.applyDeprecated(
 
111
                deprecated_in((1, 5, 0)),
 
112
                self.lockable.get,
 
113
                'raw')
 
114
            self.assertEqual('raw\xffbytes', byte_stream.read())
 
115
        finally:
 
116
            self.lockable.unlock()
 
117
 
 
118
    def test_locks(self):
 
119
        self.lockable.lock_read()
 
120
        try:
 
121
            self.assertRaises(ReadOnlyError, self.lockable.put, 'foo',
 
122
                              StringIO('bar\u1234'))
 
123
        finally:
 
124
            self.lockable.unlock()
 
125
 
49
126
    def test_transactions(self):
50
127
        self.assertIs(self.lockable.get_transaction().__class__,
51
128
                      PassThroughTransaction)
68
145
 
69
146
    def test__escape(self):
70
147
        self.assertEqual('%25', self.lockable._escape('%'))
71
 
 
 
148
        
72
149
    def test__escape_empty(self):
73
150
        self.assertEqual('', self.lockable._escape(''))
74
151
 
84
161
        l2 = self.get_lockable()
85
162
        orig_factory = bzrlib.ui.ui_factory
86
163
        # silent ui - no need for stdout
87
 
        bzrlib.ui.ui_factory = bzrlib.ui.CannedInputUIFactory([True])
 
164
        bzrlib.ui.ui_factory = bzrlib.ui.SilentUIFactory()
 
165
        bzrlib.ui.ui_factory.stdin = StringIO("y\n")
88
166
        try:
89
167
            l2.break_lock()
90
168
        finally:
98
176
 
99
177
    def test_lock_write_returns_None_refuses_token(self):
100
178
        token = self.lockable.lock_write()
101
 
        self.addCleanup(self.lockable.unlock)
102
 
        if token is not None:
103
 
            # This test does not apply, because this lockable supports
104
 
            # tokens.
105
 
            raise TestNotApplicable("%r uses tokens" % (self.lockable,))
106
 
        self.assertRaises(errors.TokenLockingNotSupported,
107
 
                          self.lockable.lock_write, token='token')
 
179
        try:
 
180
            if token is not None:
 
181
                # This test does not apply, because this lockable supports
 
182
                # tokens.
 
183
                raise TestNotApplicable("%r uses tokens" % (self.lockable,))
 
184
            self.assertRaises(errors.TokenLockingNotSupported,
 
185
                              self.lockable.lock_write, token='token')
 
186
        finally:
 
187
            self.lockable.unlock()
108
188
 
109
189
    def test_lock_write_returns_token_when_given_token(self):
110
190
        token = self.lockable.lock_write()
111
 
        self.addCleanup(self.lockable.unlock)
112
 
        if token is None:
113
 
            # This test does not apply, because this lockable refuses
114
 
            # tokens.
115
 
            return
116
 
        new_lockable = self.get_lockable()
117
 
        token_from_new_lockable = new_lockable.lock_write(token=token)
118
 
        self.addCleanup(new_lockable.unlock)
119
 
        self.assertEqual(token, token_from_new_lockable)
 
191
        try:
 
192
            if token is None:
 
193
                # This test does not apply, because this lockable refuses
 
194
                # tokens.
 
195
                return
 
196
            new_lockable = self.get_lockable()
 
197
            token_from_new_lockable = new_lockable.lock_write(token=token)
 
198
            try:
 
199
                self.assertEqual(token, token_from_new_lockable)
 
200
            finally:
 
201
                new_lockable.unlock()
 
202
        finally:
 
203
            self.lockable.unlock()
120
204
 
121
205
    def test_lock_write_raises_on_token_mismatch(self):
122
206
        token = self.lockable.lock_write()
123
 
        self.addCleanup(self.lockable.unlock)
124
 
        if token is None:
125
 
            # This test does not apply, because this lockable refuses
126
 
            # tokens.
127
 
            return
128
 
        different_token = token + 'xxx'
129
 
        # Re-using the same lockable instance with a different token will
130
 
        # raise TokenMismatch.
131
 
        self.assertRaises(errors.TokenMismatch,
132
 
                          self.lockable.lock_write, token=different_token)
133
 
        # A separate instance for the same lockable will also raise
134
 
        # TokenMismatch.
135
 
        # This detects the case where a caller claims to have a lock (via
136
 
        # the token) for an external resource, but doesn't (the token is
137
 
        # different).  Clients need a separate lock object to make sure the
138
 
        # external resource is probed, whereas the existing lock object
139
 
        # might cache.
140
 
        new_lockable = self.get_lockable()
141
 
        self.assertRaises(errors.TokenMismatch,
142
 
                          new_lockable.lock_write, token=different_token)
 
207
        try:
 
208
            if token is None:
 
209
                # This test does not apply, because this lockable refuses
 
210
                # tokens.
 
211
                return
 
212
            different_token = token + 'xxx'
 
213
            # Re-using the same lockable instance with a different token will
 
214
            # raise TokenMismatch.
 
215
            self.assertRaises(errors.TokenMismatch,
 
216
                              self.lockable.lock_write, token=different_token)
 
217
            # A separate instance for the same lockable will also raise
 
218
            # TokenMismatch.
 
219
            # This detects the case where a caller claims to have a lock (via
 
220
            # the token) for an external resource, but doesn't (the token is
 
221
            # different).  Clients need a separate lock object to make sure the
 
222
            # external resource is probed, whereas the existing lock object
 
223
            # might cache.
 
224
            new_lockable = self.get_lockable()
 
225
            self.assertRaises(errors.TokenMismatch,
 
226
                              new_lockable.lock_write, token=different_token)
 
227
        finally:
 
228
            self.lockable.unlock()
143
229
 
144
230
    def test_lock_write_with_matching_token(self):
145
231
        # If the token matches, no exception is raised by lock_write.
146
232
        token = self.lockable.lock_write()
147
 
        self.addCleanup(self.lockable.unlock)
148
 
        if token is None:
149
 
            # This test does not apply, because this lockable refuses
150
 
            # tokens.
151
 
            return
152
 
        # The same instance will accept a second lock_write if the specified
153
 
        # token matches.
154
 
        self.lockable.lock_write(token=token)
155
 
        self.lockable.unlock()
156
 
        # Calling lock_write on a new instance for the same lockable will
157
 
        # also succeed.
158
 
        new_lockable = self.get_lockable()
159
 
        new_lockable.lock_write(token=token)
160
 
        new_lockable.unlock()
 
233
        try:
 
234
            if token is None:
 
235
                # This test does not apply, because this lockable refuses
 
236
                # tokens.
 
237
                return
 
238
            # The same instance will accept a second lock_write if the specified
 
239
            # token matches.
 
240
            self.lockable.lock_write(token=token)
 
241
            self.lockable.unlock()
 
242
            # Calling lock_write on a new instance for the same lockable will
 
243
            # also succeed.
 
244
            new_lockable = self.get_lockable()
 
245
            new_lockable.lock_write(token=token)
 
246
            new_lockable.unlock()
 
247
        finally:
 
248
            self.lockable.unlock()
161
249
 
162
250
    def test_unlock_after_lock_write_with_token(self):
163
251
        # If lock_write did not physically acquire the lock (because it was
164
252
        # passed a token), then unlock should not physically release it.
165
253
        token = self.lockable.lock_write()
166
 
        self.addCleanup(self.lockable.unlock)
167
 
        if token is None:
168
 
            # This test does not apply, because this lockable refuses
169
 
            # tokens.
170
 
            return
171
 
        new_lockable = self.get_lockable()
172
 
        new_lockable.lock_write(token=token)
173
 
        new_lockable.unlock()
174
 
        self.assertTrue(self.lockable.get_physical_lock_status())
 
254
        try:
 
255
            if token is None:
 
256
                # This test does not apply, because this lockable refuses
 
257
                # tokens.
 
258
                return
 
259
            new_lockable = self.get_lockable()
 
260
            new_lockable.lock_write(token=token)
 
261
            new_lockable.unlock()
 
262
            self.assertTrue(self.lockable.get_physical_lock_status())
 
263
        finally:
 
264
            self.lockable.unlock()
175
265
 
176
266
    def test_lock_write_with_token_fails_when_unlocked(self):
177
267
        # Lock and unlock to get a superficially valid token.  This mimics a
242
332
        # But should be relockable with a token.
243
333
        self.lockable.lock_write(token=token)
244
334
        self.lockable.unlock()
245
 
        # Cleanup: we should still be able to get the lock, but we restore the
246
 
        # behavior to clearing the lock when unlocking.
247
 
        self.lockable.lock_write(token=token)
248
 
        self.lockable.dont_leave_in_place()
249
 
        self.lockable.unlock()
250
335
 
251
336
    def test_dont_leave_in_place(self):
252
337
        token = self.lockable.lock_write()
271
356
        third_lockable.unlock()
272
357
 
273
358
 
274
 
# This method of adapting tests to parameters is different to
275
 
# the TestProviderAdapters used elsewhere, but seems simpler for this
276
 
# case.
 
359
# This method of adapting tests to parameters is different to 
 
360
# the TestProviderAdapters used elsewhere, but seems simpler for this 
 
361
# case.  
277
362
class TestLockableFiles_TransportLock(TestCaseInTempDir,
278
363
                                      _TestLockableFiles_mixin):
279
364
 
285
370
        self.lockable = self.get_lockable()
286
371
        self.lockable.create_lock()
287
372
 
288
 
    def stop_server(self):
289
 
        super(TestLockableFiles_TransportLock, self).stop_server()
 
373
    def tearDown(self):
 
374
        super(TestLockableFiles_TransportLock, self).tearDown()
290
375
        # free the subtransport so that we do not get a 5 second
291
376
        # timeout due to the SFTP connection cache.
292
377
        try:
296
381
 
297
382
    def get_lockable(self):
298
383
        return LockableFiles(self.sub_transport, 'my-lock', TransportLock)
299
 
 
 
384
        
300
385
 
301
386
class TestLockableFiles_LockDir(TestCaseInTempDir,
302
387
                              _TestLockableFiles_mixin):
306
391
        TestCaseInTempDir.setUp(self)
307
392
        self.transport = get_transport('.')
308
393
        self.lockable = self.get_lockable()
309
 
        # the lock creation here sets mode - test_permissions on branch
310
 
        # tests that implicitly, but it might be a good idea to factor
 
394
        # the lock creation here sets mode - test_permissions on branch 
 
395
        # tests that implicitly, but it might be a good idea to factor 
311
396
        # out the mode checking logic and have it applied to lockable files
312
397
        # directly. RBC 20060418
313
398
        self.lockable.create_lock()