/brz/remove-bazaar

To get this branch, use:
bzr branch http://gegoxaren.bato24.eu/bzr/brz/remove-bazaar

« back to all changes in this revision

Viewing changes to bzrlib/tests/branch_implementations/test_bound_sftp.py

First attempt to merge .dev and resolve the conflicts (but tests are 
failing)

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
import os
21
21
 
22
22
import bzrlib
 
23
from bzrlib import (
 
24
    bzrdir,
 
25
    )
23
26
from bzrlib.branch import Branch
24
27
from bzrlib.bzrdir import (BzrDir,
25
28
                           BzrDirFormat,
27
30
                           BzrDirMetaFormat1,
28
31
                           )
29
32
import bzrlib.errors as errors
30
 
from bzrlib.tests.test_sftp_transport import TestCaseWithSFTPServer, paramiko_loaded
31
 
 
32
 
 
33
 
class BoundSFTPBranch(TestCaseWithSFTPServer):
 
33
from bzrlib.tests import TestSkipped
 
34
from bzrlib.tests import TestCaseWithTransport
 
35
from bzrlib.transport.local import LocalURLServer
 
36
from bzrlib.transport.memory import MemoryServer
 
37
 
 
38
 
 
39
class BoundSFTPBranch(TestCaseWithTransport):
 
40
 
 
41
    def setUp(self):
 
42
        TestCaseWithTransport.setUp(self)
 
43
        self.vfs_transport_factory = MemoryServer
 
44
        if self.transport_server is LocalURLServer:
 
45
            self.transport_server = None
34
46
 
35
47
    def create_branches(self):
36
48
        self.build_tree(['base/', 'base/a', 'base/b'])
37
 
        old_format = BzrDirFormat.get_default_format()
38
 
        BzrDirFormat.set_default_format(BzrDirMetaFormat1())
 
49
        format = bzrdir.format_registry.make_bzrdir('knit')
39
50
        try:
40
 
            wt_base = BzrDir.create_standalone_workingtree('base')
41
 
        finally:
42
 
            BzrDirFormat.set_default_format(old_format)
 
51
            wt_base = BzrDir.create_standalone_workingtree(
 
52
                self.get_url('base'), format=format)
 
53
        except errors.NotLocalUrl:
 
54
            raise TestSkipped('Not a local URL')
43
55
    
44
56
        b_base = wt_base.branch
45
57
 
50
62
        wt_child = b_base.bzrdir.sprout('child').open_workingtree()
51
63
        self.sftp_base = Branch.open(self.get_url('base'))
52
64
        wt_child.branch.bind(self.sftp_base)
53
 
 
 
65
        # check the branch histories are ready for using in tests.
54
66
        self.assertEqual(['r@b-1'], b_base.revision_history())
55
67
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
56
 
 
57
68
        return b_base, wt_child
58
69
 
59
 
    def tearDown(self):
60
 
        self.sftp_base = None
61
 
        bzrlib.transport.sftp.clear_connection_cache()
62
 
        super(BoundSFTPBranch, self).tearDown()
63
 
 
64
70
    def test_simple_binding(self):
65
71
        self.build_tree(['base/', 'base/a', 'base/b', 'child/'])
66
 
        wt_base = BzrDir.create_standalone_workingtree('base')
 
72
        try:
 
73
            wt_base = BzrDir.create_standalone_workingtree(self.get_url('base'))
 
74
        except errors.NotLocalUrl:
 
75
            raise TestSkipped('Not a local URL')
67
76
 
68
77
        wt_base.add('a')
69
78
        wt_base.add('b')
70
79
        wt_base.commit('first', rev_id='r@b-1')
71
80
 
72
81
        b_base = wt_base.branch
73
 
        old_format = BzrDirFormat.get_default_format()
74
 
        BzrDirFormat.set_default_format(BzrDirMetaFormat1())
75
 
        try:
76
 
            b_child = BzrDir.create_branch_convenience('child')
77
 
        finally:
78
 
            BzrDirFormat.set_default_format(old_format)
 
82
        # manually make a branch we can bind, because the default format
 
83
        # may not be bindable-from, and we want to test the side effects etc
 
84
        # of bondage.
 
85
        format = bzrdir.format_registry.make_bzrdir('knit')
 
86
        b_child = BzrDir.create_branch_convenience('child', format=format)
79
87
        self.assertEqual(None, b_child.get_bound_location())
80
88
        self.assertEqual(None, b_child.get_master_branch())
81
89
 
82
90
        sftp_b_base = Branch.open(self.get_url('base'))
83
91
        b_child.bind(sftp_b_base)
84
92
        self.assertEqual(sftp_b_base.base, b_child.get_bound_location())
85
 
        # this line is more of a working tree test line, but - what the hey.
 
93
        # the bind must not have given b_child history:
 
94
        self.assertEqual([], b_child.revision_history())
 
95
        # we should be able to update the branch at this point:
 
96
        self.assertEqual(None, b_child.update())
 
97
        # and now there must be history.
 
98
        self.assertEqual(['r@b-1'], b_child.revision_history())
 
99
        # this line is more of a working tree test line, but - what the hey,
 
100
        # it has work to do.
86
101
        b_child.bzrdir.open_workingtree().update()
87
102
        self.failUnlessExists('child/a')
88
103
        self.failUnlessExists('child/b')
99
114
        self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
100
115
        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
101
116
 
102
 
    def test_bound_fail(self):
 
117
    def test_bound_commit_fails_when_out_of_date(self):
103
118
        # Make sure commit fails if out of date.
104
119
        b_base, wt_child = self.create_branches()
105
120
 
127
142
    def test_double_binding(self):
128
143
        b_base, wt_child = self.create_branches()
129
144
 
130
 
        wt_child2 = wt_child.bzrdir.sprout('child2').open_workingtree()
131
 
 
132
 
        wt_child2.branch.bind(wt_child.branch)
 
145
        wt_child2 = wt_child.branch.create_checkout('child2')
133
146
 
134
147
        open('child2/a', 'wb').write('new contents\n')
135
148
        self.assertRaises(errors.CommitToDoubleBoundBranch,
162
175
                wt_child.branch.bind, sftp_b_base)
163
176
 
164
177
    def test_commit_remote_bound(self):
165
 
        # Make sure it is detected if the current base
166
 
        # suddenly is bound when child goes to commit
 
178
        # Make sure it is detected if the current base is bound during the
 
179
        # objects lifetime, when the child goes to commit.
167
180
        b_base, wt_child = self.create_branches()
168
181
 
169
182
        b_base.bzrdir.sprout('newbase')
181
194
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
182
195
        self.assertEqual(['r@b-1'], sftp_b_newbase.revision_history())
183
196
 
184
 
    def test_pull_updates_both(self):
185
 
        b_base, wt_child = self.create_branches()
186
 
 
187
 
        wt_newchild = b_base.bzrdir.sprout('newchild').open_workingtree()
188
 
        open('newchild/b', 'wb').write('newchild b contents\n')
189
 
        wt_newchild.commit('newchild', rev_id='r@d-2')
190
 
        self.assertEqual(['r@b-1', 'r@d-2'], wt_newchild.branch.revision_history())
191
 
 
192
 
        wt_child.pull(wt_newchild.branch)
193
 
        self.assertEqual(['r@b-1', 'r@d-2'], wt_child.branch.revision_history())
194
 
        self.assertEqual(['r@b-1', 'r@d-2'], b_base.revision_history())
195
 
 
196
197
    def test_bind_diverged(self):
197
 
        from bzrlib.builtins import merge
198
 
 
199
198
        b_base, wt_child = self.create_branches()
200
199
 
201
200
        wt_child.branch.unbind()
214
213
        self.assertRaises(errors.DivergedBranches,
215
214
                wt_child.branch.bind, sftp_b_base)
216
215
 
217
 
        # TODO: jam 20051230 merge_inner doesn't set pending merges
218
 
        #       Is this on purpose?
219
 
        #       merge_inner also doesn't fetch any missing revisions
220
 
        #merge_inner(wt_child.branch, sftp_b_base.revision_tree('r@b-2'), 
221
 
        #        wt_child.branch.revision_tree('r@b-1'))
222
 
        # TODO: jam 20051230 merge(..., (None, None), ...) seems to
223
 
        #       cause an infinite loop of some sort. It definitely doesn't
224
 
        #       work, you have to use list notation
225
 
        merge((sftp_b_base.base, 2), [None, None], this_dir=wt_child.branch.base)
226
 
 
 
216
        wt_child.merge_from_branch(sftp_b_base)
227
217
        self.assertEqual([wt_child_rev, 'r@b-2'], wt_child.get_parent_ids())
228
218
        wt_child.commit('merged', rev_id='r@c-3')
229
219
 
230
 
        # After a merge, trying to bind again should succeed
231
 
        # by pushing the new change to base
 
220
        # After a merge, trying to bind again should succeed but not push the
 
221
        # new change.
232
222
        wt_child.branch.bind(sftp_b_base)
233
223
 
234
 
        self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3'],
235
 
                b_base.revision_history())
236
 
        self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3'],
237
 
                wt_child.branch.revision_history())
 
224
        self.assertEqual(['r@b-1', 'r@b-2'], b_base.revision_history())
 
225
        self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3'],
 
226
            wt_child.branch.revision_history())
238
227
 
239
 
    def test_bind_parent_ahead(self):
 
228
    def test_bind_parent_ahead_preserves_parent(self):
240
229
        b_base, wt_child = self.create_branches()
241
230
 
242
231
        wt_child.branch.unbind()
250
239
        sftp_b_base = Branch.open(self.get_url('base'))
251
240
        wt_child.branch.bind(sftp_b_base)
252
241
 
253
 
        self.assertEqual(['r@b-1', 'r@b-2'], wt_child.branch.revision_history())
 
242
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
254
243
 
255
244
        wt_child.branch.unbind()
256
245
 
262
251
        self.assertEqual(['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5'],
263
252
                b_base.revision_history())
264
253
 
265
 
        self.assertEqual(['r@b-1', 'r@b-2'], wt_child.branch.revision_history())
 
254
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
266
255
 
267
256
        wt_child.branch.bind(sftp_b_base)
268
 
        self.assertEqual(['r@b-1', 'r@b-2', 'r@b-3', 'r@b-4', 'r@b-5'],
269
 
                wt_child.branch.revision_history())
 
257
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
270
258
 
271
 
    def test_bind_child_ahead(self):
 
259
    def test_bind_child_ahead_preserves_child(self):
272
260
        b_base, wt_child = self.create_branches()
273
261
 
274
262
        wt_child.branch.unbind()
280
268
        sftp_b_base = Branch.open(self.get_url('base'))
281
269
        wt_child.branch.bind(sftp_b_base)
282
270
 
283
 
        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
 
271
        self.assertEqual(['r@b-1'], b_base.revision_history())
284
272
 
285
273
        # Check and make sure it also works if child is ahead multiple
286
274
        wt_child.branch.unbind()
290
278
 
291
279
        self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5'],
292
280
                wt_child.branch.revision_history())
293
 
        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
 
281
        self.assertEqual(['r@b-1'], b_base.revision_history())
294
282
 
295
283
        wt_child.branch.bind(sftp_b_base)
296
 
        self.assertEqual(['r@b-1', 'r@c-2', 'r@c-3', 'r@c-4', 'r@c-5'],
297
 
                b_base.revision_history())
 
284
        self.assertEqual(['r@b-1'], b_base.revision_history())
298
285
 
299
286
    def test_commit_after_merge(self):
300
 
        from bzrlib.builtins import merge
301
 
 
302
287
        b_base, wt_child = self.create_branches()
303
288
 
304
289
        # We want merge to be able to be a local only
315
300
        self.failIf(wt_child.branch.repository.has_revision('r@d-2'))
316
301
        self.failIf(b_base.repository.has_revision('r@d-2'))
317
302
 
318
 
        # TODO: jam 20051230 merge_inner doesn't set pending merges
319
 
        #       Is this on purpose?
320
 
        #       merge_inner also doesn't fetch any missing revisions
321
 
        #merge_inner(wt_child.branch, b_other.revision_tree('r@d-2'),
322
 
        #        wt_child.branch.revision_tree('r@b-1'))
323
 
        merge((wt_other.branch.base, 2), [None, None], this_dir=wt_child.branch.base)
 
303
        wt_child.merge_from_branch(wt_other.branch)
324
304
 
325
305
        self.failUnlessExists('child/c')
326
306
        self.assertEqual(['r@d-2'], wt_child.get_parent_ids()[1:])
328
308
        self.failIf(b_base.repository.has_revision('r@d-2'))
329
309
 
330
310
        # Commit should succeed, and cause merged revisions to
331
 
        # be pulled into base
 
311
        # be pushed into base
332
312
        wt_child.commit('merge other', rev_id='r@c-2')
333
313
        self.assertEqual(['r@b-1', 'r@c-2'], wt_child.branch.revision_history())
334
314
        self.assertEqual(['r@b-1', 'r@c-2'], b_base.revision_history())
339
319
 
340
320
        open('a', 'ab').write('child adds some text\n')
341
321
 
 
322
        # this deletes the branch from memory
342
323
        del b_base
 
324
        # and this moves it out of the way on disk
343
325
        os.rename('base', 'hidden_base')
344
326
 
345
327
        self.assertRaises(errors.BoundBranchConnectionFailure,
346
328
                wt_child.commit, 'added text', rev_id='r@c-2')
347
329
 
348
 
    def test_pull_fails(self):
349
 
        b_base, wt_child = self.create_branches()
350
 
 
351
 
        wt_other = wt_child.bzrdir.sprout('other').open_workingtree()
352
 
        open('other/a', 'wb').write('new contents\n')
353
 
        wt_other.commit('changed a', rev_id='r@d-2')
354
 
 
355
 
        self.assertEqual(['r@b-1'], b_base.revision_history())
356
 
        self.assertEqual(['r@b-1'], wt_child.branch.revision_history())
357
 
        self.assertEqual(['r@b-1', 'r@d-2'], wt_other.branch.revision_history())
358
 
 
359
 
        del b_base
360
 
        os.rename('base', 'hidden_base')
361
 
 
362
 
        self.assertRaises(errors.BoundBranchConnectionFailure,
363
 
                wt_child.pull, wt_other.branch)
364
 
 
365
330
    # TODO: jam 20051231 We need invasive failure tests, so that we can show
366
331
    #       performance even when something fails.
367
332
 
368
333
 
369
 
if not paramiko_loaded:
370
 
    del BoundSFTPBranch
371