/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to breezy/tests/test_lockdir.py

  • Committer: Jelmer Vernooij
  • Date: 2020-03-22 01:35:14 UTC
  • mfrom: (7490.7.6 work)
  • mto: This revision was merged to the branch mainline in revision 7499.
  • Revision ID: jelmer@jelmer.uk-20200322013514-7vw1ntwho04rcuj3
merge lp:brz/3.1.

Show diffs side-by-side

added

removed

Lines of Context:
40
40
    LockDir,
41
41
    LockHeldInfo,
42
42
    )
43
 
from ..sixish import (
44
 
    text_type,
45
 
    )
46
43
from . import (
47
44
    features,
48
45
    TestCase,
138
135
        except LockContention as e:
139
136
            self.assertEqual(e.lock, lf2)
140
137
            self.assertContainsRe(str(e),
141
 
                    r'^Could not acquire.*test_lock.*$')
 
138
                                  r'^Could not acquire.*test_lock.*$')
142
139
        lf1.unlock()
143
140
 
144
141
    def test_20_lock_peek(self):
151
148
        # lock is held, should get some info on it
152
149
        info1 = lf1.peek()
153
150
        self.assertEqual(set(info1.info_dict.keys()),
154
 
            {'user', 'nonce', 'hostname', 'pid', 'start_time'})
 
151
                         {'user', 'nonce', 'hostname', 'pid', 'start_time'})
155
152
        # should get the same info if we look at it through a different
156
153
        # instance
157
154
        info2 = LockDir(t, 'test_lock').peek()
193
190
            # it should only take about 0.4 seconds, but we allow more time in
194
191
            # case the machine is heavily loaded
195
192
            self.assertTrue(after - before <= 8.0,
196
 
                "took %f seconds to detect lock contention" % (after - before))
 
193
                            "took %f seconds to detect lock contention" % (after - before))
197
194
        finally:
198
195
            lf1.unlock()
199
196
        self.assertEqual(1, len(self._logged_reports))
200
197
        self.assertContainsRe(self._logged_reports[0][0],
201
 
            r'Unable to obtain lock .* held by jrandom@example\.com on .*'
202
 
            r' \(process #\d+\), acquired .* ago\.\n'
203
 
            r'Will continue to try until \d{2}:\d{2}:\d{2}, unless '
204
 
            r'you press Ctrl-C.\n'
205
 
            r'See "brz help break-lock" for more.')
 
198
                              r'Unable to obtain lock .* held by jrandom@example\.com on .*'
 
199
                              r' \(process #\d+\), acquired .* ago\.\n'
 
200
                              r'Will continue to try until \d{2}:\d{2}:\d{2}, unless '
 
201
                              r'you press Ctrl-C.\n'
 
202
                              r'See "brz help break-lock" for more.')
206
203
 
207
204
    def test_31_lock_wait_easy(self):
208
205
        """Succeed when waiting on a lock with no contention.
517
514
        formatted_info = info.to_readable_dict()
518
515
        self.assertEqual(
519
516
            dict(user='<unknown>', hostname='<unknown>', pid='<unknown>',
520
 
                time_ago='(unknown)'),
 
517
                 time_ago='(unknown)'),
521
518
            formatted_info)
522
519
 
523
520
    def test_corrupt_lockdir_info(self):
668
665
 
669
666
    def test_unicode(self):
670
667
        info = LockHeldInfo.for_this_process(None)
671
 
        self.assertContainsRe(text_type(info),
672
 
            r'held by .* on .* \(process #\d+\), acquired .* ago')
 
668
        self.assertContainsRe(str(info),
 
669
                              r'held by .* on .* \(process #\d+\), acquired .* ago')
673
670
 
674
671
    def test_is_locked_by_this_process(self):
675
672
        info = LockHeldInfo.for_this_process(None)
688
685
    def test_lock_holder_dead_process(self):
689
686
        """Detect that the holder (this process) is still running."""
690
687
        self.overrideAttr(lockdir, 'get_host_name',
691
 
            lambda: 'aproperhostname')
 
688
                          lambda: 'aproperhostname')
692
689
        info = LockHeldInfo.for_this_process(None)
693
690
        info.info_dict['pid'] = '123123123'
694
691
        self.assertTrue(info.is_lock_holder_known_dead())
716
713
        known for sure.
717
714
        """
718
715
        self.overrideAttr(lockdir, 'get_host_name',
719
 
            lambda: 'localhost')
 
716
                          lambda: 'localhost')
720
717
        info = LockHeldInfo.for_this_process(None)
721
718
        info.info_dict['pid'] = '123123123'
722
719
        self.assertFalse(info.is_lock_holder_known_dead())
734
731
        This generates a warning but no other user interaction.
735
732
        """
736
733
        self.overrideAttr(lockdir, 'get_host_name',
737
 
            lambda: 'aproperhostname')
738
 
        # This is off by default at present; see the discussion in the bug.
739
 
        # If you change the default, don't forget to update the docs.
740
 
        config.GlobalStack().set('locks.steal_dead', True)
 
734
                          lambda: 'aproperhostname')
 
735
        # Stealing dead locks is enabled by default.
741
736
        # Create a lock pretending to come from a different nonexistent
742
737
        # process on the same machine.
743
738
        l1 = LockDir(self.get_transport(), 'a',
744
 
            extra_holder_info={'pid': '12312313'})
 
739
                     extra_holder_info={'pid': '12312313'})
745
740
        token_1 = l1.attempt_lock()
746
741
        l2 = LockDir(self.get_transport(), 'a')
747
742
        token_2 = l2.attempt_lock()
748
743
        # l1 will notice its lock was stolen.
749
744
        self.assertRaises(errors.LockBroken,
750
 
            l1.unlock)
 
745
                          l1.unlock)
751
746
        l2.unlock()
752
747
 
753
748
    def test_auto_break_stale_lock_configured_off(self):
754
749
        """Automatic breaking can be turned off"""
755
750
        l1 = LockDir(self.get_transport(), 'a',
756
 
            extra_holder_info={'pid': '12312313'})
 
751
                     extra_holder_info={'pid': '12312313'})
 
752
        # Stealing dead locks is enabled by default, so disable it.
 
753
        config.GlobalStack().set('locks.steal_dead', False)
757
754
        token_1 = l1.attempt_lock()
758
755
        self.addCleanup(l1.unlock)
759
756
        l2 = LockDir(self.get_transport(), 'a')
760
 
        # This fails now, because dead lock breaking is off by default.
 
757
        # This fails now, because dead lock breaking is disabled.
761
758
        self.assertRaises(LockContention,
762
 
            l2.attempt_lock)
 
759
                          l2.attempt_lock)
763
760
        # and it's in fact not broken
764
761
        l1.confirm()