/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_repository_reference/test_fetch.py

  • Committer: Martin Pool
  • Date: 2009-06-10 02:51:23 UTC
  • mfrom: (4423 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4464.
  • Revision ID: mbp@sourcefrog.net-20090610025123-2u0c0ng5jcezjzqh
merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from bzrlib import (
 
19
    branch,
 
20
    errors,
 
21
    )
 
22
from bzrlib.smart import (
 
23
    server,
 
24
    )
 
25
from bzrlib.tests.per_repository import TestCaseWithRepository
 
26
 
 
27
 
 
28
class TestFetch(TestCaseWithRepository):
 
29
 
 
30
    def make_source_branch(self):
 
31
        # It would be nice if there was a way to force this to be memory-only
 
32
        builder = self.make_branch_builder('source')
 
33
        content = ['content lines\n'
 
34
                   'for the first revision\n'
 
35
                   'which is a marginal amount of content\n'
 
36
                  ]
 
37
        builder.start_series()
 
38
        builder.build_snapshot('A-id', None, [
 
39
            ('add', ('', 'root-id', 'directory', None)),
 
40
            ('add', ('a', 'a-id', 'file', ''.join(content))),
 
41
            ])
 
42
        content.append('and some more lines for B\n')
 
43
        builder.build_snapshot('B-id', ['A-id'], [
 
44
            ('modify', ('a-id', ''.join(content)))])
 
45
        content.append('and yet even more content for C\n')
 
46
        builder.build_snapshot('C-id', ['B-id'], [
 
47
            ('modify', ('a-id', ''.join(content)))])
 
48
        builder.finish_series()
 
49
        source_b = builder.get_branch()
 
50
        source_b.lock_read()
 
51
        self.addCleanup(source_b.unlock)
 
52
        return content, source_b
 
53
 
 
54
    def test_sprout_from_stacked_with_short_history(self):
 
55
        content, source_b = self.make_source_branch()
 
56
        # Split the generated content into a base branch, and a stacked branch
 
57
        # Use 'make_branch' which gives us a bzr:// branch when appropriate,
 
58
        # rather than creating a branch-on-disk
 
59
        stack_b = self.make_branch('stack-on')
 
60
        stack_b.pull(source_b, stop_revision='B-id')
 
61
        target_b = self.make_branch('target')
 
62
        target_b.set_stacked_on_url('../stack-on')
 
63
        target_b.pull(source_b, stop_revision='C-id')
 
64
        # At this point, we should have a target branch, with 1 revision, on
 
65
        # top of the source.
 
66
        final_b = self.make_branch('final')
 
67
        final_b.pull(target_b)
 
68
        final_b.lock_read()
 
69
        self.addCleanup(final_b.unlock)
 
70
        self.assertEqual('C-id', final_b.last_revision())
 
71
        text_keys = [('a-id', 'A-id'), ('a-id', 'B-id'), ('a-id', 'C-id')]
 
72
        stream = final_b.repository.texts.get_record_stream(text_keys,
 
73
            'unordered', True)
 
74
        records = sorted([(r.key, r.get_bytes_as('fulltext')) for r in stream])
 
75
        self.assertEqual([
 
76
            (('a-id', 'A-id'), ''.join(content[:-2])),
 
77
            (('a-id', 'B-id'), ''.join(content[:-1])),
 
78
            (('a-id', 'C-id'), ''.join(content)),
 
79
            ], records)
 
80
 
 
81
    def test_sprout_from_smart_stacked_with_short_history(self):
 
82
        content, source_b = self.make_source_branch()
 
83
        transport = self.make_smart_server('server')
 
84
        transport.ensure_base()
 
85
        url = transport.abspath('')
 
86
        stack_b = source_b.bzrdir.sprout(url + '/stack-on', revision_id='B-id')
 
87
        # self.make_branch only takes relative paths, so we do it the 'hard'
 
88
        # way
 
89
        target_transport = transport.clone('target')
 
90
        target_transport.ensure_base()
 
91
        target_bzrdir = self.bzrdir_format.initialize_on_transport(
 
92
                            target_transport)
 
93
        target_bzrdir.create_repository()
 
94
        target_b = target_bzrdir.create_branch()
 
95
        target_b.set_stacked_on_url('../stack-on')
 
96
        target_b.pull(source_b, stop_revision='C-id')
 
97
        # Now we should be able to branch from the remote location to a local
 
98
        # location
 
99
        final_b = target_b.bzrdir.sprout('final').open_branch()
 
100
        self.assertEqual('C-id', final_b.last_revision())
 
101
 
 
102
        # bzrdir.sprout() has slightly different code paths if you supply a
 
103
        # revision_id versus not. If you supply revision_id, then you get a
 
104
        # PendingAncestryResult for the search, versus a SearchResult...
 
105
        final2_b = target_b.bzrdir.sprout('final2',
 
106
                                          revision_id='C-id').open_branch()
 
107
        self.assertEqual('C-id', final_b.last_revision())
 
108
 
 
109
    def make_source_with_ghost_and_stacked_target(self):
 
110
        builder = self.make_branch_builder('source')
 
111
        builder.start_series()
 
112
        builder.build_snapshot('A-id', None, [
 
113
            ('add', ('', 'root-id', 'directory', None)),
 
114
            ('add', ('file', 'file-id', 'file', 'content\n'))])
 
115
        builder.build_snapshot('B-id', ['A-id', 'ghost-id'], [])
 
116
        builder.finish_series()
 
117
        source_b = builder.get_branch()
 
118
        source_b.lock_read()
 
119
        self.addCleanup(source_b.unlock)
 
120
        base = self.make_branch('base')
 
121
        base.pull(source_b, stop_revision='A-id')
 
122
        stacked = self.make_branch('stacked')
 
123
        stacked.set_stacked_on_url('../base')
 
124
        return source_b, base, stacked
 
125
 
 
126
    def test_fetch_with_ghost_stacked(self):
 
127
        (source_b, base,
 
128
         stacked) = self.make_source_with_ghost_and_stacked_target()
 
129
        stacked.pull(source_b, stop_revision='B-id')
 
130
 
 
131
    def test_fetch_into_smart_stacked_with_ghost(self):
 
132
        (source_b, base,
 
133
         stacked) = self.make_source_with_ghost_and_stacked_target()
 
134
        # Now, create a smart server on 'stacked' and re-open to force the
 
135
        # target to be a smart target
 
136
        trans = self.make_smart_server('stacked')
 
137
        stacked = branch.Branch.open(trans.base)
 
138
        stacked.lock_write()
 
139
        self.addCleanup(stacked.unlock)
 
140
        stacked.pull(source_b, stop_revision='B-id')
 
141
 
 
142
    def test_fetch_to_stacked_from_smart_with_ghost(self):
 
143
        (source_b, base,
 
144
         stacked) = self.make_source_with_ghost_and_stacked_target()
 
145
        # Now, create a smart server on 'source' and re-open to force the
 
146
        # target to be a smart target
 
147
        trans = self.make_smart_server('source')
 
148
        source_b = branch.Branch.open(trans.base)
 
149
        source_b.lock_read()
 
150
        self.addCleanup(source_b.unlock)
 
151
        stacked.pull(source_b, stop_revision='B-id')