1
# Copyright (C) 2005 Robey Pointer <robey@lag.net>, Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
Tests for branches bound to an sftp branch.
23
from bzrlib.tests.test_sftp import TestCaseWithSFTPServer, paramiko_loaded
24
import bzrlib.errors as errors
25
from bzrlib.branch import Branch
26
from bzrlib.clone import copy_branch
29
class BoundSFTPBranch(TestCaseWithSFTPServer):
31
def create_branches(self):
34
self.build_tree(['base/', 'base/a', 'base/b'])
35
b_base = Branch.initialize('base')
37
wt_base = b_base.working_tree()
40
wt_base.commit('first', rev_id='r@b-1')
42
b_child = copy_branch(b_base, 'child')
43
b_child.set_bound_location(self._sftp_url + 'base')
45
self.assertEqual(['r@b-1'], b_base.revision_history())
46
self.assertEqual(['r@b-1'], b_child.revision_history())
48
return b_base, b_child
50
def test_simple_binding(self):
52
self.build_tree(['base/', 'base/a', 'base/b', 'child/'])
53
b_base = Branch.initialize('base')
55
wt_base = b_base.working_tree()
58
wt_base.commit('first', rev_id='r@b-1')
60
b_child = Branch.initialize('child')
61
self.assertEqual(None, b_child.get_bound_location())
62
self.assertEqual(None, b_child.get_master_branch())
64
sftp_b_base = Branch.open(self._sftp_url + 'base')
65
b_child.bind(sftp_b_base)
66
self.failUnlessExists('child/.bzr/bound')
67
self.failUnlessExists('child/a')
68
self.failUnlessExists('child/b')
71
self.failIf(os.path.lexists('child/.bzr/bound'))
73
def test_bound_commit(self):
74
b_base, b_child = self.create_branches()
76
open('child/a', 'wb').write('new contents\n')
77
wt_child = b_child.working_tree()
78
wt_child.commit('modified a', rev_id='r@c-2')
80
self.assertEqual(['r@b-1', 'r@c-2'], b_child.revision_history())
81
self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
83
def test_bound_fail(self):
84
# Make sure commit fails if out of date.
85
b_base, b_child = self.create_branches()
87
open('base/a', 'wb').write('new base contents\n')
88
b_base.working_tree().commit('base', rev_id='r@b-2')
90
wt_child = b_child.working_tree()
91
open('child/b', 'wb').write('new b child contents\n')
92
self.assertRaises(errors.BoundBranchOutOfDate,
93
wt_child.commit, 'child', rev_id='r@c-2')
95
sftp_b_base = Branch.open(self._sftp_url + 'base')
97
# This is all that cmd_update does
98
wt_child.pull(sftp_b_base, overwrite=False)
100
wt_child.commit('child', rev_id='r@c-3')
102
self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
103
b_child.revision_history())
104
self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
105
b_base.revision_history())
106
self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
107
sftp_b_base.revision_history())
109
def test_double_binding(self):
110
b_base, b_child = self.create_branches()
112
b_child2 = copy_branch(b_child, 'child2')
114
b_child2.bind(b_child)
116
open('child2/a', 'wb').write('new contents\n')
117
self.assertRaises(errors.CommitToDoubleBoundBranch,
118
b_child2.working_tree().commit, 'child2', rev_id='r@d-2')
120
def test_unbinding(self):
121
from bzrlib.transport import get_transport
122
b_base, b_child = self.create_branches()
124
# TestCaseWithSFTPServer only allows you to connect one time
125
# to the SFTP server. So we have to create a connection and
126
# keep it around, so that it can be reused
127
__unused_t = get_transport(self._sftp_url)
129
wt_base = b_base.working_tree()
130
open('base/a', 'wb').write('new base contents\n')
131
wt_base.commit('base', rev_id='r@b-2')
133
wt_child = b_child.working_tree()
134
open('child/b', 'wb').write('new b child contents\n')
135
self.assertRaises(errors.BoundBranchOutOfDate,
136
wt_child.commit, 'child', rev_id='r@c-2')
137
self.assertEqual(['r@b-1'], b_child.revision_history())
139
wt_child.commit('child', rev_id='r@c-2')
140
self.assertEqual(['r@b-1', 'r@c-2'], b_child.revision_history())
141
self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
143
sftp_b_base = Branch.open(self._sftp_url + 'base')
144
self.assertRaises(errors.DivergedBranches,
145
b_child.bind, sftp_b_base)
147
def test_commit_remote_bound(self):
148
# Make sure it is detected if the current base
149
# suddenly is bound when child goes to commit
150
b_base, b_child = self.create_branches()
152
copy_branch(b_base, 'newbase')
154
sftp_b_base = Branch.open(self._sftp_url + 'base')
155
sftp_b_newbase = Branch.open(self._sftp_url + 'newbase')
157
sftp_b_base.bind(sftp_b_newbase)
159
open('child/a', 'wb').write('new contents\n')
160
self.assertRaises(errors.CommitToDoubleBoundBranch,
161
b_child.working_tree().commit, 'failure', rev_id='r@c-2')
163
self.assertEqual(['r@b-1'], b_base.revision_history())
164
self.assertEqual(['r@b-1'], b_child.revision_history())
165
self.assertEqual(['r@b-1'], sftp_b_newbase.revision_history())
167
def test_pull_updates_both(self):
168
b_base, b_child = self.create_branches()
170
b_newchild = copy_branch(b_base, 'newchild')
171
open('newchild/b', 'wb').write('newchild b contents\n')
172
b_newchild.working_tree().commit('newchild', rev_id='r@d-2')
173
self.assertEqual(['r@b-1', 'r@d-2'], b_newchild.revision_history())
175
b_child.pull(b_newchild)
176
self.assertEqual(['r@b-1', 'r@d-2'], b_child.revision_history())
177
self.assertEqual(['r@b-1', 'r@d-2'], b_base.revision_history())
179
def test_bind_diverged(self):
180
from bzrlib.merge import merge, merge_inner
182
b_base, b_child = self.create_branches()
185
open('child/a', 'ab').write('child contents\n')
186
wt_child = b_child.working_tree()
187
wt_child.commit('child', rev_id='r@c-2')
189
self.assertEqual(['r@b-1', 'r@c-2'], b_child.revision_history())
190
self.assertEqual(['r@b-1'], b_base.revision_history())
192
open('base/b', 'ab').write('base contents\n')
193
b_base.working_tree().commit('base', rev_id='r@b-2')
194
self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
196
sftp_b_base = Branch.open(self._sftp_url + 'base')
198
self.assertRaises(errors.DivergedBranches,
199
b_child.bind, sftp_b_base)
201
# TODO: jam 20051230 merge_inner doesn't set pending merges
202
# Is this on purpose?
203
# merge_inner also doesn't fetch any missing revisions
204
#merge_inner(b_child, sftp_b_base.revision_tree('r@b-2'),
205
# b_child.revision_tree('r@b-1'))
206
# TODO: jam 20051230 merge(..., (None, None), ...) seems to
207
# cause an infinite loop of some sort. It definitely doesn't
208
# work, you have to use list notation
209
merge((sftp_b_base.base, 2), [None, None], this_dir=b_child.base)
211
self.assertEqual(['r@b-2'], wt_child.pending_merges())
212
wt_child.commit('merged', rev_id='r@c-3')
214
# After a merge, trying to bind again should succeed
215
# by pushing the new change to base
216
b_child.bind(sftp_b_base)
218
self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
219
b_base.revision_history())
220
self.assertEqual(['r@b-1', 'r@b-2', 'r@c-3'],
221
b_child.revision_history())
223
def test_bind_parent_ahead(self):
224
b_base, b_child = self.create_branches()
228
open('a', 'ab').write('base changes\n')
229
wt_base = b_base.working_tree()
230
wt_base.commit('base', rev_id='r@b-2')
231
self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
232
self.assertEqual(['r@b-1'], b_child.revision_history())
234
sftp_b_base = Branch.open(self._sftp_url + 'base')
235
b_child.bind(sftp_b_base)
237
self.assertEqual(['r@b-1', 'r@b-2'], b_child.revision_history())
241
# Check and make sure it also works if parent is ahead multiple
242
wt_base.commit('base 3', rev_id='r@b-3', allow_pointless=True)
243
wt_base.commit('base 4', rev_id='r@b-4', allow_pointless=True)
244
wt_base.commit('base 5', rev_id='r@b-5', allow_pointless=True)
246
self.assertEqual(['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5'],
247
b_base.revision_history())
249
self.assertEqual(['r@b-1', 'r@b-2'], b_child.revision_history())
251
b_child.bind(sftp_b_base)
252
self.assertEqual(['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5'],
253
b_child.revision_history())
255
def test_bind_child_ahead(self):
256
b_base, b_child = self.create_branches()
260
wt_child = b_child.working_tree()
261
wt_child.commit('child', rev_id='r@c-2', allow_pointless=True)
262
self.assertEqual(['r@b-1', 'r@c-2'], b_child.revision_history())
263
self.assertEqual(['r@b-1'], b_base.revision_history())
265
sftp_b_base = Branch.open(self._sftp_url + 'base')
266
b_child.bind(sftp_b_base)
268
self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
270
# Check and make sure it also works if child is ahead multiple
272
wt_child.commit('child 3', rev_id='r@c-3', allow_pointless=True)
273
wt_child.commit('child 4', rev_id='r@c-4', allow_pointless=True)
274
wt_child.commit('child 5', rev_id='r@c-5', allow_pointless=True)
276
self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5'],
277
b_child.revision_history())
278
self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
280
b_child.bind(sftp_b_base)
281
self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5'],
282
b_base.revision_history())
284
def test_commit_after_merge(self):
285
from bzrlib.merge import merge, merge_inner
287
b_base, b_child = self.create_branches()
289
# We want merge to be able to be a local only
290
# operation, because it can be without violating
291
# the binding invariants.
292
# But we can't fail afterwards
294
b_other = copy_branch(b_child, 'other')
296
open('other/c', 'wb').write('file c\n')
297
wt_other = b_other.working_tree()
299
wt_other.commit('adding c', rev_id='r@d-2')
301
self.failIf(b_child.has_revision('r@d-2'))
302
self.failIf(b_base.has_revision('r@d-2'))
304
wt_child = b_child.working_tree()
305
# TODO: jam 20051230 merge_inner doesn't set pending merges
306
# Is this on purpose?
307
# merge_inner also doesn't fetch any missing revisions
308
#merge_inner(b_child, b_other.revision_tree('r@d-2'),
309
# b_child.revision_tree('r@b-1'))
310
merge((b_other.base, 2), [None, None], this_dir=b_child.base)
312
self.failUnlessExists('child/c')
313
self.assertEqual(['r@d-2'], wt_child.pending_merges())
314
self.failUnless(b_child.has_revision('r@d-2'))
315
self.failIf(b_base.has_revision('r@d-2'))
317
# Commit should succeed, and cause merged revisions to
318
# be pulled into base
319
wt_child.commit('merge other', rev_id='r@c-2')
320
self.assertEqual(['r@b-1', 'r@c-2'], b_child.revision_history())
321
self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
322
self.failUnless(b_base.has_revision('r@d-2'))
324
def test_pull_overwrite_fails(self):
325
b_base, b_child = self.create_branches()
327
b_other = copy_branch(b_child, 'other')
328
wt_other = b_other.working_tree()
330
open('other/a', 'wb').write('new contents\n')
331
wt_other.commit('changed a', rev_id='r@d-2')
333
open('child/a', 'wb').write('also changed a\n')
334
wt_child = b_child.working_tree()
335
wt_child.commit('child modified a', rev_id='r@c-2')
337
self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
338
self.assertEqual(['r@b-1', 'r@c-2'], b_child.revision_history())
339
self.assertEqual(['r@b-1', 'r@d-2'], b_other.revision_history())
341
# It might be possible that we want pull --overwrite to
343
# If we want it, just change this test to make sure that
344
# both base and child are updated properly
345
self.assertRaises(errors.OverwriteBoundBranch,
346
wt_child.pull, b_other, overwrite=True)
348
# It should fail without changing the local revision
349
self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
350
self.assertEqual(['r@b-1', 'r@c-2'], b_child.revision_history())
353
if not paramiko_loaded: