# Copyright (C) 2007-2010 Canonical Ltd
# Authors:  Robert Collins <robert.collins@canonical.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Tests for WorkingTreeFormat4"""

import os
import time

from bzrlib import (
    bzrdir,
    dirstate,
    errors,
    inventory,
    osutils,
    workingtree_4,
    )
from bzrlib.lockdir import LockDir
from bzrlib.tests import TestCaseWithTransport, TestSkipped
from bzrlib.tree import InterTree


class TestWorkingTreeFormat4(TestCaseWithTransport):
    """Tests specific to WorkingTreeFormat4."""

    def test_disk_layout(self):
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        control.create_repository()
        control.create_branch()
        tree = workingtree_4.WorkingTreeFormat4().initialize(control)
        # we want:
        # format 'Bazaar Working Tree format 4'
        # stat-cache = ??
        t = control.get_workingtree_transport(None)
        self.assertEqualDiff('Bazaar Working Tree Format 4 (bzr 0.15)\n',
                             t.get('format').read())
        self.assertFalse(t.has('inventory.basis'))
        # no last-revision file means 'None' or 'NULLREVISION'
        self.assertFalse(t.has('last-revision'))
        state = dirstate.DirState.on_file(t.local_abspath('dirstate'))
        state.lock_read()
        try:
            self.assertEqual([], state.get_parent_ids())
        finally:
            state.unlock()

    def test_uses_lockdir(self):
        """WorkingTreeFormat4 uses its own LockDir:

            - lock is a directory
            - when the WorkingTree is locked, LockDir can see that
        """
        # this test could be factored into a subclass of tests common to both
        # format 3 and 4, but for now its not much of an issue as there is only
        # one in common.
        t = self.get_transport()
        tree = self.make_workingtree()
        self.assertIsDirectory('.bzr', t)
        self.assertIsDirectory('.bzr/checkout', t)
        self.assertIsDirectory('.bzr/checkout/lock', t)
        our_lock = LockDir(t, '.bzr/checkout/lock')
        self.assertEquals(our_lock.peek(), None)
        tree.lock_write()
        self.assertTrue(our_lock.peek())
        tree.unlock()
        self.assertEquals(our_lock.peek(), None)

    def make_workingtree(self, relpath=''):
        url = self.get_url(relpath)
        if relpath:
            self.build_tree([relpath + '/'])
        dir = bzrdir.BzrDirMetaFormat1().initialize(url)
        repo = dir.create_repository()
        branch = dir.create_branch()
        try:
            return workingtree_4.WorkingTreeFormat4().initialize(dir)
        except errors.NotLocalUrl:
            raise TestSkipped('Not a local URL')

    def test_dirstate_stores_all_parent_inventories(self):
        tree = self.make_workingtree()

        # We're going to build in tree a working tree
        # with three parent trees, with some files in common.

        # We really don't want to do commit or merge in the new dirstate-based
        # tree, because that might not work yet.  So instead we build
        # revisions elsewhere and pull them across, doing by hand part of the
        # work that merge would do.

        subtree = self.make_branch_and_tree('subdir')
        # writelock the tree so its repository doesn't get readlocked by
        # the revision tree locks. This works around the bug where we dont
        # permit lock upgrading.
        subtree.lock_write()
        self.addCleanup(subtree.unlock)
        self.build_tree(['subdir/file-a',])
        subtree.add(['file-a'], ['id-a'])
        rev1 = subtree.commit('commit in subdir')

        subtree2 = subtree.bzrdir.sprout('subdir2').open_workingtree()
        self.build_tree(['subdir2/file-b'])
        subtree2.add(['file-b'], ['id-b'])
        rev2 = subtree2.commit('commit in subdir2')

        subtree.flush()
        subtree3 = subtree.bzrdir.sprout('subdir3').open_workingtree()
        rev3 = subtree3.commit('merge from subdir2')

        repo = tree.branch.repository
        repo.fetch(subtree.branch.repository, rev1)
        repo.fetch(subtree2.branch.repository, rev2)
        repo.fetch(subtree3.branch.repository, rev3)
        # will also pull the others...

        # create repository based revision trees
        rev1_revtree = repo.revision_tree(rev1)
        rev2_revtree = repo.revision_tree(rev2)
        rev3_revtree = repo.revision_tree(rev3)
        # tree doesn't contain a text merge yet but we'll just
        # set the parents as if a merge had taken place.
        # this should cause the tree data to be folded into the
        # dirstate.
        tree.set_parent_trees([
            (rev1, rev1_revtree),
            (rev2, rev2_revtree),
            (rev3, rev3_revtree), ])

        # create tree-sourced revision trees
        rev1_tree = tree.revision_tree(rev1)
        rev1_tree.lock_read()
        self.addCleanup(rev1_tree.unlock)
        rev2_tree = tree.revision_tree(rev2)
        rev2_tree.lock_read()
        self.addCleanup(rev2_tree.unlock)
        rev3_tree = tree.revision_tree(rev3)
        rev3_tree.lock_read()
        self.addCleanup(rev3_tree.unlock)

        # now we should be able to get them back out
        self.assertTreesEqual(rev1_revtree, rev1_tree)
        self.assertTreesEqual(rev2_revtree, rev2_tree)
        self.assertTreesEqual(rev3_revtree, rev3_tree)

    def test_dirstate_doesnt_read_parents_from_repo_when_setting(self):
        """Setting parent trees on a dirstate working tree takes
        the trees it's given and doesn't need to read them from the
        repository.
        """
        tree = self.make_workingtree()

        subtree = self.make_branch_and_tree('subdir')
        rev1 = subtree.commit('commit in subdir')
        rev1_tree = subtree.basis_tree()
        rev1_tree.lock_read()
        self.addCleanup(rev1_tree.unlock)

        tree.branch.pull(subtree.branch)

        # break the repository's legs to make sure it only uses the trees
        # it's given; any calls to forbidden methods will raise an
        # AssertionError
        repo = tree.branch.repository
        repo.get_revision = self.fail
        repo.get_inventory = self.fail
        repo._get_inventory_xml = self.fail
        # try to set the parent trees.
        tree.set_parent_trees([(rev1, rev1_tree)])

    def test_dirstate_doesnt_read_from_repo_when_returning_cache_tree(self):
        """Getting parent trees from a dirstate tree does not read from the
        repos inventory store. This is an important part of the dirstate
        performance optimisation work.
        """
        tree = self.make_workingtree()

        subtree = self.make_branch_and_tree('subdir')
        # writelock the tree so its repository doesn't get readlocked by
        # the revision tree locks. This works around the bug where we dont
        # permit lock upgrading.
        subtree.lock_write()
        self.addCleanup(subtree.unlock)
        rev1 = subtree.commit('commit in subdir')
        rev1_tree = subtree.basis_tree()
        rev1_tree.lock_read()
        rev1_tree.inventory
        self.addCleanup(rev1_tree.unlock)
        rev2 = subtree.commit('second commit in subdir', allow_pointless=True)
        rev2_tree = subtree.basis_tree()
        rev2_tree.lock_read()
        rev2_tree.inventory
        self.addCleanup(rev2_tree.unlock)

        tree.branch.pull(subtree.branch)

        # break the repository's legs to make sure it only uses the trees
        # it's given; any calls to forbidden methods will raise an
        # AssertionError
        repo = tree.branch.repository
        # dont uncomment this: the revision object must be accessed to
        # answer 'get_parent_ids' for the revision tree- dirstate does not
        # cache the parents of a parent tree at this point.
        #repo.get_revision = self.fail
        repo.get_inventory = self.fail
        repo._get_inventory_xml = self.fail
        # set the parent trees.
        tree.set_parent_trees([(rev1, rev1_tree), (rev2, rev2_tree)])
        # read the first tree
        result_rev1_tree = tree.revision_tree(rev1)
        # read the second
        result_rev2_tree = tree.revision_tree(rev2)
        # compare - there should be no differences between the handed and
        # returned trees
        self.assertTreesEqual(rev1_tree, result_rev1_tree)
        self.assertTreesEqual(rev2_tree, result_rev2_tree)

    def test_dirstate_doesnt_cache_non_parent_trees(self):
        """Getting parent trees from a dirstate tree does not read from the
        repos inventory store. This is an important part of the dirstate
        performance optimisation work.
        """
        tree = self.make_workingtree()

        # make a tree that we can try for, which is able to be returned but
        # must not be
        subtree = self.make_branch_and_tree('subdir')
        rev1 = subtree.commit('commit in subdir')
        tree.branch.pull(subtree.branch)
        # check it fails
        self.assertRaises(errors.NoSuchRevision, tree.revision_tree, rev1)

    def test_no_dirstate_outside_lock(self):
        # temporary test until the code is mature enough to test from outside.
        """Getting a dirstate object fails if there is no lock."""
        def lock_and_call_current_dirstate(tree, lock_method):
            getattr(tree, lock_method)()
            tree.current_dirstate()
            tree.unlock()
        tree = self.make_workingtree()
        self.assertRaises(errors.ObjectNotLocked, tree.current_dirstate)
        lock_and_call_current_dirstate(tree, 'lock_read')
        self.assertRaises(errors.ObjectNotLocked, tree.current_dirstate)
        lock_and_call_current_dirstate(tree, 'lock_write')
        self.assertRaises(errors.ObjectNotLocked, tree.current_dirstate)
        lock_and_call_current_dirstate(tree, 'lock_tree_write')
        self.assertRaises(errors.ObjectNotLocked, tree.current_dirstate)

    def test_new_dirstate_on_new_lock(self):
        # until we have detection for when a dirstate can be reused, we
        # want to reparse dirstate on every new lock.
        known_dirstates = set()
        def lock_and_compare_all_current_dirstate(tree, lock_method):
            getattr(tree, lock_method)()
            state = tree.current_dirstate()
            self.assertFalse(state in known_dirstates)
            known_dirstates.add(state)
            tree.unlock()
        tree = self.make_workingtree()
        # lock twice with each type to prevent silly per-lock-type bugs.
        # each lock and compare looks for a unique state object.
        lock_and_compare_all_current_dirstate(tree, 'lock_read')
        lock_and_compare_all_current_dirstate(tree, 'lock_read')
        lock_and_compare_all_current_dirstate(tree, 'lock_tree_write')
        lock_and_compare_all_current_dirstate(tree, 'lock_tree_write')
        lock_and_compare_all_current_dirstate(tree, 'lock_write')
        lock_and_compare_all_current_dirstate(tree, 'lock_write')

    def test_constructing_invalid_interdirstate_raises(self):
        tree = self.make_workingtree()
        rev_id = tree.commit('first post')
        rev_id2 = tree.commit('second post')
        rev_tree = tree.branch.repository.revision_tree(rev_id)
        # Exception is not a great thing to raise, but this test is
        # very short, and code is used to sanity check other tests, so
        # a full error object is YAGNI.
        self.assertRaises(
            Exception, workingtree_4.InterDirStateTree, rev_tree, tree)
        self.assertRaises(
            Exception, workingtree_4.InterDirStateTree, tree, rev_tree)

    def test_revtree_to_revtree_not_interdirstate(self):
        # we should not get a dirstate optimiser for two repository sourced
        # revtrees. we can't prove a negative, so we dont do exhaustive tests
        # of all formats; though that could be written in the future it doesn't
        # seem well worth it.
        tree = self.make_workingtree()
        rev_id = tree.commit('first post')
        rev_id2 = tree.commit('second post')
        rev_tree = tree.branch.repository.revision_tree(rev_id)
        rev_tree2 = tree.branch.repository.revision_tree(rev_id2)
        optimiser = InterTree.get(rev_tree, rev_tree2)
        self.assertIsInstance(optimiser, InterTree)
        self.assertFalse(isinstance(optimiser, workingtree_4.InterDirStateTree))
        optimiser = InterTree.get(rev_tree2, rev_tree)
        self.assertIsInstance(optimiser, InterTree)
        self.assertFalse(isinstance(optimiser, workingtree_4.InterDirStateTree))

    def test_revtree_not_in_dirstate_to_dirstate_not_interdirstate(self):
        # we should not get a dirstate optimiser when the revision id for of
        # the source is not in the dirstate of the target.
        tree = self.make_workingtree()
        rev_id = tree.commit('first post')
        rev_id2 = tree.commit('second post')
        rev_tree = tree.branch.repository.revision_tree(rev_id)
        tree.lock_read()
        optimiser = InterTree.get(rev_tree, tree)
        self.assertIsInstance(optimiser, InterTree)
        self.assertFalse(isinstance(optimiser, workingtree_4.InterDirStateTree))
        optimiser = InterTree.get(tree, rev_tree)
        self.assertIsInstance(optimiser, InterTree)
        self.assertFalse(isinstance(optimiser, workingtree_4.InterDirStateTree))
        tree.unlock()

    def test_empty_basis_to_dirstate_tree(self):
        # we should get a InterDirStateTree for doing
        # 'changes_from' from the first basis dirstate revision tree to a
        # WorkingTree4.
        tree = self.make_workingtree()
        tree.lock_read()
        basis_tree = tree.basis_tree()
        basis_tree.lock_read()
        optimiser = InterTree.get(basis_tree, tree)
        tree.unlock()
        basis_tree.unlock()
        self.assertIsInstance(optimiser, workingtree_4.InterDirStateTree)

    def test_nonempty_basis_to_dirstate_tree(self):
        # we should get a InterDirStateTree for doing
        # 'changes_from' from a non-null basis dirstate revision tree to a
        # WorkingTree4.
        tree = self.make_workingtree()
        tree.commit('first post')
        tree.lock_read()
        basis_tree = tree.basis_tree()
        basis_tree.lock_read()
        optimiser = InterTree.get(basis_tree, tree)
        tree.unlock()
        basis_tree.unlock()
        self.assertIsInstance(optimiser, workingtree_4.InterDirStateTree)

    def test_empty_basis_revtree_to_dirstate_tree(self):
        # we should get a InterDirStateTree for doing
        # 'changes_from' from an empty repository based rev tree to a
        # WorkingTree4.
        tree = self.make_workingtree()
        tree.lock_read()
        basis_tree = tree.branch.repository.revision_tree(tree.last_revision())
        basis_tree.lock_read()
        optimiser = InterTree.get(basis_tree, tree)
        tree.unlock()
        basis_tree.unlock()
        self.assertIsInstance(optimiser, workingtree_4.InterDirStateTree)

    def test_nonempty_basis_revtree_to_dirstate_tree(self):
        # we should get a InterDirStateTree for doing
        # 'changes_from' from a non-null repository based rev tree to a
        # WorkingTree4.
        tree = self.make_workingtree()
        tree.commit('first post')
        tree.lock_read()
        basis_tree = tree.branch.repository.revision_tree(tree.last_revision())
        basis_tree.lock_read()
        optimiser = InterTree.get(basis_tree, tree)
        tree.unlock()
        basis_tree.unlock()
        self.assertIsInstance(optimiser, workingtree_4.InterDirStateTree)

    def test_tree_to_basis_in_other_tree(self):
        # we should get a InterDirStateTree when
        # the source revid is in the dirstate object of the target and
        # the dirstates are different. This is largely covered by testing
        # with repository revtrees, so is just for extra confidence.
        tree = self.make_workingtree('a')
        tree.commit('first post')
        tree2 = self.make_workingtree('b')
        tree2.pull(tree.branch)
        basis_tree = tree.basis_tree()
        tree2.lock_read()
        basis_tree.lock_read()
        optimiser = InterTree.get(basis_tree, tree2)
        tree2.unlock()
        basis_tree.unlock()
        self.assertIsInstance(optimiser, workingtree_4.InterDirStateTree)

    def test_merged_revtree_to_tree(self):
        # we should get a InterDirStateTree when
        # the source tree is a merged tree present in the dirstate of target.
        tree = self.make_workingtree('a')
        tree.commit('first post')
        tree.commit('tree 1 commit 2')
        tree2 = self.make_workingtree('b')
        tree2.pull(tree.branch)
        tree2.commit('tree 2 commit 2')
        tree.merge_from_branch(tree2.branch)
        second_parent_tree = tree.revision_tree(tree.get_parent_ids()[1])
        second_parent_tree.lock_read()
        tree.lock_read()
        optimiser = InterTree.get(second_parent_tree, tree)
        tree.unlock()
        second_parent_tree.unlock()
        self.assertIsInstance(optimiser, workingtree_4.InterDirStateTree)

    def test_id2path(self):
        tree = self.make_workingtree('tree')
        self.build_tree(['tree/a', 'tree/b'])
        tree.add(['a'], ['a-id'])
        self.assertEqual(u'a', tree.id2path('a-id'))
        self.assertRaises(errors.NoSuchId, tree.id2path, 'a')
        tree.commit('a')
        tree.add(['b'], ['b-id'])

        try:
            new_path = u'b\u03bcrry'
            tree.rename_one('a', new_path)
        except UnicodeEncodeError:
            # support running the test on non-unicode platforms
            new_path = 'c'
            tree.rename_one('a', new_path)
        self.assertEqual(new_path, tree.id2path('a-id'))
        tree.commit(u'b\xb5rry')
        tree.unversion(['a-id'])
        self.assertRaises(errors.NoSuchId, tree.id2path, 'a-id')
        self.assertEqual('b', tree.id2path('b-id'))
        self.assertRaises(errors.NoSuchId, tree.id2path, 'c-id')

    def test_unique_root_id_per_tree(self):
        # each time you initialize a new tree, it gets a different root id
        format_name = 'dirstate-with-subtree'
        tree1 = self.make_branch_and_tree('tree1',
            format=format_name)
        tree2 = self.make_branch_and_tree('tree2',
            format=format_name)
        self.assertNotEqual(tree1.get_root_id(), tree2.get_root_id())
        # when you branch, it inherits the same root id
        rev1 = tree1.commit('first post')
        tree3 = tree1.bzrdir.sprout('tree3').open_workingtree()
        self.assertEqual(tree3.get_root_id(), tree1.get_root_id())

    def test_set_root_id(self):
        # similar to some code that fails in the dirstate-plus-subtree branch
        # -- setting the root id while adding a parent seems to scramble the
        # dirstate invariants. -- mbp 20070303
        def validate():
            wt.lock_read()
            try:
                wt.current_dirstate()._validate()
            finally:
                wt.unlock()
        wt = self.make_workingtree('tree')
        wt.set_root_id('TREE-ROOTID')
        validate()
        wt.commit('somenthing')
        validate()
        # now switch and commit again
        wt.set_root_id('tree-rootid')
        validate()
        wt.commit('again')
        validate()

    def test_default_root_id(self):
        tree = self.make_branch_and_tree('tag', format='dirstate-tags')
        self.assertEqual(inventory.ROOT_ID, tree.get_root_id())
        tree = self.make_branch_and_tree('subtree',
                                         format='dirstate-with-subtree')
        self.assertNotEqual(inventory.ROOT_ID, tree.get_root_id())

    def test_non_subtree_with_nested_trees(self):
        # prior to dirstate, st/diff/commit ignored nested trees.
        # dirstate, as opposed to dirstate-with-subtree, should
        # behave the same way.
        tree = self.make_branch_and_tree('.', format='dirstate')
        self.assertFalse(tree.supports_tree_reference())
        self.build_tree(['dir/'])
        # for testing easily.
        tree.set_root_id('root')
        tree.add(['dir'], ['dir-id'])
        subtree = self.make_branch_and_tree('dir')
        # the most primitive operation: kind
        self.assertEqual('directory', tree.kind('dir-id'))
        # a diff against the basis should give us a directory and the root (as
        # the root is new too).
        tree.lock_read()
        expected = [('dir-id',
            (None, u'dir'),
            True,
            (False, True),
            (None, 'root'),
            (None, u'dir'),
            (None, 'directory'),
            (None, False)),
            ('root', (None, u''), True, (False, True), (None, None),
            (None, u''), (None, 'directory'), (None, 0))]
        self.assertEqual(expected, list(tree.iter_changes(tree.basis_tree(),
            specific_files=['dir'])))
        tree.unlock()
        # do a commit, we want to trigger the dirstate fast-path too
        tree.commit('first post')
        # change the path for the subdir, which will trigger getting all
        # its data:
        os.rename('dir', 'also-dir')
        # now the diff will use the fast path
        tree.lock_read()
        expected = [('dir-id',
            (u'dir', u'dir'),
            True,
            (True, True),
            ('root', 'root'),
            ('dir', 'dir'),
            ('directory', None),
            (False, False))]
        self.assertEqual(expected, list(tree.iter_changes(tree.basis_tree())))
        tree.unlock()

    def test_with_subtree_supports_tree_references(self):
        # dirstate-with-subtree should support tree-references.
        tree = self.make_branch_and_tree('.', format='dirstate-with-subtree')
        self.assertTrue(tree.supports_tree_reference())
        # having checked this is on, the tree interface, and intertree
        # interface tests, will proceed to test the subtree support of
        # workingtree_4.

    def test_iter_changes_ignores_unversioned_dirs(self):
        """iter_changes should not descend into unversioned directories."""
        tree = self.make_branch_and_tree('.', format='dirstate')
        # We have an unversioned directory at the root, a versioned one with
        # other versioned files and an unversioned directory, and another
        # versioned dir with nothing but an unversioned directory.
        self.build_tree(['unversioned/',
                         'unversioned/a',
                         'unversioned/b/',
                         'versioned/',
                         'versioned/unversioned/',
                         'versioned/unversioned/a',
                         'versioned/unversioned/b/',
                         'versioned2/',
                         'versioned2/a',
                         'versioned2/unversioned/',
                         'versioned2/unversioned/a',
                         'versioned2/unversioned/b/',
                        ])
        tree.add(['versioned', 'versioned2', 'versioned2/a'])
        tree.commit('one', rev_id='rev-1')
        # Trap osutils._walkdirs_utf8 to spy on what dirs have been accessed.
        returned = []
        def walkdirs_spy(*args, **kwargs):
            for val in orig(*args, **kwargs):
                returned.append(val[0][0])
                yield val
        orig = self.overrideAttr(osutils, '_walkdirs_utf8', walkdirs_spy)

        basis = tree.basis_tree()
        tree.lock_read()
        self.addCleanup(tree.unlock)
        basis.lock_read()
        self.addCleanup(basis.unlock)
        changes = [c[1] for c in
                   tree.iter_changes(basis, want_unversioned=True)]
        self.assertEqual([(None, 'unversioned'),
                          (None, 'versioned/unversioned'),
                          (None, 'versioned2/unversioned'),
                         ], changes)
        self.assertEqual(['', 'versioned', 'versioned2'], returned)
        del returned[:] # reset
        changes = [c[1] for c in tree.iter_changes(basis)]
        self.assertEqual([], changes)
        self.assertEqual(['', 'versioned', 'versioned2'], returned)

    def test_iter_changes_unversioned_error(self):
        """ Check if a PathsNotVersionedError is correctly raised and the
            paths list contains all unversioned entries only.
        """
        tree = self.make_branch_and_tree('tree')
        self.build_tree_contents([('tree/bar', '')])
        tree.add(['bar'], ['bar-id'])
        tree.lock_read()
        self.addCleanup(tree.unlock)
        tree_iter_changes = lambda files: [
            c for c in tree.iter_changes(tree.basis_tree(), specific_files=files,
                                         require_versioned=True)
        ]
        e = self.assertRaises(errors.PathsNotVersionedError,
                              tree_iter_changes, ['bar', 'foo'])
        self.assertEqual(e.paths, ['foo'])

    def get_tree_with_cachable_file_foo(self):
        tree = self.make_branch_and_tree('.')
        self.build_tree(['foo'])
        tree.add(['foo'], ['foo-id'])
        # a 4 second old timestamp is always hashable - sucks to delay
        # the test suite, but not testing this is worse.
        time.sleep(4)
        return tree

    def test_commit_updates_hash_cache(self):
        tree = self.get_tree_with_cachable_file_foo()
        revid = tree.commit('a commit')
        # tree's dirstate should now have a valid stat entry for foo.
        tree.lock_read()
        self.addCleanup(tree.unlock)
        entry = tree._get_entry(path='foo')
        expected_sha1 = osutils.sha_file_by_name('foo')
        self.assertEqual(expected_sha1, entry[1][0][1])

    def test_observed_sha1_cachable(self):
        tree = self.get_tree_with_cachable_file_foo()
        expected_sha1 = osutils.sha_file_by_name('foo')
        statvalue = os.lstat("foo")
        tree.lock_write()
        try:
            tree._observed_sha1("foo-id", "foo", (expected_sha1, statvalue))
            self.assertEqual(expected_sha1,
                tree._get_entry(path="foo")[1][0][1])
        finally:
            tree.unlock()
        tree = tree.bzrdir.open_workingtree()
        tree.lock_read()
        self.addCleanup(tree.unlock)
        self.assertEqual(expected_sha1, tree._get_entry(path="foo")[1][0][1])

    def test_observed_sha1_new_file(self):
        tree = self.make_branch_and_tree('.')
        self.build_tree(['foo'])
        tree.add(['foo'], ['foo-id'])
        tree.lock_read()
        try:
            current_sha1 = tree._get_entry(path="foo")[1][0][1]
        finally:
            tree.unlock()
        tree.lock_write()
        try:
            tree._observed_sha1("foo-id", "foo",
                (osutils.sha_file_by_name('foo'), os.lstat("foo")))
            # Must not have changed
            self.assertEqual(current_sha1,
                tree._get_entry(path="foo")[1][0][1])
        finally:
            tree.unlock()

    def test_get_file_with_stat_id_only(self):
        # Explicit test to ensure we get a lstat value from WT4 trees.
        tree = self.make_branch_and_tree('.')
        self.build_tree(['foo'])
        tree.add(['foo'], ['foo-id'])
        tree.lock_read()
        self.addCleanup(tree.unlock)
        file_obj, statvalue = tree.get_file_with_stat('foo-id')
        expected = os.lstat('foo')
        self.assertEqualStat(expected, statvalue)
        self.assertEqual(["contents of foo\n"], file_obj.readlines())


class TestCorruptDirstate(TestCaseWithTransport):
    """Tests for how we handle when the dirstate has been corrupted."""

    def create_wt4(self):
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        control.create_repository()
        control.create_branch()
        tree = workingtree_4.WorkingTreeFormat4().initialize(control)
        return tree

    def test_invalid_rename(self):
        tree = self.create_wt4()
        # Create a corrupted dirstate
        tree.lock_write()
        try:
            # We need a parent, or we always compare with NULL
            tree.commit('init')
            state = tree.current_dirstate()
            state._read_dirblocks_if_needed()
            # Now add in an invalid entry, a rename with a dangling pointer
            state._dirblocks[1][1].append((('', 'foo', 'foo-id'),
                                            [('f', '', 0, False, ''),
                                             ('r', 'bar', 0 , False, '')]))
            self.assertListRaises(errors.CorruptDirstate,
                                  tree.iter_changes, tree.basis_tree())
        finally:
            tree.unlock()

    def get_simple_dirblocks(self, state):
        """Extract the simple information from the DirState.

        This returns the dirblocks, only with the sha1sum and stat details
        filtered out.
        """
        simple_blocks = []
        for block in state._dirblocks:
            simple_block = (block[0], [])
            for entry in block[1]:
                # Include the key for each entry, and for each parent include
                # just the minikind, so we know if it was
                # present/absent/renamed/etc
                simple_block[1].append((entry[0], [i[0] for i in entry[1]]))
            simple_blocks.append(simple_block)
        return simple_blocks

    def test_update_basis_with_invalid_delta(self):
        """When given an invalid delta, it should abort, and not be saved."""
        self.build_tree(['dir/', 'dir/file'])
        tree = self.create_wt4()
        tree.lock_write()
        self.addCleanup(tree.unlock)
        tree.add(['dir', 'dir/file'], ['dir-id', 'file-id'])
        first_revision_id = tree.commit('init')

        root_id = tree.path2id('')
        state = tree.current_dirstate()
        state._read_dirblocks_if_needed()
        self.assertEqual([
            ('', [(('', '', root_id), ['d', 'd'])]),
            ('', [(('', 'dir', 'dir-id'), ['d', 'd'])]),
            ('dir', [(('dir', 'file', 'file-id'), ['f', 'f'])]),
        ],  self.get_simple_dirblocks(state))

        tree.remove(['dir/file'])
        self.assertEqual([
            ('', [(('', '', root_id), ['d', 'd'])]),
            ('', [(('', 'dir', 'dir-id'), ['d', 'd'])]),
            ('dir', [(('dir', 'file', 'file-id'), ['a', 'f'])]),
        ],  self.get_simple_dirblocks(state))
        # Make sure the removal is written to disk
        tree.flush()

        # self.assertRaises(Exception, tree.update_basis_by_delta,
        new_dir = inventory.InventoryDirectory('dir-id', 'new-dir', root_id)
        new_dir.revision = 'new-revision-id'
        new_file = inventory.InventoryFile('file-id', 'new-file', root_id)
        new_file.revision = 'new-revision-id'
        self.assertRaises(errors.InconsistentDelta,
            tree.update_basis_by_delta, 'new-revision-id',
            [('dir', 'new-dir', 'dir-id', new_dir),
             ('dir/file', 'new-dir/new-file', 'file-id', new_file),
            ])
        del state

        # Now when we re-read the file it should not have been modified
        tree.unlock()
        tree.lock_read()
        self.assertEqual(first_revision_id, tree.last_revision())
        state = tree.current_dirstate()
        state._read_dirblocks_if_needed()
        self.assertEqual([
            ('', [(('', '', root_id), ['d', 'd'])]),
            ('', [(('', 'dir', 'dir-id'), ['d', 'd'])]),
            ('dir', [(('dir', 'file', 'file-id'), ['a', 'f'])]),
        ],  self.get_simple_dirblocks(state))
