/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to brzlib/tests/test_lockdir.py

  • Committer: Jelmer Vernooij
  • Date: 2017-05-21 12:41:27 UTC
  • mto: This revision was merged to the branch mainline in revision 6623.
  • Revision ID: jelmer@jelmer.uk-20170521124127-iv8etg0vwymyai6y
s/bzr/brz/ in apport config.

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
import os
20
20
import time
21
21
 
22
 
import breezy
23
 
from .. import (
 
22
import brzlib
 
23
from brzlib import (
24
24
    config,
25
25
    errors,
26
26
    lock,
29
29
    tests,
30
30
    transport,
31
31
    )
32
 
from ..errors import (
 
32
from brzlib.errors import (
33
33
    LockBreakMismatch,
34
34
    LockBroken,
35
35
    LockContention,
36
36
    LockFailed,
37
37
    LockNotHeld,
38
38
    )
39
 
from ..lockdir import (
 
39
from brzlib.lockdir import (
40
40
    LockDir,
41
41
    LockHeldInfo,
42
42
    )
43
 
from . import (
 
43
from brzlib.tests import (
44
44
    features,
45
45
    TestCase,
46
46
    TestCaseInTempDir,
132
132
            # a single process are not detected
133
133
            lf2.attempt_lock()
134
134
            self.fail('Failed to detect lock collision')
135
 
        except LockContention as e:
 
135
        except LockContention, e:
136
136
            self.assertEqual(e.lock, lf2)
137
137
            self.assertContainsRe(str(e),
138
 
                                  r'^Could not acquire.*test_lock.*$')
 
138
                    r'^Could not acquire.*test_lock.*$')
139
139
        lf1.unlock()
140
140
 
141
141
    def test_20_lock_peek(self):
148
148
        # lock is held, should get some info on it
149
149
        info1 = lf1.peek()
150
150
        self.assertEqual(set(info1.info_dict.keys()),
151
 
                         {'user', 'nonce', 'hostname', 'pid', 'start_time'})
 
151
            set(['user', 'nonce', 'hostname', 'pid', 'start_time']))
152
152
        # should get the same info if we look at it through a different
153
153
        # instance
154
154
        info2 = LockDir(t, 'test_lock').peek()
190
190
            # it should only take about 0.4 seconds, but we allow more time in
191
191
            # case the machine is heavily loaded
192
192
            self.assertTrue(after - before <= 8.0,
193
 
                            "took %f seconds to detect lock contention" % (after - before))
 
193
                "took %f seconds to detect lock contention" % (after - before))
194
194
        finally:
195
195
            lf1.unlock()
196
196
        self.assertEqual(1, len(self._logged_reports))
197
197
        self.assertContainsRe(self._logged_reports[0][0],
198
 
                              r'Unable to obtain lock .* held by jrandom@example\.com on .*'
199
 
                              r' \(process #\d+\), acquired .* ago\.\n'
200
 
                              r'Will continue to try until \d{2}:\d{2}:\d{2}, unless '
201
 
                              r'you press Ctrl-C.\n'
202
 
                              r'See "brz help break-lock" for more.')
 
198
            r'Unable to obtain lock .* held by jrandom@example\.com on .*'
 
199
            r' \(process #\d+\), acquired .* ago\.\n'
 
200
            r'Will continue to try until \d{2}:\d{2}:\d{2}, unless '
 
201
            r'you press Ctrl-C.\n'
 
202
            r'See "bzr help break-lock" for more.')
203
203
 
204
204
    def test_31_lock_wait_easy(self):
205
205
        """Succeed when waiting on a lock with no contention.
330
330
        ld1.lock_write()
331
331
        # do this without IO redirection to ensure it doesn't prompt.
332
332
        self.assertRaises(AssertionError, ld1.break_lock)
333
 
        orig_factory = breezy.ui.ui_factory
334
 
        breezy.ui.ui_factory = breezy.ui.CannedInputUIFactory([True])
 
333
        orig_factory = brzlib.ui.ui_factory
 
334
        brzlib.ui.ui_factory = brzlib.ui.CannedInputUIFactory([True])
335
335
        try:
336
336
            ld2.break_lock()
337
337
            self.assertRaises(LockBroken, ld1.unlock)
338
338
        finally:
339
 
            breezy.ui.ui_factory = orig_factory
 
339
            brzlib.ui.ui_factory = orig_factory
340
340
 
341
341
    def test_break_lock_corrupt_info(self):
342
342
        """break_lock works even if the info file is corrupt (and tells the UI
346
346
        ld2 = self.get_lock()
347
347
        ld.create()
348
348
        ld.lock_write()
349
 
        ld.transport.put_bytes_non_atomic('test_lock/held/info', b'\0')
 
349
        ld.transport.put_bytes_non_atomic('test_lock/held/info', '\0')
350
350
 
351
 
        class LoggingUIFactory(breezy.ui.SilentUIFactory):
 
351
        class LoggingUIFactory(brzlib.ui.SilentUIFactory):
352
352
            def __init__(self):
353
353
                self.prompts = []
354
354
 
357
357
                return True
358
358
 
359
359
        ui = LoggingUIFactory()
360
 
        self.overrideAttr(breezy.ui, 'ui_factory', ui)
 
360
        self.overrideAttr(brzlib.ui, 'ui_factory', ui)
361
361
        ld2.break_lock()
362
362
        self.assertLength(1, ui.prompts)
363
363
        self.assertEqual('boolean', ui.prompts[0][0])
374
374
        ld.lock_write()
375
375
        ld.transport.delete('test_lock/held/info')
376
376
 
377
 
        class LoggingUIFactory(breezy.ui.SilentUIFactory):
 
377
        class LoggingUIFactory(brzlib.ui.SilentUIFactory):
378
378
            def __init__(self):
379
379
                self.prompts = []
380
380
 
383
383
                return True
384
384
 
385
385
        ui = LoggingUIFactory()
386
 
        orig_factory = breezy.ui.ui_factory
387
 
        breezy.ui.ui_factory = ui
 
386
        orig_factory = brzlib.ui.ui_factory
 
387
        brzlib.ui.ui_factory = ui
388
388
        try:
389
389
            ld2.break_lock()
390
390
            self.assertRaises(LockBroken, ld.unlock)
391
391
            self.assertLength(0, ui.prompts)
392
392
        finally:
393
 
            breezy.ui.ui_factory = orig_factory
 
393
            brzlib.ui.ui_factory = orig_factory
394
394
        # Suppress warnings due to ld not being unlocked
395
395
        # XXX: if lock_broken hook was invoked in this case, this hack would
396
396
        # not be necessary.  - Andrew Bennetts, 2010-09-06.
429
429
        finally:
430
430
            ld1.unlock()
431
431
        self.assertEqual(info_list['user'], u'jrandom@example.com')
432
 
        self.assertContainsRe(info_list['pid'], '^\\d+$')
433
 
        self.assertContainsRe(info_list['time_ago'], '^\\d+ seconds? ago$')
 
432
        self.assertContainsRe(info_list['pid'], '^\d+$')
 
433
        self.assertContainsRe(info_list['time_ago'], r'^\d+ seconds? ago$')
434
434
 
435
435
    def test_lock_without_email(self):
436
436
        global_config = config.GlobalStack()
508
508
        t = self.get_transport()
509
509
        t.mkdir('test_lock')
510
510
        t.mkdir('test_lock/held')
511
 
        t.put_bytes('test_lock/held/info', b'')
 
511
        t.put_bytes('test_lock/held/info', '')
512
512
        lf = LockDir(t, 'test_lock')
513
513
        info = lf.peek()
514
514
        formatted_info = info.to_readable_dict()
515
515
        self.assertEqual(
516
516
            dict(user='<unknown>', hostname='<unknown>', pid='<unknown>',
517
 
                 time_ago='(unknown)'),
 
517
                time_ago='(unknown)'),
518
518
            formatted_info)
519
519
 
520
520
    def test_corrupt_lockdir_info(self):
526
526
        t = self.get_transport()
527
527
        t.mkdir('test_lock')
528
528
        t.mkdir('test_lock/held')
529
 
        t.put_bytes('test_lock/held/info', b'\0')
 
529
        t.put_bytes('test_lock/held/info', '\0')
530
530
        lf = LockDir(t, 'test_lock')
531
531
        self.assertRaises(errors.LockCorrupt, lf.peek)
532
532
        # Currently attempt_lock gives LockContention, but LockCorrupt would be
665
665
 
666
666
    def test_unicode(self):
667
667
        info = LockHeldInfo.for_this_process(None)
668
 
        self.assertContainsRe(str(info),
669
 
                              r'held by .* on .* \(process #\d+\), acquired .* ago')
 
668
        self.assertContainsRe(unicode(info),
 
669
            r'held by .* on .* \(process #\d+\), acquired .* ago')
670
670
 
671
671
    def test_is_locked_by_this_process(self):
672
672
        info = LockHeldInfo.for_this_process(None)
685
685
    def test_lock_holder_dead_process(self):
686
686
        """Detect that the holder (this process) is still running."""
687
687
        self.overrideAttr(lockdir, 'get_host_name',
688
 
                          lambda: 'aproperhostname')
 
688
            lambda: 'aproperhostname')
689
689
        info = LockHeldInfo.for_this_process(None)
690
690
        info.info_dict['pid'] = '123123123'
691
691
        self.assertTrue(info.is_lock_holder_known_dead())
713
713
        known for sure.
714
714
        """
715
715
        self.overrideAttr(lockdir, 'get_host_name',
716
 
                          lambda: 'localhost')
 
716
            lambda: 'localhost')
717
717
        info = LockHeldInfo.for_this_process(None)
718
718
        info.info_dict['pid'] = '123123123'
719
719
        self.assertFalse(info.is_lock_holder_known_dead())
731
731
        This generates a warning but no other user interaction.
732
732
        """
733
733
        self.overrideAttr(lockdir, 'get_host_name',
734
 
                          lambda: 'aproperhostname')
735
 
        # Stealing dead locks is enabled by default.
 
734
            lambda: 'aproperhostname')
 
735
        # This is off by default at present; see the discussion in the bug.
 
736
        # If you change the default, don't forget to update the docs.
 
737
        config.GlobalStack().set('locks.steal_dead', True)
736
738
        # Create a lock pretending to come from a different nonexistent
737
739
        # process on the same machine.
738
740
        l1 = LockDir(self.get_transport(), 'a',
739
 
                     extra_holder_info={'pid': '12312313'})
 
741
            extra_holder_info={'pid': '12312313'})
740
742
        token_1 = l1.attempt_lock()
741
743
        l2 = LockDir(self.get_transport(), 'a')
742
744
        token_2 = l2.attempt_lock()
743
745
        # l1 will notice its lock was stolen.
744
746
        self.assertRaises(errors.LockBroken,
745
 
                          l1.unlock)
 
747
            l1.unlock)
746
748
        l2.unlock()
747
749
 
748
750
    def test_auto_break_stale_lock_configured_off(self):
749
751
        """Automatic breaking can be turned off"""
750
752
        l1 = LockDir(self.get_transport(), 'a',
751
 
                     extra_holder_info={'pid': '12312313'})
752
 
        # Stealing dead locks is enabled by default, so disable it.
753
 
        config.GlobalStack().set('locks.steal_dead', False)
 
753
            extra_holder_info={'pid': '12312313'})
754
754
        token_1 = l1.attempt_lock()
755
755
        self.addCleanup(l1.unlock)
756
756
        l2 = LockDir(self.get_transport(), 'a')
757
 
        # This fails now, because dead lock breaking is disabled.
 
757
        # This fails now, because dead lock breaking is off by default.
758
758
        self.assertRaises(LockContention,
759
 
                          l2.attempt_lock)
 
759
            l2.attempt_lock)
760
760
        # and it's in fact not broken
761
761
        l1.confirm()