29
32
import bzrlib.errors as errors
30
from bzrlib.tests.test_sftp_transport import TestCaseWithSFTPServer, paramiko_loaded
33
class BoundSFTPBranch(TestCaseWithSFTPServer):
33
from bzrlib.tests import TestSkipped
34
from bzrlib.tests import TestCaseWithTransport
35
from bzrlib.transport.local import LocalURLServer
36
from bzrlib.transport.memory import MemoryServer
39
class BoundSFTPBranch(TestCaseWithTransport):
42
TestCaseWithTransport.setUp(self)
43
self.vfs_transport_factory = MemoryServer
44
if self.transport_server is LocalURLServer:
45
self.transport_server = None
35
47
def create_branches(self):
36
48
self.build_tree(['base/', 'base/a', 'base/b'])
37
old_format = BzrDirFormat.get_default_format()
38
BzrDirFormat.set_default_format(BzrDirMetaFormat1())
49
format = bzrdir.format_registry.make_bzrdir('knit')
40
wt_base = BzrDir.create_standalone_workingtree('base')
42
BzrDirFormat.set_default_format(old_format)
51
wt_base = BzrDir.create_standalone_workingtree(
52
self.get_url('base'), format=format)
53
except errors.NotLocalUrl:
54
raise TestSkipped('Not a local URL')
44
56
b_base = wt_base.branch
50
62
wt_child = b_base.bzrdir.sprout('child').open_workingtree()
51
63
self.sftp_base = Branch.open(self.get_url('base'))
52
64
wt_child.branch.bind(self.sftp_base)
65
# check the branch histories are ready for using in tests.
54
66
self.assertEqual(['r@b-1'], b_base.revision_history())
55
67
self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
57
68
return b_base, wt_child
61
bzrlib.transport.sftp.clear_connection_cache()
62
super(BoundSFTPBranch, self).tearDown()
64
70
def test_simple_binding(self):
65
71
self.build_tree(['base/', 'base/a', 'base/b', 'child/'])
66
wt_base = BzrDir.create_standalone_workingtree('base')
73
wt_base = BzrDir.create_standalone_workingtree(self.get_url('base'))
74
except errors.NotLocalUrl:
75
raise TestSkipped('Not a local URL')
70
79
wt_base.commit('first', rev_id='r@b-1')
72
81
b_base = wt_base.branch
73
old_format = BzrDirFormat.get_default_format()
74
BzrDirFormat.set_default_format(BzrDirMetaFormat1())
76
b_child = BzrDir.create_branch_convenience('child')
78
BzrDirFormat.set_default_format(old_format)
82
# manually make a branch we can bind, because the default format
83
# may not be bindable-from, and we want to test the side effects etc
85
format = bzrdir.format_registry.make_bzrdir('knit')
86
b_child = BzrDir.create_branch_convenience('child', format=format)
79
87
self.assertEqual(None, b_child.get_bound_location())
80
88
self.assertEqual(None, b_child.get_master_branch())
82
90
sftp_b_base = Branch.open(self.get_url('base'))
83
91
b_child.bind(sftp_b_base)
84
92
self.assertEqual(sftp_b_base.base, b_child.get_bound_location())
85
# this line is more of a working tree test line, but - what the hey.
93
# the bind must not have given b_child history:
94
self.assertEqual([], b_child.revision_history())
95
# we should be able to update the branch at this point:
96
self.assertEqual(None, b_child.update())
97
# and now there must be history.
98
self.assertEqual(['r@b-1'], b_child.revision_history())
99
# this line is more of a working tree test line, but - what the hey,
86
101
b_child.bzrdir.open_workingtree().update()
87
102
self.failUnlessExists('child/a')
88
103
self.failUnlessExists('child/b')
99
114
self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
100
115
self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
102
def test_bound_fail(self):
117
def test_bound_commit_fails_when_out_of_date(self):
103
118
# Make sure commit fails if out of date.
104
119
b_base, wt_child = self.create_branches()
127
142
def test_double_binding(self):
128
143
b_base, wt_child = self.create_branches()
130
wt_child2 = wt_child.bzrdir.sprout('child2').open_workingtree()
132
wt_child2.branch.bind(wt_child.branch)
145
wt_child2 = wt_child.branch.create_checkout('child2')
134
147
open('child2/a', 'wb').write('new contents\n')
135
148
self.assertRaises(errors.CommitToDoubleBoundBranch,
162
175
wt_child.branch.bind, sftp_b_base)
164
177
def test_commit_remote_bound(self):
165
# Make sure it is detected if the current base
166
# suddenly is bound when child goes to commit
178
# Make sure it is detected if the current base is bound during the
179
# objects lifetime, when the child goes to commit.
167
180
b_base, wt_child = self.create_branches()
169
182
b_base.bzrdir.sprout('newbase')
181
194
self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
182
195
self.assertEqual(['r@b-1'], sftp_b_newbase.revision_history())
184
def test_pull_updates_both(self):
185
b_base, wt_child = self.create_branches()
187
wt_newchild = b_base.bzrdir.sprout('newchild').open_workingtree()
188
open('newchild/b', 'wb').write('newchild b contents\n')
189
wt_newchild.commit('newchild', rev_id='r@d-2')
190
self.assertEqual(['r@b-1', 'r@d-2'], wt_newchild.branch.revision_history())
192
wt_child.pull(wt_newchild.branch)
193
self.assertEqual(['r@b-1', 'r@d-2'], wt_child.branch.revision_history())
194
self.assertEqual(['r@b-1', 'r@d-2'], b_base.revision_history())
196
197
def test_bind_diverged(self):
197
from bzrlib.builtins import merge
199
198
b_base, wt_child = self.create_branches()
201
200
wt_child.branch.unbind()
214
213
self.assertRaises(errors.DivergedBranches,
215
214
wt_child.branch.bind, sftp_b_base)
217
# TODO: jam 20051230 merge_inner doesn't set pending merges
218
# Is this on purpose?
219
# merge_inner also doesn't fetch any missing revisions
220
#merge_inner(wt_child.branch, sftp_b_base.revision_tree('r@b-2'),
221
# wt_child.branch.revision_tree('r@b-1'))
222
# TODO: jam 20051230 merge(..., (None, None), ...) seems to
223
# cause an infinite loop of some sort. It definitely doesn't
224
# work, you have to use list notation
225
merge((sftp_b_base.base, 2), [None, None], this_dir=wt_child.branch.base)
216
wt_child.merge_from_branch(sftp_b_base)
227
217
self.assertEqual([wt_child_rev, 'r@b-2'], wt_child.get_parent_ids())
228
218
wt_child.commit('merged', rev_id='r@c-3')
230
# After a merge, trying to bind again should succeed
231
# by pushing the new change to base
220
# After a merge, trying to bind again should succeed but not push the
232
222
wt_child.branch.bind(sftp_b_base)
234
self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3'],
235
b_base.revision_history())
236
self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3'],
237
wt_child.branch.revision_history())
224
self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
225
self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3'],
226
wt_child.branch.revision_history())
239
def test_bind_parent_ahead(self):
228
def test_bind_parent_ahead_preserves_parent(self):
240
229
b_base, wt_child = self.create_branches()
242
231
wt_child.branch.unbind()
262
251
self.assertEqual(['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5'],
263
252
b_base.revision_history())
265
self.assertEqual(['r@b-1', 'r@b-2'], wt_child.branch.revision_history())
254
self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
267
256
wt_child.branch.bind(sftp_b_base)
268
self.assertEqual(['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5'],
269
wt_child.branch.revision_history())
257
self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
271
def test_bind_child_ahead(self):
259
def test_bind_child_ahead_preserves_child(self):
272
260
b_base, wt_child = self.create_branches()
274
262
wt_child.branch.unbind()
280
268
sftp_b_base = Branch.open(self.get_url('base'))
281
269
wt_child.branch.bind(sftp_b_base)
283
self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
271
self.assertEqual(['r@b-1'], b_base.revision_history())
285
273
# Check and make sure it also works if child is ahead multiple
286
274
wt_child.branch.unbind()
291
279
self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5'],
292
280
wt_child.branch.revision_history())
293
self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
281
self.assertEqual(['r@b-1'], b_base.revision_history())
295
283
wt_child.branch.bind(sftp_b_base)
296
self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5'],
297
b_base.revision_history())
284
self.assertEqual(['r@b-1'], b_base.revision_history())
299
286
def test_commit_after_merge(self):
300
from bzrlib.builtins import merge
302
287
b_base, wt_child = self.create_branches()
304
289
# We want merge to be able to be a local only
315
300
self.failIf(wt_child.branch.repository.has_revision('r@d-2'))
316
301
self.failIf(b_base.repository.has_revision('r@d-2'))
318
# TODO: jam 20051230 merge_inner doesn't set pending merges
319
# Is this on purpose?
320
# merge_inner also doesn't fetch any missing revisions
321
#merge_inner(wt_child.branch, b_other.revision_tree('r@d-2'),
322
# wt_child.branch.revision_tree('r@b-1'))
323
merge((wt_other.branch.base, 2), [None, None], this_dir=wt_child.branch.base)
303
wt_child.merge_from_branch(wt_other.branch)
325
305
self.failUnlessExists('child/c')
326
306
self.assertEqual(['r@d-2'], wt_child.get_parent_ids()[1:])
328
308
self.failIf(b_base.repository.has_revision('r@d-2'))
330
310
# Commit should succeed, and cause merged revisions to
331
# be pulled into base
311
# be pushed into base
332
312
wt_child.commit('merge other', rev_id='r@c-2')
333
313
self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
334
314
self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
340
320
open('a', 'ab').write('child adds some text\n')
322
# this deletes the branch from memory
324
# and this moves it out of the way on disk
343
325
os.rename('base', 'hidden_base')
345
327
self.assertRaises(errors.BoundBranchConnectionFailure,
346
328
wt_child.commit, 'added text', rev_id='r@c-2')
348
def test_pull_fails(self):
349
b_base, wt_child = self.create_branches()
351
wt_other = wt_child.bzrdir.sprout('other').open_workingtree()
352
open('other/a', 'wb').write('new contents\n')
353
wt_other.commit('changed a', rev_id='r@d-2')
355
self.assertEqual(['r@b-1'], b_base.revision_history())
356
self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
357
self.assertEqual(['r@b-1', 'r@d-2'], wt_other.branch.revision_history())
360
os.rename('base', 'hidden_base')
362
self.assertRaises(errors.BoundBranchConnectionFailure,
363
wt_child.pull, wt_other.branch)
365
330
# TODO: jam 20051231 We need invasive failure tests, so that we can show
366
331
# performance even when something fails.
369
if not paramiko_loaded: