# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Tests for branches bound to an sftp branch."""

import os

from bzrlib import bzrdir
from bzrlib.branch import Branch
from bzrlib.bzrdir import (BzrDir,
                           )
import bzrlib.errors as errors
from bzrlib.tests import TestSkipped
from bzrlib.tests import TestCaseWithTransport
from bzrlib.transport.local import LocalURLServer
from bzrlib.transport.memory import MemoryServer


class BoundSFTPBranch(TestCaseWithTransport):
    """Tests for binding, unbinding and committing through bound branches.

    Most tests start from create_branches(), which yields a 'base' branch
    and a 'child' working tree bound to it, and then check the revision
    history of each side after bind, unbind, commit, pull or merge.
    """

    def setUp(self):
        """Run the base setUp, then select the transport servers to use.

        MemoryServer becomes the VFS transport factory, and a transport
        server of LocalURLServer is replaced by None.
        """
        TestCaseWithTransport.setUp(self)
        self.vfs_transport_factory = MemoryServer
        if self.transport_server is LocalURLServer:
            self.transport_server = None

    def _put_contents(self, path, contents, mode='wb'):
        """Write contents to path and close the file immediately.

        :param path: file name handed to open().
        :param contents: string to write.
        :param mode: open() mode; 'wb' replaces the file, 'ab' appends.
        """
        out_file = open(path, mode)
        try:
            out_file.write(contents)
        finally:
            # Close explicitly instead of relying on garbage collection.
            out_file.close()

    def create_branches(self):
        """Create a 'base' branch and a 'child' tree bound to it.

        The opened base branch is also stored on self.sftp_base.

        :return: (b_base, wt_child): the base branch and the working tree
            of the bound child.  Both histories are ['r@b-1'] on return.
        :raises TestSkipped: if creating the base tree raises NotLocalUrl.
        """
        self.build_tree(['base/', 'base/a', 'base/b'])
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        try:
            wt_base = BzrDir.create_standalone_workingtree(
                self.get_url('base'), format=knit_format)
        except errors.NotLocalUrl:
            raise TestSkipped('Not a local URL')

        b_base = wt_base.branch

        # NOTE(review): the files have to be versioned for the first
        # revision to carry them into the child - confirm the add call.
        wt_base.add(['a', 'b'])
        wt_base.commit('first', rev_id='r@b-1')

        wt_child = b_base.bzrdir.sprout('child').open_workingtree()
        self.sftp_base = Branch.open(self.get_url('base'))
        wt_child.branch.bind(self.sftp_base)
        # check the branch histories are ready for using in tests.
        self.assertEqual(['r@b-1'], b_base.revision_history())
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
        return b_base, wt_child

    def test_simple_binding(self):
        """Bind an empty branch, update it, then unbind it again.

        Binding alone must not give the child any history; update() must
        bring in 'r@b-1'; unbinding must clear the bound location.
        """
        self.build_tree(['base/', 'base/a', 'base/b', 'child/'])
        try:
            wt_base = BzrDir.create_standalone_workingtree(
                self.get_url('base'))
        except errors.NotLocalUrl:
            raise TestSkipped('Not a local URL')

        # NOTE(review): 'child/a' and 'child/b' are expected to appear
        # after the update below, so they must be versioned in the base
        # revision - confirm the add call.
        wt_base.add(['a', 'b'])
        wt_base.commit('first', rev_id='r@b-1')

        b_base = wt_base.branch
        # manually make a branch we can bind, because the default format
        # may not be bindable-from, and we want to test the side effects
        # of binding.
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        b_child = BzrDir.create_branch_convenience('child',
                                                   format=knit_format)
        self.assertEqual(None, b_child.get_bound_location())
        self.assertEqual(None, b_child.get_master_branch())

        sftp_b_base = Branch.open(self.get_url('base'))
        b_child.bind(sftp_b_base)
        self.assertEqual(sftp_b_base.base, b_child.get_bound_location())
        # the bind must not have given b_child history:
        self.assertEqual([], b_child.revision_history())
        # we should be able to update the branch at this point:
        self.assertEqual(None, b_child.update())
        # and now there must be history.
        self.assertEqual(['r@b-1'], b_child.revision_history())
        # this line is more of a working tree test line, but the tree
        # still has to be brought up to date before the files can exist.
        b_child.bzrdir.open_workingtree().update()
        self.failUnlessExists('child/a')
        self.failUnlessExists('child/b')

        # Without this unbind the location asserted above would still be
        # set and the final check could never pass.
        b_child.unbind()
        self.assertEqual(None, b_child.get_bound_location())

    def test_bound_commit(self):
        """A commit in the bound child shows up in the base history too."""
        b_base, wt_child = self.create_branches()

        self._put_contents('child/a', 'new contents\n')
        wt_child.commit('modified a', rev_id='r@c-2')

        expected_history = ['r@b-1', 'r@c-2']
        self.assertEqual(expected_history,
                         wt_child.branch.revision_history())
        self.assertEqual(expected_history, b_base.revision_history())

    def test_bound_commit_fails_when_out_of_date(self):
        """Commit raises BoundBranchOutOfDate until the child has pulled."""
        # Make sure commit fails if out of date.
        b_base, wt_child = self.create_branches()

        self._put_contents('base/a', 'new base contents\n')
        b_base.bzrdir.open_workingtree().commit('base', rev_id='r@b-2')

        self._put_contents('child/b', 'new b child contents\n')
        self.assertRaises(errors.BoundBranchOutOfDate,
                          wt_child.commit, 'child', rev_id='r@c-2')

        sftp_b_base = Branch.open(self.get_url('base'))

        # This is all that cmd_update does
        wt_child.pull(sftp_b_base, overwrite=False)

        wt_child.commit('child', rev_id='r@c-3')

        expected_history = ['r@b-1', 'r@b-2', 'r@c-3']
        self.assertEqual(expected_history,
                         wt_child.branch.revision_history())
        self.assertEqual(expected_history, b_base.revision_history())
        self.assertEqual(expected_history, sftp_b_base.revision_history())

    def test_double_binding(self):
        """Committing in a checkout of a bound branch must be refused."""
        b_base, wt_child = self.create_branches()

        wt_child2 = wt_child.branch.create_checkout('child2')

        self._put_contents('child2/a', 'new contents\n')
        self.assertRaises(errors.CommitToDoubleBoundBranch,
                          wt_child2.commit, 'child2', rev_id='r@d-2')

    def test_unbinding(self):
        """After unbind the child commits locally; re-binding then fails.

        The unbound commit diverges from the base, so the final bind is
        expected to raise DivergedBranches.
        """
        from bzrlib.transport import get_transport
        b_base, wt_child = self.create_branches()

        # TestCaseWithSFTPServer only allows you to connect one time
        # to the SFTP server. So we have to create a connection and
        # keep it around, so that it can be reused
        _kept_transport = get_transport(self.get_url('.'))

        wt_base = b_base.bzrdir.open_workingtree()
        self._put_contents('base/a', 'new base contents\n')
        wt_base.commit('base', rev_id='r@b-2')

        self._put_contents('child/b', 'new b child contents\n')
        self.assertRaises(errors.BoundBranchOutOfDate,
                          wt_child.commit, 'child', rev_id='r@c-2')
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
        wt_child.branch.unbind()
        wt_child.commit('child', rev_id='r@c-2')
        self.assertEqual(['r@b-1', 'r@c-2'],
                         wt_child.branch.revision_history())
        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())

        sftp_b_base = Branch.open(self.get_url('base'))
        self.assertRaises(errors.DivergedBranches,
                          wt_child.branch.bind, sftp_b_base)

    def test_commit_remote_bound(self):
        """A commit fails if the base became bound after the child bound."""
        # Make sure it is detected if the current base is bound during the
        # objects lifetime, when the child goes to commit.
        b_base, wt_child = self.create_branches()

        b_base.bzrdir.sprout('newbase')

        sftp_b_base = Branch.open(self.get_url('base'))
        sftp_b_newbase = Branch.open(self.get_url('newbase'))

        sftp_b_base.bind(sftp_b_newbase)

        self._put_contents('child/a', 'new contents\n')
        self.assertRaises(errors.CommitToDoubleBoundBranch,
                          wt_child.commit, 'failure', rev_id='r@c-2')

        # The refused commit must not have reached any of the branches.
        self.assertEqual(['r@b-1'], b_base.revision_history())
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
        self.assertEqual(['r@b-1'], sftp_b_newbase.revision_history())

    def test_bind_diverged(self):
        """Bind is refused while diverged and allowed once merged."""
        b_base, wt_child = self.create_branches()

        wt_child.branch.unbind()
        self._put_contents('child/a', 'child contents\n', mode='ab')
        wt_child_rev = wt_child.commit('child', rev_id='r@c-2')

        self.assertEqual(['r@b-1', 'r@c-2'],
                         wt_child.branch.revision_history())
        self.assertEqual(['r@b-1'], b_base.revision_history())

        self._put_contents('base/b', 'base contents\n', mode='ab')
        b_base.bzrdir.open_workingtree().commit('base', rev_id='r@b-2')
        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())

        sftp_b_base = Branch.open(self.get_url('base'))

        self.assertRaises(errors.DivergedBranches,
                          wt_child.branch.bind, sftp_b_base)

        wt_child.merge_from_branch(sftp_b_base)
        self.assertEqual([wt_child_rev, 'r@b-2'], wt_child.get_parent_ids())
        wt_child.commit('merged', rev_id='r@c-3')

        # After a merge, trying to bind again should succeed but not push
        # the child's new revisions into the base.
        wt_child.branch.bind(sftp_b_base)

        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
        self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3'],
                         wt_child.branch.revision_history())

    def test_bind_parent_ahead_preserves_parent(self):
        """Binding to a base that is ahead leaves the child history alone."""
        b_base, wt_child = self.create_branches()

        wt_child.branch.unbind()

        # NOTE(review): this appends to 'a' in the current directory, not
        # to 'base/a' - confirm that is intended.
        self._put_contents('a', 'base changes\n', mode='ab')
        wt_base = b_base.bzrdir.open_workingtree()
        wt_base.commit('base', rev_id='r@b-2')
        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())

        sftp_b_base = Branch.open(self.get_url('base'))
        wt_child.branch.bind(sftp_b_base)

        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())

        wt_child.branch.unbind()

        # Check and make sure it also works if the parent is ahead by
        # several revisions.
        wt_base.commit('base 3', rev_id='r@b-3', allow_pointless=True)
        wt_base.commit('base 4', rev_id='r@b-4', allow_pointless=True)
        wt_base.commit('base 5', rev_id='r@b-5', allow_pointless=True)

        self.assertEqual(['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5'],
                         b_base.revision_history())

        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())

        wt_child.branch.bind(sftp_b_base)
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())

    def test_bind_child_ahead_preserves_child(self):
        """Binding a child that is ahead leaves the base history alone."""
        b_base, wt_child = self.create_branches()

        wt_child.branch.unbind()

        wt_child.commit('child', rev_id='r@c-2', allow_pointless=True)
        self.assertEqual(['r@b-1', 'r@c-2'],
                         wt_child.branch.revision_history())
        self.assertEqual(['r@b-1'], b_base.revision_history())

        sftp_b_base = Branch.open(self.get_url('base'))
        wt_child.branch.bind(sftp_b_base)

        self.assertEqual(['r@b-1'], b_base.revision_history())

        # Check and make sure it also works if the child is ahead by
        # several revisions.
        wt_child.branch.unbind()
        wt_child.commit('child 3', rev_id='r@c-3', allow_pointless=True)
        wt_child.commit('child 4', rev_id='r@c-4', allow_pointless=True)
        wt_child.commit('child 5', rev_id='r@c-5', allow_pointless=True)

        self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5'],
                         wt_child.branch.revision_history())
        self.assertEqual(['r@b-1'], b_base.revision_history())

        wt_child.branch.bind(sftp_b_base)
        self.assertEqual(['r@b-1'], b_base.revision_history())

    def test_commit_after_merge(self):
        """Merge stays local; the following commit pushes it to the base."""
        b_base, wt_child = self.create_branches()

        # We want merge to be able to be a local only
        # operation, because it does not alter the branch data.

        # But we can't fail afterwards

        wt_other = wt_child.bzrdir.sprout('other').open_workingtree()

        self._put_contents('other/c', 'file c\n')
        # NOTE(review): 'child/c' is expected after the merge below, so
        # 'c' must be versioned in 'other' first - confirm the add call.
        wt_other.add('c')
        wt_other.commit('adding c', rev_id='r@d-2')

        self.failIf(wt_child.branch.repository.has_revision('r@d-2'))
        self.failIf(b_base.repository.has_revision('r@d-2'))

        wt_child.merge_from_branch(wt_other.branch)

        self.failUnlessExists('child/c')
        self.assertEqual(['r@d-2'], wt_child.get_parent_ids()[1:])
        self.failUnless(wt_child.branch.repository.has_revision('r@d-2'))
        self.failIf(b_base.repository.has_revision('r@d-2'))

        # Commit should succeed, and cause merged revisions to
        # be pushed into base
        wt_child.commit('merge other', rev_id='r@c-2')
        self.assertEqual(['r@b-1', 'r@c-2'],
                         wt_child.branch.revision_history())
        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
        self.failUnless(b_base.repository.has_revision('r@d-2'))

    def test_commit_fails(self):
        """Commit fails once the base has been moved out of reach."""
        b_base, wt_child = self.create_branches()

        # NOTE(review): this appends to 'a' in the current directory, not
        # to 'child/a' - confirm that is intended.
        self._put_contents('a', 'child adds some text\n', mode='ab')

        # this deletes the branch from memory
        del b_base
        # and this moves it out of the way on disk
        os.rename('base', 'hidden_base')

        self.assertRaises(errors.BoundBranchConnectionFailure,
                          wt_child.commit, 'added text', rev_id='r@c-2')

# TODO: jam 20051231 We need invasive failure tests, so that we can show
#       performance even when something fails.