/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockdir.py

  • Committer: Robert Collins
  • Date: 2006-03-03 02:09:49 UTC
  • mto: (1594.2.4 integration)
  • mto: This revision was merged to the branch mainline in revision 1596.
  • Revision ID: robertc@robertcollins.net-20060303020949-0ddc6f33d0a43943
Smoke test for RevisionStore factories creating revision stores.

Show diffs side-by-side

added added

removed removed

Lines of Context:
31
31
# concurrent actors.  This is not a typical (or necessarily supported) use;
32
32
# they're really meant for guarding between processes.
33
33
 
34
 
# These tests are run on the default transport provided by the test framework
35
 
# (typically a local disk transport).  That can be changed by the --transport
36
 
# option to bzr selftest.  The required properties of the transport
37
 
# implementation are tested separately.  (The main requirement is just that
38
 
# they don't allow overwriting nonempty directories.)
39
 
 
40
34
class TestLockDir(TestCaseWithTransport):
41
35
    """Test LockDir operations"""
42
36
 
64
58
        """Acquire and release a lock"""
65
59
        t = self.get_transport()
66
60
        lf = LockDir(t, 'test_lock')
67
 
        lf.create()
68
61
        lf.attempt_lock()
69
62
        try:
70
63
            self.assertTrue(lf.is_held)
72
65
            lf.unlock()
73
66
            self.assertFalse(lf.is_held)
74
67
 
75
 
    def test_11_create_readonly_transport(self):
76
 
        """Fail to create lock on readonly transport"""
 
68
    def test_11_lock_readonly_transport(self):
 
69
        """Fail to lock on readonly transport"""
77
70
        t = self.get_readonly_transport()
78
71
        lf = LockDir(t, 'test_lock')
79
 
        self.assertRaises(UnlockableTransport, lf.create)
80
 
 
81
 
    def test_12_lock_readonly_transport(self):
82
 
        """Fail to lock on readonly transport"""
83
 
        lf = LockDir(self.get_transport(), 'test_lock')
84
 
        lf.create()
85
 
        lf = LockDir(self.get_readonly_transport(), 'test_lock')
86
72
        self.assertRaises(UnlockableTransport, lf.attempt_lock)
87
73
 
88
74
    def test_20_lock_contested(self):
89
75
        """Contention to get a lock"""
90
76
        t = self.get_transport()
91
77
        lf1 = LockDir(t, 'test_lock')
92
 
        lf1.create()
93
78
        lf1.attempt_lock()
94
79
        lf2 = LockDir(t, 'test_lock')
95
80
        try:
107
92
        """Peek at the state of a lock"""
108
93
        t = self.get_transport()
109
94
        lf1 = LockDir(t, 'test_lock')
110
 
        lf1.create()
111
95
        lf1.attempt_lock()
112
96
        # lock is held, should get some info on it
113
97
        info1 = lf1.peek()
124
108
        """Peek over a readonly transport"""
125
109
        t = self.get_transport()
126
110
        lf1 = LockDir(t, 'test_lock')
127
 
        lf1.create()
128
111
        lf2 = LockDir(self.get_readonly_transport(), 'test_lock')
129
112
        self.assertEqual(lf2.peek(), None)
130
113
        lf1.attempt_lock()
141
124
        """
142
125
        t = self.get_transport()
143
126
        lf1 = LockDir(t, 'test_lock')
144
 
        lf1.create()
145
127
        lf2 = LockDir(t, 'test_lock')
146
128
        lf1.attempt_lock()
147
129
        try:
157
139
        """Succeed when waiting on a lock with no contention.
158
140
        """
159
141
        t = self.get_transport()
160
 
        lf1 = LockDir(t, 'test_lock')
161
 
        lf1.create()
 
142
        lf2 = LockDir(t, 'test_lock')
162
143
        try:
163
144
            before = time.time()
164
 
            lf1.wait_lock(timeout=0.4, poll=0.1)
 
145
            lf2.wait_lock(timeout=0.4, poll=0.1)
165
146
            after = time.time()
166
147
            self.assertTrue(after - before <= 1.0)
167
148
        finally:
168
 
            lf1.unlock()
 
149
            lf2.unlock()
169
150
 
170
151
    def test_32_lock_wait_succeed(self):
171
152
        """Succeed when trying to acquire a lock that gets released
172
153
 
173
 
        One thread holds on a lock and then releases it; another 
174
 
        tries to lock it.
 
154
        One thread holds on a lock and then releases it; another tries to lock it.
175
155
        """
176
156
        t = self.get_transport()
177
157
        lf1 = LockDir(t, 'test_lock')
178
 
        lf1.create()
179
158
        lf1.attempt_lock()
180
159
 
181
160
        def wait_and_unlock():
202
181
        """
203
182
        t = self.get_transport()
204
183
        lf1 = LockDir(t, 'test_lock')
205
 
        lf1.create()
206
184
        lf1.attempt_lock()
207
185
 
208
186
        def wait_and_unlock():
224
202
        """Confirm a lock that's already held"""
225
203
        t = self.get_transport()
226
204
        lf1 = LockDir(t, 'test_lock')
227
 
        lf1.create()
228
205
        lf1.attempt_lock()
229
206
        lf1.confirm()
230
207
 
232
209
        """Confirming a lock that is not held raises LockNotHeld"""
233
210
        t = self.get_transport()
234
211
        lf1 = LockDir(t, 'test_lock')
235
 
        lf1.create()
236
212
        self.assertRaises(LockNotHeld, lf1.confirm)
237
213
 
238
214
    def test_42_confirm_broken_manually(self):
239
215
        """Confirm a lock broken by hand"""
240
216
        t = self.get_transport()
241
217
        lf1 = LockDir(t, 'test_lock')
242
 
        lf1.create()
243
218
        lf1.attempt_lock()
244
219
        t.move('test_lock', 'lock_gone_now')
245
220
        self.assertRaises(LockBroken, lf1.confirm)
248
223
        """Break a lock whose caller has forgotten it"""
249
224
        t = self.get_transport()
250
225
        lf1 = LockDir(t, 'test_lock')
251
 
        lf1.create()
252
226
        lf1.attempt_lock()
253
227
        # we incorrectly discard the lock object without unlocking it
254
228
        del lf1
265
239
        """Lock break races with regular release"""
266
240
        t = self.get_transport()
267
241
        lf1 = LockDir(t, 'test_lock')
268
 
        lf1.create()
269
242
        lf1.attempt_lock()
270
243
        # someone else sees it's still locked
271
244
        lf2 = LockDir(t, 'test_lock')
282
255
        """Lock break races with someone else acquiring it"""
283
256
        t = self.get_transport()
284
257
        lf1 = LockDir(t, 'test_lock')
285
 
        lf1.create()
286
258
        lf1.attempt_lock()
287
259
        # someone else sees it's still locked
288
260
        lf2 = LockDir(t, 'test_lock')
295
267
        self.assertRaises(LockBreakMismatch, lf2.force_break,
296
268
                          holder_info)
297
269
        lf3.unlock()
298
 
 
299
 
    def test_46_fake_read_lock(self):
300
 
        t = self.get_transport()
301
 
        lf1 = LockDir(t, 'test_lock')
302
 
        lf1.create()
303
 
        lf1.lock_read()
304
 
        lf1.unlock()
305
 
 
306
 
    def test_50_lockdir_representation(self):
307
 
        """Check the on-disk representation of LockDirs is as expected.
308
 
 
309
 
        There should always be a top-level directory named by the lock.
310
 
        When the lock is held, there should be a lockname/held directory 
311
 
        containing an info file.
312
 
        """
313
 
        t = self.get_transport()
314
 
        lf1 = LockDir(t, 'test_lock')
315
 
        lf1.create()
316
 
        self.assertTrue(t.has('test_lock'))
317
 
        lf1.lock_write()
318
 
        self.assertTrue(t.has('test_lock/held/info'))
319
 
        lf1.unlock()
320
 
        self.assertFalse(t.has('test_lock/held/info'))