/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lock.py

  • Committer: John Arbash Meinel
  • Date: 2009-12-22 16:28:47 UTC
  • mto: This revision was merged to the branch mainline in revision 4922.
  • Revision ID: john@arbash-meinel.com-20091222162847-tvnsc69to4l4uf5r
Implement a permute_for_extension helper.

Use it for all of the 'simple' extension permutations.
It basically permutes all tests in the current module by setting TestCase.module,
which works well for most of our extension tests. Some had more advanced
handling of permutations (extra permutations, custom vars, etc.).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for OS Locks."""
 
18
 
 
19
 
 
20
 
 
21
from bzrlib import (
 
22
    debug,
 
23
    errors,
 
24
    lock,
 
25
    tests,
 
26
    )
 
27
 
 
28
 
 
29
def load_tests(standard_tests, module, loader):
    """Parameterize the tests for each OS lock implementation.

    One scenario is built per ``(name, write_lock, read_lock)`` entry of
    ``lock._lock_classes``; each scenario supplies the ``write_lock`` and
    ``read_lock`` attributes that the tests read from ``self``.

    :param standard_tests: The tests to multiply across the scenarios.
    :param module: Not used by this function.
    :param loader: Loader whose ``suiteClass`` creates the suite handed to
        ``tests.multiply_tests``.
    :return: The result of ``tests.multiply_tests`` for that suite.
    """
    scenarios = [
        (name, {'write_lock': write_lock, 'read_lock': read_lock})
        for name, write_lock, read_lock in lock._lock_classes]
    return tests.multiply_tests(standard_tests, scenarios,
                                loader.suiteClass())
 
38
 
 
39
 
 
40
class TestOSLock(tests.TestCaseInTempDir):
    """Exercise read and write OS locks taken on a single file.

    The lock classes under test are not hard-coded: the tests call
    ``self.read_lock`` and ``self.write_lock``, which are expected to be
    replaced with real lock classes before the tests run.
    """

    # Set by load_tests
    read_lock = None
    write_lock = None

    def setUp(self):
        """Create 'a-lock-file', the file every test in this class locks."""
        super(TestOSLock, self).setUp()
        self.build_tree(['a-lock-file'])

    def _assert_contention_with_strict_locks(self, take_lock):
        """Assert that take_lock contends while 'strict_locks' is set.

        'strict_locks' is added to the global ``debug.debug_flags`` for the
        duration of the check only.  It is removed again in a ``finally``
        clause so that a failing assertion cannot leave the flag set.

        :param take_lock: Callable invoked with 'a-lock-file'; it is
            expected to raise ``errors.LockContention``.
        """
        debug.debug_flags.add('strict_locks')
        try:
            self.assertRaises(errors.LockContention,
                              take_lock, 'a-lock-file')
        finally:
            debug.debug_flags.remove('strict_locks')

    def test_create_read_lock(self):
        # A read lock can be taken and released on an unlocked file.
        r_lock = self.read_lock('a-lock-file')
        r_lock.unlock()

    def test_create_write_lock(self):
        # A write lock can be taken and released on an unlocked file.
        w_lock = self.write_lock('a-lock-file')
        w_lock.unlock()

    def test_read_locks_share(self):
        # A second read lock can be taken while the first is still held.
        r_lock = self.read_lock('a-lock-file')
        try:
            lock2 = self.read_lock('a-lock-file')
            lock2.unlock()
        finally:
            r_lock.unlock()

    def test_write_locks_are_exclusive(self):
        # A second write lock must contend with the one already held.
        w_lock = self.write_lock('a-lock-file')
        try:
            self.assertRaises(errors.LockContention,
                              self.write_lock, 'a-lock-file')
        finally:
            w_lock.unlock()

    def test_read_locks_block_write_locks(self):
        r_lock = self.read_lock('a-lock-file')
        try:
            if lock.have_fcntl and self.write_lock is lock._fcntl_WriteLock:
                # With the 'strict_locks' debug flag set, fcntl locks are
                # properly exclusive
                self._assert_contention_with_strict_locks(self.write_lock)
                # But not without it
                try:
                    w_lock = self.write_lock('a-lock-file')
                except errors.LockContention:
                    self.fail('Unexpected success. fcntl read locks'
                              ' do not usually block write locks')
                else:
                    w_lock.unlock()
                    self.knownFailure('fcntl read locks don\'t'
                                      ' block write locks without -Dlock')
            else:
                self.assertRaises(errors.LockContention,
                                  self.write_lock, 'a-lock-file')
        finally:
            r_lock.unlock()

    def test_write_locks_block_read_lock(self):
        w_lock = self.write_lock('a-lock-file')
        try:
            if lock.have_fcntl and self.read_lock is lock._fcntl_ReadLock:
                # With the 'strict_locks' debug flag set, fcntl locks are
                # properly exclusive
                self._assert_contention_with_strict_locks(self.read_lock)
                # But not without it
                try:
                    r_lock = self.read_lock('a-lock-file')
                except errors.LockContention:
                    self.fail('Unexpected success. fcntl write locks'
                              ' do not usually block read locks')
                else:
                    r_lock.unlock()
                    self.knownFailure('fcntl write locks don\'t'
                                      ' block read locks without -Dlock')
            else:
                self.assertRaises(errors.LockContention,
                                  self.read_lock, 'a-lock-file')
        finally:
            w_lock.unlock()

    def test_temporary_write_lock(self):
        r_lock = self.read_lock('a-lock-file')
        try:
            # With no other lock held, the upgrade is expected to succeed.
            status, w_lock = r_lock.temporary_write_lock()
            self.assertTrue(status)
            # This should block another write lock
            try:
                self.assertRaises(errors.LockContention,
                                  self.write_lock, 'a-lock-file')
            finally:
                # restore_read_lock() returns the read lock to keep using,
                # so rebind r_lock for the outer cleanup.
                r_lock = w_lock.restore_read_lock()
            # We should be able to take a read lock now
            r_lock2 = self.read_lock('a-lock-file')
            r_lock2.unlock()
        finally:
            r_lock.unlock()

    def test_temporary_write_lock_fails(self):
        r_lock = self.read_lock('a-lock-file')
        try:
            # A second read lock is held, so the upgrade is expected to be
            # refused (status is False).
            r_lock2 = self.read_lock('a-lock-file')
            try:
                status, w_lock = r_lock.temporary_write_lock()
                self.assertFalse(status)
                # Taking out the lock requires unlocking and locking again, so
                # we have to replace the original object
                r_lock = w_lock
            finally:
                r_lock2.unlock()
            # We should be able to take a read lock now
            r_lock2 = self.read_lock('a-lock-file')
            r_lock2.unlock()
        finally:
            r_lock.unlock()