/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_lockdir.py

Merge with bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2006, 2007, 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
25
25
from bzrlib import (
26
26
    config,
27
27
    errors,
 
28
    lock,
28
29
    osutils,
29
30
    tests,
30
31
    transport,
658
659
        self.assertRaises(errors.LockContention, ld2.attempt_lock)
659
660
        # no kibble
660
661
        check_dir(['held'])
 
662
 
 
663
    def record_hook(self, result):
 
664
        self._calls.append(result)
 
665
 
 
666
    def reset_hooks(self):
 
667
        self._old_hooks = lock.Lock.hooks
 
668
        self.addCleanup(self.restore_hooks)
 
669
        lock.Lock.hooks = lock.LockHooks()
 
670
 
 
671
    def restore_hooks(self):
 
672
        lock.Lock.hooks = self._old_hooks
 
673
 
 
674
    def test_LockDir_acquired_success(self):
 
675
        # the LockDir.lock_acquired hook fires when a lock is acquired.
 
676
        self._calls = []
 
677
        self.reset_hooks()
 
678
        LockDir.hooks.install_named_hook('lock_acquired',
 
679
            self.record_hook, 'record_hook')
 
680
        ld = self.get_lock()
 
681
        ld.create()
 
682
        self.assertEqual([], self._calls)
 
683
        result = ld.attempt_lock()
 
684
        lock_path = ld.transport.abspath(ld.path)
 
685
        self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
 
686
        ld.unlock()
 
687
        self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
 
688
 
 
689
    def test_LockDir_acquired_fail(self):
 
690
        # the LockDir.lock_acquired hook does not fire on failure.
 
691
        self._calls = []
 
692
        self.reset_hooks()
 
693
        ld = self.get_lock()
 
694
        ld.create()
 
695
        ld2 = self.get_lock()
 
696
        ld2.attempt_lock()
 
697
        # install a lock hook now, while the on-disk lock is already held
 
698
        LockDir.hooks.install_named_hook('lock_acquired',
 
699
            self.record_hook, 'record_hook')
 
700
        self.assertRaises(errors.LockContention, ld.attempt_lock)
 
701
        self.assertEqual([], self._calls)
 
702
        ld2.unlock()
 
703
        self.assertEqual([], self._calls)
 
704
 
 
705
    def test_LockDir_released_success(self):
 
706
        # the LockDir.lock_released hook fires when a lock is released.
 
707
        self._calls = []
 
708
        self.reset_hooks()
 
709
        LockDir.hooks.install_named_hook('lock_released',
 
710
            self.record_hook, 'record_hook')
 
711
        ld = self.get_lock()
 
712
        ld.create()
 
713
        self.assertEqual([], self._calls)
 
714
        result = ld.attempt_lock()
 
715
        self.assertEqual([], self._calls)
 
716
        ld.unlock()
 
717
        lock_path = ld.transport.abspath(ld.path)
 
718
        self.assertEqual([lock.LockResult(lock_path, result)], self._calls)
 
719
 
 
720
    def test_LockDir_released_fail(self):
 
721
        # the LockDir.lock_released hook does not fire on failure.
 
722
        self._calls = []
 
723
        self.reset_hooks()
 
724
        ld = self.get_lock()
 
725
        ld.create()
 
726
        ld2 = self.get_lock()
 
727
        ld.attempt_lock()
 
728
        ld2.force_break(ld2.peek())
 
729
        LockDir.hooks.install_named_hook('lock_released',
 
730
            self.record_hook, 'record_hook')
 
731
        self.assertRaises(LockBroken, ld.unlock)
 
732
        self.assertEqual([], self._calls)