/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/branch_implementations/test_bound_sftp.py

  • Committer: Aaron Bentley
  • Date: 2008-10-16 21:27:10 UTC
  • mfrom: (0.15.26 unshelve)
  • mto: (0.16.74 shelf-ui)
  • mto: This revision was merged to the branch mainline in revision 3820.
  • Revision ID: aaron@aaronbentley.com-20081016212710-h9av3nhk76dvmsv5
Merge with unshelve

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for branches bound to an sftp branch."""
 
18
 
 
19
 
 
20
import os
 
21
 
 
22
import bzrlib
 
23
from bzrlib import (
 
24
    bzrdir,
 
25
    )
 
26
from bzrlib.branch import Branch
 
27
from bzrlib.bzrdir import (BzrDir,
 
28
                           BzrDirFormat,
 
29
                           BzrDirFormat6,
 
30
                           BzrDirMetaFormat1,
 
31
                           )
 
32
import bzrlib.errors as errors
 
33
from bzrlib.tests import TestSkipped
 
34
from bzrlib.tests import TestCaseWithTransport
 
35
from bzrlib.transport.local import LocalURLServer
 
36
from bzrlib.transport.memory import MemoryServer
 
37
 
 
38
 
 
39
class BoundSFTPBranch(TestCaseWithTransport):
 
40
 
 
41
    def setUp(self):
 
42
        TestCaseWithTransport.setUp(self)
 
43
        self.vfs_transport_factory = MemoryServer
 
44
        if self.transport_server is LocalURLServer:
 
45
            self.transport_server = None
 
46
 
 
47
    def create_branches(self):
 
48
        self.build_tree(['base/', 'base/a', 'base/b'])
 
49
        format = bzrdir.format_registry.make_bzrdir('knit')
 
50
        try:
 
51
            wt_base = BzrDir.create_standalone_workingtree(
 
52
                self.get_url('base'), format=format)
 
53
        except errors.NotLocalUrl:
 
54
            raise TestSkipped('Not a local URL')
 
55
    
 
56
        b_base = wt_base.branch
 
57
 
 
58
        wt_base.add('a')
 
59
        wt_base.add('b')
 
60
        wt_base.commit('first', rev_id='r@b-1')
 
61
 
 
62
        wt_child = b_base.bzrdir.sprout('child').open_workingtree()
 
63
        self.sftp_base = Branch.open(self.get_url('base'))
 
64
        wt_child.branch.bind(self.sftp_base)
 
65
        # check the branch histories are ready for using in tests.
 
66
        self.assertEqual(['r@b-1'], b_base.revision_history())
 
67
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
 
68
        return b_base, wt_child
 
69
 
 
70
    def test_simple_binding(self):
 
71
        self.build_tree(['base/', 'base/a', 'base/b', 'child/'])
 
72
        try:
 
73
            wt_base = BzrDir.create_standalone_workingtree(self.get_url('base'))
 
74
        except errors.NotLocalUrl:
 
75
            raise TestSkipped('Not a local URL')
 
76
 
 
77
        wt_base.add('a')
 
78
        wt_base.add('b')
 
79
        wt_base.commit('first', rev_id='r@b-1')
 
80
 
 
81
        b_base = wt_base.branch
 
82
        # manually make a branch we can bind, because the default format
 
83
        # may not be bindable-from, and we want to test the side effects etc
 
84
        # of bondage.
 
85
        format = bzrdir.format_registry.make_bzrdir('knit')
 
86
        b_child = BzrDir.create_branch_convenience('child', format=format)
 
87
        self.assertEqual(None, b_child.get_bound_location())
 
88
        self.assertEqual(None, b_child.get_master_branch())
 
89
 
 
90
        sftp_b_base = Branch.open(self.get_url('base'))
 
91
        b_child.bind(sftp_b_base)
 
92
        self.assertEqual(sftp_b_base.base, b_child.get_bound_location())
 
93
        # the bind must not have given b_child history:
 
94
        self.assertEqual([], b_child.revision_history())
 
95
        # we should be able to update the branch at this point:
 
96
        self.assertEqual(None, b_child.update())
 
97
        # and now there must be history.
 
98
        self.assertEqual(['r@b-1'], b_child.revision_history())
 
99
        # this line is more of a working tree test line, but - what the hey,
 
100
        # it has work to do.
 
101
        b_child.bzrdir.open_workingtree().update()
 
102
        self.failUnlessExists('child/a')
 
103
        self.failUnlessExists('child/b')
 
104
 
 
105
        b_child.unbind()
 
106
        self.assertEqual(None, b_child.get_bound_location())
 
107
 
 
108
    def test_bound_commit(self):
 
109
        b_base, wt_child = self.create_branches()
 
110
 
 
111
        open('child/a', 'wb').write('new contents\n')
 
112
        wt_child.commit('modified a', rev_id='r@c-2')
 
113
 
 
114
        self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
 
115
        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
 
116
 
 
117
    def test_bound_commit_fails_when_out_of_date(self):
 
118
        # Make sure commit fails if out of date.
 
119
        b_base, wt_child = self.create_branches()
 
120
 
 
121
        open('base/a', 'wb').write('new base contents\n')
 
122
        b_base.bzrdir.open_workingtree().commit('base', rev_id='r@b-2')
 
123
 
 
124
        open('child/b', 'wb').write('new b child contents\n')
 
125
        self.assertRaises(errors.BoundBranchOutOfDate,
 
126
                wt_child.commit, 'child', rev_id='r@c-2')
 
127
 
 
128
        sftp_b_base = Branch.open(self.get_url('base'))
 
129
 
 
130
        # This is all that cmd_update does
 
131
        wt_child.pull(sftp_b_base, overwrite=False)
 
132
 
 
133
        wt_child.commit('child', rev_id='r@c-3')
 
134
 
 
135
        self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
 
136
                wt_child.branch.revision_history())
 
137
        self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
 
138
                b_base.revision_history())
 
139
        self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
 
140
                sftp_b_base.revision_history())
 
141
 
 
142
    def test_double_binding(self):
 
143
        b_base, wt_child = self.create_branches()
 
144
 
 
145
        wt_child2 = wt_child.branch.create_checkout('child2')
 
146
 
 
147
        open('child2/a', 'wb').write('new contents\n')
 
148
        self.assertRaises(errors.CommitToDoubleBoundBranch,
 
149
                wt_child2.commit, 'child2', rev_id='r@d-2')
 
150
 
 
151
    def test_unbinding(self):
 
152
        from bzrlib.transport import get_transport
 
153
        b_base, wt_child = self.create_branches()
 
154
 
 
155
        # TestCaseWithSFTPServer only allows you to connect one time
 
156
        # to the SFTP server. So we have to create a connection and
 
157
        # keep it around, so that it can be reused
 
158
        __unused_t = get_transport(self.get_url('.'))
 
159
 
 
160
        wt_base = b_base.bzrdir.open_workingtree()
 
161
        open('base/a', 'wb').write('new base contents\n')
 
162
        wt_base.commit('base', rev_id='r@b-2')
 
163
 
 
164
        open('child/b', 'wb').write('new b child contents\n')
 
165
        self.assertRaises(errors.BoundBranchOutOfDate,
 
166
                wt_child.commit, 'child', rev_id='r@c-2')
 
167
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
 
168
        wt_child.branch.unbind()
 
169
        wt_child.commit('child', rev_id='r@c-2')
 
170
        self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
 
171
        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
 
172
 
 
173
        sftp_b_base = Branch.open(self.get_url('base'))
 
174
        self.assertRaises(errors.DivergedBranches,
 
175
                wt_child.branch.bind, sftp_b_base)
 
176
 
 
177
    def test_commit_remote_bound(self):
 
178
        # Make sure it is detected if the current base is bound during the
 
179
        # objects lifetime, when the child goes to commit.
 
180
        b_base, wt_child = self.create_branches()
 
181
 
 
182
        b_base.bzrdir.sprout('newbase')
 
183
 
 
184
        sftp_b_base = Branch.open(self.get_url('base'))
 
185
        sftp_b_newbase = Branch.open(self.get_url('newbase'))
 
186
 
 
187
        sftp_b_base.bind(sftp_b_newbase)
 
188
 
 
189
        open('child/a', 'wb').write('new contents\n')
 
190
        self.assertRaises(errors.CommitToDoubleBoundBranch,
 
191
                wt_child.commit, 'failure', rev_id='r@c-2')
 
192
 
 
193
        self.assertEqual(['r@b-1'], b_base.revision_history())
 
194
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
 
195
        self.assertEqual(['r@b-1'], sftp_b_newbase.revision_history())
 
196
 
 
197
    def test_bind_diverged(self):
 
198
        b_base, wt_child = self.create_branches()
 
199
 
 
200
        wt_child.branch.unbind()
 
201
        open('child/a', 'ab').write('child contents\n')
 
202
        wt_child_rev = wt_child.commit('child', rev_id='r@c-2')
 
203
 
 
204
        self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
 
205
        self.assertEqual(['r@b-1'], b_base.revision_history())
 
206
 
 
207
        open('base/b', 'ab').write('base contents\n')
 
208
        b_base.bzrdir.open_workingtree().commit('base', rev_id='r@b-2')
 
209
        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
 
210
 
 
211
        sftp_b_base = Branch.open(self.get_url('base'))
 
212
 
 
213
        self.assertRaises(errors.DivergedBranches,
 
214
                wt_child.branch.bind, sftp_b_base)
 
215
 
 
216
        wt_child.merge_from_branch(sftp_b_base)
 
217
        self.assertEqual([wt_child_rev, 'r@b-2'], wt_child.get_parent_ids())
 
218
        wt_child.commit('merged', rev_id='r@c-3')
 
219
 
 
220
        # After a merge, trying to bind again should succeed but not push the
 
221
        # new change.
 
222
        wt_child.branch.bind(sftp_b_base)
 
223
 
 
224
        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
 
225
        self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3'],
 
226
            wt_child.branch.revision_history())
 
227
 
 
228
    def test_bind_parent_ahead_preserves_parent(self):
 
229
        b_base, wt_child = self.create_branches()
 
230
 
 
231
        wt_child.branch.unbind()
 
232
 
 
233
        open('a', 'ab').write('base changes\n')
 
234
        wt_base = b_base.bzrdir.open_workingtree()
 
235
        wt_base.commit('base', rev_id='r@b-2')
 
236
        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
 
237
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
 
238
 
 
239
        sftp_b_base = Branch.open(self.get_url('base'))
 
240
        wt_child.branch.bind(sftp_b_base)
 
241
 
 
242
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
 
243
 
 
244
        wt_child.branch.unbind()
 
245
 
 
246
        # Check and make sure it also works if parent is ahead multiple
 
247
        wt_base.commit('base 3', rev_id='r@b-3', allow_pointless=True)
 
248
        wt_base.commit('base 4', rev_id='r@b-4', allow_pointless=True)
 
249
        wt_base.commit('base 5', rev_id='r@b-5', allow_pointless=True)
 
250
 
 
251
        self.assertEqual(['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5'],
 
252
                b_base.revision_history())
 
253
 
 
254
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
 
255
 
 
256
        wt_child.branch.bind(sftp_b_base)
 
257
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
 
258
 
 
259
    def test_bind_child_ahead_preserves_child(self):
 
260
        b_base, wt_child = self.create_branches()
 
261
 
 
262
        wt_child.branch.unbind()
 
263
 
 
264
        wt_child.commit('child', rev_id='r@c-2', allow_pointless=True)
 
265
        self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
 
266
        self.assertEqual(['r@b-1'], b_base.revision_history())
 
267
 
 
268
        sftp_b_base = Branch.open(self.get_url('base'))
 
269
        wt_child.branch.bind(sftp_b_base)
 
270
 
 
271
        self.assertEqual(['r@b-1'], b_base.revision_history())
 
272
 
 
273
        # Check and make sure it also works if child is ahead multiple
 
274
        wt_child.branch.unbind()
 
275
        wt_child.commit('child 3', rev_id='r@c-3', allow_pointless=True)
 
276
        wt_child.commit('child 4', rev_id='r@c-4', allow_pointless=True)
 
277
        wt_child.commit('child 5', rev_id='r@c-5', allow_pointless=True)
 
278
 
 
279
        self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5'],
 
280
                wt_child.branch.revision_history())
 
281
        self.assertEqual(['r@b-1'], b_base.revision_history())
 
282
 
 
283
        wt_child.branch.bind(sftp_b_base)
 
284
        self.assertEqual(['r@b-1'], b_base.revision_history())
 
285
 
 
286
    def test_commit_after_merge(self):
 
287
        b_base, wt_child = self.create_branches()
 
288
 
 
289
        # We want merge to be able to be a local only
 
290
        # operation, because it does not alter the branch data.
 
291
 
 
292
        # But we can't fail afterwards
 
293
 
 
294
        wt_other = wt_child.bzrdir.sprout('other').open_workingtree()
 
295
 
 
296
        open('other/c', 'wb').write('file c\n')
 
297
        wt_other.add('c')
 
298
        wt_other.commit('adding c', rev_id='r@d-2')
 
299
 
 
300
        self.failIf(wt_child.branch.repository.has_revision('r@d-2'))
 
301
        self.failIf(b_base.repository.has_revision('r@d-2'))
 
302
 
 
303
        wt_child.merge_from_branch(wt_other.branch)
 
304
 
 
305
        self.failUnlessExists('child/c')
 
306
        self.assertEqual(['r@d-2'], wt_child.get_parent_ids()[1:])
 
307
        self.failUnless(wt_child.branch.repository.has_revision('r@d-2'))
 
308
        self.failIf(b_base.repository.has_revision('r@d-2'))
 
309
 
 
310
        # Commit should succeed, and cause merged revisions to
 
311
        # be pushed into base
 
312
        wt_child.commit('merge other', rev_id='r@c-2')
 
313
        self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
 
314
        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
 
315
        self.failUnless(b_base.repository.has_revision('r@d-2'))
 
316
 
 
317
    def test_commit_fails(self):
 
318
        b_base, wt_child = self.create_branches()
 
319
 
 
320
        open('a', 'ab').write('child adds some text\n')
 
321
 
 
322
        # this deletes the branch from memory
 
323
        del b_base
 
324
        # and this moves it out of the way on disk
 
325
        os.rename('base', 'hidden_base')
 
326
 
 
327
        self.assertRaises(errors.BoundBranchConnectionFailure,
 
328
                wt_child.commit, 'added text', rev_id='r@c-2')
 
329
 
 
330
    # TODO: jam 20051231 We need invasive failure tests, so that we can show
 
331
    #       performance even when something fails.
 
332
 
 
333