/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lock.py

  • Committer: Martin Pool
  • Date: 2005-08-24 08:59:32 UTC
  • Revision ID: mbp@sourcefrog.net-20050824085932-c61f1f1f1c930e13
- Add a simple UIFactory 

  The idea of this is to let a client of bzrlib set some 
  policy about how output is displayed.

  In this revision all that's done is that progress bars
  are constructed by a policy established by the application
  rather than being randomly constructed in the library 
  or passed down the calls.  This avoids progress bars
  popping up while running the test suite and cleans up
  some code.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2009 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for OS Locks."""
18
 
 
19
 
 
20
 
 
21
 
from bzrlib import (
22
 
    debug,
23
 
    errors,
24
 
    lock,
25
 
    tests,
26
 
    )
27
 
 
28
 
 
29
 
def load_tests(standard_tests, module, loader):
30
 
    """Parameterize tests for all versions of groupcompress."""
31
 
    scenarios = []
32
 
    for name, write_lock, read_lock in lock._lock_classes:
33
 
        scenarios.append((name, {'write_lock': write_lock,
34
 
                                 'read_lock': read_lock}))
35
 
    suite = loader.suiteClass()
36
 
    result = tests.multiply_tests(standard_tests, scenarios, suite)
37
 
    return result
38
 
 
39
 
 
40
 
class TestOSLock(tests.TestCaseInTempDir):
41
 
 
42
 
    # Set by load_tests
43
 
    read_lock = None
44
 
    write_lock = None
45
 
 
46
 
    def setUp(self):
47
 
        super(TestOSLock, self).setUp()
48
 
        self.build_tree(['a-lock-file'])
49
 
 
50
 
    def test_create_read_lock(self):
51
 
        r_lock = self.read_lock('a-lock-file')
52
 
        r_lock.unlock()
53
 
 
54
 
    def test_create_write_lock(self):
55
 
        w_lock = self.write_lock('a-lock-file')
56
 
        w_lock.unlock()
57
 
 
58
 
    def test_read_locks_share(self):
59
 
        r_lock = self.read_lock('a-lock-file')
60
 
        try:
61
 
            lock2 = self.read_lock('a-lock-file')
62
 
            lock2.unlock()
63
 
        finally:
64
 
            r_lock.unlock()
65
 
 
66
 
    def test_write_locks_are_exclusive(self):
67
 
        w_lock = self.write_lock('a-lock-file')
68
 
        try:
69
 
            self.assertRaises(errors.LockContention,
70
 
                              self.write_lock, 'a-lock-file')
71
 
        finally:
72
 
            w_lock.unlock()
73
 
 
74
 
    def test_read_locks_block_write_locks(self):
75
 
        r_lock = self.read_lock('a-lock-file')
76
 
        try:
77
 
            if lock.have_fcntl and self.write_lock is lock._fcntl_WriteLock:
78
 
                # With -Dlock, fcntl locks are properly exclusive
79
 
                debug.debug_flags.add('strict_locks')
80
 
                self.assertRaises(errors.LockContention,
81
 
                                  self.write_lock, 'a-lock-file')
82
 
                # But not without it
83
 
                debug.debug_flags.remove('strict_locks')
84
 
                try:
85
 
                    w_lock = self.write_lock('a-lock-file')
86
 
                except errors.LockContention:
87
 
                    self.fail('Unexpected success. fcntl read locks'
88
 
                              ' do not usually block write locks')
89
 
                else:
90
 
                    w_lock.unlock()
91
 
                    self.knownFailure('fcntl read locks don\'t'
92
 
                                      ' block write locks without -Dlock')
93
 
            else:
94
 
                self.assertRaises(errors.LockContention,
95
 
                                  self.write_lock, 'a-lock-file')
96
 
        finally:
97
 
            r_lock.unlock()
98
 
 
99
 
    def test_write_locks_block_read_lock(self):
100
 
        w_lock = self.write_lock('a-lock-file')
101
 
        try:
102
 
            if lock.have_fcntl and self.read_lock is lock._fcntl_ReadLock:
103
 
                # With -Dlock, fcntl locks are properly exclusive
104
 
                debug.debug_flags.add('strict_locks')
105
 
                self.assertRaises(errors.LockContention,
106
 
                                  self.read_lock, 'a-lock-file')
107
 
                # But not without it
108
 
                debug.debug_flags.remove('strict_locks')
109
 
                try:
110
 
                    r_lock = self.read_lock('a-lock-file')
111
 
                except errors.LockContention:
112
 
                    self.fail('Unexpected success. fcntl write locks'
113
 
                              ' do not usually block read locks')
114
 
                else:
115
 
                    r_lock.unlock()
116
 
                    self.knownFailure('fcntl write locks don\'t'
117
 
                                      ' block read locks without -Dlock')
118
 
            else:
119
 
                self.assertRaises(errors.LockContention,
120
 
                                  self.read_lock, 'a-lock-file')
121
 
        finally:
122
 
            w_lock.unlock()
123
 
 
124
 
 
125
 
    def test_temporary_write_lock(self):
126
 
        r_lock = self.read_lock('a-lock-file')
127
 
        try:
128
 
            status, w_lock = r_lock.temporary_write_lock()
129
 
            self.assertTrue(status)
130
 
            # This should block another write lock
131
 
            try:
132
 
                self.assertRaises(errors.LockContention,
133
 
                                  self.write_lock, 'a-lock-file')
134
 
            finally:
135
 
                r_lock = w_lock.restore_read_lock()
136
 
            # We should be able to take a read lock now
137
 
            r_lock2 = self.read_lock('a-lock-file')
138
 
            r_lock2.unlock()
139
 
        finally:
140
 
            r_lock.unlock()
141
 
 
142
 
    def test_temporary_write_lock_fails(self):
143
 
        r_lock = self.read_lock('a-lock-file')
144
 
        try:
145
 
            r_lock2 = self.read_lock('a-lock-file')
146
 
            try:
147
 
                status, w_lock = r_lock.temporary_write_lock()
148
 
                self.assertFalse(status)
149
 
                # Taking out the lock requires unlocking and locking again, so
150
 
                # we have to replace the original object
151
 
                r_lock = w_lock
152
 
            finally:
153
 
                r_lock2.unlock()
154
 
            # We should be able to take a read lock now
155
 
            r_lock2 = self.read_lock('a-lock-file')
156
 
            r_lock2.unlock()
157
 
        finally:
158
 
            r_lock.unlock()