# Copyright (C) 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA


from bzrlib import (
    branch,
    errors,
    )
from bzrlib.smart import (
    server,
    )
from bzrlib.tests.per_repository import TestCaseWithRepository


class TestFetch(TestCaseWithRepository):

    def make_source_branch(self):
        # It would be nice if there was a way to force this to be memory-only
        builder = self.make_branch_builder('source')
        content = ['content lines\n'
                   'for the first revision\n'
                   'which is a marginal amount of content\n'
                  ]
        builder.start_series()
        builder.build_snapshot('A-id', None, [
            ('add', ('', 'root-id', 'directory', None)),
            ('add', ('a', 'a-id', 'file', ''.join(content))),
            ])
        content.append('and some more lines for B\n')
        builder.build_snapshot('B-id', ['A-id'], [
            ('modify', ('a-id', ''.join(content)))])
        content.append('and yet even more content for C\n')
        builder.build_snapshot('C-id', ['B-id'], [
            ('modify', ('a-id', ''.join(content)))])
        builder.finish_series()
        source_b = builder.get_branch()
        source_b.lock_read()
        self.addCleanup(source_b.unlock)
        return content, source_b

    def test_sprout_from_stacked_with_short_history(self):
        content, source_b = self.make_source_branch()
        # Split the generated content into a base branch, and a stacked branch
        # Use 'make_branch' which gives us a bzr:// branch when appropriate,
        # rather than creating a branch-on-disk
        stack_b = self.make_branch('stack-on')
        stack_b.pull(source_b, stop_revision='B-id')
        target_b = self.make_branch('target')
        target_b.set_stacked_on_url('../stack-on')
        target_b.pull(source_b, stop_revision='C-id')
        # At this point, we should have a target branch, with 1 revision, on
        # top of the source.
        final_b = self.make_branch('final')
        final_b.pull(target_b)
        final_b.lock_read()
        self.addCleanup(final_b.unlock)
        self.assertEqual('C-id', final_b.last_revision())
        text_keys = [('a-id', 'A-id'), ('a-id', 'B-id'), ('a-id', 'C-id')]
        stream = final_b.repository.texts.get_record_stream(text_keys,
            'unordered', True)
        records = sorted([(r.key, r.get_bytes_as('fulltext')) for r in stream])
        self.assertEqual([
            (('a-id', 'A-id'), ''.join(content[:-2])),
            (('a-id', 'B-id'), ''.join(content[:-1])),
            (('a-id', 'C-id'), ''.join(content)),
            ], records)

    def test_sprout_from_smart_stacked_with_short_history(self):
        content, source_b = self.make_source_branch()
        transport = self.make_smart_server('server')
        transport.ensure_base()
        url = transport.abspath('')
        stack_b = source_b.bzrdir.sprout(url + '/stack-on', revision_id='B-id')
        # self.make_branch only takes relative paths, so we do it the 'hard'
        # way
        target_transport = transport.clone('target')
        target_transport.ensure_base()
        target_bzrdir = self.bzrdir_format.initialize_on_transport(
                            target_transport)
        target_bzrdir.create_repository()
        target_b = target_bzrdir.create_branch()
        target_b.set_stacked_on_url('../stack-on')
        target_b.pull(source_b, stop_revision='C-id')
        # Now we should be able to branch from the remote location to a local
        # location
        final_b = target_b.bzrdir.sprout('final').open_branch()
        self.assertEqual('C-id', final_b.last_revision())

        # bzrdir.sprout() has slightly different code paths if you supply a
        # revision_id versus not. If you supply revision_id, then you get a
        # PendingAncestryResult for the search, versus a SearchResult...
        final2_b = target_b.bzrdir.sprout('final2',
                                          revision_id='C-id').open_branch()
        self.assertEqual('C-id', final_b.last_revision())

    def make_source_with_ghost_and_stacked_target(self):
        builder = self.make_branch_builder('source')
        builder.start_series()
        builder.build_snapshot('A-id', None, [
            ('add', ('', 'root-id', 'directory', None)),
            ('add', ('file', 'file-id', 'file', 'content\n'))])
        builder.build_snapshot('B-id', ['A-id', 'ghost-id'], [])
        builder.finish_series()
        source_b = builder.get_branch()
        source_b.lock_read()
        self.addCleanup(source_b.unlock)
        base = self.make_branch('base')
        base.pull(source_b, stop_revision='A-id')
        stacked = self.make_branch('stacked')
        stacked.set_stacked_on_url('../base')
        return source_b, base, stacked

    def test_fetch_with_ghost_stacked(self):
        (source_b, base,
         stacked) = self.make_source_with_ghost_and_stacked_target()
        stacked.pull(source_b, stop_revision='B-id')

    def test_fetch_into_smart_stacked_with_ghost(self):
        (source_b, base,
         stacked) = self.make_source_with_ghost_and_stacked_target()
        # Now, create a smart server on 'stacked' and re-open to force the
        # target to be a smart target
        trans = self.make_smart_server('stacked')
        stacked = branch.Branch.open(trans.base)
        stacked.lock_write()
        self.addCleanup(stacked.unlock)
        stacked.pull(source_b, stop_revision='B-id')

    def test_fetch_to_stacked_from_smart_with_ghost(self):
        (source_b, base,
         stacked) = self.make_source_with_ghost_and_stacked_target()
        # Now, create a smart server on 'source' and re-open to force the
        # target to be a smart target
        trans = self.make_smart_server('source')
        source_b = branch.Branch.open(trans.base)
        source_b.lock_read()
        self.addCleanup(source_b.unlock)
        stacked.pull(source_b, stop_revision='B-id')
